From aa13d6b1cdc8f5f45d95a9708708c69a9d1591ae Mon Sep 17 00:00:00 2001 From: Xiaoda Zhang Date: Sat, 7 Nov 2020 14:21:38 +0800 Subject: [PATCH] support for-loop in auto-parallel --- .../parallel/auto_parallel/graph_costmodel.cc | 9 ++ .../parallel/auto_parallel/graph_costmodel.h | 1 + .../frontend/parallel/costmodel_context.cc | 3 + .../frontend/parallel/costmodel_context.h | 7 + .../parallel/graph_util/graph_info.cc | 23 +++ .../frontend/parallel/graph_util/graph_info.h | 2 + .../frontend/parallel/ops_info/ops_utils.h | 1 + .../frontend/parallel/step_auto_parallel.cc | 113 ++++++++++++++- mindspore/ccsrc/pipeline/jit/init.cc | 6 + mindspore/ccsrc/pipeline/jit/pipeline.cc | 10 ++ mindspore/ccsrc/pipeline/jit/pipeline.h | 3 + mindspore/common/api.py | 4 + mindspore/parallel/_cost_model_context.py | 42 +++++- .../parallel/test_auto_parallel_arithmetic.py | 16 +-- .../parallel/test_auto_parallel_cast.py | 10 +- .../parallel/test_auto_parallel_for_loop.py | 129 +++++++++++++++++ ...t_auto_parallel_for_loop_multi_subgraph.py | 136 ++++++++++++++++++ .../test_auto_parallel_for_loop_simplify.py | 101 +++++++++++++ .../parallel/test_auto_parallel_transpose.py | 8 +- .../parallel/test_auto_parallel_two_matmul.py | 9 ++ 20 files changed, 611 insertions(+), 22 deletions(-) create mode 100644 tests/ut/python/parallel/test_auto_parallel_for_loop.py create mode 100644 tests/ut/python/parallel/test_auto_parallel_for_loop_multi_subgraph.py create mode 100644 tests/ut/python/parallel/test_auto_parallel_for_loop_simplify.py diff --git a/mindspore/ccsrc/frontend/parallel/auto_parallel/graph_costmodel.cc b/mindspore/ccsrc/frontend/parallel/auto_parallel/graph_costmodel.cc index e3b9d7f9c1..4965751164 100644 --- a/mindspore/ccsrc/frontend/parallel/auto_parallel/graph_costmodel.cc +++ b/mindspore/ccsrc/frontend/parallel/auto_parallel/graph_costmodel.cc @@ -43,6 +43,7 @@ int64_t RUN_PHASE = DEFAULT_RUN_PHASE; bool TRIANGLE_STAR_STRATEGY_OVERWRITE = DEFAULT_TRIANGLE_STAR_STRATEGY_OVERWRITE; bool DP_ALGO_ENABLE_APPROX = DEFAULT_DP_ALGO_ENABLE_APPROX; double DP_ALGO_APPROX_EPSILON = DEFAULT_DP_ALGO_APPROX_EPSILON; +bool DP_ALGO_SINGLE_LOOP = DEFAULT_DP_ALGO_SINGLE_LOOP; void CostGraph::SetDeviceMemoryAndCostParameter() { MS_EXCEPTION_IF_NULL(CostModelContext::GetInstance()); @@ -187,6 +188,14 @@ void CostGraph::SetDeviceMemoryAndCostParameter() { } DP_ALGO_APPROX_EPSILON = epsilon; MS_LOG(INFO) << "epsilon: " << epsilon << "."; + + auto single_loop = CostModelContext::GetInstance()->dp_algo_single_loop(); + DP_ALGO_SINGLE_LOOP = single_loop; + if (single_loop) { + MS_LOG(INFO) << "dp_algo_single_loop: true."; + } else { + MS_LOG(INFO) << "dp_algo_single_loop: false."; + } } void CostGraph::RemoveOperator(const OperatorInfoPtr &op) { diff --git a/mindspore/ccsrc/frontend/parallel/auto_parallel/graph_costmodel.h b/mindspore/ccsrc/frontend/parallel/auto_parallel/graph_costmodel.h index 4547ab290e..5721f7f786 100644 --- a/mindspore/ccsrc/frontend/parallel/auto_parallel/graph_costmodel.h +++ b/mindspore/ccsrc/frontend/parallel/auto_parallel/graph_costmodel.h @@ -49,6 +49,7 @@ extern bool DP_ALGO_ENABLE_APPROX; extern double DP_ALGO_APPROX_EPSILON; extern int64_t RUN_PHASE; extern bool TRIANGLE_STAR_STRATEGY_OVERWRITE; +extern bool DP_ALGO_SINGLE_LOOP; class CostGraph { // 'CostGraph' consists of Operators and edges between them. 
An edge is created between two Operators if they have diff --git a/mindspore/ccsrc/frontend/parallel/costmodel_context.cc b/mindspore/ccsrc/frontend/parallel/costmodel_context.cc index e769d13dec..183f8b2627 100644 --- a/mindspore/ccsrc/frontend/parallel/costmodel_context.cc +++ b/mindspore/ccsrc/frontend/parallel/costmodel_context.cc @@ -56,6 +56,7 @@ void CostModelContext::ResetCostModel() { costmodel_allreduce_fusion_allreduce_bandwidth_ = DEFAULT_COST_MODEL_ALLREDUCE_FUSION_ALLREDUCE_BANDWIDTH; costmodel_allreduce_fusion_computation_time_parameter_ = DEFAULT_COST_MODEL_ALLREDUCE_FUSION_COMPUTATION_TIME_PARAMETER; + dp_algo_single_loop_ = DEFAULT_DP_ALGO_SINGLE_LOOP; } void CostModelContext::ResetAlgoParameters() { @@ -146,6 +147,8 @@ void CostModelContext::set_triangle_star_strategy_overwrite(bool overwrite) { void CostModelContext::set_run_phase(int64_t phase) { run_phase_ = phase; } +void CostModelContext::set_dp_algo_single_loop(bool single_loop) { dp_algo_single_loop_ = single_loop; } + struct CostRegister { CostRegister() { MsContext::device_seter([](const std::string &device_target) { diff --git a/mindspore/ccsrc/frontend/parallel/costmodel_context.h b/mindspore/ccsrc/frontend/parallel/costmodel_context.h index 92f961bf51..c3dc0a479b 100644 --- a/mindspore/ccsrc/frontend/parallel/costmodel_context.h +++ b/mindspore/ccsrc/frontend/parallel/costmodel_context.h @@ -47,6 +47,7 @@ namespace parallel { #define DEFAULT_TRIANGLE_STAR_STRATEGY_OVERWRITE true; #define DEFAULT_DP_ALGO_ENABLE_APPROX false #define DEFAULT_DP_ALGO_APPROX_EPSILON 0.1 +#define DEFAULT_DP_ALGO_SINGLE_LOOP true class CostModelContext { public: @@ -149,6 +150,9 @@ class CostModelContext { void set_dp_algo_enable_approxi(bool); bool dp_algo_enable_approxi() const { return dp_algo_enable_approxi_; } + void set_dp_algo_single_loop(bool); + bool dp_algo_single_loop() const { return dp_algo_single_loop_; } + private: CostModelContext(); static std::shared_ptr cm_context_inst_; @@ -190,6 +194,9 @@ class CostModelContext { // When APPROXIMATION is enabled in the DP algorithm, the 'epsilon' value used in the APPROXIMATION. double dp_algo_approxi_epsilon_; + // Whether to generate a single suite of OperatorInfo for a loop. + bool dp_algo_single_loop_; + int64_t run_phase_; // 0: 'training', 1: 'inference' int64_t costmodel_allreduce_fusion_algorithm_; diff --git a/mindspore/ccsrc/frontend/parallel/graph_util/graph_info.cc b/mindspore/ccsrc/frontend/parallel/graph_util/graph_info.cc index c6d8aeb22a..84d5d9ad99 100644 --- a/mindspore/ccsrc/frontend/parallel/graph_util/graph_info.cc +++ b/mindspore/ccsrc/frontend/parallel/graph_util/graph_info.cc @@ -14,12 +14,14 @@ * limitations under the License. 
*/ +#include #include "frontend/parallel/graph_util/graph_info.h" #include "debug/anf_ir_dump.h" #include "debug/anf_ir_utils.h" #include "debug/draw.h" #include "utils/ms_context.h" #include "ir/graph_utils.h" +#include "pipeline/jit/pipeline.h" namespace mindspore { namespace parallel { @@ -50,5 +52,26 @@ void DumpGraph(const FuncGraphPtr &root, const std::string &name) { ExportIR(name + ".dat", "0", root); } } + +// Return true if the cnode is in a for-loop and loop_index indicates the i-th loop; +// otherwise return false +bool GetLoopIndexFromCNode(const CNodePtr &cnode, size_t *loop_index) { + std::regex pattern(CELLLIST_KEYWORD_PATTERN); + std::smatch result; + const auto &cnode_fullname = cnode->fullname_with_scope(); + if (std::regex_search(cnode_fullname, result, pattern)) { + if (result.length() < 2) { + MS_LOG(EXCEPTION) << "Wrong format of fullname_with_scope: " << cnode_fullname; + } + *loop_index = std::stoi(result[1]); + return true; + } + return false; +} + +void SetOpsNumToExecutor(size_t num_ops) { + auto executor = pipeline::ExecutorPy::GetInstance(); + executor->SetNumOpsInfo(num_ops); +} } // namespace parallel } // namespace mindspore diff --git a/mindspore/ccsrc/frontend/parallel/graph_util/graph_info.h b/mindspore/ccsrc/frontend/parallel/graph_util/graph_info.h index a8837fe265..6f502d62b2 100644 --- a/mindspore/ccsrc/frontend/parallel/graph_util/graph_info.h +++ b/mindspore/ccsrc/frontend/parallel/graph_util/graph_info.h @@ -26,6 +26,8 @@ namespace mindspore { namespace parallel { std::vector FindPrimtive(const FuncGraphPtr &graph, const std::string &name); void DumpGraph(const FuncGraphPtr &root, const std::string &name); +bool GetLoopIndexFromCNode(const CNodePtr &cnode, size_t *loop_index); +void SetOpsNumToExecutor(size_t); } // namespace parallel } // namespace mindspore diff --git a/mindspore/ccsrc/frontend/parallel/ops_info/ops_utils.h b/mindspore/ccsrc/frontend/parallel/ops_info/ops_utils.h index 6338b08384..1793c20055 100644 --- a/mindspore/ccsrc/frontend/parallel/ops_info/ops_utils.h +++ b/mindspore/ccsrc/frontend/parallel/ops_info/ops_utils.h @@ -146,6 +146,7 @@ constexpr char FIELD_SIZE[] = "field_size"; constexpr char OPTIMIZER_SUB_STRING[] = "optimizer"; constexpr char DEVICE[] = "Device"; constexpr char PARALLEL_OPTIMIZER_ALLGATHER[] = "parallel_optimizer_allgather"; +constexpr char CELLLIST_KEYWORD_PATTERN[] = "-CellList/(\\d+)-"; // Operator constexpr char VIRTUAL_DIV[] = "_VirtualDiv"; diff --git a/mindspore/ccsrc/frontend/parallel/step_auto_parallel.cc b/mindspore/ccsrc/frontend/parallel/step_auto_parallel.cc index 26ce63c4fc..454e4ff431 100644 --- a/mindspore/ccsrc/frontend/parallel/step_auto_parallel.cc +++ b/mindspore/ccsrc/frontend/parallel/step_auto_parallel.cc @@ -38,6 +38,7 @@ #include "frontend/parallel/auto_parallel/rec_core/rec_partition.h" #include "frontend/parallel/context.h" #include "frontend/parallel/graph_util/node_info.h" +#include "frontend/parallel/graph_util/graph_info.h" #include "frontend/parallel/ops_info/reshape_info.h" #include "frontend/parallel/ops_info/tmp_identity_info.h" #include "frontend/parallel/step_parallel.h" @@ -346,6 +347,39 @@ bool IsAutoParallelCareNode(const CNodePtr &cnode) { return IsParallelCareNode(cnode) && IsSplittableOperator(prim->name()); } +// Recording the operators appearing in a for-loop. +// Currently, we assume that the operators in different for-loops are identical, and their traversal +// orderings are also identical. 
+// Therefore, we create OperatorInfo objects for the operators in a loop (say, loop-3), and reuse them in +// the rest of loops (loop-2, loop-1 and loop-0) +std::set<std::string> ops_in_a_loop_; +// Return true if the two operators are in two different for-loops. +// If at least one of the two operators is not in a loop, return false. +// If the two operators are in the same loop, also return false. +bool IsOperatorsInTwoSeparateLoops(const CNodePtr &a_cnode, const CNodePtr &b_cnode) { + auto a_op_info = a_cnode->user_data<OperatorInfo>(); + MS_EXCEPTION_IF_NULL(a_op_info); + auto b_op_info = b_cnode->user_data<OperatorInfo>(); + MS_EXCEPTION_IF_NULL(b_op_info); + if ((ops_in_a_loop_.find(a_op_info->name()) == ops_in_a_loop_.end()) || + (ops_in_a_loop_.find(b_op_info->name()) == ops_in_a_loop_.end())) { + return false; + } + size_t a_loop_index = 0, b_loop_index = 0; + const auto &a_fullname = a_cnode->fullname_with_scope(); + if (!GetLoopIndexFromCNode(a_cnode, &a_loop_index)) { + MS_LOG(EXCEPTION) << "The operator with fullname_with_scope: " << a_fullname << " was not included in the set."; + } + const auto &b_fullname = b_cnode->fullname_with_scope(); + if (!GetLoopIndexFromCNode(b_cnode, &b_loop_index)) { + MS_LOG(EXCEPTION) << "The operator with fullname_with_scope: " << b_fullname << " was not included in the set."; + } + if (a_loop_index == b_loop_index) { + return false; + } + return true; +} + OperatorInfoPtr CreateTheOperatorInfo(const PrimitivePtr &prim, const CNodePtr &cnode, StrategyMap *stra_map) { MS_EXCEPTION_IF_NULL(prim); MS_EXCEPTION_IF_NULL(cnode); @@ -460,6 +494,10 @@ Status ConstructCostGraphNodesByUniqueId(const std::vector<AnfNodePtr> &all_node entire_costgraph->SetDeviceMemoryAndCostParameter(); // The map from CNode's UniqueId to its operatorInfo std::map<std::string, OperatorInfoPtr> from_cnode_to_info; + // The operator_infos in a loop + std::vector<OperatorInfoPtr> operators_in_forloop; + // Key: i-th loop; Value: index of 'operators_in_forloop' + std::map<size_t, size_t> loop_to_ops; // extract strategy from checkpoint for multi-train StrategyMap stra_map; if (StrategyCheckpoint::GetInstance().LoadCheckPointOn()) { @@ -491,6 +529,27 @@ Status ConstructCostGraphNodesByUniqueId(const std::vector<AnfNodePtr> &all_node auto search_cnode = from_cnode_to_info.find(cnode->UniqueId()); if (search_cnode == from_cnode_to_info.end()) { + size_t loop_index = 0; + bool is_in_loop = GetLoopIndexFromCNode(cnode, &loop_index); + if (DP_ALGO_SINGLE_LOOP && is_in_loop && (loop_to_ops[loop_index] < operators_in_forloop.size())) { + const auto &current_op_ptr = operators_in_forloop[loop_to_ops[loop_index]]; + bool is_find_wrong = (current_op_ptr->name().find(VIRTUAL_DATA_SET_INFO) == std::string::npos) && + (current_op_ptr->name().find(BATCH_PARALLEL) == std::string::npos) && + (current_op_ptr->name().find(prim->name()) == std::string::npos); + if (is_find_wrong) { + MS_LOG(EXCEPTION) << "The OperatorInfo: " << current_op_ptr->name() + << " does not match the Prim: " << prim->name() + << ". 
The fullname_with_scope: " << cnode->fullname_with_scope(); + } + loop_to_ops[loop_index]++; + cnode->set_user_data(current_op_ptr); + MS_LOG(INFO) << "The CNode with UniqueId: " << cnode->UniqueId() + << " and UniqueIdThroughCopy: " << cnode->UniqueIdThroughCopy() + << ", CNode fullname_with_scope: " << cnode->fullname_with_scope() + << " is set OperatorInfo: " << current_op_ptr->name() << ", Primitive: " << prim->name(); + (void)from_cnode_to_info.emplace(std::make_pair(cnode->UniqueId(), current_op_ptr)); + continue; + } auto operator_info = CreateTheOperatorInfo(prim, cnode, &stra_map); if (operator_info == nullptr) { return FAILED; @@ -503,8 +562,14 @@ Status ConstructCostGraphNodesByUniqueId(const std::vector &all_node cnode->set_user_data(operator_info); MS_LOG(INFO) << "The CNode with UniqueId: " << cnode->UniqueId() << " and UniqueIdThroughCopy: " << cnode->UniqueIdThroughCopy() + << ", CNode fullname_with_scope: " << cnode->fullname_with_scope() << " is set OperatorInfo: " << operator_info->name() << ", Primitive: " << prim->name(); - (void)from_cnode_to_info.emplace(std::make_pair(cnode->UniqueIdThroughCopy(), operator_info)); + (void)from_cnode_to_info.emplace(std::make_pair(cnode->UniqueId(), operator_info)); + if (DP_ALGO_SINGLE_LOOP && is_in_loop) { + operators_in_forloop.push_back(operator_info); + ops_in_a_loop_.insert(operator_info->name()); + loop_to_ops[loop_index]++; + } // Needed by rec_parser entire_costgraph->add_inputs_tensor_name(inputs_tensor_name); } else { @@ -526,6 +591,10 @@ Status ConstructCostGraphNodesByUniqueIdTC(const std::vector &all_no entire_costgraph->SetDeviceMemoryAndCostParameter(); // The map from CNode's UniqueIdThroughCopy to its operatorInfo std::map from_cnode_to_info; + // The operator_infos in a loop + std::vector operators_in_forloop; + // Key: i-th loop; Value: index of 'operators_in_forloop' + std::map loop_to_ops; // extract strategy from checkpoint for multi-train StrategyMap stra_map; if (StrategyCheckpoint::GetInstance().LoadCheckPointOn()) { @@ -556,6 +625,27 @@ Status ConstructCostGraphNodesByUniqueIdTC(const std::vector &all_no // Find the operatorInfo if it exists auto search_cnode = from_cnode_to_info.find(cnode->UniqueIdThroughCopy()); if (search_cnode == from_cnode_to_info.end()) { + size_t loop_index = 0; + bool is_in_loop = GetLoopIndexFromCNode(cnode, &loop_index); + if (DP_ALGO_SINGLE_LOOP && is_in_loop && (loop_to_ops[loop_index] < operators_in_forloop.size())) { + const auto ¤t_op_ptr = operators_in_forloop[loop_to_ops[loop_index]]; + bool is_find_wrong = (current_op_ptr->name().find(VIRTUAL_DATA_SET_INFO) == std::string::npos) && + (current_op_ptr->name().find(BATCH_PARALLEL) == std::string::npos) && + (current_op_ptr->name().find(prim->name()) == std::string::npos); + if (is_find_wrong) { + MS_LOG(EXCEPTION) << "The OperatorInfo: " << current_op_ptr->name() + << " does not match the Prim: " << prim->name() + << ". 
The fullname_with_scope: " << cnode->fullname_with_scope(); + } + loop_to_ops[loop_index]++; + cnode->set_user_data(current_op_ptr); + MS_LOG(INFO) << "The CNode with UniqueId: " << cnode->UniqueId() + << " and UniqueIdThroughCopy: " << cnode->UniqueIdThroughCopy() + << ", CNode fullname_with_scope: " << cnode->fullname_with_scope() + << " is set OperatorInfo: " << current_op_ptr->name() << ", Primitive: " << prim->name(); + (void)from_cnode_to_info.emplace(std::make_pair(cnode->UniqueIdThroughCopy(), current_op_ptr)); + continue; + } // In this case, the corresponding OperatorInfo is not created, create the new one. auto operator_info = CreateTheOperatorInfo(prim, cnode, &stra_map); if (operator_info == nullptr) { @@ -569,8 +659,14 @@ Status ConstructCostGraphNodesByUniqueIdTC(const std::vector &all_no cnode->set_user_data(operator_info); MS_LOG(INFO) << "The CNode with UniqueId: " << cnode->UniqueId() << " and UniqueIdThroughCopy: " << cnode->UniqueIdThroughCopy() + << ", CNode fullname_with_scope: " << cnode->fullname_with_scope() << " is set OperatorInfo: " << operator_info->name() << ", Primitive: " << prim->name(); (void)from_cnode_to_info.emplace(std::make_pair(cnode->UniqueIdThroughCopy(), operator_info)); + if (DP_ALGO_SINGLE_LOOP && is_in_loop) { + operators_in_forloop.push_back(operator_info); + ops_in_a_loop_.insert(operator_info->name()); + loop_to_ops[loop_index]++; + } // Needed by rec_parser entire_costgraph->add_inputs_tensor_name(inputs_tensor_name); } else { @@ -642,7 +738,12 @@ void ConstructCostGraphEdges(const std::vector &all_nodes) { } EdgePtr edge_ptr; MS_LOG(INFO) << "Creating edge: " << edge_name; - + if (IsOperatorsInTwoSeparateLoops(prev_cnode, cnode)) { + MS_LOG(INFO) << "prev_cnode_fullname: " << prev_cnode->fullname_with_scope() + << ", cnode_fullname: " << cnode->fullname_with_scope(); + MS_LOG(INFO) << "The two operators in two separate for-loops, thus skip the edge."; + break; + } bool follow_strategy = (prim->name() == RESHAPE) || (prev_prim->name() == RESHAPE) || (ELEMENTWISE_OP_STRA_FOLLOW && IsElementWiseOperator(prev_prim->name())); if (follow_strategy) { @@ -1044,8 +1145,11 @@ Status ParallelStrategySearch(const std::vector &all_nodes, const Fu // Step 3: Augment the costgraph. 
AugmentCostGraph(all_nodes); - MS_LOG(INFO) << "After the augmenting procedure, there are " << entire_costgraph->GetOperators().size() - << " operators, and " << entire_costgraph->GetNumEdges() << " edges."; + auto num_ops = entire_costgraph->GetOperators().size(); + SetOpsNumToExecutor(num_ops); + auto num_edges = entire_costgraph->GetNumEdges(); + MS_LOG(INFO) << "After the augmenting procedure, there are " << num_ops << " operators, and " << num_edges + << " edges."; // Step 3.1: Calculate the memory usage if (entire_costgraph->CalculateMemoryCost() != SUCCESS) { @@ -1071,6 +1175,7 @@ Status ParallelStrategySearch(const std::vector &all_nodes, const Fu MS_LOG(INFO) << op->name() << " : The strategy is:"; PrintStrategy(s_strategy); } + ops_in_a_loop_.clear(); return SUCCESS; } diff --git a/mindspore/ccsrc/pipeline/jit/init.cc b/mindspore/ccsrc/pipeline/jit/init.cc index 952e114c57..a6b677a50a 100644 --- a/mindspore/ccsrc/pipeline/jit/init.cc +++ b/mindspore/ccsrc/pipeline/jit/init.cc @@ -82,6 +82,8 @@ PYBIND11_MODULE(_c_expression, m) { "Get Parameter Tensor Layout Dictionary.") .def("get_strategy", &ExecutorPy::GetCNodeStrategy, py::arg("phase") = py::str("train"), "Get CNode Strategy Dictionary.") + .def("get_num_parallel_ops", &ExecutorPy::GetNumOpsInfo, py::arg("phase") = py::str("train"), + "Get the number of parallel operators.") .def("get_allreduce_fusion", &ExecutorPy::GetAllreduceFusion, py::arg("phase") = py::str("train"), "Get Allreduce Fusion Dictionary.") .def("fetch_info_for_quant_export", &ExecutorPy::FetchInfoForQuantExport, py::arg("phase") = py::str("train"), @@ -254,6 +256,10 @@ PYBIND11_MODULE(_c_expression, m) { "Set the epsilon which is used in the approximation of DP algorithm.") .def("get_dp_algo_approxi_epsilon", &CostModelContext::dp_algo_approxi_epsilon, "Get the epsilon which is used in the approximation of DP algorithm.") + .def("set_dp_algo_single_loop", &CostModelContext::set_dp_algo_single_loop, + "Set the flag of generating a single suite of OperatorInfos in for-loop.") + .def("get_dp_algo_single_loop", &CostModelContext::dp_algo_single_loop, + "Get the flag of whether or not generating a single suite of OperatorInfos in for-loop.") .def("reset_cost_model", &CostModelContext::ResetCostModel, "Reset the CostModelContext.") .def("reset_algo_parameters", &CostModelContext::ResetAlgoParameters, "Reset the AlgoParameters."); diff --git a/mindspore/ccsrc/pipeline/jit/pipeline.cc b/mindspore/ccsrc/pipeline/jit/pipeline.cc index 5d65a2ff57..85ac39affa 100644 --- a/mindspore/ccsrc/pipeline/jit/pipeline.cc +++ b/mindspore/ccsrc/pipeline/jit/pipeline.cc @@ -252,6 +252,16 @@ void ExecutorPy::SetCNodeStrategy(const std::string &name, const parallel::Strat stra_dict_[phase_][py::str(name)] = strategy; } +size_t ExecutorPy::GetNumOpsInfo(const std::string &phase) { + MS_LOG(DEBUG) << "GetNumOpsInfo!"; + return phase_to_num_op_info_[phase]; +} + +void ExecutorPy::SetNumOpsInfo(size_t num_ops) { + MS_LOG(DEBUG) << "SetNumOpsInfo!"; + phase_to_num_op_info_[phase_] = num_ops; +} + py::dict ExecutorPy::GetAllreduceFusion(const std::string &phase) { MS_LOG(INFO) << "GetAllreduceFusion!"; auto graph = GetFuncGraph(phase); diff --git a/mindspore/ccsrc/pipeline/jit/pipeline.h b/mindspore/ccsrc/pipeline/jit/pipeline.h index 642ba99fb7..2e7502ec90 100644 --- a/mindspore/ccsrc/pipeline/jit/pipeline.h +++ b/mindspore/ccsrc/pipeline/jit/pipeline.h @@ -93,6 +93,8 @@ class ExecutorPy : public std::enable_shared_from_this { py::dict GetParameterLayout(const std::string &phase); py::dict 
GetCNodeStrategy(const std::string &phase); void SetCNodeStrategy(const std::string &name, const parallel::Strategys &strategy); + size_t GetNumOpsInfo(const std::string &phase); + void SetNumOpsInfo(size_t); py::dict GetAllreduceFusion(const std::string &phase); void DelNetRes(const std::string &id); void ReleaseResource(const py::object &phase); @@ -117,6 +119,7 @@ class ExecutorPy : public std::enable_shared_from_this { static bool debugger_terminate_; std::map stra_dict_; std::string phase_ = ""; + std::map phase_to_num_op_info_; }; using ExecutorPyPtr = std::shared_ptr; diff --git a/mindspore/common/api.py b/mindspore/common/api.py index 1ae058e1a6..16c4a10a4d 100644 --- a/mindspore/common/api.py +++ b/mindspore/common/api.py @@ -455,6 +455,10 @@ class _Executor: real_phase = self.phase_prefix + obj.phase + '.' + str(obj.create_time) return self._executor.get_strategy(real_phase) + def _get_num_parallel_ops(self, obj): + real_phase = self.phase_prefix + obj.phase + '.' + str(obj.create_time) + return self._executor.get_num_parallel_ops(real_phase) + def _get_allreduce_fusion(self, obj): real_phase = self.phase_prefix + obj.phase + '.' + str(obj.create_time) return self._executor.get_allreduce_fusion(real_phase) diff --git a/mindspore/parallel/_cost_model_context.py b/mindspore/parallel/_cost_model_context.py index 549efb2005..2b406b7a8a 100644 --- a/mindspore/parallel/_cost_model_context.py +++ b/mindspore/parallel/_cost_model_context.py @@ -266,6 +266,31 @@ class _CostModelContext: raise ValueError("Context handle is none in context!!!") return self._context_handle.get_run_phase() + def set_dp_algo_single_loop(self, single_loop): + """ + Set the flag of generating a single suite of OperatorInfos in for-loop. + + Args: + single_loop (bool): The parameter for the single loop flag. + + Raises: + ValueError: If context handle is none. + """ + if self._context_handle is None: + raise ValueError("Context handle is none in context!!!") + self._context_handle.set_dp_algo_single_loop(single_loop) + + def get_dp_algo_single_loop(self): + """ + Get the flag of whether or not generating a single suite of OperatorInfos in for-loop. + + Raises: + ValueError: If context handle is none. + """ + if self._context_handle is None: + raise ValueError("Context handle is none in context!!!") + return self._context_handle.get_dp_algo_single_loop() + def set_costmodel_allreduce_fusion_algorithm(self, algorithm): """ Set costmodel allreduce fusion algorithm. @@ -602,4 +627,19 @@ def _get_multi_subgraphs(): """ Get the flag of ANF graph containing multiple subgraphs. """ - cost_model_context().get_multi_subgraphs() + return cost_model_context().get_multi_subgraphs() + +def _set_algo_single_loop(single_loop=True): + """ + Set the flag of generating a single suite of OperatorInfos in for-loop. + + Args: + single_loop (bool): The parameter for the single loop flag. + """ + cost_model_context().set_dp_algo_single_loop(single_loop) + +def _get_algo_single_loop(): + """ + Get the flag of whether or not generating a single suite of OperatorInfos in for-loop. 
+ """ + return cost_model_context().get_dp_algo_single_loop() diff --git a/tests/ut/python/parallel/test_auto_parallel_arithmetic.py b/tests/ut/python/parallel/test_auto_parallel_arithmetic.py index 51413f98e5..fa9a601171 100644 --- a/tests/ut/python/parallel/test_auto_parallel_arithmetic.py +++ b/tests/ut/python/parallel/test_auto_parallel_arithmetic.py @@ -78,8 +78,8 @@ def test_auto_parallel_arithmetic(): b = Tensor(np.ones([64, 128]), dtype=ms.float32) compile_net(net, x, y, b, phase='train') strategies = _executor._get_shard_strategy(net) - expected_strategies = {'Default/network-Net/FloorDiv-op1': [[2, 4], [2, 4]], - 'Default/network-Net/MatMul-op0': [[2, 1], [1, 4]]} + expected_strategies = {'Default/network-Net/FloorDiv-op0': [[2, 4], [2, 4]], + 'Default/network-Net/MatMul-op1': [[2, 1], [1, 4]]} assert strategies == expected_strategies @@ -105,8 +105,8 @@ def test_auto_parallel_arithmetic_broadcast_both(): b = Tensor(np.ones([1, 64]), dtype=ms.float32) compile_net(net, x, y, b, phase='train') strategies = _executor._get_shard_strategy(net) - expected_strategies = {'Default/network-Net/FloorDiv-op1': [[8, 1], [1, 1]], - 'Default/network-Net/MatMul-op0': [[8, 1], [1, 1]]} + expected_strategies = {'Default/network-Net/FloorDiv-op0': [[8, 1], [1, 1]], + 'Default/network-Net/MatMul-op1': [[8, 1], [1, 1]]} assert strategies == expected_strategies @@ -132,8 +132,8 @@ def test_auto_parallel_arithmetic_broadcast_right(): b = Tensor(np.ones([32]), dtype=ms.float32) compile_net(net, x, y, b, phase='train') strategies = _executor._get_shard_strategy(net) - expected_strategies = {'Default/network-Net/FloorDiv-op1': [[4, 2], [2]], - 'Default/network-Net/MatMul-op0': [[4, 1], [1, 2]]} + expected_strategies = {'Default/network-Net/FloorDiv-op0': [[4, 2], [2]], + 'Default/network-Net/MatMul-op1': [[4, 1], [1, 2]]} assert strategies == expected_strategies @@ -159,6 +159,6 @@ def test_auto_parallel_arithmetic_broadcast_left(): b = Tensor(np.ones([128, 64, 32]), dtype=ms.float32) compile_net(net, x, y, b, phase="train") strategies = _executor._get_shard_strategy(net) - expected_strategies = {'Default/network-Net/FloorDiv-op1': [[4, 2], [1, 4, 2]], - 'Default/network-Net/MatMul-op0': [[4, 1], [1, 2]]} + expected_strategies = {'Default/network-Net/FloorDiv-op0': [[4, 2], [1, 4, 2]], + 'Default/network-Net/MatMul-op1': [[4, 1], [1, 2]]} assert strategies == expected_strategies diff --git a/tests/ut/python/parallel/test_auto_parallel_cast.py b/tests/ut/python/parallel/test_auto_parallel_cast.py index 4dee5c42de..a67bf7f9eb 100644 --- a/tests/ut/python/parallel/test_auto_parallel_cast.py +++ b/tests/ut/python/parallel/test_auto_parallel_cast.py @@ -84,9 +84,9 @@ def test_double_star_graph(): net.set_train() _executor.compile(net, x, y, z, w, phase='train') strategies = _executor._get_shard_strategy(net) - expected_strategies = {'Default/network-Net/Cast-op0': [[8, 1]], - 'Default/network-Net/Cast-op1': [[1, 8]], - 'Default/network-Net/MatMul-op3': [[8, 1], [1, 1]], - 'Default/network-Net/MatMul-op2': [[1, 1], [1, 8]], - 'Default/network-Net/MatMul-op4': [[1, 8], [8, 1]]} + expected_strategies = {'Default/network-Net/Cast-op1': [[8, 1]], + 'Default/network-Net/Cast-op3': [[1, 8]], + 'Default/network-Net/MatMul-op2': [[8, 1], [1, 1]], + 'Default/network-Net/MatMul-op4': [[1, 1], [1, 8]], + 'Default/network-Net/MatMul-op0': [[1, 8], [8, 1]]} assert strategies == expected_strategies diff --git a/tests/ut/python/parallel/test_auto_parallel_for_loop.py b/tests/ut/python/parallel/test_auto_parallel_for_loop.py 
new file mode 100644 index 0000000000..90404aed13 --- /dev/null +++ b/tests/ut/python/parallel/test_auto_parallel_for_loop.py @@ -0,0 +1,129 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +import numpy as np + +import mindspore as ms +from mindspore import context, Tensor, Parameter +from mindspore.nn import Cell +import mindspore.nn as nn +from mindspore.ops import operations as P, functional as F +from mindspore.common.initializer import initializer +import mindspore.common.dtype as mstype +from mindspore.common.api import _executor +from tests.dataset_mock import MindData + + +class Dataset(MindData): + def __init__(self, predict, label, length=3): + super(Dataset, self).__init__(size=length) + self.predict = predict + self.label = label + self.index = 0 + self.length = length + + def __iter__(self): + return self + + def __next__(self): + if self.index >= self.length: + raise StopIteration + self.index += 1 + return self.predict, self.label + + def reset(self): + self.index = 0 + + +class LayerNorm(nn.Cell): + def __init__(self, normalized_shape, eps=1e-5): + super(LayerNorm, self).__init__() + self.gamma = Parameter(initializer('ones', normalized_shape), name="gamma") + self.beta = Parameter(initializer('zeros', normalized_shape), name="beta") + self.mean = P.ReduceMean(keep_dims=True) + self.eps = eps + self.sub = P.Sub() + self.add = P.TensorAdd() + self.mul = P.Mul() + self.div = P.RealDiv() + + def construct(self, x): + mean = self.mean(x, -1) + variance = self.mean(F.square(self.sub(x, mean))) + output = self.div(self.sub(x, mean), F.sqrt(self.add(variance, self.eps))) + rescaled_output = self.add(self.mul(output, self.gamma), self.beta) + return rescaled_output + + +class SubNet(Cell): + def __init__(self, index): + super().__init__() + self.matmul = P.MatMul() + self.relu = P.ReLU() + self.weight = Parameter(Tensor(np.ones([128, 128]), dtype=ms.float32), "matmul_w"+str(index)) + self.layernorm1 = LayerNorm((128,)).to_float(mstype.float32) + + def construct(self, x): + x = self.layernorm1(x) + out = self.matmul(x, self.weight) + out = self.relu(out) + return out + + +class Net(Cell): + def __init__(self, mul_weight, num_layers, strategy1=None, strategy2=None): + super().__init__() + self.mul = P.Mul().shard(strategy1) + self.neg = P.Neg().shard(strategy2) + self.mul_weight = Parameter(mul_weight, "w1") + self.num_layers = num_layers + self.layers = nn.CellList() + for i in range(num_layers): + self.layers.append(SubNet(i)) + + def construct(self, x): + for i in range(self.num_layers): + x = self.layers[i](x) + out = self.mul(x, self.mul_weight) + out = self.neg(out) + return out + + +class Full(Cell): + def __init__(self, mul_weight, num_layers, strategy1=None, strategy2=None): + super().__init__() + self.network = Net(mul_weight, num_layers, strategy1, strategy2) + self.relu = P.ReLU() + + def construct(self, x): + out = self.network(x) + out = 
self.relu(out) + return out + + +_x = Tensor(np.ones([512, 128]), dtype=ms.float32) +_b = Tensor(np.ones([32]), dtype=ms.int32) +_w1 = Tensor(np.ones([512, 128]), dtype=ms.float32) + + +def test_auto_parallel(): + context.set_context(save_graphs=True) + context.set_auto_parallel_context(parallel_mode="auto_parallel", device_num=16, global_rank=0) + net = Full(_w1, 3) + net.set_auto_parallel() + net.set_train() + _executor.compile(net, _x, phase='train') + num_ops = _executor._get_num_parallel_ops(net) + expected_num = 16 + assert num_ops == expected_num diff --git a/tests/ut/python/parallel/test_auto_parallel_for_loop_multi_subgraph.py b/tests/ut/python/parallel/test_auto_parallel_for_loop_multi_subgraph.py new file mode 100644 index 0000000000..920f49e592 --- /dev/null +++ b/tests/ut/python/parallel/test_auto_parallel_for_loop_multi_subgraph.py @@ -0,0 +1,136 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +import numpy as np + +import mindspore as ms +import mindspore.nn as nn +from mindspore import Tensor, Parameter, ParameterTuple +from mindspore import context +from mindspore.common.api import _executor +from mindspore.nn.optim import Adam, FTRL +from mindspore.ops import composite as C +from mindspore.ops import functional as F +from mindspore.ops import operations as P +from mindspore.parallel._cost_model_context import _set_multi_subgraphs +from mindspore.parallel._utils import _reset_op_id as reset_op_id + + +class SubNet(nn.Cell): + def __init__(self, index): + super().__init__() + self.matmul = P.BatchMatMul() + self.relu = P.ReLU() + self.weight = Parameter(Tensor(np.ones([8, 8, 8, 8]), dtype=ms.float32), "matmul_w"+str(index)) + + def construct(self, x): + out = self.matmul(x, self.weight) + out = self.relu(out) + return out + + +class Net(nn.Cell): + def __init__(self): + super(Net, self).__init__() + self.mul = P.Mul() + self.relu = P.ReLU() + self.wd = Parameter(Tensor(np.ones([8, 8, 8, 8]).astype(np.float32)), name="wide") + self.wt = Parameter(Tensor(np.ones([8, 8, 8, 8]).astype(np.float32)), name="l") + self.layers = nn.CellList() + for i in range(3): + self.layers.append(SubNet(i)) + + def construct(self, x): + for i in range(3): + x = self.layers[i](x) + out = self.mul(x, self.wd) + out = self.mul(out, self.wt) + out = self.relu(out) + return out + + +class NetWithLoss(nn.Cell): + def __init__(self, network): + super(NetWithLoss, self).__init__() + self.sum = P.ReduceSum() + self.mean = P.ReduceMean() + self.net = network + + def construct(self, x): + predict = self.net(x) + loss1 = self.sum(predict, -1) + loss2 = self.mean(predict, -1) + return loss1, loss2 + + +class IthOutputCell(nn.Cell): + def __init__(self, network, output_index): + super(IthOutputCell, self).__init__() + self.network = network + self.output_index = output_index + + def construct(self, x): + predict = self.network(x)[self.output_index] + return predict + + +class 
TrainStepWarp(nn.Cell): + def __init__(self, network, sens=1000.0): + super(TrainStepWarp, self).__init__() + self.network = network + self.network.set_train() + self.trainable_params = network.trainable_params() + weights_w = [] + weights_d = [] + for params in self.trainable_params: + weights_w.append(params) + weights_d.append(params) + self.weights_w = ParameterTuple(weights_w) + self.weights_d = ParameterTuple(weights_d) + self.optimizer_w = FTRL(learning_rate=1e-2, params=self.weights_w, l1=1e-8, + l2=1e-8, initial_accum=1.0) + self.optimizer_d = Adam(self.weights_d, learning_rate=3.5e-4, eps=1e-8, + loss_scale=sens) + self.hyper_map = C.HyperMap() + self.grad_w = C.GradOperation(get_by_list=True, sens_param=True) + self.grad_d = C.GradOperation(get_by_list=True, sens_param=True) + self.sens = sens + self.loss_net_w = IthOutputCell(network, output_index=0) + self.loss_net_d = IthOutputCell(network, output_index=1) + + def construct(self, x): + weights_w = self.weights_w + weights_d = self.weights_d + loss_w, loss_d = self.network(x) + sens_w = P.Fill()(P.DType()(loss_w), P.Shape()(loss_w), self.sens) + sens_d = P.Fill()(P.DType()(loss_d), P.Shape()(loss_d), self.sens) + grads_w = self.grad_w(self.loss_net_w, weights_w)(x, sens_w) + grads_d = self.grad_d(self.loss_net_d, weights_d)(x, sens_d) + return F.depend(loss_w, self.optimizer_w(grads_w)), F.depend(loss_d, self.optimizer_d(grads_d)) + + +def test_double_subgraphs(): + context.set_context(save_graphs=True) + context.set_auto_parallel_context(parallel_mode="auto_parallel", device_num=8, global_rank=0) + net = TrainStepWarp(NetWithLoss(Net())) + _set_multi_subgraphs() + net.set_auto_parallel() + + x = Tensor(np.ones([8, 8, 8, 8]), dtype=ms.float32) + reset_op_id() + net.set_train() + _executor.compile(net, x, phase='train') + num_ops = _executor._get_num_parallel_ops(net) + expected_num = 7 + assert expected_num == num_ops diff --git a/tests/ut/python/parallel/test_auto_parallel_for_loop_simplify.py b/tests/ut/python/parallel/test_auto_parallel_for_loop_simplify.py new file mode 100644 index 0000000000..fc63d07dc7 --- /dev/null +++ b/tests/ut/python/parallel/test_auto_parallel_for_loop_simplify.py @@ -0,0 +1,101 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ +import numpy as np + +import mindspore as ms +from mindspore import context, Tensor, Parameter +from mindspore.nn import Cell, Momentum +from mindspore.nn.loss import SoftmaxCrossEntropyWithLogits +import mindspore.nn as nn +from mindspore.ops import operations as P +from mindspore.train import Model +from tests.dataset_mock import MindData + + +class Dataset(MindData): + def __init__(self, predict, label, length=3): + super(Dataset, self).__init__(size=length) + self.predict = predict + self.label = label + self.index = 0 + self.length = length + + def __iter__(self): + return self + + def __next__(self): + if self.index >= self.length: + raise StopIteration + self.index += 1 + return self.predict, self.label + + def reset(self): + self.index = 0 + + +class SubNet(Cell): + def __init__(self, index): + super().__init__() + self.matmul = P.MatMul() + self.relu = P.ReLU() + self.weight = Parameter(Tensor(np.ones([128, 128]), dtype=ms.float32), "matmul_w"+str(index)) + + def construct(self, x): + out = self.matmul(x, self.weight) + out = self.relu(out) + return out + + +class Net(Cell): + def __init__(self, mul_weight, num_layers, strategy1=None, strategy2=None): + super().__init__() + self.mul = P.Mul().shard(strategy1) + self.neg = P.Neg().shard(strategy2) + self.mul_weight = Parameter(mul_weight, "w1") + self.num_layers = num_layers + self.layers = nn.CellList() + for i in range(num_layers): + self.layers.append(SubNet(i)) + + def construct(self, x): + for i in range(self.num_layers): + x = self.layers[i](x) + out = self.mul(x, self.mul_weight) + out = self.neg(out) + return out + + +_x = Tensor(np.ones([32, 128]), dtype=ms.float32) +_b = Tensor(np.ones([32]), dtype=ms.int32) +_w1 = Tensor(np.ones([512, 128]), dtype=ms.float32) + + +def compile_net(net): + context.set_context(save_graphs=True) + learning_rate = 0.1 + momentum = 0.9 + epoch_size = 2 + dataset = Dataset(_x, _b) + loss = SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean') + opt = Momentum(net.trainable_params(), learning_rate, momentum) + model = Model(net, loss, optimizer=opt) + model.train(epoch_size, dataset, dataset_sink_mode=False) + context.reset_auto_parallel_context() + + +def test_auto_parallel(): + context.set_auto_parallel_context(parallel_mode="auto_parallel", device_num=16, global_rank=0) + net = Net(_w1, 3) + compile_net(net) diff --git a/tests/ut/python/parallel/test_auto_parallel_transpose.py b/tests/ut/python/parallel/test_auto_parallel_transpose.py index d3d44e8f15..15cf788625 100644 --- a/tests/ut/python/parallel/test_auto_parallel_transpose.py +++ b/tests/ut/python/parallel/test_auto_parallel_transpose.py @@ -79,8 +79,8 @@ def test_two_matmul_transpose(): net.set_train() _executor.compile(net, x, y, b, phase='train') strategies = _executor._get_shard_strategy(net) - expected_strategies = {'Default/network-Net/Transpose-op3': [[1, 16]], - 'Default/network-Net/Transpose-op2': [[16, 1]], - 'Default/network-Net/MatMul-op0': [[16, 1], [1, 1]], - 'Default/network-Net/MatMul-op1': [[16, 1], [1, 1]]} + expected_strategies = {'Default/network-Net/Transpose-op0': [[1, 16]], + 'Default/network-Net/Transpose-op1': [[16, 1]], + 'Default/network-Net/MatMul-op2': [[16, 1], [1, 1]], + 'Default/network-Net/MatMul-op3': [[16, 1], [1, 1]]} assert strategies == expected_strategies diff --git a/tests/ut/python/parallel/test_auto_parallel_two_matmul.py b/tests/ut/python/parallel/test_auto_parallel_two_matmul.py index d20b3b39bd..4ef067aed2 
100644 --- a/tests/ut/python/parallel/test_auto_parallel_two_matmul.py +++ b/tests/ut/python/parallel/test_auto_parallel_two_matmul.py @@ -22,6 +22,7 @@ from mindspore.common.api import _executor from mindspore.ops import composite as C from mindspore.ops import operations as P from mindspore.parallel import _cost_model_context as cost_model_context +from mindspore.parallel._cost_model_context import _set_algo_single_loop, _get_algo_single_loop from mindspore.parallel import set_algo_parameters, get_algo_parameters, reset_algo_parameters from mindspore.parallel._utils import _reset_op_id as reset_op_id from tests.ut.python.ops.test_math_ops import VirtualLoss @@ -120,6 +121,14 @@ def test_two_matmul(): algo_epsilon = get_algo_parameters("algo_approxi_epsilon") assert algo_epsilon == 0.001 + expected_single_loop = True + single_loop = _get_algo_single_loop() + assert expected_single_loop == single_loop + expected_single_loop = False + _set_algo_single_loop(expected_single_loop) + single_loop = _get_algo_single_loop() + assert expected_single_loop == single_loop + reset_algo_parameters() para_slice_align_enable = get_algo_parameters("tensor_slice_align_enable") assert not para_slice_align_enable
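
Usage sketch for the new single-loop switch and the operator-count query. This is a minimal example assembled from the Python APIs added by this patch (_set_algo_single_loop, _get_algo_single_loop, _executor._get_num_parallel_ops) together with the Full, _x and _w1 definitions from the new test_auto_parallel_for_loop.py; the 16-device auto_parallel setup mirrors that test.

from mindspore import context
from mindspore.common.api import _executor
from mindspore.parallel._cost_model_context import _set_algo_single_loop, _get_algo_single_loop

# Keep the default behaviour: one suite of OperatorInfos is created for the first
# loop body and reused for the remaining iterations of the for-loop.
_set_algo_single_loop(True)
assert _get_algo_single_loop()

context.set_auto_parallel_context(parallel_mode="auto_parallel", device_num=16, global_rank=0)
net = Full(_w1, 3)   # the 3-layer for-loop network defined in test_auto_parallel_for_loop.py
net.set_auto_parallel()
net.set_train()
_executor.compile(net, _x, phase='train')

# After strategy search, the executor records how many parallel operators were
# actually constructed; with the single-loop flag on, only one loop body contributes,
# which is why the new unit test expects 16 operators for the whole network.
num_ops = _executor._get_num_parallel_ops(net)
assert num_ops == 16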