diff --git a/ge/CMakeLists.txt b/ge/CMakeLists.txt index d84bb89a..3dd90b13 100755 --- a/ge/CMakeLists.txt +++ b/ge/CMakeLists.txt @@ -46,7 +46,7 @@ target_compile_definitions(ge_proto_common PRIVATE ) target_compile_options(ge_proto_common PRIVATE - -O2 + -O0 -g -fno-common ) @@ -72,7 +72,7 @@ target_include_directories(ge_proto_client PRIVATE ) target_compile_options(ge_proto_client PRIVATE - -O2 + -O0 -g -fno-common ) @@ -720,7 +720,7 @@ target_compile_definitions(ge_runner PRIVATE ) target_compile_options(ge_runner PRIVATE - -O2 + -O0 -g -fno-common -fvisibility=hidden $<$:-Werror=unused-variable> @@ -796,7 +796,7 @@ target_compile_definitions(ge_compiler PRIVATE ) target_compile_options(ge_compiler PRIVATE - -O2 + -O0 -g -fno-common -fvisibility=hidden $<$:-Werror=unused-variable> @@ -891,7 +891,7 @@ target_compile_definitions(opensrc_ascendcl PRIVATE ) target_compile_options(opensrc_ascendcl PRIVATE - -O2 + -O0 -g -fvisibility=hidden ) diff --git a/ge/client/ge_api.cc b/ge/client/ge_api.cc index 0c63c6e3..379b6f02 100644 --- a/ge/client/ge_api.cc +++ b/ge/client/ge_api.cc @@ -597,8 +597,7 @@ Status Session::RegisterCallBackFunc(const char *key, const session::pCallBackFu return ge::GELib::GetInstance()->SessionManagerObj().RegisterCallBackFunc(sessionId_, str_key, callback); } -// Build Graph -Status Session::BuildGraph(uint32_t graph_id, const std::vector &inputs) { +Status Session::BuildGraph(uint32_t graph_id, const std::vector &inputs) { ErrorManager::GetInstance().SetStage(ErrorMessage::kModelCompile, ErrorMessage::kOther); ErrorManager::GetInstance().GenWorkStreamIdBySessionGraph(sessionId_, graph_id); std::shared_ptr instance_ptr = ge::GELib::GetInstance(); @@ -620,8 +619,7 @@ Status Session::BuildGraph(uint32_t graph_id, const std::vector return SUCCESS; } -// Run Graph Asynchronously -Status Session::RunGraphAsync(uint32_t graph_id, const std::vector &inputs, +Status Session::RunGraphAsync(uint32_t graph_id, const std::vector &inputs, 
RunAsyncCallback callback) { ErrorManager::GetInstance().SetStage(ErrorMessage::kModelExecute, ErrorMessage::kModelExecute); ErrorManager::GetInstance().GenWorkStreamIdBySessionGraph(sessionId_, graph_id); diff --git a/ge/executor/ge_executor.cc b/ge/executor/ge_executor.cc index 4081bdf2..7e6752ce 100755 --- a/ge/executor/ge_executor.cc +++ b/ge/executor/ge_executor.cc @@ -72,7 +72,7 @@ void GetDomiInputData(const ge::RunModelData &input_data, ge::InputData &inputs) inputs.timeout = input_data.timeout; inputs.request_id = input_data.request_id; for (const auto &data_item : input_data.blobs) { - ge::DataBuffer dataBuf{data_item.data, data_item.length, data_item.isDataSupportMemShare}; + ge::DataBuffer dataBuf{data_item.data, data_item.length, 0, data_item.isDataSupportMemShare}; inputs.blobs.emplace_back(dataBuf); } } @@ -81,7 +81,7 @@ void GetDomiOutputData(const ge::RunModelData &output_data, ge::OutputData &outp outputs.index = output_data.index; outputs.model_id = output_data.modelId; for (const auto &data_item : output_data.blobs) { - ge::DataBuffer dataBuf(data_item.data, data_item.length, data_item.isDataSupportMemShare); + ge::DataBuffer dataBuf(data_item.data, data_item.length, 0, data_item.isDataSupportMemShare); outputs.blobs.emplace_back(dataBuf); } } diff --git a/ge/graph/execute/graph_execute.cc b/ge/graph/execute/graph_execute.cc index d924302c..2081360d 100755 --- a/ge/graph/execute/graph_execute.cc +++ b/ge/graph/execute/graph_execute.cc @@ -380,7 +380,7 @@ Status GraphExecutor::ExecuteGraph(GraphId graph_id, const GeRootModelPtr &ge_ro } Status GraphExecutor::ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr &ge_root_model, - const std::vector &input_tensor) { + const std::vector &input_tensor) { GELOGI("[GraphExecutor] Start to async execute graph, graph_id=%u", graph_id); if (graph_id != last_graph_id_) { auto ret = FreeExecuteMemory(); @@ -400,7 +400,7 @@ Status GraphExecutor::ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr & 
return SUCCESS; } -Status GraphExecutor::AsyncExecuteModel(uint32_t model_id, const std::vector &inputs) { +Status GraphExecutor::AsyncExecuteModel(uint32_t model_id, const std::vector &inputs) { try { auto model_manager = ge::ModelManager::GetInstance(); GE_CHECK_NOTNULL(model_manager); diff --git a/ge/graph/execute/graph_execute.h b/ge/graph/execute/graph_execute.h index d2a92e47..2846294d 100755 --- a/ge/graph/execute/graph_execute.h +++ b/ge/graph/execute/graph_execute.h @@ -50,7 +50,7 @@ class GraphExecutor { std::vector &output_tensor); ge::Status ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr &ge_root_model, - const std::vector &input_tensor); + const std::vector &input_tensor); Status SetCondition(std::mutex *mutex, std::condition_variable *cond, std::shared_ptr listener); @@ -123,7 +123,7 @@ class GraphExecutor { Status SyncExecuteModel(uint32_t model_id, const std::vector &input_tensor, std::vector &output_tensor); - Status AsyncExecuteModel(uint32_t model_id, const std::vector &input_tensor); + Status AsyncExecuteModel(uint32_t model_id, const std::vector &input_tensor); void InitModelIdInfo(std::vector &out_model_id_info, std::vector &sub_graph_vec, uint32_t output_size); diff --git a/ge/graph/load/model_manager/davinci_model.cc b/ge/graph/load/model_manager/davinci_model.cc index 78f4a64c..120f43ee 100755 --- a/ge/graph/load/model_manager/davinci_model.cc +++ b/ge/graph/load/model_manager/davinci_model.cc @@ -123,6 +123,7 @@ const char* const kInferEndTime = "infer_end_time"; const char* const kOutputBeginTime = "output_start_time"; const char* const kOutputEndTime = "output_end_time"; const uint32_t kStringHeadElems = 2; +const uint32_t kPlacementHost = 0; inline bool IsDataOp(const std::string &node_type) { return (node_type == DATA_TYPE) || (node_type == AIPP_DATA_TYPE) || (node_type == ANN_DATA_TYPE); @@ -2230,8 +2231,7 @@ Status DavinciModel::GetOutputDescInfo(vector &output_descs return SUCCESS; } -Status 
DavinciModel::CopyInputData(const InputData &input_data, bool device_data) { - rtMemcpyKind_t kind = device_data ? RT_MEMCPY_DEVICE_TO_DEVICE : RT_MEMCPY_HOST_TO_DEVICE; +Status DavinciModel::CopyInputData(const InputData &input_data) { const std::vector &blobs = input_data.blobs; for (const auto &data : input_data_info_) { if (data.first >= blobs.size()) { @@ -2242,8 +2242,9 @@ Status DavinciModel::CopyInputData(const InputData &input_data, bool device_data data.second.GetOpName().c_str()); return FAILED; } - + const DataBuffer &data_buf = blobs[data.first]; + rtMemcpyKind_t kind = data_buf.placement == kPlacementHost ? RT_MEMCPY_HOST_TO_DEVICE : RT_MEMCPY_DEVICE_TO_DEVICE; if (data_buf.length == 0) { GELOGW("No data need to memcpy!"); return SUCCESS; @@ -2549,6 +2550,7 @@ Status DavinciModel::CopyOutputData(uint32_t data_id, OutputData &output_data, r uint64_t buffer_length = buffer.length; void *buffer_addr = reinterpret_cast(reinterpret_cast(buffer.data)); + GELOGI("=========bufer data is %p",buffer_addr); GELOGI("CopyPlainData memcpy graph_%u type[F] output[%u] memaddr[%p] mem_size[%lu] datasize[%lu]", runtime_param_.graph_id, output.first, output.second.GetBasicAddr(), data_size, buffer_length); GE_CHK_RT_RET(rtMemcpy(buffer_addr, buffer_length, output.second.GetBasicAddr(), data_size, kind)); @@ -2584,7 +2586,7 @@ Status DavinciModel::InitOutputTensorInfo(const OpDescPtr &op_desc) { return SUCCESS; } -Status DavinciModel::GenOutputTensorInfo(OutputData *output_data, vector &outputs) { +Status DavinciModel::GenOutputTensorInfo(OutputData *output_data, vector &outputs) { GE_CHECK_NOTNULL(output_data); if (!output_data->blobs.empty()) { GELOGI("No need to generate output tensor info, model id:%u", model_id_); @@ -2613,21 +2615,36 @@ Status DavinciModel::GenOutputTensorInfo(OutputData *output_data, vector data_buf(new (std::nothrow) uint8_t[output_buffer_size[i]]); - if (data_buf == nullptr) { - REPORT_CALL_ERROR("E19999", "New buffer failed, size:%ld, 
model_id:%u", - output_buffer_size[i], model_id_); - GELOGE(GE_GRAPH_MALLOC_FAILED, "Malloc buffer failed."); - return GE_GRAPH_MALLOC_FAILED; - } - output_data->blobs.push_back({data_buf.get(), static_cast(output_buffer_size[i]), false}); - OutputTensorInfo output; - output.dims = output_shape_info[i]; - output.data = std::move(data_buf); - output.length = output_buffer_size[i]; - outputs.emplace_back(std::move(output)); + //std::unique_ptr data_buf(new (std::nothrow) uint8_t[output_buffer_size[i]]); + //AlignedPtr aligned_ptr(output_buffer_size[i],64); + //auto data_buf = aligned_ptr.MutableGet(); + // if (data_buf == nullptr) { + // REPORT_CALL_ERROR("E19999", "New buffer failed, size:%ld, model_id:%u when DavinciModel %s", + // output_buffer_size[i], model_id_, __FUNCTION__); + // GELOGE(GE_GRAPH_MALLOC_FAILED, "Malloc buffer failed."); + // return GE_GRAPH_MALLOC_FAILED; + // } + auto aligned_ptr = MakeShared(output_buffer_size[i],64); + GeShape ge_shape(output_shape_info[i]); + GeTensorDesc tensor_desc; + tensor_desc.SetShape(ge_shape); + GeTensor ge_tensor(tensor_desc); + ge_tensor.SetData(aligned_ptr,output_buffer_size[i]); + ge::Tensor output_tensor= TensorAdapter::AsTensor(ge_tensor); + + auto data_ptr = aligned_ptr->MutableGet(); + //output_data->blobs.push_back({reinterpret_cast()}) + // auto data_ptr = data_buf; + // data_buf = nullptr; + output_data->blobs.push_back({reinterpret_cast(data_ptr), static_cast(output_buffer_size[i]), 0, false}); + //ge::Shape shape(output_shape_info[i]); + //TensorDesc tensor_desc; + //tensor_desc.SetShape(shape); + //ge::Tensor output_tensor(tensor_desc); + //output_tensor.SetData(data_ptr, static_cast(output_buffer_size[i]),aligned_ptr.GetDeleter()); + outputs.emplace_back(std::move(output_tensor)); GELOGD("Output index:%zu, output dims is %s, data length:%lu.", i, - formats::JoinToString(output.dims).c_str(), output.length); + formats::JoinToString(output_shape_info[i]).c_str(), output_buffer_size[i]); } return 
SUCCESS; @@ -2647,7 +2664,7 @@ Status DavinciModel::GenOutputTensorInfo(OutputData *output_data, vector outputs; + std::vector outputs; // return result is not required if (!rslt_flg && !seq_end_flag) { @@ -2711,7 +2728,7 @@ Status DavinciModel::ReturnNoOutput(uint32_t data_id) { GELOGI("ReturnNoOutput model id:%u.", model_id_); GE_CHK_BOOL_EXEC(listener_ != nullptr, return PARAM_INVALID, "listener_ is null!"); - std::vector outputs; + std::vector outputs; GE_CHK_STATUS(listener_->OnComputeDone(model_id_, data_id, SUCCESS, outputs), "OnComputeDone failed."); return SUCCESS; } @@ -2766,7 +2783,7 @@ void *DavinciModel::Run(DavinciModel *model) { GELOGI("Copy input data, model id:%u", model_id); GE_IF_BOOL_EXEC(ProfilingManager::Instance().ProfilingModelExecuteOn(), model->SetProfileTime(MODEL_PRE_PROC_START)); - ret = model->CopyInputData(current_data, false); + ret = model->CopyInputData(current_data); GE_CHK_BOOL_TRUE_EXEC_WITH_LOG( ret != SUCCESS, (void)model->ReturnResult(current_data.index, false, false, data_wrapper->GetOutput()); CsaInteract::GetInstance().StoreInternalErrorCode(ret, ERROR_MODULE_FMK, JOBSUBSTATE_GRAPH_EXEC); diff --git a/ge/graph/load/model_manager/davinci_model.h b/ge/graph/load/model_manager/davinci_model.h index 30240f25..78dda070 100755 --- a/ge/graph/load/model_manager/davinci_model.h +++ b/ge/graph/load/model_manager/davinci_model.h @@ -620,7 +620,7 @@ class DavinciModel { Status UpdateIoTaskArgs(const map &data_info, bool is_input, const vector &blobs, bool is_dynamic, const string &batch_label); - Status CopyInputData(const InputData &input_data, bool device_data = false); + Status CopyInputData(const InputData &input_data); Status CopyOutputData(uint32_t data_id, OutputData &output_data, rtMemcpyKind_t kind); @@ -865,7 +865,7 @@ class DavinciModel { Status SinkTimeProfile(const InputData ¤t_data); Status InitOutputTensorInfo(const OpDescPtr &op_desc); - Status GenOutputTensorInfo(OutputData *output_data, vector &outputs); + Status 
GenOutputTensorInfo(OutputData *output_data, vector &outputs); Status InitInputDescInfo(const OpDescPtr &op_desc); Status InitOutputDescInfo(const OpDescPtr &op_desc, const vector &out_node_name); diff --git a/ge/graph/load/model_manager/model_manager.cc b/ge/graph/load/model_manager/model_manager.cc index 84259731..72e0b0ae 100755 --- a/ge/graph/load/model_manager/model_manager.cc +++ b/ge/graph/load/model_manager/model_manager.cc @@ -533,7 +533,7 @@ Status ModelManager::GetCurDynamicDims(const vector> &user_real_ /// @brief load Input and output TensorInfo for Model /// @return Status run result /// -Status ModelManager::DataInputTensor(uint32_t model_id, const std::vector &inputs) { +Status ModelManager::DataInputTensor(uint32_t model_id, const std::vector &inputs) { std::shared_ptr model = GetModel(model_id); auto hybrid_model = GetHybridModel(model_id); if (hybrid_model == nullptr) { @@ -547,9 +547,11 @@ Status ModelManager::DataInputTensor(uint32_t model_id, const std::vector(const_cast(inputs[i].GetData())); + data.length = inputs[i].GetSize(); + data.placement = tensor_desc.GetPlacement(); + input_data.shapes.emplace_back(tensor_desc.GetShape().GetDims()); input_data.blobs.push_back(data); } if (!GetLocalOmgContext().user_input_dims.empty() && GetLocalOmgContext().need_multi_batch) { diff --git a/ge/graph/load/model_manager/model_manager.h b/ge/graph/load/model_manager/model_manager.h index b537943b..db38f524 100755 --- a/ge/graph/load/model_manager/model_manager.h +++ b/ge/graph/load/model_manager/model_manager.h @@ -122,7 +122,7 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager { /// ge::Status DataInput(const InputData &input_data, OutputData &output_data); - ge::Status DataInputTensor(uint32_t model_id, const std::vector &inputs); + ge::Status DataInputTensor(uint32_t model_id, const std::vector &inputs); /// /// @ingroup domi_ome diff --git a/ge/graph/manager/graph_manager.cc b/ge/graph/manager/graph_manager.cc index 
f7357d9d..68fb2979 100755 --- a/ge/graph/manager/graph_manager.cc +++ b/ge/graph/manager/graph_manager.cc @@ -2769,7 +2769,7 @@ Status GraphManager::ProcessSubGraphWithMultiThreads(GraphManager *graph_manager } // run graph async on session -Status GraphManager::RunGraphAsync(const GraphId &graph_id, const std::vector &inputs, +Status GraphManager::RunGraphAsync(const GraphId &graph_id, const std::vector &inputs, uint64_t session_id, RunAsyncCallback callback) { ErrorManager::GetInstance().SetStage(ErrorMessage::kModelExecute, ErrorMessage::kModelExecute); GELOGI("[GraphManager] Start to run graph async, graph_id=%u, inputsSize=%zu.", graph_id, inputs.size()); @@ -2841,13 +2841,13 @@ Status GraphManager::IncreBuild(const GraphNodePtr &graph_node, GeModelPtr &ge_m return FAILED; } -void GraphManager::ConstructGeInput(const vector &inputs, vector &ge_inputs) { - for (auto const &input : inputs) { - GeTensorDesc input_tensor_desc(GeShape(input.dims)); - input_tensor_desc.SetDataType(static_cast(input.data_type)); - ge_inputs.emplace_back(input_tensor_desc); - } -} +// void GraphManager::ConstructGeInput(const vector &inputs, vector &ge_inputs) { +// for (auto const &input : inputs) { +// GeTensorDesc input_tensor_desc(GeShape(input.dims)); +// input_tensor_desc.SetDataType(static_cast(input.data_type)); +// ge_inputs.emplace_back(input_tensor_desc); +// } +// } void GraphManager::PreRunThread(GraphManager *graph_manager) { if (prctl(PR_SET_NAME, ("GE_PreRun")) != 0) { @@ -2923,7 +2923,10 @@ void GraphManager::PreRunThread(GraphManager *graph_manager) { GeModelPtr ge_model = nullptr; if (graph_manager->IncreBuild(graph_node, ge_model) != SUCCESS) { std::vector ge_inputs; - ConstructGeInput(args.input_tensor, ge_inputs); + for (const auto &item: args.input_tensor) { + ge_inputs.emplace_back(TensorAdapter::AsGeTensor(item)); + } + //ConstructGeInput(args.input_tensor, ge_inputs); ret = graph_manager->PreRun(graph_node, ge_inputs, ge_root_model, args.session_id); // 
release rts generate context RtContextUtil::GetInstance().DestroyRtContexts(args.session_id, graph_node->GetGraphId()); @@ -2952,20 +2955,25 @@ void GraphManager::PreRunThread(GraphManager *graph_manager) { } } -void GraphManager::ParseInputsDimsForData(const std::vector &input_tensor) { +void GraphManager::ParseInputsDimsForData(const std::vector &input_tensor) { GELOGD("Start parse input dims from data."); for (size_t i = 0; i < input_tensor.size(); ++i) { std::vector dynamic_dim; - for (size_t j = 0; j < input_tensor[i].dims.size(); ++j) { - dynamic_dim.emplace_back(input_tensor[i].dims[j]); - } + // for (size_t j = 0; j < input_tensor[i].dims.size(); ++j) { + // dynamic_dim.emplace_back(input_tensor[i].dims[j]); + // } + const TensorDesc &tensor_desc = input_tensor[i].GetTensorDesc(); + const Shape &shape = tensor_desc.GetShape(); + auto shape_dims = shape.GetDims(); + std::copy(shape_dims.begin(), shape_dims.end(),std::back_inserter(dynamic_dim)); + GELOGD("Input tensor dims is %s.", formats::JoinToString(dynamic_dim).c_str()); - GetLocalOmgContext().user_real_input_dims.emplace_back(input_tensor[i].dims); + GetLocalOmgContext().user_real_input_dims.emplace_back(shape_dims); } } Status GraphManager::ParseInputsDimsForGetNexNosinkAndData(const vector &dynamic_nodes, - const std::vector &input_tensor) { + const std::vector &input_tensor) { GELOGD("Start parse inputs dims when coexist data and getnext sink."); for (size_t i = 0; i < dynamic_nodes.size(); ++i) { auto op_desc = dynamic_nodes.at(i)->GetOpDesc(); @@ -2988,13 +2996,17 @@ Status GraphManager::ParseInputsDimsForGetNexNosinkAndData(const vector return PARAM_INVALID; } - GetLocalOmgContext().user_real_input_dims.emplace_back(input_tensor.at(index).dims); - GELOGI("Shape dims of %zu data is %s.", index, formats::JoinToString(input_tensor.at(index).dims).c_str()); + const TensorDesc &tensor_desc = input_tensor[i].GetTensorDesc(); + const Shape &shape = tensor_desc.GetShape(); + const auto &shape_dims = 
shape.GetDims(); + GetLocalOmgContext().user_real_input_dims.emplace_back(shape_dims); + GELOGI("Shape dims of %zu data is %s.", index, formats::JoinToString(shape_dims).c_str()); + } return SUCCESS; } -Status GraphManager::ParseInputsDims(const std::vector &input_tensor) { +Status GraphManager::ParseInputsDims(const std::vector &input_tensor) { GELOGI("Start parse input dims of %zu input tensor.", input_tensor.size()); GetLocalOmgContext().user_real_input_dims.clear(); if (!GetLocalOmgContext().dynamic_node_type.empty()) { @@ -3122,13 +3134,13 @@ void GraphManager::ReturnError(GraphManager *graph_manager, RunAsyncCallback cal } StopQueue(graph_manager); GELOGE(ret, "%s.", log.c_str()); - std::vector outputs; + std::vector outputs; callback(ret, outputs); } void GraphManager::ReturnError(GraphManager *graph_manager, GraphNodePtr &graph_node, RunAsyncCallback callback, Status ret, const string &log) { - std::vector outputs; + std::vector outputs; auto compute_graph = GraphUtils::GetComputeGraph(*graph_node->GetGraph()); if (graph_manager == nullptr || compute_graph == nullptr) { REPORT_INNER_ERROR("E19999", "Param graph_manager or compute_graph in graph_node is nullptr, " @@ -3144,9 +3156,14 @@ void GraphManager::ReturnError(GraphManager *graph_manager, GraphNodePtr &graph_ } for (size_t i = 0; i < node->GetAllInDataAnchorsSize(); i++) { auto input_desc = node->GetOpDesc()->MutableInputDesc(i); - ge::OutputTensorInfo tensor; - tensor.dims = input_desc->GetShape().GetDims(); - tensor.data_type = static_cast(input_desc->GetDataType()); + ge::Shape shape(input_desc->GetShape().GetDims()); + TensorDesc tensor_desc; + tensor_desc.SetShape(shape); + tensor_desc.SetDataType(input_desc->GetDataType()); + Tensor tensor(tensor_desc); + // ge::OutputTensorInfo tensor; + // tensor.dims = input_desc->GetShape().GetDims(); + // tensor.data_type = static_cast(input_desc->GetDataType()); int64_t len = 1; if (input_desc->GetShape().GetDims() != std::vector({})) { len = 
input_desc->GetShape().GetShapeSize(); @@ -3181,11 +3198,15 @@ void GraphManager::ReturnError(GraphManager *graph_manager, GraphNodePtr &graph_ callback(GRAPH_FAILED, outputs); return; } - tensor.length = len * size; - tensor.data.reset(new(std::nothrow) uint8_t[tensor.length]); + auto length = len * size; + unique_ptr ptr(new (std::nothrow) uint8_t[length]); + tensor.SetData(ptr.get(), length, ptr.get_deleter()); + //tensor.SetData() + // tensor.length = len * size; + // tensor.data.reset(new(std::nothrow) uint8_t[tensor.length]); // To avoid global step too small and can not stop, totally set a bigger value - for (int64_t i = 0; i < tensor.length; i++) { - tensor.data[i] = 0x7F; // here stands for a positive max value + for (int64_t i = 0; i < length; i++) { + ptr[i] = 0x7F; // here stands for a positive max value } outputs.emplace_back(std::move(tensor)); } diff --git a/ge/graph/manager/graph_manager.h b/ge/graph/manager/graph_manager.h index b63b138a..46ff4116 100644 --- a/ge/graph/manager/graph_manager.h +++ b/ge/graph/manager/graph_manager.h @@ -149,7 +149,7 @@ class GraphManager { /// @param [out] callback: callback while run graph async finish /// @return Status result of function /// - Status RunGraphAsync(const GraphId &graph_id, const std::vector &inputs, + Status RunGraphAsync(const GraphId &graph_id, const std::vector &inputs, uint64_t session_id, RunAsyncCallback callback); /// @@ -194,7 +194,7 @@ class GraphManager { struct PreRunArgs { GraphId graph_id; - std::vector input_tensor; + std::vector input_tensor; uint64_t session_id; struct ErrorMessage::Context error_context; GEThreadLocalContext context; @@ -206,7 +206,7 @@ class GraphManager { GraphId graph_id; uint64_t session_id; struct ErrorMessage::Context error_context; - std::vector input_tensor; + std::vector input_tensor; GeRootModelPtr ge_root_model; GEThreadLocalContext context; RunAsyncCallback callback; @@ -225,10 +225,10 @@ class GraphManager { uint64_t session_id, const struct 
ErrorMessage::Context &error_context, const GEThreadLocalContext &ge_context); - Status ParseInputsDims(const std::vector &input_tensor); - void ParseInputsDimsForData(const std::vector &input_tensor); + Status ParseInputsDims(const std::vector &input_tensor); + void ParseInputsDimsForData(const std::vector &input_tensor); Status ParseInputsDimsForGetNexNosinkAndData(const vector &dynamic_nodes, - const std::vector &input_tensor); + const std::vector &input_tensor); Status RunCustomPass(const GraphNodePtr &graph_node); Status PreRun(const GraphNodePtr &graph_node, const std::vector &inputs, GeRootModelPtr &ge_root_model, uint64_t session_id = INVALID_SESSION_ID); @@ -339,7 +339,7 @@ class GraphManager { void RemoveModelCacheHelper(const GraphId &graph_id); ModelCacheHelperPtr FindModelCacheHelper(GraphId graph_id); - static void ConstructGeInput(const std::vector &inputs, std::vector &ge_inputs); + //static void ConstructGeInput(const std::vector &inputs, std::vector &ge_inputs); static void PreRunThread(GraphManager *graph_manager); static void RunThread(GraphManager *graph_manager); static void StopQueue(GraphManager *graph_manager); diff --git a/ge/graph/manager/graph_manager_utils.cc b/ge/graph/manager/graph_manager_utils.cc index 3a8d577c..130df2c4 100644 --- a/ge/graph/manager/graph_manager_utils.cc +++ b/ge/graph/manager/graph_manager_utils.cc @@ -104,7 +104,7 @@ GraphModelListener::GraphModelListener(std::mutex &mutex, std::condition_variabl : result_code_(0), is_finished_(false), mutex_(mutex), condition_(cond) {} Status GraphModelListener::OnComputeDone(uint32_t model_id, uint32_t task_id, uint32_t result, - std::vector &outputs) { + std::vector &outputs) { GELOGI( "[GraphManager] graph compute call back, model_id:%u, task_id:%u, " "resultCode:%u.", @@ -141,10 +141,14 @@ void RunAsyncListener::SetCallback(const RunAsyncCallback &callback) { } Status RunAsyncListener::OnComputeDone(uint32_t model_id, uint32_t task_id, uint32_t result, - std::vector 
&outputs) { + std::vector &outputs) { GELOGI("[GraphManager] run graph async call back, modelId:%u, taskId:%u, resultCode:%u.", model_id, task_id, result); GE_CHECK_NOTNULL(callback_); + for (auto it : outputs) { + auto addr = it.GetData(); + GELOGI("========= addr is %p",addr); + } callback_(result, outputs); uint8_t unused; sem_.Pop(unused); diff --git a/ge/graph/manager/graph_manager_utils.h b/ge/graph/manager/graph_manager_utils.h index cfe6588f..4547480a 100644 --- a/ge/graph/manager/graph_manager_utils.h +++ b/ge/graph/manager/graph_manager_utils.h @@ -129,7 +129,7 @@ class RunAsyncListener : public ge::ModelListener { // callback Status OnComputeDone(uint32_t model_id, uint32_t task_id, uint32_t result, - std::vector &outputs) override; + std::vector &outputs) override; private: RunAsyncCallback callback_; @@ -202,7 +202,7 @@ class GraphModelListener : public ge::ModelListener { // callback Status OnComputeDone(uint32_t model_id, uint32_t task_id, uint32_t result, - std::vector &outputs) override; + std::vector &outputs) override; Status ResetResult(); diff --git a/ge/hybrid/common/tensor_value.h b/ge/hybrid/common/tensor_value.h index 9f68cf2c..8f221f57 100644 --- a/ge/hybrid/common/tensor_value.h +++ b/ge/hybrid/common/tensor_value.h @@ -39,6 +39,12 @@ class TensorBuffer { TensorBuffer &operator = (const TensorBuffer &) = delete; ~TensorBuffer(); + void* Release() { + auto ret = buffer_; + buffer_ = nullptr; + return ret; + } + void *GetData() { return buffer_; } @@ -46,6 +52,10 @@ class TensorBuffer { size_t GetSize() const { return size_; } + + MemStorageType GetMemType() const { + return mem_type_; + } private: TensorBuffer(NpuMemoryAllocator *allocator, void *buffer, size_t size, MemStorageType mem_type = HBM); @@ -68,6 +78,10 @@ class TensorValue { void Destroy(); + void *Release() { + return buffer_->Release(); + } + bool IsEmpty() { return ref_buffer_ == nullptr && buffer_ == nullptr; } @@ -84,6 +98,10 @@ class TensorValue { size_t GetSize() const; 
+ MemStorageType GetMemType() const { + return buffer_->GetMemType(); + } + private: std::shared_ptr buffer_; std::string name_; diff --git a/ge/hybrid/executor/hybrid_model_async_executor.cc b/ge/hybrid/executor/hybrid_model_async_executor.cc index ca505618..355a4f63 100644 --- a/ge/hybrid/executor/hybrid_model_async_executor.cc +++ b/ge/hybrid/executor/hybrid_model_async_executor.cc @@ -20,6 +20,11 @@ #include "graph/utils/type_utils.h" #include "graph/ge_context.h" #include "omm/csa_interact.h" +#include "graph/debug/ge_attr_define.h" +#include "graph/manager/graph_caching_allocator.h" +#include "graph/manager/graph_mem_allocator.h" +#include "graph/manager/rdma_pool_allocator.h" +#include "graph/manager/host_mem_allocator.h" namespace ge { namespace hybrid { @@ -27,6 +32,9 @@ namespace { const int kDataOutputIndex = 0; const size_t kMinimumPiplineStages = 2; const int kDefaultLoopCount = 10; +//const uint32_t kPlacementHost = 0; +const uint32_t kPlacementDevice = 1; +const char *const kLazyRecompile = "lazy_recompile"; } HybridModelAsyncExecutor::HybridModelAsyncExecutor(HybridModel *model) : model_(model), run_flag_(false), data_dumper_(nullptr) { @@ -71,6 +79,8 @@ Status HybridModelAsyncExecutor::Start(const std::shared_ptr &lis GetThreadLocalContext() = *executor_->GetContext()->ge_context; GetContext().SetSessionId(executor_->GetContext()->session_id); GetContext().SetContextId(executor_->GetContext()->context_id); + GE_CHECK_NOTNULL(executor_->GetContext()->ge_context); + GetThreadLocalContext() = *executor_->GetContext()->ge_context; return RunInternal(); }); @@ -198,7 +208,7 @@ Status HybridModelAsyncExecutor::HandleResult(Status exec_ret, HybridModelExecutor::ExecuteArgs &args, OutputData *output_data) { GELOGD("Start to handle result. 
model id = %u, data index = %u, execution ret = %u", model_id_, data_id, exec_ret); - std::vector output_tensor_info_list; + std::vector output_tensor_info_list; if (args.is_eos) { GELOGI("End of sequence, model id = %u", model_id_); GE_CHK_STATUS_RET_NOLOG(OnComputeDone(data_id, END_OF_SEQUENCE, output_tensor_info_list)); @@ -369,7 +379,7 @@ Status HybridModelAsyncExecutor::InitInputDesc() { } Status HybridModelAsyncExecutor::OnComputeDone(uint32_t data_index, uint32_t result_code, - std::vector &outputs) { + std::vector &outputs) { GELOGD("OnComputeDone. model id = %u, data index = %u, execution ret = %u", model_id_, data_index, result_code); if (listener_ != nullptr) { GE_CHK_STATUS(listener_->OnComputeDone(model_id_, data_index, result_code, outputs), @@ -381,7 +391,7 @@ Status HybridModelAsyncExecutor::OnComputeDone(uint32_t data_index, uint32_t res Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &args, OutputData *output_data, - std::vector &outputs) { + std::vector &outputs) { // copy output data from op to designated position std::vector &output_tensor_desc_list = args.output_desc; std::vector &output_tensors = args.outputs; @@ -396,6 +406,12 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a } GELOGD("Number of outputs = %zu", output_tensor_desc_list.size()); + string execute_mode; + auto result = ge::GetContext().GetOption(OPTION_EXEC_DYNAMIC_EXECUTE_MODE, execute_mode); + if (result != SUCCESS) { + GELOGW("Can not get dynamic execute mode attr"); + } + GELOGD("The dynamic execute is %s", execute_mode.c_str()); for (size_t i = 0; i < output_tensors.size(); ++i) { GELOGD("Start to process output[%zu]", i); auto &output_tensor = output_tensors[i]; @@ -430,27 +446,47 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a return INTERNAL_ERROR; } - ge::OutputTensorInfo output; - output.data_type = static_cast(tensor_desc->GetDataType()); - output.dims = 
tensor_desc->GetShape().GetDims(); - output.length = output_size; + ge::Shape shape(tensor_desc->GetShape().GetDims()); + TensorDesc desc; + desc.SetShape(shape); + ge::Tensor tensor(desc); if (output_size > 0) { - std::unique_ptr data_buf(new(std::nothrow) uint8_t[output_size]); - GE_CHECK_NOTNULL(data_buf); - GE_CHK_RT_RET(rtMemcpy(data_buf.get(), - output_size, - output_tensor.GetData(), - output_size, - RT_MEMCPY_DEVICE_TO_HOST)); - output.data = std::move(data_buf); - output_data->blobs.emplace_back(data_buf.get(), static_cast(output_size), false); + if (execute_mode != kLazyRecompile) { + std::unique_ptr data_buf(new(std::nothrow) uint8_t[output_size]); + GE_CHECK_NOTNULL(data_buf); + GE_CHK_RT_RET(rtMemcpy(data_buf.get(), + output_size, + output_tensor.GetData(), + output_size, + RT_MEMCPY_DEVICE_TO_HOST)); + tensor.SetData(data_buf.get(), static_cast(output_size),[](uint8_t *ptr) { + ptr = nullptr;}); + output_data->blobs.emplace_back(data_buf.get(), static_cast(output_size), 0, false); + } else { + auto mem_type = output_tensor.GetMemType(); + auto deleter = [=] (uint8_t *device_data) { + if (device_data != nullptr) { + GELOGI("Deallocating buffer successfully. addr = %p", device_data); + if (mem_type == RDMA_HBM) { + MemManager::Instance().RdmaPoolInstance(RT_MEMORY_HBM).Free(device_data, device_id_); + } else if (mem_type == HOST_DDR) { + MemManager::Instance().HostMemInstance(RT_MEMORY_HBM).Free(device_data); + } else { + MemManager::Instance().CachingInstance(RT_MEMORY_HBM).Free(device_data, device_id_); + } + } + }; + tensor.GetTensorDesc().SetPlacement(kPlacementDevice); + tensor.SetData(reinterpret_cast(output_tensor.Release()),static_cast(output_size), deleter); + output_data->blobs.emplace_back(output_tensor.Release(), static_cast(output_size), 1, false); + } } else { GELOGW("Output[%zu] is empty. 
shape = [%s]", i, tensor_desc->GetShape().ToString().c_str()); - output.data = nullptr; - output_data->blobs.emplace_back(nullptr, 0U, false); + tensor.SetData(nullptr,0U,nullptr); + output_data->blobs.emplace_back(nullptr, 0U, 0, false); } - outputs.emplace_back(std::move(output)); + outputs.emplace_back(std::move(tensor)); GELOGD("Output[%zu] added, type = %s, shape = [%s], size = %ld", i, TypeUtils::DataTypeToSerialString(tensor_desc->GetDataType()).c_str(), @@ -508,7 +544,7 @@ Status HybridModelAsyncExecutor::Execute(const vector &inputs, vector< GELOGD("Done copying input data successfully."); GE_CHK_STATUS_RET(executor_->Execute(args), "[Invoke][Execute] Failed, model_id = %u.", model_id_); - std::vector output_tensor_info_list; + std::vector output_tensor_info_list; OutputData output_data; GE_CHK_STATUS_RET(CopyOutputs(args, &output_data, output_tensor_info_list), "[Invoke][CopyOutputs]Failed to copy outputs, model_id = %u.", model_id_); @@ -518,15 +554,15 @@ Status HybridModelAsyncExecutor::Execute(const vector &inputs, vector< outputs.resize(output_tensor_info_list.size()); for (auto &out_tensor_info : output_tensor_info_list) { auto &ge_tensor = outputs[out_index]; - if (out_tensor_info.length > 0) { - GE_CHK_GRAPH_STATUS_RET(ge_tensor.SetData(out_tensor_info.data.get(), out_tensor_info.length), + if (out_tensor_info.GetSize() > 0) { + GE_CHK_GRAPH_STATUS_RET(ge_tensor.SetData(out_tensor_info.GetData(), out_tensor_info.GetSize()), "Failed to set output[%d].", out_index); } ge_tensor.MutableTensorDesc() = *args.output_desc[out_index]; GELOGD("Set output[%d], tensor size = %ld, shape = [%s]", out_index, - out_tensor_info.length, + out_tensor_info.GetSize(), ge_tensor.MutableTensorDesc().MutableShape().ToString().c_str()); ++out_index; } diff --git a/ge/hybrid/executor/hybrid_model_async_executor.h b/ge/hybrid/executor/hybrid_model_async_executor.h index b6942b10..133e6cc5 100644 --- a/ge/hybrid/executor/hybrid_model_async_executor.h +++ 
b/ge/hybrid/executor/hybrid_model_async_executor.h @@ -69,9 +69,9 @@ class HybridModelAsyncExecutor { Status CopyOutputs(HybridModelExecutor::ExecuteArgs &args, OutputData *output_data, - std::vector &outputs); + std::vector &outputs); - Status OnComputeDone(uint32_t data_index, uint32_t result_code, std::vector &outputs); + Status OnComputeDone(uint32_t data_index, uint32_t result_code, std::vector &outputs); Status PreRun(InputData &current_data, HybridModelExecutor::ExecuteArgs &args); diff --git a/ge/session/inner_session.cc b/ge/session/inner_session.cc index e8b3ae0e..f4118814 100755 --- a/ge/session/inner_session.cc +++ b/ge/session/inner_session.cc @@ -337,19 +337,19 @@ Status InnerSession::RegisterCallBackFunc( return SUCCESS; } -Status InnerSession::BuildGraph(uint32_t graph_id, const std::vector &inputs) { +Status InnerSession::BuildGraph(uint32_t graph_id, const std::vector &inputs) { UpdateThreadContext(graph_id); GELOGI("[InnerSession:%lu] build graph on session, graph_id=%u.", session_id_, graph_id); std::vector ge_inputs; for (auto const &input : inputs) { - std::vector input_dims; - std::transform(input.dims.begin(), input.dims.end(), std::back_inserter(input_dims), - [](int64_t x) -> int64_t { return x; }); - GeShape input_shape(input_dims); - GeTensorDesc input_tensor_desc; - input_tensor_desc.SetShape(input_shape); - input_tensor_desc.SetDataType(static_cast(input.data_type)); - ge_inputs.emplace_back(input_tensor_desc); + // std::vector input_dims; + // std::transform(input.dims.begin(), input.dims.end(), std::back_inserter(input_dims), + // [](int64_t x) -> int64_t { return x; }); + // GeShape input_shape(input_dims); + // GeTensorDesc input_tensor_desc; + // input_tensor_desc.SetShape(input_shape); + // input_tensor_desc.SetDataType(static_cast(input.data_type)); + ge_inputs.emplace_back(TensorAdapter::AsGeTensor(input)); } GeRootModelPtr ge_root_model = nullptr; Status ret = graph_manager_.BuildGraph(graph_id, ge_inputs, ge_root_model, 
session_id_, true); @@ -363,7 +363,7 @@ Status InnerSession::BuildGraph(uint32_t graph_id, const std::vector &inputs, +Status InnerSession::RunGraphAsync(uint32_t graph_id, const std::vector &inputs, RunAsyncCallback callback) { UpdateThreadContext(graph_id); GELOGI("[InnerSession:%lu] run graph on session, graph_id=%u.", session_id_, graph_id); diff --git a/ge/session/inner_session.h b/ge/session/inner_session.h index 5cab43d8..e8a8a914 100644 --- a/ge/session/inner_session.h +++ b/ge/session/inner_session.h @@ -43,9 +43,9 @@ class InnerSession { Status RemoveGraph(uint32_t graph_id); - Status BuildGraph(uint32_t graph_id, const std::vector &inputs); + Status BuildGraph(uint32_t graph_id, const std::vector &inputs); - Status RunGraphAsync(uint32_t graph_id, const std::vector &inputs, RunAsyncCallback callback); + Status RunGraphAsync(uint32_t graph_id, const std::vector &inputs, RunAsyncCallback callback); Status Finalize(); diff --git a/ge/session/session_manager.cc b/ge/session/session_manager.cc index 1e4efa6b..213e1a04 100755 --- a/ge/session/session_manager.cc +++ b/ge/session/session_manager.cc @@ -336,7 +336,7 @@ Status SessionManager::RegisterCallBackFunc( return innerSession->RegisterCallBackFunc(key, callback); } -Status SessionManager::BuildGraph(SessionId session_id, uint32_t graph_id, const std::vector &inputs) { +Status SessionManager::BuildGraph(SessionId session_id, uint32_t graph_id, const std::vector &inputs) { if (!init_flag_) { GELOGE(GE_SESSION_MANAGER_NOT_INIT, "[Build][Graph]fail for Session manager is not initialized," "session_id:%lu, graph_id:%u.", session_id, graph_id); @@ -358,7 +358,7 @@ Status SessionManager::BuildGraph(SessionId session_id, uint32_t graph_id, const } Status SessionManager::RunGraphAsync(SessionId session_id, uint32_t graph_id, - const std::vector &inputs, RunAsyncCallback callback) { + const std::vector &inputs, RunAsyncCallback callback) { if (!init_flag_) { GELOGE(GE_SESSION_MANAGER_NOT_INIT, "[AsyncRun][Graph]fail 
for Session manager is not initialized, session_id:%lu, graph_id:%u.", diff --git a/ge/session/session_manager.h b/ge/session/session_manager.h index da23219c..ca5f9c4a 100644 --- a/ge/session/session_manager.h +++ b/ge/session/session_manager.h @@ -123,7 +123,7 @@ class SessionManager { /// @param [in] inputs input data /// @return Status result of function /// - Status BuildGraph(SessionId session_id, uint32_t graph_id, const std::vector &inputs); + Status BuildGraph(SessionId session_id, uint32_t graph_id, const std::vector &inputs); /// /// @ingroup ge_session @@ -133,7 +133,7 @@ class SessionManager { /// @param [in] inputs input data /// @return Status result of function /// - Status RunGraphAsync(SessionId session_id, uint32_t graph_id, const std::vector &inputs, + Status RunGraphAsync(SessionId session_id, uint32_t graph_id, const std::vector &inputs, RunAsyncCallback callback); /// diff --git a/ge/single_op/task/op_task.cc b/ge/single_op/task/op_task.cc index fbc3d68b..01a19eea 100755 --- a/ge/single_op/task/op_task.cc +++ b/ge/single_op/task/op_task.cc @@ -873,7 +873,7 @@ Status AiCpuTask::LaunchKernel(const std::vector &input_desc, if (unknown_type_ == DEPEND_COMPUTE) { std::vector summary_buffers; for (size_t i = 0; i < num_outputs_; ++i) { - summary_buffers.emplace_back(output_summary_[i], sizeof(aicpu::FWKAdapter::ResultSummary), false); + summary_buffers.emplace_back(output_summary_[i], sizeof(aicpu::FWKAdapter::ResultSummary), 0, false); } GE_CHK_STATUS_RET_NOLOG(UpdateIoAddr(input_buffers, summary_buffers)); } else { diff --git a/inc/external/ge/ge_api.h b/inc/external/ge/ge_api.h index c8b5a8ec..954936d3 100644 --- a/inc/external/ge/ge_api.h +++ b/inc/external/ge/ge_api.h @@ -28,7 +28,6 @@ namespace ge { typedef uint32_t (*pCallBackFunc)(uint32_t graph_id, const std::map &params_list); - namespace session { typedef uint32_t (*pCallBackFunc)(uint32_t graph_id, const std::map &params_list); } @@ -128,7 +127,7 @@ class GE_FUNC_VISIBILITY Session { /// @param 
[in] inputs: input data /// @return Status result of function /// - Status BuildGraph(uint32_t graphId, const std::vector &inputs); + Status BuildGraph(uint32_t graphId, const std::vector &inputs); /// /// @ingroup ge_graph @@ -140,7 +139,7 @@ class GE_FUNC_VISIBILITY Session { /// Please ensure that the implementation of the function is trusted. /// @return Status result of function /// - Status RunGraphAsync(uint32_t graphId, const std::vector &inputs, RunAsyncCallback callback); + Status RunGraphAsync(uint32_t graphId, const std::vector &inputs, RunAsyncCallback callback); /// /// @ingroup ge_graph diff --git a/inc/external/ge/ge_api_types.h b/inc/external/ge/ge_api_types.h index 5ae5f036..6869a20a 100644 --- a/inc/external/ge/ge_api_types.h +++ b/inc/external/ge/ge_api_types.h @@ -23,6 +23,7 @@ #include #include #include +#include "graph/tensor.h" namespace ge { // Option key: graph run mode @@ -354,7 +355,8 @@ struct OutputTensorInfo { }; using Status = uint32_t; -using RunAsyncCallback = std::function &)>; +using RunAsyncCallback = std::function &)>; + // for ir build namespace ir_option { static const char *const INPUT_FORMAT = "input_format"; diff --git a/inc/framework/common/ge_types.h b/inc/framework/common/ge_types.h index b37574f7..377aa4ea 100644 --- a/inc/framework/common/ge_types.h +++ b/inc/framework/common/ge_types.h @@ -71,7 +71,7 @@ struct DataBuffer { DataBuffer(void *dataIn, uint64_t len, bool isSupportMemShare, uint32_t placement = 0) : data(dataIn), length(len), isDataSupportMemShare(isSupportMemShare), placement(placement) {} - DataBuffer() : data(nullptr), length(0), isDataSupportMemShare(false) {} + DataBuffer() : data(nullptr), length(0), isDataSupportMemShare(false), placement(0) {} }; /// @@ -226,7 +226,7 @@ class GE_FUNC_VISIBILITY ModelListener { /// @param [in] resultCode Execution results /// virtual Status OnComputeDone(uint32_t model_id, uint32_t data_index, uint32_t result_code, - std::vector &outputs) = 0; + std::vector 
&outputs) = 0; }; // OMM configuration item