rungraphasync

pull/1283/head
zhou_chao1993 4 years ago
parent 25aecfb52c
commit 8b932ef30f

@ -46,7 +46,7 @@ target_compile_definitions(ge_proto_common PRIVATE
)
target_compile_options(ge_proto_common PRIVATE
-O2
-O0 -g
-fno-common
)
@ -72,7 +72,7 @@ target_include_directories(ge_proto_client PRIVATE
)
target_compile_options(ge_proto_client PRIVATE
-O2
-O0 -g
-fno-common
)
@ -720,7 +720,7 @@ target_compile_definitions(ge_runner PRIVATE
)
target_compile_options(ge_runner PRIVATE
-O2
-O0 -g
-fno-common
-fvisibility=hidden
$<$<STREQUAL:${CMAKE_CXX_COMPILER_VERSION},7.3.0>:-Werror=unused-variable>
@ -796,7 +796,7 @@ target_compile_definitions(ge_compiler PRIVATE
)
target_compile_options(ge_compiler PRIVATE
-O2
-O0 -g
-fno-common
-fvisibility=hidden
$<$<STREQUAL:${CMAKE_CXX_COMPILER_VERSION},7.3.0>:-Werror=unused-variable>
@ -891,7 +891,7 @@ target_compile_definitions(opensrc_ascendcl PRIVATE
)
target_compile_options(opensrc_ascendcl PRIVATE
-O2
-O0 -g
-fvisibility=hidden
)

@ -597,8 +597,7 @@ Status Session::RegisterCallBackFunc(const char *key, const session::pCallBackFu
return ge::GELib::GetInstance()->SessionManagerObj().RegisterCallBackFunc(sessionId_, str_key, callback);
}
// Build Graph
Status Session::BuildGraph(uint32_t graph_id, const std::vector<InputTensorInfo> &inputs) {
Status Session::BuildGraph(uint32_t graph_id, const std::vector<ge::Tensor> &inputs) {
ErrorManager::GetInstance().SetStage(ErrorMessage::kModelCompile, ErrorMessage::kOther);
ErrorManager::GetInstance().GenWorkStreamIdBySessionGraph(sessionId_, graph_id);
std::shared_ptr<GELib> instance_ptr = ge::GELib::GetInstance();
@ -620,8 +619,7 @@ Status Session::BuildGraph(uint32_t graph_id, const std::vector<InputTensorInfo>
return SUCCESS;
}
// Run Graph Asynchronously
Status Session::RunGraphAsync(uint32_t graph_id, const std::vector<InputTensorInfo> &inputs,
Status Session::RunGraphAsync(uint32_t graph_id, const std::vector<ge::Tensor> &inputs,
RunAsyncCallback callback) {
ErrorManager::GetInstance().SetStage(ErrorMessage::kModelExecute, ErrorMessage::kModelExecute);
ErrorManager::GetInstance().GenWorkStreamIdBySessionGraph(sessionId_, graph_id);

@ -72,7 +72,7 @@ void GetDomiInputData(const ge::RunModelData &input_data, ge::InputData &inputs)
inputs.timeout = input_data.timeout;
inputs.request_id = input_data.request_id;
for (const auto &data_item : input_data.blobs) {
ge::DataBuffer dataBuf{data_item.data, data_item.length, data_item.isDataSupportMemShare};
ge::DataBuffer dataBuf{data_item.data, data_item.length, 0, data_item.isDataSupportMemShare};
inputs.blobs.emplace_back(dataBuf);
}
}
@ -81,7 +81,7 @@ void GetDomiOutputData(const ge::RunModelData &output_data, ge::OutputData &outp
outputs.index = output_data.index;
outputs.model_id = output_data.modelId;
for (const auto &data_item : output_data.blobs) {
ge::DataBuffer dataBuf(data_item.data, data_item.length, data_item.isDataSupportMemShare);
ge::DataBuffer dataBuf(data_item.data, data_item.length, 0, data_item.isDataSupportMemShare);
outputs.blobs.emplace_back(dataBuf);
}
}

@ -380,7 +380,7 @@ Status GraphExecutor::ExecuteGraph(GraphId graph_id, const GeRootModelPtr &ge_ro
}
Status GraphExecutor::ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr &ge_root_model,
const std::vector<InputTensorInfo> &input_tensor) {
const std::vector<ge::Tensor> &input_tensor) {
GELOGI("[GraphExecutor] Start to async execute graph, graph_id=%u", graph_id);
if (graph_id != last_graph_id_) {
auto ret = FreeExecuteMemory();
@ -400,7 +400,7 @@ Status GraphExecutor::ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr &
return SUCCESS;
}
Status GraphExecutor::AsyncExecuteModel(uint32_t model_id, const std::vector<InputTensorInfo> &inputs) {
Status GraphExecutor::AsyncExecuteModel(uint32_t model_id, const std::vector<ge::Tensor> &inputs) {
try {
auto model_manager = ge::ModelManager::GetInstance();
GE_CHECK_NOTNULL(model_manager);

@ -50,7 +50,7 @@ class GraphExecutor {
std::vector<GeTensor> &output_tensor);
ge::Status ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr &ge_root_model,
const std::vector<InputTensorInfo> &input_tensor);
const std::vector<ge::Tensor> &input_tensor);
Status SetCondition(std::mutex *mutex, std::condition_variable *cond, std::shared_ptr<GraphModelListener> listener);
@ -123,7 +123,7 @@ class GraphExecutor {
Status SyncExecuteModel(uint32_t model_id, const std::vector<GeTensor> &input_tensor,
std::vector<GeTensor> &output_tensor);
Status AsyncExecuteModel(uint32_t model_id, const std::vector<InputTensorInfo> &input_tensor);
Status AsyncExecuteModel(uint32_t model_id, const std::vector<ge::Tensor> &input_tensor);
void InitModelIdInfo(std::vector<uint32_t> &out_model_id_info, std::vector<SubGraphInfoPtr> &sub_graph_vec,
uint32_t output_size);

@ -123,6 +123,7 @@ const char* const kInferEndTime = "infer_end_time";
const char* const kOutputBeginTime = "output_start_time";
const char* const kOutputEndTime = "output_end_time";
const uint32_t kStringHeadElems = 2;
const uint32_t kPlacementHost = 0;
inline bool IsDataOp(const std::string &node_type) {
return (node_type == DATA_TYPE) || (node_type == AIPP_DATA_TYPE) || (node_type == ANN_DATA_TYPE);
@ -2230,8 +2231,7 @@ Status DavinciModel::GetOutputDescInfo(vector<InputOutputDescInfo> &output_descs
return SUCCESS;
}
Status DavinciModel::CopyInputData(const InputData &input_data, bool device_data) {
rtMemcpyKind_t kind = device_data ? RT_MEMCPY_DEVICE_TO_DEVICE : RT_MEMCPY_HOST_TO_DEVICE;
Status DavinciModel::CopyInputData(const InputData &input_data) {
const std::vector<DataBuffer> &blobs = input_data.blobs;
for (const auto &data : input_data_info_) {
if (data.first >= blobs.size()) {
@ -2244,6 +2244,7 @@ Status DavinciModel::CopyInputData(const InputData &input_data, bool device_data
}
const DataBuffer &data_buf = blobs[data.first];
rtMemcpyKind_t kind = data_buf.placement == kPlacementHost ? RT_MEMCPY_HOST_TO_DEVICE : RT_MEMCPY_DEVICE_TO_DEVICE;
if (data_buf.length == 0) {
GELOGW("No data need to memcpy!");
return SUCCESS;
@ -2549,6 +2550,7 @@ Status DavinciModel::CopyOutputData(uint32_t data_id, OutputData &output_data, r
uint64_t buffer_length = buffer.length;
void *buffer_addr = reinterpret_cast<void *>(reinterpret_cast<uintptr_t>(buffer.data));
GELOGI("=========bufer data is %p",buffer_addr);
GELOGI("CopyPlainData memcpy graph_%u type[F] output[%u] memaddr[%p] mem_size[%lu] datasize[%lu]",
runtime_param_.graph_id, output.first, output.second.GetBasicAddr(), data_size, buffer_length);
GE_CHK_RT_RET(rtMemcpy(buffer_addr, buffer_length, output.second.GetBasicAddr(), data_size, kind));
@ -2584,7 +2586,7 @@ Status DavinciModel::InitOutputTensorInfo(const OpDescPtr &op_desc) {
return SUCCESS;
}
Status DavinciModel::GenOutputTensorInfo(OutputData *output_data, vector<OutputTensorInfo> &outputs) {
Status DavinciModel::GenOutputTensorInfo(OutputData *output_data, vector<ge::Tensor> &outputs) {
GE_CHECK_NOTNULL(output_data);
if (!output_data->blobs.empty()) {
GELOGI("No need to generate output tensor info, model id:%u", model_id_);
@ -2613,21 +2615,36 @@ Status DavinciModel::GenOutputTensorInfo(OutputData *output_data, vector<OutputT
GELOGI("Output blobs size:%zu, model id:%u", output_buffer_size_.size(), model_id_);
for (size_t i = 0; i < output_buffer_size.size(); ++i) {
std::unique_ptr<uint8_t[]> data_buf(new (std::nothrow) uint8_t[output_buffer_size[i]]);
if (data_buf == nullptr) {
REPORT_CALL_ERROR("E19999", "New buffer failed, size:%ld, model_id:%u",
output_buffer_size[i], model_id_);
GELOGE(GE_GRAPH_MALLOC_FAILED, "Malloc buffer failed.");
return GE_GRAPH_MALLOC_FAILED;
}
output_data->blobs.push_back({data_buf.get(), static_cast<uint64_t>(output_buffer_size[i]), false});
OutputTensorInfo output;
output.dims = output_shape_info[i];
output.data = std::move(data_buf);
output.length = output_buffer_size[i];
outputs.emplace_back(std::move(output));
//std::unique_ptr<uint8_t[]> data_buf(new (std::nothrow) uint8_t[output_buffer_size[i]]);
//AlignedPtr aligned_ptr(output_buffer_size[i],64);
//auto data_buf = aligned_ptr.MutableGet();
// if (data_buf == nullptr) {
// REPORT_CALL_ERROR("E19999", "New buffer failed, size:%ld, model_id:%u when DavinciModel %s",
// output_buffer_size[i], model_id_, __FUNCTION__);
// GELOGE(GE_GRAPH_MALLOC_FAILED, "Malloc buffer failed.");
// return GE_GRAPH_MALLOC_FAILED;
// }
auto aligned_ptr = MakeShared<AlignedPtr>(output_buffer_size[i],64);
GeShape ge_shape(output_shape_info[i]);
GeTensorDesc tensor_desc;
tensor_desc.SetShape(ge_shape);
GeTensor ge_tensor(tensor_desc);
ge_tensor.SetData(aligned_ptr,output_buffer_size[i]);
ge::Tensor output_tensor= TensorAdapter::AsTensor(ge_tensor);
auto data_ptr = aligned_ptr->MutableGet();
//output_data->blobs.push_back({reinterpret_cast<void *>()})
// auto data_ptr = data_buf;
// data_buf = nullptr;
output_data->blobs.push_back({reinterpret_cast<void *>(data_ptr), static_cast<uint64_t>(output_buffer_size[i]), 0, false});
//ge::Shape shape(output_shape_info[i]);
//TensorDesc tensor_desc;
//tensor_desc.SetShape(shape);
//ge::Tensor output_tensor(tensor_desc);
//output_tensor.SetData(data_ptr, static_cast<size_t>(output_buffer_size[i]),aligned_ptr.GetDeleter());
outputs.emplace_back(std::move(output_tensor));
GELOGD("Output index:%zu, output dims is %s, data length:%lu.", i,
formats::JoinToString(output.dims).c_str(), output.length);
formats::JoinToString(output_shape_info[i]).c_str(), output_buffer_size[i]);
}
return SUCCESS;
@ -2647,7 +2664,7 @@ Status DavinciModel::GenOutputTensorInfo(OutputData *output_data, vector<OutputT
Status DavinciModel::ReturnResult(uint32_t data_id, const bool rslt_flg, const bool seq_end_flag,
OutputData *output_data) {
GE_CHK_BOOL_EXEC(listener_ != nullptr, return PARAM_INVALID, "listener_ is null.");
std::vector<ge::OutputTensorInfo> outputs;
std::vector<ge::Tensor> outputs;
// return result is not required
if (!rslt_flg && !seq_end_flag) {
@ -2711,7 +2728,7 @@ Status DavinciModel::ReturnNoOutput(uint32_t data_id) {
GELOGI("ReturnNoOutput model id:%u.", model_id_);
GE_CHK_BOOL_EXEC(listener_ != nullptr, return PARAM_INVALID, "listener_ is null!");
std::vector<ge::OutputTensorInfo> outputs;
std::vector<ge::Tensor> outputs;
GE_CHK_STATUS(listener_->OnComputeDone(model_id_, data_id, SUCCESS, outputs), "OnComputeDone failed.");
return SUCCESS;
}
@ -2766,7 +2783,7 @@ void *DavinciModel::Run(DavinciModel *model) {
GELOGI("Copy input data, model id:%u", model_id);
GE_IF_BOOL_EXEC(ProfilingManager::Instance().ProfilingModelExecuteOn(),
model->SetProfileTime(MODEL_PRE_PROC_START));
ret = model->CopyInputData(current_data, false);
ret = model->CopyInputData(current_data);
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(
ret != SUCCESS, (void)model->ReturnResult(current_data.index, false, false, data_wrapper->GetOutput());
CsaInteract::GetInstance().StoreInternalErrorCode(ret, ERROR_MODULE_FMK, JOBSUBSTATE_GRAPH_EXEC);

@ -620,7 +620,7 @@ class DavinciModel {
Status UpdateIoTaskArgs(const map<uint32_t, ZeroCopyOffset> &data_info, bool is_input,
const vector<DataBuffer> &blobs, bool is_dynamic, const string &batch_label);
Status CopyInputData(const InputData &input_data, bool device_data = false);
Status CopyInputData(const InputData &input_data);
Status CopyOutputData(uint32_t data_id, OutputData &output_data, rtMemcpyKind_t kind);
@ -865,7 +865,7 @@ class DavinciModel {
Status SinkTimeProfile(const InputData &current_data);
Status InitOutputTensorInfo(const OpDescPtr &op_desc);
Status GenOutputTensorInfo(OutputData *output_data, vector<OutputTensorInfo> &outputs);
Status GenOutputTensorInfo(OutputData *output_data, vector<ge::Tensor> &outputs);
Status InitInputDescInfo(const OpDescPtr &op_desc);
Status InitOutputDescInfo(const OpDescPtr &op_desc, const vector<string> &out_node_name);

@ -533,7 +533,7 @@ Status ModelManager::GetCurDynamicDims(const vector<vector<int64_t>> &user_real_
/// @brief load Input and output TensorInfo for Model
/// @return Status run result
///
Status ModelManager::DataInputTensor(uint32_t model_id, const std::vector<InputTensorInfo> &inputs) {
Status ModelManager::DataInputTensor(uint32_t model_id, const std::vector<ge::Tensor> &inputs) {
std::shared_ptr<DavinciModel> model = GetModel(model_id);
auto hybrid_model = GetHybridModel(model_id);
if (hybrid_model == nullptr) {
@ -547,9 +547,11 @@ Status ModelManager::DataInputTensor(uint32_t model_id, const std::vector<InputT
input_data.index = 0;
for (size_t i = 0; i < inputs.size(); ++i) {
DataBuffer data;
data.data = inputs[i].data;
data.length = inputs[i].length;
input_data.shapes.emplace_back(inputs[i].dims);
const TensorDesc &tensor_desc = inputs[i].GetTensorDesc();
data.data = reinterpret_cast<void *>(const_cast<uint8_t *>(inputs[i].GetData()));
data.length = inputs[i].GetSize();
data.placement = tensor_desc.GetPlacement();
input_data.shapes.emplace_back(tensor_desc.GetShape().GetDims());
input_data.blobs.push_back(data);
}
if (!GetLocalOmgContext().user_input_dims.empty() && GetLocalOmgContext().need_multi_batch) {

@ -122,7 +122,7 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager {
///
ge::Status DataInput(const InputData &input_data, OutputData &output_data);
ge::Status DataInputTensor(uint32_t model_id, const std::vector<InputTensorInfo> &inputs);
ge::Status DataInputTensor(uint32_t model_id, const std::vector<ge::Tensor> &inputs);
///
/// @ingroup domi_ome

@ -2769,7 +2769,7 @@ Status GraphManager::ProcessSubGraphWithMultiThreads(GraphManager *graph_manager
}
// run graph async on session
Status GraphManager::RunGraphAsync(const GraphId &graph_id, const std::vector<ge::InputTensorInfo> &inputs,
Status GraphManager::RunGraphAsync(const GraphId &graph_id, const std::vector<ge::Tensor> &inputs,
uint64_t session_id, RunAsyncCallback callback) {
ErrorManager::GetInstance().SetStage(ErrorMessage::kModelExecute, ErrorMessage::kModelExecute);
GELOGI("[GraphManager] Start to run graph async, graph_id=%u, inputsSize=%zu.", graph_id, inputs.size());
@ -2841,13 +2841,13 @@ Status GraphManager::IncreBuild(const GraphNodePtr &graph_node, GeModelPtr &ge_m
return FAILED;
}
void GraphManager::ConstructGeInput(const vector<InputTensorInfo> &inputs, vector<GeTensor> &ge_inputs) {
for (auto const &input : inputs) {
GeTensorDesc input_tensor_desc(GeShape(input.dims));
input_tensor_desc.SetDataType(static_cast<ge::DataType>(input.data_type));
ge_inputs.emplace_back(input_tensor_desc);
}
}
// void GraphManager::ConstructGeInput(const vector<ge::Tensor> &inputs, vector<GeTensor> &ge_inputs) {
// for (auto const &input : inputs) {
// GeTensorDesc input_tensor_desc(GeShape(input.dims));
// input_tensor_desc.SetDataType(static_cast<ge::DataType>(input.data_type));
// ge_inputs.emplace_back(input_tensor_desc);
// }
// }
void GraphManager::PreRunThread(GraphManager *graph_manager) {
if (prctl(PR_SET_NAME, ("GE_PreRun")) != 0) {
@ -2923,7 +2923,10 @@ void GraphManager::PreRunThread(GraphManager *graph_manager) {
GeModelPtr ge_model = nullptr;
if (graph_manager->IncreBuild(graph_node, ge_model) != SUCCESS) {
std::vector<GeTensor> ge_inputs;
ConstructGeInput(args.input_tensor, ge_inputs);
for (const auto &item: args.input_tensor) {
ge_inputs.emplace_back(TensorAdapter::AsGeTensor(item));
}
//ConstructGeInput(args.input_tensor, ge_inputs);
ret = graph_manager->PreRun(graph_node, ge_inputs, ge_root_model, args.session_id);
// release rts generate context
RtContextUtil::GetInstance().DestroyRtContexts(args.session_id, graph_node->GetGraphId());
@ -2952,20 +2955,25 @@ void GraphManager::PreRunThread(GraphManager *graph_manager) {
}
}
void GraphManager::ParseInputsDimsForData(const std::vector<InputTensorInfo> &input_tensor) {
void GraphManager::ParseInputsDimsForData(const std::vector<ge::Tensor> &input_tensor) {
GELOGD("Start parse input dims from data.");
for (size_t i = 0; i < input_tensor.size(); ++i) {
std::vector<int64_t> dynamic_dim;
for (size_t j = 0; j < input_tensor[i].dims.size(); ++j) {
dynamic_dim.emplace_back(input_tensor[i].dims[j]);
}
// for (size_t j = 0; j < input_tensor[i].dims.size(); ++j) {
// dynamic_dim.emplace_back(input_tensor[i].dims[j]);
// }
const TensorDesc &tensor_desc = input_tensor[i].GetTensorDesc();
const Shape &shape = tensor_desc.GetShape();
auto shape_dims = shape.GetDims();
std::copy(shape_dims.begin(), shape_dims.end(),std::back_inserter(dynamic_dim));
GELOGD("Input tensor dims is %s.", formats::JoinToString(dynamic_dim).c_str());
GetLocalOmgContext().user_real_input_dims.emplace_back(input_tensor[i].dims);
GetLocalOmgContext().user_real_input_dims.emplace_back(shape_dims);
}
}
Status GraphManager::ParseInputsDimsForGetNexNosinkAndData(const vector<NodePtr> &dynamic_nodes,
const std::vector<InputTensorInfo> &input_tensor) {
const std::vector<ge::Tensor> &input_tensor) {
GELOGD("Start parse inputs dims when coexist data and getnext sink.");
for (size_t i = 0; i < dynamic_nodes.size(); ++i) {
auto op_desc = dynamic_nodes.at(i)->GetOpDesc();
@ -2988,13 +2996,17 @@ Status GraphManager::ParseInputsDimsForGetNexNosinkAndData(const vector<NodePtr>
return PARAM_INVALID;
}
GetLocalOmgContext().user_real_input_dims.emplace_back(input_tensor.at(index).dims);
GELOGI("Shape dims of %zu data is %s.", index, formats::JoinToString(input_tensor.at(index).dims).c_str());
const TensorDesc &tensor_desc = input_tensor[i].GetTensorDesc();
const Shape &shape = tensor_desc.GetShape();
const auto &shape_dims = shape.GetDims();
GetLocalOmgContext().user_real_input_dims.emplace_back(shape_dims);
GELOGI("Shape dims of %zu data is %s.", index, formats::JoinToString(shape_dims).c_str());
}
return SUCCESS;
}
Status GraphManager::ParseInputsDims(const std::vector<InputTensorInfo> &input_tensor) {
Status GraphManager::ParseInputsDims(const std::vector<ge::Tensor> &input_tensor) {
GELOGI("Start parse input dims of %zu input tensor.", input_tensor.size());
GetLocalOmgContext().user_real_input_dims.clear();
if (!GetLocalOmgContext().dynamic_node_type.empty()) {
@ -3122,13 +3134,13 @@ void GraphManager::ReturnError(GraphManager *graph_manager, RunAsyncCallback cal
}
StopQueue(graph_manager);
GELOGE(ret, "%s.", log.c_str());
std::vector<ge::OutputTensorInfo> outputs;
std::vector<ge::Tensor> outputs;
callback(ret, outputs);
}
void GraphManager::ReturnError(GraphManager *graph_manager, GraphNodePtr &graph_node,
RunAsyncCallback callback, Status ret, const string &log) {
std::vector<ge::OutputTensorInfo> outputs;
std::vector<ge::Tensor> outputs;
auto compute_graph = GraphUtils::GetComputeGraph(*graph_node->GetGraph());
if (graph_manager == nullptr || compute_graph == nullptr) {
REPORT_INNER_ERROR("E19999", "Param graph_manager or compute_graph in graph_node is nullptr, "
@ -3144,9 +3156,14 @@ void GraphManager::ReturnError(GraphManager *graph_manager, GraphNodePtr &graph_
}
for (size_t i = 0; i < node->GetAllInDataAnchorsSize(); i++) {
auto input_desc = node->GetOpDesc()->MutableInputDesc(i);
ge::OutputTensorInfo tensor;
tensor.dims = input_desc->GetShape().GetDims();
tensor.data_type = static_cast<uint32_t>(input_desc->GetDataType());
ge::Shape shape(input_desc->GetShape().GetDims());
TensorDesc tensor_desc;
tensor_desc.SetShape(shape);
tensor_desc.SetDataType(input_desc->GetDataType());
Tensor tensor(tensor_desc);
// ge::OutputTensorInfo tensor;
// tensor.dims = input_desc->GetShape().GetDims();
// tensor.data_type = static_cast<uint32_t>(input_desc->GetDataType());
int64_t len = 1;
if (input_desc->GetShape().GetDims() != std::vector<int64_t>({})) {
len = input_desc->GetShape().GetShapeSize();
@ -3181,11 +3198,15 @@ void GraphManager::ReturnError(GraphManager *graph_manager, GraphNodePtr &graph_
callback(GRAPH_FAILED, outputs);
return;
}
tensor.length = len * size;
tensor.data.reset(new(std::nothrow) uint8_t[tensor.length]);
auto length = len * size;
unique_ptr<uint8_t []> ptr(new (std::nothrow) uint8_t[length]);
tensor.SetData(ptr.get(), length, ptr.get_deleter());
//tensor.SetData()
// tensor.length = len * size;
// tensor.data.reset(new(std::nothrow) uint8_t[tensor.length]);
// To avoid global step too small and can not stop, totally set a bigger value
for (int64_t i = 0; i < tensor.length; i++) {
tensor.data[i] = 0x7F; // here stands for a positive max value
for (int64_t i = 0; i < length; i++) {
ptr[i] = 0x7F; // here stands for a positive max value
}
outputs.emplace_back(std::move(tensor));
}

@ -149,7 +149,7 @@ class GraphManager {
/// @param [out] callback: callback while run graph async finish
/// @return Status result of function
///
Status RunGraphAsync(const GraphId &graph_id, const std::vector<ge::InputTensorInfo> &inputs,
Status RunGraphAsync(const GraphId &graph_id, const std::vector<ge::Tensor> &inputs,
uint64_t session_id, RunAsyncCallback callback);
///
@ -194,7 +194,7 @@ class GraphManager {
struct PreRunArgs {
GraphId graph_id;
std::vector<ge::InputTensorInfo> input_tensor;
std::vector<ge::Tensor> input_tensor;
uint64_t session_id;
struct ErrorMessage::Context error_context;
GEThreadLocalContext context;
@ -206,7 +206,7 @@ class GraphManager {
GraphId graph_id;
uint64_t session_id;
struct ErrorMessage::Context error_context;
std::vector<ge::InputTensorInfo> input_tensor;
std::vector<ge::Tensor> input_tensor;
GeRootModelPtr ge_root_model;
GEThreadLocalContext context;
RunAsyncCallback callback;
@ -225,10 +225,10 @@ class GraphManager {
uint64_t session_id,
const struct ErrorMessage::Context &error_context,
const GEThreadLocalContext &ge_context);
Status ParseInputsDims(const std::vector<InputTensorInfo> &input_tensor);
void ParseInputsDimsForData(const std::vector<InputTensorInfo> &input_tensor);
Status ParseInputsDims(const std::vector<ge::Tensor> &input_tensor);
void ParseInputsDimsForData(const std::vector<ge::Tensor> &input_tensor);
Status ParseInputsDimsForGetNexNosinkAndData(const vector<NodePtr> &dynamic_nodes,
const std::vector<InputTensorInfo> &input_tensor);
const std::vector<ge::Tensor> &input_tensor);
Status RunCustomPass(const GraphNodePtr &graph_node);
Status PreRun(const GraphNodePtr &graph_node, const std::vector<GeTensor> &inputs, GeRootModelPtr &ge_root_model,
uint64_t session_id = INVALID_SESSION_ID);
@ -339,7 +339,7 @@ class GraphManager {
void RemoveModelCacheHelper(const GraphId &graph_id);
ModelCacheHelperPtr FindModelCacheHelper(GraphId graph_id);
static void ConstructGeInput(const std::vector<InputTensorInfo> &inputs, std::vector<GeTensor> &ge_inputs);
//static void ConstructGeInput(const std::vector<ge::Tensor> &inputs, std::vector<GeTensor> &ge_inputs);
static void PreRunThread(GraphManager *graph_manager);
static void RunThread(GraphManager *graph_manager);
static void StopQueue(GraphManager *graph_manager);

@ -104,7 +104,7 @@ GraphModelListener::GraphModelListener(std::mutex &mutex, std::condition_variabl
: result_code_(0), is_finished_(false), mutex_(mutex), condition_(cond) {}
Status GraphModelListener::OnComputeDone(uint32_t model_id, uint32_t task_id, uint32_t result,
std::vector<ge::OutputTensorInfo> &outputs) {
std::vector<ge::Tensor> &outputs) {
GELOGI(
"[GraphManager] graph compute call back, model_id:%u, task_id:%u, "
"resultCode:%u.",
@ -141,10 +141,14 @@ void RunAsyncListener::SetCallback(const RunAsyncCallback &callback) {
}
Status RunAsyncListener::OnComputeDone(uint32_t model_id, uint32_t task_id, uint32_t result,
std::vector<ge::OutputTensorInfo> &outputs) {
std::vector<ge::Tensor> &outputs) {
GELOGI("[GraphManager] run graph async call back, modelId:%u, taskId:%u, resultCode:%u.",
model_id, task_id, result);
GE_CHECK_NOTNULL(callback_);
for (auto it : outputs) {
auto addr = it.GetData();
GELOGI("========= addr is %p",addr);
}
callback_(result, outputs);
uint8_t unused;
sem_.Pop(unused);

@ -129,7 +129,7 @@ class RunAsyncListener : public ge::ModelListener {
// callback
Status OnComputeDone(uint32_t model_id, uint32_t task_id, uint32_t result,
std::vector<ge::OutputTensorInfo> &outputs) override;
std::vector<ge::Tensor> &outputs) override;
private:
RunAsyncCallback callback_;
@ -202,7 +202,7 @@ class GraphModelListener : public ge::ModelListener {
// callback
Status OnComputeDone(uint32_t model_id, uint32_t task_id, uint32_t result,
std::vector<ge::OutputTensorInfo> &outputs) override;
std::vector<ge::Tensor> &outputs) override;
Status ResetResult();

@ -39,6 +39,12 @@ class TensorBuffer {
TensorBuffer &operator = (const TensorBuffer &) = delete;
~TensorBuffer();
void* Release() {
auto ret = buffer_;
buffer_ = nullptr;
return ret;
}
void *GetData() {
return buffer_;
}
@ -47,6 +53,10 @@ class TensorBuffer {
return size_;
}
MemStorageType GetMemType() const {
return mem_type_;
}
private:
TensorBuffer(NpuMemoryAllocator *allocator, void *buffer, size_t size, MemStorageType mem_type = HBM);
@ -68,6 +78,10 @@ class TensorValue {
void Destroy();
void *Release() {
return buffer_->Release();
}
bool IsEmpty() {
return ref_buffer_ == nullptr && buffer_ == nullptr;
}
@ -84,6 +98,10 @@ class TensorValue {
size_t GetSize() const;
MemStorageType GetMemType() const {
return buffer_->GetMemType();
}
private:
std::shared_ptr<TensorBuffer> buffer_;
std::string name_;

@ -20,6 +20,11 @@
#include "graph/utils/type_utils.h"
#include "graph/ge_context.h"
#include "omm/csa_interact.h"
#include "graph/debug/ge_attr_define.h"
#include "graph/manager/graph_caching_allocator.h"
#include "graph/manager/graph_mem_allocator.h"
#include "graph/manager/rdma_pool_allocator.h"
#include "graph/manager/host_mem_allocator.h"
namespace ge {
namespace hybrid {
@ -27,6 +32,9 @@ namespace {
const int kDataOutputIndex = 0;
const size_t kMinimumPiplineStages = 2;
const int kDefaultLoopCount = 10;
//const uint32_t kPlacementHost = 0;
const uint32_t kPlacementDevice = 1;
const char *const kLazyRecompile = "lazy_recompile";
}
HybridModelAsyncExecutor::HybridModelAsyncExecutor(HybridModel *model)
: model_(model), run_flag_(false), data_dumper_(nullptr) {
@ -71,6 +79,8 @@ Status HybridModelAsyncExecutor::Start(const std::shared_ptr<ModelListener> &lis
GetThreadLocalContext() = *executor_->GetContext()->ge_context;
GetContext().SetSessionId(executor_->GetContext()->session_id);
GetContext().SetContextId(executor_->GetContext()->context_id);
GE_CHECK_NOTNULL(executor_->GetContext()->ge_context);
GetThreadLocalContext() = *executor_->GetContext()->ge_context;
return RunInternal();
});
@ -198,7 +208,7 @@ Status HybridModelAsyncExecutor::HandleResult(Status exec_ret,
HybridModelExecutor::ExecuteArgs &args,
OutputData *output_data) {
GELOGD("Start to handle result. model id = %u, data index = %u, execution ret = %u", model_id_, data_id, exec_ret);
std::vector<ge::OutputTensorInfo> output_tensor_info_list;
std::vector<ge::Tensor> output_tensor_info_list;
if (args.is_eos) {
GELOGI("End of sequence, model id = %u", model_id_);
GE_CHK_STATUS_RET_NOLOG(OnComputeDone(data_id, END_OF_SEQUENCE, output_tensor_info_list));
@ -369,7 +379,7 @@ Status HybridModelAsyncExecutor::InitInputDesc() {
}
Status HybridModelAsyncExecutor::OnComputeDone(uint32_t data_index, uint32_t result_code,
std::vector<ge::OutputTensorInfo> &outputs) {
std::vector<ge::Tensor> &outputs) {
GELOGD("OnComputeDone. model id = %u, data index = %u, execution ret = %u", model_id_, data_index, result_code);
if (listener_ != nullptr) {
GE_CHK_STATUS(listener_->OnComputeDone(model_id_, data_index, result_code, outputs),
@ -381,7 +391,7 @@ Status HybridModelAsyncExecutor::OnComputeDone(uint32_t data_index, uint32_t res
Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &args,
OutputData *output_data,
std::vector<ge::OutputTensorInfo> &outputs) {
std::vector<ge::Tensor> &outputs) {
// copy output data from op to designated position
std::vector<ConstGeTensorDescPtr> &output_tensor_desc_list = args.output_desc;
std::vector<TensorValue> &output_tensors = args.outputs;
@ -396,6 +406,12 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a
}
GELOGD("Number of outputs = %zu", output_tensor_desc_list.size());
string execute_mode;
auto result = ge::GetContext().GetOption(OPTION_EXEC_DYNAMIC_EXECUTE_MODE, execute_mode);
if (result != SUCCESS) {
GELOGW("Can not get dynamic execute mode attr");
}
GELOGD("The dynamic execute is %s", execute_mode.c_str());
for (size_t i = 0; i < output_tensors.size(); ++i) {
GELOGD("Start to process output[%zu]", i);
auto &output_tensor = output_tensors[i];
@ -430,11 +446,12 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a
return INTERNAL_ERROR;
}
ge::OutputTensorInfo output;
output.data_type = static_cast<uint32_t>(tensor_desc->GetDataType());
output.dims = tensor_desc->GetShape().GetDims();
output.length = output_size;
ge::Shape shape(tensor_desc->GetShape().GetDims());
TensorDesc desc;
desc.SetShape(shape);
ge::Tensor tensor(desc);
if (output_size > 0) {
if (execute_mode != kLazyRecompile) {
std::unique_ptr<uint8_t[]> data_buf(new(std::nothrow) uint8_t[output_size]);
GE_CHECK_NOTNULL(data_buf);
GE_CHK_RT_RET(rtMemcpy(data_buf.get(),
@ -442,15 +459,34 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a
output_tensor.GetData(),
output_size,
RT_MEMCPY_DEVICE_TO_HOST));
output.data = std::move(data_buf);
output_data->blobs.emplace_back(data_buf.get(), static_cast<uint32_t>(output_size), false);
tensor.SetData(data_buf.get(), static_cast<size_t>(output_size),[](uint8_t *ptr) {
ptr = nullptr;});
output_data->blobs.emplace_back(data_buf.get(), static_cast<uint32_t>(output_size), 0, false);
} else {
auto mem_type = output_tensor.GetMemType();
auto deleter = [=] (uint8_t *device_data) {
if (device_data != nullptr) {
GELOGI("Deallocating buffer successfully. addr = %p", device_data);
if (mem_type == RDMA_HBM) {
MemManager::Instance().RdmaPoolInstance(RT_MEMORY_HBM).Free(device_data, device_id_);
} else if (mem_type == HOST_DDR) {
MemManager::Instance().HostMemInstance(RT_MEMORY_HBM).Free(device_data);
} else {
MemManager::Instance().CachingInstance(RT_MEMORY_HBM).Free(device_data, device_id_);
}
}
};
tensor.GetTensorDesc().SetPlacement(kPlacementDevice);
tensor.SetData(reinterpret_cast<uint8_t *>(output_tensor.Release()),static_cast<size_t>(output_size), deleter);
output_data->blobs.emplace_back(output_tensor.Release(), static_cast<uint32_t>(output_size), 1, false);
}
} else {
GELOGW("Output[%zu] is empty. shape = [%s]", i, tensor_desc->GetShape().ToString().c_str());
output.data = nullptr;
output_data->blobs.emplace_back(nullptr, 0U, false);
tensor.SetData(nullptr,0U,nullptr);
output_data->blobs.emplace_back(nullptr, 0U, 0, false);
}
outputs.emplace_back(std::move(output));
outputs.emplace_back(std::move(tensor));
GELOGD("Output[%zu] added, type = %s, shape = [%s], size = %ld",
i,
TypeUtils::DataTypeToSerialString(tensor_desc->GetDataType()).c_str(),
@ -508,7 +544,7 @@ Status HybridModelAsyncExecutor::Execute(const vector<GeTensor> &inputs, vector<
GELOGD("Done copying input data successfully.");
GE_CHK_STATUS_RET(executor_->Execute(args), "[Invoke][Execute] Failed, model_id = %u.", model_id_);
std::vector<ge::OutputTensorInfo> output_tensor_info_list;
std::vector<ge::Tensor> output_tensor_info_list;
OutputData output_data;
GE_CHK_STATUS_RET(CopyOutputs(args, &output_data, output_tensor_info_list),
"[Invoke][CopyOutputs]Failed to copy outputs, model_id = %u.", model_id_);
@ -518,15 +554,15 @@ Status HybridModelAsyncExecutor::Execute(const vector<GeTensor> &inputs, vector<
outputs.resize(output_tensor_info_list.size());
for (auto &out_tensor_info : output_tensor_info_list) {
auto &ge_tensor = outputs[out_index];
if (out_tensor_info.length > 0) {
GE_CHK_GRAPH_STATUS_RET(ge_tensor.SetData(out_tensor_info.data.get(), out_tensor_info.length),
if (out_tensor_info.GetSize() > 0) {
GE_CHK_GRAPH_STATUS_RET(ge_tensor.SetData(out_tensor_info.GetData(), out_tensor_info.GetSize()),
"Failed to set output[%d].", out_index);
}
ge_tensor.MutableTensorDesc() = *args.output_desc[out_index];
GELOGD("Set output[%d], tensor size = %ld, shape = [%s]",
out_index,
out_tensor_info.length,
out_tensor_info.GetSize(),
ge_tensor.MutableTensorDesc().MutableShape().ToString().c_str());
++out_index;
}

@ -69,9 +69,9 @@ class HybridModelAsyncExecutor {
Status CopyOutputs(HybridModelExecutor::ExecuteArgs &args,
OutputData *output_data,
std::vector<ge::OutputTensorInfo> &outputs);
std::vector<ge::Tensor> &outputs);
Status OnComputeDone(uint32_t data_index, uint32_t result_code, std::vector<ge::OutputTensorInfo> &outputs);
Status OnComputeDone(uint32_t data_index, uint32_t result_code, std::vector<ge::Tensor> &outputs);
Status PreRun(InputData &current_data, HybridModelExecutor::ExecuteArgs &args);

@ -337,19 +337,19 @@ Status InnerSession::RegisterCallBackFunc(
return SUCCESS;
}
Status InnerSession::BuildGraph(uint32_t graph_id, const std::vector<InputTensorInfo> &inputs) {
Status InnerSession::BuildGraph(uint32_t graph_id, const std::vector<ge::Tensor> &inputs) {
UpdateThreadContext(graph_id);
GELOGI("[InnerSession:%lu] build graph on session, graph_id=%u.", session_id_, graph_id);
std::vector<ge::GeTensor> ge_inputs;
for (auto const &input : inputs) {
std::vector<int64_t> input_dims;
std::transform(input.dims.begin(), input.dims.end(), std::back_inserter(input_dims),
[](int64_t x) -> int64_t { return x; });
GeShape input_shape(input_dims);
GeTensorDesc input_tensor_desc;
input_tensor_desc.SetShape(input_shape);
input_tensor_desc.SetDataType(static_cast<ge::DataType>(input.data_type));
ge_inputs.emplace_back(input_tensor_desc);
// std::vector<int64_t> input_dims;
// std::transform(input.dims.begin(), input.dims.end(), std::back_inserter(input_dims),
// [](int64_t x) -> int64_t { return x; });
// GeShape input_shape(input_dims);
// GeTensorDesc input_tensor_desc;
// input_tensor_desc.SetShape(input_shape);
// input_tensor_desc.SetDataType(static_cast<ge::DataType>(input.data_type));
ge_inputs.emplace_back(TensorAdapter::AsGeTensor(input));
}
GeRootModelPtr ge_root_model = nullptr;
Status ret = graph_manager_.BuildGraph(graph_id, ge_inputs, ge_root_model, session_id_, true);
@ -363,7 +363,7 @@ Status InnerSession::BuildGraph(uint32_t graph_id, const std::vector<InputTensor
return ret;
}
Status InnerSession::RunGraphAsync(uint32_t graph_id, const std::vector<InputTensorInfo> &inputs,
Status InnerSession::RunGraphAsync(uint32_t graph_id, const std::vector<ge::Tensor> &inputs,
RunAsyncCallback callback) {
UpdateThreadContext(graph_id);
GELOGI("[InnerSession:%lu] run graph on session, graph_id=%u.", session_id_, graph_id);

@ -43,9 +43,9 @@ class InnerSession {
Status RemoveGraph(uint32_t graph_id);
Status BuildGraph(uint32_t graph_id, const std::vector<InputTensorInfo> &inputs);
Status BuildGraph(uint32_t graph_id, const std::vector<ge::Tensor> &inputs);
Status RunGraphAsync(uint32_t graph_id, const std::vector<InputTensorInfo> &inputs, RunAsyncCallback callback);
Status RunGraphAsync(uint32_t graph_id, const std::vector<ge::Tensor> &inputs, RunAsyncCallback callback);
Status Finalize();

@ -336,7 +336,7 @@ Status SessionManager::RegisterCallBackFunc(
return innerSession->RegisterCallBackFunc(key, callback);
}
Status SessionManager::BuildGraph(SessionId session_id, uint32_t graph_id, const std::vector<InputTensorInfo> &inputs) {
Status SessionManager::BuildGraph(SessionId session_id, uint32_t graph_id, const std::vector<ge::Tensor> &inputs) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "[Build][Graph]fail for Session manager is not initialized,"
"session_id:%lu, graph_id:%u.", session_id, graph_id);
@ -358,7 +358,7 @@ Status SessionManager::BuildGraph(SessionId session_id, uint32_t graph_id, const
}
Status SessionManager::RunGraphAsync(SessionId session_id, uint32_t graph_id,
const std::vector<InputTensorInfo> &inputs, RunAsyncCallback callback) {
const std::vector<ge::Tensor> &inputs, RunAsyncCallback callback) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT,
"[AsyncRun][Graph]fail for Session manager is not initialized, session_id:%lu, graph_id:%u.",

@ -123,7 +123,7 @@ class SessionManager {
/// @param [in] inputs input data
/// @return Status result of function
///
Status BuildGraph(SessionId session_id, uint32_t graph_id, const std::vector<InputTensorInfo> &inputs);
Status BuildGraph(SessionId session_id, uint32_t graph_id, const std::vector<ge::Tensor> &inputs);
///
/// @ingroup ge_session
@ -133,7 +133,7 @@ class SessionManager {
/// @param [in] inputs input data
/// @return Status result of function
///
Status RunGraphAsync(SessionId session_id, uint32_t graph_id, const std::vector<InputTensorInfo> &inputs,
Status RunGraphAsync(SessionId session_id, uint32_t graph_id, const std::vector<ge::Tensor> &inputs,
RunAsyncCallback callback);
///

@ -873,7 +873,7 @@ Status AiCpuTask::LaunchKernel(const std::vector<GeTensorDesc> &input_desc,
if (unknown_type_ == DEPEND_COMPUTE) {
std::vector<DataBuffer> summary_buffers;
for (size_t i = 0; i < num_outputs_; ++i) {
summary_buffers.emplace_back(output_summary_[i], sizeof(aicpu::FWKAdapter::ResultSummary), false);
summary_buffers.emplace_back(output_summary_[i], sizeof(aicpu::FWKAdapter::ResultSummary), 0, false);
}
GE_CHK_STATUS_RET_NOLOG(UpdateIoAddr(input_buffers, summary_buffers));
} else {

@ -28,7 +28,6 @@
namespace ge {
typedef uint32_t (*pCallBackFunc)(uint32_t graph_id, const std::map<std::string, ge::Tensor> &params_list);
namespace session {
typedef uint32_t (*pCallBackFunc)(uint32_t graph_id, const std::map<AscendString, ge::Tensor> &params_list);
}
@ -128,7 +127,7 @@ class GE_FUNC_VISIBILITY Session {
/// @param [in] inputs: input data
/// @return Status result of function
///
Status BuildGraph(uint32_t graphId, const std::vector<InputTensorInfo> &inputs);
Status BuildGraph(uint32_t graphId, const std::vector<ge::Tensor> &inputs);
///
/// @ingroup ge_graph
@ -140,7 +139,7 @@ class GE_FUNC_VISIBILITY Session {
/// Please ensure that the implementation of the function is trusted.
/// @return Status result of function
///
Status RunGraphAsync(uint32_t graphId, const std::vector<ge::InputTensorInfo> &inputs, RunAsyncCallback callback);
Status RunGraphAsync(uint32_t graphId, const std::vector<ge::Tensor> &inputs, RunAsyncCallback callback);
///
/// @ingroup ge_graph

@ -23,6 +23,7 @@
#include <set>
#include <functional>
#include <memory>
#include "graph/tensor.h"
namespace ge {
// Option key: graph run mode
@ -354,7 +355,8 @@ struct OutputTensorInfo {
};
using Status = uint32_t;
using RunAsyncCallback = std::function<void(Status, std::vector<ge::OutputTensorInfo> &)>;
using RunAsyncCallback = std::function<void(Status, std::vector<ge::Tensor> &)>;
// for ir build
namespace ir_option {
static const char *const INPUT_FORMAT = "input_format";

@ -71,7 +71,7 @@ struct DataBuffer {
DataBuffer(void *dataIn, uint64_t len, bool isSupportMemShare, uint32_t placement = 0)
: data(dataIn), length(len), isDataSupportMemShare(isSupportMemShare), placement(placement) {}
DataBuffer() : data(nullptr), length(0), isDataSupportMemShare(false) {}
DataBuffer() : data(nullptr), length(0), isDataSupportMemShare(false), placement(0) {}
};
///
@ -226,7 +226,7 @@ class GE_FUNC_VISIBILITY ModelListener {
/// @param [in] resultCode Execution results
///
virtual Status OnComputeDone(uint32_t model_id, uint32_t data_index, uint32_t result_code,
std::vector<ge::OutputTensorInfo> &outputs) = 0;
std::vector<ge::Tensor> &outputs) = 0;
};
// OMM configuration item

Loading…
Cancel
Save