/** * Copyright 2020 Huawei Technologies Co., Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef GE_GRAPH_BUILD_LOGICAL_STREAM_ALLOCATOR_H_ #define GE_GRAPH_BUILD_LOGICAL_STREAM_ALLOCATOR_H_ #include #include #include #include #include #include #include "engine_manager/dnnengine_manager.h" #include "graph/manager/graph_manager_utils.h" namespace ge { // Define default fuctions for stream passes. #define STREAM_PASS_DEFAULT_FUNC(CLASS) \ CLASS() : LogicalStreamPass(#CLASS) {} \ ~CLASS() override = default; \ CLASS(const CLASS &) = delete; \ CLASS &operator=(const CLASS &) = delete static const int64_t kInvalidStream = -1; // Base stream class. class LogicalStreamPass { public: static const int64_t kDefaultMaxParalleNum = 1; struct Subgraph; using SubgraphPtr = std::shared_ptr; struct Subgraph { string name; int64_t stream_id = kInvalidStream; const SubGraphInfo &subgraph_info; const EngineConf &engine_conf; int64_t max_parallel_num = kDefaultMaxParalleNum; SubgraphPtr reused_subgraph = nullptr; Subgraph(const SubGraphInfo &subgraph_info, const EngineConf &engine_conf) : subgraph_info(subgraph_info), engine_conf(engine_conf) {} }; struct Context { int64_t default_stream = kInvalidStream; int64_t next_stream = 0; bool enable_single_stream = false; bool enable_hcom_parallel = false; }; explicit LogicalStreamPass(const std::string &name); LogicalStreamPass(const LogicalStreamPass &) = delete; LogicalStreamPass &operator=(const LogicalStreamPass &) = delete; virtual ~LogicalStreamPass() = default; const std::string &GetName() const; virtual Status Run(ComputeGraphPtr graph, const std::vector &subgraphs, Context &context) = 0; protected: bool IsEngineSkip(const Subgraph &subgraph) const; bool IsEngineAttach(const Subgraph &subgraph) const; bool IsEngineIndependent(const Subgraph &subgraph) const; bool HasStreamLabel(const Subgraph &subgraph) const; bool HasAssignedStream(const Subgraph &subgraph) const; private: std::string name_; }; using LogicalStreamPassPtr = std::shared_ptr; // Allocate streams by label. class AssignByLabelPass : public LogicalStreamPass { public: STREAM_PASS_DEFAULT_FUNC(AssignByLabelPass); Status Run(ComputeGraphPtr graph, const std::vector &subgraphs, Context &context) override; }; // Engines such as hccl require independent Stream. class IndependentStreamPass : public LogicalStreamPass { public: STREAM_PASS_DEFAULT_FUNC(IndependentStreamPass); Status Run(ComputeGraphPtr graph, const std::vector &subgraphs, Context &context) override; }; // Reuse streams or assign new streams based on dependencies. class AssignByDependencyPass : public LogicalStreamPass { public: STREAM_PASS_DEFAULT_FUNC(AssignByDependencyPass); Status Run(ComputeGraphPtr graph, const std::vector &subgraphs, Context &context) override; private: void InitEndSubgraphMap(const std::vector &subgraphs, std::map &end_subgraph_map); void InitPldSubgraphMap(const std::vector &subgraphs, std::map &pld_subgraph_map); SubgraphPtr GetReusableSubgraph(const SubgraphPtr &subgraph, const std::map &end_subgraph_map, const std::map &pld_subgraph_map); int64_t AssignNewStream(SubgraphPtr subgraph); void UpdateAssignedSubgraphs(Context &context); void UpdateReusedSubgraphs(); bool CouldReuse(const SubgraphPtr &subgraph, const SubgraphPtr &pred_subgraph, const std::map &pld_subgraph_map); // std::map engine_next_streams_; // std::map engine_stream_num_; // Subgraphs of assign stream by engine std::vector assigned_subgraphs_; // std::vector> reused_subgraphs_; }; // All nodes in the graph are assigned the same stream. class SingleStreamPass : public LogicalStreamPass { public: STREAM_PASS_DEFAULT_FUNC(SingleStreamPass); Status Run(ComputeGraphPtr graph, const std::vector &subgraphs, Context &context) override; }; // Update the stream of subgraphs to nodes. class NodeStreamUpdatePass : public LogicalStreamPass { public: STREAM_PASS_DEFAULT_FUNC(NodeStreamUpdatePass); Status Run(ComputeGraphPtr graph, const std::vector &subgraphs, Context &context) override; }; // Update the stream of subgraphs to nodes. class UpdateForSkippedEnginePass : public LogicalStreamPass { public: STREAM_PASS_DEFAULT_FUNC(UpdateForSkippedEnginePass); /// Optimize for case like: /// NodeA(stream1) -> Const(stream2) -> NodeB(stream1) /// To case: /// NodeA(stream1) -> Const(stream1) -> NodeB(stream1) /// Which could reduce event number (Const could be other type which belong to skipped engine subgraph) Status Run(ComputeGraphPtr graph, const std::vector &subgraphs, Context &context) override; private: int64_t GetSingleInoutStream(const NodePtr &node) const; // Judge if all predecessors' streams of node are kInvalidStream bool AreAllPredStreamsInvalid(const NodePtr &node) const; }; // AllReduce and backward operators execute in parallel. class AllReduceParallelPass : public LogicalStreamPass { public: STREAM_PASS_DEFAULT_FUNC(AllReduceParallelPass); Status Run(ComputeGraphPtr graph, const std::vector &subgraphs, Context &context) override; private: bool IsHcomNode(const std::string& node_type); }; // Assign logical streams which is not limited by the number of tasks. class LogicalStreamAllocator { using Subgraph = LogicalStreamPass::Subgraph; using SubgraphPtr = LogicalStreamPass::SubgraphPtr; using Context = LogicalStreamPass::Context; public: LogicalStreamAllocator(const std::map &scheduler_confs, const std::map &max_parallel_num); LogicalStreamAllocator(const LogicalStreamAllocator &) = delete; LogicalStreamAllocator &operator=(const LogicalStreamAllocator &) = delete; ~LogicalStreamAllocator() = default; void EnableSingleStream(bool enable); void EnableHcomParallel(bool hcom_parallel); Status Assign(const ComputeGraphPtr &root_graph, const Graph2SubGraphInfoList &subgraph_map, int64_t &stream_num); private: Status DoAssign(const ComputeGraphPtr &graph, const Graph2SubGraphInfoList &subgraph_map, const map &engine_confs); Status ConvertSubgraphs(const std::vector &subgraph_infos, const std::map &engine_confs, std::vector &subgraphs); Status RunPasses(const ComputeGraphPtr &graph, const std::vector &subgraphs); void RefreshContinuousStreams(const ComputeGraphPtr &graph); const std::map &scheduler_confs_; const std::map &max_parallel_num_; Context context_; }; } // namespace ge #endif // GE_GRAPH_BUILD_LOGICAL_STREAM_ALLOCATOR_H_