You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
graphengine/ge/graph/manager/graph_manager.h

423 lines
17 KiB

/**
* Copyright 2019-2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef GE_GRAPH_MANAGER_GRAPH_MANAGER_H_
#define GE_GRAPH_MANAGER_GRAPH_MANAGER_H_
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <vector>
#include "common/blocking_queue.h"
#include "common/ge_inner_error_codes.h"
#include "common/helper/model_cache_helper.h"
#include "external/graph/types.h"
#include "ge/ge_api_types.h"
#include "graph/build/graph_builder.h"
#include "graph/execute/graph_execute.h"
#include "graph/ge_local_context.h"
#include "graph/load/graph_loader.h"
#include "graph/manager/graph_manager_utils.h"
#include "graph/manager/util/variable_accelerate_ctrl.h"
#include "graph/optimize/graph_optimize.h"
#include "graph/partition/graph_partition.h"
#include "graph/preprocess/graph_preprocess.h"
#include "graph/tuning_utils.h"
#include "model/ge_model.h"
namespace ge {
class GraphManager {
public:
GraphManager();
~GraphManager() = default;
///
/// @ingroup ge_graph
/// @brief graph manager init
/// @param [in] options user config params
/// @return Status result of function
///
Status Initialize(const std::map<string, string> &options);
///
/// @ingroup ge_graph
/// @brief graph manager finalize
/// @return Status result of function
///
Status Finalize();
///
/// @ingroup ge_graph
/// @brief add specific graph
/// @param [in] graph_id graph id
/// @param [out] Graph output graph
/// @return Status result of function
///
Status AddGraph(const GraphId &graph_id, const Graph &graph, const std::map<std::string, std::string> &options,
const OmgContext &omg_context);
Status InitDynamicParams(ComputeGraphPtr &compute_graph);
///
/// @ingroup ge_graph
/// @brief add a copy graph
/// @param [in] graph_id graph id
/// @param [out] Graph output graph
/// @return Status result of function
///
Status AddGraphWithCopy(const GraphId &graph_id, const Graph &graph,
const std::map<std::string, std::string> &options, const OmgContext &omg_context);
///
/// @ingroup ge_graph
/// @brief remove specific graph
/// @param [in] graph_id graph id
/// @return Status result of function
///
Status RemoveGraph(const GraphId &graph_id);
///
/// @ingroup ge_graph
/// @brief run specific graph
/// @param [in] graph_id graph id
/// @param [in] inputs input data
/// @param [out] outputs output data
/// @return Status result of function
///
Status RunGraph(const GraphId &graph_id, const std::vector<GeTensor> &inputs, std::vector<GeTensor> &outputs,
uint64_t session_id = INVALID_SESSION_ID);
///
/// @ingroup ge_graph
/// @brief build specific graph
/// @param [in] graph_id graph id
/// @param [in] inputs input data
/// @param [out] models build result
/// @return Status result of function
///
ge::Status BuildGraph(const GraphId &graph_id, const std::vector<GeTensor> &inputs, GeRootModelPtr &models,
uint64_t session_id = 0, bool async = false);
Status BuildGraphForUnregisteredOp(const GraphId &graph_id, const std::vector<GeTensor> &inputs,
GeRootModelPtr &ge_root_model, uint64_t session_id);
///
/// @ingroup ge_graph
/// @brief Save extra attribute to Model
/// @param [in] model: Model attribues will save to.
/// @param [in] type: type of OpDesc.
/// @param [in] attrs: attributes of OpDesc
/// @param [in] inputs: input tensor
/// @param [in] outputs: output tensor
/// @return: Status
///
Status SaveParams(ge::GeModel &model, const std::string &type, const std::map<string, GeAttrValue> &attrs,
const std::vector<GeTensor> &inputs, const std::vector<GeTensor> &outputs);
///
/// @ingroup ge_graph
/// @brief get variable value from the session with specific session id
/// @param [in] sessionId session id
/// @param [in] name op name
/// @param [out] val out value tensor
/// @return Status result of function
///
Status GetVariable(const std::string &name, Tensor &val);
///
/// @ingroup ge_graph
/// @brief run graph async on session with specific session id
/// @param [in] graph_id graph id
/// @param [in] inputs input data
/// @param [out] callback: callback while run graph async finish
/// @return Status result of function
///
Status RunGraphAsync(const GraphId &graph_id, const std::vector<ge::InputTensorInfo> &inputs,
uint64_t session_id, RunAsyncCallback callback);
///
/// @ingroup ge_graph
/// @brief me register the callback function to get the result of summary or checkpoin
/// @param [in] key: summary or checkpoint
/// @param [in] callbak: The real callback object of me
/// @return Status result of function
///
Status RegisterCallBackFunc(
const std::string &key,
const std::function<Status(uint32_t, const std::map<std::string, ge::Tensor> &)> &callback);
Status RegisterCallBackFunc(
const std::string &key,
const std::function<Status(uint32_t, const std::map<AscendString, ge::Tensor> &)> &callback);
const bool GetTrainFlag() const { return options_.train_graph_flag; }
bool IsGraphNeedRebuild(uint32_t graph_id);
Status GenerateInfershapeGraph(GraphId &graph_id);
const std::map<std::string, std::string> *GetGraphOptions(uint32_t graph_id);
void SetOptionsRunGraphFlag(bool run_graph_flag);
Status GenCheckPointGraph(const std::map<std::string, GeTensorDesc> &all_variables, Graph &graph);
Status SaveVariables(const Graph &graph, const std::vector<std::string> &var_names,
const std::vector<Tensor> &outputs, std::vector<Tensor> &var_values);
Status SaveCheckPointResult(const Graph &graph, const std::vector<Tensor> &outputs, map<string, Tensor> &var_results);
private:
struct CompilerStages {
GraphPrepare preparer;
GraphOptimize optimizer;
GraphPartitioner partitioner;
GraphBuilder builder;
};
struct PreRunArgs {
GraphId graph_id;
std::vector<ge::InputTensorInfo> input_tensor;
uint64_t session_id;
struct ErrorMessage::Context error_context;
GEThreadLocalContext context;
RunAsyncCallback callback;
};
struct RunArgs {
GraphNodePtr graph_node;
GraphId graph_id;
uint64_t session_id;
struct ErrorMessage::Context error_context;
std::vector<ge::InputTensorInfo> input_tensor;
GeRootModelPtr ge_root_model;
GEThreadLocalContext context;
RunAsyncCallback callback;
};
void AddGraphNode(GraphId graph_id, const GraphNodePtr &graph_node);
void RemoveGraphNode(GraphId graph_id);
bool HasGraphNode(GraphId graph_id);
Status GetGraphNode(const GraphId &graph_id, GraphNodePtr &out);
std::shared_ptr<GraphModelListener> GetModelListener() const { return graph_run_listener_; }
static Status ProcessSubGraphWithMultiThreads(GraphManager *graph_manager, GraphId root_graph_id,
const SubGraphInfoPtr &sub_graph_info_ptr,
const std::string &root_graph_name,
uint64_t session_id,
const struct ErrorMessage::Context &error_context,
const GEThreadLocalContext &ge_context);
Status ParseInputsDims(const std::vector<InputTensorInfo> &input_tensor);
void ParseInputsDimsForData(const std::vector<InputTensorInfo> &input_tensor);
Status ParseInputsDimsForGetNexNosinkAndData(const vector<NodePtr> &dynamic_nodes,
const std::vector<InputTensorInfo> &input_tensor);
Status RunCustomPass(const GraphNodePtr &graph_node);
Status PreRun(const GraphNodePtr &graph_node, const std::vector<GeTensor> &inputs, GeRootModelPtr &ge_root_model,
uint64_t session_id = INVALID_SESSION_ID);
Status OptimizeSubgraph(const GraphNodePtr &graph_node, ComputeGraphPtr &compute_graph, uint64_t session_id);
Status Build(const GraphNodePtr &graph_node, ComputeGraphPtr &compute_graph,
GeRootModelPtr &ge_root_model, uint64_t session_id);
Status StartForRunGraph(const GraphNodePtr &graph_node, const std::vector<GeTensor> &inputs,
GeRootModelPtr &ge_root_model, uint64_t session_id = INVALID_SESSION_ID);
Status InnerRunGraph(GraphNodePtr &graph_node, const GraphId &graph_id, const std::vector<GeTensor> &inputs,
std::vector<GeTensor> &outputs);
Status ParseOptions(const std::map<std::string, std::string> &options);
static void ParseOption(const std::map<std::string, std::string> &options, const std::string &key,
std::string &option);
static Status ParseOption(const std::map<std::string, std::string> &options, const std::string &key, bool &option);
static Status ParseOption(const std::map<std::string, std::string> &options, const std::string &key, int &option);
static Status ParseOption(const std::map<std::string, std::string> &options, const std::string &key,
std::map<std::string, int> &option);
static void Trim(std::string &str);
static Status CheckEngineName(const std::string &engine_name, const std::string &key,
const std::map<std::string, int> &option);
static Status ParseParallelNum(const std::string &parallel_num, const std::string &key, int &num);
static Status ParseTrainGraphFlag(bool &options, bool &option);
static bool IsPerfLevelInvalid(int32_t perf_level);
Status SummaryHandle(const GraphId &graph_id, std::vector<GeTensor> &outputs);
Status CheckpointHandle(const GraphId &graph_id, const ComputeGraphPtr &compute_graph,
const std::vector<GeTensor> &outputs);
// call the callback function of ME to push summary result data to ME
Status PushSummaryData2ME(const GraphId &graph_id, const std::map<std::string, ge::Tensor> &summary_data);
// call the callback function of ME to push save result data to ME
Status PushSaveData2ME(const GraphId &graph_id, const std::map<std::string, ge::Tensor> &save_data);
bool IsCheckpointGraph(ComputeGraphPtr &compute_graph);
bool CheckNetOutputForCheckpointGraph(NodePtr &node);
bool CheckVariableForCheckpointGraph(NodePtr &node);
bool CheckTransOpForCheckpointGraph(NodePtr &node);
Status MergeSubGraph(ComputeGraphPtr &compute_graph, const ge::ComputeGraphPtr &original_compute_graph,
GraphId root_graph_id);
Status ConvertGraphToFile(ComputeGraphPtr &compute_graph, GraphPartitioner &partitioner, std::string file_path,
bool exe_flag = false);
Status SetSubgraph(uint64_t session_id, ComputeGraphPtr compute_graph, GraphPartitioner &partitioner);
void SetAttrForHcomBroadCastOp(ge::ComputeGraphPtr &compute_graph);
bool IsBroadCastOpData(const ge::NodePtr &var_node);
void AdjustBroadCastOpData(const ge::NodePtr &var_node);
bool IsAssignOpData(const ge::NodePtr &var_node);
void AdjustAssignOpData(const ge::NodePtr &var_node);
bool ConfirmUseOpAndIndexByAnchor(const ge::InDataAnchorPtr &in_anchor, const map<string, std::set<int>> &confirm_ops,
ge::NodePtr &use_node);
bool ConfirmUseOpAndIndexByNode(const ge::NodePtr &var_node, const map<string, std::set<int>> &confirm_ops,
ge::NodePtr &use_node);
// graph context
std::shared_ptr<GraphContext> GetGraphContext() const { return graph_context_; }
Status RemoveIsolatedConst(ge::ComputeGraphPtr &compute_graph);
Status RemoveIsolatedConstInThisGraph(ge::ComputeGraphPtr &compute_graph);
Status OptimizeStage1(ComputeGraphPtr &compute_graph);
Status OptimizeStage2(ComputeGraphPtr &compute_graph);
Status SubexpressionMigration(ComputeGraphPtr &compute_graph);
Status LoadGraphAsync(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node);
Status CheckAndReleaseMemory(const GeModelPtr &ge_model, const GraphNodePtr &graph_node);
bool CheckModelLoad(const GeRootModelPtr &ge_model, bool load_flag);
Status LoadGraph(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node);
bool IsGraphNeedBuild(const GraphNodePtr &graph_node);
Status LoadFromCache(const GraphNodePtr &graph_node, const ModelCacheHelperPtr &cache_helper, GeModelPtr &ge_model);
Status SaveCacheBeforeBuild(uint32_t graph_id, const ModelCacheHelperPtr &cache_helper);
Status SaveCacheAfterBuild(uint32_t graph_id, ComputeGraphPtr graph, GeModelPtr &ge_model);
void AddModelCacheHelperToMap(const GraphId &graph_id, uint64_t session_id, ComputeGraphPtr &compute_graph);
Status IncreBuild(const GraphNodePtr &graph_node, GeModelPtr &ge_model);
void RemoveModelCacheHelper(const GraphId &graph_id);
ModelCacheHelperPtr FindModelCacheHelper(GraphId graph_id);
static void ConstructGeInput(const std::vector<InputTensorInfo> &inputs, std::vector<GeTensor> &ge_inputs);
static void PreRunThread(GraphManager *graph_manager);
static void RunThread(GraphManager *graph_manager);
static void StopQueue(GraphManager *graph_manager);
static void ReturnError(GraphManager *graph_manager, RunAsyncCallback callback, Status ret, const string &log);
static void ReturnError(GraphManager *graph_manager, GraphNodePtr &graph_node, RunAsyncCallback callback,
Status ret, const string &log);
void ChangeConstTypeWhenTraining(const ComputeGraphPtr &compute_graph);
Status PreRunOptimizeOriginalGraph(const GraphNodePtr &graph_node, const std::vector<GeTensor> &inputs,
ge::ComputeGraphPtr &compute_graph, uint64_t session_id);
Status PreRunOptimizeSubGraph(const GraphNodePtr &graph_node,
ge::ComputeGraphPtr &compute_graph,
uint64_t session_id);
Status PreRunAfterOptimizeSubGraph(const GraphNodePtr &graph_node,
ComputeGraphPtr &compute_graph,
GeRootModelPtr &ge_root_model,
uint64_t session_id);
Status SetFuzzCompileFlag(ComputeGraphPtr &compute_graph);
Status CopySubGraphAndMarkFusion(const ComputeGraphPtr &compute_graph,
Graph2SubGraphInfoList &sub_graph_map,
std::unordered_map<std::string, ComputeGraphPtr> &copy_graphs);
Status OptimizeSubGraphWithMultiThreads(ComputeGraphPtr compute_graph,
Graph2SubGraphInfoList &sub_graph_map,
uint64_t session_id);
bool CheckAllFusionOptimizeSuccess(const ComputeGraphPtr &compute_graph, Graph2SubGraphInfoList &sub_graph_map);
Status ReplaceSubgraphWithOriGraph(const ComputeGraphPtr &compute_graph,
Graph2SubGraphInfoList &sub_graph_map,
std::unordered_map<std::string, ComputeGraphPtr> &copy_graphs);
Status SetRtContext(rtContext_t rt_context, rtCtxMode_t mode, uint64_t session_id, uint32_t graph_id);
void AddLocalOmgContext(GraphId graph_id, const OmgContext &omg_context);
void UpdateLocalOmgContext(GraphId graph_id);
CompilerStages &GetCompilerStages(GraphId graph_id);
void RemoveCompilerStages(GraphId graph_id);
std::atomic_bool thread_run_flag_;
BlockingQueue<PreRunArgs> prerun_args_q_{};
BlockingQueue<RunArgs> run_args_q_{};
std::thread prerun_thread_;
std::thread run_thread_;
ComputeGraphPtr compute_graph_;
std::map<GraphId, GraphNodePtr> graph_map_;
std::map<GraphId, ModelCacheHelperPtr> cache_helper_map_;
// for run graph synchronous return
std::mutex sync_run_mutex_;
std::condition_variable condition_;
// run graph synchronization call back listener
std::shared_ptr<GraphModelListener> graph_run_listener_;
// summary and checkpoint callback function list for ME, key is summary or checkpoint
std::map<std::string, std::function<Status(uint32_t, const std::map<std::string, ge::Tensor> &)>> me_callback_map_;
std::map<std::string, std::function<Status(uint32_t, const std::map<AscendString, ge::Tensor> &)>> callback_map_;
bool init_flag_;
GraphManagerOptions options_;
GraphContextPtr graph_context_ = nullptr;
map<GraphId, OmgContext> omg_contexts_;
map<GraphId, CompilerStages> compiler_stages_;
GraphExecutor graph_executor_;
VarAccelerateCtrl var_acc_ctrl_;
std::mutex run_mutex_;
std::mutex member_mutex_;
std::mutex unload_model_mutex_;
};
} // namespace ge
#endif // GE_GRAPH_MANAGER_GRAPH_MANAGER_H_