!989 broadcast in train graph related

From: @wangxiaotian22
Reviewed-by: @xchu42, @wqtshg
Signed-off-by:
pull/989/MERGE
mindspore-ci-bot authored 4 years ago, committed by Gitee
commit 5cdec9853d

@@ -551,11 +551,31 @@ void GetMaxBatchAllMemorySize(std::map<std::string, vector<int64_t>> &batch_all_
}
}
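// If a continuous-input node has exactly one input and that input comes from a
// Variable/Const, mark it as already continuously allocated so its input memory
// is not reassigned.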
void BlockMemAssigner::MarkContinuousAllocedForOneInputFromVariable(const NodePtr &node) {
auto node_op_desc = node->GetOpDesc();
GE_IF_BOOL_EXEC(node_op_desc == nullptr, return);
// if the node has just one input and it comes from a variable, no need to reassign continuous memory
bool is_input_continuous = false;
(void)ge::AttrUtils::GetBool(node_op_desc, ATTR_NAME_CONTINUOUS_INPUT, is_input_continuous);
if (is_input_continuous && (node_op_desc->GetInputsSize() == 1)) {
auto peer_out_anchor = node->GetInDataAnchor(0)->GetPeerOutAnchor();
GE_IF_BOOL_EXEC(peer_out_anchor == nullptr, return);
auto in_node = peer_out_anchor->GetOwnerNode();
GE_IF_BOOL_EXEC(in_node == nullptr, return);
if (in_node->GetType() == VARIABLE || in_node->GetType() == CONSTANT) {
GELOGI("node only one input and from variable, set continuous alloced. node_name:%s", node->GetName().c_str());
(void)ge::AttrUtils::SetBool(node_op_desc, ATTR_NAME_CONTINUOUS_INPUT_ALLOC, true);
}
}
}
void BlockMemAssigner::GetOutAndWorkSpaceMem(vector<int64_t> &all_memory_size) {
vector<int64_t> temp;
std::map<std::string, vector<int64_t>> batch_all_memory_size;
std::map<std::string, int64_t> batch_total_size;
for (const NodePtr &n : compute_graph_->GetAllNodes()) {
MarkContinuousAllocedForOneInputFromVariable(n);
auto node_op_desc = n->GetOpDesc();
GE_IF_BOOL_EXEC(node_op_desc == nullptr, continue);
@@ -1061,18 +1081,73 @@ MemoryBlock *BlockMemAssigner::ApplyMemory(size_t block_size, size_t real_size,
return block;
}
MemoryBlock *BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vector<int64_t> &ranges,
const bool is_op_reuse_mem) {
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(n == nullptr, return nullptr, "input node is null.");
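// An output "refs" an input when the output tensor reuses its input's memory,
// or when the op carries ATTR_NAME_REFERENCE and the output name matches an
// input name.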
bool IsOutputIndexRef(const OpDescPtr &op_desc, uint32_t index) {
auto output_tensor = op_desc->GetOutputDescPtr(index);
bool dst_reuse_input = false;
(void)ge::TensorUtils::GetReuseInput(*output_tensor, dst_reuse_input);
if (dst_reuse_input) {
return true;
}
bool is_ref = false;
(void)ge::AttrUtils::GetBool(op_desc, ATTR_NAME_REFERENCE, is_ref);
if (is_ref) {
string output_name = op_desc->GetOutputNameByIndex(index);
for (const auto &input_name : op_desc->GetAllInputNames()) {
if (output_name == input_name) {
return true;
}
}
}
return false;
}
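// Walk the outputs until the first non-ref one: isAllOutputRef is cleared as
// soon as a non-ref output is found; each ref output seen sets isOutputHasRef
// and is recorded in zero_memory_list_ since it needs no block of its own.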
void BlockMemAssigner::ContinuousOutRefCheck(bool &isAllOutputRef, bool &isOutputHasRef,
const NodePtr &n) {
const auto node_op_desc = n->GetOpDesc();
for (uint32_t index = 0; index < static_cast<uint32_t>(node_op_desc->GetOutputsSize()); index++) {
if (!IsOutputIndexRef(node_op_desc, index)) {
isAllOutputRef = false;
break;
} else {
zero_memory_list_.emplace_back(n, kOutput, index);
isOutputHasRef = true;
}
}
}
Status BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vector<int64_t> &ranges,
const bool is_op_reuse_mem) {
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(n == nullptr, return INTERNAL_ERROR, "input node is null.");
auto node_op_desc = n->GetOpDesc();
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(node_op_desc == nullptr, return nullptr, "node_op_desc is null.");
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(node_op_desc == nullptr, return INTERNAL_ERROR, "node_op_desc is null.");
// continuous output supports ref only when every output refs an input
bool isAllOutputRef = true;
bool isOutputHasRef = false;
ContinuousOutRefCheck(isAllOutputRef, isOutputHasRef, n);
if (isAllOutputRef) {
GELOGI("continuous output node ref all input, skip continuous alloc, node_name:%s", n->GetName().c_str());
return SUCCESS;
}
if (!isAllOutputRef && isOutputHasRef) {
GELOGE(INTERNAL_ERROR, "continuous output node ref part input, not support this situation, node_name:%s",
n->GetName().c_str());
return INTERNAL_ERROR;
}
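// Size each remaining output below and pack them all into one continuous
// block; zero-memory node types are skipped.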
MemoryBlock *block = nullptr;
int64_t total_size = 0;
int64_t memory_type = RT_MEMORY_HBM;
for (uint32_t index = 0; index < static_cast<uint32_t>(node_op_desc->GetOutputsSize()); index++) {
auto output_op_desc = node_op_desc->GetOutputDescPtr(index);
if (output_op_desc == nullptr) {
return nullptr;
GELOGE(INTERNAL_ERROR, "Get output desc failed, node_name:%s, output_index:%u", n->GetName().c_str(), index);
return INTERNAL_ERROR;
}
if (CheckIsZeroMemNodeType(n->GetType())) {
@@ -1082,8 +1157,8 @@ MemoryBlock *BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vec
int64_t size = 0;
if (ge::TensorUtils::GetSize(*output_op_desc, size) != SUCCESS) {
GELOGI("Get size failed");
return nullptr;
GELOGE(INTERNAL_ERROR, "Get size failed, node_name:%s, output_index:%u", n->GetName().c_str(), index);
return INTERNAL_ERROR;
}
size_t align_size = static_cast<size_t>(size);
AlignMemOffset(align_size);
@@ -1106,7 +1181,7 @@ MemoryBlock *BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vec
}
if (total_size == 0) {
return nullptr;
return SUCCESS;
}
auto block_size = GetBlockSize(total_size, ranges);
@@ -1120,8 +1195,11 @@ MemoryBlock *BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vec
// hccl task need align header and tail
block->first_continuous_block_ = true;
block->last_continuous_block_ = true;
} else {
GELOGE(INTERNAL_ERROR, "node apply continuous output memory failed. node_name:%s", n->GetName().c_str());
return INTERNAL_ERROR;
}
return block;
return SUCCESS;
}
MemoryBlock *BlockMemAssigner::ApplyOutMemory(const NodePtr &n, uint32_t index, const vector<int64_t> &ranges,
@@ -1133,9 +1211,8 @@ MemoryBlock *BlockMemAssigner::ApplyOutMemory(const NodePtr &n, uint32_t index,
NodeIndexIO node_index_io(n, index, kOut);
int64_t size = 0;
auto output_op_desc = node_op_desc->GetOutputDescPtr(index);
if (output_op_desc != nullptr) {
GE_IF_BOOL_EXEC(ge::TensorUtils::GetSize(*output_op_desc, size) != SUCCESS, GELOGI("Get size failed"));
}
GE_IF_BOOL_EXEC(output_op_desc == nullptr, return nullptr);
GE_IF_BOOL_EXEC(ge::TensorUtils::GetSize(*output_op_desc, size) != SUCCESS, GELOGI("Get size failed"));
size_t no_align_size = 0;
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(GetNoAlignSize(*node_op_desc, index, no_align_size) != SUCCESS,
return nullptr, "Get no align size failed");
@@ -1146,6 +1223,13 @@ MemoryBlock *BlockMemAssigner::ApplyOutMemory(const NodePtr &n, uint32_t index,
block->AddNodeTypeIndex({n, kOutput, index, true}, size, no_align_size);
block->ref_count_++;
} else {
// if the ref input is a variable, the symbol cannot be found, so it must be judged separately
if (IsOutputIndexRef(node_op_desc, index)) {
zero_memory_list_.emplace_back(n, kOutput, index, false);
GELOGI("ref mode skip out block assign. node_name: %s, index:%d", n->GetName().c_str(), index);
return nullptr;
}
int64_t max_size = size;
int64_t memory_type = RT_MEMORY_HBM;
auto iter1 = anchor_to_symbol_.find(node_index_io.ToString());
@@ -1393,8 +1477,7 @@ Status BlockMemAssigner::AssignOutputMemoryWithReuse(const NodePtr &node, vector
for (auto iter = stream_workspace_blocks_.begin(); iter != stream_workspace_blocks_.end();
++iter) { ReleaseMemorys(iter->second[stream_id], reusable_blocks_[iter->first][stream_id]); });
if (IsContinuousOutput(node)) {
(void)ApplyContinuousMemory(node, ranges, is_op_reuse_mem_);
return SUCCESS;
return ApplyContinuousMemory(node, ranges, is_op_reuse_mem_);
}
for (uint32_t i = 0; i < static_cast<uint32_t>(op_desc->GetOutputsSize()); i++) {
int64_t size = 0;
@@ -1894,9 +1977,8 @@ Status BlockMemAssigner::Assign() {
bool BlockMemAssigner::CheckIsZeroMemNodeType(const string &node_type) const {
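// HCOMBROADCAST and HVDCALLBACKBROADCAST are dropped from this list below, so
// broadcast outputs now get real memory blocks assigned.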
return (node_type == VARIABLE) || (node_type == CONSTANT) || (node_type == MULTISHAPE) ||
(node_type == HCOMBROADCAST) || (node_type == CONSTANTOP) ||
(node_type == ASSIGNADD) || (node_type == ASSIGNSUB) || (node_type == ASSIGN) || (node_type == HVDWAIT) ||
(node_type == HVDCALLBACKBROADCAST);
(node_type == CONSTANTOP) || (node_type == ASSIGNADD) || (node_type == ASSIGNSUB) ||
(node_type == ASSIGN) || (node_type == HVDWAIT);
}
bool BlockMemAssigner::GetWorkSpaceMemoryType(const NodePtr &node, size_t index, int64_t &memory_type) {

@@ -421,7 +421,11 @@ class BlockMemAssigner : public MemAssigner {
bool GetWorkSpaceMemoryType(const NodePtr &node, size_t index, int64_t &memory_type);
MemoryBlock *ApplyContinuousMemory(const NodePtr &n, const vector<int64_t> &ranges, const bool is_op_reuse_mem);
void ContinuousOutRefCheck(bool &isAllOutputRef, bool &isOutputHasRef, const NodePtr &n);
Status ApplyContinuousMemory(const NodePtr &n, const vector<int64_t> &ranges, const bool is_op_reuse_mem);
void MarkContinuousAllocedForOneInputFromVariable(const NodePtr &node);
std::unordered_map<int64_t, std::unordered_map<int64_t, std::vector<MemoryBlock *>>> reusable_blocks_;

@@ -2148,11 +2148,6 @@ Status DavinciModel::SyncVarData() {
RT_MEMCPY_HOST_TO_DEVICE));
}
for (const auto &item : broadcast_variable_) {
ret = VarManager::Instance(session_id_)->SyncVarData(runtime_param_.graph_id, item.first, item.second, mem_base_);
GE_CHK_BOOL_EXEC(ret == SUCCESS, break, "sync var data ret failed, model id:%u, op name:%s.", model_id_,
item.first.c_str());
}
return ret;
}
@@ -2636,12 +2631,6 @@ Status DavinciModel::ReturnResult(uint32_t data_id, const bool rslt_flg, const b
///
Status DavinciModel::ReturnNoOutput(uint32_t data_id) {
GELOGI("ReturnNoOutput model id:%u", model_id_);
for (const auto item : broadcast_variable_) {
Status ret = VarManager::Instance(session_id_)
->SyncBroadCastData2Var(runtime_param_.graph_id, item.first, item.second, mem_base_);
GE_CHK_BOOL_EXEC(ret == SUCCESS, break, "sync var data ret failed, model id:%u, op name:%s.", model_id_,
item.first.c_str());
}
GE_CHK_BOOL_EXEC(listener_ != nullptr, return PARAM_INVALID, "listener_ is null!");
std::vector<ge::OutputTensorInfo> outputs;

@@ -92,6 +92,7 @@
#include "graph/passes/unused_args_clean_pass.h"
#include "graph/passes/global_step_insert_pass.h"
#include "graph/passes/memcpy_addr_async_pass.h"
#include "graph/passes/hccl_memcpy_pass.h"
#include "graph/build/label_allocator.h"
#include "graph/utils/tensor_adapter.h"
#include "inc/pass_manager.h"
@@ -2150,6 +2151,8 @@ Status GraphManager::OptimizeStage1(ge::ComputeGraphPtr &compute_graph) {
new (std::nothrow) TransOpWithoutReshapeFusionPass))
GE_CHK_STATUS_RET(after_merge_passes.AddPass("OptimizeStage1_1::TransOpBreadthFusionPass",
new (std::nothrow) TransOpBreadthFusionPass))
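// HcclMemcpyPass is registered here now; its registration in GraphPrepare is removed below.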
GE_CHK_STATUS_RET(
after_merge_passes.AddPass("OptimizeStage1_1::HcclMemcpyPass", new (std::nothrow) HcclMemcpyPass));
GE_TIMESTAMP_START(after_merge_passes);
auto ret = after_merge_passes.Run(compute_graph);

File diff suppressed because it is too large
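
The suppressed diff is the pass implementation whose header changes follow. As a rough sketch only: based on that header and on the sync loops removed from DavinciModel, InsertAssignAfterBroadcastIfNeed plausibly wires an Assign node between the source variable and the broadcast output. The VARIABLE type check, the anchor indices, and the edge wiring below are assumptions, not the committed code.

Status HcclMemcpyPass::InsertAssignAfterBroadcastIfNeed(const ComputeGraphPtr &graph,
                                                        const OutDataAnchorPtr &src_out_anchor,
                                                        const InDataAnchorPtr &hccl_in_anchor) {
  // Assumption: only variables need a write-back; other inputs keep their edges.
  if (src_out_anchor->GetOwnerNode()->GetType() != VARIABLE) {
    return SUCCESS;
  }
  // CreateAssignNode is declared in the header below; assumed to build an
  // Assign op whose ref input aliases the variable's memory.
  NodePtr assign_node = CreateAssignNode(graph, src_out_anchor);
  GE_CHECK_NOTNULL(assign_node);
  // Feed the broadcast result into Assign's value input (index 1 assumed) and
  // the variable into its ref input (index 0 assumed), so Assign(var,
  // broadcast_out) runs inside the graph.
  auto broadcast_node = hccl_in_anchor->GetOwnerNode();
  auto broadcast_out_anchor = broadcast_node->GetOutDataAnchor(hccl_in_anchor->GetIdx());
  GE_CHECK_NOTNULL(broadcast_out_anchor);
  if (GraphUtils::AddEdge(broadcast_out_anchor, assign_node->GetInDataAnchor(1)) != GRAPH_SUCCESS ||
      GraphUtils::AddEdge(src_out_anchor, assign_node->GetInDataAnchor(0)) != GRAPH_SUCCESS) {
    GELOGE(INTERNAL_ERROR, "Add edge for Assign after broadcast failed, node:%s",
           broadcast_node->GetName().c_str());
    return INTERNAL_ERROR;
  }
  return SUCCESS;
}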

@@ -32,11 +32,28 @@ class HcclMemcpyPass : public GraphPass {
private:
NodePtr CreateIdentityNode(const ComputeGraphPtr &graph, const OutDataAnchorPtr &out_data_anchor);
NodePtr CreateAssignNode(const ComputeGraphPtr &graph, const OutDataAnchorPtr &out_data_anchor);
std::string CheckDuplicateName(const std::string &node_name);
Status ModifyEdgeConnection(const ComputeGraphPtr &graph, const OutDataAnchorPtr &src_out_anchor,
const InDataAnchorPtr &hccl_in_anchor);
Status InsertIdentityBeforeHccl(const ComputeGraphPtr &graph, const OutDataAnchorPtr &src_out_anchor,
const InDataAnchorPtr &hccl_in_anchor);
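// Insert an Assign after the broadcast when its input is a variable, so the
// result is written back in-graph (see the sketch above this header).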
Status InsertAssignAfterBroadcastIfNeed(const ComputeGraphPtr &graph,
const OutDataAnchorPtr &src_out_anchor,
const InDataAnchorPtr &hccl_in_anchor);
Status ContinuousInputProcess(const ComputeGraphPtr &graph, const NodePtr node);
Status MutableInputProcess(const ComputeGraphPtr &graph, const NodePtr node);
Status P2pmemInputProcess(const ComputeGraphPtr &graph, const NodePtr node);
bool IsDataNode(const std::string& node_type);
std::unordered_map<std::string, uint32_t> node_num_map_;
};
} // namespace ge

@@ -49,7 +49,6 @@
#include "graph/passes/for_pass.h"
#include "graph/passes/guarantee_const_pass.h"
#include "graph/passes/hccl_group_pass.h"
#include "graph/passes/hccl_memcpy_pass.h"
#include "graph/passes/identity_pass.h"
#include "graph/passes/infershape_pass.h"
#include "graph/passes/merge_pass.h"
@@ -1895,8 +1894,6 @@ Status GraphPrepare::PrepareOptimize() {
PassManager graph_pass;
try {
(void)graph_pass.AddPass("PrepareOptimize::PrunePass", new PrunePass);
// todo: temporarily put the hccl memcpy insertion in graph prepare to prevent it from inserting memcpy multiple times
(void)graph_pass.AddPass("PrepareOptimize::HcclMemcpyPass", new (std::nothrow) HcclMemcpyPass);
} catch (std::bad_alloc &e) {
GELOGE(INTERNAL_ERROR, "Add pass failed, bad memory allocation occurs.");
return INTERNAL_ERROR;

@@ -334,7 +334,7 @@ TEST_F(UtestDavinciModel, Init_variable_op) {
EXPECT_EQ(model.InitNodes(graph), SUCCESS);
EXPECT_EQ(model.ReturnNoOutput(1), PARAM_INVALID);
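// With the broadcast-variable sync removed from SyncVarData, the call now succeeds.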
EXPECT_NE(model.SyncVarData(), SUCCESS);
EXPECT_EQ(model.SyncVarData(), SUCCESS);
}
TEST_F(UtestDavinciModel, InitRealSizeAndShapeInfo_succ1) {
