diff --git a/ge/common/profiling/profiling_manager.cc b/ge/common/profiling/profiling_manager.cc
index 92417286..aad2bbe3 100644
--- a/ge/common/profiling/profiling_manager.cc
+++ b/ge/common/profiling/profiling_manager.cc
@@ -302,6 +302,8 @@ FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY void ProfilingManager::Profilin
     }
 
     data.append(" model_id:").append(std::to_string(model_id));
+    data.append(" task_id:").append(std::to_string(graph.task_id));
+    data.append(" stream_id:").append(std::to_string(graph.stream_id));
     data.append("\n");
 
     GraphDescReport(device_id, data);
diff --git a/ge/common/types.cc b/ge/common/types.cc
index 1cc70347..268e7caa 100644
--- a/ge/common/types.cc
+++ b/ge/common/types.cc
@@ -480,6 +480,9 @@ REGISTER_OPTYPE_DEFINE(HVDWAIT, "HorovodWait");
 // aicpu op for online_infer dynamic_dims
 REGISTER_OPTYPE_DEFINE(GETDYNAMICDIMS, "GetDynamicDims");
 
+// profiling training trace node
+REGISTER_OPTYPE_DEFINE(PROFILINGTRAININGTRACE, "ProfilingTrainingTrace");
+
 const std::string MODEL_ATTR_TASKS = "tasks";
 const std::string MODEL_ATTR_TASK_GEN_BASE_ADDR = "task_gen_base_addr";
 const std::string MODEL_ATTR_TASK_GEN_WEIGHT_ADDR = "task_gen_weight_addr";
diff --git a/ge/graph/build/graph_builder.cc b/ge/graph/build/graph_builder.cc
index dce40c3e..143d5550 100644
--- a/ge/graph/build/graph_builder.cc
+++ b/ge/graph/build/graph_builder.cc
@@ -421,6 +421,52 @@ static Status GenerateTaskForConstant(const std::shared_ptr &graph
   return SUCCESS;
 }
 
+Status GraphBuilder::MarkFpBpProfilingTaskAttr(ComputeGraphPtr &com_graph) {
+  bool original_unknown_shape_flag = com_graph->GetGraphUnknownFlag();
+  com_graph->SetGraphUnknownFlag(false);
+
+  GELOGD("Start to mark profiling task attr for fp and bp.");
+  TaskGenerator task_generator;
+  ProfilingPoint profiling_point;
+  std::vector all_reduce_node_index;
+  Status ret = task_generator.FindProfilingNodeIndex(com_graph, profiling_point, all_reduce_node_index);
+  com_graph->SetGraphUnknownFlag(original_unknown_shape_flag);
+  if (ret != SUCCESS) {
+    GELOGW("Find profiling node index failed.");
+  }
+  if (profiling_point.fp_index == 0 || profiling_point.bp_index == 0 || profiling_point.end_index.empty()) {
+    GELOGD("No need to mark fp bp profiling task attr.");
+    return SUCCESS;
+  }
+  // mark profiling task attr for node
+  uint32_t node_index = 0;
+  for (const auto &node : com_graph->GetAllNodes()) {
+    OpDescPtr op_desc = node->GetOpDesc();
+    GE_CHECK_NOTNULL(op_desc);  // check the pointer that is dereferenced below
+    node_index++;
+    if (profiling_point.fp_index == node_index) {
+      GELOGI("The first fp node of dynamic graph is %s, idx %u", op_desc->GetName().c_str(), node_index);
+      (void)ge::AttrUtils::SetBool(op_desc, ATTR_NAME_INSERT_FP_PROFILILNG_TASK, true);
+    }
+    if (profiling_point.bp_index == node_index) {
+      GELOGI("The bp node of dynamic graph is %s, idx %u", op_desc->GetName().c_str(), node_index);
+      (void)ge::AttrUtils::SetBool(op_desc, ATTR_NAME_INSERT_BP_PROFILILNG_TASK, true);
+    }
+    for (size_t i = 0; i < all_reduce_node_index.size(); i++) {
+      if (all_reduce_node_index[i] == node_index) {
+        GELOGI("The all reduce node of dynamic graph is %s, idx %u", op_desc->GetName().c_str(), node_index);
+        (void)ge::AttrUtils::SetBool(op_desc, ATTR_NAME_INSERT_BP_PROFILILNG_TASK, true);
+        break;  // attr is set; the old trailing `continue` was a no-op
+      }
+    }
+    if (profiling_point.end_index.find(node_index) != profiling_point.end_index.end()) {
+      GELOGI("The end node of dynamic graph is %s, idx %u", op_desc->GetName().c_str(), node_index);
+      (void)ge::AttrUtils::SetBool(op_desc, ATTR_NAME_INSERT_END_PROFILILNG_TASK, true);
+    }
+  }
+  return SUCCESS;
+}
+
 Status GraphBuilder::BuildForDynamicShapeGraph(ComputeGraphPtr &comp_graph,
                                                std::vector &subgraph_ptr_list,
                                                GeRootModelPtr &ge_root_model_ptr, GeModelPtr &ge_model_ptr,
@@ -437,6 +483,12 @@ Status GraphBuilder::BuildForDynamicShapeGraph(ComputeGraphPtr &comp_graph,
     }
   }
 
