From 45b19cbc9a2afe834f34d6619a7e8edcaa18623a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=94=E9=BE=99=E9=A3=9E=20Qiao=20Longfei?= Date: Thu, 14 Feb 2019 09:10:02 +0800 Subject: [PATCH 1/6] Revert "Revert "cpu reduce mode did not need to broadcast params test=develop"" --- paddle/fluid/framework/details/build_strategy.cc | 3 +++ .../framework/details/multi_devices_graph_pass.cc | 6 ++---- .../framework/details/multi_devices_graph_pass.h | 1 - python/paddle/fluid/compiler.py | 11 +++++++++++ 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index f8030c53f7..010c8dee6c 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -133,12 +133,15 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { void AppendMultiDevPass(const BuildStrategy &strategy) { ir::Pass *multi_devices_pass; if (strategy_.is_distribution_) { + VLOG(3) << "multi device dist train mode"; multi_devices_pass = AppendPass("dist_multi_devices_pass").get(); } else { if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) { + VLOG(3) << "multi device allreduce mode"; multi_devices_pass = AppendPass("allreduce_mode_multi_devices_pass").get(); } else if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kReduce) { + VLOG(3) << "multi device reduce mode"; multi_devices_pass = AppendPass("reduce_mode_multi_devices_pass").get(); } else { PADDLE_THROW("Unknown reduce strategy."); diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index 75f922d2cc..24977aabda 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -731,7 +731,6 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result, } } insert_op = true; - need_broadcast_var_ = true; } else if (OpHaveRole(*node, OpRole::kDist)) { int op_dev_id = CreateDistTrainOp(result, node); if (node->Op()->Type() == "concat") { @@ -925,9 +924,8 @@ void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result, } void DistSSAGraphBuilder::InsertPostprocessOps(ir::Graph *result) const { - if (need_broadcast_var_ || - (UseGPU() && - strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce)) { + // only GPU reduce mode need to broadcast parameters to each device. + if (UseGPU() && strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce) { if (strategy_.fuse_broadcast_op_) { CreateFusedBroadcastOp(result, bcast_var_name_set_); } else { diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.h b/paddle/fluid/framework/details/multi_devices_graph_pass.h index 6d4386538e..21f85dc828 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.h +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.h @@ -174,7 +174,6 @@ class DistSSAGraphBuilder : public BalanceVarSSAGraphBuilder { int CreateDistTrainOp(ir::Graph *result, ir::Node *node) const; mutable std::vector> bcast_var_name_set_; - mutable bool need_broadcast_var_{false}; }; std::unordered_set &MultiDevSSAGraphBuilder(); diff --git a/python/paddle/fluid/compiler.py b/python/paddle/fluid/compiler.py index ef02429428..2b69fd89a2 100644 --- a/python/paddle/fluid/compiler.py +++ b/python/paddle/fluid/compiler.py @@ -19,6 +19,7 @@ import sys from .. import compat as cpt from . import core +from . import framework __all__ = ['CompiledProgram', 'ExecutionStrategy', 'BuildStrategy'] @@ -34,6 +35,15 @@ def _place_obj(place): return p +def _is_pserver_mode(main_program): + main = main_program if main_program \ + else framework.default_main_program() + for op in main.global_block().ops: + if op.type in ["send", "recv"]: + return True + return False + + class CompiledProgram(object): """ Compiles a Program for execution. @@ -110,6 +120,7 @@ class CompiledProgram(object): self._exec_strategy = ExecutionStrategy() if self._build_strategy is None: self._build_strategy = BuildStrategy() + self._build_strategy.is_distribution = _is_pserver_mode(self._program) return self def with_inference_optimize(self, config): From 62f1248ff5bf7aafe57bcc4be0068529330604cb Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Thu, 21 Feb 2019 13:51:53 +0800 Subject: [PATCH 2/6] fix use gpu test=develop --- .../details/multi_devices_graph_pass.cc | 20 +++++++++++-------- .../details/multi_devices_graph_pass.h | 1 + 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index 24977aabda..e0246740dd 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -731,6 +731,7 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result, } } insert_op = true; + need_broadcast_var_ = true; } else if (OpHaveRole(*node, OpRole::kDist)) { int op_dev_id = CreateDistTrainOp(result, node); if (node->Op()->Type() == "concat") { @@ -925,14 +926,17 @@ void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result, void DistSSAGraphBuilder::InsertPostprocessOps(ir::Graph *result) const { // only GPU reduce mode need to broadcast parameters to each device. - if (UseGPU() && strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce) { - if (strategy_.fuse_broadcast_op_) { - CreateFusedBroadcastOp(result, bcast_var_name_set_); - } else { - for (size_t dev_id = 0; dev_id < bcast_var_name_set_.size(); ++dev_id) { - auto &to_bcast_set = bcast_var_name_set_[dev_id]; - for (auto &bcast_name : to_bcast_set) { - CreateBroadcastOp(result, bcast_name, dev_id); + if (UseGPU()) { + if (need_broadcast_var_ || + strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce) { + if (strategy_.fuse_broadcast_op_) { + CreateFusedBroadcastOp(result, bcast_var_name_set_); + } else { + for (size_t dev_id = 0; dev_id < bcast_var_name_set_.size(); ++dev_id) { + auto &to_bcast_set = bcast_var_name_set_[dev_id]; + for (auto &bcast_name : to_bcast_set) { + CreateBroadcastOp(result, bcast_name, dev_id); + } } } } diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.h b/paddle/fluid/framework/details/multi_devices_graph_pass.h index 21f85dc828..6d4386538e 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.h +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.h @@ -174,6 +174,7 @@ class DistSSAGraphBuilder : public BalanceVarSSAGraphBuilder { int CreateDistTrainOp(ir::Graph *result, ir::Node *node) const; mutable std::vector> bcast_var_name_set_; + mutable bool need_broadcast_var_{false}; }; std::unordered_set &MultiDevSSAGraphBuilder(); From 3bccc1e6e275412f30baf5a0c5698eb307f90252 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Fri, 22 Feb 2019 10:39:42 +0800 Subject: [PATCH 3/6] optimize broadcast logic test=develop --- .../details/multi_devices_graph_pass.cc | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index e0246740dd..c0fb3ee833 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -925,18 +925,20 @@ void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result, } void DistSSAGraphBuilder::InsertPostprocessOps(ir::Graph *result) const { - // only GPU reduce mode need to broadcast parameters to each device. - if (UseGPU()) { - if (need_broadcast_var_ || + // broad cast received parameters when training in parameter server mode. + if (need_broadcast_var_) { + // cpu reduce mode did not need to broadcast received parameters. + if (!UseGPU() && strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce) { - if (strategy_.fuse_broadcast_op_) { - CreateFusedBroadcastOp(result, bcast_var_name_set_); - } else { - for (size_t dev_id = 0; dev_id < bcast_var_name_set_.size(); ++dev_id) { - auto &to_bcast_set = bcast_var_name_set_[dev_id]; - for (auto &bcast_name : to_bcast_set) { - CreateBroadcastOp(result, bcast_name, dev_id); - } + return; + } + if (strategy_.fuse_broadcast_op_) { + CreateFusedBroadcastOp(result, bcast_var_name_set_); + } else { + for (size_t dev_id = 0; dev_id < bcast_var_name_set_.size(); ++dev_id) { + auto &to_bcast_set = bcast_var_name_set_[dev_id]; + for (auto &bcast_name : to_bcast_set) { + CreateBroadcastOp(result, bcast_name, dev_id); } } } From 4233d0a820f2f889fa12ecb1e0739d4ae285295b Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Fri, 22 Feb 2019 13:11:54 +0800 Subject: [PATCH 4/6] add more comment test=develop --- .../framework/details/multi_devices_graph_pass.cc | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index c0fb3ee833..23b9890e9b 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -927,7 +927,16 @@ void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result, void DistSSAGraphBuilder::InsertPostprocessOps(ir::Graph *result) const { // broad cast received parameters when training in parameter server mode. if (need_broadcast_var_) { - // cpu reduce mode did not need to broadcast received parameters. + // There are 4 conditions: + // 1. GPU && Reduce: Reduce gradient then broadcast gradient to other GPUS. + // Need to broadcast received parameters to other GPU. + // 2. GPU && AllReduce: AllReduce all graident to each GPU. Need to + // broadcast received parameters to other GPU. + // 3. CPU && AllReduce: AllReduce all gradient to each thread. Need to + // broadcast received parameters to other scope. + // 4. CPU && Reduce: because all parameters share the same memory, did not + // broadcast + // received parameters. if (!UseGPU() && strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce) { return; From 3f9263f67eeab08126fde5ca143dcb3ddd2da71d Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Fri, 22 Feb 2019 13:20:46 +0800 Subject: [PATCH 5/6] optimize style test=develop --- paddle/fluid/framework/details/multi_devices_graph_pass.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index 23b9890e9b..180d169815 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -935,8 +935,7 @@ void DistSSAGraphBuilder::InsertPostprocessOps(ir::Graph *result) const { // 3. CPU && AllReduce: AllReduce all gradient to each thread. Need to // broadcast received parameters to other scope. // 4. CPU && Reduce: because all parameters share the same memory, did not - // broadcast - // received parameters. + // broadcast received parameters. if (!UseGPU() && strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce) { return; From 2b7931d5c933efd91dfa3f25073a997dee3b00b7 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Sat, 23 Feb 2019 09:52:13 +0800 Subject: [PATCH 6/6] refine code test=develop --- paddle/fluid/framework/details/build_strategy.cc | 6 +++--- python/paddle/fluid/compiler.py | 12 ++---------- python/paddle/fluid/framework.py | 9 +++++++++ python/paddle/fluid/parallel_executor.py | 11 +---------- 4 files changed, 15 insertions(+), 23 deletions(-) diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index 010c8dee6c..a6359402f8 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -133,15 +133,15 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { void AppendMultiDevPass(const BuildStrategy &strategy) { ir::Pass *multi_devices_pass; if (strategy_.is_distribution_) { - VLOG(3) << "multi device dist train mode"; + VLOG(3) << "multi device parameter server mode"; multi_devices_pass = AppendPass("dist_multi_devices_pass").get(); } else { if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) { - VLOG(3) << "multi device allreduce mode"; + VLOG(3) << "multi devices collective mode with allreduce"; multi_devices_pass = AppendPass("allreduce_mode_multi_devices_pass").get(); } else if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kReduce) { - VLOG(3) << "multi device reduce mode"; + VLOG(3) << "multi deivces collective mode with reduce"; multi_devices_pass = AppendPass("reduce_mode_multi_devices_pass").get(); } else { PADDLE_THROW("Unknown reduce strategy."); diff --git a/python/paddle/fluid/compiler.py b/python/paddle/fluid/compiler.py index 2b69fd89a2..d253f0cca8 100644 --- a/python/paddle/fluid/compiler.py +++ b/python/paddle/fluid/compiler.py @@ -35,15 +35,6 @@ def _place_obj(place): return p -def _is_pserver_mode(main_program): - main = main_program if main_program \ - else framework.default_main_program() - for op in main.global_block().ops: - if op.type in ["send", "recv"]: - return True - return False - - class CompiledProgram(object): """ Compiles a Program for execution. @@ -120,7 +111,8 @@ class CompiledProgram(object): self._exec_strategy = ExecutionStrategy() if self._build_strategy is None: self._build_strategy = BuildStrategy() - self._build_strategy.is_distribution = _is_pserver_mode(self._program) + self._build_strategy.is_distribution = framework.is_pserver_mode( + self._program) return self def with_inference_optimize(self, config): diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 832c97c7de..162e94ec59 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -85,6 +85,15 @@ def _current_expected_place(): return _imperative_current_expected_place_ +def is_pserver_mode(main_program): + main = main_program if main_program \ + else default_main_program() + for op in main.global_block().ops: + if op.type in ["send", "recv"]: + return True + return False + + class NameScope(object): def __init__(self, name="", parent=None): self._children = dict() diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 22212ae9a2..9bff3599a0 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -29,15 +29,6 @@ ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy BuildStrategy = core.ParallelExecutor.BuildStrategy -def _is_pserver_mode(main_program): - main = main_program if main_program \ - else framework.default_main_program() - for op in main.global_block().ops: - if op.type in ["send", "recv"]: - return True - return False - - class ParallelExecutor(object): """ ParallelExecutor is designed for data parallelism, which focuses on distributing @@ -140,7 +131,7 @@ class ParallelExecutor(object): # FIXME(zcd): is_distribution_ is a temporary field, because in pserver mode, # num_trainers is 1, so the current fields of build_strategy doesn't tell if # it's distributed model. - build_strategy.is_distribution = _is_pserver_mode( + build_strategy.is_distribution = framework.is_pserver_mode( main_program) or num_trainers > 1 # step4: get main_program, scope, local_scopes