|
|
|
@ -234,6 +234,19 @@ Status TaskGenerator::SaveFusionNodes(map<int64_t, std::vector<NodePtr>> &fusion
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool TaskGenerator::IsSubGraphOfDynamicGraph(const ComputeGraphPtr &graph) const {
|
|
|
|
|
auto parent_graph_ptr = graph->GetParentGraph();
|
|
|
|
|
if (parent_graph_ptr == nullptr) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
auto root_graph_ptr = GraphUtils::FindRootGraph(parent_graph_ptr);
|
|
|
|
|
if (root_graph_ptr == nullptr) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return root_graph_ptr->GetGraphUnknownFlag();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &graph,
|
|
|
|
|
vector<domi::TaskDef> &task_def_list, map<uint32_t, string> &op_name_map) {
|
|
|
|
|
GELOGD("Beign to generate task, graph name is %s.", graph->GetName().c_str());
|
|
|
|
@ -274,7 +287,6 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra
|
|
|
|
|
};
|
|
|
|
|
GE_MAKE_GUARD(release, callback);
|
|
|
|
|
|
|
|
|
|
uint64_t all_reduce_node_idx = 0;
|
|
|
|
|
for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) {
|
|
|
|
|
OpDescPtr op_desc = node->GetOpDesc();
|
|
|
|
|
GE_CHECK_NOTNULL(op_desc);
|
|
|
|
@ -293,7 +305,7 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra
|
|
|
|
|
// Part2: Call
|
|
|
|
|
auto fusion_task_info =
|
|
|
|
|
FusionTaskInfo{run_context, graph, node, op_desc, node_index, ge_lib,
|
|
|
|
|
ops_kernel_manager, task_def_list, op_name_map, profiling_point, all_reduce_nodes, all_reduce_node_idx};
|
|
|
|
|
ops_kernel_manager, task_def_list, op_name_map, profiling_point, all_reduce_nodes};
|
|
|
|
|
GE_CHK_STATUS_RET(GenerateTaskForFusionNode(fusion_task_info, fusion_nodes, fusion_nodes_seen),
|
|
|
|
|
"Call GenerateTaskForFusionNode node:%s(%s) failed", name.c_str(), type.c_str());
|
|
|
|
|
// continue directly
|
|
|
|
@ -317,8 +329,7 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra
|
|
|
|
|
type.c_str());
|
|
|
|
|
// Profiling task
|
|
|
|
|
size_t task_list_size_before = task_def_list.size();
|
|
|
|
|
GE_CHK_STATUS_RET(InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes,
|
|
|
|
|
node_index, task_def_list, all_reduce_node_idx));
|
|
|
|
|
GE_CHK_STATUS_RET(InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list));
|
|
|
|
|
int64_t op_id = op_desc->GetId();
|
|
|
|
|
// Compatible with dynamic shape scenes, the default is 0
|
|
|
|
|
int64_t stream_id = 0;
|
|
|
|
@ -338,8 +349,7 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
// Profiling task
|
|
|
|
|
GE_CHK_STATUS_RET(InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes,
|
|
|
|
|
node_index, task_def_list, all_reduce_node_idx));
|
|
|
|
|
GE_CHK_STATUS_RET(InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list));
|
|
|
|
|
size_t task_list_size_after = task_def_list.size();
|
|
|
|
|
// If tasks is reduced
|
|
|
|
|
if (task_list_size_after < task_list_size_before) {
|
|
|
|
@ -382,7 +392,6 @@ Status TaskGenerator::GenerateTaskForFusionNode(FusionTaskInfo &fusion_task_info
|
|
|
|
|
auto &op_name_map = fusion_task_info.op_name_map;
|
|
|
|
|
auto &profiling_point = fusion_task_info.profiling_point;
|
|
|
|
|
auto &all_reduce_nodes = fusion_task_info.all_reduce_nodes;
|
|
|
|
|
auto &all_reduce_idx = fusion_task_info.all_reduce_node_idx;
|
|
|
|
|
// If op_desc have this attr, call nodes with same group key in a stream together
|
|
|
|
|
if (ge::AttrUtils::GetInt(fusion_op_desc, ATTR_NAME_FUSION_GROUP_KEY, group_key) &&
|
|
|
|
|
(fusion_nodes_seen.count(node.get()) == 0)) {
|
|
|
|
@ -429,8 +438,7 @@ Status TaskGenerator::GenerateTaskForFusionNode(FusionTaskInfo &fusion_task_info
|
|
|
|
|
return INTERNAL_ERROR;
|
|
|
|
|
}
|
|
|
|
|
// profiling task
|
|
|
|
|
(void)InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes,
|
|
|
|
|
node_index, task_def_list, all_reduce_idx);
|
|
|
|
|
(void)InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list);
|
|
|
|
|
run_context.stream = run_context.graphStreamList[stream_id];
|
|
|
|
|
GELOGI("Fusion: Call %s to generate fusion_node:[fusion_node_name:%s(%s), id:%ld, stream_id:%ld] task.",
|
|
|
|
|
op_kernel_lib_name.c_str(), fusion_node_name.c_str(), fusion_node_type.c_str(), op_id, stream_id);
|
|
|
|
@ -443,8 +451,7 @@ Status TaskGenerator::GenerateTaskForFusionNode(FusionTaskInfo &fusion_task_info
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
// profiling task
|
|
|
|
|
(void)InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes,
|
|
|
|
|
node_index, task_def_list, all_reduce_idx);
|
|
|
|
|
(void)InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list);
|
|
|
|
|
size_t task_list_size_after = task_def_list.size();
|
|
|
|
|
// if tasks is reduced
|
|
|
|
|
if (task_list_size_after < task_list_size_before) {
|
|
|
|
@ -850,6 +857,13 @@ Status TaskGenerator::FindProfilingTaskIndex(const ComputeGraphPtr &graph, Profi
|
|
|
|
|
GELOGD("Profiling is not open.");
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// subgraph of dynamic graph no need to find index, has been found in parent graph
|
|
|
|
|
if (IsSubGraphOfDynamicGraph(graph)) {
|
|
|
|
|
GELOGI("Graph[%s] is subgraph of dynamic graph, no nned to find index.", graph->GetName().c_str());
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
GELOGI("Start get FP/BP index.");
|
|
|
|
|
std::string fp_point_str;
|
|
|
|
|
std::string bp_point_str;
|
|
|
|
@ -887,9 +901,47 @@ Status TaskGenerator::FindProfilingTaskIndex(const ComputeGraphPtr &graph, Profi
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Status TaskGenerator::InsertProfilingArTaskBefore(const OpDescPtr &op_desc, std::vector<uint32_t> &all_reduce_nodes,
|
|
|
|
|
uint32_t node_index, std::vector<domi::TaskDef> &task_def_list,
|
|
|
|
|
bool is_insert_bp_profiling_task) {
|
|
|
|
|
bool is_insert_all_reduce_task = false;
|
|
|
|
|
int64_t ar_log_id = 0xFFFF;
|
|
|
|
|
if (is_insert_bp_profiling_task) {
|
|
|
|
|
(void)ge::AttrUtils::GetInt(op_desc, ATTR_NAME_INSERT_PROFILILNG_TASK_LOG_ID, ar_log_id);
|
|
|
|
|
is_insert_all_reduce_task = true;
|
|
|
|
|
}
|
|
|
|
|
if (!is_insert_all_reduce_task) {
|
|
|
|
|
for (size_t i = 0; i < all_reduce_nodes.size(); i++) {
|
|
|
|
|
if (all_reduce_nodes[i] == node_index) {
|
|
|
|
|
GE_IF_BOOL_EXEC(TypeUtils::CheckUint64MulOverflow(i, kProfilingArStep),
|
|
|
|
|
GELOGE(FAILED, "Multiply result is out of range.");
|
|
|
|
|
return FAILED);
|
|
|
|
|
ar_log_id = i * kProfilingArStep + kProfilingArStartLogid;
|
|
|
|
|
is_insert_all_reduce_task = true;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (is_insert_all_reduce_task) {
|
|
|
|
|
GELOGI("The start allreduce operator is %s, idx %u, log_id %ld", op_desc->GetName().c_str(), node_index, ar_log_id);
|
|
|
|
|
TaskDef ar_task_def;
|
|
|
|
|
ar_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
|
|
|
|
|
ar_task_def.set_stream_id(op_desc->GetStreamId());
|
|
|
|
|
LogTimeStampDef *ar_log_def = ar_task_def.mutable_log_timestamp();
|
|
|
|
|
if (ar_log_def != nullptr) {
|
|
|
|
|
ar_log_def->set_logid(ar_log_id);
|
|
|
|
|
ar_log_def->set_notify(false);
|
|
|
|
|
}
|
|
|
|
|
task_def_list.push_back(ar_task_def);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Status TaskGenerator::InsertProfilingTaskBefore(const OpDescPtr &op_desc, const ProfilingPoint &profiling_point,
|
|
|
|
|
vector<uint32_t> &all_reduce_nodes, uint32_t node_index,
|
|
|
|
|
vector<domi::TaskDef> &task_def_list, uint64_t &all_reduce_node_idx) {
|
|
|
|
|
vector<domi::TaskDef> &task_def_list) {
|
|
|
|
|
const char *profiling_mode = std::getenv(kProfilingMode);
|
|
|
|
|
bool is_profiling = (profiling_mode != nullptr) || ProfilingManager::Instance().ProfilingOn() ||
|
|
|
|
|
ProfilingManager::Instance().ProfilingTrainingTraceOn();
|
|
|
|
@ -932,19 +984,31 @@ Status TaskGenerator::InsertProfilingTaskBefore(const OpDescPtr &op_desc, const
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool is_all_reduce = (op_desc->GetType() == HCOMALLREDUCE || op_desc->GetType() == HVDCALLBACKALLREDUCE);
|
|
|
|
|
uint64_t all_reduce_task_idx = 0;
|
|
|
|
|
if (is_all_reduce) {
|
|
|
|
|
(void)InsertProfilingArTaskBefore(op_desc, all_reduce_nodes, node_index,
|
|
|
|
|
task_def_list, is_insert_bp_profiling_task);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Status TaskGenerator::InsertProfilingArTaskAfter(const OpDescPtr &op_desc, std::vector<uint32_t> &all_reduce_nodes,
|
|
|
|
|
uint32_t node_index, std::vector<domi::TaskDef> &task_def_list,
|
|
|
|
|
bool is_insert_bp_profiling_task) {
|
|
|
|
|
bool is_insert_all_reduce_task = false;
|
|
|
|
|
if (is_all_reduce && is_insert_bp_profiling_task) {
|
|
|
|
|
all_reduce_task_idx = all_reduce_node_idx;
|
|
|
|
|
int64_t ar_log_id = 0xFFFF;
|
|
|
|
|
if (is_insert_bp_profiling_task) {
|
|
|
|
|
(void)ge::AttrUtils::GetInt(op_desc, ATTR_NAME_INSERT_PROFILILNG_TASK_LOG_ID, ar_log_id);
|
|
|
|
|
ar_log_id += 1;
|
|
|
|
|
is_insert_all_reduce_task = true;
|
|
|
|
|
}
|
|
|
|
|
if (is_all_reduce) {
|
|
|
|
|
all_reduce_node_idx++;
|
|
|
|
|
}
|
|
|
|
|
if (!is_insert_all_reduce_task) {
|
|
|
|
|
for (size_t i = 0; i < all_reduce_nodes.size(); i++) {
|
|
|
|
|
if (all_reduce_nodes[i] == node_index) {
|
|
|
|
|
all_reduce_task_idx = i;
|
|
|
|
|
GE_IF_BOOL_EXEC(TypeUtils::CheckUint64MulOverflow(i, kProfilingArStep),
|
|
|
|
|
GELOGE(FAILED, "Multiply result is out of range.");
|
|
|
|
|
return FAILED);
|
|
|
|
|
ar_log_id = i * kProfilingArStep + kProfilingArEndLogid;
|
|
|
|
|
is_insert_all_reduce_task = true;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
@ -952,28 +1016,24 @@ Status TaskGenerator::InsertProfilingTaskBefore(const OpDescPtr &op_desc, const
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (is_insert_all_reduce_task) {
|
|
|
|
|
GELOGI("The start allreduce operator is %s, idx %u", op_desc->GetName().c_str(), node_index);
|
|
|
|
|
GELOGI("The start allreduce operator is %s, idx %u, log_id %ld", op_desc->GetName().c_str(), node_index, ar_log_id);
|
|
|
|
|
TaskDef ar_task_def;
|
|
|
|
|
ar_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
|
|
|
|
|
ar_task_def.set_stream_id(op_desc->GetStreamId());
|
|
|
|
|
LogTimeStampDef *ar_log_def = ar_task_def.mutable_log_timestamp();
|
|
|
|
|
if (ar_log_def != nullptr) {
|
|
|
|
|
GE_IF_BOOL_EXEC(TypeUtils::CheckUint64MulOverflow(all_reduce_task_idx, kProfilingArStep),
|
|
|
|
|
GELOGE(FAILED, "Multiply result is out of range.");
|
|
|
|
|
return FAILED);
|
|
|
|
|
auto log_id = all_reduce_task_idx * kProfilingArStep + kProfilingArStartLogid;
|
|
|
|
|
ar_log_def->set_logid(log_id);
|
|
|
|
|
ar_log_def->set_logid(ar_log_id);
|
|
|
|
|
ar_log_def->set_notify(false);
|
|
|
|
|
(void)ge::AttrUtils::SetInt(op_desc, ATTR_NAME_INSERT_PROFILILNG_TASK_LOG_ID, log_id);
|
|
|
|
|
}
|
|
|
|
|
task_def_list.push_back(ar_task_def);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Status TaskGenerator::InsertProfilingTaskAfter(const OpDescPtr &op_desc, const ProfilingPoint &profiling_point,
|
|
|
|
|
vector<uint32_t> &all_reduce_nodes, uint32_t node_index,
|
|
|
|
|
vector<domi::TaskDef> &task_def_list, uint64_t all_reduce_node_idx) {
|
|
|
|
|
vector<domi::TaskDef> &task_def_list) {
|
|
|
|
|
GE_CHECK_NOTNULL(op_desc);
|
|
|
|
|
const char *profiling_mode = std::getenv(kProfilingMode);
|
|
|
|
|
bool is_profiling = (profiling_mode != nullptr) || ProfilingManager::Instance().ProfilingOn() ||
|
|
|
|
@ -1018,36 +1078,11 @@ Status TaskGenerator::InsertProfilingTaskAfter(const OpDescPtr &op_desc, const P
|
|
|
|
|
task_def_list.emplace_back(end_task_def);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
uint32_t all_reduce_task_idx = 0;
|
|
|
|
|
bool is_insert_all_reduce_task = false;
|
|
|
|
|
if (is_all_reduce && is_insert_bp_profiling_task) {
|
|
|
|
|
all_reduce_task_idx = all_reduce_node_idx;
|
|
|
|
|
is_insert_all_reduce_task = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < all_reduce_nodes.size(); i++) {
|
|
|
|
|
if (all_reduce_nodes[i] == node_index) {
|
|
|
|
|
all_reduce_task_idx = i;
|
|
|
|
|
is_insert_all_reduce_task = true;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if (is_all_reduce) {
|
|
|
|
|
(void)InsertProfilingArTaskAfter(op_desc, all_reduce_nodes, node_index,
|
|
|
|
|
task_def_list, is_insert_bp_profiling_task);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (is_insert_all_reduce_task) {
|
|
|
|
|
GELOGI("The end allreduce operator is %s, idx %u", op_desc->GetName().c_str(), node_index);
|
|
|
|
|
TaskDef ar_task_def;
|
|
|
|
|
ar_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
|
|
|
|
|
ar_task_def.set_stream_id(op_desc->GetStreamId());
|
|
|
|
|
LogTimeStampDef *ar_log_def = ar_task_def.mutable_log_timestamp();
|
|
|
|
|
GE_CHECK_NOTNULL(ar_log_def);
|
|
|
|
|
GE_IF_BOOL_EXEC(TypeUtils::CheckUint64MulOverflow(all_reduce_task_idx, kProfilingArStep),
|
|
|
|
|
GELOGE(FAILED, "Multiply result is out of range.");
|
|
|
|
|
return FAILED);
|
|
|
|
|
auto log_id = all_reduce_task_idx * kProfilingArStep + kProfilingArEndLogid;
|
|
|
|
|
ar_log_def->set_logid(log_id);
|
|
|
|
|
ar_log_def->set_notify(false);
|
|
|
|
|
task_def_list.emplace_back(ar_task_def);
|
|
|
|
|
}
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|