+  // Set fp bp profiling task attr for graph
+  if (MarkFpBpProfilingTaskAttr(comp_graph) != SUCCESS) {
+ GELOGE(FAILED, "Set fp bp profiling task attr for graph."); + return FAILED; + } + auto all_graphs = comp_graph->GetAllSubgraphs(); if (all_graphs.empty()) { all_graphs.push_back(comp_graph); diff --git a/ge/graph/build/graph_builder.h b/ge/graph/build/graph_builder.h index b828a80d..524b60e0 100644 --- a/ge/graph/build/graph_builder.h +++ b/ge/graph/build/graph_builder.h @@ -60,6 +60,7 @@ class GraphBuilder { Status UpdateParentNodeOutputSize(const ge::ComputeGraphPtr &graph, ge::NodePtr &parent_node_ptr); Status CalcDynShapeRootGraphDataSize(const ge::OpDescPtr &op_desc); Status SecondPartition(ge::ComputeGraphPtr &comp_graph, vector &subgraph_ptr_list); + Status MarkFpBpProfilingTaskAttr(ComputeGraphPtr &com_graph); Status BuildForDynamicShapeGraph(ComputeGraphPtr &comp_graph, std::vector &subgraph_ptr_list, GeRootModelPtr &ge_root_model_ptr, GeModelPtr &ge_model_ptr, uint64_t session_id = INVALID_SESSION_ID); diff --git a/ge/graph/build/task_generator.cc b/ge/graph/build/task_generator.cc index 7e45ad61..21e82d11 100755 --- a/ge/graph/build/task_generator.cc +++ b/ge/graph/build/task_generator.cc @@ -274,6 +274,7 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra }; GE_MAKE_GUARD(release, callback); + uint64_t all_reduce_node_idx = 0; for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) { OpDescPtr op_desc = node->GetOpDesc(); GE_CHECK_NOTNULL(op_desc); @@ -292,7 +293,7 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra // Part2: Call auto fusion_task_info = FusionTaskInfo{run_context, graph, node, op_desc, node_index, ge_lib, - ops_kernel_manager, task_def_list, op_name_map, profiling_point, all_reduce_nodes}; + ops_kernel_manager, task_def_list, op_name_map, profiling_point, all_reduce_nodes, all_reduce_node_idx}; GE_CHK_STATUS_RET(GenerateTaskForFusionNode(fusion_task_info, fusion_nodes, fusion_nodes_seen), "Call GenerateTaskForFusionNode node:%s(%s) failed", name.c_str(), 
type.c_str()); // continue directly @@ -316,7 +317,8 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra type.c_str()); // Profiling task size_t task_list_size_before = task_def_list.size(); - GE_CHK_STATUS_RET(InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list)); + GE_CHK_STATUS_RET(InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes, + node_index, task_def_list, all_reduce_node_idx)); int64_t op_id = op_desc->GetId(); // Compatible with dynamic shape scenes, the default is 0 int64_t stream_id = 0; @@ -336,8 +338,8 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra return ret; } // Profiling task - GE_CHK_STATUS_RET(InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list)); - + GE_CHK_STATUS_RET(InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes, + node_index, task_def_list, all_reduce_node_idx)); size_t task_list_size_after = task_def_list.size(); // If tasks is reduced if (task_list_size_after < task_list_size_before) { @@ -380,6 +382,7 @@ Status TaskGenerator::GenerateTaskForFusionNode(FusionTaskInfo &fusion_task_info auto &op_name_map = fusion_task_info.op_name_map; auto &profiling_point = fusion_task_info.profiling_point; auto &all_reduce_nodes = fusion_task_info.all_reduce_nodes; + auto &all_reduce_idx = fusion_task_info.all_reduce_node_idx; // If op_desc have this attr, call nodes with same group key in a stream together if (ge::AttrUtils::GetInt(fusion_op_desc, ATTR_NAME_FUSION_GROUP_KEY, group_key) && (fusion_nodes_seen.count(node.get()) == 0)) { @@ -426,7 +429,8 @@ Status TaskGenerator::GenerateTaskForFusionNode(FusionTaskInfo &fusion_task_info return INTERNAL_ERROR; } // profiling task - (void)InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list); + (void)InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes, + 
node_index, task_def_list, all_reduce_idx);
       run_context.stream = run_context.graphStreamList[stream_id];
       GELOGI("Fusion: Call %s to generate fusion_node:[fusion_node_name:%s(%s), id:%ld, stream_id:%ld] task.",
              op_kernel_lib_name.c_str(), fusion_node_name.c_str(), fusion_node_type.c_str(), op_id, stream_id);
@@ -439,7 +443,8 @@ Status TaskGenerator::GenerateTaskForFusionNode(FusionTaskInfo &fusion_task_info
         return ret;
       }
       // profiling task
