|
|
|
@ -274,6 +274,7 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra
|
|
|
|
|
};
|
|
|
|
|
GE_MAKE_GUARD(release, callback);
|
|
|
|
|
|
|
|
|
|
uint64_t all_reduce_node_idx = 0;
|
|
|
|
|
for (auto &node : graph->GetNodes(graph->GetGraphUnknownFlag())) {
|
|
|
|
|
OpDescPtr op_desc = node->GetOpDesc();
|
|
|
|
|
GE_CHECK_NOTNULL(op_desc);
|
|
|
|
@ -292,7 +293,7 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra
|
|
|
|
|
// Part2: Call
|
|
|
|
|
auto fusion_task_info =
|
|
|
|
|
FusionTaskInfo{run_context, graph, node, op_desc, node_index, ge_lib,
|
|
|
|
|
ops_kernel_manager, task_def_list, op_name_map, profiling_point, all_reduce_nodes};
|
|
|
|
|
ops_kernel_manager, task_def_list, op_name_map, profiling_point, all_reduce_nodes, all_reduce_node_idx};
|
|
|
|
|
GE_CHK_STATUS_RET(GenerateTaskForFusionNode(fusion_task_info, fusion_nodes, fusion_nodes_seen),
|
|
|
|
|
"Call GenerateTaskForFusionNode node:%s(%s) failed", name.c_str(), type.c_str());
|
|
|
|
|
// continue directly
|
|
|
|
@ -316,7 +317,8 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra
|
|
|
|
|
type.c_str());
|
|
|
|
|
// Profiling task
|
|
|
|
|
size_t task_list_size_before = task_def_list.size();
|
|
|
|
|
GE_CHK_STATUS_RET(InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list));
|
|
|
|
|
GE_CHK_STATUS_RET(InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes,
|
|
|
|
|
node_index, task_def_list, all_reduce_node_idx));
|
|
|
|
|
int64_t op_id = op_desc->GetId();
|
|
|
|
|
// Compatible with dynamic shape scenes, the default is 0
|
|
|
|
|
int64_t stream_id = 0;
|
|
|
|
@ -336,8 +338,8 @@ Status TaskGenerator::GenerateTask(RunContext &run_context, ComputeGraphPtr &gra
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
// Profiling task
|
|
|
|
|
GE_CHK_STATUS_RET(InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list));
|
|
|
|
|
|
|
|
|
|
GE_CHK_STATUS_RET(InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes,
|
|
|
|
|
node_index, task_def_list, all_reduce_node_idx));
|
|
|
|
|
size_t task_list_size_after = task_def_list.size();
|
|
|
|
|
// If tasks is reduced
|
|
|
|
|
if (task_list_size_after < task_list_size_before) {
|
|
|
|
@ -380,6 +382,7 @@ Status TaskGenerator::GenerateTaskForFusionNode(FusionTaskInfo &fusion_task_info
|
|
|
|
|
auto &op_name_map = fusion_task_info.op_name_map;
|
|
|
|
|
auto &profiling_point = fusion_task_info.profiling_point;
|
|
|
|
|
auto &all_reduce_nodes = fusion_task_info.all_reduce_nodes;
|
|
|
|
|
auto &all_reduce_idx = fusion_task_info.all_reduce_node_idx;
|
|
|
|
|
// If op_desc have this attr, call nodes with same group key in a stream together
|
|
|
|
|
if (ge::AttrUtils::GetInt(fusion_op_desc, ATTR_NAME_FUSION_GROUP_KEY, group_key) &&
|
|
|
|
|
(fusion_nodes_seen.count(node.get()) == 0)) {
|
|
|
|
@ -426,7 +429,8 @@ Status TaskGenerator::GenerateTaskForFusionNode(FusionTaskInfo &fusion_task_info
|
|
|
|
|
return INTERNAL_ERROR;
|
|
|
|
|
}
|
|
|
|
|
// profiling task
|
|
|
|
|
(void)InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list);
|
|
|
|
|
(void)InsertProfilingTaskBefore(op_desc, profiling_point, all_reduce_nodes,
|
|
|
|
|
node_index, task_def_list, all_reduce_idx);
|
|
|
|
|
run_context.stream = run_context.graphStreamList[stream_id];
|
|
|
|
|
GELOGI("Fusion: Call %s to generate fusion_node:[fusion_node_name:%s(%s), id:%ld, stream_id:%ld] task.",
|
|
|
|
|
op_kernel_lib_name.c_str(), fusion_node_name.c_str(), fusion_node_type.c_str(), op_id, stream_id);
|
|
|
|
@ -439,7 +443,8 @@ Status TaskGenerator::GenerateTaskForFusionNode(FusionTaskInfo &fusion_task_info
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
// profiling task
|
|
|
|
|
(void)InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes, node_index, task_def_list);
|
|
|
|
|
(void)InsertProfilingTaskAfter(op_desc, profiling_point, all_reduce_nodes,
|
|
|
|
|
node_index, task_def_list, all_reduce_idx);
|
|
|
|
|
size_t task_list_size_after = task_def_list.size();
|
|
|
|
|
// if tasks is reduced
|
|
|
|
|
if (task_list_size_after < task_list_size_before) {
|
|
|
|
@ -830,6 +835,11 @@ Status TaskGenerator::GetFpBpIndex(const ComputeGraphPtr &graph, ProfilingPoint
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Forwarding entry point for profiling-index lookup: passes graph, profiling_point and
// all_reduce_nodes through unchanged to FindProfilingTaskIndex and returns that call's
// Status as-is. This overload is not const-qualified, whereas FindProfilingTaskIndex is.
Status TaskGenerator::FindProfilingNodeIndex(const ComputeGraphPtr &graph, ProfilingPoint &profiling_point,
|
|
|
|
|
std::vector<uint32_t> &all_reduce_nodes) {
|
|
|
|
|
return FindProfilingTaskIndex(graph, profiling_point, all_reduce_nodes);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Status TaskGenerator::FindProfilingTaskIndex(const ComputeGraphPtr &graph, ProfilingPoint &profiling_point,
|
|
|
|
|
vector<uint32_t> &all_reduce_nodes) const {
|
|
|
|
|
GE_CHECK_NOTNULL(graph);
|
|
|
|
@ -840,7 +850,6 @@ Status TaskGenerator::FindProfilingTaskIndex(const ComputeGraphPtr &graph, Profi
|
|
|
|
|
GELOGD("Profiling is not open.");
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
GELOGI("Start get FP/BP index.");
|
|
|
|
|
std::string fp_point_str;
|
|
|
|
|
std::string bp_point_str;
|
|
|
|
@ -878,18 +887,27 @@ Status TaskGenerator::FindProfilingTaskIndex(const ComputeGraphPtr &graph, Profi
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Status TaskGenerator::InsertProfilingTaskBefore(const OpDescPtr &op_desc, const ProfilingPoint &profiling_point,
|
|
|
|
|
vector<uint32_t> &all_reduce_nodes, uint32_t node_index,
|
|
|
|
|
vector<domi::TaskDef> &task_def_list) {
|
|
|
|
|
vector<domi::TaskDef> &task_def_list, uint64_t &all_reduce_node_idx) {
|
|
|
|
|
const char *profiling_mode = std::getenv(kProfilingMode);
|
|
|
|
|
bool is_profiling = (profiling_mode != nullptr) || ProfilingManager::Instance().ProfilingOn() ||
|
|
|
|
|
ProfilingManager::Instance().ProfilingTrainingTraceOn();
|
|
|
|
|
if (!is_profiling || (profiling_point.fp_index == 0) || (profiling_point.bp_index == 0) ||
|
|
|
|
|
(profiling_point.end_index.empty())) {
|
|
|
|
|
bool is_insert_fp_profiling_task = false;
|
|
|
|
|
(void)ge::AttrUtils::GetBool(op_desc, ATTR_NAME_INSERT_FP_PROFILILNG_TASK, is_insert_fp_profiling_task);
|
|
|
|
|
bool is_insert_bp_profiling_task = false;
|
|
|
|
|
(void)ge::AttrUtils::GetBool(op_desc, ATTR_NAME_INSERT_BP_PROFILILNG_TASK, is_insert_bp_profiling_task);
|
|
|
|
|
bool no_insert_profiling_task = ((profiling_point.fp_index == 0) || (profiling_point.bp_index == 0) ||
|
|
|
|
|
(profiling_point.end_index.empty())) &&
|
|
|
|
|
(!(is_insert_fp_profiling_task || is_insert_bp_profiling_task));
|
|
|
|
|
if (!is_profiling || no_insert_profiling_task) {
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
if (profiling_point.fp_index == node_index) {
|
|
|
|
|
GELOGD("Insert fp profiling task: %d, insert bp profiling task: %d, fp index: %u, bp index: %u, end index size: %zu",
|
|
|
|
|
is_insert_fp_profiling_task, is_insert_bp_profiling_task, profiling_point.fp_index, profiling_point.bp_index,
|
|
|
|
|
profiling_point.end_index.size());
|
|
|
|
|
|
|
|
|
|
if ((profiling_point.fp_index == node_index) || is_insert_fp_profiling_task) {
|
|
|
|
|
uint64_t jobid_log_id = ge::GetContext().TraceId();
|
|
|
|
|
GELOGI("The first FP operator is %s, idx %u, job_id %lu", op_desc->GetName().c_str(), node_index, jobid_log_id);
|
|
|
|
|
|
|
|
|
@ -913,22 +931,40 @@ Status TaskGenerator::InsertProfilingTaskBefore(const OpDescPtr &op_desc, const
|
|
|
|
|
task_def_list.emplace_back(fp_task_def);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < all_reduce_nodes.size(); i++) {
|
|
|
|
|
if (all_reduce_nodes[i] != node_index) {
|
|
|
|
|
continue;
|
|
|
|
|
bool is_all_reduce = (op_desc->GetType() == HCOMALLREDUCE || op_desc->GetType() == HVDCALLBACKALLREDUCE);
|
|
|
|
|
uint64_t all_reduce_task_idx = 0;
|
|
|
|
|
bool is_insert_all_reduce_task = false;
|
|
|
|
|
if (is_all_reduce && is_insert_bp_profiling_task) {
|
|
|
|
|
all_reduce_task_idx = all_reduce_node_idx;
|
|
|
|
|
is_insert_all_reduce_task = true;
|
|
|
|
|
}
|
|
|
|
|
if (is_all_reduce) {
|
|
|
|
|
all_reduce_node_idx++;
|
|
|
|
|
}
|
|
|
|
|
if (!is_insert_all_reduce_task) {
|
|
|
|
|
for (size_t i = 0; i < all_reduce_nodes.size(); i++) {
|
|
|
|
|
if (all_reduce_nodes[i] == node_index) {
|
|
|
|
|
all_reduce_task_idx = i;
|
|
|
|
|
is_insert_all_reduce_task = true;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (is_insert_all_reduce_task) {
|
|
|
|
|
GELOGI("The start allreduce operator is %s, idx %u", op_desc->GetName().c_str(), node_index);
|
|
|
|
|
TaskDef ar_task_def;
|
|
|
|
|
ar_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
|
|
|
|
|
ar_task_def.set_stream_id(op_desc->GetStreamId());
|
|
|
|
|
LogTimeStampDef *ar_log_def = ar_task_def.mutable_log_timestamp();
|
|
|
|
|
if (ar_log_def != nullptr) {
|
|
|
|
|
GE_IF_BOOL_EXEC(TypeUtils::CheckUint64MulOverflow(i, kProfilingArStep),
|
|
|
|
|
GE_IF_BOOL_EXEC(TypeUtils::CheckUint64MulOverflow(all_reduce_task_idx, kProfilingArStep),
|
|
|
|
|
GELOGE(FAILED, "Multiply result is out of range.");
|
|
|
|
|
return FAILED);
|
|
|
|
|
auto log_id = i * kProfilingArStep + kProfilingArStartLogid;
|
|
|
|
|
auto log_id = all_reduce_task_idx * kProfilingArStep + kProfilingArStartLogid;
|
|
|
|
|
ar_log_def->set_logid(log_id);
|
|
|
|
|
ar_log_def->set_notify(false);
|
|
|
|
|
(void)ge::AttrUtils::SetInt(op_desc, ATTR_NAME_INSERT_PROFILILNG_TASK_LOG_ID, log_id);
|
|
|
|
|
}
|
|
|
|
|
task_def_list.push_back(ar_task_def);
|
|
|
|
|
}
|
|
|
|
@ -937,16 +973,27 @@ Status TaskGenerator::InsertProfilingTaskBefore(const OpDescPtr &op_desc, const
|
|
|
|
|
|
|
|
|
|
Status TaskGenerator::InsertProfilingTaskAfter(const OpDescPtr &op_desc, const ProfilingPoint &profiling_point,
|
|
|
|
|
vector<uint32_t> &all_reduce_nodes, uint32_t node_index,
|
|
|
|
|
vector<domi::TaskDef> &task_def_list) {
|
|
|
|
|
vector<domi::TaskDef> &task_def_list, uint64_t all_reduce_node_idx) {
|
|
|
|
|
GE_CHECK_NOTNULL(op_desc);
|
|
|
|
|
const char *profiling_mode = std::getenv(kProfilingMode);
|
|
|
|
|
bool is_profiling = (profiling_mode != nullptr) || ProfilingManager::Instance().ProfilingOn() ||
|
|
|
|
|
ProfilingManager::Instance().ProfilingTrainingTraceOn();
|
|
|
|
|
if (!is_profiling || (profiling_point.fp_index == 0) || (profiling_point.bp_index == 0) ||
|
|
|
|
|
(profiling_point.end_index.empty())) {
|
|
|
|
|
bool is_insert_bp_profiling_task = false;
|
|
|
|
|
(void)ge::AttrUtils::GetBool(op_desc, ATTR_NAME_INSERT_BP_PROFILILNG_TASK, is_insert_bp_profiling_task);
|
|
|
|
|
bool is_insert_end_profiling_task = false;
|
|
|
|
|
(void)ge::AttrUtils::GetBool(op_desc, ATTR_NAME_INSERT_END_PROFILILNG_TASK, is_insert_end_profiling_task);
|
|
|
|
|
bool no_insert_profiling_task = ((profiling_point.fp_index == 0) || (profiling_point.bp_index == 0) ||
|
|
|
|
|
(profiling_point.end_index.empty())) &&
|
|
|
|
|
(!(is_insert_bp_profiling_task || is_insert_end_profiling_task));
|
|
|
|
|
if (!is_profiling || no_insert_profiling_task) {
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
if (profiling_point.bp_index == node_index) {
|
|
|
|
|
GELOGD("Insert bp profiling task: %d, insert end profiling task: %d, fp index: %u, bp index: %u, end index size: %zu",
|
|
|
|
|
is_insert_bp_profiling_task, is_insert_end_profiling_task, profiling_point.fp_index, profiling_point.bp_index,
|
|
|
|
|
profiling_point.end_index.size() );
|
|
|
|
|
|
|
|
|
|
bool is_all_reduce = (op_desc->GetType() == HCOMALLREDUCE || op_desc->GetType() == HVDCALLBACKALLREDUCE);
|
|
|
|
|
if ((profiling_point.bp_index == node_index) || (!is_all_reduce && is_insert_bp_profiling_task)) {
|
|
|
|
|
GELOGI("The last BP operator is %s, idx %u", op_desc->GetName().c_str(), node_index);
|
|
|
|
|
TaskDef bp_task_def;
|
|
|
|
|
bp_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
|
|
|
|
@ -957,7 +1004,9 @@ Status TaskGenerator::InsertProfilingTaskAfter(const OpDescPtr &op_desc, const P
|
|
|
|
|
bp_log_def->set_notify(false);
|
|
|
|
|
task_def_list.emplace_back(bp_task_def);
|
|
|
|
|
}
|
|
|
|
|
if (profiling_point.end_index.find(node_index) != profiling_point.end_index.end()) {
|
|
|
|
|
|
|
|
|
|
if (profiling_point.end_index.find(node_index) != profiling_point.end_index.end() ||
|
|
|
|
|
is_insert_end_profiling_task) {
|
|
|
|
|
GELOGI("The iteration end operator is %s, idx %u", op_desc->GetName().c_str(), node_index);
|
|
|
|
|
TaskDef end_task_def;
|
|
|
|
|
end_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
|
|
|
|
@ -969,20 +1018,32 @@ Status TaskGenerator::InsertProfilingTaskAfter(const OpDescPtr &op_desc, const P
|
|
|
|
|
task_def_list.emplace_back(end_task_def);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
uint32_t all_reduce_task_idx = 0;
|
|
|
|
|
bool is_insert_all_reduce_task = false;
|
|
|
|
|
if (is_all_reduce && is_insert_bp_profiling_task) {
|
|
|
|
|
all_reduce_task_idx = all_reduce_node_idx;
|
|
|
|
|
is_insert_all_reduce_task = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < all_reduce_nodes.size(); i++) {
|
|
|
|
|
if (all_reduce_nodes[i] != node_index) {
|
|
|
|
|
continue;
|
|
|
|
|
if (all_reduce_nodes[i] == node_index) {
|
|
|
|
|
all_reduce_task_idx = i;
|
|
|
|
|
is_insert_all_reduce_task = true;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (is_insert_all_reduce_task) {
|
|
|
|
|
GELOGI("The end allreduce operator is %s, idx %u", op_desc->GetName().c_str(), node_index);
|
|
|
|
|
TaskDef ar_task_def;
|
|
|
|
|
ar_task_def.set_type(RT_MODEL_TASK_PROFILER_TRACE);
|
|
|
|
|
ar_task_def.set_stream_id(op_desc->GetStreamId());
|
|
|
|
|
LogTimeStampDef *ar_log_def = ar_task_def.mutable_log_timestamp();
|
|
|
|
|
GE_CHECK_NOTNULL(ar_log_def);
|
|
|
|
|
GE_IF_BOOL_EXEC(TypeUtils::CheckUint64MulOverflow(i, kProfilingArStep),
|
|
|
|
|
GE_IF_BOOL_EXEC(TypeUtils::CheckUint64MulOverflow(all_reduce_task_idx, kProfilingArStep),
|
|
|
|
|
GELOGE(FAILED, "Multiply result is out of range.");
|
|
|
|
|
return FAILED);
|
|
|
|
|
auto log_id = i * kProfilingArStep + kProfilingArEndLogid;
|
|
|
|
|
auto log_id = all_reduce_task_idx * kProfilingArStep + kProfilingArEndLogid;
|
|
|
|
|
ar_log_def->set_logid(log_id);
|
|
|
|
|
ar_log_def->set_notify(false);
|
|
|
|
|
task_def_list.emplace_back(ar_task_def);
|
|
|
|
|