!1353 Adding dependencies by parallel groups

From: @xchu42
Reviewed-by: @ji_chen, @wqtshg
Signed-off-by: @ji_chen
pull/1353/MERGE
Committed by mindspore-ci-bot via Gitee, 4 years ago
commit 689dab39c7

File diff suppressed because it is too large.

@@ -57,14 +57,17 @@ class HybridModelBuilder {
Status ValidateParams();
Status LoadGraph();
Status LoadGeModel(ComputeGraph &graph, const GeModelPtr &ge_model);
Status LoadTask(NodeItem &node_item);
Status LoadTasks();
Status IdentifyVariableOutputs(NodeItem &node_item);
Status IdentifySameInputs(NodeItem &node_item);
Status BuildNodeItem(const NodePtr &node, NodeItem &node_item);
Status GetOrCreateNodeItem(const NodePtr &node, NodeItem **node_item);
Status ParseForceInfershapeNodes(const NodePtr &node, NodeItem &node_item);
Status CollectParallelGroups(NodeItem *node_item);
Status ParseDependentInputNodes(NodeItem &node_item, const std::vector<string> &dependencies);
Status ParseDependentForFusedSubgraph(NodeItem &node_item);
Status ParseDependentForFusedSubgraph(NodeItem &node_item, std::set<ge::NodePtr> &dependencies);
Status ParseDependentByParallelGroup();
Status IndexTaskDefs();
Status IndexTaskDefs(const ComputeGraphPtr &sub_graph, const GeModelPtr &ge_model);
Status IndexSpecialNodes();
@@ -97,12 +100,14 @@ class HybridModelBuilder {
NodeItem *MutableNodeItem(const NodePtr &node);
GeRootModelPtr ge_root_model_;
ComputeGraphPtr root_graph_;
std::map<std::string, GeModelPtr> subgraph_models_;
std::map<std::string, NodePtr> constant_op_nodes_;
std::map<std::string, std::set<NodeItem *>> parallel_group_to_nodes_;
std::map<NodeItem *, std::set<std::string>> node_to_parallel_groups_;
HybridModel &hybrid_model_;
std::map<NodePtr, std::vector<std::pair<int, NodePtr>>> node_ref_inputs_;
int node_index = 0;
RuntimeParam &runtime_param_;
VarManager *var_manager_ = nullptr;
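Editor's note: taken together, the new declarations (CollectParallelGroups, ParseDependentByParallelGroup) and the two maps above record which nodes claim membership in which parallel group. The builder-side implementation is not shown here (it is presumably part of the suppressed diff above). A minimal self-contained sketch of the bookkeeping, using plain C++17 stand-ins rather than the real GE types:

```cpp
// Self-contained sketch, NOT GE code: every node may declare membership in
// several parallel groups, and each group collects all member nodes so that
// execution dependencies can later be derived per group.
#include <map>
#include <set>
#include <string>

struct NodeItem { int node_id = 0; };  // stand-in for hybrid::NodeItem

std::map<std::string, std::set<NodeItem *>> parallel_group_to_nodes;
std::map<NodeItem *, std::set<std::string>> node_to_parallel_groups;

// Mirrors what CollectParallelGroups presumably does for each group name it
// finds (the ATTR_NAME_PARALLEL_GROUP attribute, per the unit test below).
void CollectParallelGroup(NodeItem *node, const std::string &group) {
  parallel_group_to_nodes[group].insert(node);
  node_to_parallel_groups[node].insert(group);
}
```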

@@ -251,6 +251,10 @@ bool NodeItem::IsControlOp() const {
return ge::hybrid::IsControlOp(op_desc->GetType());
}
bool NodeItem::IsHcclOp() const {
return NodeExecutorManager::GetInstance().ResolveExecutorType(*node) == NodeExecutorManager::ExecutorType::HCCL;
}
std::string NodeItem::DebugString() const {
std::stringstream ss;
ss << "Node: ";

@@ -67,6 +67,8 @@ struct NodeItem {
bool IsControlOp() const;
bool IsHcclOp() const;
void SetToDynamic();
std::string DebugString() const;

@@ -95,13 +95,6 @@ Status KnownNodeTask::UpdateArgs(TaskContext &context) {
Status KnownNodeTask::Init(TaskContext &context) {
// allocate output mem
GE_CHK_STATUS_RET(context.AllocateOutputs(), "known node task allocate output failed.");
// init davinci model
if (!load_flag_) {
davinci_model_->InitRuntimeParams();
GE_CHK_STATUS_RET(davinci_model_->InitVariableMem(), "init variable mem failed.");
}
// allocate mem base
void *buffer = nullptr;
if (davinci_model_->TotalMemSize() != 0) {
@@ -129,23 +122,31 @@ Status KnownNodeTask::Init(TaskContext &context) {
void *global_step = context.GetExecutionContext()->global_step;
davinci_model_->SetKnownShapeGlobalStep(global_step);
}
int32_t device_id = 0;
rtError_t rt_ret = rtGetDevice(&device_id);
if (rt_ret != RT_ERROR_NONE || device_id < 0) {
GELOGE(rt_ret, "Call rtGetDevice failed, ret = 0x%X, device_id = %d.", rt_ret, device_id);
return RT_ERROR_TO_GE_STATUS(rt_ret);
}
davinci_model_->SetDeviceId(device_id);
GE_CHK_STATUS_RET(davinci_model_->Init(), "KnownNodeExecutor::InitDavinciModel failed.");
load_flag_ = true;
} else {
GE_CHK_STATUS_RET(ModelManager::GetInstance()->DestroyAicpuKernel(davinci_model_->GetSessionId(),
davinci_model_->Id(), davinci_model_->SubModelId()), "KnownNodeTask::Init destroy aicpu kernel failed.");
}
GE_CHK_STATUS_RET(ModelManager::GetInstance()->DestroyAicpuKernel(davinci_model_->GetSessionId(),
davinci_model_->Id(), davinci_model_->SubModelId()),
"KnownNodeTask::Init destroy aicpu kernel failed.");
GELOGI("[%s] KnownNodeExecutor::Init success.", context.GetNodeName());
return SUCCESS;
}
Status KnownNodeTask::InitDavinciModel() {
GELOGD("[Init][Model] start");
davinci_model_->InitRuntimeParams();
GE_CHK_STATUS_RET(davinci_model_->InitVariableMem(), "init variable mem failed");
int32_t device_id = 0;
GE_CHK_RT_RET(rtGetDevice(&device_id));
davinci_model_->SetDeviceId(static_cast<uint32_t>(device_id));
GE_CHK_STATUS_RET(DoInitDavinciModel(), "[Init][Model] Failed to init davinci model.");
GELOGD("[Init][Model] success");
return SUCCESS;
}
Status KnownNodeTask::DoInitDavinciModel() {
return davinci_model_->Init();
}
Status KnownNodeExecutor::PrepareTask(NodeTask &task, TaskContext &context) const {
GELOGD("[%s] KnownNodeExecutor::PrepareTask in.", context.GetNodeName());
RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[KnownNodeExecutorPrepareTask] Start");
@@ -182,9 +183,11 @@ Status KnownNodeExecutor::LoadTask(const HybridModel &model, const NodePtr &node
GE_CHK_STATUS_RET(davinci_model->Assign(ge_model), "KnownNodeExecutor::LoadTask davincimodel assign failed.");
task = MakeShared<KnownNodeTask>(davinci_model);
GE_CHECK_NOTNULL(task);
auto known_node_task = MakeShared<KnownNodeTask>(davinci_model);
GE_CHECK_NOTNULL(known_node_task);
GE_CHK_STATUS_RET_NOLOG(known_node_task->InitDavinciModel());
GELOGI("[%s] KnownNodeExecutor::LoadTask success.", node->GetName().c_str());
task = std::move(known_node_task);
return SUCCESS;
}
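Editor's note: two things change here. First, DavinciModel initialization moves out of KnownNodeTask::Init (first execution) into KnownNodeTask::InitDavinciModel, which LoadTask now calls once at model-load time; the aicpu-kernel teardown in Init then runs unconditionally. Second, the expensive davinci_model_->Init() call is isolated behind the protected virtual DoInitDavinciModel, a seam the new unit test uses to stub the call while still exercising the surrounding setup. A minimal self-contained illustration of that seam, with hypothetical names rather than GE code:

```cpp
#include <iostream>

// The always-run setup stays in Init(); the expensive call sits behind a
// virtual hook that a test double can override.
class Task {
 public:
  virtual ~Task() = default;
  bool Init() {
    // device/runtime setup that must always run would go here
    return DoInit();  // overridable seam
  }
 protected:
  virtual bool DoInit() { return true; }  // real, expensive init in production
};

class TaskStub : public Task {
 protected:
  bool DoInit() override { return true; }  // test double: skip the real work
};

int main() {
  TaskStub stub;
  std::cout << (stub.Init() ? "ok" : "failed") << std::endl;
}
```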

@@ -31,11 +31,15 @@ class KnownNodeTask : public NodeTask {
: davinci_model_(davinci_model)
{}
~KnownNodeTask() {}
~KnownNodeTask() = default;
Status UpdateArgs(TaskContext &context) override;
Status ExecuteAsync(TaskContext &context, std::function<void()> done_callback) override;
Status Init(TaskContext &context) override;
Status InitDavinciModel();
protected:
virtual Status DoInitDavinciModel();
private:
std::shared_ptr<DavinciModel> davinci_model_ = nullptr;
bool load_flag_ = false;
@@ -47,8 +51,6 @@ class KnownNodeExecutor : public NodeExecutor {
Status PrepareTask(NodeTask &task, TaskContext &context) const;
Status ExecuteTask(NodeTask &task, TaskContext &context, const std::function<void()> &callback) const;
~KnownNodeExecutor() {}
private:
std::shared_ptr<DavinciModel> davinci_model_ = nullptr;
};
} // namespace hybrid
} // namespace ge

@@ -797,6 +797,7 @@ set(PROFILING_MNG_TEST_FILES
set(HYBRID_TEST_FILES
"hybrid/ge_hybrid_unittest.cc"
"hybrid/known_node_executor_unittest.cc"
)
set(OTHERS_TEST_FILES

@@ -19,10 +19,12 @@
#include <vector>
#include "runtime/rt.h"
#include "graph/utils/node_utils.h"
#define protected public
#define private public
#include "hybrid/model/hybrid_model_builder.h"
#include "hybrid/model/hybrid_model.h"
#include "hybrid/node_executor/node_executor.h"
#include "model/ge_model.h"
#include "model/ge_root_model.h"
#include "hybrid/node_executor/aicore/aicore_op_task.h"
@@ -51,7 +53,9 @@ class UtestGeHybrid : public testing::Test {
protected:
void SetUp() {}
void TearDown() {}
void TearDown() {
NpuMemoryAllocator::allocators_.clear();
}
};
static ge::OpDescPtr CreateOpDesc(string name = "", string type = "") {
@@ -245,7 +249,7 @@ TEST_F(UtestGeHybrid, init_weight_success) {
ASSERT_EQ(ret, PARAM_INVALID);
}
TEST_F(UtestGeHybrid, hybrid_model_executor) {
ComputeGraphPtr compute_graph = MakeShared<ComputeGraph>("abc");
GeRootModelPtr root_model = MakeShared<ge::GeRootModel>(compute_graph);
HybridModel model(root_model);
@@ -256,3 +260,71 @@ TEST_F(UtestGeHybrid, init_weight_success) {
HybridModelExecutor executor(model_ptr, device_id, stream);
executor.Init();
}
TEST_F(UtestGeHybrid, test_parse_parallel_group) {
NodeExecutorManager::GetInstance().engine_mapping_.emplace("ops_kernel_info_hccl",
NodeExecutorManager::ExecutorType::HCCL);
ComputeGraphPtr compute_graph = MakeShared<ComputeGraph>("test");
OpDescPtr op_desc = CreateOpDesc("AllReduce", "AllReduce");
op_desc->SetId(0);
ge::AttrUtils::SetStr(op_desc, ATTR_NAME_PARALLEL_GROUP, "group_1");
auto node = compute_graph->AddNode(op_desc);
std::unique_ptr<NodeItem> node_item;
NodeItem::Create(node, node_item);
node_item->node_id = 0;
op_desc->SetOpKernelLibName("ops_kernel_info_hccl");
GeRootModelPtr root_model = MakeShared<ge::GeRootModel>(compute_graph);
HybridModel model(root_model);
HybridModelBuilder builder(model);
builder.root_graph_ = compute_graph;
ASSERT_EQ(builder.CollectParallelGroups(node_item.get()), SUCCESS);
ASSERT_EQ(builder.node_to_parallel_groups_.size(), 1);
ASSERT_EQ(builder.parallel_group_to_nodes_.size(), 1);
OpDescPtr op_desc_1 = CreateOpDesc("subgraph", "PartitionedCall");
op_desc_1->AddSubgraphName("subgraph");
auto node_1 = compute_graph->AddNode(op_desc_1);
ComputeGraphPtr subgraph = MakeShared<ComputeGraph>("subgraph");
ASSERT_EQ(NodeUtils::SetSubgraph(*node_1, 0, subgraph), GRAPH_SUCCESS);
std::unique_ptr<NodeItem> node_item_1;
NodeItem::Create(node_1, node_item_1);
node_item_1->node_id = 1;
ASSERT_EQ(builder.CollectParallelGroups(node_item_1.get()), SUCCESS);
ASSERT_EQ(builder.node_to_parallel_groups_.size(), 1);
ASSERT_EQ(builder.parallel_group_to_nodes_.size(), 1);
OpDescPtr op_desc_2 = CreateOpDesc("sub_node_1", "AllReduce");
ge::AttrUtils::SetStr(op_desc_2, ATTR_NAME_PARALLEL_GROUP, "group_1");
auto node_2 = subgraph->AddNode(op_desc_2);
ASSERT_TRUE(node_2 != nullptr);
OpDescPtr op_desc_3 = CreateOpDesc("sub_node_2", "AllReduce2");
ge::AttrUtils::SetStr(op_desc_3, ATTR_NAME_PARALLEL_GROUP, "group_2");
auto node_3 = subgraph->AddNode(op_desc_3);
ASSERT_TRUE(node_3 != nullptr);
ASSERT_EQ(builder.CollectParallelGroups(node_item_1.get()), SUCCESS);
ASSERT_EQ(builder.node_to_parallel_groups_.size(), 2);
ASSERT_EQ(builder.parallel_group_to_nodes_.size(), 2);
ASSERT_EQ(builder.parallel_group_to_nodes_["group_1"].size(), 2);
ASSERT_EQ(builder.parallel_group_to_nodes_["group_2"].size(), 1);
ASSERT_FALSE(node_item->has_observer);
ASSERT_TRUE(node_item_1->dependents_for_execution.empty());
ASSERT_EQ(builder.ParseDependentByParallelGroup(), SUCCESS);
ASSERT_TRUE(node_item->has_observer);
ASSERT_EQ(node_item_1->dependents_for_execution.size(), 1);
ASSERT_EQ(node_item_1->dependents_for_execution[0], node);
// repeat parse
ASSERT_EQ(builder.ParseDependentByParallelGroup(), SUCCESS);
ASSERT_TRUE(node_item->has_observer);
ASSERT_EQ(node_item_1->dependents_for_execution.size(), 1);
ASSERT_EQ(node_item_1->dependents_for_execution[0], node);
}
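Editor's note: the implementation of ParseDependentByParallelGroup is not visible in this diff, but the assertions above pin down its observable behavior: within a parallel group, a node comes to depend on earlier nodes (smaller node_id) of the same group, the depended-on node is flagged with has_observer, and running the parse twice adds no duplicate dependencies. A self-contained model of that behavior (plain C++17 stand-ins, inferred from the test, not the real GE code):

```cpp
#include <algorithm>
#include <map>
#include <set>
#include <string>
#include <vector>

struct NodeItem {
  int node_id = 0;
  bool has_observer = false;
  std::vector<NodeItem *> dependents_for_execution;
};

// Derive execution dependencies from group membership, matching the test's
// assertions; "depend on every earlier node in the group" is an inference.
void ParseDependentByParallelGroup(
    const std::map<NodeItem *, std::set<std::string>> &node_to_groups,
    const std::map<std::string, std::set<NodeItem *>> &group_to_nodes) {
  for (const auto &entry : node_to_groups) {
    NodeItem *item = entry.first;
    for (const auto &group : entry.second) {
      for (NodeItem *other : group_to_nodes.at(group)) {
        if (other->node_id >= item->node_id) {
          continue;  // only depend on earlier nodes in the same group
        }
        auto &deps = item->dependents_for_execution;
        if (std::find(deps.begin(), deps.end(), other) == deps.end()) {
          deps.push_back(other);       // no duplicates on repeated parsing
          other->has_observer = true;  // producer must notify its observer
        }
      }
    }
  }
}
```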

@@ -0,0 +1,62 @@
/**
* Copyright 2019-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <vector>
#include <memory>
#define protected public
#define private public
#include "hybrid/node_executor/compiledsubgraph/known_node_executor.h"
#undef private
#undef protected
#include "graph/manager/graph_mem_allocator.h"
using namespace std;
using namespace testing;
using namespace ge;
using namespace hybrid;
class UnknownNodeExecutorTest : public testing::Test {
protected:
void SetUp() {}
void TearDown() {}
};
namespace {
class KnownNodeTaskMock : public KnownNodeTask {
public:
KnownNodeTaskMock(std::shared_ptr<DavinciModel> davinci_model): KnownNodeTask(davinci_model) {};
~KnownNodeTaskMock() override = default;
MOCK_METHOD0(DoInitDavinciModel, Status());
};
}
TEST_F(UnknownNodeExecutorTest, test_init_davinci_model) {
auto davinci_model = std::make_shared<DavinciModel>(0, nullptr);
davinci_model->SetDeviceId(0);
davinci_model->SetKnownNode(true);
auto ge_model = make_shared<GeModel>();
AttrUtils::SetInt(ge_model, ATTR_MODEL_VAR_SIZE, 0);
AttrUtils::SetInt(ge_model, ATTR_MODEL_MEMORY_SIZE, 1024);
davinci_model->Assign(ge_model);
KnownNodeTaskMock mock(davinci_model);
EXPECT_CALL(mock, DoInitDavinciModel).WillOnce(::testing::Return(SUCCESS));
ASSERT_EQ(mock.InitDavinciModel(), SUCCESS);
}
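Editor's note: because DoInitDavinciModel is protected and virtual, MOCK_METHOD0 can replace it, so this test drives InitDavinciModel end to end (runtime params, variable memory, device id) while the one expensive step, the real DavinciModel::Init, is stubbed to return SUCCESS.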