-      (void)InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list);
+      (void)InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes,
+                                     node_index, task_def_list, all_reduce_idx);
       size_t task_list_size_after = task_def_list.size();
       // if tasks is reduced
       if (task_list_size_after < task_list_size_before) {
@@ -830,6 +835,11 @@ Status TaskGenerator::GetFpBpIndex(const ComputeGraphPtr &graph, ProfilingPoint
   return SUCCESS;
 }
 
+Status TaskGenerator::FindProfilingNodeIndex(const ComputeGraphPtr &graph, ProfilingPoint &profiling_point,
+                                             std::vector &all_reduce_nodes) {
+  return FindProfilingTaskIndex(graph, profiling_point, all_reduce_nodes);
+}
+
 Status TaskGenerator::FindProfilingTaskIndex(const ComputeGraphPtr &graph, ProfilingPoint &profiling_point,
                                              vector &all_reduce_nodes) const {
   GE_CHECK_NOTNULL(graph);
@@ -840,7 +850,6 @@ Status TaskGenerator::FindProfilingTaskIndex(const ComputeGraphPtr &graph, Profi
     GELOGD("Profiling is not open.");
     return SUCCESS;
   }
-
   GELOGI("Start get FP/BP index.");
   std::string fp_point_str;
   std::string bp_point_str;
@@ -878,18 +887,27 @@ Status TaskGenerator::FindProfilingTaskIndex(const ComputeGraphPtr &graph, Profi
   return SUCCESS;
 }
 
-
 Status TaskGenerator::InsertProfilingTaskBefore(const OpDescPtr &op_desc, const ProfilingPoint &profiling_point,
                                                 vector &all_reduce_nodes, uint32_t node_index,
-                                                vector &task_def_list) {
+                                                vector &task_def_list, uint64_t &all_reduce_node_idx) {
   const char *profiling_mode = std::getenv(kProfilingMode);
   bool is_profiling = (profiling_mode != nullptr) || ProfilingManager::Instance().ProfilingOn() ||
                       ProfilingManager::Instance().ProfilingTrainingTraceOn();
-  if (!is_profiling || (profiling_point.fp_index == 0) || (profiling_point.bp_index == 0) ||
-      (profiling_point.end_index.empty())) {
+  bool is_insert_fp_profiling_task = false;
+  (void)ge::AttrUtils::GetBool(op_desc, ATTR_NAME_INSERT_FP_PROFILILNG_TASK, is_insert_fp_profiling_task);
+  bool is_insert_bp_profiling_task = false;
+  (void)ge::AttrUtils::GetBool(op_desc, ATTR_NAME_INSERT_BP_PROFILILNG_TASK, is_insert_bp_profiling_task);
+  bool no_insert_profiling_task = ((profiling_point.fp_index == 0) || (profiling_point.bp_index == 0) ||
+                                  (profiling_point.end_index.empty())) &&
+                                  (!(is_insert_fp_profiling_task || is_insert_bp_profiling_task));
+  if (!is_profiling || no_insert_profiling_task) {
     return SUCCESS;
   }
-  if (profiling_point.fp_index == node_index) {
+  GELOGD("Insert fp profiling task: %d, insert bp profiling task: %d, fp index: %u, bp index: %u, end index size: %zu",
+         is_insert_fp_profiling_task, is_insert_bp_profiling_task, profiling_point.fp_index, profiling_point.bp_index,
+         profiling_point.end_index.size());
+
+  if ((profiling_point.fp_index == node_index) || is_insert_fp_profiling_task) {
     uint64_t jobid_log_id = ge::GetContext().TraceId();
     GELOGI("The first FP operator is %s, idx %u, job_id %lu", op_desc->GetName().c_str(), node_index, jobid_log_id);
 
@@ -913,22 +931,40 @@ Status TaskGenerator::InsertProfilingTaskBefore(const OpDescPtr &op_desc, const
     task_def_list.emplace_back(fp_task_def);
   }
 
-  for (size_t i = 0; i < all_reduce_nodes.size(); i++) {
-    if (all_reduce_nodes[i] != node_index) {
-      continue;
+  bool is_all_reduce = (op_desc->GetType() == HCOMALLREDUCE || op_desc->GetType() == HVDCALLBACKALLREDUCE);
+  uint64_t all_reduce_task_idx = 0;
+  bool is_insert_all_reduce_task = false;
+  if (is_all_reduce && is_insert_bp_profiling_task) {
+    all_reduce_task_idx = all_reduce_node_idx;
+    is_insert_all_reduce_task = true;
+  }
+  if (is_all_reduce) {
+    all_reduce_node_idx++;
+  }
+  if (!is_insert_all_reduce_task) {
+    for (size_t i = 0; i < all_reduce_nodes.size(); i++) {
+      if (all_reduce_nodes[i] == node_index) {
+        all_reduce_task_idx = i;
+        is_insert_all_reduce_task = true;
+        break;
+      }
     }
+  }
+
+  if (is_insert_all_reduce_task) {
     GELOGI("The start allreduce operator is %s, idx %u", op_desc->GetName().c_str(), node_index);
     TaskDef ar_task_def;
     ar_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
     ar_task_def.set_stream_id(op_desc->GetStreamId());
     LogTimeStampDef *ar_log_def = ar_task_def.mutable_log_timestamp();
     if (ar_log_def != nullptr) {
-      GE_IF_BOOL_EXEC(TypeUtils::CheckUint64MulOverflow(i, kProfilingArStep),
+      GE_IF_BOOL_EXEC(TypeUtils::CheckUint64MulOverflow(all_reduce_task_idx, kProfilingArStep),
                       GELOGE(FAILED, "Multiply result is out of range.");
                       return FAILED);
-      auto log_id = i * kProfilingArStep + kProfilingArStartLogid;
+      auto log_id = all_reduce_task_idx * kProfilingArStep + kProfilingArStartLogid;
       ar_log_def->set_logid(log_id);
       ar_log_def->set_notify(false);
+      (void)ge::AttrUtils::SetInt(op_desc, ATTR_NAME_INSERT_PROFILILNG_TASK_LOG_ID, log_id);
     }
     task_def_list.push_back(ar_task_def);
   }
@@ -937,16 +973,27 @@ Status TaskGenerator::InsertProfilingTaskBefore(const OpDescPtr &op_desc, const
 
 Status TaskGenerator::InsertProfilingTaskAfter(const OpDescPtr &op_desc, const ProfilingPoint &profiling_point,
                                                vector &all_reduce_nodes, uint32_t node_index,
-                                               vector &task_def_list) {
+                                               vector &task_def_list, uint64_t all_reduce_node_idx) {
   GE_CHECK_NOTNULL(op_desc);
   const char *profiling_mode = std::getenv(kProfilingMode);
   bool is_profiling = (profiling_mode != nullptr) || ProfilingManager::Instance().ProfilingOn() ||
                       ProfilingManager::Instance().ProfilingTrainingTraceOn();
-  if (!is_profiling || (profiling_point.fp_index == 0) || (profiling_point.bp_index == 0) ||
-      (profiling_point.end_index.empty())) {
+  bool is_insert_bp_profiling_task = false;
+  (void)ge::AttrUtils::GetBool(op_desc, ATTR_NAME_INSERT_BP_PROFILILNG_TASK, is_insert_bp_profiling_task);
+  bool is_insert_end_profiling_task = false;
+  (void)ge::AttrUtils::GetBool(op_desc, ATTR_NAME_INSERT_END_PROFILILNG_TASK, is_insert_end_profiling_task);
+  bool no_insert_profiling_task = ((profiling_point.fp_index == 0) || (profiling_point.bp_index == 0) ||
+                                  (profiling_point.end_index.empty())) &&
+                                  (!(is_insert_bp_profiling_task || is_insert_end_profiling_task));
+  if (!is_profiling || no_insert_profiling_task) {
     return SUCCESS;
   }
-  if (profiling_point.bp_index == node_index) {
+  GELOGD("Insert bp profiling task: %d, insert end profiling task: %d, fp index: %u, bp index: %u, end index size: %zu",
+         is_insert_bp_profiling_task, is_insert_end_profiling_task, profiling_point.fp_index, profiling_point.bp_index,
+         profiling_point.end_index.size());
+
+  bool is_all_reduce = (op_desc->GetType() == HCOMALLREDUCE || op_desc->GetType() == HVDCALLBACKALLREDUCE);
+  if ((profiling_point.bp_index == node_index) || (!is_all_reduce && is_insert_bp_profiling_task)) {
     GELOGI("The last BP operator is %s, idx %u", op_desc->GetName().c_str(), node_index);
     TaskDef bp_task_def;
     bp_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
@@ -957,7 +1004,9 @@ Status TaskGenerator::InsertProfilingTaskAfter(const OpDescPtr &op_desc, const P
     bp_log_def->set_notify(false);
     task_def_list.emplace_back(bp_task_def);
   }
-  if (profiling_point.end_index.find(node_index) != profiling_point.end_index.end()) {
+
+  if (profiling_point.end_index.find(node_index) != profiling_point.end_index.end() ||
+      is_insert_end_profiling_task) {
     GELOGI("The iteration end operator is %s, idx %u", op_desc->GetName().c_str(), node_index);
     TaskDef end_task_def;
     end_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
@@ -969,20 +1018,32 @@ Status TaskGenerator::InsertProfilingTaskAfter(const OpDescPtr &op_desc, const P
     task_def_list.emplace_back(end_task_def);
   }
 
+  uint64_t all_reduce_task_idx = 0;  // same width as in InsertProfilingTaskBefore and CheckUint64MulOverflow
+  bool is_insert_all_reduce_task = false;
+  if (is_all_reduce && is_insert_bp_profiling_task) {
+    all_reduce_task_idx = (all_reduce_node_idx > 0) ? (all_reduce_node_idx - 1) : 0;  // counter was advanced by Before
+    is_insert_all_reduce_task = true;
+  }
+
   for (size_t i = 0; i < all_reduce_nodes.size(); i++) {
-    if (all_reduce_nodes[i] != node_index) {
-      continue;
+    if (all_reduce_nodes[i] == node_index) {
+      all_reduce_task_idx = i;
+      is_insert_all_reduce_task = true;
+      break;
     }
+  }
+
+  if (is_insert_all_reduce_task) {
     GELOGI("The end allreduce operator is %s, idx %u", op_desc->GetName().c_str(), node_index);
     TaskDef ar_task_def;
     ar_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
     ar_task_def.set_stream_id(op_desc->GetStreamId());
     LogTimeStampDef *ar_log_def = ar_task_def.mutable_log_timestamp();
     GE_CHECK_NOTNULL(ar_log_def);
-    GE_IF_BOOL_EXEC(TypeUtils::CheckUint64MulOverflow(i, kProfilingArStep),
+    GE_IF_BOOL_EXEC(TypeUtils::CheckUint64MulOverflow(all_reduce_task_idx, kProfilingArStep),
                     GELOGE(FAILED, "Multiply result is out of range.");
                     return FAILED);
-    auto log_id = i * kProfilingArStep + kProfilingArEndLogid;
+    auto log_id = all_reduce_task_idx * kProfilingArStep + kProfilingArEndLogid;
     ar_log_def->set_logid(log_id);
     ar_log_def->set_notify(false);
     task_def_list.emplace_back(ar_task_def);
diff --git a/ge/graph/build/task_generator.h b/ge/graph/build/task_generator.h
index c93b2007..5970954c 100755
--- a/ge/graph/build/task_generator.h
+++ b/ge/graph/build/task_generator.h
@@ -51,6 +51,7 @@ struct FusionTaskInfo {
   std::map &op_name_map;
   ProfilingPoint &profiling_point;
   vector all_reduce_nodes;
+  uint64_t &all_reduce_node_idx;  // by reference so increments made for fusion nodes reach the caller's counter
 };
 
 class TaskGenerator {
@@ -76,6 +77,8 @@ class TaskGenerator {
   ///
   Status GetTaskInfo(Model &model, ComputeGraphPtr &graph, uint64_t session_id, RunContext &run_context);
 
+  Status FindProfilingNodeIndex(const ComputeGraphPtr &graph, ProfilingPoint &profiling_point,
+                                std::vector &all_reduce_nodes);
  private:
   Status UpdateAnchorStatus(const NodePtr &node);
 
@@ -126,10 +129,10 @@ class TaskGenerator {
                                 std::vector &all_reduce_nodes) const;
   Status InsertProfilingTaskBefore(const
OpDescPtr &op_desc, const ProfilingPoint &profiling_point, std::vector &all_reduce_nodes, uint32_t node_index, - std::vector &task_def_list); + std::vector &task_def_list, uint64_t &all_reduce_node_idx); Status InsertProfilingTaskAfter(const OpDescPtr &op_desc, const ProfilingPoint &profiling_point, std::vector &all_reduce_nodes, uint32_t node_index, - std::vector &task_def_list); + std::vector &task_def_list, uint64_t all_reduce_node_idx); static bool IsProfPoint(const OpDescPtr &op, const std::string &name); diff --git a/ge/graph/load/new_model_manager/davinci_model.cc b/ge/graph/load/new_model_manager/davinci_model.cc index 1e8192a5..78f6f8bf 100755 --- a/ge/graph/load/new_model_manager/davinci_model.cc +++ b/ge/graph/load/new_model_manager/davinci_model.cc @@ -3102,6 +3102,8 @@ Status DavinciModel::DistributeTask() { task_desc_info.stream_id = task->GetStreamId(); task_desc_info.shape_type = "static"; task_desc_info.cur_iter_num = 0; + profiler_report_op_info_[task_desc_info.op_name] = + std::pair(task_desc_info.task_id, task_desc_info.stream_id); task_desc_info_.emplace_back(task_desc_info); if (flag) { if (task->GetSktTaskID() != 0xFFFFFFFF) { @@ -3109,6 +3111,8 @@ Status DavinciModel::DistributeTask() { string op_name = "super_kernel_" + to_string(task_index); task_desc_info.op_name = op_name; task_desc_info.task_id = task->GetSktTaskID(); + profiler_report_op_info_[task_desc_info.op_name] = + std::pair(task_desc_info.task_id, task_desc_info.stream_id); task_desc_info_.emplace_back(task_desc_info); } } @@ -3980,7 +3984,15 @@ Status DavinciModel::GetComputeGraphInfo(vector &graph_des compute_graph_info.output_format = op_desc.output_format; compute_graph_info.output_shape = op_desc.output_shape; compute_graph_info.output_data_type = op_desc.output_data_type; - + uint32_t task_id = 0; + uint32_t stream_id = 0; + auto iter = profiler_report_op_info_.find(op_desc.op_name); + if (iter != profiler_report_op_info_.end()) { + task_id = iter->second.first; + 
stream_id = iter->second.second; + } + compute_graph_info.task_id = task_id; + compute_graph_info.stream_id = stream_id; graph_desc_info.emplace_back(compute_graph_info); } return SUCCESS; diff --git a/ge/graph/load/new_model_manager/davinci_model.h b/ge/graph/load/new_model_manager/davinci_model.h index b5f546f1..4d5d2252 100755 --- a/ge/graph/load/new_model_manager/davinci_model.h +++ b/ge/graph/load/new_model_manager/davinci_model.h @@ -962,6 +962,8 @@ class DavinciModel { // for profiling task and graph info vector task_desc_info_; + std::map> profiler_report_op_info_; + int64_t maxDumpOpNum_; // for data dump DataDumper data_dumper_; diff --git a/ge/hybrid/executor/worker/execution_engine.cc b/ge/hybrid/executor/worker/execution_engine.cc index 21dd8e4b..e9c6ef29 100755 --- a/ge/hybrid/executor/worker/execution_engine.cc +++ b/ge/hybrid/executor/worker/execution_engine.cc @@ -221,6 +221,8 @@ Status NodeDoneCallback::GetGraphDescInfo(const NodePtr node, const HybridModel tmp_compute_graph_info.output_shape.emplace_back(output_desc.GetShape().GetDims()); tmp_compute_graph_info.output_data_type.emplace_back(output_desc.GetDataType()); } + tmp_compute_graph_info.task_id = context_->GetTaskId(); + tmp_compute_graph_info.stream_id = context_->GetStreamId(); compute_graph_info.emplace_back(tmp_compute_graph_info); GELOGD("GetComputeGraphInfo of node [%s] end.", node->GetName().c_str()); } diff --git a/ge/hybrid/model/hybrid_model_builder.cc b/ge/hybrid/model/hybrid_model_builder.cc index 46c9c39b..32fc495a 100755 --- a/ge/hybrid/model/hybrid_model_builder.cc +++ b/ge/hybrid/model/hybrid_model_builder.cc @@ -35,11 +35,22 @@ namespace ge { namespace hybrid { +using domi::LogTimeStampDef; +using domi::TaskDef; namespace { const uint32_t kSubgraphIndex = 0U; const uint32_t kVarOutputIndex = 0U; +const uint64_t kProfilingFpStartLogid = 1U; +const uint64_t kProfilingBpEndLogid = 2U; +const uint64_t kProfilingIterEndLogid = 65535U; const int kBytes = 8; const char *const 
kOwnerGraphIsUnknown = "OwnerGraphIsUnknown";
+const char *const kProfilingGraph = "ProfilingGraph";
+const char *const kProfilingFpNode = "ProfilingFpNode";
+const char *const kProfilingBpNode = "ProfilingBpNode";
+const char *const kProfilingEndNode = "ProfilingEndNode";
+const char *const kProfilingArNode = "ProfilingAllReduceNode";
+const char *const kEngineNameRts = "DNN_VM_RTS_OP_STORE";
 
 Status SetOutputNameAttr(ComputeGraph &graph) {
   vector output_names;
@@ -1531,6 +1542,188 @@ Status HybridModelBuilder::RecoverGraphUnknownFlag() {
   return SUCCESS;
 }
 
+Status HybridModelBuilder::GenerateFpProfilingTask(const OpDescPtr &op_desc, vector &task_def_list) {
+  uint64_t jobid_log_id = ge::GetContext().TraceId();
+  GELOGD("The first FP operator is %s, job_id %lu", op_desc->GetName().c_str(), jobid_log_id);
+
+  TaskDef job_task_def;
+  job_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
+  job_task_def.set_stream_id(op_desc->GetStreamId());
+  LogTimeStampDef *job_log_def = job_task_def.mutable_log_timestamp();
+  if (job_log_def != nullptr) {
+    job_log_def->set_logid(jobid_log_id);
+    job_log_def->set_notify(false);
+  }
+  task_def_list.emplace_back(job_task_def);
+  TaskDef fp_task_def;
+  fp_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
+  fp_task_def.set_stream_id(op_desc->GetStreamId());
+  LogTimeStampDef *fp_log_def = fp_task_def.mutable_log_timestamp();
+  if (fp_log_def != nullptr) {
+    fp_log_def->set_logid(kProfilingFpStartLogid);
+    fp_log_def->set_notify(false);
+  }
+  task_def_list.emplace_back(fp_task_def);
+
+  return SUCCESS;
+}
+
+Status HybridModelBuilder::GenerateArProfilingTask(const OpDescPtr &op_desc, int64_t log_id,
+                                                   vector &task_def_list) {
+  TaskDef ar_task_def;
+  ar_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
+  ar_task_def.set_stream_id(op_desc->GetStreamId());
+  LogTimeStampDef *ar_log_def = ar_task_def.mutable_log_timestamp();
+  if (ar_log_def != nullptr) {
+    ar_log_def->set_logid(log_id);
+    ar_log_def->set_notify(false);
+  }
+  task_def_list.emplace_back(ar_task_def);
+
+  return SUCCESS;
+}
+
+Status HybridModelBuilder::GenerateBpProfilingTask(const OpDescPtr &op_desc, vector &task_def_list) {
+  TaskDef bp_task_def;
+  bp_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
+  bp_task_def.set_stream_id(op_desc->GetStreamId());
+  LogTimeStampDef *bp_log_def = bp_task_def.mutable_log_timestamp();
+  GE_CHECK_NOTNULL(bp_log_def);
+  bp_log_def->set_logid(kProfilingBpEndLogid);
+  bp_log_def->set_notify(false);
+  task_def_list.emplace_back(bp_task_def);
+
+  return SUCCESS;
+}
+
+Status HybridModelBuilder::GenerateEndProfilingTask(const OpDescPtr &op_desc, vector &task_def_list) {
+  TaskDef end_task_def;
+  end_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
+  end_task_def.set_stream_id(op_desc->GetStreamId());
+  LogTimeStampDef *end_log_def = end_task_def.mutable_log_timestamp();
+  GE_CHECK_NOTNULL(end_log_def);
+  end_log_def->set_logid(kProfilingIterEndLogid);
+  end_log_def->set_notify(true);
+  task_def_list.emplace_back(end_task_def);
+
+  return SUCCESS;
+}
+
+Status HybridModelBuilder::CreateProfilingNodeBefore(GraphItem &graph_item, const NodePtr &node) {
+  GE_CHECK_NOTNULL(node);
+  const OpDescPtr &op_desc = node->GetOpDesc();
+  GE_CHECK_NOTNULL(op_desc);
+  const auto &compute_graph = MakeShared(kProfilingGraph);
+  GE_CHECK_NOTNULL(compute_graph);
+
+  NodePtr node_ptr = nullptr;
+  vector task_def_list;
+  // create fp node
+  bool is_insert_fp_profiling_task = false;
+  (void)ge::AttrUtils::GetBool(op_desc, ATTR_NAME_INSERT_FP_PROFILILNG_TASK, is_insert_fp_profiling_task);
+  if (is_insert_fp_profiling_task) {
+    GE_CHK_STATUS_RET_NOLOG(GenerateFpProfilingTask(op_desc, task_def_list));  // do not register a task-less node
+    auto fp_desc = MakeShared(kProfilingFpNode, PROFILINGTRAININGTRACE);
+    GE_CHECK_NOTNULL(fp_desc);
+    fp_desc->SetOpKernelLibName(kEngineNameRts);
+    node_ptr = compute_graph->AddNode(fp_desc);
+    GELOGD("Create fp profiling node success before.");
+  }
+  // create all reduce start node
+  bool is_insert_bp_profiling_task = false;
+  (void)ge::AttrUtils::GetBool(op_desc, ATTR_NAME_INSERT_BP_PROFILILNG_TASK, is_insert_bp_profiling_task);
+  bool is_all_reduce = (op_desc->GetType() == HCOMALLREDUCE || op_desc->GetType() == HVDCALLBACKALLREDUCE);
+  if (is_all_reduce && is_insert_bp_profiling_task) {
+    int64_t log_id = 0;
+    (void)ge::AttrUtils::GetInt(op_desc, ATTR_NAME_INSERT_PROFILILNG_TASK_LOG_ID, log_id);
+    GELOGD("All reduce node profiling task log id: %ld before", log_id);
+    GE_CHK_STATUS_RET_NOLOG(GenerateArProfilingTask(op_desc, log_id, task_def_list));
+    string op_name = string(kProfilingArNode) + std::to_string(log_id);
+    auto ar_desc_start = MakeShared(op_name, PROFILINGTRAININGTRACE);
+    GE_CHECK_NOTNULL(ar_desc_start);
+    ar_desc_start->SetOpKernelLibName(kEngineNameRts);
+    node_ptr = compute_graph->AddNode(ar_desc_start);
+    GELOGD("Create all reduce start profiling node success before.");
+  }
+
+  if (node_ptr != nullptr) {
+    for (const auto &task_def : task_def_list) {
+      hybrid_model_.task_defs_[node_ptr].emplace_back(task_def);
+    }
+    NodeItem *node_item = nullptr;
+    GE_CHK_STATUS_RET_NOLOG(GetOrCreateNodeItem(node_ptr, &node_item));
+    node_item->input_start = 0;
+    node_item->output_start = 0;
+    graph_item.node_items_.emplace_back(node_item);
+  } else {
+    GELOGD("No need to create profiling node before.");
+  }
+
+  return SUCCESS;
+}
+
+Status HybridModelBuilder::CreateProfilingNodeAfter(GraphItem &graph_item, const NodePtr &node) {
+  GE_CHECK_NOTNULL(node);
+  const OpDescPtr &op_desc = node->GetOpDesc();
+  GE_CHECK_NOTNULL(op_desc);
+  const auto &compute_graph = MakeShared(kProfilingGraph);
+  GE_CHECK_NOTNULL(compute_graph);
+
+  NodePtr node_ptr = nullptr;
+  vector task_def_list;
+  // Create all reduce end node
+  bool is_insert_bp_profiling_task = false;
+  (void)ge::AttrUtils::GetBool(op_desc, ATTR_NAME_INSERT_BP_PROFILILNG_TASK, is_insert_bp_profiling_task);
+  bool is_all_reduce = (op_desc->GetType() == HCOMALLREDUCE || op_desc->GetType() == HVDCALLBACKALLREDUCE);
+  if (is_all_reduce && is_insert_bp_profiling_task) {
+    int64_t log_id = 0;
+    (void)ge::AttrUtils::GetInt(op_desc, ATTR_NAME_INSERT_PROFILILNG_TASK_LOG_ID, log_id);
+    GELOGD("All reduce node profiling task log id: %ld after", log_id);
+    GE_CHK_STATUS_RET_NOLOG(GenerateArProfilingTask(op_desc, log_id + 1, task_def_list));
+    string op_name = string(kProfilingArNode) + std::to_string(log_id + 1);
+    auto ar_desc_end = MakeShared(op_name, PROFILINGTRAININGTRACE);
+    GE_CHECK_NOTNULL(ar_desc_end);
+    ar_desc_end->SetOpKernelLibName(kEngineNameRts);
+    node_ptr = compute_graph->AddNode(ar_desc_end);
+    GELOGD("Create all reduce end profiling node success after.");
+  }
+  // create bp node
+  if (!is_all_reduce && is_insert_bp_profiling_task) {
+    GE_CHK_STATUS_RET_NOLOG(GenerateBpProfilingTask(op_desc, task_def_list));  // can fail via GE_CHECK_NOTNULL
+    auto bp_op_desc = MakeShared(kProfilingBpNode, PROFILINGTRAININGTRACE);
+    GE_CHECK_NOTNULL(bp_op_desc);
+    bp_op_desc->SetOpKernelLibName(kEngineNameRts);
+    node_ptr = compute_graph->AddNode(bp_op_desc);
+    GELOGD("Create bp profiling node success after.");
+  }
+  // create end node
+  bool is_insert_end_profiling_task = false;
+  (void)ge::AttrUtils::GetBool(op_desc, ATTR_NAME_INSERT_END_PROFILILNG_TASK, is_insert_end_profiling_task);
+  if (is_insert_end_profiling_task) {
+    GE_CHK_STATUS_RET_NOLOG(GenerateEndProfilingTask(op_desc, task_def_list));  // can fail via GE_CHECK_NOTNULL
+    auto end_desc = MakeShared(kProfilingEndNode, PROFILINGTRAININGTRACE);
+    GE_CHECK_NOTNULL(end_desc);
+    end_desc->SetOpKernelLibName(kEngineNameRts);
+    node_ptr = compute_graph->AddNode(end_desc);
+    GELOGD("Create end profiling node success after.");
+  }
+
+  if (node_ptr != nullptr) {
+    for (const auto &task_def : task_def_list) {
+      hybrid_model_.task_defs_[node_ptr].emplace_back(task_def);
+    }
+    NodeItem *node_item = nullptr;
+    GE_CHK_STATUS_RET_NOLOG(GetOrCreateNodeItem(node_ptr, &node_item));
+    node_item->input_start = 0;
+    node_item->output_start = 0;
+    graph_item.node_items_.emplace_back(node_item);
+  } else {
+    GELOGD("No need to create profiling node after.");
+  }
+
+  return SUCCESS;
+}
+
Status HybridModelBuilder::LoadDynamicSubgraph(ComputeGraph &graph, bool is_root_graph) { GELOGD("Start to load subgraph [%s]", graph.GetName().c_str()); // for known partitioned call, load all nodes @@ -1567,8 +1760,9 @@ Status HybridModelBuilder::LoadDynamicSubgraph(ComputeGraph &graph, bool is_root graph_item->output_node_ = node_item; GE_CHK_STATUS_RET_NOLOG(BuildOutputMapping(*graph_item, *node_item, is_root_graph)); } - + GE_CHK_STATUS_RET_NOLOG(CreateProfilingNodeBefore(*graph_item, node)); graph_item->node_items_.emplace_back(node_item); + GE_CHK_STATUS_RET_NOLOG(CreateProfilingNodeAfter(*graph_item, node)); // parse var outputs GE_CHK_STATUS_RET_NOLOG(ParseVarOutputs(*node_item)); GELOGD("NodeItem created: %s", node_item->DebugString().c_str()); diff --git a/ge/hybrid/model/hybrid_model_builder.h b/ge/hybrid/model/hybrid_model_builder.h index a11faae2..55a19b6c 100644 --- a/ge/hybrid/model/hybrid_model_builder.h +++ b/ge/hybrid/model/hybrid_model_builder.h @@ -79,6 +79,12 @@ class HybridModelBuilder { Status LoadKnownShapedSubgraph(ComputeGraph &graph, NodeItem *parent_node_item); Status RecoverGraphUnknownFlag(); Status CheckAicpuOpList(); + Status CreateProfilingNodeBefore(GraphItem &graph_item, const NodePtr &node); + Status CreateProfilingNodeAfter(GraphItem &graph_item, const NodePtr &node); + Status GenerateFpProfilingTask(const OpDescPtr &op_desc, vector &task_def_list); + Status GenerateBpProfilingTask(const OpDescPtr &op_desc, vector &task_def_list); + Status GenerateEndProfilingTask(const OpDescPtr &op_desc, vector &task_def_list); + Status GenerateArProfilingTask(const OpDescPtr &op_desc, int64_t log_id, vector &task_def_list); const char* GetGraphName() const { return hybrid_model_.model_name_.c_str(); diff --git a/ge/hybrid/node_executor/rts/rts_node_executor.cc b/ge/hybrid/node_executor/rts/rts_node_executor.cc index 18b875fd..90b623e0 100644 --- a/ge/hybrid/node_executor/rts/rts_node_executor.cc +++ 
b/ge/hybrid/node_executor/rts/rts_node_executor.cc @@ -18,6 +18,7 @@ #include "common/debug/log.h" #include "common/ge/ge_util.h" #include "graph/utils/tensor_utils.h" +#include "hybrid/model/hybrid_model.h" #include "runtime/rt.h" namespace ge { @@ -79,12 +80,44 @@ Status IdentityNNodeTask::ExecuteAsync(TaskContext &context, std::function done_callback) { + for (const auto &task_def : task_defs_) { + auto log_time_stamp_def = task_def.log_timestamp(); + uint64_t log_id = log_time_stamp_def.logid(); + bool notify = log_time_stamp_def.notify(); + uint32_t flat = log_time_stamp_def.flat(); + + GELOGD("ProfilingTraceTask execute async start. logid = %lu, notify = %d.", log_id, notify); + rtError_t rt_ret = rtProfilerTrace(log_id, notify, flat, context.GetStream()); + if (rt_ret != RT_ERROR_NONE) { + GELOGE(RT_FAILED, "Call rt api failed, ret: 0x%X", rt_ret); + return RT_ERROR_TO_GE_STATUS(rt_ret); + } + GELOGD("[%s] ProfilingTraceTask[%lu] execute success.", context.GetNodeName(), log_id); + } + + return SUCCESS; +}; + Status RtsNodeExecutor::LoadTask(const HybridModel &model, const NodePtr &node, shared_ptr &task) const { + GE_CHECK_NOTNULL(node); + auto op_type = node->GetType(); if (op_type == IDENTITY) { task = MakeShared(); } else if (op_type == IDENTITYN) { task = MakeShared(); + } else if (op_type == PROFILINGTRAININGTRACE) { + auto *task_defs = model.GetTaskDefs(node); + if (task_defs == nullptr || task_defs->empty()) { + GELOGE(INTERNAL_ERROR, "Profiling node has no task to execute."); + return INTERNAL_ERROR; + } + task = MakeShared(*task_defs); } else { GELOGE(INTERNAL_ERROR, "[%s] Unsupported RTS op type: %s", node->GetName().c_str(), op_type.c_str()); return INTERNAL_ERROR; diff --git a/ge/hybrid/node_executor/rts/rts_node_executor.h b/ge/hybrid/node_executor/rts/rts_node_executor.h index 2576b73b..df487d6c 100644 --- a/ge/hybrid/node_executor/rts/rts_node_executor.h +++ b/ge/hybrid/node_executor/rts/rts_node_executor.h @@ -18,6 +18,7 @@ #define 
GE_HYBRID_NODE_EXECUTOR_RTS_RTS_NODE_EXECUTOR_H_ #include "hybrid/node_executor/node_executor.h" +#include "proto/task.pb.h" namespace ge { namespace hybrid { @@ -35,6 +36,18 @@ class IdentityNNodeTask : public IdentityNodeTask { Status ExecuteAsync(TaskContext &context, std::function done_callback) override; }; +class ProfilingTraceNodeTask : public NodeTask { + public: + explicit ProfilingTraceNodeTask(const std::vector &task_defs) : task_defs_(task_defs) {} + ~ProfilingTraceNodeTask() override = default; + + Status UpdateArgs(TaskContext &context) override; + Status ExecuteAsync(TaskContext &context, std::function done_callback) override; + + private: + std::vector task_defs_; +}; + class RtsNodeExecutor : public NodeExecutor { public: Status LoadTask(const HybridModel &model, const NodePtr &node, shared_ptr &task) const override; diff --git a/ge/hybrid/node_executor/task_context.h b/ge/hybrid/node_executor/task_context.h index 0e85a8e3..8ba4fb90 100644 --- a/ge/hybrid/node_executor/task_context.h +++ b/ge/hybrid/node_executor/task_context.h @@ -123,7 +123,7 @@ class TaskContext { Status status_ = SUCCESS; std::vector workspaces_; uint64_t iteration_ = 0; - uint32_t task_id_= 0; + uint32_t task_id_ = 0; uint32_t stream_id_ = 0; }; } // namespace hybrid diff --git a/inc/framework/common/ge_types.h b/inc/framework/common/ge_types.h index 4267aec4..685e03fd 100644 --- a/inc/framework/common/ge_types.h +++ b/inc/framework/common/ge_types.h @@ -263,6 +263,8 @@ struct ComputeGraphDescInfo { std::vector output_format; std::vector> output_shape; std::vector output_data_type; + uint32_t task_id; + uint32_t stream_id; }; struct OpDescInfo { diff --git a/inc/framework/common/types.h b/inc/framework/common/types.h index 99c2ea03..e3baa816 100644 --- a/inc/framework/common/types.h +++ b/inc/framework/common/types.h @@ -529,6 +529,9 @@ REGISTER_OPTYPE_DECLARE(HVDWAIT, "HorovodWait"); // aicpu op for online_infer dynamic_dims REGISTER_OPTYPE_DECLARE(GETDYNAMICDIMS, 
"GetDynamicDims"); +// profiling training trace node +REGISTER_OPTYPE_DECLARE(PROFILINGTRAININGTRACE, "ProfilingTrainingTrace"); + enum InputMode { INPUT = 0, CONST_INPUT }; // Definition of the processing status enum of the process module