!1064 dynamic shape supprot pipeline

From: @isaacxr
Reviewed-by: @sheng-nan,@xchu42
Signed-off-by: @ji_chen
pull/1064/MERGE
mindspore-ci-bot 4 years ago committed by Gitee
commit dfd119571e

@ -351,6 +351,7 @@ set(TRAIN_SRC_LIST
"hybrid/executor/node_done_manager.cc"
"hybrid/executor/hybrid_profiler.cc"
"hybrid/executor/hybrid_model_executor.cc"
"hybrid/executor/hybrid_model_pipeline_executor.cc"
"hybrid/executor/hybrid_model_async_executor.cc"
"hybrid/executor/hybrid_execution_context.cc"
"hybrid/executor/subgraph_context.cc"

@ -81,6 +81,7 @@ set(SRC_LIST
"../hybrid/executor/node_done_manager.cc"
"../hybrid/executor/hybrid_profiler.cc"
"../hybrid/executor/hybrid_model_executor.cc"
"../hybrid/executor/hybrid_model_pipeline_executor.cc"
"../hybrid/executor/hybrid_model_async_executor.cc"
"../hybrid/executor/hybrid_execution_context.cc"
"../hybrid/executor/subgraph_context.cc"

@ -3032,6 +3032,7 @@ Status GraphManager::OptimizeSubgraph(const GraphNodePtr &graph_node, ComputeGra
return FAILED;
}
GE_TIMESTAMP_EVENT_END(GraphPartitionDynamicShape, "OptimizeSubgraph::GraphPartitionDynamicShape");
GE_DUMP(compute_graph, "AfterDynamicShapePartition");
GE_TIMESTAMP_START(GraphPartition);
GraphPartitioner &partitioner = GetCompilerStages(graph_node->GetGraphId()).partitioner;
ret = partitioner.Partition(compute_graph, GraphPartitioner::kPartitioning);

@ -742,6 +742,12 @@ Status GraphOptimize::HandleMemoryRWConflict(ComputeGraphPtr &compute_graph) {
if (node->GetType() == NETOUTPUT && AttrUtils::HasAttr(node->GetOpDesc(), ATTR_NAME_PARENT_NODE_INDEX)) {
continue;
}
bool identity_reserved = false;
AttrUtils::GetBool(node->GetOpDesc(), ATTR_NAME_CANNOT_BE_DELETED, identity_reserved);
if (identity_reserved) {
GELOGD("Identity [%s] need to be reserved", node->GetName().c_str());
continue;
}
if (node->GetType() == IDENTITY || node->GetType() == READVARIABLEOP) {
// split identity
ret = SplitIdentity(node);

@ -52,6 +52,7 @@ Status StagePartitioner::Partition() {
return SUCCESS;
}
GE_DUMP(root_graph_, "BeforeStagePartition");
if (SplitStageLevel() != SUCCESS) {
GELOGE(FAILED, "Split graph-stage for graph %s failed.", root_graph_->GetName().c_str());
return FAILED;
@ -74,6 +75,7 @@ Status StagePartitioner::Partition() {
"maybe stage_level was not set correctly.", root_graph_->GetName().c_str());
return FAILED;
}
GE_DUMP(root_graph_, "AfterStagePartition");
return SUCCESS;
}

@ -460,6 +460,7 @@ Status SubgraphPass::InsertMemcpyNode(const ComputeGraphPtr &graph, const OutDat
.AddOutput("y", in_node->GetOpDesc()->GetOutputDesc(0))
.Build();
(void)AttrUtils::SetBool(op_desc, ATTR_NO_NEED_CONSTANT_FOLDING, false);
(void)AttrUtils::SetBool(op_desc, ATTR_NAME_CANNOT_BE_DELETED, true);
if (GraphUtils::InsertNodeAfter(out_anchor, in_anchors, graph->AddNode(op_desc)) != GRAPH_SUCCESS) {
GELOGE(FAILED, "Insert IDENTITY node %s after %s failed.", name.c_str(), in_node->GetName().c_str());
return FAILED;

@ -15,6 +15,7 @@
*/
#include "hybrid_execution_context.h"
#include <atomic>
namespace ge {
namespace hybrid {
@ -23,7 +24,14 @@ const uint32_t kEndOfSequence = 0x0704000a;
const uint32_t kEndOfSequenceNew = 507005;
const int32_t kModelAbortNormal = 0x0704000e;
const int32_t kModelAbortNormalNew = 507024;
std::atomic_ulong context_id_gen {};
} // namespace
GraphExecutionContext::GraphExecutionContext() {
context_id = context_id_gen++;
}
void GraphExecutionContext::SetErrorCode(Status error_code) {
std::lock_guard<std::mutex> lk(mu);
this->status = error_code;

@ -48,11 +48,15 @@
namespace ge {
namespace hybrid {
struct GraphExecutionContext {
GraphExecutionContext();
~GraphExecutionContext() = default;
void SetErrorCode(Status error_code);
Status GetStatus() const;
Status Synchronize(rtStream_t rt_stream);
uint64_t session_id = 0;
uint64_t context_id = 0;
const HybridModel *model = nullptr;
const GEThreadLocalContext *ge_context = nullptr;
rtStream_t stream = nullptr;
@ -67,6 +71,8 @@ struct GraphExecutionContext {
std::atomic_bool is_eos_;
long profiling_level = 0;
long iteration = 0;
private:
Status status = SUCCESS;
mutable std::mutex mu;
};
@ -75,7 +81,8 @@ struct GraphExecutionContext {
do { \
if ((context != nullptr) && (context)->profiler != nullptr) { \
if (node_name != nullptr) { \
context->profiler->RecordEvent(evt_type, "tid:%lu [%s] [%s] " fmt, GeLog::GetTid(), node_name, category, \
context->profiler->RecordEvent(evt_type, "tid:%lu [%s@%ld] [%s] " fmt, \
GeLog::GetTid(), node_name, context->iteration, category, \
##__VA_ARGS__); \
} else { \
context->profiler->RecordEvent(evt_type, "tid:%lu [%s] " fmt, GeLog::GetTid(), category, ##__VA_ARGS__); \

@ -25,6 +25,7 @@ namespace ge {
namespace hybrid {
namespace {
const int kDataOutputIndex = 0;
const size_t kMinimumPiplineStages = 2;
}
HybridModelAsyncExecutor::HybridModelAsyncExecutor(HybridModel *model)
: model_(model), run_flag_(false) {
@ -95,7 +96,17 @@ Status HybridModelAsyncExecutor::Init() {
executor_ = std::unique_ptr<HybridModelExecutor>(new(std::nothrow) HybridModelExecutor(model_, device_id_, stream_));
GE_CHECK_NOTNULL(executor_);
GE_CHK_STATUS_RET(executor_->Init(), "Failed to init hybrid engine");
GELOGI("HybridModel stage nums:%zu", model_->GetRootGraphItem()->NumGroups());
if (model_->GetRootGraphItem()->NumGroups() >= kMinimumPiplineStages) {
pipe_executor_ =
std::unique_ptr<HybridModelPipelineExecutor>(new(std::nothrow) HybridModelPipelineExecutor(model_, device_id_));
GE_CHECK_NOTNULL(pipe_executor_);
GE_CHK_STATUS_RET(pipe_executor_->Init(), "Failed to init hybrid engine");
}
GE_CHK_STATUS_RET(InitInputDesc(), "Failed to init input tensors");
return SUCCESS;
}
@ -135,7 +146,18 @@ Status HybridModelAsyncExecutor::RunInternal() {
CsaInteract::GetInstance().StoreInternalErrorCode(ret, ERROR_MODULE_FMK, JOBSUBSTATE_GRAPH_EXEC);
continue, "PreRun failed."); // [No need to check value]
ret = executor_->Execute(args);
if (pipe_executor_ != nullptr) {
GELOGI("HybridModel will execute in pipeline mode");
auto iter_per_run = std::getenv("ITER_NUM");
if (iter_per_run) {
args.num_loops = static_cast<int>(strtol(iter_per_run, nullptr, 10));
}
ret = pipe_executor_->Execute(args);
} else {
GELOGI("HybridModel will execute in singleline mode");
ge::GetContext().SetSessionId(executor_->GetContext()->session_id);
ret = executor_->Execute(args);
}
ret = HandleResult(ret, current_data.index, args, data_wrapper->GetOutput());
if (ret != SUCCESS) {
CsaInteract::GetInstance().StoreInternalErrorCode(ret, ERROR_MODULE_RUNTIME, JOBSUBSTATE_GRAPH_EXEC);

@ -23,6 +23,7 @@
#include "external/ge/ge_api_types.h"
#include "graph/load/model_manager/data_inputer.h"
#include "hybrid/executor/hybrid_model_executor.h"
#include "hybrid/executor/hybrid_model_pipeline_executor.h"
#include "runtime/stream.h"
namespace ge {
@ -81,6 +82,7 @@ class HybridModelAsyncExecutor {
std::atomic_bool run_flag_;
std::unique_ptr<DataInputer> data_inputer_;
std::unique_ptr<HybridModelExecutor> executor_;
std::unique_ptr<HybridModelPipelineExecutor> pipe_executor_;
std::future<Status> future_;
uint64_t iterator_count_ = 0;

@ -81,13 +81,14 @@ Status HybridModelExecutor::ExecuteGraphInternal(SubgraphExecutor &executor,
args.outputs.clear();
HYBRID_CHK_STATUS_RET(executor.GetOutputs(args.outputs, args.output_desc), "Failed to get outputs");
RECORD_MODEL_EXECUTION_EVENT(&context_, "[GetOutput] End");
context_.iteration +=1;
return SUCCESS;
}
Status HybridModelExecutor::Cleanup() {
GELOGD("Start to cleanup.");
context_.callback_manager->Destroy();
RuntimeInferenceContext::DestroyContext(std::to_string(context_.session_id));
RuntimeInferenceContext::DestroyContext(std::to_string(context_.context_id));
GELOGD("Cleanup successfully.");
return SUCCESS;
}
@ -105,7 +106,7 @@ Status HybridModelExecutor::InitExecutionContext() {
GELOGD("session id from model = %lu, from context = %lu", model_->GetSessionId(), context_.session_id);
context_.allocator = NpuMemoryAllocator::GetAllocator(device_id_);
GE_CHECK_NOTNULL(context_.allocator);
context_.callback_manager = std::unique_ptr<CallbackManager>(new(std::nothrow)CallbackManager(stream_));
context_.callback_manager = std::unique_ptr<CallbackManager>(new(std::nothrow)CallbackManager());
GE_CHECK_NOTNULL(context_.callback_manager);
context_.dump_properties = PropertiesManager::Instance().GetDumpProperties(context_.session_id);
const char *profiling_level = std::getenv(kEnvProfilingLevel);
@ -126,7 +127,7 @@ Status HybridModelExecutor::InitExecutionContext() {
Status HybridModelExecutor::ResetExecutionContext(GraphExecutionContext &context) {
GE_CHK_STATUS_RET_NOLOG(context.callback_manager->Init());
string ctx_id = std::to_string(context.session_id);
string ctx_id = std::to_string(context.context_id);
RuntimeInferenceContext::DestroyContext(ctx_id);
GE_CHK_GRAPH_STATUS_RET(RuntimeInferenceContext::CreateContext(ctx_id), "Failed to Destroy RuntimeInferenceContext");
return SUCCESS;

@ -32,6 +32,7 @@ class HybridModelExecutor {
std::vector<TensorValue> outputs;
std::vector<ConstGeTensorDescPtr> output_desc;
bool is_eos = false;
int num_loops = 10;
};
HybridModelExecutor(HybridModel *model, uint32_t device_id, rtStream_t stream);

File diff suppressed because it is too large Load Diff

@ -0,0 +1,88 @@
#ifndef GE_HYBRID_EXECUTOR_HYBRID_MODEL_PIPELINE_EXECUTOR_H_
#define GE_HYBRID_EXECUTOR_HYBRID_MODEL_PIPELINE_EXECUTOR_H_
#include "common/blocking_queue.h"
#include "common/thread_pool.h"
#include "hybrid/executor/hybrid_execution_context.h"
#include "hybrid/executor/rt_callback_manager.h"
#include "hybrid/executor/subgraph_executor.h"
#include "hybrid_model_executor.h"
namespace ge {
namespace hybrid {
struct PipeExecutionConfig {
uint32_t device_id;
rtContext_t rt_context;
int num_executors;
int num_stages;
long iteration_end;
};
class StageExecutor {
public:
struct StageTask {
rtEvent_t event = nullptr;
int stage = 0;
long iteration = 0;
};
StageExecutor(int id, HybridModel *model, PipeExecutionConfig *config);
~StageExecutor();
Status Init();
void Reset();
Status Start(const std::vector<TensorValue> &inputs, const std::vector<ConstGeTensorDescPtr> &input_desc,
int loop_count);
Status SetInputs(const std::vector<TensorValue> &inputs, const std::vector<ConstGeTensorDescPtr> &input_desc);
Status ExecuteAsync(const StageTask &args);
Status GetOutputs(std::vector<TensorValue> &outputs, std::vector<ConstGeTensorDescPtr> &output_desc);
Status Synchronize();
void SetNext(StageExecutor *next_executor) { next_executor_ = next_executor; }
private:
friend class HybridModelPipelineExecutor;
static Status ResetExecutionContext(GraphExecutionContext &context);
Status InitExecutionContext();
int id_;
HybridModel *model_;
PipeExecutionConfig *pipe_config_;
BlockingQueue<StageTask> task_queue_;
std::unique_ptr<SubgraphExecutor> root_graph_executor_;
GraphExecutionContext context_;
StageExecutor *next_executor_;
rtStream_t stream_ = nullptr;
};
class HybridModelPipelineExecutor {
public:
HybridModelPipelineExecutor(HybridModel *model, uint32_t device_id);
~HybridModelPipelineExecutor();
Status Init();
Status InitStageExecutors();
Status Execute(HybridModelExecutor::ExecuteArgs &args);
private:
HybridModel *model_;
uint32_t device_id_;
std::vector<std::unique_ptr<StageExecutor>> stage_executors_;
PipeExecutionConfig config_;
GraphExecutionContext context_;
long iteration_ = 0;
};
} // namespace hybrid
} // namespace ge
#endif // GE_HYBRID_EXECUTOR_HYBRID_MODEL_PIPELINE_EXECUTOR_H_

@ -24,7 +24,7 @@
namespace ge {
namespace hybrid {
namespace {
const int kMaxEvents = 10000;
const int kMaxEvents = 1024 * 500;
const int kEventDescMax = 512;
const int kMaxEventTypes = 8;
const int kIndent = 8;
@ -46,11 +46,14 @@ void HybridProfiler::RecordEvent(EventType event_type, const char *fmt, ...) {
}
va_end(args);
std::string event = buf;
auto index = counter_++;
if (index >= static_cast<int>(events_.size())) {
GELOGE(INTERNAL_ERROR, "index out of range. index = %d, max event size = %zu", index, events_.size());
return;
}
auto &evt = events_[index];
evt.timestamp = std::chrono::system_clock::now();
evt.desc = std::move(event);
evt.desc = std::string(buf);
evt.event_type = event_type;
}
@ -78,7 +81,7 @@ void HybridProfiler::Dump(std::ostream &output_stream) {
auto cost_dump = std::chrono::duration_cast<std::chrono::microseconds>(end_dump - start_dump).count();
output_stream << std::setw(kIndent) << elapsed_dump << "\t\t" << cost_dump
<< "\t\t" << "[Dump profiling]" << std::endl;
events_.clear();
Reset();
}
void HybridProfiler::Reset() {

@ -34,6 +34,14 @@ ShapeInferenceState::ShapeInferenceState(const NodeItem &node_item) : node_item(
GELOGD("[%s] ShapeInferenceState created, pending shape count = %d",
node_item.NodeName().c_str(),
this->num_pending_shapes_);
for (int i = 0; i < node_item.num_inputs; ++i){
input_tensor_desc.emplace_back(std::move(*node_item.MutableInputDesc(i)));
}
for (int i = 0; i < node_item.num_outputs; ++i){
output_tensor_desc.emplace_back(std::move(*node_item.MutableOutputDesc(i)));
}
}
Status ShapeInferenceState::UpdateInputShape(int idx, const GeTensorDesc &target) {
@ -56,11 +64,10 @@ Status ShapeInferenceState::UpdateInputShape(int idx, const GeTensorDesc &target
tensor_size);
std::lock_guard<std::mutex> lk(mu_);
auto tensor_desc = node_item.MutableInputDesc(idx);
GE_CHECK_NOTNULL(tensor_desc);
tensor_desc->SetShape(target.GetShape());
tensor_desc->SetOriginShape(target.GetOriginShape());
(void) TensorUtils::SetSize(*tensor_desc, tensor_size);
auto &input_desc = input_tensor_desc[idx];
input_desc.SetShape(target.GetShape());
input_desc.SetOriginShape(target.GetOriginShape());
(void) TensorUtils::SetSize(input_desc, tensor_size);
if (--num_pending_shapes_ <= 0) {
ready_cv_.notify_all();
}
@ -115,12 +122,27 @@ Status ShapeInferenceState::AwaitShapesReady(const GraphExecutionContext &contex
}
}
for (size_t i = 0; i < input_tensor_desc.size(); ++i) {
auto dst_tensor_desc = node_item.op_desc->MutableInputDesc(i);
if (dst_tensor_desc == nullptr) {
continue;
}
auto &tensor_desc = input_tensor_desc[i];
int64_t tensor_size = -1;
(void) TensorUtils::GetSize(tensor_desc, tensor_size);
dst_tensor_desc->SetShape(tensor_desc.MutableShape());
dst_tensor_desc->SetOriginShape(tensor_desc.GetOriginShape());
(void) TensorUtils::SetSize(*dst_tensor_desc, tensor_size);
}
for (auto &p : shape_futures) {
auto idx = p.first;
auto &future = p.second;
RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] Start", idx);
GeTensorDescPtr src_tensor_desc;
GE_CHK_STATUS_RET_NOLOG(future.GetTensorDesc(src_tensor_desc));
const GeTensorDesc* src_tensor_desc = nullptr;
GE_CHK_STATUS_RET_NOLOG(future.GetTensorDesc(&src_tensor_desc));
GE_CHECK_NOTNULL(src_tensor_desc);
RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] End", idx);
@ -142,10 +164,28 @@ Status ShapeInferenceState::AwaitShapesReady(const GraphExecutionContext &contex
return SUCCESS;
}
ShapeFuture::ShapeFuture(NodePtr src_node,
const vector<GeTensorDesc> &ShapeInferenceState::GetOutputTensorDesc() const {
return output_tensor_desc;
}
Status ShapeInferenceState::UpdateOutputDesc() {
for (size_t i = 0; i < output_tensor_desc.size(); ++i) {
auto src_tensor_desc = node_item.MutableOutputDesc(i);
GE_CHECK_NOTNULL(src_tensor_desc);
auto &dst_tensor_desc = output_tensor_desc[i];
dst_tensor_desc.SetShape(src_tensor_desc->MutableShape());
dst_tensor_desc.SetOriginShape(src_tensor_desc->GetOriginShape());
int64_t tensor_size = -1;
(void) TensorUtils::GetSize(*src_tensor_desc, tensor_size);
(void) TensorUtils::SetSize(dst_tensor_desc, tensor_size);
}
return SUCCESS;
}
ShapeFuture::ShapeFuture(NodeState *src_node,
uint32_t src_index,
SubgraphContext *subgraph_context)
: src_node_(std::move(src_node)), src_index_(src_index), subgraph_context_(subgraph_context) {
: src_node_(src_node), src_index_(src_index), subgraph_context_(subgraph_context) {
}
NodeState::NodeState(const NodeItem &node_item, SubgraphContext *subgraph_context)
@ -187,6 +227,13 @@ Status NodeState::WaitForPrepareDone() {
return SUCCESS;
}
Status NodeState::UpdateOutputShapes(int index, const GeShape &shape, const GeShape &ori_shape) {
auto self_tensor_desc = op_desc_->MutableOutputDesc(index);
GE_CHECK_NOTNULL(self_tensor_desc);
self_tensor_desc->SetShape(shape);
self_tensor_desc->SetOriginShape(ori_shape);
return SUCCESS;
}
void NodeState::SetTaskContext(std::shared_ptr<TaskContext> &task_context) {
task_context_ = task_context;
@ -198,17 +245,19 @@ std::shared_ptr<TaskContext> NodeState::GetTaskContext() {
Status ShapeFuture::Get(GeShape &ori_shape, GeShape &shape) {
GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str());
HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_), "cancelled");
shape = src_node_->GetOpDesc()->MutableOutputDesc(src_index_)->MutableShape();
ori_shape = src_node_->GetOpDesc()->MutableOutputDesc(src_index_)->GetOriginShape();
HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_->GetNodeItem()->node), "cancelled");
auto &output_desc = src_node_->GetShapeInferenceState().GetOutputTensorDesc().at(src_index_);
shape = output_desc.GetShape();
ori_shape = output_desc.GetOriginShape();
GELOGD("Get shape from %s:%u. shape = [%s]", src_node_->GetName().c_str(), src_index_, shape.ToString().c_str());
return SUCCESS;
}
Status ShapeFuture::GetTensorDesc(GeTensorDescPtr &tensor_desc) {
Status ShapeFuture::GetTensorDesc(const GeTensorDesc **tensor_desc) {
GE_CHECK_NOTNULL(tensor_desc);
GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str());
HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_), "cancelled");
tensor_desc = src_node_->GetOpDesc()->MutableOutputDesc(src_index_);
HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_->GetNodeItem()->node), "cancelled");
*tensor_desc = &src_node_->GetShapeInferenceState().GetOutputTensorDesc().at(src_index_);
return SUCCESS;
}
} // namespace hybrid

@ -30,16 +30,17 @@ class NodeTask;
struct GraphExecutionContext;
class SubgraphContext;
class TaskContext;
class NodeState;
class ShapeFuture {
public:
ShapeFuture(NodePtr src_node, uint32_t src_index, SubgraphContext *subgraph_context);
ShapeFuture(NodeState *src_node, uint32_t src_index, SubgraphContext *subgraph_context);
~ShapeFuture() = default;
Status Get(GeShape &ori_shape, GeShape &shape);
Status GetTensorDesc(GeTensorDescPtr &tensor_desc);
Status GetTensorDesc(const GeTensorDesc **tensor_desc);
private:
NodePtr src_node_;
NodeState *src_node_;
uint32_t src_index_;
SubgraphContext *subgraph_context_;
};
@ -53,10 +54,19 @@ struct ShapeInferenceState {
Status AwaitShapesReady(const GraphExecutionContext &context);
Status UpdateOutputDesc();
const vector<GeTensorDesc> &GetOutputTensorDesc() const;
const NodeItem &node_item;
private:
friend struct NodeState;
std::vector<std::pair<int, ShapeFuture>> shape_futures;
// do not directly update op_desc, in case race condition across pipelines
std::vector<GeTensorDesc> input_tensor_desc;
std::vector<GeTensorDesc> output_tensor_desc;
int num_pending_shapes_ = 0;
std::condition_variable ready_cv_;
std::mutex mu_;
@ -88,6 +98,8 @@ struct NodeState {
return shape_inference_state_;
}
Status UpdateOutputShapes(int index, const GeShape &shape, const GeShape &ori_shape);
const shared_ptr<NodeTask> &GetKernelTask() const {
return kernel_task_;
}

@ -21,14 +21,11 @@
namespace ge {
namespace hybrid {
CallbackManager::CallbackManager(rtStream_t stream) : stream_(stream) {
}
Status CallbackManager::RegisterCallback(rtCallback_t callback, void *user_data) {
Status CallbackManager::RegisterCallback(rtStream_t stream, rtCallback_t callback, void *user_data) {
GELOGD("To register callback");
rtEvent_t event = nullptr;
GE_CHK_RT_RET(rtEventCreate(&event));
auto rt_ret = rtEventRecord(event, stream_);
auto rt_ret = rtEventRecord(event, stream);
if (rt_ret != RT_ERROR_NONE) {
GELOGE(RT_FAILED, "Failed to invoke rtEventRecord, error code = %d", rt_ret);
(void) rtEventDestroy(event);
@ -112,11 +109,11 @@ void CallbackManager::RtCallbackFunc(void *data) {
delete callback_func;
}
Status CallbackManager::RegisterCallback(const std::function<void()> &callback) {
Status CallbackManager::RegisterCallback(rtStream_t stream, const std::function<void()> &callback) {
auto func = std::unique_ptr<std::function<void()>>(new(std::nothrow) std::function<void()>(callback));
GE_CHECK_NOTNULL(func);
GELOGD("Callback registered");
return RegisterCallback(RtCallbackFunc, func.release());
return RegisterCallback(stream, RtCallbackFunc, func.release());
}
} // namespace hybrid
} // namespace ge

@ -30,23 +30,21 @@ namespace ge {
namespace hybrid {
class CallbackManager {
public:
explicit CallbackManager(rtStream_t stream);
CallbackManager() = default;
~CallbackManager() = default;
Status Init();
Status Destroy();
Status RegisterCallback(rtCallback_t callback, void *user_data);
Status RegisterCallback(const std::function<void()> &callback);
Status RegisterCallback(rtStream_t stream, rtCallback_t callback, void *user_data);
Status RegisterCallback(rtStream_t stream, const std::function<void()> &callback);
private:
Status CallbackProcess(rtContext_t context);
static void RtCallbackFunc(void *data);
BlockingQueue<std::pair<rtEvent_t, std::pair<rtCallback_t, void *>>> callback_queue_;
rtStream_t stream_;
std::future<Status> ret_future_;
};
} // namespace hybrid

@ -24,6 +24,7 @@ namespace ge {
namespace hybrid {
namespace {
constexpr int kDefaultThreadNum = 4;
constexpr int kDefaultQueueSize = 16;
constexpr int kDataInputIndex = 0;
}
@ -31,7 +32,8 @@ SubgraphExecutor::SubgraphExecutor(const GraphItem *graph_item, GraphExecutionCo
: graph_item_(graph_item),
context_(context),
force_infer_shape_(force_infer_shape),
pre_run_pool_(kDefaultThreadNum) {
pre_run_pool_(kDefaultThreadNum),
ready_queue_(kDefaultQueueSize) {
}
SubgraphExecutor::~SubgraphExecutor() {
@ -169,7 +171,7 @@ Status SubgraphExecutor::ExecuteAsyncForKnownShape(const std::vector<TensorValue
GE_CHECK_NOTNULL(node_state);
node_state->SetKernelTask(node_item->kernel_task);
known_shape_task_context_ = TaskContext::Create(*node_item, context_, subgraph_context_.get());
known_shape_task_context_ = TaskContext::Create(node_state.get(), context_, subgraph_context_.get());
GE_CHECK_NOTNULL(known_shape_task_context_);
HYBRID_CHK_STATUS_RET(ExecutionEngine::ExecuteAsync(*node_state, known_shape_task_context_, *context_),
@ -201,11 +203,11 @@ Status SubgraphExecutor::ExecuteAsync(TaskContext &task_context) {
return SUCCESS;
}
Status SubgraphExecutor::PrepareNodes() {
GELOGD("[%s] Start to prepare nodes. force infer shape = %s.",
Status SubgraphExecutor::PrepareNodes(int group) {
GELOGD("[%s] Start to prepare nodes. group = %d",
graph_item_->GetName().c_str(),
force_infer_shape_ ? "true" : "false");
auto &all_nodes = graph_item_->GetAllNodes();
group);
auto &all_nodes = graph_item_->GetAllNodes(group);
for (auto all_node : all_nodes) {
auto &node_item = *all_node;
// for while op
@ -240,7 +242,8 @@ Status SubgraphExecutor::PrepareNodes() {
} else {
node_state->SetKernelTask(node_item.kernel_task);
}
auto unique_task_context = TaskContext::Create(*node_state->GetNodeItem(), context_, subgraph_context_.get());
auto unique_task_context =
TaskContext::Create(node_state.get(), context_, subgraph_context_.get());
GE_CHECK_NOTNULL(unique_task_context);
const auto &task = node_state->GetKernelTask();
if (task == nullptr) {
@ -265,15 +268,17 @@ Status SubgraphExecutor::PrepareNodes() {
GELOGD("[%s] Push node [%s] to queue.", graph_item_->GetName().c_str(), node_item.NodeName().c_str());
}
GELOGD("[%s] Done preparing nodes successfully.", graph_item_->GetName().c_str());
return SUCCESS;
}
Status SubgraphExecutor::InferShape(ShapeInferenceEngine *shape_inference_engine, NodeState &node_state) {
const auto &node_item = *node_state.GetNodeItem();
Status SubgraphExecutor::InferShape(ShapeInferenceEngine *shape_inference_engine, NodeState &node_state) const {
GetContext().SetSessionId(context_->context_id);
HYBRID_CHK_STATUS_RET(shape_inference_engine->InferShape(node_state),
"[%s] Failed to InferShape.", node_state.GetName().c_str());
HYBRID_CHK_STATUS_RET(shape_inference_engine->PropagateOutputShapes(node_item),
"[%s] Failed to PropagateOutputShapes.", node_state.GetName().c_str());
"[%s] Failed to InferShape.", node_state.GetName().c_str());
GetContext().SetSessionId(context_->session_id);
HYBRID_CHK_STATUS_RET(shape_inference_engine->PropagateOutputShapes(node_state),
"[%s] Failed to PropagateOutputShapes.", node_state.GetName().c_str());
return SUCCESS;
}
@ -285,7 +290,7 @@ Status SubgraphExecutor::PrepareForExecution(GraphExecutionContext *ctx, NodeSta
} else {
node_state.SetKernelTask(node_item.kernel_task);
}
auto unique_task_context = TaskContext::Create(*node_state.GetNodeItem(), context_, subgraph_context_.get());
auto unique_task_context = TaskContext::Create(&node_state, context_, subgraph_context_.get());
GE_CHECK_NOTNULL(unique_task_context);
const auto &task = node_state.GetKernelTask();
if (task == nullptr) {
@ -336,11 +341,11 @@ Status SubgraphExecutor::LaunchTasks() {
}
}
Status SubgraphExecutor::ScheduleTasks() {
Status SubgraphExecutor::ScheduleTasks(int group) {
GELOGD("[%s] Start to schedule prepare workers.", graph_item_->GetName().c_str());
auto prepare_future = std::async(std::launch::async, [&]() -> Status {
GetContext().SetSessionId(context_->session_id);
auto ret = PrepareNodes();
auto ret = PrepareNodes(group);
ready_queue_.Push(nullptr);
return ret;
});
@ -481,5 +486,14 @@ Status SubgraphExecutor::EnableOutputZeroCopy(const vector<TensorValue> &outputs
GELOGD("Done enabling zero copy for outputs successfully.");
return SUCCESS;
}
Status SubgraphExecutor::PartialExecuteAsync(int task_group) {
return ScheduleTasks(task_group);
}
Status SubgraphExecutor::InitForPartialExecution(const vector<TensorValue> &inputs,
const vector<ConstGeTensorDescPtr> &input_desc) {
return Init(inputs, input_desc);
}
} // namespace hybrid
} // namespace ge

@ -36,6 +36,11 @@ class SubgraphExecutor {
SubgraphExecutor(const GraphItem *graph_item, GraphExecutionContext *context, bool force_infer_shape = false);
~SubgraphExecutor();
Status InitForPartialExecution(const std::vector<TensorValue> &inputs,
const std::vector<ConstGeTensorDescPtr> &input_desc);
Status PartialExecuteAsync(int task_group);
/**
* Execute subgraph async, output tensor address(not data) and output tensor descriptions are
* valid after this method returned
@ -89,15 +94,15 @@ class SubgraphExecutor {
private:
Status PrepareForExecution(GraphExecutionContext *ctx, NodeState &node_state);
Status EnableOutputZeroCopy(const std::vector<TensorValue> &outputs);
static Status InferShape(ShapeInferenceEngine *shape_inference_engine, NodeState &node_state);
Status InferShape(ShapeInferenceEngine *shape_inference_engine, NodeState &node_state) const;
Status Init(const std::vector<TensorValue> &inputs,
const std::vector<ConstGeTensorDescPtr> &input_desc);
Status InitInputsForUnknownShape(const std::vector<TensorValue> &inputs,
const std::vector<ConstGeTensorDescPtr> &input_desc);
Status InitInputsForKnownShape(const std::vector<TensorValue> &inputs);
Status ExecuteAsyncForKnownShape(const std::vector<TensorValue> &inputs);
Status ScheduleTasks();
Status PrepareNodes();
Status ScheduleTasks(int group = -1);
Status PrepareNodes(int group = -1);
Status LaunchTasks();
Status SetOutputsToParentNode(TaskContext &task_context);

@ -125,16 +125,16 @@ Status NodeDoneCallback::PrepareConstInputs(const NodeItem &node_item) {
RT_MEMCPY_DEVICE_TO_HOST));
}
tensor.SetData(std::move(host_buffer));
string session_id = std::to_string(context_->GetSessionId());
string context_id = std::to_string(graph_context_->context_id);
RuntimeInferenceContext *runtime_infer_ctx = nullptr;
GE_CHK_GRAPH_STATUS_RET(RuntimeInferenceContext::GetContext(session_id, &runtime_infer_ctx),
"Failed to get RuntimeInferenceContext, session_id = %s", session_id.c_str());
GE_CHK_GRAPH_STATUS_RET(RuntimeInferenceContext::GetContext(context_id, &runtime_infer_ctx),
"Failed to get RuntimeInferenceContext, context_id = %s", context_id.c_str());
GE_CHK_STATUS_RET(runtime_infer_ctx->SetTensor(node_item.node_id, output_idx, std::move(tensor)),
"Failed to SetTensor, node = %s, output_index = %d", node_item.NodeName().c_str(), output_idx);
GELOGD("[%s] Output[%d] cached successfully in session: %s. node_id = %d, shape = [%s]",
GELOGD("[%s] Output[%d] cached successfully in context: %s. node_id = %d, shape = [%s]",
node_item.NodeName().c_str(),
output_idx,
session_id.c_str(),
context_id.c_str(),
node_item.node_id,
ge_tensor_desc->GetShape().ToString().c_str());
@ -332,6 +332,7 @@ Status NodeDoneCallback::OnNodeDone() {
if (node_item.shape_inference_type == DEPEND_SHAPE_RANGE || node_item.shape_inference_type == DEPEND_COMPUTE) {
// update output tensor sizes
GE_CHK_STATUS_RET_NOLOG(ShapeInferenceEngine::CalcOutputTensorSizes(node_item));
GE_CHK_STATUS_RET_NOLOG(context_->GetNodeState()->GetShapeInferenceState().UpdateOutputDesc());
}
// PropagateOutputs for type == DEPEND_COMPUTE
if (node_item.shape_inference_type == DEPEND_COMPUTE) {
@ -363,7 +364,7 @@ Status ExecutionEngine::ExecuteAsync(NodeState &node_state,
RECORD_EXECUTION_EVENT(&execution_context, task_context->GetNodeName(), "Start");
auto cb = std::shared_ptr<NodeDoneCallback>(new(std::nothrow) NodeDoneCallback(&execution_context, task_context));
GE_CHECK_NOTNULL(cb);
auto callback = [&, cb]() {
auto callback = [task_context, cb]() {
auto ret = cb->OnNodeDone();
if (ret != SUCCESS) {
task_context->OnError(ret);

@ -109,7 +109,8 @@ Status ShapeInferenceEngine::AwaitDependentNodes(NodeState &node_state) {
return SUCCESS;
}
Status ShapeInferenceEngine::PropagateOutputShapes(const NodeItem &node_item) {
Status ShapeInferenceEngine::PropagateOutputShapes(NodeState &node_state) {
auto &node_item = *node_state.GetNodeItem();
if (node_item.is_output_shape_static) {
return SUCCESS;
}
@ -140,9 +141,8 @@ Status ShapeInferenceEngine::PropagateOutputShapes(const NodeItem &node_item) {
// in case type 3 and 4, shape will be valid after computing is done
auto &infer_state = dst_node_state->GetShapeInferenceState();
if (shape_is_future) {
ShapeFuture future(node_item.node, i, subgraph_context_);
infer_state.UpdateInputShapeFuture(dst_input_index_and_node.first,
std::move(future));
ShapeFuture future(&node_state, i, subgraph_context_);
infer_state.UpdateInputShapeFuture(dst_input_index_and_node.first, std::move(future));
} else {
GE_CHK_STATUS_RET_NOLOG(infer_state.UpdateInputShape(dst_input_index_and_node.first, *output_desc));
}

@ -32,7 +32,7 @@ class ShapeInferenceEngine {
Status InferShapeForSubgraph(const NodeItem &node_item, const FusedSubgraph &fused_subgraph);
Status PropagateOutputShapes(const NodeItem &node_item);
Status PropagateOutputShapes(NodeState &node_state);
static Status CalcOutputTensorSizes(const NodeItem &node_item, bool fallback_with_range = false);

@ -30,6 +30,19 @@ const vector<NodeItem *> &hybrid::GraphItem::GetAllNodes() const {
return node_items_;
}
const vector<NodeItem *> &GraphItem::GetAllNodes(int group) const {
if (group == -1) {
return GetAllNodes();
}
if (group >= static_cast<int>(grouped_node_items_.size())) {
static vector<NodeItem *> empty_nodes;
return empty_nodes;
}
return grouped_node_items_[group];
}
const vector<const NodeItem *> &GraphItem::GetInputNodes() const {
return input_nodes_;
}
@ -74,5 +87,28 @@ const NodeItem *GraphItem::GetOutputNode() const {
const vector<std::pair<const NodeItem *, int>> &GraphItem::GetOutputEdges() const {
return output_edges_;
}
Status GraphItem::GroupNodes() {
int last_group = INT32_MIN;
std::set<int> seen_groups;
for (auto node : node_items_) {
int group = node->group;
if (group != last_group) {
if (seen_groups.find(group) != seen_groups.end()) {
GELOGE(INTERNAL_ERROR, "Unordered node group found. node = %s, group = %d", node->NodeName().c_str(), group);
return INTERNAL_ERROR;
} else {
last_group = group;
seen_groups.insert(group);
grouped_node_items_.emplace_back(std::vector<NodeItem *>());
}
}
GELOGD("Adding node [%s] to group %d", node->NodeName().c_str(), group);
grouped_node_items_.back().emplace_back(node);
}
return SUCCESS;
}
} // namespace hybrid
} // namespace ge

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save