From bc652f3721074c30e2ab023c70940cfe37817266 Mon Sep 17 00:00:00 2001 From: isaactalx Date: Wed, 3 Feb 2021 11:15:34 +0800 Subject: [PATCH] dynamic shape support pipeline --- ge/CMakeLists.txt | 1 + ge/executor/CMakeLists.txt | 1 + ge/graph/manager/graph_manager.cc | 1 + ge/graph/optimize/mem_rw_conflict_optimize.cc | 6 + ge/graph/partition/stage_partition.cc | 2 + ge/graph/passes/subgraph_pass.cc | 1 + .../executor/hybrid_execution_context.cc | 8 + ge/hybrid/executor/hybrid_execution_context.h | 9 +- .../executor/hybrid_model_async_executor.cc | 24 +- .../executor/hybrid_model_async_executor.h | 2 + ge/hybrid/executor/hybrid_model_executor.cc | 7 +- ge/hybrid/executor/hybrid_model_executor.h | 1 + .../hybrid_model_pipeline_executor.cc | 284 ++++++++++++++++++ .../executor/hybrid_model_pipeline_executor.h | 88 ++++++ ge/hybrid/executor/hybrid_profiler.cc | 11 +- ge/hybrid/executor/node_state.cc | 79 ++++- ge/hybrid/executor/node_state.h | 18 +- ge/hybrid/executor/rt_callback_manager.cc | 11 +- ge/hybrid/executor/rt_callback_manager.h | 8 +- ge/hybrid/executor/subgraph_executor.cc | 44 ++- ge/hybrid/executor/subgraph_executor.h | 11 +- ge/hybrid/executor/worker/execution_engine.cc | 13 +- .../executor/worker/shape_inference_engine.cc | 8 +- .../executor/worker/shape_inference_engine.h | 2 +- ge/hybrid/model/graph_item.cc | 36 +++ ge/hybrid/model/graph_item.h | 7 + ge/hybrid/model/hybrid_model.cc | 4 +- ge/hybrid/model/hybrid_model_builder.cc | 89 +++--- ge/hybrid/model/hybrid_model_builder.h | 2 +- ge/hybrid/model/node_item.cc | 17 +- ge/hybrid/model/node_item.h | 1 + .../node_executor/aicore/aicore_op_task.cc | 4 + .../aicpu/aicpu_node_executor.cc | 28 +- .../node_executor/aicpu/aicpu_node_executor.h | 4 +- .../node_executor/hccl/hccl_node_executor.cc | 32 +- ge/hybrid/node_executor/task_context.cc | 21 +- ge/hybrid/node_executor/task_context.h | 9 +- 37 files changed, 742 insertions(+), 152 deletions(-) create mode 100644 ge/hybrid/executor/hybrid_model_pipeline_executor.cc create mode 100644 ge/hybrid/executor/hybrid_model_pipeline_executor.h diff --git a/ge/CMakeLists.txt b/ge/CMakeLists.txt index 3ae51590..e7547cc9 100755 --- a/ge/CMakeLists.txt +++ b/ge/CMakeLists.txt @@ -351,6 +351,7 @@ set(TRAIN_SRC_LIST "hybrid/executor/node_done_manager.cc" "hybrid/executor/hybrid_profiler.cc" "hybrid/executor/hybrid_model_executor.cc" + "hybrid/executor/hybrid_model_pipeline_executor.cc" "hybrid/executor/hybrid_model_async_executor.cc" "hybrid/executor/hybrid_execution_context.cc" "hybrid/executor/subgraph_context.cc" diff --git a/ge/executor/CMakeLists.txt b/ge/executor/CMakeLists.txt index 8e56dc50..05d627de 100644 --- a/ge/executor/CMakeLists.txt +++ b/ge/executor/CMakeLists.txt @@ -81,6 +81,7 @@ set(SRC_LIST "../hybrid/executor/node_done_manager.cc" "../hybrid/executor/hybrid_profiler.cc" "../hybrid/executor/hybrid_model_executor.cc" + "../hybrid/executor/hybrid_model_pipeline_executor.cc" "../hybrid/executor/hybrid_model_async_executor.cc" "../hybrid/executor/hybrid_execution_context.cc" "../hybrid/executor/subgraph_context.cc" diff --git a/ge/graph/manager/graph_manager.cc b/ge/graph/manager/graph_manager.cc index 8b57858d..18d316c4 100755 --- a/ge/graph/manager/graph_manager.cc +++ b/ge/graph/manager/graph_manager.cc @@ -3032,6 +3032,7 @@ Status GraphManager::OptimizeSubgraph(const GraphNodePtr &graph_node, ComputeGra return FAILED; } GE_TIMESTAMP_EVENT_END(GraphPartitionDynamicShape, "OptimizeSubgraph::GraphPartitionDynamicShape"); + GE_DUMP(compute_graph, "AfterDynamicShapePartition"); 
   GE_TIMESTAMP_START(GraphPartition);
   GraphPartitioner &partitioner = GetCompilerStages(graph_node->GetGraphId()).partitioner;
   ret = partitioner.Partition(compute_graph, GraphPartitioner::kPartitioning);
diff --git a/ge/graph/optimize/mem_rw_conflict_optimize.cc b/ge/graph/optimize/mem_rw_conflict_optimize.cc
index dfc6c9df..077ed110 100644
--- a/ge/graph/optimize/mem_rw_conflict_optimize.cc
+++ b/ge/graph/optimize/mem_rw_conflict_optimize.cc
@@ -742,6 +742,12 @@ Status GraphOptimize::HandleMemoryRWConflict(ComputeGraphPtr &compute_graph) {
     if (node->GetType() == NETOUTPUT && AttrUtils::HasAttr(node->GetOpDesc(), ATTR_NAME_PARENT_NODE_INDEX)) {
       continue;
     }
+    bool identity_reserved = false;
+    AttrUtils::GetBool(node->GetOpDesc(), ATTR_NAME_CANNOT_BE_DELETED, identity_reserved);
+    if (identity_reserved) {
+      GELOGD("Identity [%s] needs to be reserved", node->GetName().c_str());
+      continue;
+    }
     if (node->GetType() == IDENTITY || node->GetType() == READVARIABLEOP) {
       // split identity
       ret = SplitIdentity(node);
diff --git a/ge/graph/partition/stage_partition.cc b/ge/graph/partition/stage_partition.cc
index f6e49bbd..14776200 100644
--- a/ge/graph/partition/stage_partition.cc
+++ b/ge/graph/partition/stage_partition.cc
@@ -52,6 +52,7 @@ Status StagePartitioner::Partition() {
     return SUCCESS;
   }
 
+  GE_DUMP(root_graph_, "BeforeStagePartition");
   if (SplitStageLevel() != SUCCESS) {
     GELOGE(FAILED, "Split graph-stage for graph %s failed.", root_graph_->GetName().c_str());
     return FAILED;
   }
@@ -74,6 +75,7 @@ Status StagePartitioner::Partition() {
            "maybe stage_level was not set correctly.", root_graph_->GetName().c_str());
     return FAILED;
   }
+  GE_DUMP(root_graph_, "AfterStagePartition");
 
   return SUCCESS;
 }
diff --git a/ge/graph/passes/subgraph_pass.cc b/ge/graph/passes/subgraph_pass.cc
index 3d83c301..f140644e 100755
--- a/ge/graph/passes/subgraph_pass.cc
+++ b/ge/graph/passes/subgraph_pass.cc
@@ -460,6 +460,7 @@ Status SubgraphPass::InsertMemcpyNode(const ComputeGraphPtr &graph, const OutDat
                             .AddOutput("y", in_node->GetOpDesc()->GetOutputDesc(0))
                             .Build();
   (void)AttrUtils::SetBool(op_desc, ATTR_NO_NEED_CONSTANT_FOLDING, false);
+  (void)AttrUtils::SetBool(op_desc, ATTR_NAME_CANNOT_BE_DELETED, true);
   if (GraphUtils::InsertNodeAfter(out_anchor, in_anchors, graph->AddNode(op_desc)) != GRAPH_SUCCESS) {
     GELOGE(FAILED, "Insert IDENTITY node %s after %s failed.", name.c_str(), in_node->GetName().c_str());
     return FAILED;
diff --git a/ge/hybrid/executor/hybrid_execution_context.cc b/ge/hybrid/executor/hybrid_execution_context.cc
index 87207e94..50f6287c 100644
--- a/ge/hybrid/executor/hybrid_execution_context.cc
+++ b/ge/hybrid/executor/hybrid_execution_context.cc
@@ -15,6 +15,7 @@
  */
 
 #include "hybrid_execution_context.h"
+#include <atomic>
 
 namespace ge {
 namespace hybrid {
@@ -23,7 +24,14 @@ const uint32_t kEndOfSequence = 0x0704000a;
 const uint32_t kEndOfSequenceNew = 507005;
 const int32_t kModelAbortNormal = 0x0704000e;
 const int32_t kModelAbortNormalNew = 507024;
+
+std::atomic_ulong context_id_gen {};
 }  // namespace
+
+GraphExecutionContext::GraphExecutionContext() {
+  context_id = context_id_gen++;
+}
+
 void GraphExecutionContext::SetErrorCode(Status error_code) {
   std::lock_guard<std::mutex> lk(mu);
   this->status = error_code;
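The new default constructor above draws context_id from a process-wide atomic counter, so every GraphExecutionContext (one per stage executor in the pipeline) gets a unique id for keying its own RuntimeInferenceContext. A minimal stand-alone sketch of that pattern; ContextSketch is a simplified stand-in for GraphExecutionContext, not GE code:

    #include <atomic>
    #include <cstdint>

    // Process-wide generator, mirroring context_id_gen in the diff above.
    static std::atomic<uint64_t> context_id_gen{0};

    struct ContextSketch {
      uint64_t context_id;
      // Fetch-and-increment is atomic, so contexts constructed concurrently
      // on different pipeline stages never share an id.
      ContextSketch() : context_id(context_id_gen++) {}
    };

Later hunks (Cleanup and ResetExecutionContext in hybrid_model_executor.cc) switch the RuntimeInferenceContext key from session_id to this context_id, which is what keeps two stage executors of the same session from destroying each other's contexts.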
diff --git a/ge/hybrid/executor/hybrid_execution_context.h b/ge/hybrid/executor/hybrid_execution_context.h
index c398e83d..5e4e4f9a 100644
--- a/ge/hybrid/executor/hybrid_execution_context.h
+++ b/ge/hybrid/executor/hybrid_execution_context.h
@@ -48,11 +48,15 @@ namespace ge {
 namespace hybrid {
 struct GraphExecutionContext {
+  GraphExecutionContext();
+  ~GraphExecutionContext() = default;
+
   void SetErrorCode(Status error_code);
   Status GetStatus() const;
   Status Synchronize(rtStream_t rt_stream);
 
   uint64_t session_id = 0;
+  uint64_t context_id = 0;
   const HybridModel *model = nullptr;
   const GEThreadLocalContext *ge_context = nullptr;
   rtStream_t stream = nullptr;
@@ -67,6 +71,8 @@ struct GraphExecutionContext {
   std::atomic_bool is_eos_;
   long profiling_level = 0;
   long iteration = 0;
+
+ private:
   Status status = SUCCESS;
   mutable std::mutex mu;
 };
@@ -75,7 +81,8 @@ struct GraphExecutionContext {
   do { \
     if ((context != nullptr) && (context)->profiler != nullptr) { \
       if (node_name != nullptr) { \
-        context->profiler->RecordEvent(evt_type, "tid:%lu [%s] [%s] " fmt, GeLog::GetTid(), node_name, category, \
+        context->profiler->RecordEvent(evt_type, "tid:%lu [%s@%ld] [%s] " fmt, \
+                                       GeLog::GetTid(), node_name, context->iteration, category, \
                                        ##__VA_ARGS__); \
       } else { \
         context->profiler->RecordEvent(evt_type, "tid:%lu [%s] " fmt, GeLog::GetTid(), category, ##__VA_ARGS__); \
diff --git a/ge/hybrid/executor/hybrid_model_async_executor.cc b/ge/hybrid/executor/hybrid_model_async_executor.cc
index 67c85460..a30f7daf 100644
--- a/ge/hybrid/executor/hybrid_model_async_executor.cc
+++ b/ge/hybrid/executor/hybrid_model_async_executor.cc
@@ -25,6 +25,7 @@ namespace ge {
 namespace hybrid {
 namespace {
 const int kDataOutputIndex = 0;
+const size_t kMinimumPipelineStages = 2;
 }
 
 HybridModelAsyncExecutor::HybridModelAsyncExecutor(HybridModel *model) : model_(model), run_flag_(false) {
@@ -95,7 +96,17 @@ Status HybridModelAsyncExecutor::Init() {
   executor_ = std::unique_ptr<HybridModelExecutor>(new(std::nothrow) HybridModelExecutor(model_, device_id_, stream_));
   GE_CHECK_NOTNULL(executor_);
   GE_CHK_STATUS_RET(executor_->Init(), "Failed to init hybrid engine");
+
+  GELOGI("HybridModel stage nums:%zu", model_->GetRootGraphItem()->NumGroups());
+  if (model_->GetRootGraphItem()->NumGroups() >= kMinimumPipelineStages) {
+    pipe_executor_ =
+        std::unique_ptr<HybridModelPipelineExecutor>(new(std::nothrow) HybridModelPipelineExecutor(model_, device_id_));
+    GE_CHECK_NOTNULL(pipe_executor_);
+    GE_CHK_STATUS_RET(pipe_executor_->Init(), "Failed to init hybrid engine");
+  }
+
   GE_CHK_STATUS_RET(InitInputDesc(), "Failed to init input tensors");
+
   return SUCCESS;
 }
 
@@ -135,7 +146,18 @@ Status HybridModelAsyncExecutor::RunInternal() {
                      CsaInteract::GetInstance().StoreInternalErrorCode(ret, ERROR_MODULE_FMK, JOBSUBSTATE_GRAPH_EXEC);
                      continue, "PreRun failed.");  // [No need to check value]
 
-    ret = executor_->Execute(args);
+    if (pipe_executor_ != nullptr) {
+      GELOGI("HybridModel will execute in pipeline mode");
+      auto iter_per_run = std::getenv("ITER_NUM");
+      if (iter_per_run != nullptr) {
+        args.num_loops = static_cast<int>(strtol(iter_per_run, nullptr, 10));
+      }
+      ret = pipe_executor_->Execute(args);
+    } else {
+      GELOGI("HybridModel will execute in single-line mode");
+      ge::GetContext().SetSessionId(executor_->GetContext()->session_id);
+      ret = executor_->Execute(args);
+    }
     ret = HandleResult(ret, current_data.index, args, data_wrapper->GetOutput());
     if (ret != SUCCESS) {
       CsaInteract::GetInstance().StoreInternalErrorCode(ret, ERROR_MODULE_RUNTIME, JOBSUBSTATE_GRAPH_EXEC);
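RunInternal() above now dispatches to the pipeline executor when one was created, and lets the ITER_NUM environment variable override ExecuteArgs::num_loops (which defaults to 10). A hedged sketch of that override; GetIterationsPerRun is a hypothetical helper for illustration only, and the lower bound mirrors Execute()'s GE_CHECK_GE(loop_count, 2) later in this patch:

    #include <climits>
    #include <cstdlib>

    static int GetIterationsPerRun(int default_loops) {
      const char *iter_per_run = std::getenv("ITER_NUM");
      if (iter_per_run == nullptr) {
        return default_loops;  // keep ExecuteArgs::num_loops as-is
      }
      long value = std::strtol(iter_per_run, nullptr, 10);
      if (value < 2 || value > INT_MAX) {
        return default_loops;  // pipeline execution needs at least 2 loops
      }
      return static_cast<int>(value);
    }

Note the diff itself performs no range checking on ITER_NUM; the clamping here is an extra safeguard, not patch behavior.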
"graph/load/model_manager/data_inputer.h" #include "hybrid/executor/hybrid_model_executor.h" +#include "hybrid/executor/hybrid_model_pipeline_executor.h" #include "runtime/stream.h" namespace ge { @@ -81,6 +82,7 @@ class HybridModelAsyncExecutor { std::atomic_bool run_flag_; std::unique_ptr data_inputer_; std::unique_ptr executor_; + std::unique_ptr pipe_executor_; std::future future_; uint64_t iterator_count_ = 0; diff --git a/ge/hybrid/executor/hybrid_model_executor.cc b/ge/hybrid/executor/hybrid_model_executor.cc index 9c4bb217..5d1f142e 100755 --- a/ge/hybrid/executor/hybrid_model_executor.cc +++ b/ge/hybrid/executor/hybrid_model_executor.cc @@ -81,13 +81,14 @@ Status HybridModelExecutor::ExecuteGraphInternal(SubgraphExecutor &executor, args.outputs.clear(); HYBRID_CHK_STATUS_RET(executor.GetOutputs(args.outputs, args.output_desc), "Failed to get outputs"); RECORD_MODEL_EXECUTION_EVENT(&context_, "[GetOutput] End"); + context_.iteration +=1; return SUCCESS; } Status HybridModelExecutor::Cleanup() { GELOGD("Start to cleanup."); context_.callback_manager->Destroy(); - RuntimeInferenceContext::DestroyContext(std::to_string(context_.session_id)); + RuntimeInferenceContext::DestroyContext(std::to_string(context_.context_id)); GELOGD("Cleanup successfully."); return SUCCESS; } @@ -105,7 +106,7 @@ Status HybridModelExecutor::InitExecutionContext() { GELOGD("session id from model = %lu, from context = %lu", model_->GetSessionId(), context_.session_id); context_.allocator = NpuMemoryAllocator::GetAllocator(device_id_); GE_CHECK_NOTNULL(context_.allocator); - context_.callback_manager = std::unique_ptr(new(std::nothrow)CallbackManager(stream_)); + context_.callback_manager = std::unique_ptr(new(std::nothrow)CallbackManager()); GE_CHECK_NOTNULL(context_.callback_manager); context_.dump_properties = PropertiesManager::Instance().GetDumpProperties(context_.session_id); const char *profiling_level = std::getenv(kEnvProfilingLevel); @@ -126,7 +127,7 @@ Status HybridModelExecutor::InitExecutionContext() { Status HybridModelExecutor::ResetExecutionContext(GraphExecutionContext &context) { GE_CHK_STATUS_RET_NOLOG(context.callback_manager->Init()); - string ctx_id = std::to_string(context.session_id); + string ctx_id = std::to_string(context.context_id); RuntimeInferenceContext::DestroyContext(ctx_id); GE_CHK_GRAPH_STATUS_RET(RuntimeInferenceContext::CreateContext(ctx_id), "Failed to Destroy RuntimeInferenceContext"); return SUCCESS; diff --git a/ge/hybrid/executor/hybrid_model_executor.h b/ge/hybrid/executor/hybrid_model_executor.h index 6b2e52b4..0b2cd1ed 100644 --- a/ge/hybrid/executor/hybrid_model_executor.h +++ b/ge/hybrid/executor/hybrid_model_executor.h @@ -32,6 +32,7 @@ class HybridModelExecutor { std::vector outputs; std::vector output_desc; bool is_eos = false; + int num_loops = 10; }; HybridModelExecutor(HybridModel *model, uint32_t device_id, rtStream_t stream); diff --git a/ge/hybrid/executor/hybrid_model_pipeline_executor.cc b/ge/hybrid/executor/hybrid_model_pipeline_executor.cc new file mode 100644 index 00000000..9de846bc --- /dev/null +++ b/ge/hybrid/executor/hybrid_model_pipeline_executor.cc @@ -0,0 +1,284 @@ +#include "hybrid_model_pipeline_executor.h" + +#include "common/math/math_util.h" +#include "graph/ge_context.h" +#include "graph/runtime_inference_context.h" + +namespace ge { +namespace hybrid { +namespace { +constexpr int kNumExecutors = 2; +const int kIntBase = 10; +const char *const kEnvProfilingLevel = "HYBRID_PROFILING_LEVEL"; +} + +StageExecutor::StageExecutor(int id, 
diff --git a/ge/hybrid/executor/hybrid_model_pipeline_executor.cc b/ge/hybrid/executor/hybrid_model_pipeline_executor.cc
new file mode 100644
index 00000000..9de846bc
--- /dev/null
+++ b/ge/hybrid/executor/hybrid_model_pipeline_executor.cc
@@ -0,0 +1,284 @@
+#include "hybrid_model_pipeline_executor.h"
+
+#include "common/math/math_util.h"
+#include "graph/ge_context.h"
+#include "graph/runtime_inference_context.h"
+
+namespace ge {
+namespace hybrid {
+namespace {
+constexpr int kNumExecutors = 2;
+const int kIntBase = 10;
+const char *const kEnvProfilingLevel = "HYBRID_PROFILING_LEVEL";
+}
+
+StageExecutor::StageExecutor(int id, HybridModel *model, PipeExecutionConfig *config)
+    : id_(id), model_(model), pipe_config_(config) {}
+
+StageExecutor::~StageExecutor() { GELOGD("~StageExecutor(), id = %d", id_); }
+
+Status StageExecutor::Init() {
+  GELOGD("[Executor: %d] Start to init StageExecutor", id_);
+  context_.rt_context = pipe_config_->rt_context;
+  GE_CHK_STATUS_RET_NOLOG(InitExecutionContext());
+  GE_CHK_RT_RET(rtStreamCreate(&stream_, RT_STREAM_PRIORITY_DEFAULT));
+  context_.stream = stream_;
+
+  root_graph_executor_.reset(new (std::nothrow) SubgraphExecutor(model_->GetRootGraphItem(), &context_));
+  GE_CHECK_NOTNULL(root_graph_executor_);
+
+  GELOGD("[Executor: %d] Init stage executor successfully", id_);
+  return SUCCESS;
+}
+
+Status StageExecutor::ResetExecutionContext(GraphExecutionContext &context) {
+  GE_CHK_STATUS_RET_NOLOG(context.callback_manager->Init());
+  string ctx_id = std::to_string(context.context_id);
+  RuntimeInferenceContext::DestroyContext(ctx_id);
+  GE_CHK_GRAPH_STATUS_RET(RuntimeInferenceContext::CreateContext(ctx_id), "Failed to create RuntimeInferenceContext");
+  return SUCCESS;
+}
+
+Status StageExecutor::Start(const std::vector<TensorValue> &inputs, const std::vector<ConstGeTensorDescPtr> &input_desc,
+                            int iteration_count) {
+  GELOGD("Start");
+  GE_CHK_RT_RET(rtCtxSetCurrent(context_.rt_context));
+  int num_loops = iteration_count / pipe_config_->num_executors;
+  if (id_ < iteration_count % pipe_config_->num_executors) {
+    num_loops += 1;
+  }
+  FMK_INT32_MULCHECK(num_loops, pipe_config_->num_stages);
+  num_loops *= pipe_config_->num_stages;
+  GELOGD("[Executor: %d] loop count = %d", id_, num_loops);
+
+  for (int loop_idx = 0; loop_idx < num_loops; ++loop_idx) {
+    GELOGD("[Executor: %d] Start to wait for task.", id_);
+    StageTask task_info;
+    task_queue_.Pop(task_info);
+    GELOGD("[Executor: %d] Got task, stage = %d, iteration = %ld", id_, task_info.stage, task_info.iteration);
+    if (task_info.iteration >= pipe_config_->iteration_end) {
+      GELOGE(INTERNAL_ERROR, "[Executor: %d] Unexpected iteration: %ld", id_, task_info.iteration);
+      return INTERNAL_ERROR;
+    }
+
+    if (task_info.event != nullptr) {
+      GELOGD("[%d] Add StreamWaitEvent", id_);
+      GE_CHK_RT_RET(rtStreamWaitEvent(stream_, task_info.event));
+      RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] End", task_info.iteration - 1,
+                                   task_info.stage);
+    }
+
+    RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] Start", task_info.iteration,
+                                 task_info.stage);
+
+    if (task_info.stage == 0) {
+      GELOGD("[Executor: %d] To ResetExecutionContext", id_);
+      GE_CHK_STATUS_RET(ResetExecutionContext(context_), "[Executor: %d] Failed to reset context", id_);
+      context_.iteration = task_info.iteration;
+      GE_CHK_STATUS_RET_NOLOG(SetInputs(inputs, input_desc));
+    }
+
+    RECORD_MODEL_EXECUTION_EVENT(&context_, "[Stage = %d] PartialExecuteAsync Start", task_info.stage);
+    GE_CHK_STATUS_RET(root_graph_executor_->PartialExecuteAsync(task_info.stage));
+    RECORD_MODEL_EXECUTION_EVENT(&context_, "[Stage = %d] PartialExecuteAsync End", task_info.stage);
+    GELOGD("[Executor: %d] PartialExecuteAsync successfully.", id_);
+
+    // notify next execution unit
+    StageTask next_task;
+    next_task.stage = task_info.stage;
+    next_task.iteration = task_info.iteration + 1;
+
+    auto sync_result = Synchronize();
+    if (sync_result != SUCCESS) {
+      GELOGE(sync_result, "[Executor: %d] Failed to sync result. iteration = %ld", id_, task_info.iteration);
+      context_.profiler->Dump(std::cout);
+      context_.callback_manager->Destroy();
+      RuntimeInferenceContext::DestroyContext(std::to_string(context_.context_id));
+      return sync_result;
+    }
+
+    RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] End", task_info.iteration, task_info.stage);
+
+    // if end stage
+    if (task_info.stage >= pipe_config_->num_stages - 1) {
+      RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] Schedule End", task_info.iteration);
+      GELOGD("[Executor: %d] End of iteration [%ld]", id_, task_info.iteration);
+      context_.callback_manager->Destroy();
+      RuntimeInferenceContext::DestroyContext(std::to_string(context_.context_id));
+    }
+    next_executor_->ExecuteAsync(next_task);
+    GELOGD("[Executor: %d] Push item successfully.", id_);
+  }
+
+  GELOGD("[Executor: %d] Process task ended.", id_);
+  return SUCCESS;
+}
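Start() above derives each executor's task count from how iterations are dealt round-robin across the ring: iteration_count / num_executors each, with the first (iteration_count mod num_executors) executors taking one extra iteration, then multiplied by num_stages because every iteration is processed one stage at a time. The same arithmetic as a worked, stand-alone sketch:

    // With iteration_count = 5, num_executors = 2, num_stages = 3:
    //   executor 0: 5 / 2 = 2, +1 because 0 < 5 % 2 -> 3 iterations -> 9 tasks
    //   executor 1: 5 / 2 = 2, no remainder share   -> 2 iterations -> 6 tasks
    static int TasksForExecutor(int id, int iteration_count, int num_executors, int num_stages) {
      int num_loops = iteration_count / num_executors;
      if (id < iteration_count % num_executors) {
        num_loops += 1;  // remainder iterations go to the lowest-id executors
      }
      return num_loops * num_stages;  // one queue entry per (iteration, stage) pair
    }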
iteration = %d", id_, task_info.iteration); + + context_.profiler->Dump(std::cout); + context_.callback_manager->Destroy(); + RuntimeInferenceContext::DestroyContext(std::to_string(context_.context_id)); + return sync_result; + } + + RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %d] [Stage = %d] End", task_info.iteration, task_info.stage); + + // if not end stage + if (task_info.stage >= pipe_config_->num_stages - 1) { + RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %d] Schedule End", task_info.iteration); + GELOGD("[Executor: %d] End of iteration [%ld]", id_, task_info.iteration); + context_.callback_manager->Destroy(); + RuntimeInferenceContext::DestroyContext(std::to_string(context_.context_id)); + } + next_executor_->ExecuteAsync(next_task); + GELOGD("[Executor: %d] Push item successfully.", id_); + } + + GELOGD("[Executor: %d] Process task ended.", id_); + return SUCCESS; +} + +Status StageExecutor::ExecuteAsync(const StageTask &args) { + (void)task_queue_.Push(args); + return SUCCESS; +} + +Status StageExecutor::Synchronize() { + auto ret = root_graph_executor_->Synchronize(); + RECORD_MODEL_EXECUTION_EVENT(&context_, "[Synchronize] End, ret = %u", ret); + return ret; +} + +HybridModelPipelineExecutor::HybridModelPipelineExecutor(HybridModel *model, uint32_t device_id) + : model_(model), device_id_(device_id) { + config_.num_executors = kNumExecutors; + config_.num_stages = model_->GetRootGraphItem()->NumGroups(); + config_.device_id = device_id_; +} + +Status StageExecutor::InitExecutionContext() { + GE_CHK_RT_RET(rtCtxCreate(&context_.rt_gen_context, RT_CTX_GEN_MODE, 0)); + GE_CHK_RT_RET(rtCtxSetCurrent(context_.rt_context)); + + context_.model = model_; + context_.session_id = ::ge::GetContext().SessionId(); + GELOGD("session id from model = %lu, from context = %lu", model_->GetSessionId(), context_.session_id); + context_.allocator = NpuMemoryAllocator::GetAllocator(pipe_config_->device_id); + GE_CHECK_NOTNULL(context_.allocator); + context_.callback_manager = std::unique_ptr(new (std::nothrow) CallbackManager()); + GE_CHECK_NOTNULL(context_.callback_manager); + context_.dump_properties = PropertiesManager::Instance().GetDumpProperties(context_.session_id); + if (IsLogEnable(GE_MODULE_NAME, DLOG_DEBUG)) { + context_.trace_enabled = true; + } + return SUCCESS; +} + +Status StageExecutor::SetInputs(const vector &inputs, const vector &input_desc) { + root_graph_executor_->InitForPartialExecution(inputs, input_desc); + return SUCCESS; +} + +Status StageExecutor::GetOutputs(vector &outputs, vector &output_desc) { + return root_graph_executor_->GetOutputs(outputs, output_desc); +} + +void StageExecutor::Reset() { + task_queue_.Stop(); + task_queue_.Clear(); + task_queue_.Restart(); +} + +Status HybridModelPipelineExecutor::Init() { + const char *profiling_level = std::getenv(kEnvProfilingLevel); + if (profiling_level != nullptr) { + context_.profiling_level = std::strtol(profiling_level, nullptr, kIntBase); + GELOGD("Got profiling level = %ld", context_.profiling_level); + if (context_.profiling_level > 0) { + context_.profiler.reset(new (std::nothrow) HybridProfiler()); + GE_CHECK_NOTNULL(context_.profiler); + } + } + + GELOGD("Number of stages = %d, number of executors = %d", config_.num_stages, config_.num_executors); + GE_CHK_RT_RET(rtCtxGetCurrent(&config_.rt_context)); + GE_CHK_STATUS_RET_NOLOG(InitStageExecutors()); + return SUCCESS; +} + +Status HybridModelPipelineExecutor::InitStageExecutors() { + for (int i = 0; i < config_.num_executors; ++i) { + auto 
+
+Status HybridModelPipelineExecutor::Init() {
+  const char *profiling_level = std::getenv(kEnvProfilingLevel);
+  if (profiling_level != nullptr) {
+    context_.profiling_level = std::strtol(profiling_level, nullptr, kIntBase);
+    GELOGD("Got profiling level = %ld", context_.profiling_level);
+    if (context_.profiling_level > 0) {
+      context_.profiler.reset(new (std::nothrow) HybridProfiler());
+      GE_CHECK_NOTNULL(context_.profiler);
+    }
+  }
+
+  GELOGD("Number of stages = %d, number of executors = %d", config_.num_stages, config_.num_executors);
+  GE_CHK_RT_RET(rtCtxGetCurrent(&config_.rt_context));
+  GE_CHK_STATUS_RET_NOLOG(InitStageExecutors());
+  return SUCCESS;
+}
+
+Status HybridModelPipelineExecutor::InitStageExecutors() {
+  for (int i = 0; i < config_.num_executors; ++i) {
+    auto stage_executor = std::unique_ptr<StageExecutor>(new (std::nothrow) StageExecutor(i, model_, &config_));
+    GE_CHECK_NOTNULL(stage_executor);
+    GE_CHK_STATUS_RET_NOLOG(stage_executor->Init());
+
+    if (context_.profiler != nullptr) {
+      // will call unique_ptr::release later
+      stage_executor->context_.profiler.reset(context_.profiler.get());
+      stage_executor->context_.profiling_level = context_.profiling_level;
+    }
+
+    stage_executors_.emplace_back(std::move(stage_executor));
+  }
+
+  // build propagation loop
+  for (int i = 0; i < config_.num_executors - 1; ++i) {
+    stage_executors_[i]->SetNext(stage_executors_[i + 1].get());
+  }
+  stage_executors_[config_.num_executors - 1]->SetNext(stage_executors_[0].get());
+  return SUCCESS;
+}
+
+Status HybridModelPipelineExecutor::Execute(HybridModelExecutor::ExecuteArgs &args) {
+  int loop_count = args.num_loops;
+  GE_CHECK_GE(loop_count, 2);
+
+  auto &inputs = args.inputs;
+  auto &input_desc = args.input_desc;
+
+  // Start schedulers
+  std::vector<std::future<Status>> futures;
+  for (size_t i = 0; i < stage_executors_.size(); ++i) {
+    GELOGD("Starting executor %zu", i);
+    auto executor = stage_executors_[i].get();
+    executor->Reset();
+    auto future = std::async(
+        [loop_count, executor, inputs, input_desc]() { return executor->Start(inputs, input_desc, loop_count); });
+
+    futures.emplace_back(std::move(future));
+  }
+
+  // Push initial tasks
+  GELOGD("Start to execute with loops, loop count = %d", loop_count);
+  config_.iteration_end = iteration_ + loop_count;
+  for (int i = 0; i < config_.num_stages; ++i) {
+    StageExecutor::StageTask task_info;
+    task_info.stage = i;
+    task_info.iteration = iteration_;
+    stage_executors_[0]->ExecuteAsync(task_info);
+  }
+
+  // Wait for end of iterations
+  bool has_error = false;
+  for (size_t i = 0; i < stage_executors_.size(); ++i) {
+    GELOGD("Start to sync result of executor[%zu]", i);
+    auto ret = futures[i].get();
+    if (ret != SUCCESS) {
+      GELOGE(ret, "[Executor: %zu] Failed to schedule tasks.", i);
+      has_error = true;
+      continue;
+    }
+
+    ret = stage_executors_[i]->Synchronize();
+    if (ret != SUCCESS) {
+      GELOGE(ret, "[Executor: %zu] Failed to synchronize result.", i);
+      has_error = true;
+      continue;
+    }
+  }
+
+  // record for profiling analyzer
+  RECORD_MODEL_EXECUTION_EVENT(&context_, "[Cleanup] End");
+
+  if (context_.profiler != nullptr) {
+    context_.profiler->Dump(std::cout);
+  }
+
+  iteration_ = config_.iteration_end;
+
+  if (has_error) {
+    GELOGE(FAILED, "Error occurred during execution");
+    return FAILED;
+  }
+
+  auto last_iter_executor_idx = loop_count % stage_executors_.size();
+  GE_CHK_STATUS_RET(stage_executors_[last_iter_executor_idx]->GetOutputs(args.outputs, args.output_desc),
+                    "Failed to get output from executor[%zu]", last_iter_executor_idx);
+  return SUCCESS;
+}
+
+HybridModelPipelineExecutor::~HybridModelPipelineExecutor() {
+  GELOGD("~HybridModelPipelineExecutor()");
+  for (auto &executor : stage_executors_) {
+    (void)executor->context_.profiler.release();
+  }
+}
+}  // namespace hybrid
+}  // namespace ge
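StageTask carries an optional rtEvent_t (declared in the header below, nullptr by default), and Start() above makes the stage's stream wait on it via rtStreamWaitEvent before launching work. Nothing in this file records such an event yet, so the following is a sketch of the intended cross-stream hand-off, built only from runtime calls that already appear elsewhere in this patch (rtEventCreate, rtEventRecord, rtStreamWaitEvent):

    // Producer side records an event on its stream; the consumer side makes
    // its stream wait on that event, ordering the two streams on the device
    // without a host-side synchronize.
    Status HandOffViaEvent(rtStream_t producer_stream, rtStream_t consumer_stream) {
      rtEvent_t event = nullptr;
      GE_CHK_RT_RET(rtEventCreate(&event));
      GE_CHK_RT_RET(rtEventRecord(event, producer_stream));
      GE_CHK_RT_RET(rtStreamWaitEvent(consumer_stream, event));
      return SUCCESS;
    }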
"hybrid/executor/rt_callback_manager.h" +#include "hybrid/executor/subgraph_executor.h" +#include "hybrid_model_executor.h" + +namespace ge { +namespace hybrid { + +struct PipeExecutionConfig { + uint32_t device_id; + rtContext_t rt_context; + int num_executors; + int num_stages; + long iteration_end; +}; + +class StageExecutor { + public: + struct StageTask { + rtEvent_t event = nullptr; + int stage = 0; + long iteration = 0; + }; + + StageExecutor(int id, HybridModel *model, PipeExecutionConfig *config); + + ~StageExecutor(); + + Status Init(); + + void Reset(); + + Status Start(const std::vector &inputs, const std::vector &input_desc, + int loop_count); + + Status SetInputs(const std::vector &inputs, const std::vector &input_desc); + + Status ExecuteAsync(const StageTask &args); + + Status GetOutputs(std::vector &outputs, std::vector &output_desc); + + Status Synchronize(); + + void SetNext(StageExecutor *next_executor) { next_executor_ = next_executor; } + + private: + friend class HybridModelPipelineExecutor; + static Status ResetExecutionContext(GraphExecutionContext &context); + Status InitExecutionContext(); + + int id_; + HybridModel *model_; + + PipeExecutionConfig *pipe_config_; + BlockingQueue task_queue_; + std::unique_ptr root_graph_executor_; + GraphExecutionContext context_; + StageExecutor *next_executor_; + + rtStream_t stream_ = nullptr; +}; + +class HybridModelPipelineExecutor { + public: + HybridModelPipelineExecutor(HybridModel *model, uint32_t device_id); + ~HybridModelPipelineExecutor(); + Status Init(); + Status InitStageExecutors(); + Status Execute(HybridModelExecutor::ExecuteArgs &args); + + private: + HybridModel *model_; + uint32_t device_id_; + + std::vector> stage_executors_; + PipeExecutionConfig config_; + GraphExecutionContext context_; + long iteration_ = 0; +}; +} // namespace hybrid +} // namespace ge + +#endif // GE_HYBRID_EXECUTOR_HYBRID_MODEL_PIPELINE_EXECUTOR_H_ diff --git a/ge/hybrid/executor/hybrid_profiler.cc b/ge/hybrid/executor/hybrid_profiler.cc index 336a633f..3b6865bb 100644 --- a/ge/hybrid/executor/hybrid_profiler.cc +++ b/ge/hybrid/executor/hybrid_profiler.cc @@ -24,7 +24,7 @@ namespace ge { namespace hybrid { namespace { -const int kMaxEvents = 10000; +const int kMaxEvents = 1024 * 500; const int kEventDescMax = 512; const int kMaxEventTypes = 8; const int kIndent = 8; @@ -46,11 +46,14 @@ void HybridProfiler::RecordEvent(EventType event_type, const char *fmt, ...) { } va_end(args); - std::string event = buf; auto index = counter_++; + if (index >= static_cast(events_.size())) { + GELOGE(INTERNAL_ERROR, "index out of range. 
index = %d, max event size = %zu", index, events_.size()); + return; + } auto &evt = events_[index]; evt.timestamp = std::chrono::system_clock::now(); - evt.desc = std::move(event); + evt.desc = std::string(buf); evt.event_type = event_type; } @@ -78,7 +81,7 @@ void HybridProfiler::Dump(std::ostream &output_stream) { auto cost_dump = std::chrono::duration_cast(end_dump - start_dump).count(); output_stream << std::setw(kIndent) << elapsed_dump << "\t\t" << cost_dump << "\t\t" << "[Dump profiling]" << std::endl; - events_.clear(); + Reset(); } void HybridProfiler::Reset() { diff --git a/ge/hybrid/executor/node_state.cc b/ge/hybrid/executor/node_state.cc index 00921705..0b9dc1d4 100644 --- a/ge/hybrid/executor/node_state.cc +++ b/ge/hybrid/executor/node_state.cc @@ -34,6 +34,14 @@ ShapeInferenceState::ShapeInferenceState(const NodeItem &node_item) : node_item( GELOGD("[%s] ShapeInferenceState created, pending shape count = %d", node_item.NodeName().c_str(), this->num_pending_shapes_); + + for (int i = 0; i < node_item.num_inputs; ++i){ + input_tensor_desc.emplace_back(std::move(*node_item.MutableInputDesc(i))); + } + + for (int i = 0; i < node_item.num_outputs; ++i){ + output_tensor_desc.emplace_back(std::move(*node_item.MutableOutputDesc(i))); + } } Status ShapeInferenceState::UpdateInputShape(int idx, const GeTensorDesc &target) { @@ -56,11 +64,10 @@ Status ShapeInferenceState::UpdateInputShape(int idx, const GeTensorDesc &target tensor_size); std::lock_guard lk(mu_); - auto tensor_desc = node_item.MutableInputDesc(idx); - GE_CHECK_NOTNULL(tensor_desc); - tensor_desc->SetShape(target.GetShape()); - tensor_desc->SetOriginShape(target.GetOriginShape()); - (void) TensorUtils::SetSize(*tensor_desc, tensor_size); + auto &input_desc = input_tensor_desc[idx]; + input_desc.SetShape(target.GetShape()); + input_desc.SetOriginShape(target.GetOriginShape()); + (void) TensorUtils::SetSize(input_desc, tensor_size); if (--num_pending_shapes_ <= 0) { ready_cv_.notify_all(); } @@ -115,12 +122,27 @@ Status ShapeInferenceState::AwaitShapesReady(const GraphExecutionContext &contex } } + for (size_t i = 0; i < input_tensor_desc.size(); ++i) { + auto dst_tensor_desc = node_item.op_desc->MutableInputDesc(i); + if (dst_tensor_desc == nullptr) { + continue; + } + + auto &tensor_desc = input_tensor_desc[i]; + int64_t tensor_size = -1; + (void) TensorUtils::GetSize(tensor_desc, tensor_size); + + dst_tensor_desc->SetShape(tensor_desc.MutableShape()); + dst_tensor_desc->SetOriginShape(tensor_desc.GetOriginShape()); + (void) TensorUtils::SetSize(*dst_tensor_desc, tensor_size); + } + for (auto &p : shape_futures) { auto idx = p.first; auto &future = p.second; RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] Start", idx); - GeTensorDescPtr src_tensor_desc; - GE_CHK_STATUS_RET_NOLOG(future.GetTensorDesc(src_tensor_desc)); + const GeTensorDesc* src_tensor_desc = nullptr; + GE_CHK_STATUS_RET_NOLOG(future.GetTensorDesc(&src_tensor_desc)); GE_CHECK_NOTNULL(src_tensor_desc); RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] End", idx); @@ -142,10 +164,28 @@ Status ShapeInferenceState::AwaitShapesReady(const GraphExecutionContext &contex return SUCCESS; } -ShapeFuture::ShapeFuture(NodePtr src_node, +const vector &ShapeInferenceState::GetOutputTensorDesc() const { + return output_tensor_desc; +} + +Status ShapeInferenceState::UpdateOutputDesc() { + for (size_t i = 0; i < output_tensor_desc.size(); ++i) { + auto src_tensor_desc = 
node_item.MutableOutputDesc(i); + GE_CHECK_NOTNULL(src_tensor_desc); + auto &dst_tensor_desc = output_tensor_desc[i]; + dst_tensor_desc.SetShape(src_tensor_desc->MutableShape()); + dst_tensor_desc.SetOriginShape(src_tensor_desc->GetOriginShape()); + int64_t tensor_size = -1; + (void) TensorUtils::GetSize(*src_tensor_desc, tensor_size); + (void) TensorUtils::SetSize(dst_tensor_desc, tensor_size); + } + return SUCCESS; +} + +ShapeFuture::ShapeFuture(NodeState *src_node, uint32_t src_index, SubgraphContext *subgraph_context) - : src_node_(std::move(src_node)), src_index_(src_index), subgraph_context_(subgraph_context) { + : src_node_(src_node), src_index_(src_index), subgraph_context_(subgraph_context) { } NodeState::NodeState(const NodeItem &node_item, SubgraphContext *subgraph_context) @@ -187,6 +227,13 @@ Status NodeState::WaitForPrepareDone() { return SUCCESS; } +Status NodeState::UpdateOutputShapes(int index, const GeShape &shape, const GeShape &ori_shape) { + auto self_tensor_desc = op_desc_->MutableOutputDesc(index); + GE_CHECK_NOTNULL(self_tensor_desc); + self_tensor_desc->SetShape(shape); + self_tensor_desc->SetOriginShape(ori_shape); + return SUCCESS; +} void NodeState::SetTaskContext(std::shared_ptr &task_context) { task_context_ = task_context; @@ -198,17 +245,19 @@ std::shared_ptr NodeState::GetTaskContext() { Status ShapeFuture::Get(GeShape &ori_shape, GeShape &shape) { GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str()); - HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_), "cancelled"); - shape = src_node_->GetOpDesc()->MutableOutputDesc(src_index_)->MutableShape(); - ori_shape = src_node_->GetOpDesc()->MutableOutputDesc(src_index_)->GetOriginShape(); + HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_->GetNodeItem()->node), "cancelled"); + auto &output_desc = src_node_->GetShapeInferenceState().GetOutputTensorDesc().at(src_index_); + shape = output_desc.GetShape(); + ori_shape = output_desc.GetOriginShape(); GELOGD("Get shape from %s:%u. 
shape = [%s]", src_node_->GetName().c_str(), src_index_, shape.ToString().c_str()); return SUCCESS; } -Status ShapeFuture::GetTensorDesc(GeTensorDescPtr &tensor_desc) { +Status ShapeFuture::GetTensorDesc(const GeTensorDesc **tensor_desc) { + GE_CHECK_NOTNULL(tensor_desc); GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str()); - HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_), "cancelled"); - tensor_desc = src_node_->GetOpDesc()->MutableOutputDesc(src_index_); + HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_->GetNodeItem()->node), "cancelled"); + *tensor_desc = &src_node_->GetShapeInferenceState().GetOutputTensorDesc().at(src_index_); return SUCCESS; } } // namespace hybrid diff --git a/ge/hybrid/executor/node_state.h b/ge/hybrid/executor/node_state.h index c68a19ac..ef348a69 100644 --- a/ge/hybrid/executor/node_state.h +++ b/ge/hybrid/executor/node_state.h @@ -30,16 +30,17 @@ class NodeTask; struct GraphExecutionContext; class SubgraphContext; class TaskContext; +class NodeState; class ShapeFuture { public: - ShapeFuture(NodePtr src_node, uint32_t src_index, SubgraphContext *subgraph_context); + ShapeFuture(NodeState *src_node, uint32_t src_index, SubgraphContext *subgraph_context); ~ShapeFuture() = default; Status Get(GeShape &ori_shape, GeShape &shape); - Status GetTensorDesc(GeTensorDescPtr &tensor_desc); + Status GetTensorDesc(const GeTensorDesc **tensor_desc); private: - NodePtr src_node_; + NodeState *src_node_; uint32_t src_index_; SubgraphContext *subgraph_context_; }; @@ -53,10 +54,19 @@ struct ShapeInferenceState { Status AwaitShapesReady(const GraphExecutionContext &context); + Status UpdateOutputDesc(); + + const vector &GetOutputTensorDesc() const; + const NodeItem &node_item; private: + friend struct NodeState; std::vector> shape_futures; + // do not directly update op_desc, in case race condition across pipelines + std::vector input_tensor_desc; + std::vector output_tensor_desc; + int num_pending_shapes_ = 0; std::condition_variable ready_cv_; std::mutex mu_; @@ -88,6 +98,8 @@ struct NodeState { return shape_inference_state_; } + Status UpdateOutputShapes(int index, const GeShape &shape, const GeShape &ori_shape); + const shared_ptr &GetKernelTask() const { return kernel_task_; } diff --git a/ge/hybrid/executor/rt_callback_manager.cc b/ge/hybrid/executor/rt_callback_manager.cc index 0523a6a5..d3989f31 100644 --- a/ge/hybrid/executor/rt_callback_manager.cc +++ b/ge/hybrid/executor/rt_callback_manager.cc @@ -21,14 +21,11 @@ namespace ge { namespace hybrid { -CallbackManager::CallbackManager(rtStream_t stream) : stream_(stream) { -} - -Status CallbackManager::RegisterCallback(rtCallback_t callback, void *user_data) { +Status CallbackManager::RegisterCallback(rtStream_t stream, rtCallback_t callback, void *user_data) { GELOGD("To register callback"); rtEvent_t event = nullptr; GE_CHK_RT_RET(rtEventCreate(&event)); - auto rt_ret = rtEventRecord(event, stream_); + auto rt_ret = rtEventRecord(event, stream); if (rt_ret != RT_ERROR_NONE) { GELOGE(RT_FAILED, "Failed to invoke rtEventRecord, error code = %d", rt_ret); (void) rtEventDestroy(event); @@ -112,11 +109,11 @@ void CallbackManager::RtCallbackFunc(void *data) { delete callback_func; } -Status CallbackManager::RegisterCallback(const std::function &callback) { +Status CallbackManager::RegisterCallback(rtStream_t stream, const std::function &callback) { auto func = std::unique_ptr>(new(std::nothrow) std::function(callback)); GE_CHECK_NOTNULL(func); GELOGD("Callback registered"); - 
return RegisterCallback(RtCallbackFunc, func.release()); + return RegisterCallback(stream, RtCallbackFunc, func.release()); } } // namespace hybrid } // namespace ge diff --git a/ge/hybrid/executor/rt_callback_manager.h b/ge/hybrid/executor/rt_callback_manager.h index 1d1fa1cc..9c062134 100644 --- a/ge/hybrid/executor/rt_callback_manager.h +++ b/ge/hybrid/executor/rt_callback_manager.h @@ -30,23 +30,21 @@ namespace ge { namespace hybrid { class CallbackManager { public: - explicit CallbackManager(rtStream_t stream); - + CallbackManager() = default; ~CallbackManager() = default; Status Init(); Status Destroy(); - Status RegisterCallback(rtCallback_t callback, void *user_data); - Status RegisterCallback(const std::function &callback); + Status RegisterCallback(rtStream_t stream, rtCallback_t callback, void *user_data); + Status RegisterCallback(rtStream_t stream, const std::function &callback); private: Status CallbackProcess(rtContext_t context); static void RtCallbackFunc(void *data); BlockingQueue>> callback_queue_; - rtStream_t stream_; std::future ret_future_; }; } // namespace hybrid diff --git a/ge/hybrid/executor/subgraph_executor.cc b/ge/hybrid/executor/subgraph_executor.cc index 8b194233..8fbc6741 100644 --- a/ge/hybrid/executor/subgraph_executor.cc +++ b/ge/hybrid/executor/subgraph_executor.cc @@ -24,6 +24,7 @@ namespace ge { namespace hybrid { namespace { constexpr int kDefaultThreadNum = 4; +constexpr int kDefaultQueueSize = 16; constexpr int kDataInputIndex = 0; } @@ -31,7 +32,8 @@ SubgraphExecutor::SubgraphExecutor(const GraphItem *graph_item, GraphExecutionCo : graph_item_(graph_item), context_(context), force_infer_shape_(force_infer_shape), - pre_run_pool_(kDefaultThreadNum) { + pre_run_pool_(kDefaultThreadNum), + ready_queue_(kDefaultQueueSize) { } SubgraphExecutor::~SubgraphExecutor() { @@ -169,7 +171,7 @@ Status SubgraphExecutor::ExecuteAsyncForKnownShape(const std::vectorSetKernelTask(node_item->kernel_task); - known_shape_task_context_ = TaskContext::Create(*node_item, context_, subgraph_context_.get()); + known_shape_task_context_ = TaskContext::Create(node_state.get(), context_, subgraph_context_.get()); GE_CHECK_NOTNULL(known_shape_task_context_); HYBRID_CHK_STATUS_RET(ExecutionEngine::ExecuteAsync(*node_state, known_shape_task_context_, *context_), @@ -201,11 +203,11 @@ Status SubgraphExecutor::ExecuteAsync(TaskContext &task_context) { return SUCCESS; } -Status SubgraphExecutor::PrepareNodes() { - GELOGD("[%s] Start to prepare nodes. force infer shape = %s.", +Status SubgraphExecutor::PrepareNodes(int group) { + GELOGD("[%s] Start to prepare nodes. group = %d", graph_item_->GetName().c_str(), - force_infer_shape_ ? 
"true" : "false"); - auto &all_nodes = graph_item_->GetAllNodes(); + group); + auto &all_nodes = graph_item_->GetAllNodes(group); for (auto all_node : all_nodes) { auto &node_item = *all_node; // for while op @@ -240,7 +242,8 @@ Status SubgraphExecutor::PrepareNodes() { } else { node_state->SetKernelTask(node_item.kernel_task); } - auto unique_task_context = TaskContext::Create(*node_state->GetNodeItem(), context_, subgraph_context_.get()); + auto unique_task_context = + TaskContext::Create(node_state.get(), context_, subgraph_context_.get()); GE_CHECK_NOTNULL(unique_task_context); const auto &task = node_state->GetKernelTask(); if (task == nullptr) { @@ -265,15 +268,17 @@ Status SubgraphExecutor::PrepareNodes() { GELOGD("[%s] Push node [%s] to queue.", graph_item_->GetName().c_str(), node_item.NodeName().c_str()); } + GELOGD("[%s] Done preparing nodes successfully.", graph_item_->GetName().c_str()); return SUCCESS; } -Status SubgraphExecutor::InferShape(ShapeInferenceEngine *shape_inference_engine, NodeState &node_state) { - const auto &node_item = *node_state.GetNodeItem(); +Status SubgraphExecutor::InferShape(ShapeInferenceEngine *shape_inference_engine, NodeState &node_state) const { + GetContext().SetSessionId(context_->context_id); HYBRID_CHK_STATUS_RET(shape_inference_engine->InferShape(node_state), - "[%s] Failed to InferShape.", node_state.GetName().c_str()); - HYBRID_CHK_STATUS_RET(shape_inference_engine->PropagateOutputShapes(node_item), - "[%s] Failed to PropagateOutputShapes.", node_state.GetName().c_str()); + "[%s] Failed to InferShape.", node_state.GetName().c_str()); + GetContext().SetSessionId(context_->session_id); + HYBRID_CHK_STATUS_RET(shape_inference_engine->PropagateOutputShapes(node_state), + "[%s] Failed to PropagateOutputShapes.", node_state.GetName().c_str()); return SUCCESS; } @@ -285,7 +290,7 @@ Status SubgraphExecutor::PrepareForExecution(GraphExecutionContext *ctx, NodeSta } else { node_state.SetKernelTask(node_item.kernel_task); } - auto unique_task_context = TaskContext::Create(*node_state.GetNodeItem(), context_, subgraph_context_.get()); + auto unique_task_context = TaskContext::Create(&node_state, context_, subgraph_context_.get()); GE_CHECK_NOTNULL(unique_task_context); const auto &task = node_state.GetKernelTask(); if (task == nullptr) { @@ -336,11 +341,11 @@ Status SubgraphExecutor::LaunchTasks() { } } -Status SubgraphExecutor::ScheduleTasks() { +Status SubgraphExecutor::ScheduleTasks(int group) { GELOGD("[%s] Start to schedule prepare workers.", graph_item_->GetName().c_str()); auto prepare_future = std::async(std::launch::async, [&]() -> Status { GetContext().SetSessionId(context_->session_id); - auto ret = PrepareNodes(); + auto ret = PrepareNodes(group); ready_queue_.Push(nullptr); return ret; }); @@ -481,5 +486,14 @@ Status SubgraphExecutor::EnableOutputZeroCopy(const vector &outputs GELOGD("Done enabling zero copy for outputs successfully."); return SUCCESS; } + +Status SubgraphExecutor::PartialExecuteAsync(int task_group) { + return ScheduleTasks(task_group); +} + +Status SubgraphExecutor::InitForPartialExecution(const vector &inputs, + const vector &input_desc) { + return Init(inputs, input_desc); +} } // namespace hybrid } // namespace ge diff --git a/ge/hybrid/executor/subgraph_executor.h b/ge/hybrid/executor/subgraph_executor.h index 2b7e9371..b9bfceb4 100644 --- a/ge/hybrid/executor/subgraph_executor.h +++ b/ge/hybrid/executor/subgraph_executor.h @@ -36,6 +36,11 @@ class SubgraphExecutor { SubgraphExecutor(const GraphItem *graph_item, 
GraphExecutionContext *context, bool force_infer_shape = false); ~SubgraphExecutor(); + Status InitForPartialExecution(const std::vector &inputs, + const std::vector &input_desc); + + Status PartialExecuteAsync(int task_group); + /** * Execute subgraph async, output tensor address(not data) and output tensor descriptions are * valid after this method returned @@ -89,15 +94,15 @@ class SubgraphExecutor { private: Status PrepareForExecution(GraphExecutionContext *ctx, NodeState &node_state); Status EnableOutputZeroCopy(const std::vector &outputs); - static Status InferShape(ShapeInferenceEngine *shape_inference_engine, NodeState &node_state); + Status InferShape(ShapeInferenceEngine *shape_inference_engine, NodeState &node_state) const; Status Init(const std::vector &inputs, const std::vector &input_desc); Status InitInputsForUnknownShape(const std::vector &inputs, const std::vector &input_desc); Status InitInputsForKnownShape(const std::vector &inputs); Status ExecuteAsyncForKnownShape(const std::vector &inputs); - Status ScheduleTasks(); - Status PrepareNodes(); + Status ScheduleTasks(int group = -1); + Status PrepareNodes(int group = -1); Status LaunchTasks(); Status SetOutputsToParentNode(TaskContext &task_context); diff --git a/ge/hybrid/executor/worker/execution_engine.cc b/ge/hybrid/executor/worker/execution_engine.cc index a6386b27..7f4fa78c 100755 --- a/ge/hybrid/executor/worker/execution_engine.cc +++ b/ge/hybrid/executor/worker/execution_engine.cc @@ -125,16 +125,16 @@ Status NodeDoneCallback::PrepareConstInputs(const NodeItem &node_item) { RT_MEMCPY_DEVICE_TO_HOST)); } tensor.SetData(std::move(host_buffer)); - string session_id = std::to_string(context_->GetSessionId()); + string context_id = std::to_string(graph_context_->context_id); RuntimeInferenceContext *runtime_infer_ctx = nullptr; - GE_CHK_GRAPH_STATUS_RET(RuntimeInferenceContext::GetContext(session_id, &runtime_infer_ctx), - "Failed to get RuntimeInferenceContext, session_id = %s", session_id.c_str()); + GE_CHK_GRAPH_STATUS_RET(RuntimeInferenceContext::GetContext(context_id, &runtime_infer_ctx), + "Failed to get RuntimeInferenceContext, context_id = %s", context_id.c_str()); GE_CHK_STATUS_RET(runtime_infer_ctx->SetTensor(node_item.node_id, output_idx, std::move(tensor)), "Failed to SetTensor, node = %s, output_index = %d", node_item.NodeName().c_str(), output_idx); - GELOGD("[%s] Output[%d] cached successfully in session: %s. node_id = %d, shape = [%s]", + GELOGD("[%s] Output[%d] cached successfully in context: %s. 
node_id = %d, shape = [%s]", node_item.NodeName().c_str(), output_idx, - session_id.c_str(), + context_id.c_str(), node_item.node_id, ge_tensor_desc->GetShape().ToString().c_str()); @@ -332,6 +332,7 @@ Status NodeDoneCallback::OnNodeDone() { if (node_item.shape_inference_type == DEPEND_SHAPE_RANGE || node_item.shape_inference_type == DEPEND_COMPUTE) { // update output tensor sizes GE_CHK_STATUS_RET_NOLOG(ShapeInferenceEngine::CalcOutputTensorSizes(node_item)); + GE_CHK_STATUS_RET_NOLOG(context_->GetNodeState()->GetShapeInferenceState().UpdateOutputDesc()); } // PropagateOutputs for type == DEPEND_COMPUTE if (node_item.shape_inference_type == DEPEND_COMPUTE) { @@ -363,7 +364,7 @@ Status ExecutionEngine::ExecuteAsync(NodeState &node_state, RECORD_EXECUTION_EVENT(&execution_context, task_context->GetNodeName(), "Start"); auto cb = std::shared_ptr(new(std::nothrow) NodeDoneCallback(&execution_context, task_context)); GE_CHECK_NOTNULL(cb); - auto callback = [&, cb]() { + auto callback = [task_context, cb]() { auto ret = cb->OnNodeDone(); if (ret != SUCCESS) { task_context->OnError(ret); diff --git a/ge/hybrid/executor/worker/shape_inference_engine.cc b/ge/hybrid/executor/worker/shape_inference_engine.cc index 46ee6bd6..bb6281e1 100755 --- a/ge/hybrid/executor/worker/shape_inference_engine.cc +++ b/ge/hybrid/executor/worker/shape_inference_engine.cc @@ -109,7 +109,8 @@ Status ShapeInferenceEngine::AwaitDependentNodes(NodeState &node_state) { return SUCCESS; } -Status ShapeInferenceEngine::PropagateOutputShapes(const NodeItem &node_item) { +Status ShapeInferenceEngine::PropagateOutputShapes(NodeState &node_state) { + auto &node_item = *node_state.GetNodeItem(); if (node_item.is_output_shape_static) { return SUCCESS; } @@ -140,9 +141,8 @@ Status ShapeInferenceEngine::PropagateOutputShapes(const NodeItem &node_item) { // in case type 3 and 4, shape will be valid after computing is done auto &infer_state = dst_node_state->GetShapeInferenceState(); if (shape_is_future) { - ShapeFuture future(node_item.node, i, subgraph_context_); - infer_state.UpdateInputShapeFuture(dst_input_index_and_node.first, - std::move(future)); + ShapeFuture future(&node_state, i, subgraph_context_); + infer_state.UpdateInputShapeFuture(dst_input_index_and_node.first, std::move(future)); } else { GE_CHK_STATUS_RET_NOLOG(infer_state.UpdateInputShape(dst_input_index_and_node.first, *output_desc)); } diff --git a/ge/hybrid/executor/worker/shape_inference_engine.h b/ge/hybrid/executor/worker/shape_inference_engine.h index b946577f..39311c9e 100644 --- a/ge/hybrid/executor/worker/shape_inference_engine.h +++ b/ge/hybrid/executor/worker/shape_inference_engine.h @@ -32,7 +32,7 @@ class ShapeInferenceEngine { Status InferShapeForSubgraph(const NodeItem &node_item, const FusedSubgraph &fused_subgraph); - Status PropagateOutputShapes(const NodeItem &node_item); + Status PropagateOutputShapes(NodeState &node_state); static Status CalcOutputTensorSizes(const NodeItem &node_item, bool fallback_with_range = false); diff --git a/ge/hybrid/model/graph_item.cc b/ge/hybrid/model/graph_item.cc index 4e3faf70..91f675a4 100644 --- a/ge/hybrid/model/graph_item.cc +++ b/ge/hybrid/model/graph_item.cc @@ -30,6 +30,19 @@ const vector &hybrid::GraphItem::GetAllNodes() const { return node_items_; } +const vector &GraphItem::GetAllNodes(int group) const { + if (group == -1) { + return GetAllNodes(); + } + + if (group >= static_cast(grouped_node_items_.size())) { + static vector empty_nodes; + return empty_nodes; + } + + return grouped_node_items_[group]; 
+} + const vector &GraphItem::GetInputNodes() const { return input_nodes_; } @@ -74,5 +87,28 @@ const NodeItem *GraphItem::GetOutputNode() const { const vector> &GraphItem::GetOutputEdges() const { return output_edges_; } + +Status GraphItem::GroupNodes() { + int last_group = INT32_MIN; + std::set seen_groups; + for (auto node : node_items_) { + int group = node->group; + if (group != last_group) { + if (seen_groups.find(group) != seen_groups.end()) { + GELOGE(INTERNAL_ERROR, "Unordered node group found. node = %s, group = %d", node->NodeName().c_str(), group); + return INTERNAL_ERROR; + } else { + last_group = group; + seen_groups.insert(group); + grouped_node_items_.emplace_back(std::vector()); + } + } + + GELOGD("Adding node [%s] to group %d", node->NodeName().c_str(), group); + grouped_node_items_.back().emplace_back(node); + } + + return SUCCESS; +} } // namespace hybrid } // namespace ge diff --git a/ge/hybrid/model/graph_item.h b/ge/hybrid/model/graph_item.h index 6fab9b44..f034f385 100644 --- a/ge/hybrid/model/graph_item.h +++ b/ge/hybrid/model/graph_item.h @@ -26,7 +26,9 @@ class GraphItem { public: GraphItem() = default; ~GraphItem(); + Status GroupNodes(); const vector &GetAllNodes() const; + const vector &GetAllNodes(int group) const; const vector &GetInputNodes() const; Status GetOutputDescList(std::vector &output_desc_list) const; const vector> &GetOutputEdges() const; @@ -46,6 +48,10 @@ class GraphItem { name_ = name; } + size_t NumGroups() const { + return grouped_node_items_.size(); + } + const NodeItem *GetOutputNode() const; bool IsDynamic() const; @@ -56,6 +62,7 @@ class GraphItem { friend class HybridModelBuilder; std::string name_; std::vector node_items_; + std::vector> grouped_node_items_; std::vector input_nodes_; const NodeItem *output_node_ = nullptr; // diff --git a/ge/hybrid/model/hybrid_model.cc b/ge/hybrid/model/hybrid_model.cc index 4511c2b9..77c9be2b 100644 --- a/ge/hybrid/model/hybrid_model.cc +++ b/ge/hybrid/model/hybrid_model.cc @@ -52,7 +52,7 @@ Status HybridModel::Init(bool is_single_op) { return SUCCESS; } -TensorValue* HybridModel::GetVariable(const string &name) const { +TensorValue *HybridModel::GetVariable(const string &name) const { auto it = variable_tensors_.find(name); if (it == variable_tensors_.end()) { GELOGD("Failed to get variable tensor. 
var name = [%s]", name.c_str()); @@ -113,7 +113,7 @@ GeModelPtr HybridModel::GetGeModel(const NodePtr &node) const { return it->second; } -const GraphItem* HybridModel::GetRootGraphItem() const { +const GraphItem *HybridModel::GetRootGraphItem() const { return root_graph_item_.get(); } diff --git a/ge/hybrid/model/hybrid_model_builder.cc b/ge/hybrid/model/hybrid_model_builder.cc index 03e76bc9..cda2fc42 100755 --- a/ge/hybrid/model/hybrid_model_builder.cc +++ b/ge/hybrid/model/hybrid_model_builder.cc @@ -287,6 +287,16 @@ Status HybridModelBuilder::ParseDependentInputNodes(NodeItem &node_item, const s src_node_item->NodeName().c_str()); src_node_item->has_observer = true; node_item.dependents_for_execution.emplace_back(src_node); + node_item.has_observer = true; + for (auto &dst_node : ge_node->GetOutNodes()) { + if (dst_node == nullptr) { + continue; + } + + NodeItem *dst_node_item = nullptr; + GE_CHK_STATUS_RET_NOLOG(GetOrCreateNodeItem(dst_node, &dst_node_item)); + dst_node_item->dependents_for_execution.emplace_back(ge_node); + } } else if (src_node_item->shape_inference_type == DEPEND_COMPUTE) { GELOGD("[%s] Add input data dependent node [%s] due to inference type = DEPEND_COMPUTE", node_item.NodeName().c_str(), @@ -614,6 +624,15 @@ Status HybridModelBuilder::UnfoldSubgraphs(ComputeGraph &root_graph, ComputeGrap continue; } + if (op_desc->HasAttr(ATTR_STAGE_LEVEL)) { + uint32_t stage_level = UINT32_MAX; + if (AttrUtils::GetInt(node->GetOpDesc(), ATTR_STAGE_LEVEL, stage_level)) { + for (const auto &stage_node : subgraph->GetAllNodes()) { + GELOGD("Set ATTR_STAGE_LEVEL on node %s, stage_level=%u", stage_node->GetName().c_str(), stage_level); + (void)AttrUtils::SetInt(stage_node->GetOpDesc(), ATTR_STAGE_LEVEL, stage_level); + } + } + } GE_CHK_GRAPH_STATUS_RET(UnfoldSubgraph(root_graph, *merged_graph, *subgraph), "[%s] Failed to merge subgraph.", subgraph->GetName().c_str()); @@ -621,6 +640,14 @@ Status HybridModelBuilder::UnfoldSubgraphs(ComputeGraph &root_graph, ComputeGrap // invoke before adding subgraphs. in case modify node id in known-shaped subgraphs. 
GE_CHK_GRAPH_STATUS_RET(merged_graph->TopologicalSorting(), "Failed to invoke TopologicalSorting on merged graph."); + GE_DUMP(merged_graph, "hybrid_merged_graph_BeforeStageSort"); + merged_graph->TopologicalSorting([](const NodePtr &a, const NodePtr &b) -> bool { + uint32_t a_level = UINT32_MAX; + (void)AttrUtils::GetInt(a->GetOpDesc(), ATTR_STAGE_LEVEL, a_level); + uint32_t b_level = UINT32_MAX; + (void)AttrUtils::GetInt(b->GetOpDesc(), ATTR_STAGE_LEVEL, b_level); + return a_level < b_level; + }); for (auto &remained_subgraph : root_graph.GetAllSubgraphs()) { GELOGD("Adding subgraph [%s] to merged-graph.", remained_subgraph->GetName().c_str()); @@ -675,41 +702,17 @@ Status HybridModelBuilder::UnfoldSubgraph(ComputeGraph &root_graph, } Status HybridModelBuilder::BuildOutputMapping(GraphItem &graph_item, - const NodeItem &node_item, - bool is_root_graph) { - auto output_size = node_item.num_inputs; - graph_item.output_edges_.resize(output_size); - - for (auto &in_data_anchor : node_item.node->GetAllInDataAnchors()) { - auto peer_out_anchor = in_data_anchor->GetPeerOutAnchor(); - GE_CHECK_NOTNULL(peer_out_anchor); - auto src_node = peer_out_anchor->GetOwnerNode(); - GE_CHECK_NOTNULL(src_node); - - auto src_node_item = GetNodeItem(src_node); - GE_CHECK_NOTNULL(src_node_item); - auto output_idx = in_data_anchor->GetIdx(); - auto output_offset = src_node_item->output_start + peer_out_anchor->GetIdx(); - GELOGI("Output[%d], node = %s, output_index = %d, output_offset = %d ", - output_idx, - src_node_item->NodeName().c_str(), - peer_out_anchor->GetIdx(), - output_offset); - - GE_CHECK_LE(output_idx, output_size - 1); - graph_item.output_edges_[output_idx] = {src_node_item, peer_out_anchor->GetIdx()}; - } - - if (!is_root_graph) { - for (uint32_t i = 0; i < static_cast(output_size); ++i) { - uint32_t p_index = i; - // Net output of Subgraph of while do not have parent index - if (AttrUtils::GetInt(node_item.op_desc->GetInputDesc(i), ATTR_NAME_PARENT_NODE_INDEX, p_index)) { - GELOGD("[%s] Parent index not set for input[%u].", node_item.NodeName().c_str(), i); - } - - graph_item.output_index_mapping_.emplace_back(p_index); + const NodeItem &node_item) { + auto output_size = node_item.op_desc->GetAllInputsSize(); + GE_CHECK_LE(output_size, UINT32_MAX); + for (uint32_t i = 0; i < static_cast(output_size); ++i) { + uint32_t p_index = i; + // Net output of Subgraph of while do not have parent index + if (AttrUtils::GetInt(node_item.op_desc->GetInputDesc(i), ATTR_NAME_PARENT_NODE_INDEX, p_index)) { + GELOGD("[%s] Parent index not set for input[%u].", node_item.NodeName().c_str(), i); } + + graph_item.output_index_mapping_.emplace_back(p_index); } return SUCCESS; @@ -732,6 +735,7 @@ Status HybridModelBuilder::LoadGraph() { GE_CHK_STATUS_RET(LoadDynamicSubgraph(*root_graph, true), "Failed to load root graph."); GELOGD("Done loading root graph successfully."); + GE_CHK_STATUS_RET(hybrid_model_.root_graph_item_->GroupNodes(), "Failed to group nodes for root graph"); for (auto &sub_graph : root_graph->GetAllSubgraphs()) { GE_CHECK_NOTNULL(sub_graph); @@ -805,6 +809,7 @@ Status HybridModelBuilder::VarNodeToTensor(const NodePtr &var_node, std::unique_ // var size is only for checking, will not allocate any memory by it tensor.reset(new(std::nothrow)TensorValue(dev_mem, static_cast(var_size))); GE_CHECK_NOTNULL(tensor); + GELOGI("Get var memory addr %p for node %s, size = %lld, mem_type=%u", dev_mem, var_name.c_str(), var_size, mem_type); return SUCCESS; } @@ -1737,8 +1742,14 @@ Status 
HybridModelBuilder::CreateProfilingNodeBefore(GraphItem &graph_item, cons for (const auto &task_def : task_def_lists) { hybrid_model_.task_defs_[profiling_node].emplace_back(task_def); } + if (op_desc->HasAttr(ATTR_STAGE_LEVEL)) { + uint32_t stage_level = UINT32_MAX; + (void)ge::AttrUtils::GetInt(op_desc, ATTR_STAGE_LEVEL, stage_level); + (void)ge::AttrUtils::SetInt(node_ptr->GetOpDesc(), ATTR_STAGE_LEVEL, stage_level); + } NodeItem *node_item = nullptr; GE_CHK_STATUS_RET_NOLOG(GetOrCreateNodeItem(profiling_node, &node_item)); + GE_CHECK_NOTNULL(node_item); node_item->input_start = 0; node_item->output_start = 0; graph_item.node_items_.emplace_back(node_item); @@ -1812,8 +1823,14 @@ Status HybridModelBuilder::CreateProfilingNodeAfter(GraphItem &graph_item, const for (const auto &task_def : task_def_lists) { hybrid_model_.task_defs_[profiling_node].emplace_back(task_def); } + if (op_desc->HasAttr(ATTR_STAGE_LEVEL)) { + uint32_t stage_level = UINT32_MAX; + (void)ge::AttrUtils::GetInt(op_desc, ATTR_STAGE_LEVEL, stage_level); + (void)ge::AttrUtils::SetInt(profiling_node->GetOpDesc(), ATTR_STAGE_LEVEL, stage_level); + } NodeItem *node_item = nullptr; GE_CHK_STATUS_RET_NOLOG(GetOrCreateNodeItem(profiling_node, &node_item)); + GE_CHECK_NOTNULL(node_item); node_item->input_start = 0; node_item->output_start = 0; graph_item.node_items_.emplace_back(node_item); @@ -1859,7 +1876,9 @@ Status HybridModelBuilder::LoadDynamicSubgraph(ComputeGraph &graph, bool is_root data_nodes.emplace_back(node_item); } else if (op_type == NETOUTPUT) { graph_item->output_node_ = node_item; - GE_CHK_STATUS_RET_NOLOG(BuildOutputMapping(*graph_item, *node_item, is_root_graph)); + if (!is_root_graph) { + GE_CHK_STATUS_RET_NOLOG(BuildOutputMapping(*graph_item, *node_item)); + } } GE_CHK_STATUS_RET_NOLOG(CreateProfilingNodeBefore(*graph_item, node)); graph_item->node_items_.emplace_back(node_item); diff --git a/ge/hybrid/model/hybrid_model_builder.h b/ge/hybrid/model/hybrid_model_builder.h index 71663a6e..107ace78 100644 --- a/ge/hybrid/model/hybrid_model_builder.h +++ b/ge/hybrid/model/hybrid_model_builder.h @@ -53,7 +53,7 @@ class HybridModelBuilder { std::vector &data_nodes, bool is_root_graph); static Status ResolveRefIo(NodeItem &node_item); - Status BuildOutputMapping(GraphItem &partitioned_call, const NodeItem &node_item, bool is_root_graph); + Status BuildOutputMapping(GraphItem &partitioned_call, const NodeItem &node_item); Status ValidateParams(); Status LoadGraph(); Status LoadGeModel(ComputeGraph &graph, const GeModelPtr &ge_model); diff --git a/ge/hybrid/model/node_item.cc b/ge/hybrid/model/node_item.cc index acc250ef..be1b6ffe 100644 --- a/ge/hybrid/model/node_item.cc +++ b/ge/hybrid/model/node_item.cc @@ -21,8 +21,8 @@ #include "graph/compute_graph.h" #include "graph/debug/ge_attr_define.h" #include "graph/utils/node_utils.h" -#include "hybrid/node_executor/node_executor.h" #include "hybrid/executor/worker/shape_inference_engine.h" +#include "hybrid/node_executor/node_executor.h" namespace ge { namespace hybrid { @@ -146,6 +146,20 @@ Status NodeItem::InitInputsAndOutputs() { GE_CHECK_LE(op_desc->GetOutputsSize(), INT32_MAX); num_inputs = static_cast(op_desc->GetInputsSize()); num_outputs = static_cast(op_desc->GetOutputsSize()); + if (AttrUtils::GetInt(op_desc, ::ge::ATTR_STAGE_LEVEL, group)) { + GELOGD("[%s] Got stage level from op_desc = %d", op_desc->GetName().c_str(), group); + } else { + if (AttrUtils::GetInt(node->GetOwnerComputeGraph(), ::ge::ATTR_STAGE_LEVEL, group)) { + GELOGD("[%s] Got stage level 
@@ -146,6 +146,20 @@ Status NodeItem::InitInputsAndOutputs() {
   GE_CHECK_LE(op_desc->GetOutputsSize(), INT32_MAX);
   num_inputs = static_cast<int>(op_desc->GetInputsSize());
   num_outputs = static_cast<int>(op_desc->GetOutputsSize());
+  if (AttrUtils::GetInt(op_desc, ::ge::ATTR_STAGE_LEVEL, group)) {
+    GELOGD("[%s] Got stage level from op_desc = %d", op_desc->GetName().c_str(), group);
+  } else {
+    if (AttrUtils::GetInt(node->GetOwnerComputeGraph(), ::ge::ATTR_STAGE_LEVEL, group)) {
+      GELOGD("[%s] Got stage level from parent graph = %d", op_desc->GetName().c_str(), group);
+    } else {
+      auto parent_node = node->GetOwnerComputeGraph()->GetParentNode();
+      if ((parent_node != nullptr) && (AttrUtils::GetInt(parent_node->GetOpDesc(), ::ge::ATTR_STAGE_LEVEL, group))) {
+        GELOGD("[%s] Got stage level from parent node = %d", op_desc->GetName().c_str(), group);
+      } else {
+        GELOGD("[%s] Node does not set stage level", op_desc->GetName().c_str());
+      }
+    }
+  }
   ResolveOptionalInputs();
   return SUCCESS;
 }
@@ -244,6 +258,7 @@ std::string NodeItem::DebugString() const {
   ss << ", is_dynamic = " << (is_dynamic ? "True" : "False");
   ss << ", is_output_static = " << (is_output_shape_static ? "True" : "False");
   ss << ", unknown_shape_op_type = " << shape_inference_type;
+  ss << ", stage = " << group;
   ss << ", input_start = " << input_start;
   ss << ", num_inputs = " << num_inputs;
   ss << ", output_start = " << output_start;
diff --git a/ge/hybrid/model/node_item.h b/ge/hybrid/model/node_item.h
index a34227c5..300744d1 100644
--- a/ge/hybrid/model/node_item.h
+++ b/ge/hybrid/model/node_item.h
@@ -74,6 +74,7 @@ struct NodeItem {
   NodePtr node;
   OpDesc *op_desc;
   int node_id = -1;
+  int group = -1;
   int num_inputs = 0;
   int num_outputs = 0;
   int input_start = -1;
diff --git a/ge/hybrid/node_executor/aicore/aicore_op_task.cc b/ge/hybrid/node_executor/aicore/aicore_op_task.cc
index a34bba22..6ab62f3f 100644
--- a/ge/hybrid/node_executor/aicore/aicore_op_task.cc
+++ b/ge/hybrid/node_executor/aicore/aicore_op_task.cc
@@ -17,6 +17,7 @@
 #include "hybrid/node_executor/aicore/aicore_op_task.h"
 #include "framework/common/taskdown_common.h"
 #include "framework/common/debug/log.h"
+#include "graph/ge_context.h"
 #include "hybrid/executor/hybrid_execution_context.h"
 #include "hybrid/node_executor/aicore/aicore_task_builder.h"
 #include "graph/load/model_manager/tbe_handle_store.h"
@@ -198,9 +199,12 @@ Status AiCoreOpTask::UpdateTilingInfo(TaskContext &context) {
   tiling_info.clear_atomic = true;
 
   auto execution_context = context.GetExecutionContext();
+
+  GetContext().SetSessionId(execution_context->context_id);
   RECORD_EXECUTION_EVENT(execution_context, context.GetNodeName(), "[CalcTilingInfo] Start");
   GE_CHK_STATUS_RET(CalcTilingInfo(node, tiling_info));
   RECORD_EXECUTION_EVENT(execution_context, context.GetNodeName(), "[CalcTilingInfo] End");
+  GetContext().SetSessionId(execution_context->session_id);
 
   // update op args by tiling info
   block_dim_ = static_cast<uint32_t>(tiling_info.block_dim);
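UpdateTilingInfo temporarily switches the thread-local session id to the execution context's context_id, so per-iteration tiling results are keyed by pipeline iteration rather than by the real session, then switches back. Note that GE_CHK_STATUS_RET returns early if CalcTilingInfo fails, which would skip the restoring SetSessionId; an RAII guard (ours, not in the patch) would make the restore unconditional:

    #include <cstdint>
    #include "graph/ge_context.h"

    // RAII guard: pin the thread-local session id to a temporary value and
    // restore the original on scope exit, even on an early error return.
    class SessionIdGuard {
     public:
      SessionIdGuard(uint64_t temporary_id, uint64_t original_id) : original_id_(original_id) {
        ge::GetContext().SetSessionId(temporary_id);
      }
      ~SessionIdGuard() { ge::GetContext().SetSessionId(original_id_); }

      SessionIdGuard(const SessionIdGuard &) = delete;
      SessionIdGuard &operator=(const SessionIdGuard &) = delete;

     private:
      uint64_t original_id_;
    };

    // hypothetical usage inside UpdateTilingInfo:
    //   SessionIdGuard guard(execution_context->context_id, execution_context->session_id);
    //   GE_CHK_STATUS_RET(CalcTilingInfo(node, tiling_info));
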
diff --git a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc
index ca9e908b..d42c5cf7 100755
--- a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc
+++ b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc
@@ -74,7 +74,7 @@ Status AicpuNodeTaskBase::InitExtInfo(const std::string &kernel_ext_info, int64_
   return SUCCESS;
 }
 
-Status AicpuNodeTaskBase::UpdateOutputShapeFromExtInfo() {
+Status AicpuNodeTaskBase::UpdateOutputShapeFromExtInfo(TaskContext &task_context) {
   if (node_item_->num_outputs == 0) {
     GELOGD("Task [%s] output_num is 0, no need update output shape.", node_name_.c_str());
     return SUCCESS;
@@ -91,19 +91,19 @@ Status AicpuNodeTaskBase::UpdateOutputShapeFromExtInfo() {
     // not support update data type now, just for param
     DataType data_type;
     aicpu_ext_handle_.GetOutputShapeAndType(i, shape, data_type);
-    auto output_desc = node_item_->MutableOutputDesc(i);
-    GE_CHECK_NOTNULL(output_desc);
-    GE_CHK_STATUS_RET(UpdateShapeToOutputDesc(shape, i, output_desc),
+    GE_CHK_STATUS_RET(UpdateShapeToOutputDesc(task_context, shape, i),
                       "Update node %s [%d]th output shape failed.",
                       node_name_.c_str(), i);
   }
   return SUCCESS;
 }
 
-Status AicpuNodeTaskBase::UpdateShapeToOutputDesc(const GeShape &shape_new,
-                                                  int32_t output_index, GeTensorDescPtr &output_desc) {
+Status AicpuNodeTaskBase::UpdateShapeToOutputDesc(TaskContext &task_context,
+                                                  const GeShape &shape_new,
+                                                  int32_t output_index) {
+  auto output_desc = task_context.MutableOutputDesc(output_index);
+  GE_CHECK_NOTNULL(output_desc);
   auto shape_old = output_desc->GetShape();
-  output_desc->SetShape(shape_new);
   GELOGD("Update node[%s] out[%d] shape from %s to %s.",
          node_name_.c_str(), output_index, shape_old.ToString().c_str(), shape_new.ToString().c_str());
 
@@ -111,9 +111,9 @@ Status AicpuNodeTaskBase::UpdateShapeToOutputDesc(const GeShape &shape_new,
   auto origin_format = output_desc->GetOriginFormat();
   auto format = output_desc->GetFormat();
   if (origin_format == format) {
-    output_desc->SetOriginShape(shape_new);
-    return SUCCESS;
+    return task_context.GetNodeState()->UpdateOutputShapes(output_index, shape_new, shape_new);
   }
+
   // if format is not same need convert shape
   std::vector<int64_t> origin_dims_new;
   auto trans_ret = formats::TransShape(format, shape_new.GetDims(),
@@ -122,7 +122,8 @@ Status AicpuNodeTaskBase::UpdateShapeToOutputDesc(const GeShape &shape_new,
                     "Node[%s] out[%d] originFormat[%d] is not same as format[%d], but TransShape failed, shape=%s.",
                     node_name_.c_str(), output_index, origin_format, format, shape_new.ToString().c_str());
   auto origin_shape_new = GeShape(origin_dims_new);
-  output_desc->SetOriginShape(origin_shape_new);
+  GE_CHK_STATUS_RET(task_context.GetNodeState()->UpdateOutputShapes(output_index, shape_new, origin_shape_new),
+                    "Node[%s] failed to update output shape, index = %d", node_name_.c_str(), output_index);
   GELOGD("Node[%s] out[%d] originFormat[%d] is not same as format[%d], need update from %s ro %s.",
          node_name_.c_str(), output_index, origin_format, format,
          origin_shape_old.ToString().c_str(), origin_shape_new.ToString().c_str());
@@ -513,7 +514,6 @@ Status AicpuTfNodeTask::UpdateShapeByHbmBuffer(TaskContext &context,
                     node_name_.c_str(), node_item_->num_outputs, out_shape_hbm.size());
   for (auto i = 0; i < node_item_->num_outputs; ++i) {
     const auto &result_summary = output_summary_host_[i];
-    auto output_desc = node_item_->MutableOutputDesc(i);
     std::vector<int64_t> shape_dims;
     if (result_summary.shape_data_size > 0) {
       const auto &shape_hbm = out_shape_hbm[i];
@@ -531,7 +531,7 @@ Status AicpuTfNodeTask::UpdateShapeByHbmBuffer(TaskContext &context,
         GELOGD("Node[%s] [%d]th output dim[%u]=%ld.", node_name_.c_str(), i, dim_idx, shape_addr[dim_idx]);
       }
     }
-    GE_CHK_STATUS_RET(UpdateShapeToOutputDesc(GeShape(shape_dims), i, output_desc),
+    GE_CHK_STATUS_RET(UpdateShapeToOutputDesc(context, GeShape(shape_dims), i),
                       "Node[%s] update [%d]th output shape failed.", node_name_.c_str(), i);
   }
 
@@ -634,7 +634,7 @@ Status AicpuTfNodeTask::TaskCallback(TaskContext &context) {
   // check need update shape, call update shape.
   if (unknown_type_ == DEPEND_SHAPE_RANGE) {
     // check result
-    callback_ret = UpdateOutputShapeFromExtInfo();
+    callback_ret = UpdateOutputShapeFromExtInfo(context);
   } else if (unknown_type_ == DEPEND_COMPUTE) {
     callback_ret = UpdateShapeAndDataByResultSummary(context);
   }
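The AICPU shape updates above no longer write into node_item_'s output desc; they are routed through the TaskContext to the per-iteration NodeState. The reason is pipelining: the OpDesc inside NodeItem is shared by every in-flight iteration, so a shape computed by one iteration must be stored per iteration. A simplified standalone sketch of that storage (GE's actual NodeState differs):

    #include <cstddef>
    #include <cstdint>
    #include <utility>
    #include <vector>

    using Shape = std::vector<int64_t>;

    // One {shape, origin_shape} slot per output. Each pipeline iteration owns
    // its own instance, so concurrent iterations never race on a shared OpDesc.
    class IterationShapes {
     public:
      explicit IterationShapes(size_t num_outputs) : shapes_(num_outputs) {}

      bool UpdateOutputShapes(size_t index, Shape shape, Shape origin_shape) {
        if (index >= shapes_.size()) {
          return false;  // GE returns an error status here
        }
        shapes_[index] = {std::move(shape), std::move(origin_shape)};
        return true;
      }

     private:
      std::vector<std::pair<Shape, Shape>> shapes_;
    };
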
@@ -781,7 +781,7 @@ Status AicpuNodeTask::TaskCallback(TaskContext &context) {
   // check need update shape, call update shape.
   if (node_item_->is_dynamic && unknown_type_ == DEPEND_SHAPE_RANGE) {
     // check result
-    callback_ret = UpdateOutputShapeFromExtInfo();
+    callback_ret = UpdateOutputShapeFromExtInfo(context);
   } else {
     GELOGD("Node[%s] unknown shape type is %d no need update output shape.",
            node_name_.c_str(), unknown_type_);
diff --git a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.h b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.h
index b9cc8256..19fb5ec7 100644
--- a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.h
+++ b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.h
@@ -49,9 +49,9 @@ class AicpuNodeTaskBase : public NodeTask {
 
   virtual Status UpdateExtInfo();
 
-  virtual Status UpdateOutputShapeFromExtInfo();
+  virtual Status UpdateOutputShapeFromExtInfo(TaskContext &task_context);
 
-  Status UpdateShapeToOutputDesc(const GeShape &shape_new, int32_t output_index, GeTensorDescPtr &output_desc);
+  Status UpdateShapeToOutputDesc(TaskContext &task_context, const GeShape &shape_new, int32_t output_index);
 
   virtual Status LaunchTask(TaskContext &context) = 0;
 
diff --git a/ge/hybrid/node_executor/hccl/hccl_node_executor.cc b/ge/hybrid/node_executor/hccl/hccl_node_executor.cc
index 5387a176..c9f20381 100644
--- a/ge/hybrid/node_executor/hccl/hccl_node_executor.cc
+++ b/ge/hybrid/node_executor/hccl/hccl_node_executor.cc
@@ -22,6 +22,8 @@
 #include "graph/manager/util/hcom_util.h"
 #include "graph/runtime_inference_context.h"
 #include "graph/utils/type_utils.h"
+#include "graph/types.h"
+#include "hccl/hcom.h"
 #include "hybrid/executor/hybrid_execution_context.h"
 
 namespace ge {
@@ -96,13 +98,13 @@ Status HcclNodeTask::ExecuteAsync(TaskContext &context, std::function<void()> do
     GE_CHK_STATUS_RET(HcomOmeUtil::GetHcclRootId(op_desc, root_id), "GetHcclRootId failed");
   }
   op_info.root = root_id;
-  auto callback = [this, op_desc](HcclResult status) {
+  auto callback = [op_desc, done_callback](HcclResult status) {
     if (status != HCCL_SUCCESS) {
       GELOGE(HCCL_E_INTERNAL, "node %s call HcomExecEnqueueOperation failed, ret: 0x%X",
             op_desc->GetName().c_str(), status);
     }
-    std::lock_guard<std::mutex> lock(this->hccl_mutex_);
-    this->cond_.notify_all();
+
+    done_callback();
     GELOGI("node %s hccl callback success.", op_desc->GetName().c_str());
   };
   int32_t count = 0;
@@ -119,11 +121,6 @@ Status HcclNodeTask::ExecuteAsync(TaskContext &context, std::function<void()> do
     return HCCL_E_INTERNAL;
   }
 
-  // pending until hccl finished
-  std::unique_lock<std::mutex> ulock(hccl_mutex_);
-  cond_.wait(ulock);
-
-  GE_CHK_STATUS_RET_NOLOG(context.RegisterCallback(done_callback));
   GELOGI("[%s] HcclNodeTask::ExecuteAsync success.", context.GetNodeName());
   return SUCCESS;
 }
@@ -165,7 +162,8 @@ Status RdmaNodeTask::Init(TaskContext &context) {
 
 Status RdmaNodeTask::ExtractTensor(TaskContext &context, vector<HcomRemoteAccessAddrInfo> &addr_infos) {
   RuntimeInferenceContext *ctx = nullptr;
-  GE_CHK_STATUS_RET(RuntimeInferenceContext::GetContext(std::to_string(context.GetSessionId()), &ctx));
+  GE_CHK_STATUS_RET(
+      RuntimeInferenceContext::GetContext(std::to_string(context.GetExecutionContext()->context_id), &ctx));
 
   ge::Tensor remote_tensor;
   GE_CHK_STATUS_RET(ctx->GetTensor(remote_index_.first, remote_index_.second, remote_tensor));
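HcclNodeTask::ExecuteAsync no longer parks the executor thread on hccl_mutex_/cond_ until the collective finishes; instead, the graph-level done_callback is captured by the HCCL completion callback and invoked from HCCL's own thread, freeing the executor to schedule other nodes in the pipeline. A sketch of the control-flow change (Enqueue is a hypothetical stand-in for HcomExecEnqueueOperation, stubbed to complete synchronously):

    #include <functional>

    using HcclStatusT = int;  // stands in for HcclResult

    // Hypothetical enqueue: the real call hands the callback to HCCL, which
    // invokes it from its own thread once the collective is done.
    void Enqueue(const std::function<void(HcclStatusT)> &on_finish) {
      on_finish(0);  // stub: completes synchronously for illustration
    }

    void ExecuteAsyncSketch(const std::function<void()> &done_callback) {
      Enqueue([done_callback](HcclStatusT status) {
        if (status != 0) {
          // real code logs the failure and marks the execution context failed
        }
        done_callback();  // unblocks downstream nodes; no thread was parked
      });
      // returns immediately: there is no cond_.wait() on this path any more
    }
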
@@ -282,12 +280,13 @@ Status RdmaNodeTask::ExecuteAsync(TaskContext &context, std::function<void()> do
     return SUCCESS;
   }
 
-  auto callback = [this](HcclResult status) {
+  TaskContext *p_ctx = &context;
+  auto callback = [p_ctx, done_callback](HcclResult status) {
     if (status != HCCL_SUCCESS) {
-      GELOGE(HCCL_E_INTERNAL, "Call HcomExecInitialize failed, ret: 0x%X", status);
+      GELOGE(HCCL_E_INTERNAL, "Call HcomExecEnqueueRemoteAccess failed, ret: 0x%X", status);
+      p_ctx->SetStatus(FAILED);
     }
-    std::lock_guard<std::mutex> lock(this->hccl_mutex_);
-    this->cond_.notify_all();
+    done_callback();
     GELOGI("rdma callback success.");
   };
 
@@ -297,15 +296,10 @@ Status RdmaNodeTask::ExecuteAsync(TaskContext &context, std::function<void()> do
   }
   HcclResult hccl_ret = HcomExecEnqueueRemoteAccess(context.GetNodeItem().NodeType(), addr_infos, callback);
   if (hccl_ret != HCCL_SUCCESS) {
-    GELOGE(HCCL_E_INTERNAL, "Call HcomExecInitialize failed, ret: 0x%X", hccl_ret);
+    GELOGE(HCCL_E_INTERNAL, "Call HcomExecEnqueueRemoteAccess failed, ret: 0x%X", hccl_ret);
     return HCCL_E_INTERNAL;
   }
 
-  // pending until hccl finished
-  std::unique_lock<std::mutex> ulock(hccl_mutex_);
-  cond_.wait(ulock);
-
-  (void)context.RegisterCallback(done_callback);
   GELOGI("[%s] RdmaNodeTask::ExecuteAsync success.", context.GetNodeName());
   return SUCCESS;
 }
diff --git a/ge/hybrid/node_executor/task_context.cc b/ge/hybrid/node_executor/task_context.cc
index bc318124..e3cf5ae1 100644
--- a/ge/hybrid/node_executor/task_context.cc
+++ b/ge/hybrid/node_executor/task_context.cc
@@ -27,10 +27,12 @@ namespace ge {
 namespace hybrid {
 TaskContext::TaskContext(GraphExecutionContext *execution_context,
-                         const NodeItem *node_item,
+                         NodeState *node_state,
                          SubgraphContext *subgraph_context)
-    : node_item_(node_item), execution_context_(execution_context), subgraph_context_(subgraph_context) {
-}
+    : node_state_(node_state),
+      node_item_(node_state->GetNodeItem()),
+      execution_context_(execution_context),
+      subgraph_context_(subgraph_context) {}
 
 TaskContext::~TaskContext() {
   GELOGD("[%s] TaskContext destroyed.", node_item_->NodeName().c_str());
@@ -47,9 +49,10 @@ TaskContext::~TaskContext() {
   }
 }
 
-std::unique_ptr<TaskContext> TaskContext::Create(const NodeItem &node_item,
+std::unique_ptr<TaskContext> TaskContext::Create(NodeState *node_state,
                                                  GraphExecutionContext *execution_context,
                                                  SubgraphContext *subgraph_context) {
+  const NodeItem &node_item = *node_state->GetNodeItem();
   GELOGI("[%s] To create task context, input start = %d, num_inputs = %d, output start = %d, num_outputs = %d.",
          node_item.NodeName().c_str(),
          node_item.input_start,
@@ -65,7 +68,7 @@ std::unique_ptr<TaskContext> TaskContext::Create(const NodeItem &node_item,
   }
 
   auto task_context = std::unique_ptr<TaskContext>(
-      new(std::nothrow)TaskContext(execution_context, &node_item, subgraph_context));
+      new(std::nothrow)TaskContext(execution_context, node_state, subgraph_context));
   if (task_context == nullptr) {
     GELOGE(MEMALLOC_FAILED, "[%s] Failed to create instance of TaskContext.", node_item.NodeName().c_str());
     return nullptr;
@@ -154,7 +157,7 @@ Status TaskContext::RegisterCallback(const std::function<void()> &callback_fun) 
     GELOGW("[%s] Callback is NULL", GetNodeName());
     return SUCCESS;
   }
-  auto ret = execution_context_->callback_manager->RegisterCallback(callback_fun);
+  auto ret = execution_context_->callback_manager->RegisterCallback(GetStream(), callback_fun);
   if (ret != SUCCESS) {
     GELOGE(ret, "[%s] Failed to register callback", GetNodeName());
     execution_context_->callback_manager->Destroy();
@@ -309,7 +312,7 @@ Status TaskContext::SetOutput(int index, const TensorValue &tensor) {
   return SUCCESS;
 }
 
-rtStream_t TaskContext::GetStream() {
+rtStream_t TaskContext::GetStream() const {
   return execution_context_->stream;
 }
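RegisterCallback now passes the stream returned by GetStream(), so completion callbacks are tracked per rtStream rather than globally; with one stream per pipeline stage, each stage drains only its own callbacks. A simplified sketch of stream-keyed callback bookkeeping (not GE's actual CallbackManager):

    #include <functional>
    #include <mutex>
    #include <unordered_map>
    #include <vector>

    using Stream = void *;  // stands in for rtStream_t
    using Callback = std::function<void()>;

    class PerStreamCallbacks {
     public:
      void Register(Stream stream, Callback cb) {
        std::lock_guard<std::mutex> lock(mu_);
        callbacks_[stream].emplace_back(std::move(cb));
      }

      // Invoked when `stream` reports completion of its pending work; only
      // callbacks registered against that stream fire.
      void Drain(Stream stream) {
        std::vector<Callback> ready;
        {
          std::lock_guard<std::mutex> lock(mu_);
          ready.swap(callbacks_[stream]);
        }
        for (auto &cb : ready) {
          cb();
        }
      }

     private:
      std::mutex mu_;
      std::unordered_map<Stream, std::vector<Callback>> callbacks_;
    };
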
@@ -536,6 +539,10 @@ Status TaskContext::SaveProfilingTaskDescInfo(uint32_t task_id, uint32_t stream
   return SUCCESS;
 }
 
+NodeState *TaskContext::GetNodeState() const {
+  return node_state_;
+}
+
 Status TaskContext::SaveProfilingGraphDescInfo(uint32_t task_id, uint32_t stream_id) {
   if (ProfilingManager::Instance().ProfilingModelExecuteOn()) {
     const NodeItem &node_item = GetNodeItem();
diff --git a/ge/hybrid/node_executor/task_context.h b/ge/hybrid/node_executor/task_context.h
index e7ee4fc8..dc4ff058 100644
--- a/ge/hybrid/node_executor/task_context.h
+++ b/ge/hybrid/node_executor/task_context.h
@@ -25,6 +25,7 @@
 #include "framework/common/ge_types.h"
 #include "hybrid/common/tensor_value.h"
 #include "hybrid/common/npu_memory_allocator.h"
+#include "hybrid/executor/node_state.h"
 #include "hybrid/executor/rt_callback_manager.h"
 #include "hybrid/model/node_item.h"
 
@@ -35,7 +36,7 @@ class SubgraphContext;
 
 class TaskContext {
  public:
-  static std::unique_ptr<TaskContext> Create(const NodeItem &node_item,
+  static std::unique_ptr<TaskContext> Create(NodeState *node_state,
                                              GraphExecutionContext *execution_context,
                                              SubgraphContext *subgraph_context);
 
@@ -45,6 +46,7 @@ class TaskContext {
   int NumOutputs() const;
   size_t NumWorkspaces() const;
   const NodeItem &GetNodeItem() const;
+  NodeState *GetNodeState() const;
   const char *GetNodeName() const;
   TensorValue *MutableInput(int index);
   ConstGeTensorDescPtr GetInputDesc(int index) const;
@@ -58,7 +60,7 @@ class TaskContext {
   const TensorValue *GetOutput(int index) const;
   TensorValue *MutableOutput(int index);
   TensorValue *GetVariable(const std::string &name);
-  rtStream_t GetStream();
+  rtStream_t GetStream() const;
   int64_t GetSessionId() const;
   uint64_t GetIterationNumber() const;
 
@@ -119,12 +121,13 @@ class TaskContext {
 
  private:
   TaskContext(GraphExecutionContext *execution_context,
-              const NodeItem *node_item,
+              NodeState *node_state,
              SubgraphContext *subgraph_context);
 
   static string TensorDesc2String(const GeTensorDesc &desc);
   Status AllocateTensor(const GeTensorDesc &tensor_desc, TensorValue &tensor, AllocationAttr *attr);
 
+  NodeState *node_state_ = nullptr;
   const NodeItem *node_item_ = nullptr;
   bool force_infer_shape_ = false;
   GraphExecutionContext *execution_context_;
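With TaskContext owning a NodeState pointer, node tasks reach the per-iteration state through their context. A hedged usage sketch (the helper name is ours, not part of the patch) of reporting a dynamic output shape the way the AICPU executor above does:

    #include "hybrid/node_executor/task_context.h"

    // Hypothetical helper: record a computed output shape on the
    // per-iteration NodeState instead of mutating the shared NodeItem/OpDesc.
    ge::Status ReportOutputShape(ge::hybrid::TaskContext &context, int32_t index, const ge::GeShape &shape) {
      auto *node_state = context.GetNodeState();
      GE_CHECK_NOTNULL(node_state);
      // pass the same shape as origin shape when format == origin format
      return node_state->UpdateOutputShapes(index, shape, shape);
    }
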