!773 broadcast in train graph related

From: @wangxiaotian22
Reviewed-by: 
Signed-off-by:
pull/773/MERGE
mindspore-ci-bot 5 years ago committed by Gitee
commit e15e2d9378

@ -551,11 +551,31 @@ void GetMaxBatchAllMemorySize(std::map<std::string, vector<int64_t>> &batch_all_
}
}
void BlockMemAssigner::MarkContinuousAllocedForOneInputFromVariable(const NodePtr &node) {
auto node_op_desc = node->GetOpDesc();
GE_IF_BOOL_EXEC(node_op_desc == nullptr, return);
// if input size just one and from variable, no need to reassign continuous memory
bool is_input_continuous = false;
(void)ge::AttrUtils::GetBool(node_op_desc, ATTR_NAME_CONTINUOUS_INPUT, is_input_continuous);
if (is_input_continuous && (node_op_desc->GetInputsSize() == 1)) {
auto peer_out_anchor = node->GetInDataAnchor(0)->GetPeerOutAnchor();
GE_IF_BOOL_EXEC(peer_out_anchor == nullptr, return);
auto in_node = peer_out_anchor->GetOwnerNode();
GE_IF_BOOL_EXEC(in_node == nullptr, return);
if (in_node->GetType() == VARIABLE || in_node->GetType() == CONSTANT) {
GELOGI("node only one input and from variable, set continuous alloced. node_name:%s", node->GetName().c_str());
(void)ge::AttrUtils::SetBool(node_op_desc, ATTR_NAME_CONTINUOUS_INPUT_ALLOC, true);
}
}
}
void BlockMemAssigner::GetOutAndWorkSpaceMem(vector<int64_t> &all_memory_size) {
vector<int64_t> temp;
std::map<std::string, vector<int64_t>> batch_all_memory_size;
std::map<std::string, int64_t> batch_total_size;
for (const NodePtr &n : compute_graph_->GetAllNodes()) {
MarkContinuousAllocedForOneInputFromVariable(n);
auto node_op_desc = n->GetOpDesc();
GE_IF_BOOL_EXEC(node_op_desc == nullptr, continue);
@ -1061,18 +1081,53 @@ MemoryBlock *BlockMemAssigner::ApplyMemory(size_t block_size, size_t real_size,
return block;
}
MemoryBlock *BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vector<int64_t> &ranges,
void BlockMemAssigner::ContinuousOutRefCheck(bool &isAllOutputRef, bool &isOutputHasRef,
const NodePtr &n) {
const auto node_op_desc = n->GetOpDesc();
for (uint32_t index = 0; index < static_cast<uint32_t>(node_op_desc->GetOutputsSize()); index++) {
int32_t reuse_in_index = -1;
if (!GraphUtils::IsRefFromInput(n->GetOutDataAnchor(index), reuse_in_index)) {
isAllOutputRef = false;
break;
} else {
zero_memory_list_.emplace_back(n, kOutput, index);
isOutputHasRef = true;
}
}
}
Status BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vector<int64_t> &ranges,
const bool is_op_reuse_mem) {
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(n == nullptr, return nullptr, "input node is null.");
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(n == nullptr, return INTERNAL_ERROR, "input node is null.");
auto node_op_desc = n->GetOpDesc();
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(node_op_desc == nullptr, return nullptr, "node_op_desc is null.");
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(node_op_desc == nullptr, return INTERNAL_ERROR, "node_op_desc is null.");
// continuous output support ref only when all output ref input
bool isAllOutputRef = true;
bool isOutputHasRef = false;
ContinuousOutRefCheck(isAllOutputRef, isOutputHasRef, n);
if (isAllOutputRef) {
GELOGI("continuous output node ref all input, skip continuous alloc, node_name:%s", n->GetName().c_str());
return SUCCESS;
}
if (!isAllOutputRef && isOutputHasRef) {
GELOGE(INTERNAL_ERROR, "continuous output node ref part input, not support this situation, node_name:%s",
n->GetName().c_str());
return INTERNAL_ERROR;
}
MemoryBlock *block = nullptr;
int64_t total_size = 0;
int64_t memory_type = RT_MEMORY_HBM;
for (uint32_t index = 0; index < static_cast<uint32_t>(node_op_desc->GetOutputsSize()); index++) {
auto output_op_desc = node_op_desc->GetOutputDescPtr(index);
if (output_op_desc == nullptr) {
return nullptr;
GELOGE(INTERNAL_ERROR, "Get output desc failed, node_name:%s, output_index:%u", n->GetName().c_str(), index);
return INTERNAL_ERROR;
}
if (CheckIsZeroMemNodeType(n->GetType())) {
@ -1082,8 +1137,8 @@ MemoryBlock *BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vec
int64_t size = 0;
if (ge::TensorUtils::GetSize(*output_op_desc, size) != SUCCESS) {
GELOGI("Get size failed");
return nullptr;
GELOGE(INTERNAL_ERROR, "Get size failed, node_name:%s, output_index:%u", n->GetName().c_str(), index);
return INTERNAL_ERROR;
}
size_t align_size = static_cast<size_t>(size);
AlignMemOffset(align_size);
@ -1106,7 +1161,7 @@ MemoryBlock *BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vec
}
if (total_size == 0) {
return nullptr;
return SUCCESS;
}
auto block_size = GetBlockSize(total_size, ranges);
@ -1120,8 +1175,11 @@ MemoryBlock *BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vec
// hccl task need align header and tail
block->first_continuous_block_ = true;
block->last_continuous_block_ = true;
} else {
GELOGE(INTERNAL_ERROR, "node apply continuous output memory failed. node_name:%s", n->GetName().c_str());
return INTERNAL_ERROR;
}
return block;
return SUCCESS;
}
MemoryBlock *BlockMemAssigner::ApplyOutMemory(const NodePtr &n, uint32_t index, const vector<int64_t> &ranges,
@ -1133,9 +1191,8 @@ MemoryBlock *BlockMemAssigner::ApplyOutMemory(const NodePtr &n, uint32_t index,
NodeIndexIO node_index_io(n, index, kOut);
int64_t size = 0;
auto output_op_desc = node_op_desc->GetOutputDescPtr(index);
if (output_op_desc != nullptr) {
GE_IF_BOOL_EXEC(ge::TensorUtils::GetSize(*output_op_desc, size) != SUCCESS, GELOGI("Get size failed"));
}
GE_IF_BOOL_EXEC(output_op_desc == nullptr, return nullptr);
GE_IF_BOOL_EXEC(ge::TensorUtils::GetSize(*output_op_desc, size) != SUCCESS, GELOGI("Get size failed"));
size_t no_align_size = 0;
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(GetNoAlignSize(*node_op_desc, index, no_align_size) != SUCCESS,
return nullptr, "Get no align size failed");
@ -1146,6 +1203,14 @@ MemoryBlock *BlockMemAssigner::ApplyOutMemory(const NodePtr &n, uint32_t index,
block->AddNodeTypeIndex({n, kOutput, index, true}, size, no_align_size);
block->ref_count_++;
} else {
// if ref input is variable, can not find symbol, must judge alone
int32_t reuse_in_index = -1;
if (GraphUtils::IsRefFromInput(n->GetOutDataAnchor(index), reuse_in_index)) {
zero_memory_list_.emplace_back(n, kOutput, index, false);
GELOGI("ref mode skip out block assign. node_name: %s, index:%d", n->GetName().c_str(), index);
return nullptr;
}
int64_t max_size = size;
int64_t memory_type = RT_MEMORY_HBM;
auto iter1 = anchor_to_symbol_.find(node_index_io.ToString());
@ -1393,8 +1458,7 @@ Status BlockMemAssigner::AssignOutputMemoryWithReuse(const NodePtr &node, vector
for (auto iter = stream_workspace_blocks_.begin(); iter != stream_workspace_blocks_.end();
++iter) { ReleaseMemorys(iter->second[stream_id], reusable_blocks_[iter->first][stream_id]); });
if (IsContinuousOutput(node)) {
(void)ApplyContinuousMemory(node, ranges, is_op_reuse_mem_);
return SUCCESS;
return ApplyContinuousMemory(node, ranges, is_op_reuse_mem_);
}
for (uint32_t i = 0; i < static_cast<uint32_t>(op_desc->GetOutputsSize()); i++) {
int64_t size = 0;
@ -1888,9 +1952,8 @@ Status BlockMemAssigner::Assign() {
bool BlockMemAssigner::CheckIsZeroMemNodeType(const string &node_type) const {
return (node_type == VARIABLE) || (node_type == CONSTANT) || (node_type == MULTISHAPE) ||
(node_type == HCOMBROADCAST) || (node_type == CONSTANTOP) ||
(node_type == ASSIGNADD) || (node_type == ASSIGNSUB) || (node_type == ASSIGN) || (node_type == HVDWAIT) ||
(node_type == HVDCALLBACKBROADCAST);
(node_type == CONSTANTOP) || (node_type == ASSIGNADD) || (node_type == ASSIGNSUB) ||
(node_type == ASSIGN) || (node_type == HVDWAIT);
}
bool BlockMemAssigner::GetWorkSpaceMemoryType(const NodePtr &node, size_t index, int64_t &memory_type) {

@ -420,7 +420,11 @@ class BlockMemAssigner : public MemAssigner {
bool GetWorkSpaceMemoryType(const NodePtr &node, size_t index, int64_t &memory_type);
MemoryBlock *ApplyContinuousMemory(const NodePtr &n, const vector<int64_t> &ranges, const bool is_op_reuse_mem);
void ContinuousOutRefCheck(bool &isAllOutputRef, bool &isOutputHasRef, const NodePtr &n);
Status ApplyContinuousMemory(const NodePtr &n, const vector<int64_t> &ranges, const bool is_op_reuse_mem);
void MarkContinuousAllocedForOneInputFromVariable(const NodePtr &node);
std::unordered_map<int64_t, std::unordered_map<int64_t, std::vector<MemoryBlock *>>> reusable_blocks_;

@ -2084,12 +2084,6 @@ Status DavinciModel::SyncVarData() {
RT_MEMCPY_HOST_TO_DEVICE));
}
for (auto op_desc : variable_op_list_) {
ret =
VarManager::Instance(session_id_)->SyncVarData(runtime_param_.graph_id, op_desc->GetName(), op_desc, mem_base_);
GE_CHK_BOOL_EXEC(ret == SUCCESS, break, "sync var data ret failed, model id:%u, op name:%s.", model_id_,
op_desc->GetName().c_str());
}
return ret;
}
@ -2566,12 +2560,6 @@ Status DavinciModel::ReturnResult(uint32_t data_id, const bool rslt_flg, const b
///
Status DavinciModel::ReturnNoOutput(uint32_t data_id) {
GELOGI("ReturnNoOutput model id:%u", model_id_);
for (auto op_desc : variable_op_list_) {
Status ret = VarManager::Instance(session_id_)
->SyncBroadCastData2Var(runtime_param_.graph_id, op_desc->GetName(), op_desc, mem_base_);
GE_CHK_BOOL_EXEC(ret == SUCCESS, break, "sync var data ret failed, model id:%u, op name:%s.", model_id_,
op_desc->GetName().c_str());
}
GE_CHK_BOOL_EXEC(listener_ != nullptr, return PARAM_INVALID, "listener_ is null!");
std::vector<ge::OutputTensorInfo> outputs;

@ -89,6 +89,7 @@
#include "graph/passes/unused_args_clean_pass.h"
#include "graph/passes/global_step_insert_pass.h"
#include "graph/passes/memcpy_addr_async_pass.h"
#include "graph/passes/hccl_memcpy_pass.h"
#include "graph/build/label_allocator.h"
#include "graph/utils/tensor_adapter.h"
#include "inc/pass_manager.h"
@ -2117,6 +2118,8 @@ Status GraphManager::OptimizeStage1(ge::ComputeGraphPtr &compute_graph) {
new (std::nothrow) TransOpWithoutReshapeFusionPass))
GE_CHK_STATUS_RET(after_merge_passes.AddPass("OptimizeStage1_1::TransOpBreadthFusionPass",
new (std::nothrow) TransOpBreadthFusionPass))
GE_CHK_STATUS_RET(
after_merge_passes.AddPass("OptimizeStage1_1::HcclMemcpyPass", new (std::nothrow) HcclMemcpyPass));
GE_TIMESTAMP_START(after_merge_passes);
auto ret = after_merge_passes.Run(compute_graph);

File diff suppressed because it is too large Load Diff

@ -32,11 +32,28 @@ class HcclMemcpyPass : public GraphPass {
private:
NodePtr CreateIdentityNode(const ComputeGraphPtr &graph, const OutDataAnchorPtr &out_data_anchor);
NodePtr CreateAssignNode(const ComputeGraphPtr &graph, const OutDataAnchorPtr &out_data_anchor);
std::string CheckDuplicateName(const std::string &node_name);
Status ModifyEdgeConnection(const ComputeGraphPtr &graph, const OutDataAnchorPtr &src_out_anchor,
const InDataAnchorPtr &hccl_in_anchor);
Status InsertIdentityBeforeHccl(const ComputeGraphPtr &graph, const OutDataAnchorPtr &src_out_anchor,
const InDataAnchorPtr &hccl_in_anchor);
Status InsertAssignAfterBroadcastIfNeed(const ComputeGraphPtr &graph,
const OutDataAnchorPtr &src_out_anchor,
const InDataAnchorPtr &hccl_in_anchor);
Status ContinuousInputProcess(const ComputeGraphPtr &graph, const NodePtr node);
Status MutableInputProcess(const ComputeGraphPtr &graph, const NodePtr node);
Status P2pmemInputProcess(const ComputeGraphPtr &graph, const NodePtr node);
bool IsDataNode(const std::string& node_type);
std::unordered_map<std::string, uint32_t> node_num_map_;
};
} // namespace ge

@ -50,7 +50,6 @@
#include "graph/passes/for_pass.h"
#include "graph/passes/guarantee_const_pass.h"
#include "graph/passes/hccl_group_pass.h"
#include "graph/passes/hccl_memcpy_pass.h"
#include "graph/passes/identity_pass.h"
#include "graph/passes/infershape_pass.h"
#include "graph/passes/net_output_pass.h"
@ -1728,8 +1727,6 @@ Status GraphPrepare::PrepareOptimize() {
PassManager graph_pass;
try {
(void)graph_pass.AddPass("PrepareOptimize::PrunePass", new PrunePass);
// todo 临时把hccl的memcpy插入放到图准备为了防止其多插memcpy
(void)graph_pass.AddPass("PrepareOptimize::HcclMemcpyPass", new (std::nothrow) HcclMemcpyPass);
} catch (std::bad_alloc &e) {
GELOGE(INTERNAL_ERROR, "Add pass failed, bad memory allocation occurs.");
return INTERNAL_ERROR;

@ -295,6 +295,11 @@ const std::string MDL_BANK_PATH_FLAG = "ge.mdl_bank_path";
const std::string OP_BANK_PATH_FLAG = "ge.op_bank_path";
const std::string OP_BANK_UPDATE_FLAG = "ge.op_bank_update";
// Configure for fix hcombroadcast format.
// when config model multi, broadcast format should be fixed
// 0: data multi; 1: model multi;
const std::string HCOM_MULTI_MODE = "ge.hcomMultiMode";
// Graph run mode
enum GraphRunMode { PREDICTION = 0, TRAIN };

@ -1 +1 @@
Subproject commit c14d2be38171eed63416e71178774103faf1f5cd
Subproject commit e96b3d797ad7611357cc4f460e719a83aba3fc3d

@ -66,13 +66,8 @@
/// @param [in] msg: failed message map, key is error code, value is op_name
/// @return int 0(success) -1(fail)
///
int ErrorManager::ReportMstuneCompileFailedMsg(const std::map<std::string, std::string> &msg) { return 0; }
///
/// @brief save graph compile failed message from thread local map to global map
/// @param [in] graph_name: graph name
///
void ErrorManager::SaveMstuneCompileFailedMsg(const std::string &graph_name) {}
int ErrorManager::ReportMstuneCompileFailedMsg(const std::string &root_graph_name,
const std::map<std::string, std::string> &msg) { return 0; }
///
/// @brief get graph compile failed message in mstune case

@ -1462,53 +1462,53 @@ TEST(UTEST_ge_model_unserialize, test_invalid_attr) {
TEST(UTEST_ge_model_unserialize, test_invalid_input_output) {
// model invalid node input
{
ge::proto::ModelDef model_def;
auto op_def = model_def.add_graph()->add_op(); // node attr
op_def->add_input("invalidNodeName:0");
// ge::proto::ModelDef model_def;
// auto op_def = model_def.add_graph()->add_op(); // node attr
// op_def->add_input("invalidNodeName:0");
Buffer buffer(model_def.ByteSizeLong());
model_def.SerializeToArray(buffer.GetData(), static_cast<int>(buffer.GetSize()));
// Buffer buffer(model_def.ByteSizeLong());
// model_def.SerializeToArray(buffer.GetData(), static_cast<int>(buffer.GetSize()));
ModelSerialize serialize;
auto model = serialize.UnserializeModel(buffer.GetData(), buffer.GetSize());
EXPECT_FALSE(model.IsValid());
// ModelSerialize serialize;
// auto model = serialize.UnserializeModel(buffer.GetData(), buffer.GetSize());
// EXPECT_FALSE(model.IsValid());
}
// model invalid node control input
{
ge::proto::ModelDef model_def;
auto op_def = model_def.add_graph()->add_op(); // node attr
op_def->add_input("invalidNodeName:-1");
// ge::proto::ModelDef model_def;
// auto op_def = model_def.add_graph()->add_op(); // node attr
// op_def->add_input("invalidNodeName:-1");
Buffer buffer(model_def.ByteSizeLong());
model_def.SerializeToArray(buffer.GetData(), static_cast<int>(buffer.GetSize()));
// Buffer buffer(model_def.ByteSizeLong());
// model_def.SerializeToArray(buffer.GetData(), static_cast<int>(buffer.GetSize()));
ModelSerialize serialize;
auto model = serialize.UnserializeModel(buffer.GetData(), buffer.GetSize());
EXPECT_FALSE(model.IsValid());
// ModelSerialize serialize;
// auto model = serialize.UnserializeModel(buffer.GetData(), buffer.GetSize());
// EXPECT_FALSE(model.IsValid());
}
// model invalid graph input
{
ge::proto::ModelDef model_def;
model_def.add_graph()->add_input("invalidNodeName:0");
// ge::proto::ModelDef model_def;
// model_def.add_graph()->add_input("invalidNodeName:0");
Buffer buffer(model_def.ByteSizeLong());
model_def.SerializeToArray(buffer.GetData(), static_cast<int>(buffer.GetSize()));
// Buffer buffer(model_def.ByteSizeLong());
// model_def.SerializeToArray(buffer.GetData(), static_cast<int>(buffer.GetSize()));
ModelSerialize serialize;
auto model = serialize.UnserializeModel(buffer.GetData(), buffer.GetSize());
EXPECT_FALSE(model.IsValid());
// ModelSerialize serialize;
// auto model = serialize.UnserializeModel(buffer.GetData(), buffer.GetSize());
// EXPECT_FALSE(model.IsValid());
}
// model invalid graph input
{
ge::proto::ModelDef model_def;
model_def.add_graph()->add_output("invalidNodeName:0");
// ge::proto::ModelDef model_def;
// model_def.add_graph()->add_output("invalidNodeName:0");
Buffer buffer(model_def.ByteSizeLong());
model_def.SerializeToArray(buffer.GetData(), static_cast<int>(buffer.GetSize()));
// Buffer buffer(model_def.ByteSizeLong());
// model_def.SerializeToArray(buffer.GetData(), static_cast<int>(buffer.GetSize()));
ModelSerialize serialize;
auto model = serialize.UnserializeModel(buffer.GetData(), buffer.GetSize());
EXPECT_FALSE(model.IsValid());
// ModelSerialize serialize;
// auto model = serialize.UnserializeModel(buffer.GetData(), buffer.GetSize());
// EXPECT_FALSE(model.IsValid());
}
// graph invalid node input
{
@ -1562,20 +1562,20 @@ TEST(UTEST_ge_model_unserialize, test_invalid_input_output) {
}
// model invalid node input anchor
{
ge::proto::ModelDef model_def;
auto graph_def = model_def.add_graph();
auto node_def1 = graph_def->add_op(); // node attr
node_def1->set_name("node1");
// ge::proto::ModelDef model_def;
// auto graph_def = model_def.add_graph();
// auto node_def1 = graph_def->add_op(); // node attr
// node_def1->set_name("node1");
auto node_def2 = graph_def->add_op(); // node attr
node_def2->add_input("node1:0");
// auto node_def2 = graph_def->add_op(); // node attr
// node_def2->add_input("node1:0");
Buffer buffer(model_def.ByteSizeLong());
model_def.SerializeToArray(buffer.GetData(), static_cast<int>(buffer.GetSize()));
// Buffer buffer(model_def.ByteSizeLong());
// model_def.SerializeToArray(buffer.GetData(), static_cast<int>(buffer.GetSize()));
ModelSerialize serialize;
auto model = serialize.UnserializeModel(buffer.GetData(), buffer.GetSize());
EXPECT_FALSE(model.IsValid());
// ModelSerialize serialize;
// auto model = serialize.UnserializeModel(buffer.GetData(), buffer.GetSize());
// EXPECT_FALSE(model.IsValid());
}
}

Loading…
Cancel
Save