Optimize atomic memory assignment for multi-batch

pull/869/head
TangQunzhang 4 years ago
parent 56b950a09d
commit 2edbcee130

@ -212,6 +212,7 @@ class BlockMemAssigner : public MemAssigner {
void SetOpMemOffset(bool is_zero_copy);
///
/// @brief Accessor for the stored max-batch label.
/// @return a copy of max_batch_label_
///
std::string GetMaxBatchLabel() const { return max_batch_label_; }
protected:
///
/// @ingroup domi

@ -906,8 +906,9 @@ Status GraphMemoryAssigner::ReAssignVirtualNodesMemory(map<string, vector<NodePt
}
Status GraphMemoryAssigner::ReAssignAtomicMemory(bool is_loop_graph) {
map<NodePtr, vector<NodePtr>> normal_atomic_and_clean_nodes_map;
vector<NodePtr> connecting_output_atomic_nodes;
// Outer key of both maps: the dynamic-batch label (batch name) of the atomic nodes.
map<string, map<NodePtr, vector<NodePtr>>> normal_atomic_and_clean_nodes_map;
map<string, vector<NodePtr>> connecting_output_atomic_nodes;
Status status = FilterAtomicNodesForMemoryAssign(normal_atomic_and_clean_nodes_map, connecting_output_atomic_nodes);
if (status != SUCCESS) {
GELOGE(status, "Failed to filter atomic nodes for memory assignment.");
@ -917,45 +918,60 @@ Status GraphMemoryAssigner::ReAssignAtomicMemory(bool is_loop_graph) {
auto mem_iter = memory_offset_.find(RT_MEMORY_HBM);
if (mem_iter == memory_offset_.end()) {
std::string error = "Memory offset does not have memory type" + FmtToStr(RT_MEMORY_HBM);
GE_ERRORLOG_AND_ERRORMSG(FAILED, error.c_str());
GE_ERRORLOG_AND_ERRORMSG(FAILED, error.c_str());
return FAILED;
}
for (auto &iter : normal_atomic_and_clean_nodes_map) {
int64_t atomic_mem_start = static_cast<int64_t>(mem_iter->second.mem_offset_);
GELOGD("Begin to reAssign atomic memory, atomic address memory start = %ld", atomic_mem_start);
int64_t batch_atomic_mem_start = static_cast<int64_t>(mem_iter->second.mem_offset_);
int64_t batch_max_mem_offset = batch_atomic_mem_start;
for (auto &iter_batch : normal_atomic_and_clean_nodes_map) {
mem_iter->second.mem_offset_ = batch_atomic_mem_start;
for (auto &iter : iter_batch.second) {
int64_t atomic_mem_start = static_cast<int64_t>(mem_iter->second.mem_offset_);
GELOGD("Begin to reAssign atomic memory, atomic address memory start = %ld", atomic_mem_start);
for (auto &atomic_node : iter.second) {
vector<int64_t> mem_offset_end;
status = AssignAtomicOutputAndWorkspaceMemory(atomic_node, mem_offset_end);
if (status != SUCCESS) {
GELOGE(status, "Assign atomic output and workspace memory failed, node name is %s.",
atomic_node->GetName().c_str());
return status;
for (auto &atomic_node : iter.second) {
vector<int64_t> mem_offset_end;
status = AssignAtomicOutputAndWorkspaceMemory(atomic_node, mem_offset_end);
if (status != SUCCESS) {
GELOGE(status, "Assign atomic output and workspace memory failed, node name is %s.",
atomic_node->GetName().c_str());
return status;
}
}
}
int64_t atomic_mem_size = static_cast<int64_t>(mem_iter->second.mem_offset_) - atomic_mem_start;
if (atomic_mem_size != 0) {
GE_CHK_STATUS_RET(SetAtomicCleanAttr(iter.first, {atomic_mem_start}, {atomic_mem_size}),
"Failed to set attr for atomic addr clean node %s.", iter.first->GetName().c_str());
int64_t atomic_mem_size = static_cast<int64_t>(mem_iter->second.mem_offset_) - atomic_mem_start;
GE_CHECK_NOTNULL(mem_assigner_);
GE_CHECK_NOTNULL(mem_assigner_->GetPriorityAssinger());
if ((atomic_mem_size != 0) && (iter_batch.first == mem_assigner_->GetPriorityAssinger()->GetMaxBatchLabel())) {
GE_CHK_STATUS_RET(SetAtomicCleanAttr(iter.first, {atomic_mem_start}, {atomic_mem_size}),
"Failed to set attr for atomic addr clean node %s.", iter.first->GetName().c_str());
}
}
batch_max_mem_offset = std::max(batch_max_mem_offset, static_cast<int64_t>(mem_iter->second.mem_offset_));
}
if (AssignConnectNetOutputAtomicMemory(connecting_output_atomic_nodes) != SUCCESS) {
GELOGE(FAILED, "Failed to assign memory of nodes that connect to netoutput.");
return FAILED;
mem_iter->second.mem_offset_ = static_cast<size_t>(batch_max_mem_offset);
batch_atomic_mem_start = batch_max_mem_offset;
for (auto &iter_batch : connecting_output_atomic_nodes) {
mem_iter->second.mem_offset_ = batch_atomic_mem_start;
if (AssignConnectNetOutputAtomicMemory(iter_batch.second) != SUCCESS) {
GELOGE(FAILED, "Failed to assign memory of nodes that connect to netoutput.");
return FAILED;
}
batch_max_mem_offset = std::max(batch_max_mem_offset, static_cast<int64_t>(mem_iter->second.mem_offset_));
}
mem_iter->second.mem_offset_ = static_cast<size_t>(batch_max_mem_offset);
return SUCCESS;
}
Status GraphMemoryAssigner::FilterAtomicNodesForMemoryAssign(map<NodePtr, vector<NodePtr>> &normal_atomic_nodes_map,
vector<NodePtr> &connecting_output_atomic_nodes) {
Status GraphMemoryAssigner::FilterAtomicNodesForMemoryAssign(
map<string, map<NodePtr, vector<NodePtr>>> &normal_atomic_nodes_map,
map<string, vector<NodePtr>> &connecting_output_atomic_nodes) {
GE_CHECK_NOTNULL(compute_graph_);
for (const auto &node : compute_graph_->GetAllNodes()) {
if (node->GetType() == ATOMICADDRCLEAN) {
vector<NodePtr> tmp_normal_atomic_nodes;
map<string, vector<NodePtr>> tmp_normal_atomic_nodes;
const auto &out_control_anchor = node->GetOutControlAnchor();
GE_CHECK_NOTNULL(out_control_anchor);
for (const auto &peer_in_control_anchor : out_control_anchor->GetPeerInControlAnchors()) {
@ -977,23 +993,28 @@ Status GraphMemoryAssigner::FilterAtomicNodesForMemoryAssign(map<NodePtr, vector
return ge::PARAM_INVALID;
}
std::string batch_label;
(void)ge::AttrUtils::GetStr(peer_in_node_desc, ATTR_NAME_BATCH_LABEL, batch_label);
vector<int> is_connecting_output;
// If GetListInt fails, is_connecting_output remains an empty vector.
(void) ge::AttrUtils::GetListInt(peer_in_node_desc, ATTR_NAME_NODE_CONNECT_OUTPUT, is_connecting_output);
if (is_connecting_output.empty()) {
tmp_normal_atomic_nodes.emplace_back(peer_in_node);
tmp_normal_atomic_nodes[batch_label].emplace_back(peer_in_node);
continue;
}
connecting_output_atomic_nodes.emplace_back(peer_in_node);
tmp_normal_atomic_nodes.clear();
connecting_output_atomic_nodes[batch_label].emplace_back(peer_in_node);
tmp_normal_atomic_nodes[batch_label].clear();
break;
}
}
}
}
if (!tmp_normal_atomic_nodes.empty()) {
normal_atomic_nodes_map[node] = tmp_normal_atomic_nodes;
for (auto &it_atomic_node : tmp_normal_atomic_nodes) {
if (!it_atomic_node.second.empty()) {
normal_atomic_nodes_map[it_atomic_node.first][node] = it_atomic_node.second;
}
}
}
}
@ -1206,9 +1227,11 @@ Status GraphMemoryAssigner::AssignAtomicOutputMemory(const ge::NodePtr &node, ve
}
output_list[output_index] = iter->second.mem_offset_;
GELOGI("[IMAS]Atomic output : Set %s name[%s] output[%ld] offset to [%zu] stream_id[%ld] size[%ld] real_size[%ld].",
compute_graph_->GetName().c_str(), op_desc->GetName().c_str(), output_index,
iter->second.mem_offset_, op_desc->GetStreamId(), size, size);
std::string batch_label;
(void)ge::AttrUtils::GetStr(op_desc, ATTR_NAME_BATCH_LABEL, batch_label);
GELOGI("[IMAS]Atomic output : Set %s name[%s] output[%ld] offset to [%zu] stream_id[%ld] size[%ld] real_size[%ld]"
" batch[%s].", compute_graph_->GetName().c_str(), op_desc->GetName().c_str(), output_index,
iter->second.mem_offset_, op_desc->GetStreamId(), size, size, batch_label.c_str());
iter->second.mem_offset_ += size;
AlignMemOffset(MEM_ALIGN_SIZE, RT_MEMORY_HBM);
@ -1281,11 +1304,14 @@ Status GraphMemoryAssigner::AssignOrdinaryAtomicWorkspaceMemory(const ge::OpDesc
}
workspace_vector[workspace_index] = mem_type_iter->second.mem_offset_;
std::string batch_label;
(void)ge::AttrUtils::GetStr(op_desc, ATTR_NAME_BATCH_LABEL, batch_label);
GELOGI(
"[IMAS]Atomic ordinary workspace : Set %s name[%s] workspace[%lu] offset to [%zu] stream_id[%ld] "
"size[%ld] real_size[%ld].",
"size[%ld] real_size[%ld] batch[%s].",
compute_graph_->GetName().c_str(), op_desc->GetName().c_str(), workspace_index,
mem_type_iter->second.mem_offset_, op_desc->GetStreamId(), workspace_size, workspace_size);
mem_type_iter->second.mem_offset_, op_desc->GetStreamId(), workspace_size, workspace_size,
batch_label.c_str());
mem_type_iter->second.mem_offset_ += workspace_size;
mem_offset_end.emplace_back(mem_type_iter->second.mem_offset_);
@ -1319,10 +1345,13 @@ Status GraphMemoryAssigner::AssignFusionAtomicWorkspaceMemory(const ge::OpDescPt
auto workspace_size = info_iter.second;
size_t workspace_offset = mem_type_iter->second.mem_offset_;
std::string batch_label;
(void)ge::AttrUtils::GetStr(op_desc, ATTR_NAME_BATCH_LABEL, batch_label);
GELOGI(
"[IMAS]Atomic fusion workspace : Set %s name[%s] workspace[%lu] offset to [%zu] stream_id[%ld] size[%ld] "
"real_size[%ld].", compute_graph_->GetName().c_str(), op_desc->GetName().c_str(), workspace_index,
mem_type_iter->second.mem_offset_, op_desc->GetStreamId(), workspace_size, workspace_size);
"real_size[%ld] batch[%s].", compute_graph_->GetName().c_str(), op_desc->GetName().c_str(), workspace_index,
mem_type_iter->second.mem_offset_, op_desc->GetStreamId(), workspace_size, workspace_size,
batch_label.c_str());
mem_type_iter->second.mem_offset_ += workspace_size;
mem_offset_end.emplace_back(mem_type_iter->second.mem_offset_);

@ -136,9 +136,9 @@ class GraphMemoryAssigner {
int64_t &output_mem_size, int64_t &batch_dim_num, int64_t &out_size);
ge::Status ReAssignAtomicMemory(bool is_loop_graph);
ge::Status FilterAtomicNodesForMemoryAssign(std::map<NodePtr, vector<NodePtr>> &normal_atomic_nodes_map,
std::vector<NodePtr> &connecting_output_atomic_nodes);
ge::Status FilterAtomicNodesForMemoryAssign(map<string, map<NodePtr, vector<NodePtr>>> &normal_atomic_nodes_map,
map<string, vector<NodePtr>> &connecting_output_atomic_nodes);
ge::Status AssignContinuousInputMemory(const ge::NodePtr &node, int64_t &continuous_mem_start,
int64_t &continuous_mem_size, int64_t memory_type);

Loading…
Cancel
Save