!820 revert broadcast in train graph related

From: @wangxiaotian22
Reviewed-by: @ji_chen,@sheng-nan
Signed-off-by: @ji_chen
pull/820/MERGE
mindspore-ci-bot 4 years ago committed by Gitee
commit 693c735f70

@ -551,31 +551,11 @@ void GetMaxBatchAllMemorySize(std::map<std::string, vector<int64_t>> &batch_all_
}
}
void BlockMemAssigner::MarkContinuousAllocedForOneInputFromVariable(const NodePtr &node) {
auto node_op_desc = node->GetOpDesc();
GE_IF_BOOL_EXEC(node_op_desc == nullptr, return);
// if input size just one and from variable, no need to reassign continuous memory
bool is_input_continuous = false;
(void)ge::AttrUtils::GetBool(node_op_desc, ATTR_NAME_CONTINUOUS_INPUT, is_input_continuous);
if (is_input_continuous && (node_op_desc->GetInputsSize() == 1)) {
auto peer_out_anchor = node->GetInDataAnchor(0)->GetPeerOutAnchor();
GE_IF_BOOL_EXEC(peer_out_anchor == nullptr, return);
auto in_node = peer_out_anchor->GetOwnerNode();
GE_IF_BOOL_EXEC(in_node == nullptr, return);
if (in_node->GetType() == VARIABLE || in_node->GetType() == CONSTANT) {
GELOGI("node only one input and from variable, set continuous alloced. node_name:%s", node->GetName().c_str());
(void)ge::AttrUtils::SetBool(node_op_desc, ATTR_NAME_CONTINUOUS_INPUT_ALLOC, true);
}
}
}
void BlockMemAssigner::GetOutAndWorkSpaceMem(vector<int64_t> &all_memory_size) {
vector<int64_t> temp;
std::map<std::string, vector<int64_t>> batch_all_memory_size;
std::map<std::string, int64_t> batch_total_size;
for (const NodePtr &n : compute_graph_->GetAllNodes()) {
MarkContinuousAllocedForOneInputFromVariable(n);
auto node_op_desc = n->GetOpDesc();
GE_IF_BOOL_EXEC(node_op_desc == nullptr, continue);
@ -1081,53 +1061,18 @@ MemoryBlock *BlockMemAssigner::ApplyMemory(size_t block_size, size_t real_size,
return block;
}
/// Walks the outputs of `n` in index order and reports whether they reference the node's inputs.
///
/// For each output that GraphUtils::IsRefFromInput reports as a reference, an entry
/// (n, kOutput, index) is appended to zero_memory_list_ and `isOutputHasRef` is set to true.
/// The walk stops at the first output that is not a reference, setting `isAllOutputRef` to false.
///
/// Both flags are only ever written in one direction (`isAllOutputRef` -> false,
/// `isOutputHasRef` -> true), so the caller must initialise them before the call.
///
/// NOTE(review): because the walk stops at the first non-ref output, later outputs are never
/// examined; a node whose first output is not a ref leaves `isOutputHasRef` untouched even if a
/// later output is one. Confirm this is the intended contract.
///
/// @param isAllOutputRef  in/out; cleared when a non-ref output is found.
/// @param isOutputHasRef  in/out; set when at least one ref output is found before the walk stops.
/// @param n               node whose outputs are checked; its op desc is dereferenced unchecked.
void BlockMemAssigner::ContinuousOutRefCheck(bool &isAllOutputRef, bool &isOutputHasRef,
                                             const NodePtr &n) {
  const auto op_desc = n->GetOpDesc();
  for (uint32_t out_idx = 0; out_idx < static_cast<uint32_t>(op_desc->GetOutputsSize()); out_idx++) {
    int32_t ref_input_idx = -1;
    const bool refs_input = GraphUtils::IsRefFromInput(n->GetOutDataAnchor(out_idx), ref_input_idx);
    if (!refs_input) {
      // First non-ref output ends the scan.
      isAllOutputRef = false;
      return;
    }
    zero_memory_list_.emplace_back(n, kOutput, out_idx);
    isOutputHasRef = true;
  }
}
Status BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vector<int64_t> &ranges,
MemoryBlock *BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vector<int64_t> &ranges,
const bool is_op_reuse_mem) {
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(n == nullptr, return INTERNAL_ERROR, "input node is null.");
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(n == nullptr, return nullptr, "input node is null.");
auto node_op_desc = n->GetOpDesc();
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(node_op_desc == nullptr, return INTERNAL_ERROR, "node_op_desc is null.");
// continuous output support ref only when all output ref input
bool isAllOutputRef = true;
bool isOutputHasRef = false;
ContinuousOutRefCheck(isAllOutputRef, isOutputHasRef, n);
if (isAllOutputRef) {
GELOGI("continuous output node ref all input, skip continuous alloc, node_name:%s", n->GetName().c_str());
return SUCCESS;
}
if (!isAllOutputRef && isOutputHasRef) {
GELOGE(INTERNAL_ERROR, "continuous output node ref part input, not support this situation, node_name:%s",
n->GetName().c_str());
return INTERNAL_ERROR;
}
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(node_op_desc == nullptr, return nullptr, "node_op_desc is null.");
MemoryBlock *block = nullptr;
int64_t total_size = 0;
int64_t memory_type = RT_MEMORY_HBM;
for (uint32_t index = 0; index < static_cast<uint32_t>(node_op_desc->GetOutputsSize()); index++) {
auto output_op_desc = node_op_desc->GetOutputDescPtr(index);
if (output_op_desc == nullptr) {
GELOGE(INTERNAL_ERROR, "Get output desc failed, node_name:%s, output_index:%u", n->GetName().c_str(), index);
return INTERNAL_ERROR;
return nullptr;
}
if (CheckIsZeroMemNodeType(n->GetType())) {
@ -1137,8 +1082,8 @@ Status BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vector<in
int64_t size = 0;
if (ge::TensorUtils::GetSize(*output_op_desc, size) != SUCCESS) {
GELOGE(INTERNAL_ERROR, "Get size failed, node_name:%s, output_index:%u", n->GetName().c_str(), index);
return INTERNAL_ERROR;
GELOGI("Get size failed");
return nullptr;
}
size_t align_size = static_cast<size_t>(size);
AlignMemOffset(align_size);
@ -1161,7 +1106,7 @@ Status BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vector<in
}
if (total_size == 0) {
return SUCCESS;
return nullptr;
}
auto block_size = GetBlockSize(total_size, ranges);
@ -1175,11 +1120,8 @@ Status BlockMemAssigner::ApplyContinuousMemory(const NodePtr &n, const vector<in
// hccl task need align header and tail
block->first_continuous_block_ = true;
block->last_continuous_block_ = true;
} else {
GELOGE(INTERNAL_ERROR, "node apply continuous output memory failed. node_name:%s", n->GetName().c_str());
return INTERNAL_ERROR;
}
return SUCCESS;
return block;
}
MemoryBlock *BlockMemAssigner::ApplyOutMemory(const NodePtr &n, uint32_t index, const vector<int64_t> &ranges,
@ -1191,8 +1133,9 @@ MemoryBlock *BlockMemAssigner::ApplyOutMemory(const NodePtr &n, uint32_t index,
NodeIndexIO node_index_io(n, index, kOut);
int64_t size = 0;
auto output_op_desc = node_op_desc->GetOutputDescPtr(index);
GE_IF_BOOL_EXEC(output_op_desc == nullptr, return nullptr);
GE_IF_BOOL_EXEC(ge::TensorUtils::GetSize(*output_op_desc, size) != SUCCESS, GELOGI("Get size failed"));
if (output_op_desc != nullptr) {
GE_IF_BOOL_EXEC(ge::TensorUtils::GetSize(*output_op_desc, size) != SUCCESS, GELOGI("Get size failed"));
}
size_t no_align_size = 0;
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(GetNoAlignSize(*node_op_desc, index, no_align_size) != SUCCESS,
return nullptr, "Get no align size failed");
@ -1203,14 +1146,6 @@ MemoryBlock *BlockMemAssigner::ApplyOutMemory(const NodePtr &n, uint32_t index,
block->AddNodeTypeIndex({n, kOutput, index, true}, size, no_align_size);
block->ref_count_++;
} else {
// if ref input is variable, can not find symbol, must judge alone
int32_t reuse_in_index = -1;
if (GraphUtils::IsRefFromInput(n->GetOutDataAnchor(index), reuse_in_index)) {
zero_memory_list_.emplace_back(n, kOutput, index, false);
GELOGI("ref mode skip out block assign. node_name: %s, index:%d", n->GetName().c_str(), index);
return nullptr;
}
int64_t max_size = size;
int64_t memory_type = RT_MEMORY_HBM;
auto iter1 = anchor_to_symbol_.find(node_index_io.ToString());
@ -1458,7 +1393,8 @@ Status BlockMemAssigner::AssignOutputMemoryWithReuse(const NodePtr &node, vector
for (auto iter = stream_workspace_blocks_.begin(); iter != stream_workspace_blocks_.end();
++iter) { ReleaseMemorys(iter->second[stream_id], reusable_blocks_[iter->first][stream_id]); });
if (IsContinuousOutput(node)) {
return ApplyContinuousMemory(node, ranges, is_op_reuse_mem_);
(void)ApplyContinuousMemory(node, ranges, is_op_reuse_mem_);
return SUCCESS;
}
for (uint32_t i = 0; i < static_cast<uint32_t>(op_desc->GetOutputsSize()); i++) {
int64_t size = 0;
@ -1952,8 +1888,9 @@ Status BlockMemAssigner::Assign() {
bool BlockMemAssigner::CheckIsZeroMemNodeType(const string &node_type) const {
return (node_type == VARIABLE) || (node_type == CONSTANT) || (node_type == MULTISHAPE) ||
(node_type == CONSTANTOP) || (node_type == ASSIGNADD) || (node_type == ASSIGNSUB) ||
(node_type == ASSIGN) || (node_type == HVDWAIT);
(node_type == HCOMBROADCAST) || (node_type == CONSTANTOP) ||
(node_type == ASSIGNADD) || (node_type == ASSIGNSUB) || (node_type == ASSIGN) || (node_type == HVDWAIT) ||
(node_type == HVDCALLBACKBROADCAST);
}
bool BlockMemAssigner::GetWorkSpaceMemoryType(const NodePtr &node, size_t index, int64_t &memory_type) {

@ -420,11 +420,7 @@ class BlockMemAssigner : public MemAssigner {
bool GetWorkSpaceMemoryType(const NodePtr &node, size_t index, int64_t &memory_type);
void ContinuousOutRefCheck(bool &isAllOutputRef, bool &isOutputHasRef, const NodePtr &n);
Status ApplyContinuousMemory(const NodePtr &n, const vector<int64_t> &ranges, const bool is_op_reuse_mem);
void MarkContinuousAllocedForOneInputFromVariable(const NodePtr &node);
MemoryBlock *ApplyContinuousMemory(const NodePtr &n, const vector<int64_t> &ranges, const bool is_op_reuse_mem);
std::unordered_map<int64_t, std::unordered_map<int64_t, std::vector<MemoryBlock *>>> reusable_blocks_;

@ -2099,6 +2099,12 @@ Status DavinciModel::SyncVarData() {
RT_MEMCPY_HOST_TO_DEVICE));
}
for (auto op_desc : variable_op_list_) {
ret =
VarManager::Instance(session_id_)->SyncVarData(runtime_param_.graph_id, op_desc->GetName(), op_desc, mem_base_);
GE_CHK_BOOL_EXEC(ret == SUCCESS, break, "sync var data ret failed, model id:%u, op name:%s.", model_id_,
op_desc->GetName().c_str());
}
return ret;
}
@ -2571,6 +2577,12 @@ Status DavinciModel::ReturnResult(uint32_t data_id, const bool rslt_flg, const b
///
Status DavinciModel::ReturnNoOutput(uint32_t data_id) {
GELOGI("ReturnNoOutput model id:%u", model_id_);
for (auto op_desc : variable_op_list_) {
Status ret = VarManager::Instance(session_id_)
->SyncBroadCastData2Var(runtime_param_.graph_id, op_desc->GetName(), op_desc, mem_base_);
GE_CHK_BOOL_EXEC(ret == SUCCESS, break, "sync var data ret failed, model id:%u, op name:%s.", model_id_,
op_desc->GetName().c_str());
}
GE_CHK_BOOL_EXEC(listener_ != nullptr, return PARAM_INVALID, "listener_ is null!");
std::vector<ge::OutputTensorInfo> outputs;

@ -93,7 +93,6 @@
#include "graph/passes/unused_args_clean_pass.h"
#include "graph/passes/global_step_insert_pass.h"
#include "graph/passes/memcpy_addr_async_pass.h"
#include "graph/passes/hccl_memcpy_pass.h"
#include "graph/build/label_allocator.h"
#include "graph/utils/tensor_adapter.h"
#include "inc/pass_manager.h"
@ -2122,8 +2121,6 @@ Status GraphManager::OptimizeStage1(ge::ComputeGraphPtr &compute_graph) {
new (std::nothrow) TransOpWithoutReshapeFusionPass))
GE_CHK_STATUS_RET(after_merge_passes.AddPass("OptimizeStage1_1::TransOpBreadthFusionPass",
new (std::nothrow) TransOpBreadthFusionPass))
GE_CHK_STATUS_RET(
after_merge_passes.AddPass("OptimizeStage1_1::HcclMemcpyPass", new (std::nothrow) HcclMemcpyPass));
GE_TIMESTAMP_START(after_merge_passes);
auto ret = after_merge_passes.Run(compute_graph);

File diff suppressed because it is too large Load Diff

@ -32,28 +32,11 @@ class HcclMemcpyPass : public GraphPass {
private:
NodePtr CreateIdentityNode(const ComputeGraphPtr &graph, const OutDataAnchorPtr &out_data_anchor);
NodePtr CreateAssignNode(const ComputeGraphPtr &graph, const OutDataAnchorPtr &out_data_anchor);
std::string CheckDuplicateName(const std::string &node_name);
Status ModifyEdgeConnection(const ComputeGraphPtr &graph, const OutDataAnchorPtr &src_out_anchor,
const InDataAnchorPtr &hccl_in_anchor);
Status InsertIdentityBeforeHccl(const ComputeGraphPtr &graph, const OutDataAnchorPtr &src_out_anchor,
const InDataAnchorPtr &hccl_in_anchor);
Status InsertAssignAfterBroadcastIfNeed(const ComputeGraphPtr &graph,
const OutDataAnchorPtr &src_out_anchor,
const InDataAnchorPtr &hccl_in_anchor);
Status ContinuousInputProcess(const ComputeGraphPtr &graph, const NodePtr node);
Status MutableInputProcess(const ComputeGraphPtr &graph, const NodePtr node);
Status P2pmemInputProcess(const ComputeGraphPtr &graph, const NodePtr node);
bool IsDataNode(const std::string& node_type);
std::unordered_map<std::string, uint32_t> node_num_map_;
};
} // namespace ge

@ -51,6 +51,7 @@
#include "graph/passes/for_pass.h"
#include "graph/passes/guarantee_const_pass.h"
#include "graph/passes/hccl_group_pass.h"
#include "graph/passes/hccl_memcpy_pass.h"
#include "graph/passes/identity_pass.h"
#include "graph/passes/infershape_pass.h"
#include "graph/passes/net_output_pass.h"
@ -1732,6 +1733,8 @@ Status GraphPrepare::PrepareOptimize() {
PassManager graph_pass;
try {
(void)graph_pass.AddPass("PrepareOptimize::PrunePass", new PrunePass);
// TODO: temporarily run the HCCL memcpy insertion during graph preparation to keep it from inserting redundant memcpy nodes
(void)graph_pass.AddPass("PrepareOptimize::HcclMemcpyPass", new (std::nothrow) HcclMemcpyPass);
} catch (std::bad_alloc &e) {
GELOGE(INTERNAL_ERROR, "Add pass failed, bad memory allocation occurs.");
return INTERNAL_ERROR;

@ -282,4 +282,41 @@ TEST_F(UtestDavinciModel, init_unknown) {
const vector<void *> outputs = { &virtual_addr };
EXPECT_EQ(model.UpdateKnownNodeArgs(inputs, outputs), SUCCESS);
}
// ReturnNoOutput on a model holding one variable op, built with a null second constructor
// argument, is expected to report PARAM_INVALID.
// NOTE(review): presumably that null argument is the listener, which ReturnNoOutput rejects — confirm.
TEST_F(UtestDavinciModel, ReturnNoOutput_test) {
  DavinciModel model(0, nullptr);

  // Tensor description shared by the variable's input and output, with its size set to 512.
  GeTensorDesc tensor_desc(GeShape(), FORMAT_NCHW, DT_FLOAT);
  TensorUtils::SetSize(tensor_desc, 512);

  // Single VARIABLE op with one input and one output, both at offset 1024.
  OpDescPtr variable_op = CreateOpDesc("var1", VARIABLE);
  variable_op->AddInputDesc(tensor_desc);
  variable_op->AddOutputDesc(tensor_desc);
  variable_op->SetInputOffset({1024});
  variable_op->SetOutputOffset({1024});
  model.variable_op_list_.push_back(variable_op);

  const auto status = model.ReturnNoOutput(1);
  EXPECT_EQ(status, PARAM_INVALID);
}
// SyncVarData on a model holding one variable op is expected not to succeed in this fixture.
// NOTE(review): presumably the failure comes from the variable not being known to the
// variable manager in the unit-test environment — confirm.
TEST_F(UtestDavinciModel, SyncVarData_test) {
  DavinciModel model(0, nullptr);

  // Tensor description shared by the variable's input and output, with its size set to 512.
  GeTensorDesc tensor_desc(GeShape(), FORMAT_NCHW, DT_FLOAT);
  TensorUtils::SetSize(tensor_desc, 512);

  // Single VARIABLE op with one input and one output, both at offset 1024.
  OpDescPtr variable_op = CreateOpDesc("var1", VARIABLE);
  variable_op->AddInputDesc(tensor_desc);
  variable_op->AddOutputDesc(tensor_desc);
  variable_op->SetInputOffset({1024});
  variable_op->SetOutputOffset({1024});
  model.variable_op_list_.push_back(variable_op);

  const auto status = model.SyncVarData();
  EXPECT_NE(status, SUCCESS);
}
} // namespace ge

Loading…
Cancel
Save