From b54d1ba9689e11dd2180cf25f8fa5aa6953a3f36 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Wed, 20 Jun 2018 10:04:22 +0800 Subject: [PATCH 01/28] fix pserver sub-blocks --- paddle/fluid/operators/listen_and_serv_op.cc | 10 +++- .../fluid/transpiler/distribute_transpiler.py | 49 ++++++++++++------- 2 files changed, 39 insertions(+), 20 deletions(-) diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 57c2ce4577..c0952133c3 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -101,13 +101,16 @@ void ListenAndServOp::RunSyncLoop( framework::Scope *recv_scope, const std::vector &prefetch_block_id_list) const { size_t num_blocks = program->Size(); + auto skip_sub_blks = Attr>("skip_sub_blks"); PADDLE_ENFORCE_GE(num_blocks, 2, "server program should have at least 2 blocks"); std::vector optimize_block_id_list; for (int blkid = 1; blkid < num_blocks; ++blkid) { if (std::find(prefetch_block_id_list.begin(), prefetch_block_id_list.end(), - blkid) == prefetch_block_id_list.end()) { + blkid) == prefetch_block_id_list.end() && + std::find(skip_sub_blks.begin(), skip_sub_blks.end(), blkid) == + skip_sub_blks.end()) { optimize_block_id_list.push_back(blkid); } } @@ -344,6 +347,11 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker { .SetDefault({}); AddAttr("Fanin", "How many clients send to this server.") .SetDefault(1); + AddAttr>("skip_sub_blks", + "do not parallel execute the specify sub blocks, " + "it's used for the op which has" + "condition blocks") + .SetDefault({}); } }; diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index d62a184e97..a0f9334c00 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -250,20 +250,15 @@ class DistributeTranspiler: split_method=RoundRobin, sync_mode=True): """ - :param trainer_id: one unique id for each trainer in a job. - :type trainer_id: int - :param program: program to transpile, default is default_main_program - :type program: Program - :param pservers: parameter server endpoints like "m1:6174,m2:6174" - :type pservers: string - :param trainers: total number of workers/trainers in the job - :type trainers: int - :param split_method: A function to determin how to split variables - to different servers equally. - :type split_method: function - :type sync_mode: boolean default True - :param sync_mode: if sync_mode is set True, it means that dist transpiler - will transpile the program into sync_mode pserver and trainer program. + Args: + trainer_id(int): one unique id for each trainer in a job. + program(Program): program to transpile, default is default_main_program + pservers(string): parameter server endpoints like "m1:6174,m2:6174" + trainers(int): total number of workers/trainers in the job + split_method(PSDispatcher): A function to determin how to split variables + to different servers equally. + sync_mode(boolean): if sync_mode is set True, it means that dist transpiler + will transpile the program into sync_mode pserver and trainer program. """ assert (split_method.__bases__[0] == PSDispatcher) if program is None: @@ -403,6 +398,11 @@ class DistributeTranspiler: NOTE: assume blocks of the same variable is not distributed on the same pserver, only change param/grad varnames for trainers to fetch. + Args: + endpoint(string): the endpoint for the current pserver instance. + + Returns(Program): the pserver program + """ # step1 pserver_program = Program() @@ -479,9 +479,9 @@ class DistributeTranspiler: return varname return "" - def __clone_lr_op_sub_block__(op, program, new_block): + def __clone_lr_op_sub_block__(op, program, new_block, skip_sub_blks): if not op.has_attr('sub_block'): - return + return -1 origin_block_desc = op.attr('sub_block') origin_block = self.origin_program.block(origin_block_desc.id) @@ -489,6 +489,7 @@ class DistributeTranspiler: # we put the new sub block to new block to follow the block # hierarchy of the original blocks new_sub_block = program.create_block(new_block.idx) + skip_sub_blks(new_sub_block.idx) # clone vars for var in origin_block.vars: @@ -498,20 +499,24 @@ class DistributeTranspiler: for op in origin_block.ops: self._clone_lr_op(program, new_sub_block, op) # clone sub_block of op - __clone_lr_op_sub_block__(op, program, new_sub_block) + __clone_lr_op_sub_block__(op, program, new_sub_block, + skip_sub_blks) # reset the block of op op.set_attr('sub_block', new_sub_block) + return new_sub_block.idx # append lr decay ops to the child block if exists lr_ops = self._get_lr_ops() + skip_sub_blks = [] if len(lr_ops) > 0: lr_decay_block = pserver_program.create_block( pserver_program.num_blocks - 1) for _, op in enumerate(lr_ops): self._append_pserver_non_opt_ops(lr_decay_block, op) # append sub blocks to pserver_program in lr_decay_op - __clone_lr_op_sub_block__(op, pserver_program, lr_decay_block) + __clone_lr_op_sub_block__(op, pserver_program, lr_decay_block, + skip_sub_blks) # append op to the current block grad_to_block_id = [] @@ -561,7 +566,8 @@ class DistributeTranspiler: "endpoint": endpoint, "Fanin": self.trainer_num, "sync_mode": self.sync_mode, - "grad_to_block_id": grad_to_block_id + "grad_to_block_id": grad_to_block_id, + "skip_sub_blks": skip_sub_blks } if len(prefetch_var_name_to_block_id) > 0: attrs['prefetch_var_name_to_block_id'] \ @@ -582,6 +588,11 @@ class DistributeTranspiler: Get startup program for current parameter server. Modify operator input variables if there are variables that were split to several blocks. + Args: + endpoint(string): the endpoint for the current pserver instance. + pserver_program(Program): the program for pserver to execute. + + Returns(Program): the startup program for pserver """ s_prog = Program() orig_s_prog = default_startup_program() From 5c7d6a5594a48ed688523eb7256f9743a632d23d Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Wed, 20 Jun 2018 10:08:48 +0800 Subject: [PATCH 02/28] update --- python/paddle/fluid/transpiler/distribute_transpiler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index a0f9334c00..e94abfc006 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -489,7 +489,7 @@ class DistributeTranspiler: # we put the new sub block to new block to follow the block # hierarchy of the original blocks new_sub_block = program.create_block(new_block.idx) - skip_sub_blks(new_sub_block.idx) + skip_sub_blks.append(new_sub_block.idx) # clone vars for var in origin_block.vars: @@ -504,7 +504,6 @@ class DistributeTranspiler: # reset the block of op op.set_attr('sub_block', new_sub_block) - return new_sub_block.idx # append lr decay ops to the child block if exists lr_ops = self._get_lr_ops() From cbc82d0f503eafaa8465d8889eefa6d81c1e0907 Mon Sep 17 00:00:00 2001 From: chengduoZH Date: Wed, 20 Jun 2018 14:27:49 +0800 Subject: [PATCH 03/28] add FAQ --- doc/v2/faq/build_and_install/index_cn.rst | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/doc/v2/faq/build_and_install/index_cn.rst b/doc/v2/faq/build_and_install/index_cn.rst index f292684fb5..33490662e0 100644 --- a/doc/v2/faq/build_and_install/index_cn.rst +++ b/doc/v2/faq/build_and_install/index_cn.rst @@ -213,3 +213,9 @@ virtualenv本身也是Python的一个包,可以用pip进行安装: 保存并关闭文件。 这样,每次打开终端时就会自动启动名为‘paddle’的Python环境了。 + +10. 通过pip安装的PaddlePaddle在`import paddle.fluid`报找不到libmkldnn.so.0 +------------------------------------------------------------------------------------------ +出现这个问题的原因是在导入`paddle.fluid`时需要加载libmkldnn.so,但是系统没有找到该文件。一般通过pip +安装PaddlePaddle时会将libmkldnn.so.0拷贝到`/usr/local/lib`路径下,所以解决办法是将该路径加到 +`LD_LIBRARY_PATH`环境变量下,即:`LD_LIBRARY_PATH=(ibmklml_intel.so.0所在的路径):$LD_LIBRARY_PATH`。 \ No newline at end of file From b5daeac86d79400302ab8acd658716f783012346 Mon Sep 17 00:00:00 2001 From: chengduoZH Date: Wed, 20 Jun 2018 15:32:11 +0800 Subject: [PATCH 04/28] follow comment --- doc/v2/faq/build_and_install/index_cn.rst | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/doc/v2/faq/build_and_install/index_cn.rst b/doc/v2/faq/build_and_install/index_cn.rst index 33490662e0..0d64477728 100644 --- a/doc/v2/faq/build_and_install/index_cn.rst +++ b/doc/v2/faq/build_and_install/index_cn.rst @@ -214,8 +214,11 @@ virtualenv本身也是Python的一个包,可以用pip进行安装: 这样,每次打开终端时就会自动启动名为‘paddle’的Python环境了。 -10. 通过pip安装的PaddlePaddle在`import paddle.fluid`报找不到libmkldnn.so.0 +10. 通过pip安装的PaddlePaddle在 :code:`import paddle.fluid` 报找不到 :code:`libmkldnn.so` 或 :code:`libmklml_intel.so` ------------------------------------------------------------------------------------------ -出现这个问题的原因是在导入`paddle.fluid`时需要加载libmkldnn.so,但是系统没有找到该文件。一般通过pip -安装PaddlePaddle时会将libmkldnn.so.0拷贝到`/usr/local/lib`路径下,所以解决办法是将该路径加到 -`LD_LIBRARY_PATH`环境变量下,即:`LD_LIBRARY_PATH=(ibmklml_intel.so.0所在的路径):$LD_LIBRARY_PATH`。 \ No newline at end of file +出现这种问题的原因是在导入 :code:`paddle.fluid` 时需要加载 :code:`libmkldnn.so` 和 :code:`libmklml_intel.so`, +但是系统没有找到该文件。一般通过pip安装PaddlePaddle时会将 :code:`libmkldnn.so` 和 :code:`libmklml_intel.so` +拷贝到 :code:`/usr/local/lib` 路径下,所以解决办法是将该路径加到 :code:`LD_LIBRARY_PATH` 环境变量下, +即: :code:`export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH` 。 + +**注意**:如果是在虚拟环境中安装PaddlePaddle, :code:`libmkldnn.so` 和 :code:`libmklml_intel.so` 可能不在 :code:`/usr/local/lib` 路径下。 \ No newline at end of file From 28a86aebc3a0874746a8e969e645cdc0949c5372 Mon Sep 17 00:00:00 2001 From: chengduoZH Date: Thu, 21 Jun 2018 10:29:22 +0800 Subject: [PATCH 05/28] Fix Parallel Exe(VarHandel's version) --- .../fluid/framework/details/multi_devices_graph_builder.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.cc b/paddle/fluid/framework/details/multi_devices_graph_builder.cc index 78356cb1be..1fb6258348 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.cc @@ -345,7 +345,7 @@ void MultiDevSSAGraphBuilder::InsertAllReduceOp(SSAGraph *result, auto &prev_grad = vars.back(); op_handle->AddInput(prev_grad.get()); - auto var = new VarHandle(vars.size() - 1, i, og, p); + auto var = new VarHandle(vars.size(), i, og, p); vars.emplace_back(var); op_handle->AddOutput(var); } @@ -442,8 +442,7 @@ VarHandle *MultiDevSSAGraphBuilder::CreateReduceOp(SSAGraph *result, op_handle->AddInput(prev_grad.get()); } auto &vars = result->vars_[dst_dev_id][og]; - auto var = - new VarHandle(vars.size() - 1, dst_dev_id, og, places_[dst_dev_id]); + auto var = new VarHandle(vars.size(), dst_dev_id, og, places_[dst_dev_id]); vars.emplace_back(var); op_handle->AddOutput(var); return var; From 13de72388dc5b7e1f7ea184e9606b3538b2f7f3d Mon Sep 17 00:00:00 2001 From: chengduoZH Date: Thu, 21 Jun 2018 11:19:57 +0800 Subject: [PATCH 06/28] Fix broadcast --- paddle/fluid/framework/details/broadcast_op_handle.cc | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/paddle/fluid/framework/details/broadcast_op_handle.cc b/paddle/fluid/framework/details/broadcast_op_handle.cc index d5ca061944..1d9f1bd6e4 100644 --- a/paddle/fluid/framework/details/broadcast_op_handle.cc +++ b/paddle/fluid/framework/details/broadcast_op_handle.cc @@ -73,6 +73,9 @@ void BroadcastOpHandle::RunImpl() { int root_id = boost::get(in_tensor.place()).device; std::vector> broadcast_calls; + int type = platform::ToNCCLDataType(in_tensor.type()); + size_t numel = static_cast(in_tensor.numel()); + for (auto out_var_handle : out_var_handles) { Variable *out_var = var_scopes.at(out_var_handle->scope_idx_) ->FindVar(out_var_handle->name_); @@ -87,13 +90,11 @@ void BroadcastOpHandle::RunImpl() { send_recv_buffer = const_cast(in_tensor.data()); out_handle = out_var_handle; } else { - send_recv_buffer = - VariableVisitor::GetMutableTensor(out_var).mutable_data( - out_var_handle->place_); + send_recv_buffer = VariableVisitor::GetMutableTensor(out_var) + .Resize(in_tensor.dims()) + .mutable_data(out_var_handle->place_); } - int type = platform::ToNCCLDataType(in_tensor.type()); - size_t numel = static_cast(in_tensor.numel()); broadcast_calls.emplace_back( [send_recv_buffer, numel, type, root_id, &nccl_ctx] { PADDLE_ENFORCE(platform::dynload::ncclBcast( From 32bfebfe38646480880f4c8d627155df6d1b778a Mon Sep 17 00:00:00 2001 From: Xin Pan Date: Thu, 21 Jun 2018 12:21:40 +0800 Subject: [PATCH 07/28] disable the LODTensor warning for now --- paddle/fluid/pybind/pybind.cc | 5 ----- 1 file changed, 5 deletions(-) diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index dc02c6632e..5a45e431df 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -167,9 +167,6 @@ PYBIND11_PLUGIN(core) { .def("set_lod", [](LoDTensor &self, const std::vector> &lod) { // the input lod is offset-based level-of-detail info - LOG(WARNING) - << "set_lod is deprecated and will be removed by 9.2018, " - "please switch to set_recursive_sequence_lengths."; LoD new_lod; new_lod.reserve(lod.size()); std::copy(lod.begin(), lod.end(), std::back_inserter(new_lod)); @@ -196,8 +193,6 @@ PYBIND11_PLUGIN(core) { .def("lod", [](LoDTensor &self) -> std::vector> { // output the offset-based lod info - LOG(WARNING) << "lod is deprecated and will be removed by 9.2018, " - "please switch to recursive_sequence_lengths."; LoD lod = self.lod(); std::vector> new_lod; new_lod.reserve(lod.size()); From c99fca5f90ede6297eaf5b4c9617ad21df8b445d Mon Sep 17 00:00:00 2001 From: chengduoZH Date: Thu, 21 Jun 2018 12:25:31 +0800 Subject: [PATCH 08/28] Add No Mutex --- .../framework/details/broadcast_op_handle.cc | 57 ++++++++++++++----- .../fluid/framework/details/op_handle_base.cc | 23 ++++++++ .../fluid/framework/details/op_handle_base.h | 4 ++ .../framework/details/reduce_op_handle.cc | 4 +- paddle/fluid/platform/device_context.h | 8 +++ 5 files changed, 80 insertions(+), 16 deletions(-) diff --git a/paddle/fluid/framework/details/broadcast_op_handle.cc b/paddle/fluid/framework/details/broadcast_op_handle.cc index 1d9f1bd6e4..b0bf641d9d 100644 --- a/paddle/fluid/framework/details/broadcast_op_handle.cc +++ b/paddle/fluid/framework/details/broadcast_op_handle.cc @@ -103,23 +103,50 @@ void BroadcastOpHandle::RunImpl() { }); } - this->RunAndRecordEvent([&] { - { - platform::NCCLGroupGuard guard; - for (auto &call : broadcast_calls) { - call(); + // FIXME(zcd): a temporary fix for some language model that has sparse + // parameter. + bool use_mutex = true; + if (in_var->IsType()) { + use_mutex = false; + } + if (use_mutex) { + this->RunAndRecordEvent([&] { + { + platform::NCCLGroupGuard guard; + for (auto &call : broadcast_calls) { + call(); + } } - } - if (!out_handle->IsTheSameVar(*in_var_handle)) { - auto out_var = var_scopes.at(in_var_handle->scope_idx_) - ->FindVar(out_var_handles[0]->name_); - paddle::framework::TensorCopy( - in_tensor, in_var_handle->place_, - *(dev_ctxes_.at(in_var_handle->place_)), - &VariableVisitor::GetMutableTensor(out_var)); - } - }); + if (!out_handle->IsTheSameVar(*in_var_handle)) { + auto out_var = var_scopes.at(in_var_handle->scope_idx_) + ->FindVar(out_var_handles[0]->name_); + paddle::framework::TensorCopy( + in_tensor, in_var_handle->place_, + *(dev_ctxes_.at(in_var_handle->place_)), + &VariableVisitor::GetMutableTensor(out_var)); + } + }); + } else { + this->RunAndRecordEventNoMutex([&] { + { + platform::NCCLGroupGuard guard; + for (auto &call : broadcast_calls) { + call(); + } + } + + if (!out_handle->IsTheSameVar(*in_var_handle)) { + auto out_var = var_scopes.at(in_var_handle->scope_idx_) + ->FindVar(out_var_handles[0]->name_); + paddle::framework::TensorCopy( + in_tensor, in_var_handle->place_, + *(dev_ctxes_.at(in_var_handle->place_)), + &VariableVisitor::GetMutableTensor(out_var)); + } + }); + } + #else PADDLE_THROW("CUDA is not enabled."); #endif diff --git a/paddle/fluid/framework/details/op_handle_base.cc b/paddle/fluid/framework/details/op_handle_base.cc index f79565fe71..a40a881508 100644 --- a/paddle/fluid/framework/details/op_handle_base.cc +++ b/paddle/fluid/framework/details/op_handle_base.cc @@ -139,6 +139,29 @@ void OpHandleBase::RunAndRecordEvent(const std::function &callback) { #endif } +void OpHandleBase::RunAndRecordEventNoMutex( + const std::function &callback) { +#ifdef PADDLE_WITH_CUDA + if (!events_.empty()) { // Use event + std::function method = callback; + + for (auto &p : dev_ctxes_) { + method = [method, p, this]() { + static_cast(p.second) + ->RecordEventNoMutex( + events_.at(boost::get(p.first).device), + method); + }; + } + method(); + } else { +#endif + callback(); +#ifdef PADDLE_WITH_CUDA + } +#endif +} + void OpHandleBase::RunAndRecordEvent(platform::Place p, const std::function &callback) { #ifdef PADDLE_WITH_CUDA diff --git a/paddle/fluid/framework/details/op_handle_base.h b/paddle/fluid/framework/details/op_handle_base.h index fbd90a3296..775be0233a 100644 --- a/paddle/fluid/framework/details/op_handle_base.h +++ b/paddle/fluid/framework/details/op_handle_base.h @@ -85,6 +85,10 @@ class OpHandleBase { protected: void RunAndRecordEvent(const std::function &callback); + // FIXME(zcd): A temporary fix for some language model that has sparse + // parameter. + void RunAndRecordEventNoMutex(const std::function &callback); + void RunAndRecordEvent(platform::Place p, const std::function &callback); diff --git a/paddle/fluid/framework/details/reduce_op_handle.cc b/paddle/fluid/framework/details/reduce_op_handle.cc index 7160e346da..9a626c890f 100644 --- a/paddle/fluid/framework/details/reduce_op_handle.cc +++ b/paddle/fluid/framework/details/reduce_op_handle.cc @@ -80,7 +80,9 @@ void ReduceOpHandle::RunImpl() { } if (pre_in_var->IsType()) { - this->RunAndRecordEvent([&] { + // FIXME(zcd): A temporary fix for some language model that has sparse + // parameter. + this->RunAndRecordEventNoMutex([&] { std::vector in_selected_rows = GetInputValues(in_var_handles, var_scopes); GatherSelectedRows(in_selected_rows, in_places, dev_ctxes_, t_out_p, diff --git a/paddle/fluid/platform/device_context.h b/paddle/fluid/platform/device_context.h index 292ffef1ae..d37e5ee578 100644 --- a/paddle/fluid/platform/device_context.h +++ b/paddle/fluid/platform/device_context.h @@ -106,6 +106,14 @@ class CUDADeviceContext : public DeviceContext { PADDLE_ENFORCE(cudaEventRecord(ev, stream_)); } + // FIXME(zcd): A temporary fix for some language model that has sparse + // parameter. + template + void RecordEventNoMutex(cudaEvent_t ev, Callback callback) { + callback(); + PADDLE_ENFORCE(cudaEventRecord(ev, stream_)); + } + private: CUDAPlace place_; From 231762a6c4016763f2ca7d8e829491c061810bd5 Mon Sep 17 00:00:00 2001 From: ktlichkid Date: Thu, 21 Jun 2018 06:17:21 +0000 Subject: [PATCH 09/28] Fix log and relu layer --- python/paddle/fluid/layers/nn.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index 2979ff3057..9dc2e4799f 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -4920,16 +4920,16 @@ def random_crop(x, shape, seed=None): return out -def log(x): +def log(input): """ Calculates the natural log of the given input tensor, element-wise. .. math:: - Out = \\ln(x) + Out = \\ln(input) Args: - x (Variable): Input tensor. + input (Variable): Input tensor. Returns: Variable: The natural log of the input tensor computed element-wise. @@ -4938,7 +4938,7 @@ def log(x): .. code-block:: python - output = fluid.layers.log(x) + output = fluid.layers.log(input) """ helper = LayerHelper('log', **locals()) dtype = helper.input_dtype() @@ -4947,18 +4947,18 @@ def log(x): return out -def relu(x): +def relu(input): """ Relu takes one input data (Tensor) and produces one output data (Tensor) - where the rectified linear function, y = max(0, x), is applied to + where the rectified linear function, y = max(0, input), is applied to the tensor elementwise. .. math:: - Out = \\max(0, x) + Out = \\max(0, input) Args: - x (Variable): The input tensor. + input (Variable): The input tensor. Returns: Variable: The output tensor with the same shape as input. @@ -4967,7 +4967,7 @@ def relu(x): .. code-block:: python - output = fluid.layers.relu(x) + output = fluid.layers.relu(input) """ helper = LayerHelper('relu', **locals()) dtype = helper.input_dtype() From 0970bd9edc7f2454927ee088a52bddc2cac9d1d0 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 21 Jun 2018 17:32:35 +0800 Subject: [PATCH 10/28] use optimize blocks attr to record optimize block id --- paddle/fluid/operators/listen_and_serv_op.cc | 29 +++++++------------ paddle/fluid/operators/listen_and_serv_op.h | 2 +- .../fluid/transpiler/distribute_transpiler.py | 9 ++++-- 3 files changed, 18 insertions(+), 22 deletions(-) diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index c0952133c3..2fbd62505d 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -106,13 +106,8 @@ void ListenAndServOp::RunSyncLoop( "server program should have at least 2 blocks"); std::vector optimize_block_id_list; - for (int blkid = 1; blkid < num_blocks; ++blkid) { - if (std::find(prefetch_block_id_list.begin(), prefetch_block_id_list.end(), - blkid) == prefetch_block_id_list.end() && - std::find(skip_sub_blks.begin(), skip_sub_blks.end(), blkid) == - skip_sub_blks.end()) { - optimize_block_id_list.push_back(blkid); - } + for (auto *block : optimize_blocks) { + optimize_block_id_list.push_back(block->ID()); } auto optimize_prepared = executor->Prepare(*program, optimize_block_id_list); // Insert placeholder for block0 which holds current op itself. @@ -137,9 +132,9 @@ void ListenAndServOp::RunSyncLoop( // and this will still work. // The optimize blocks which have the same parent ID would run parallel // TODO(Yancey1989): need to use ParallelExecutor for future - int32_t last_parent_blkid = program->Block(1).Parent(); + int32_t last_parent_blkid = optimize_blocks[0]->Parent(); std::vector parallel_blkids; - parallel_blkids.push_back(1); + parallel_blkids.push_back(optimize_blocks[0]->ID()); double ts = GetTimestamp(); for (size_t i = 1; i < optimize_block_id_list.size(); ++i) { // skip the first optimize block because it is already in the @@ -262,8 +257,11 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, rpc_service_->RegisterRPC(detail::kRequestPrefetch, request_prefetch_handler_.get()); - auto *optimize_block = Attr(kOptimizeBlock); - auto *program = optimize_block->Program(); + auto optimize_blocks = + Attr>(kOptimizeBlocks); + PADDLE_ENFORCE(optimize_blocks.size() > 1, + "optimize blocks should be 1 at least on the pserver side."); + auto *program = optimize_block[0]->Program(); framework::Executor executor(dev_place); // prepare for prefetch @@ -340,18 +338,13 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker { "a map from grad name to it's optimize block id") .SetDefault({}); AddAttr("sync_mode", "if works at sync_mode or not").SetDefault(true); - AddAttr(kOptimizeBlock, - "BlockID to run on server side."); + AddAttr(kOptimizeBlocks, + "Optimize blocks to run on server side."); AddAttr>(kPrefetchVarNameToBlockId, "prefetch blocks to run on server side.") .SetDefault({}); AddAttr("Fanin", "How many clients send to this server.") .SetDefault(1); - AddAttr>("skip_sub_blks", - "do not parallel execute the specify sub blocks, " - "it's used for the op which has" - "condition blocks") - .SetDefault({}); } }; diff --git a/paddle/fluid/operators/listen_and_serv_op.h b/paddle/fluid/operators/listen_and_serv_op.h index 46c3a19e20..55da61e34a 100644 --- a/paddle/fluid/operators/listen_and_serv_op.h +++ b/paddle/fluid/operators/listen_and_serv_op.h @@ -30,7 +30,7 @@ limitations under the License. */ namespace paddle { namespace operators { -constexpr char kOptimizeBlock[] = "OptimizeBlock"; +constexpr char kOptimizeBlocks[] = "optimize_blocks"; constexpr char kPrefetchVarNameToBlockId[] = "prefetch_var_name_to_block_id"; void RunServer(std::shared_ptr service); diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index dc0ec6b8c6..cf59808b3d 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -424,10 +424,12 @@ class DistributeTranspiler(object): # append lr decay ops to the child block if exists lr_ops = self._get_lr_ops() - skip_sub_blks = [] + # record optimize blocks and we can run them on pserver parallel + optimize_blocks = [] if len(lr_ops) > 0: lr_decay_block = pserver_program.create_block( pserver_program.num_blocks - 1) + optimize_blocks.append(lr_decay_block) for _, op in enumerate(lr_ops): self._append_pserver_non_opt_ops(lr_decay_block, op) # append sub blocks to pserver_program in lr_decay_op @@ -439,6 +441,7 @@ class DistributeTranspiler(object): pre_block_idx = pserver_program.num_blocks - 1 for idx, opt_op in enumerate(opt_op_on_pserver): per_opt_block = pserver_program.create_block(pre_block_idx) + optimize_blocks.append(per_opt_block) # append grad merging ops before clip and weight decay for _, op in enumerate(self.optimize_ops): # find the origin @GRAD var before clipping @@ -457,6 +460,7 @@ class DistributeTranspiler(object): if global_ops: opt_state_block = pserver_program.create_block( pserver_program.num_blocks - 1) + optimize_blocks.append(opt_state_block) for glb_op in global_ops: __append_optimize_op__(glb_op, opt_state_block, grad_to_block_id, None) @@ -478,12 +482,11 @@ class DistributeTranspiler(object): assert len(prefetch_var_name_to_block_id) == 0 attrs = { - "OptimizeBlock": pserver_program.block(1), + "optimize_blocks": optimize_blocks, "endpoint": endpoint, "Fanin": self.trainer_num, "sync_mode": self.sync_mode, "grad_to_block_id": grad_to_block_id, - "skip_sub_blks": skip_sub_blks } if len(prefetch_var_name_to_block_id) > 0: attrs['prefetch_var_name_to_block_id'] \ From 6f6642ed655cc234ef9751889328142a32a2a85a Mon Sep 17 00:00:00 2001 From: wanghaoshuang Date: Thu, 21 Jun 2018 09:01:18 +0000 Subject: [PATCH 11/28] Fix relu and log op. --- python/paddle/fluid/layers/nn.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index 787054a91c..097fbe982f 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -4938,9 +4938,9 @@ def log(x): output = fluid.layers.log(x) """ helper = LayerHelper('log', **locals()) - dtype = helper.input_dtype() + dtype = helper.input_dtype(input_param_name='x') out = helper.create_tmp_variable(dtype) - helper.append_op(type="log", inputs={"X": input}, outputs={"Out": out}) + helper.append_op(type="log", inputs={"X": x}, outputs={"Out": out}) return out @@ -4967,9 +4967,9 @@ def relu(x): output = fluid.layers.relu(x) """ helper = LayerHelper('relu', **locals()) - dtype = helper.input_dtype() + dtype = helper.input_dtype(input_param_name='x') out = helper.create_tmp_variable(dtype) - helper.append_op(type="relu", inputs={"X": input}, outputs={"Out": out}) + helper.append_op(type="relu", inputs={"X": x}, outputs={"Out": out}) return out From 964f515e9ac12afab04118f68d8e9cf21b58375e Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Thu, 21 Jun 2018 20:47:08 +0800 Subject: [PATCH 12/28] fix mac compile --- paddle/fluid/framework/details/multi_devices_graph_builder.h | 2 +- paddle/fluid/framework/parallel_executor.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.h b/paddle/fluid/framework/details/multi_devices_graph_builder.h index eb1c07630a..0b6347bf51 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.h +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.h @@ -47,7 +47,7 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { #endif std::unique_ptr Build(const ProgramDesc &program) const override; - int GetVarDeviceID(const std::string &varname) const; + int GetVarDeviceID(const std::string &varname) const override; private: void CreateOpHandleIOs(SSAGraph *result, const OpDesc &op, diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index d478865fa8..a6788cb6d5 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -121,7 +121,7 @@ ParallelExecutor::ParallelExecutor( #endif } - builder_ = std::move(builder_factory.Create()); + builder_ = builder_factory.Create(); member_->executor_.reset(new details::ThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, places, builder_->Build(main_program))); From d5fb8fa7783dac7d17d9072eb19912c7b2dba3b9 Mon Sep 17 00:00:00 2001 From: tensor-tang Date: Thu, 21 Jun 2018 21:03:11 +0800 Subject: [PATCH 13/28] Revert "Merge pull request #11628 from PaddlePaddle/revert-11102-mozga-intel/Sum_mkldnn_layout" This reverts commit 4d8e8ee22663fa0de9260b0ff0c41d6595ccbf11, reversing changes made to d6a9f005c827f7f1ed8b59a3197ac49a34d58e69. --- paddle/fluid/operators/parallel_do_op.cc | 2 +- paddle/fluid/operators/recurrent_op.cc | 3 +- paddle/fluid/operators/sum_mkldnn_op.cc | 240 ++++++++++++++++++ paddle/fluid/operators/sum_op.cc | 32 ++- paddle/fluid/operators/while_op.cc | 4 +- paddle/fluid/platform/mkldnn_helper.h | 6 + python/paddle/fluid/backward.py | 11 +- python/paddle/fluid/layers/nn.py | 143 ++++++----- python/paddle/fluid/layers/tensor.py | 30 ++- .../tests/unittests/test_sum_mkldnn_op.py | 26 ++ .../fluid/tests/unittests/test_sum_op.py | 6 + .../fluid/transpiler/distribute_transpiler.py | 6 +- python/paddle/reader/decorator.py | 4 +- 13 files changed, 411 insertions(+), 102 deletions(-) create mode 100644 paddle/fluid/operators/sum_mkldnn_op.cc create mode 100644 python/paddle/fluid/tests/unittests/test_sum_mkldnn_op.py diff --git a/paddle/fluid/operators/parallel_do_op.cc b/paddle/fluid/operators/parallel_do_op.cc index 1012640d5e..c9744db3d0 100644 --- a/paddle/fluid/operators/parallel_do_op.cc +++ b/paddle/fluid/operators/parallel_do_op.cc @@ -295,7 +295,7 @@ class ParallelDoGradOp : public framework::OperatorBase { auto sum_op = framework::OpRegistry::CreateOp( "sum", {{"X", {s, tmp_name}}}, {{"Out", {s}}}, - framework::AttributeMap{}); + framework::AttributeMap{{"use_mkldnn", {false}}}); VLOG(10) << sum_op->DebugStringEx(sub_scopes[0]); sum_op->Run(*sub_scopes[0], places[0]); WaitOnPlace(places[0]); diff --git a/paddle/fluid/operators/recurrent_op.cc b/paddle/fluid/operators/recurrent_op.cc index 9c1cee7022..162bfcbb08 100644 --- a/paddle/fluid/operators/recurrent_op.cc +++ b/paddle/fluid/operators/recurrent_op.cc @@ -429,7 +429,8 @@ class RecurrentGradOp : public RecurrentBase { auto sum_op = framework::OpRegistry::CreateOp( "sum", {{"X", {pg_names[param_id], new_inside_name}}}, - {{"Out", {pg_names[param_id]}}}, framework::AttributeMap{}); + {{"Out", {pg_names[param_id]}}}, + framework::AttributeMap{{"use_mkldnn", {false}}}); sum_op->Run(cur_scope, place); cur_scope.Rename(new_inside_name, inside_grad_name); diff --git a/paddle/fluid/operators/sum_mkldnn_op.cc b/paddle/fluid/operators/sum_mkldnn_op.cc new file mode 100644 index 0000000000..f78d977760 --- /dev/null +++ b/paddle/fluid/operators/sum_mkldnn_op.cc @@ -0,0 +1,240 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/*Licensed under the Apache License, Version 2.0(the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ + +#include "mkldnn.hpp" +#include "paddle/fluid/framework/tensor.h" +#include "paddle/fluid/operators/math/selected_rows_functor.h" +#include "paddle/fluid/operators/sum_op.h" +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/mkldnn_helper.h" + +namespace paddle { +namespace operators { + +using paddle::framework::Tensor; +using paddle::platform::MKLDNNDeviceContext; +using paddle::platform::CPUDeviceContext; +using framework::DataLayout; +using mkldnn::memory; +using mkldnn::primitive; +using mkldnn::stream; +using mkldnn::sum; +using mkldnn::reorder; +using platform::to_void_cast; + +template +class SumMKLDNNOpKernel : public paddle::framework::OpKernel { + public: + void Compute(const paddle::framework::ExecutionContext& ctx) const override { + PADDLE_ENFORCE(paddle::platform::is_cpu_place(ctx.GetPlace()), + "It must use CPUPlace."); + auto& dev_ctx = ctx.template device_context(); + const auto& mkldnn_engine = dev_ctx.GetEngine(); + auto in_vars = ctx.MultiInputVar("X"); + + const int N = in_vars.size(); + auto out_var = ctx.OutputVar("Out"); + bool in_place = out_var == in_vars[0]; + + if (out_var->IsType()) { + LoDTensor* output = ctx.Output("Out"); + T* output_data = output->mutable_data(ctx.GetPlace()); + + std::vector dst_tz = framework::vectorize2int(output->dims()); + auto src_tz = dst_tz; + memory::format output_format{memory::format::format_undef}; + std::vector scales; + std::vector srcs_mpd; + std::vector srcs_mem; + + PADDLE_ENFORCE(in_vars[0]->IsType(), + "Input[0] must be LoDTensors"); + auto& input0 = in_vars[0]->Get(); + PADDLE_ENFORCE(input0.layout() == DataLayout::kMKLDNN && + input0.format() != memory::format::format_undef, + "Wrong layout/format for inputs[0]"); + + memory::format input_format = input0.format(); + + if (src_tz.size() == 1 && (input_format == memory::format::nchw || + input_format == memory::format::nhwc)) { + input_format = memory::format::x; + } + if (src_tz.size() == 2 && (input_format == memory::format::nchw || + input_format == memory::format::nhwc)) { + input_format = memory::format::nc; + } + + for (int i = in_place ? 1 : 0; i < N; i++) { + PADDLE_ENFORCE(in_vars[i]->IsType(), + "all inputs must be all LoDTensors"); + auto& input = in_vars[i]->Get(); + PADDLE_ENFORCE(input.layout() == DataLayout::kMKLDNN && + input.format() != memory::format::format_undef, + "Wrong layout/format for inputs"); + + if (input.numel() == 0) { + continue; + } + + const T* input_data = input.data(); + + auto src_md = + memory::desc(src_tz, memory::data_type::f32, input_format); + auto src_mpd = memory::primitive_desc(src_md, mkldnn_engine); + auto src_mem = memory(src_mpd, to_void_cast(input_data)); + srcs_mpd.push_back(src_mpd); + srcs_mem.push_back(src_mem); + scales.push_back(1.0); + } + + auto dst_md = + memory::desc(dst_tz, memory::data_type::f32, memory::format::any); + + auto sum_pd = sum::primitive_desc(dst_md, scales, srcs_mpd); + + std::shared_ptr dst_mem; + if (in_place) { + dst_mem.reset(new memory(sum_pd.dst_primitive_desc())); + } else { + dst_mem.reset(new memory(sum_pd.dst_primitive_desc(), output_data)); + } + std::vector inputs; + for (size_t i = 0; i < srcs_mem.size(); ++i) { + inputs.push_back(srcs_mem[i]); + } + + auto sum_prim = mkldnn::sum(sum_pd, inputs, *dst_mem); + output_format = (memory::format)platform::GetMKLDNNFormat(sum_pd); + + primitive reorder_prim; + std::shared_ptr target_mem; + if (in_place) { + output_format = input_format; + target_mem.reset(new memory( + {{{src_tz}, memory::data_type::f32, output_format}, mkldnn_engine}, + output_data)); + reorder_prim = reorder(*dst_mem, *target_mem); + } + + std::vector pipeline; + pipeline.push_back(sum_prim); + if (in_place) pipeline.push_back(reorder_prim); + stream(stream::kind::eager).submit(pipeline).wait(); + + output->set_layout(DataLayout::kMKLDNN); + output->set_format(output_format); + } else if (out_var->IsType()) { + // TODO(@mozga-intel) Add MKLDNN SelectedRows support + std::unique_ptr in0; + if (in_place) { + // If is in_place, we store the input[0] to in0 + auto& in_sel0 = in_vars[0]->Get(); + auto& rows = in_sel0.rows(); + in0.reset(new framework::SelectedRows(rows, in_sel0.height())); + in0->mutable_value()->ShareDataWith(in_sel0.value()); + } + + auto get_selected_row = [&](size_t i) -> const SelectedRows& { + if (i == 0 && in0) { + return *in0.get(); + } else { + return in_vars[i]->Get(); + } + }; + auto* out = ctx.Output("Out"); + out->mutable_rows()->clear(); + auto* out_value = out->mutable_value(); + + // Runtime InferShape + size_t first_dim = 0; + for (int i = 0; i < N; i++) { + auto& sel_row = get_selected_row(i); + first_dim += sel_row.rows().size(); + } + auto in_dim = + framework::vectorize(get_selected_row(N - 1).value().dims()); + in_dim[0] = static_cast(first_dim); + + out_value->Resize(framework::make_ddim(in_dim)); + + // if all the input sparse vars are empty, no need to + // merge these vars. + if (first_dim == 0UL) { + return; + } + out_value->mutable_data(ctx.GetPlace()); + math::SelectedRowsAddTo functor; + int64_t offset = 0; + for (int i = 0; i < N; i++) { + auto& sel_row = get_selected_row(i); + if (sel_row.rows().size() == 0) { + continue; + } + PADDLE_ENFORCE_EQ(out->height(), sel_row.height()); + functor(ctx.template device_context(), sel_row, + offset, out); + offset += sel_row.value().numel(); + } + } else if (out_var->IsType()) { + // TODO(@mozga-intel) Add MKLDNN LoDTensorArray support + auto& out_array = *out_var->GetMutable(); + for (size_t i = in_place ? 1 : 0; i < in_vars.size(); ++i) { + PADDLE_ENFORCE(in_vars[i]->IsType(), + "Only support all inputs are TensorArray"); + auto& in_array = in_vars[i]->Get(); + + for (size_t i = 0; i < in_array.size(); ++i) { + if (in_array[i].numel() != 0) { + if (i >= out_array.size()) { + out_array.resize(i + 1); + } + if (out_array[i].numel() == 0) { + framework::TensorCopy(in_array[i], in_array[i].place(), + ctx.device_context(), &out_array[i]); + out_array[i].set_lod(in_array[i].lod()); + } else { + PADDLE_ENFORCE(out_array[i].lod() == in_array[i].lod()); + auto in = EigenVector::Flatten(in_array[i]); + auto result = EigenVector::Flatten(out_array[i]); + result.device(*ctx.template device_context() + .eigen_device()) = result + in; + } + } + } + } + } else { + PADDLE_THROW("Unexpected branch, output variable type is %s", + out_var->Type().name()); + } + } +}; + +} // namespace operators +} // namespace paddle + +REGISTER_OP_KERNEL(sum, MKLDNN, ::paddle::platform::CPUPlace, + paddle::operators::SumMKLDNNOpKernel); diff --git a/paddle/fluid/operators/sum_op.cc b/paddle/fluid/operators/sum_op.cc index 863baba9ea..fe7c7039c7 100644 --- a/paddle/fluid/operators/sum_op.cc +++ b/paddle/fluid/operators/sum_op.cc @@ -18,6 +18,10 @@ limitations under the License. */ #include "paddle/fluid/framework/var_type_inference.h" #include "paddle/fluid/operators/detail/safe_ref.h" +#ifdef PADDLE_WITH_MKLDNN +#include "paddle/fluid/platform/mkldnn_helper.h" +#endif + namespace paddle { namespace operators { using framework::Tensor; @@ -63,6 +67,18 @@ class SumOp : public framework::OperatorWithKernel { framework::OpKernelType GetExpectedKernelType( const framework::ExecutionContext& ctx) const override { auto x_vars = ctx.MultiInputVar("X"); + + framework::LibraryType library{framework::LibraryType::kPlain}; + framework::DataLayout layout{framework::DataLayout::kAnyLayout}; + +#ifdef PADDLE_WITH_MKLDNN + if (library == framework::LibraryType::kPlain && + platform::CanMKLDNNBeUsed(ctx)) { + library = framework::LibraryType::kMKLDNN; + layout = framework::DataLayout::kMKLDNN; + } +#endif + if (x_vars[0]->IsType()) { int dtype = -1; for (auto& x_var : x_vars) { @@ -80,26 +96,27 @@ class SumOp : public framework::OperatorWithKernel { "Sum operator should have at least one tensor"); return framework::OpKernelType( - static_cast(dtype), - ctx.device_context()); + static_cast(dtype), ctx.GetPlace(), + layout, library); } else if (x_vars[0]->IsType()) { for (auto& var : x_vars) { auto& value = var->Get().value(); if (value.IsInitialized()) { return framework::OpKernelType(framework::ToDataType(value.type()), - ctx.device_context()); + ctx.device_context(), layout, library); } } // if input sparse vars are not initialized, use an default kernel type. return framework::OpKernelType(framework::proto::VarType::FP32, - ctx.device_context()); + ctx.device_context(), layout, library); } else if (x_vars[0]->IsType()) { for (auto& x_var : x_vars) { auto& array = x_var->Get(); for (auto& each : array) { if (each.numel() != 0) { return framework::OpKernelType(framework::ToDataType(each.type()), - ctx.device_context()); + ctx.device_context(), layout, + library); } } } @@ -116,6 +133,9 @@ class SumOpMaker : public framework::OpProtoAndCheckerMaker { AddInput("X", "(vector) The input tensors of sum operator.") .AsDuplicable(); AddOutput("Out", "(Tensor) The output tensor of sum operator.").Reuse("X"); + AddAttr("use_mkldnn", + "(bool, default false) Only used in mkldnn kernel") + .SetDefault(false); AddComment(R"DOC( Sum operator. @@ -132,7 +152,6 @@ class SumOpVarTypeInference : public framework::VarTypeInference { framework::BlockDesc* block) const override { auto& inputs = op_desc.Input("X"); auto var_type = framework::proto::VarType::SELECTED_ROWS; - for (auto& name : op_desc.Input("X")) { VLOG(10) << name << " " << block->FindRecursiveOrCreateVar(name).GetType(); @@ -206,6 +225,7 @@ namespace ops = paddle::operators; REGISTER_OPERATOR(sum, ops::SumOp, ops::SumOpMaker, ops::SumGradMaker, ops::SumOpVarTypeInference); + REGISTER_OP_CPU_KERNEL( sum, ops::SumKernel, ops::SumKernel, diff --git a/paddle/fluid/operators/while_op.cc b/paddle/fluid/operators/while_op.cc index 175c3ac5d7..f440058e8d 100644 --- a/paddle/fluid/operators/while_op.cc +++ b/paddle/fluid/operators/while_op.cc @@ -203,11 +203,11 @@ class WhileGradOp : public framework::OperatorBase { ->set_lod(inside_tensor.lod()); } } - auto new_inside_name = cur_scope.Rename(inside_grad_name); auto sum_op = framework::OpRegistry::CreateOp( "sum", {{"X", {pg_names[param_id], new_inside_name}}}, - {{"Out", {pg_names[param_id]}}}, framework::AttributeMap{}); + {{"Out", {pg_names[param_id]}}}, + framework::AttributeMap{{"use_mkldnn", {false}}}); sum_op->Run(cur_scope, dev_place); cur_scope.Rename(new_inside_name, inside_grad_name); } diff --git a/paddle/fluid/platform/mkldnn_helper.h b/paddle/fluid/platform/mkldnn_helper.h index de711b7d23..2689d5e078 100644 --- a/paddle/fluid/platform/mkldnn_helper.h +++ b/paddle/fluid/platform/mkldnn_helper.h @@ -99,5 +99,11 @@ inline mkldnn::memory::format GetMKLDNNFormat(const mkldnn::memory memory) { memory.get_primitive_desc().desc().data.format); } +inline mkldnn::memory::format GetMKLDNNFormat( + const mkldnn::sum::primitive_desc& memory) { + return static_cast( + memory.dst_primitive_desc().desc().data.format); +} + } // namespace platform } // namespace paddle diff --git a/python/paddle/fluid/backward.py b/python/paddle/fluid/backward.py index f7bbc98fe1..4faa063031 100644 --- a/python/paddle/fluid/backward.py +++ b/python/paddle/fluid/backward.py @@ -132,9 +132,9 @@ def _addup_repetitive_outputs_(op_descs): for idx, op_desc in enumerate(op_descs): for var_name in op_desc.input_arg_names(): if len(renamed_vars[var_name]) > 1: - pending_sum_ops.append( - (_create_op_desc_("sum", {"X": renamed_vars[var_name]}, - {"Out": [var_name]}, {}), idx)) + pending_sum_ops.append((_create_op_desc_( + "sum", {"X": renamed_vars[var_name]}, {"Out": [var_name]}, + {"use_mkldnn": False}), idx)) renamed_vars[var_name] = [var_name] for var_name in op_desc.output_arg_names(): if var_name == core.empty_var_name( @@ -161,8 +161,9 @@ def _addup_repetitive_outputs_(op_descs): renamed_vars[var_name].append(new_name) for var_name, inputs in renamed_vars.iteritems(): if len(inputs) > 1: - pending_sum_ops.append((_create_op_desc_( - "sum", {"X": inputs}, {"Out": [var_name]}, {}), len(op_descs))) + pending_sum_ops.append( + (_create_op_desc_("sum", {"X": inputs}, {"Out": [var_name]}, + {"use_mkldnn": False}), len(op_descs))) # sum_op descs are sorted according to their insert position for p in reversed(pending_sum_ops): op_descs.insert(p[1], p[0]) diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index 097fbe982f..a87bead14c 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -All layers just related to the neural network. +All layers just related to the neural network. """ from ..layer_helper import LayerHelper @@ -109,14 +109,14 @@ def fc(input, """ **Fully Connected Layer** - This function creates a fully connected layer in the network. It can take - multiple tensors as its inputs. It creates a variable called weights for - each input tensor, which represents a fully connected weight matrix from - each input unit to each output unit. The fully connected layer multiplies - each input tensor with its coresponding weight to produce an output Tensor. - If multiple input tensors are given, the results of multiple multiplications - will be sumed up. If bias_attr is not None, a bias variable will be created - and added to the output. Finally, if activation is not None, it will be applied + This function creates a fully connected layer in the network. It can take + multiple tensors as its inputs. It creates a variable called weights for + each input tensor, which represents a fully connected weight matrix from + each input unit to each output unit. The fully connected layer multiplies + each input tensor with its coresponding weight to produce an output Tensor. + If multiple input tensors are given, the results of multiple multiplications + will be sumed up. If bias_attr is not None, a bias variable will be created + and added to the output. Finally, if activation is not None, it will be applied to the output as well. This process can be formulated as follows: @@ -198,7 +198,10 @@ def fc(input, else: pre_bias = helper.create_tmp_variable(dtype) helper.append_op( - type="sum", inputs={"X": mul_results}, outputs={"Out": pre_bias}) + type="sum", + inputs={"X": mul_results}, + outputs={"Out": pre_bias}, + attrs={"use_mkldnn": use_mkldnn}) # add bias pre_activation = helper.append_bias_op(pre_bias, dim_start=num_flatten_dims) # add activation @@ -847,7 +850,7 @@ def crf_decoding(input, param_attr, label=None): Returns: Variable: ${viterbi_path_comment} - + Examples: .. code-block:: python @@ -1085,7 +1088,7 @@ def chunk_eval(input, Here is a NER example of labeling for these tagging schemes: .. code-block:: python - + ====== ====== ====== ===== == ============ ===== ===== ===== == ========= Li Ming works at Agricultural Bank of China in Beijing. ====== ====== ====== ===== == ============ ===== ===== ===== == ========= @@ -1111,7 +1114,7 @@ def chunk_eval(input, is the num of chunk types, and `tag_type` get its value from the following table. .. code-block:: python - + Scheme Begin Inside End Single plain 0 - - - IOB 0 1 - - @@ -1147,7 +1150,7 @@ def chunk_eval(input, tuple: tuple containing: precision, recall, f1_score, num_infer_chunks, num_label_chunks, num_correct_chunks - + Examples: .. code-block:: python @@ -1247,7 +1250,7 @@ def sequence_softmax(input, param_attr=None, bias_attr=None, use_cudnn=True): """ This function computes the softmax activation among all time-steps for each sequence. The dimension of each time-step should be 1. Thus, the shape of - input Tensor can be either :math:`[N, 1]` or :math:`[N]`, where :math:`N` + input Tensor can be either :math:`[N, 1]` or :math:`[N]`, where :math:`N` is the sum of the length of all sequences. For i-th sequence in a mini-batch: @@ -1267,7 +1270,7 @@ def sequence_softmax(input, param_attr=None, bias_attr=None, use_cudnn=True): param_attr (ParamAttr|None): attributes for parameter use_cudnn (bool): Use cudnn kernel or not, it is valid only when the cudnn \ library is installed. Default: True - + Returns: Variable: output of sequence_softmax @@ -1828,11 +1831,11 @@ def pool2d(input, ${comment} Args: - input (Variable): The input tensor of pooling operator. The format of - input tensor is NCHW, where N is batch size, C is - the number of channels, H is the height of the + input (Variable): The input tensor of pooling operator. The format of + input tensor is NCHW, where N is batch size, C is + the number of channels, H is the height of the feature, and W is the width of the feature. - pool_size (int): The side length of pooling windows. All pooling + pool_size (int): The side length of pooling windows. All pooling windows are squares with pool_size on a side. pool_type: ${pooling_type_comment} pool_stride (int): stride of the pooling layer. @@ -1841,7 +1844,7 @@ def pool2d(input, use_cudnn: ${use_cudnn_comment} ceil_mode: ${ceil_mode_comment} use_mkldnn: ${use_mkldnn_comment} - name (str|None): A name for this layer(optional). If set None, the + name (str|None): A name for this layer(optional). If set None, the layer will be named automatically. Returns: @@ -1859,10 +1862,10 @@ def pool2d(input, data = fluid.layers.data( name='data', shape=[3, 32, 32], dtype='float32') conv2d = fluid.layers.pool2d( - input=data, - pool_size=2, - pool_type='max', - pool_stride=1, + input=data, + pool_size=2, + pool_type='max', + pool_stride=1, global_pooling=False) """ if pool_type not in ["max", "avg"]: @@ -2227,14 +2230,14 @@ def beam_search_decode(ids, scores, name=None): This layers is to pack the output of beam search layer into sentences and associated scores. It is usually called after the beam search layer. Typically, the output of beam search layer is a tensor of selected ids, with - a tensor of the score of each id. Beam search layer's output ids, however, - are generated directly during the tree search, and they are stacked by each - level of the search tree. Thus we need to reorganize them into sentences, + a tensor of the score of each id. Beam search layer's output ids, however, + are generated directly during the tree search, and they are stacked by each + level of the search tree. Thus we need to reorganize them into sentences, based on the score of each id. This layer takes the output of beam search layer as input and repack them into sentences. Args: - ids (Variable): The selected ids, output of beam search layer. + ids (Variable): The selected ids, output of beam search layer. scores (Variable): The associated scores of the ids, out put of beam search layer. name (str): The name of this layer. It is optional. @@ -2242,7 +2245,7 @@ def beam_search_decode(ids, scores, name=None): Returns: tuple(Variable): a tuple of two output tensors: sentence_ids, sentence_scores. sentence_ids is a tensor with shape [size, length], where size is the - beam size of beam search, and length is the length of each sentence. + beam size of beam search, and length is the length of each sentence. Note that the length of sentences may vary. sentence_scores is a tensor with the same shape as sentence_ids. @@ -2919,7 +2922,7 @@ def reduce_mean(input, dim=None, keep_dim=False, name=None): `None`, compute the mean over all elements of :attr:`input` and return a variable with a single element, otherwise it must be in the range :math:`[-rank(input), rank(input))`. If - :math:`dim[i] < 0`, the dimension to reduce is + :math:`dim[i] < 0`, the dimension to reduce is :math:`rank(input) + dim[i]`. keep_dim (bool): Whether to reserve the reduced dimension in the output Tensor. The result tensor will have one fewer dimension @@ -3390,16 +3393,16 @@ def topk(input, k, name=None): Args: input(Variable): The input variable which can be a vector or Tensor with higher rank. - k(int): The number of top elements to look for along the last dimension + k(int): The number of top elements to look for along the last dimension of input. name(str|None): A name for this layer(optional). If set None, the layer - will be named automatically. + will be named automatically. Default: None Returns: - Tuple[Variable]: A tuple with two elements. Each element is a Variable. - The first one is k largest elements along each last - dimensional slice. The second one is indices of values + Tuple[Variable]: A tuple with two elements. Each element is a Variable. + The first one is k largest elements along each last + dimensional slice. The second one is indices of values within the last dimension of input. Raises: @@ -3594,15 +3597,15 @@ def warpctc(input, label, blank=0, norm_by_times=False): It's shape is [Lp, num_classes + 1], where Lp is the sum of all input sequences' length and num_classes is the true number of classes. (not including the blank label). - label (Variable): The ground truth of variable-length sequence, + label (Variable): The ground truth of variable-length sequence, which is a 2-D Tensor with LoD information. It is of the shape [Lg, 1], where Lg is th sum of all labels' length. blank (int, default 0): The blank label index of Connectionist Temporal Classification (CTC) loss, which is in the half-opened interval [0, num_classes + 1). - norm_by_times(bool, default false): Whether to normalize the gradients - by the number of time-step, which is also the sequence's length. - There is no need to normalize the gradients if warpctc layer was + norm_by_times(bool, default false): Whether to normalize the gradients + by the number of time-step, which is also the sequence's length. + There is no need to normalize the gradients if warpctc layer was follewed by a mean_op. Returns: @@ -3708,8 +3711,8 @@ def nce(input, input (Variable): input variable. label (Variable): label. num_total_classes (int):${num_total_classes_comment} - sample_weight (Variable|None): A Variable of shape [batch_size, 1] - storing a weight for each sample. The default weight for each + sample_weight (Variable|None): A Variable of shape [batch_size, 1] + storing a weight for each sample. The default weight for each sample is 1.0. param_attr (ParamAttr|None): attributes for parameter bias_attr (ParamAttr|None): attributes for bias @@ -4099,7 +4102,7 @@ def smooth_l1(x, y, inside_weight=None, outside_weight=None, sigma=None): This layer computes the smooth L1 loss for Variable :attr:`x` and :attr:`y`. It takes the first dimension of :attr:`x` and :attr:`y` as batch size. For each instance, it computes the smooth L1 loss element by element first - and then sums all the losses. So the shape of ouput Variable is + and then sums all the losses. So the shape of ouput Variable is [batch_size, 1]. Args: @@ -4108,14 +4111,14 @@ def smooth_l1(x, y, inside_weight=None, outside_weight=None, sigma=None): y (Variable): A tensor with rank at least 2. The target value of smooth L1 loss op with same shape as :attr:`x`. inside_weight (Variable|None): A tensor with rank at least 2. This - input is optional and should have same shape with :attr:`x`. If - provided, the result of (:attr:`x` - :attr:`y`) will be multiplied + input is optional and should have same shape with :attr:`x`. If + provided, the result of (:attr:`x` - :attr:`y`) will be multiplied by this tensor element by element. outside_weight (Variable|None): A tensor with rank at least 2. This - input is optional and should have same shape with :attr:`x`. If - provided, the out smooth L1 loss will be multiplied by this tensor + input is optional and should have same shape with :attr:`x`. If + provided, the out smooth L1 loss will be multiplied by this tensor element by element. - sigma (float|None): Hyper parameter of smooth L1 loss layer. A float + sigma (float|None): Hyper parameter of smooth L1 loss layer. A float scalar with default value 1.0. Returns: @@ -4161,7 +4164,7 @@ def one_hot(input, depth): Examples: .. code-block:: python - + label = layers.data(name="label", shape=[1], dtype="float32") one_hot_label = layers.one_hot(input=label, depth=10) """ @@ -4315,10 +4318,10 @@ def reshape(x, shape, actual_shape=None, act=None, inplace=True, name=None): def lod_reset(x, y=None, target_lod=None): """ Set LoD of :attr:`x` to a new one specified by :attr:`y` or - :attr:`target_lod`. When :attr:`y` provided, :attr:`y.lod` would be - considered as target LoD first, otherwise :attr:`y.data` would be - considered as target LoD. If :attr:`y` is not provided, target LoD should - be specified by :attr:`target_lod`. If target LoD is specified by + :attr:`target_lod`. When :attr:`y` provided, :attr:`y.lod` would be + considered as target LoD first, otherwise :attr:`y.data` would be + considered as target LoD. If :attr:`y` is not provided, target LoD should + be specified by :attr:`target_lod`. If target LoD is specified by :attr:`Y.data` or :attr:`target_lod`, only one level LoD is supported. .. code-block:: text @@ -4372,7 +4375,7 @@ def lod_reset(x, y=None, target_lod=None): Args: x (Variable): Input variable which could be a Tensor or LodTensor. - y (Variable|None): If provided, output's LoD would be derived + y (Variable|None): If provided, output's LoD would be derived from :attr:`y`. target_lod (list|tuple|None): One level LoD which should be considered as target LoD when :attr:`y` not provided. @@ -4688,7 +4691,7 @@ def image_resize(input, """ **Resize a Batch of Images** - The input must be a tensor of the shape (num_batches, channels, in_h, in_w), + The input must be a tensor of the shape (num_batches, channels, in_h, in_w), and the resizing only applies on the last two dimensions(hight and width). Supporting resample methods: @@ -4784,9 +4787,9 @@ def resize_bilinear(input, out_shape=None, scale=None, name=None): def image_resize_short(input, out_short_len, resample='BILINEAR'): """ - Resize a batch of images. The short edge of input images will be - resized to the given 'out_short_len'. The long edge of input images - will be resized proportionately to make images' length-width ratio + Resize a batch of images. The short edge of input images will be + resized to the given 'out_short_len'. The long edge of input images + will be resized proportionately to make images' length-width ratio constant. Args: @@ -4819,7 +4822,7 @@ def gather(input, index): """ **Gather Layer** - Output is obtained by gathering entries of the outer-most dimension + Output is obtained by gathering entries of the outer-most dimension of X indexed by `index` and concatenate them together. .. math:: @@ -4844,7 +4847,7 @@ def gather(input, index): [5, 6]] Args: - input (Variable): The source input with rank>=1. + input (Variable): The source input with rank>=1. index (Variable): The index input with rank=1. Returns: @@ -4880,7 +4883,7 @@ def random_crop(x, shape, seed=None): Returns: ${out_comment} - + Examples: >>> img = fluid.layers.data("img", [3, 256, 256]) >>> cropped_img = fluid.layers.random_crop(img, shape=[3, 224, 224]) @@ -4926,7 +4929,7 @@ def log(x): Out = \\ln(x) Args: - x (Variable): Input tensor. + x (Variable): Input tensor. Returns: Variable: The natural log of the input tensor computed element-wise. @@ -4955,7 +4958,7 @@ def relu(x): Out = \\max(0, x) Args: - x (Variable): The input tensor. + x (Variable): The input tensor. Returns: Variable: The output tensor with the same shape as input. @@ -4976,15 +4979,15 @@ def relu(x): def mean_iou(input, label, num_classes): """ Mean Intersection-Over-Union is a common evaluation metric for - semantic image segmentation, which first computes the IOU for each - semantic class and then computes the average over classes. - IOU is defined as follows: - + semantic image segmentation, which first computes the IOU for each + semantic class and then computes the average over classes. + IOU is defined as follows: + .. math:: IOU = \\frac{true\_positiv}{(true\_positive + false\_positive + false\_negative)}. - The predictions are accumulated in a confusion matrix and mean-IOU + The predictions are accumulated in a confusion matrix and mean-IOU is then calculated from it. @@ -4997,12 +5000,12 @@ def mean_iou(input, label, num_classes): Returns: mean_iou (Variable): A Tensor representing the mean intersection-over-union with shape [1]. out_wrong(Variable): A Tensor with shape [num_classes]. The wrong numbers of each class. - out_correct(Variable): A Tensor with shape [num_classes]. The correct numbers of each class. + out_correct(Variable): A Tensor with shape [num_classes]. The correct numbers of each class. Examples: .. code-block:: python - + iou, wrongs, corrects = fluid.layers.mean_iou(predict, label, num_classes) """ helper = LayerHelper('mean_iou', **locals()) diff --git a/python/paddle/fluid/layers/tensor.py b/python/paddle/fluid/layers/tensor.py index 149e77b524..b7a8bff30d 100644 --- a/python/paddle/fluid/layers/tensor.py +++ b/python/paddle/fluid/layers/tensor.py @@ -230,7 +230,11 @@ def sums(input, out=None): helper = LayerHelper('sum', **locals()) if out is None: out = helper.create_tmp_variable(dtype=helper.input_dtype()) - helper.append_op(type='sum', inputs={'X': input}, outputs={'Out': out}) + helper.append_op( + type='sum', + inputs={'X': input}, + outputs={'Out': out}, + attrs={'use_mkldnn': False}) return out @@ -380,7 +384,7 @@ def argmin(x, axis=0): """ **argmin** - This function computes the indices of the min elements + This function computes the indices of the min elements of the input tensor's element along the provided axis. Args: @@ -395,7 +399,7 @@ def argmin(x, axis=0): .. code-block:: python out = fluid.layers.argmin(x=in, axis=0) - out = fluid.layers.argmin(x=in, axis=-1) + out = fluid.layers.argmin(x=in, axis=-1) """ helper = LayerHelper("arg_min", **locals()) out = helper.create_tmp_variable(VarDesc.VarType.INT64) @@ -411,7 +415,7 @@ def argmax(x, axis=0): """ **argmax** - This function computes the indices of the max elements + This function computes the indices of the max elements of the input tensor's element along the provided axis. Args: @@ -426,7 +430,7 @@ def argmax(x, axis=0): .. code-block:: python out = fluid.layers.argmax(x=in, axis=0) - out = fluid.layers.argmax(x=in, axis=-1) + out = fluid.layers.argmax(x=in, axis=-1) """ helper = LayerHelper("arg_max", **locals()) out = helper.create_tmp_variable(VarDesc.VarType.INT64) @@ -495,9 +499,9 @@ def reverse(x, axis): Args: x(Vairbale): the input to be reversed. - axis(int|tuple|list): Axis that along which order of elements - is reversed. If it is a tuple or a list, reversing - will be apply on each axis in the tuple or list. + axis(int|tuple|list): Axis that along which order of elements + is reversed. If it is a tuple or a list, reversing + will be apply on each axis in the tuple or list. Returns: Variable: The reversed tensor. @@ -528,9 +532,9 @@ def save(x, file_path, overwrite=True): Args: x(variable): The Tensor/LoDTensor to be saved. file_path(str): The file path where the variable will be saved. - overwrite(bool): Whether or not cover the given file when it has already - existed. If it's set 'False' and the file is existed, a runtime - error will be thrown. + overwrite(bool): Whether or not cover the given file when it has already + existed. If it's set 'False' and the file is existed, a runtime + error will be thrown. """ helper = LayerHelper("save", **locals()) helper.append_op( @@ -550,8 +554,8 @@ def save_combine(x, file_path, overwrite=True): a single file. file_path(str): The file path where variables will be saved. overwrite(bool): Whether or not cover the given file when it has already - existed. If it's set 'False' and the file is existed, a runtime - error will be thrown. + existed. If it's set 'False' and the file is existed, a runtime + error will be thrown. Returns: There is no return value. diff --git a/python/paddle/fluid/tests/unittests/test_sum_mkldnn_op.py b/python/paddle/fluid/tests/unittests/test_sum_mkldnn_op.py new file mode 100644 index 0000000000..7956897d68 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_sum_mkldnn_op.py @@ -0,0 +1,26 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from test_sum_op import TestSumOp + + +class TestMKLDNN(TestSumOp): + def init_kernel_type(self): + self.use_mkldnn = True + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_sum_op.py b/python/paddle/fluid/tests/unittests/test_sum_op.py index 2faf5b1064..1d90414e13 100644 --- a/python/paddle/fluid/tests/unittests/test_sum_op.py +++ b/python/paddle/fluid/tests/unittests/test_sum_op.py @@ -20,12 +20,15 @@ from op_test import OpTest class TestSumOp(OpTest): def setUp(self): self.op_type = "sum" + self.use_mkldnn = False + self.init_kernel_type() x0 = np.random.random((3, 4)).astype('float32') x1 = np.random.random((3, 4)).astype('float32') x2 = np.random.random((3, 4)).astype('float32') self.inputs = {"X": [("x0", x0), ("x1", x1), ("x2", x2)]} y = x0 + x1 + x2 self.outputs = {'Out': y} + self.attrs = {'use_mkldnn': self.use_mkldnn} def test_check_output(self): self.check_output() @@ -33,6 +36,9 @@ class TestSumOp(OpTest): def test_check_grad(self): self.check_grad(['x0'], 'Out') + def init_kernel_type(self): + pass + if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 041f0aa42f..d8d6a7e941 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -872,7 +872,8 @@ class DistributeTranspiler(object): table_opt_block.append_op( type="sum", inputs={"X": pserver_side_table_grad_list}, - outputs={"Out": [grad_var]}) + outputs={"Out": [grad_var]}, + attrs={"use_mkldnn": False}) else: # in async_mode, for table gradient, it also need to be splited to each parameter server origin_grad_name = grad_var.name @@ -1104,7 +1105,8 @@ class DistributeTranspiler(object): optimize_block.append_op( type="sum", inputs={"X": vars2merge}, - outputs={"Out": merged_var}) + outputs={"Out": merged_var}, + attrs={"use_mkldnn": False}) # TODO(panyx0718): What if it's SELECTED_ROWS. if not merged_var.type == core.VarDesc.VarType.SELECTED_ROWS: optimize_block.append_op( diff --git a/python/paddle/reader/decorator.py b/python/paddle/reader/decorator.py index 44a6e34463..1f83cabb84 100644 --- a/python/paddle/reader/decorator.py +++ b/python/paddle/reader/decorator.py @@ -336,7 +336,7 @@ def _buf2lines(buf, line_break="\n"): class PipeReader: """ - PipeReader read data by stream from a command, take it's + PipeReader read data by stream from a command, take it's stdout into a pipe buffer and redirect it to the parser to parse, then yield data as your desired format. @@ -352,7 +352,7 @@ class PipeReader: An example: .. code-block:: python - + def example_reader(): for f in myfiles: pr = PipeReader("cat %s"%f) From 064ca35285c11dd1966d8345edd3629511ac707a Mon Sep 17 00:00:00 2001 From: tensor-tang Date: Thu, 21 Jun 2018 21:12:09 +0800 Subject: [PATCH 14/28] fix indentation error --- python/paddle/fluid/layers/tensor.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/paddle/fluid/layers/tensor.py b/python/paddle/fluid/layers/tensor.py index b7a8bff30d..2a4fcf8ac1 100644 --- a/python/paddle/fluid/layers/tensor.py +++ b/python/paddle/fluid/layers/tensor.py @@ -230,11 +230,11 @@ def sums(input, out=None): helper = LayerHelper('sum', **locals()) if out is None: out = helper.create_tmp_variable(dtype=helper.input_dtype()) - helper.append_op( - type='sum', - inputs={'X': input}, - outputs={'Out': out}, - attrs={'use_mkldnn': False}) + helper.append_op( + type='sum', + inputs={'X': input}, + outputs={'Out': out}, + attrs={'use_mkldnn': False}) return out From 98f3ad3ba1d113651911f7cb08c521304f7cc3b8 Mon Sep 17 00:00:00 2001 From: Jacek Czaja Date: Tue, 8 May 2018 10:34:01 -0700 Subject: [PATCH 15/28] - MKLDNN Softmax Grad Op - Added hash function inside of MKLDNN softmax op to be used as handle for primitives stroing in a context - Style fixes to softmax mkldnn op - Fixes after review - Coding style - Fix to style - style fixes - style fix - style fixes - Fix to cody style check - Rephrasing a comment fix t obroken merge Fixes to rebase Conflicts: benchmark/fluid/models/machine_translation.py cmake/external/mkldnn.cmake paddle/fluid/operators/softmax_mkldnn_op.cc - Bumped revision of MKL-DNN up to have softmax backward primitive - Added choosing MKLDNN softmax grad operator - First reuse of softmax backward - Reinvented reusing for softmax - Fix to crash in reinvented reuse - Clang format fixes - Clang format fixes - Improved softmax mkldnn reuse mechanism - clang format fixes - Fix to broken merge - Fix --- cmake/external/mkldnn.cmake | 2 +- paddle/fluid/operators/softmax_mkldnn_op.cc | 217 +++++++++++++++----- paddle/fluid/operators/softmax_op.cc | 22 +- paddle/fluid/platform/mkldnn_helper.h | 132 ++++++++++++ 4 files changed, 318 insertions(+), 55 deletions(-) diff --git a/cmake/external/mkldnn.cmake b/cmake/external/mkldnn.cmake index 48d3d2db7f..20dda35c5c 100644 --- a/cmake/external/mkldnn.cmake +++ b/cmake/external/mkldnn.cmake @@ -54,7 +54,7 @@ ExternalProject_Add( ${EXTERNAL_PROJECT_LOG_ARGS} DEPENDS ${MKLDNN_DEPENDS} GIT_REPOSITORY "https://github.com/01org/mkl-dnn.git" - GIT_TAG "db3424ad44901513c03a1ea31ccaacdf633fbe9f" + GIT_TAG "a29d8487a63afca3d5b8c5bbdbb473cf8ccc6e51" PREFIX ${MKLDNN_SOURCES_DIR} UPDATE_COMMAND "" CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${MKLDNN_INSTALL_DIR} diff --git a/paddle/fluid/operators/softmax_mkldnn_op.cc b/paddle/fluid/operators/softmax_mkldnn_op.cc index 14b57b11fe..6668e6b9e9 100644 --- a/paddle/fluid/operators/softmax_mkldnn_op.cc +++ b/paddle/fluid/operators/softmax_mkldnn_op.cc @@ -27,8 +27,81 @@ using paddle::platform::MKLDNNMemDesc; using mkldnn::memory; // Note: paddle has also "memory" namespace using mkldnn::primitive; using mkldnn::softmax_forward; +using mkldnn::softmax_backward; using mkldnn::prop_kind; using mkldnn::stream; +using platform::to_void_cast; + +class SoftmaxMKLDNNHandler : public platform::MKLDNNHandler { + public: + SoftmaxMKLDNNHandler( + std::shared_ptr softmax_pd, + const platform::MKLDNNDeviceContext& dev_ctx, mkldnn::engine engine, + const std::string& base_key) + : platform::MKLDNNHandler(dev_ctx, engine, base_key), + softmax_pd_(softmax_pd) {} + + SoftmaxMKLDNNHandler( + std::shared_ptr softmax_pd, + std::shared_ptr softmax_bwd_pd, + const platform::MKLDNNDeviceContext& dev_ctx, mkldnn::engine engine, + const std::string& base_key) + : platform::MKLDNNHandler(dev_ctx, engine, base_key), + softmax_pd_(softmax_pd), + softmax_bwd_pd_(softmax_bwd_pd) { + // If we are in Grad operatgor then update a key with BWD suffix to + // distinguish from FWD memory primitives + key_ += "-BWD"; + } + + std::shared_ptr AcquireSoftmax( + std::shared_ptr dst_memory_p, + std::shared_ptr src_memory_p) { + /*Generate key*/ + auto prim_key = key_ + "@softmax_p"; + + auto softmax_p = std::static_pointer_cast( + dev_ctx_.GetBlob(prim_key)); + PADDLE_ENFORCE((softmax_p != nullptr) || (is_reusing_ == false), + "Fail to find softmax primitive in device context"); + if (softmax_p == nullptr) { + softmax_p = std::make_shared( + *(softmax_pd_.get()), + *(static_cast(src_memory_p.get())), + *(static_cast(dst_memory_p.get()))); + dev_ctx_.SetBlob(prim_key, softmax_p); + } else { + is_reusing_ = true; + } + + return softmax_p; + } + + std::shared_ptr AcquireSoftmaxBackward( + std::shared_ptr dst_memory_p, + std::shared_ptr diff_dst_memory_p, + std::shared_ptr diff_src_memory_p) { + auto prim_key = key_ + "@softmax_bwd_p"; + auto softmax_bwd_p = std::static_pointer_cast( + dev_ctx_.GetBlob(prim_key)); + PADDLE_ENFORCE((softmax_bwd_p != nullptr) || (is_reusing_ == false), + "Fail to find softmax backward primitive in device context"); + if (softmax_bwd_p == nullptr) { + softmax_bwd_p = std::make_shared( + *softmax_bwd_pd_, *(dst_memory_p.get()), *(diff_dst_memory_p.get()), + *(diff_src_memory_p.get())); + dev_ctx_.SetBlob(prim_key, softmax_bwd_p); + } else { + is_reusing_ = true; + } + + return softmax_bwd_p; + } + + private: + std::shared_ptr softmax_pd_; + std::shared_ptr softmax_bwd_pd_; +}; template class SoftmaxMKLDNNKernel : public paddle::framework::OpKernel { @@ -54,56 +127,27 @@ class SoftmaxMKLDNNKernel : public paddle::framework::OpKernel { // Same memory descriptor to be used for input and output memory::dims softmax_tz = {src_tz[0], src_tz[1]}; // Generate keys for storing/retriving primitives for this operator - // TODO(jczaja): Each MKLDNN operator may have diffrent hashing function - auto gethash = [](memory::dims& operand_dims) { - return std::string(std::to_string(operand_dims[0]) + "-" + - std::to_string(operand_dims[1])); - }; - const std::string key = gethash(softmax_tz); - const std::string key_softmax_p = key + "@softmax_p"; - const std::string key_softmax_src_mem_p = key + "@softmax_src_mem_p"; - const std::string key_softmax_dst_mem_p = key + "@softmax_dst_mem_p"; - - std::shared_ptr softmax_p = dev_ctx.GetBlob(key_softmax_p); - if (softmax_p == nullptr) { - // Currently only NC data format is supported - auto softmax_md = - MKLDNNMemDesc({softmax_tz}, memory::f32, memory::format::nc); - // Normalization is made after innermost dimension eg. C out of NC - auto softmax_desc = softmax_forward::desc(prop_kind::forward_scoring, - softmax_md, 1 /*dim: C*/); - // create memory primitives - auto softmax_src_memory_p = std::make_shared( - memory::primitive_desc{softmax_md, mkldnn_engine}, - static_cast(const_cast(input_data))); - dev_ctx.SetBlob(key_softmax_src_mem_p, softmax_src_memory_p); - auto softmax_dst_memory_p = std::make_shared( - memory::primitive_desc{softmax_md, mkldnn_engine}, - static_cast(output_data)); - dev_ctx.SetBlob(key_softmax_dst_mem_p, softmax_dst_memory_p); - - auto softmax_forward_pd = - std::make_shared(softmax_desc, - mkldnn_engine); - softmax_p = std::make_shared( - *(softmax_forward_pd.get()), - *(static_cast(softmax_src_memory_p.get())), - *(static_cast(softmax_dst_memory_p.get()))); - dev_ctx.SetBlob(key_softmax_p, softmax_p); - } else { - // Primitives already exist - auto src_memory_p = std::static_pointer_cast( - dev_ctx.GetBlob(key_softmax_src_mem_p)); - PADDLE_ENFORCE(src_memory_p != nullptr, - "Fail to find softmax src mem_p in device context"); - auto dst_memory_p = std::static_pointer_cast( - dev_ctx.GetBlob(key_softmax_dst_mem_p)); - PADDLE_ENFORCE(dst_memory_p != nullptr, - "Fail to find softmax dst mem_p in device context"); - src_memory_p->set_data_handle( - reinterpret_cast(const_cast(input_data))); - dst_memory_p->set_data_handle(output_data); - } + const std::string key = + platform::MKLDNNHandler::GetHash(softmax_tz, ctx.op().Output("Out")); + const std::string key_softmax_pd = key + "@softmax_pd"; + + // Currently only NC data format is supported + auto softmax_md = MKLDNNMemDesc( + {softmax_tz}, platform::MKLDNNGetDataType(), memory::format::nc); + // Normalization is made after innermost dimension eg. C out of NC + auto softmax_desc = softmax_forward::desc(prop_kind::forward_scoring, + softmax_md, 1 /*dim: C*/); + auto softmax_pd = std::make_shared( + softmax_desc, mkldnn_engine); + dev_ctx.SetBlob(key_softmax_pd, softmax_pd); + + SoftmaxMKLDNNHandler handler(softmax_pd, dev_ctx, mkldnn_engine, key); + auto softmax_src_memory_p = + handler.AcquireSrcMemory(softmax_md, to_void_cast(input_data)); + auto softmax_dst_memory_p = + handler.AcquireDstMemory(softmax_md, to_void_cast(output_data)); + auto softmax_p = + handler.AcquireSoftmax(softmax_dst_memory_p, softmax_src_memory_p); std::vector pipeline{ *(static_cast(softmax_p.get()))}; @@ -120,6 +164,77 @@ class SoftmaxMKLDNNKernel : public paddle::framework::OpKernel { } }; +template +class SoftmaxMKLDNNGradKernel : public paddle::framework::OpKernel { + public: + void Compute(const paddle::framework::ExecutionContext& ctx) const override { + PADDLE_ENFORCE(paddle::platform::is_cpu_place(ctx.GetPlace()), + "It must use CPUPlace."); + + auto& dev_ctx = ctx.template device_context(); + auto mkldnn_engine = dev_ctx.GetEngine(); + const Tensor* output = ctx.Input("Out"); + const T* dst_data = output->data(); + + auto* dout = ctx.template Input(framework::GradVarName("Out")); + const auto* diff_dst_ptr = dout->template data(); + + auto* dx = + ctx.template Output(framework::GradVarName("X")); + T* diff_src_ptr = dx->template mutable_data(ctx.GetPlace()); + + std::vector dst_tz = paddle::framework::vectorize2int(output->dims()); + std::vector src_tz(dst_tz); + PADDLE_ENFORCE(output->dims().size() == 2UL, + "The input of softmax op must be a 2D matrix."); + // MKL-DNN does support softmax over selected axis. Having 2D Tensor, + // we will make normalization after final eg. axis: 1 + PADDLE_ENFORCE(((src_tz[0] == dst_tz[0]) && (src_tz[1] == dst_tz[1])), + "Softmax input and output dimensions should match"); + // Same memory descriptor to be used for input and output + memory::dims softmax_tz = {src_tz[0], src_tz[1]}; + // Currently only supports NC data format + // retrieve eltwise primitive desc from device context + const std::string key = + platform::MKLDNNHandler::GetHash(softmax_tz, ctx.op().Input("Out")); + const std::string key_softmax_pd = key + "@softmax_pd"; + + auto softmax_pd = + std::static_pointer_cast( + dev_ctx.GetBlob(key_softmax_pd)); + PADDLE_ENFORCE(softmax_pd != nullptr, + "Fail to find softmax_pd in device context"); + + // TODO(jczaja): Add layouts support when there is a need to do so + // Two dimensional softmax does support NC format + auto data_softmax_md = MKLDNNMemDesc( + {softmax_tz}, platform::MKLDNNGetDataType(), memory::format::nc); + auto diff_softmax_md = MKLDNNMemDesc( + {softmax_tz}, platform::MKLDNNGetDataType(), memory::format::nc); + // Normalization is made after innermost dimension eg. C out of NC + auto softmax_bwd_desc = + softmax_backward::desc(diff_softmax_md, data_softmax_md, 1 /* dim: C*/); + auto softmax_bwd_pd = + std::make_shared( + softmax_bwd_desc, mkldnn_engine, *softmax_pd); + + SoftmaxMKLDNNHandler handler(softmax_pd, softmax_bwd_pd, dev_ctx, + mkldnn_engine, key); + auto dst_memory_p = + handler.AcquireDstMemory(data_softmax_md, to_void_cast(dst_data)); + auto diff_dst_memory_p = handler.AcquireDiffDstMemory( + diff_softmax_md, to_void_cast(diff_dst_ptr)); + auto diff_src_memory_p = handler.AcquireDiffSrcMemory( + diff_softmax_md, to_void_cast(diff_src_ptr)); + + // Get primitve from device context + auto softmax_bwd_p = handler.AcquireSoftmaxBackward( + dst_memory_p, diff_dst_memory_p, diff_src_memory_p); + + std::vector pipeline{*softmax_bwd_p}; + stream(stream::kind::eager).submit(pipeline).wait(); + } +}; } // namespace operators } // namespace paddle @@ -127,3 +242,5 @@ namespace ops = paddle::operators; REGISTER_OP_KERNEL(softmax, MKLDNN, ::paddle::platform::CPUPlace, ops::SoftmaxMKLDNNKernel); +REGISTER_OP_KERNEL(softmax_grad, MKLDNN, ::paddle::platform::CPUPlace, + ops::SoftmaxMKLDNNGradKernel); diff --git a/paddle/fluid/operators/softmax_op.cc b/paddle/fluid/operators/softmax_op.cc index 847b3cbd1b..31a7458f63 100644 --- a/paddle/fluid/operators/softmax_op.cc +++ b/paddle/fluid/operators/softmax_op.cc @@ -145,16 +145,30 @@ class SoftmaxOpGrad : public framework::OperatorWithKernel { const framework::ExecutionContext& ctx) const override { // choose cudnn kernel if the runtime supported. framework::LibraryType library_{framework::LibraryType::kPlain}; + std::string data_format = ctx.Attr("data_format"); + framework::DataLayout layout_ = framework::StringToDataLayout(data_format); #ifdef PADDLE_WITH_CUDA if (platform::CanCUDNNBeUsed(ctx)) { library_ = framework::LibraryType::kCUDNN; } #endif - std::string data_format = ctx.Attr("data_format"); - return framework::OpKernelType( - framework::ToDataType(ctx.Input("X")->type()), ctx.GetPlace(), - framework::StringToDataLayout(data_format), library_); +#ifdef PADDLE_WITH_MKLDNN + if (library_ == framework::LibraryType::kPlain && + platform::CanMKLDNNBeUsed(ctx)) { + library_ = framework::LibraryType::kMKLDNN; + layout_ = framework::DataLayout::kMKLDNN; + } +#endif + auto input_data_type = + framework::ToDataType(ctx.Input("X")->type()); + if (input_data_type == framework::proto::VarType::FP16) { + PADDLE_ENFORCE(platform::is_gpu_place(ctx.GetPlace()), + "float16 can only be used on GPU place"); + } + + return framework::OpKernelType(input_data_type, ctx.GetPlace(), layout_, + library_); } }; diff --git a/paddle/fluid/platform/mkldnn_helper.h b/paddle/fluid/platform/mkldnn_helper.h index 2689d5e078..ed99932546 100644 --- a/paddle/fluid/platform/mkldnn_helper.h +++ b/paddle/fluid/platform/mkldnn_helper.h @@ -105,5 +105,137 @@ inline mkldnn::memory::format GetMKLDNNFormat( memory.dst_primitive_desc().desc().data.format); } +class MKLDNNHandler { + public: + MKLDNNHandler(const MKLDNNDeviceContext& dev_ctx, mkldnn::engine engine, + const std::string& base_key) + : dev_ctx_(dev_ctx), + engine_(engine), + key_(base_key), + is_reusing_(false) {} + + std::shared_ptr AcquireSrcMemory( + const mkldnn::memory::desc& md, void* ptr) { + return this->AcquireMemory(md, ptr, "@user_src_mem_p"); + } + + std::shared_ptr AcquireWeightsMemory( + const mkldnn::memory::desc& md, void* ptr) { + return this->AcquireMemory(md, ptr, "@user_weights_mem_p"); + } + + std::shared_ptr AcquireDstMemory( + const mkldnn::memory::desc& md, void* ptr) { + return this->AcquireMemory(md, ptr, "@user_dst_mem_p"); + } + + std::shared_ptr AcquireDiffDstMemory( + const mkldnn::memory::desc& md, void* ptr) { + return this->AcquireMemory(md, ptr, "@user_diff_dst_mem_p"); + } + + std::shared_ptr AcquireDiffSrcMemory( + const mkldnn::memory::desc& md, void* ptr) { + return this->AcquireMemory(md, ptr, "@user_diff_src_mem_p"); + } + + std::shared_ptr AcquireMemoryFromPrimitive( + mkldnn::memory::primitive_desc mdp, void* ptr, + const std::string& suffix) { + auto local_key = key_ + suffix; + auto mem_p = + std::static_pointer_cast(dev_ctx_.GetBlob(local_key)); + PADDLE_ENFORCE((mem_p != nullptr) || (is_reusing_ == false), + "Fail to find mem primitive in device context"); + if (mem_p == nullptr) { + mem_p = std::make_shared(mdp, ptr); + dev_ctx_.SetBlob(local_key, mem_p); + } else { + mem_p->set_data_handle(ptr); + // Mark that reusing happenned. All primitives from operator instance + // should be reused or none of them. So we check consistency + is_reusing_ = true; + } + return mem_p; + } + + std::shared_ptr AcquireMemory(const mkldnn::memory::desc& md, + void* ptr, + const std::string& suffix) { + /*Generate key*/ + auto local_key = key_ + suffix; + auto mem_p = + std::static_pointer_cast(dev_ctx_.GetBlob(local_key)); + PADDLE_ENFORCE((mem_p != nullptr) || (is_reusing_ == false), + "Fail to find mem primitive in device context"); + if (mem_p == nullptr) { + mem_p = std::make_shared( + mkldnn::memory::primitive_desc{md, engine_}, ptr); + dev_ctx_.SetBlob(local_key, mem_p); + } else { + mem_p->set_data_handle(ptr); + // Mark that reusing happenned. All primitives from operator instance + // should be reused or none of them. So we check consistency + is_reusing_ = true; + } + return mem_p; + } + + std::shared_ptr AcquireMemory( + mkldnn::memory::primitive_desc& mpd, + mkldnn::memory::primitive_desc& user_mpd, + const std::shared_ptr user_memory_p, + const std::string& suffix, std::vector& pipeline) { + // create reorder primitive if the input format is not the preferred one + auto local_key = key_ + suffix; + auto key_reorder_p = key_ + suffix + "reorder_p"; + + auto target_memory_p = + std::static_pointer_cast(dev_ctx_.GetBlob(local_key)); + PADDLE_ENFORCE((target_memory_p != nullptr) || (is_reusing_ == false), + "Fail to find mem primitive in device context"); + if (target_memory_p == nullptr) { + target_memory_p = user_memory_p; + std::shared_ptr reorder_p; + if (mpd != user_mpd) { + target_memory_p = std::make_shared(mpd); + + auto reorder_p = + std::make_shared(*user_memory_p, *target_memory_p); + dev_ctx_.SetBlob(key_reorder_p, reorder_p); + pipeline.push_back(*reorder_p); + } + dev_ctx_.SetBlob(local_key, target_memory_p); + } else { + // Make reorder if needed + auto reorder_p = std::static_pointer_cast( + dev_ctx_.GetBlob(key_reorder_p)); + if (reorder_p != nullptr) { + pipeline.push_back(*reorder_p); + } + is_reusing_ = true; + } + return target_memory_p; + } + + static std::string GetHash(mkldnn::memory::dims& operand_dims, + const std::string& suffix) { + auto dims2str = [](const mkldnn::memory::dims& operand_dims) { + std::string dstr = ""; + for (size_t i = 0; i < operand_dims.size(); ++i) { + dstr += std::to_string(operand_dims[i]) + "-"; + } + return dstr; + }; + return dims2str(operand_dims) + suffix; + }; + + protected: + const MKLDNNDeviceContext& dev_ctx_; + mkldnn::engine engine_; + std::string key_; + bool is_reusing_; +}; + } // namespace platform } // namespace paddle From 073af6237adaeba7d0dcde689be5d14184f79cdd Mon Sep 17 00:00:00 2001 From: Kexin Zhao Date: Thu, 21 Jun 2018 15:32:57 -0700 Subject: [PATCH 16/28] add print lod_tensor int64 option (#11644) --- paddle/fluid/framework/lod_tensor.cc | 10 +++++++--- paddle/fluid/framework/lod_tensor_test.cc | 16 +++++++++++++++- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/framework/lod_tensor.cc b/paddle/fluid/framework/lod_tensor.cc index e331c8128f..d29d8ce1c5 100644 --- a/paddle/fluid/framework/lod_tensor.cc +++ b/paddle/fluid/framework/lod_tensor.cc @@ -51,8 +51,6 @@ std::ostream &operator<<(std::ostream &os, const LoD &lod) { } std::ostream &operator<<(std::ostream &os, const LoDTensor &t) { - PADDLE_ENFORCE(t.type().hash_code() == typeid(float).hash_code()); - if (!platform::is_cpu_place(t.place())) { LoDTensor tt; framework::TensorCopy(t, platform::CPUPlace(), &tt); @@ -70,7 +68,13 @@ std::ostream &operator<<(std::ostream &os, const LoDTensor &t) { // only print first ten elements int64_t size = t.numel() < 10 ? t.numel() : 10; for (int64_t i = 0; i < size; ++i) { - os << t.data()[i] << " "; + if (t.type().hash_code() == typeid(float).hash_code()) { + os << t.data()[i] << " "; + } else if (t.type().hash_code() == typeid(int64_t).hash_code()) { + os << t.data()[i] << " "; + } else { + PADDLE_THROW("LoDTensor data type not in [float, int64_t]"); + } } return os; diff --git a/paddle/fluid/framework/lod_tensor_test.cc b/paddle/fluid/framework/lod_tensor_test.cc index 6dfe7d2d8c..38d3cd96d6 100644 --- a/paddle/fluid/framework/lod_tensor_test.cc +++ b/paddle/fluid/framework/lod_tensor_test.cc @@ -26,6 +26,20 @@ namespace paddle { namespace framework { +TEST(LoD, PrintLoDTensor) { + LoDTensor tensor1; + tensor1.mutable_data(platform::CPUPlace()); + tensor1.data()[0] = 0.2; + tensor1.data()[1] = 0.5; + LOG(INFO) << tensor1; + + LoDTensor tensor2; + tensor2.mutable_data(platform::CPUPlace()); + tensor2.data()[0] = 1; + tensor2.data()[1] = 2; + LOG(INFO) << tensor2; +} + TEST(LoD, data) { LoD lod{{0, 1, 2}}; lod.push_back({0, 2, 4, 5}); @@ -37,7 +51,7 @@ TEST(LoD, data) { } } -TEST(LodExpand, test) { +TEST(LoD, ExpandLoD) { LoD lod{{0, 2}}; LoDTensor tensor; tensor.set_lod(lod); From da556ed6d441f5438cab75c8e0dd82ed62344633 Mon Sep 17 00:00:00 2001 From: chengduo Date: Fri, 22 Jun 2018 09:39:39 +0800 Subject: [PATCH 17/28] enhance ParallelExecutor stable (#11637) --- .../framework/details/broadcast_op_handle.cc | 57 +++++-------------- .../fluid/framework/details/op_handle_base.cc | 36 +++--------- .../fluid/framework/details/op_handle_base.h | 4 -- .../framework/details/reduce_op_handle.cc | 4 +- paddle/fluid/platform/device_context.h | 8 --- 5 files changed, 25 insertions(+), 84 deletions(-) diff --git a/paddle/fluid/framework/details/broadcast_op_handle.cc b/paddle/fluid/framework/details/broadcast_op_handle.cc index b0bf641d9d..1d9f1bd6e4 100644 --- a/paddle/fluid/framework/details/broadcast_op_handle.cc +++ b/paddle/fluid/framework/details/broadcast_op_handle.cc @@ -103,50 +103,23 @@ void BroadcastOpHandle::RunImpl() { }); } - // FIXME(zcd): a temporary fix for some language model that has sparse - // parameter. - bool use_mutex = true; - if (in_var->IsType()) { - use_mutex = false; - } - if (use_mutex) { - this->RunAndRecordEvent([&] { - { - platform::NCCLGroupGuard guard; - for (auto &call : broadcast_calls) { - call(); - } - } - - if (!out_handle->IsTheSameVar(*in_var_handle)) { - auto out_var = var_scopes.at(in_var_handle->scope_idx_) - ->FindVar(out_var_handles[0]->name_); - paddle::framework::TensorCopy( - in_tensor, in_var_handle->place_, - *(dev_ctxes_.at(in_var_handle->place_)), - &VariableVisitor::GetMutableTensor(out_var)); - } - }); - } else { - this->RunAndRecordEventNoMutex([&] { - { - platform::NCCLGroupGuard guard; - for (auto &call : broadcast_calls) { - call(); - } - } - - if (!out_handle->IsTheSameVar(*in_var_handle)) { - auto out_var = var_scopes.at(in_var_handle->scope_idx_) - ->FindVar(out_var_handles[0]->name_); - paddle::framework::TensorCopy( - in_tensor, in_var_handle->place_, - *(dev_ctxes_.at(in_var_handle->place_)), - &VariableVisitor::GetMutableTensor(out_var)); + this->RunAndRecordEvent([&] { + { + platform::NCCLGroupGuard guard; + for (auto &call : broadcast_calls) { + call(); } - }); - } + } + if (!out_handle->IsTheSameVar(*in_var_handle)) { + auto out_var = var_scopes.at(in_var_handle->scope_idx_) + ->FindVar(out_var_handles[0]->name_); + paddle::framework::TensorCopy( + in_tensor, in_var_handle->place_, + *(dev_ctxes_.at(in_var_handle->place_)), + &VariableVisitor::GetMutableTensor(out_var)); + } + }); #else PADDLE_THROW("CUDA is not enabled."); #endif diff --git a/paddle/fluid/framework/details/op_handle_base.cc b/paddle/fluid/framework/details/op_handle_base.cc index a40a881508..1f84c3b9e2 100644 --- a/paddle/fluid/framework/details/op_handle_base.cc +++ b/paddle/fluid/framework/details/op_handle_base.cc @@ -11,8 +11,8 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - #include "paddle/fluid/framework/details/op_handle_base.h" +#include namespace paddle { namespace framework { @@ -122,35 +122,17 @@ void OpHandleBase::RunAndRecordEvent(const std::function &callback) { #ifdef PADDLE_WITH_CUDA if (!events_.empty()) { // Use event std::function method = callback; - + // NOTE(zcd): device context must be ordered here because RecordEvent + // will use a mutex to ensure the safe of multi-threads. + std::map ordered_ctxes; for (auto &p : dev_ctxes_) { - method = [method, p, this]() { - static_cast(p.second)->RecordEvent( - events_.at(boost::get(p.first).device), - method); - }; + ordered_ctxes.emplace(p.second, p.first); } - method(); - } else { -#endif - callback(); -#ifdef PADDLE_WITH_CUDA - } -#endif -} - -void OpHandleBase::RunAndRecordEventNoMutex( - const std::function &callback) { -#ifdef PADDLE_WITH_CUDA - if (!events_.empty()) { // Use event - std::function method = callback; - - for (auto &p : dev_ctxes_) { + for (auto &p : ordered_ctxes) { method = [method, p, this]() { - static_cast(p.second) - ->RecordEventNoMutex( - events_.at(boost::get(p.first).device), - method); + static_cast(p.first)->RecordEvent( + events_.at(boost::get(p.second).device), + method); }; } method(); diff --git a/paddle/fluid/framework/details/op_handle_base.h b/paddle/fluid/framework/details/op_handle_base.h index 775be0233a..fbd90a3296 100644 --- a/paddle/fluid/framework/details/op_handle_base.h +++ b/paddle/fluid/framework/details/op_handle_base.h @@ -85,10 +85,6 @@ class OpHandleBase { protected: void RunAndRecordEvent(const std::function &callback); - // FIXME(zcd): A temporary fix for some language model that has sparse - // parameter. - void RunAndRecordEventNoMutex(const std::function &callback); - void RunAndRecordEvent(platform::Place p, const std::function &callback); diff --git a/paddle/fluid/framework/details/reduce_op_handle.cc b/paddle/fluid/framework/details/reduce_op_handle.cc index 9a626c890f..7160e346da 100644 --- a/paddle/fluid/framework/details/reduce_op_handle.cc +++ b/paddle/fluid/framework/details/reduce_op_handle.cc @@ -80,9 +80,7 @@ void ReduceOpHandle::RunImpl() { } if (pre_in_var->IsType()) { - // FIXME(zcd): A temporary fix for some language model that has sparse - // parameter. - this->RunAndRecordEventNoMutex([&] { + this->RunAndRecordEvent([&] { std::vector in_selected_rows = GetInputValues(in_var_handles, var_scopes); GatherSelectedRows(in_selected_rows, in_places, dev_ctxes_, t_out_p, diff --git a/paddle/fluid/platform/device_context.h b/paddle/fluid/platform/device_context.h index d37e5ee578..292ffef1ae 100644 --- a/paddle/fluid/platform/device_context.h +++ b/paddle/fluid/platform/device_context.h @@ -106,14 +106,6 @@ class CUDADeviceContext : public DeviceContext { PADDLE_ENFORCE(cudaEventRecord(ev, stream_)); } - // FIXME(zcd): A temporary fix for some language model that has sparse - // parameter. - template - void RecordEventNoMutex(cudaEvent_t ev, Callback callback) { - callback(); - PADDLE_ENFORCE(cudaEventRecord(ev, stream_)); - } - private: CUDAPlace place_; From 56a903d3ac55c51d450f003878f504a41be5bc0b Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Fri, 22 Jun 2018 12:31:46 +0800 Subject: [PATCH 18/28] use optimize block list instead of first optimize block --- paddle/fluid/framework/framework.proto | 1 + paddle/fluid/framework/op_desc.cc | 13 +++++++++++++ paddle/fluid/framework/op_desc.h | 2 ++ paddle/fluid/framework/type_defs.h | 3 ++- paddle/fluid/operators/listen_and_serv_op.cc | 17 +++++++---------- paddle/fluid/pybind/protobuf.cc | 1 + python/paddle/fluid/framework.py | 6 ++++++ .../fluid/transpiler/distribute_transpiler.py | 9 +++------ 8 files changed, 35 insertions(+), 17 deletions(-) diff --git a/paddle/fluid/framework/framework.proto b/paddle/fluid/framework/framework.proto index 68fcc104d4..8f73b3d478 100644 --- a/paddle/fluid/framework/framework.proto +++ b/paddle/fluid/framework/framework.proto @@ -46,6 +46,7 @@ message OpDesc { repeated bool bools = 11; optional int32 block_idx = 12; optional int64 l = 13; + repeated int32 blocks_idx = 14; }; message Var { diff --git a/paddle/fluid/framework/op_desc.cc b/paddle/fluid/framework/op_desc.cc index f92769192c..a190199f1c 100644 --- a/paddle/fluid/framework/op_desc.cc +++ b/paddle/fluid/framework/op_desc.cc @@ -211,6 +211,12 @@ void OpDesc::SetBlockAttr(const std::string &name, BlockDesc *block) { need_update_ = true; } +void OpDesc::SetBlocksAttr(const std::string &name, + std::vector blocks) { + this->attrs_[name] = blocks; + need_update_ = true; +} + void OpDesc::SetAttrMap( const std::unordered_map &attr_map) { attrs_ = attr_map; @@ -305,6 +311,13 @@ struct SetAttrDescVisitor : public boost::static_visitor { void operator()(const std::vector &v) const { VectorToRepeated(v, attr_->mutable_bools()); } + void operator()(const std::vector &v) const { + std::vector blocks_idx; + for (auto blk : v) { + blocks_idx.push_back(blk->ID()); + } + VectorToRepeated(blocks_idx, attr_->mutable_blocks_idx()); + } void operator()(BlockDesc *desc) const { attr_->set_block_idx(desc->ID()); } void operator()(int64_t v) const { attr_->set_l(v); } void operator()(boost::blank) const { PADDLE_THROW("Unexpected branch"); } diff --git a/paddle/fluid/framework/op_desc.h b/paddle/fluid/framework/op_desc.h index a02d3e2691..74dd8ec002 100644 --- a/paddle/fluid/framework/op_desc.h +++ b/paddle/fluid/framework/op_desc.h @@ -77,6 +77,8 @@ class OpDesc { void SetBlockAttr(const std::string &name, BlockDesc *block); + void SetBlocksAttr(const std::string &name, std::vector blocks); + Attribute GetAttr(const std::string &name) const; Attribute GetNullableAttr(const std::string &name) const; diff --git a/paddle/fluid/framework/type_defs.h b/paddle/fluid/framework/type_defs.h index 4879209ece..e099e40f12 100644 --- a/paddle/fluid/framework/type_defs.h +++ b/paddle/fluid/framework/type_defs.h @@ -35,7 +35,8 @@ using VariableNameMap = std::map>; using Attribute = boost::variant, std::vector, std::vector, bool, - std::vector, BlockDesc*, int64_t>; + std::vector, BlockDesc*, int64_t, + std::vector>; using AttributeMap = std::unordered_map; diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 0f2863cc59..3fc5ae6f2c 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -101,14 +101,11 @@ void ListenAndServOp::RunSyncLoop( framework::Scope *recv_scope, const std::vector &prefetch_block_id_list) const { size_t num_blocks = program->Size(); - auto skip_sub_blks = Attr>("skip_sub_blks"); + auto optimize_blocks = + Attr>(kOptimizeBlocks); PADDLE_ENFORCE_GE(num_blocks, 2, "server program should have at least 2 blocks"); - std::vector optimize_block_id_list; - for (auto *block : optimize_blocks) { - optimize_block_id_list.push_back(block->ID()); - } auto optimize_prepared = executor->Prepare(*program, optimize_block_id_list); // Insert placeholder for block0 which holds current op itself. optimize_prepared.insert( @@ -136,10 +133,10 @@ void ListenAndServOp::RunSyncLoop( std::vector parallel_blkids; parallel_blkids.push_back(optimize_blocks[0]->ID()); double ts = GetTimestamp(); - for (size_t i = 1; i < optimize_block_id_list.size(); ++i) { + for (size_t i = 1; i < optimize_blocks.size(); ++i) { // skip the first optimize block because it is already in the // parallel_blkids. - int blkid = optimize_block_id_list[i]; + int blkid = optimize_blocks[i]->ID(); if (program->Block(blkid).Parent() != last_parent_blkid) { ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, program, recv_scope); @@ -263,7 +260,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, Attr>(kOptimizeBlocks); PADDLE_ENFORCE(optimize_blocks.size() > 1, "optimize blocks should be 1 at least on the pserver side."); - auto *program = optimize_block[0]->Program(); + auto *program = optimize_blocks[0]->Program(); framework::Executor executor(dev_place); // prepare for prefetch @@ -340,8 +337,8 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker { "a map from grad name to it's optimize block id") .SetDefault({}); AddAttr("sync_mode", "if works at sync_mode or not").SetDefault(true); - AddAttr(kOptimizeBlocks, - "Optimize blocks to run on server side."); + AddAttr>( + kOptimizeBlocks, "Optimize blocks to run on server side."); AddAttr>(kPrefetchVarNameToBlockId, "prefetch blocks to run on server side.") .SetDefault({}); diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc index bcf6d4dd30..2d44e1f63c 100644 --- a/paddle/fluid/pybind/protobuf.cc +++ b/paddle/fluid/pybind/protobuf.cc @@ -293,6 +293,7 @@ void BindOpDesc(pybind11::module *m) { .def("set_attr", &pd::OpDesc::SetAttr) .def("attr", &pd::OpDesc::GetAttr) .def("set_block_attr", &pd::OpDesc::SetBlockAttr) + .def("set_blocks_attr", &pd::OpDesc::SetBlocksAttr) .def("set_serialized_attr", [](pd::OpDesc &self, const std::string &name, const pybind11::bytes &seriralized) { diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index db21b1f3c0..1843072662 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -561,6 +561,10 @@ class Operator(object): if isinstance(self.attrs[attr_name], Block): self.desc.set_block_attr(attr_name, self.attrs[attr_name].desc) + elif isinstance(self.attrs[attr_name], list) and \ + all(isinstance(v, Block) for v in self.attrs[attr_name]): + self.desc.set_blocks_attr( + attr_name, [v.desc for v in self.attrs[attr_name]]) elif isinstance(self.attrs[attr_name], core.BlockDesc) or \ isinstance(self.attrs[attr_name], core.ProgramDesc): self.desc.set_serialized_attr( @@ -715,6 +719,8 @@ class Operator(object): self.attrs[name] = val if isinstance(val, Block): self.desc.set_block_attr(name, val.desc) + elif isinstance(val, list) and all(isinstance(v, Block) for v in val): + self.desc.set_blocks_attr(name, [v.desc for v in val]) elif isinstance(val, core.BlockDesc) or \ isinstance(val, core.ProgramDesc): self.desc.set_serialized_attr(name, val.serialize_to_string()) diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 391dddcf3e..676079144e 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -396,7 +396,7 @@ class DistributeTranspiler(object): return varname return "" - def __clone_lr_op_sub_block__(op, program, new_block, skip_sub_blks): + def __clone_lr_op_sub_block__(op, program, new_block): if not op.has_attr('sub_block'): return @@ -406,7 +406,6 @@ class DistributeTranspiler(object): # we put the new sub block to new block to follow the block # hierarchy of the original blocks new_sub_block = program.create_block(new_block.idx) - skip_sub_blks.append(new_sub_block.idx) # clone vars for var in origin_block.vars: @@ -416,8 +415,7 @@ class DistributeTranspiler(object): for op in origin_block.ops: self._clone_lr_op(program, new_sub_block, op) # clone sub_block of op - __clone_lr_op_sub_block__(op, program, new_sub_block, - skip_sub_blks) + __clone_lr_op_sub_block__(op, program, new_sub_block) # reset the block of op op.set_attr('sub_block', new_sub_block) @@ -433,8 +431,7 @@ class DistributeTranspiler(object): for _, op in enumerate(lr_ops): self._append_pserver_non_opt_ops(lr_decay_block, op) # append sub blocks to pserver_program in lr_decay_op - __clone_lr_op_sub_block__(op, pserver_program, lr_decay_block, - skip_sub_blks) + __clone_lr_op_sub_block__(op, pserver_program, lr_decay_block) # append op to the current block grad_to_block_id = [] From d723022e1b410bb1430d5328b5b7bafe30cafc7d Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Fri, 22 Jun 2018 13:13:21 +0800 Subject: [PATCH 19/28] fix compile error --- paddle/fluid/operators/listen_and_serv_op.cc | 6 +++++- paddle/fluid/operators/send_recv_op_test.cc | 5 ++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 3fc5ae6f2c..f852bc14a3 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -106,7 +106,11 @@ void ListenAndServOp::RunSyncLoop( PADDLE_ENFORCE_GE(num_blocks, 2, "server program should have at least 2 blocks"); - auto optimize_prepared = executor->Prepare(*program, optimize_block_id_list); + std::vector optimize_blocks_idx; + for (auto blk : optimize_blocks) { + optimize_blocks_idx.push_back(blk->ID()); + } + auto optimize_prepared = executor->Prepare(*program, optimize_blocks_idx); // Insert placeholder for block0 which holds current op itself. optimize_prepared.insert( optimize_prepared.begin(), diff --git a/paddle/fluid/operators/send_recv_op_test.cc b/paddle/fluid/operators/send_recv_op_test.cc index e550552b19..aee6180add 100644 --- a/paddle/fluid/operators/send_recv_op_test.cc +++ b/paddle/fluid/operators/send_recv_op_test.cc @@ -129,7 +129,10 @@ void StartServerNet(bool is_sparse, std::atomic *initialized) { // sub program run in listen_and_serv_op, for simple test we use sum f::ProgramDesc program; const auto &root_block = program.Block(0); + std::vector optimize_blocks; auto *optimize_block = program.AppendBlock(root_block); + optimize_blocks.push_back(optimize_block); + auto *prefetch_block = program.AppendBlock(root_block); // X for server side tensors, RX for received tensors, must be of same shape. AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block, @@ -139,7 +142,7 @@ void StartServerNet(bool is_sparse, std::atomic *initialized) { attrs.insert({"Fanin", 1}); attrs.insert({"ParamList", std::vector({"Out"})}); attrs.insert({"GradList", std::vector({"x1"})}); - attrs.insert({"OptimizeBlock", optimize_block}); + attrs.insert({"optimize_blocks", optimize_blocks}); attrs.insert({"PrefetchBlock", prefetch_block}); attrs.insert({"grad_to_block_id", std::vector({""})}); attrs.insert({"sync_mode", true}); From dbca7f166ddb62fbe3bd5dc50230f6347f0863ca Mon Sep 17 00:00:00 2001 From: gongweibao Date: Fri, 22 Jun 2018 02:37:53 -0500 Subject: [PATCH 20/28] tune logs (#11649) --- .../operators/distributed/grpc_client.cc | 15 ++++++++-- .../fluid/operators/distributed/grpc_client.h | 6 +++- .../operators/distributed/grpc_server.cc | 28 +++++++++++++------ .../distributed/variable_response.cc | 4 +++ 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/paddle/fluid/operators/distributed/grpc_client.cc b/paddle/fluid/operators/distributed/grpc_client.cc index 65d63784c1..52f931188d 100644 --- a/paddle/fluid/operators/distributed/grpc_client.cc +++ b/paddle/fluid/operators/distributed/grpc_client.cc @@ -18,6 +18,7 @@ limitations under the License. */ #include +#include "glog/logging.h" // For VLOG #include "paddle/fluid/framework/threadpool.h" #include "paddle/fluid/operators/distributed/request_handler.h" #include "paddle/fluid/platform/profiler.h" @@ -75,6 +76,9 @@ bool GRPCClient::AsyncSendVar(const std::string& ep, var_h.scope = p_scope; var_h.name = var_name_val; var_h.ctx = p_ctx; + var_h.method = "Send"; + + VLOG(3) << var_h.String() << " begin"; // stub context SendProcessor* s = new SendProcessor(ch); @@ -129,6 +133,9 @@ bool GRPCClient::AsyncGetVar(const std::string& ep, var_h.scope = p_scope; var_h.name = var_name_val; var_h.ctx = p_ctx; + var_h.method = "Get"; + + VLOG(3) << var_h.String() << " begin"; // stub context GetProcessor* s = new GetProcessor(ch); @@ -172,6 +179,9 @@ bool GRPCClient::AsyncPrefetchVar(const std::string& ep, var_h.scope = p_scope; var_h.name = out_var_name_val; var_h.ctx = p_ctx; + var_h.method = "Prefetch"; + + VLOG(3) << var_h.String() << " begin"; // stub context GetProcessor* s = new GetProcessor(ch); @@ -243,10 +253,11 @@ void GRPCClient::Proceed() { GPR_ASSERT(ok); PADDLE_ENFORCE(c); if (c->status_.ok()) { + VLOG(3) << c->var_h_.String() << " process"; c->Process(); } else { - LOG(FATAL) << "var: " << c->var_h_.String() - << " grpc error:" << c->status_.error_message(); + LOG(FATAL) << c->var_h_.String() + << " meets grpc error:" << c->status_.error_message(); } delete c; { diff --git a/paddle/fluid/operators/distributed/grpc_client.h b/paddle/fluid/operators/distributed/grpc_client.h index a6efa7dfd1..7875939ff5 100644 --- a/paddle/fluid/operators/distributed/grpc_client.h +++ b/paddle/fluid/operators/distributed/grpc_client.h @@ -47,14 +47,18 @@ namespace operators { namespace distributed { struct VarHandle { + // RPC endpoint. std::string ep; const platform::DeviceContext* ctx; const framework::Scope* scope; + // Variable name. std::string name; + // RPC method name. + std::string method; std::string String() const { std::ostringstream s; - s << "name:[" << name << "] ep:[" << ep << "]"; + s << method << " name:[" << name << "], ep:[" << ep << "]"; return s.str(); } }; diff --git a/paddle/fluid/operators/distributed/grpc_server.cc b/paddle/fluid/operators/distributed/grpc_server.cc index 707f665a29..b9a9b12cec 100644 --- a/paddle/fluid/operators/distributed/grpc_server.cc +++ b/paddle/fluid/operators/distributed/grpc_server.cc @@ -41,6 +41,19 @@ class RequestBase { virtual ~RequestBase() {} virtual void Process() = 0; + std::string Status2String(const std::string& method) { + std::string status = "Process"; + if (status_ == FINISH) { + status = "Finish"; + } + + std::ostringstream s; + s << method << " name:[" << GetReqName() << "]" + << ", ep:[" << ctx_.peer() << "]" + << " " << status << " using req_id:" << req_id_; + return s.str(); + } + CallStatus Status() const { std::lock_guard l(status_mu_); return status_; @@ -272,7 +285,7 @@ void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name, int req_id) { std::unique_lock lock(cq_mutex_); if (is_shut_down_) { - VLOG(3) << "shutdown, do not TryToRegisterNewSendOne"; + LOG(WARNING) << "shutdown, do not TryToRegisterNewSendOne"; return; } @@ -306,14 +319,14 @@ void AsyncGRPCServer::HandleRequest( bool ok = false; while (true) { - VLOG(3) << "HandleRequest " << rpc_name << " wait next"; + VLOG(4) << "HandleRequest " << rpc_name << " wait next"; if (!cq->Next(&tag, &ok)) { LOG(INFO) << "CompletionQueue " << rpc_name << " shutdown!"; break; } int req_id = static_cast(reinterpret_cast(tag)); - VLOG(3) << "HandleRequest " << rpc_name << ", req_id:" << req_id + VLOG(4) << "HandleRequest " << rpc_name << ", req_id:" << req_id << " get next"; auto& reqs = rpc_reqs_[rpc_name]; @@ -324,22 +337,21 @@ void AsyncGRPCServer::HandleRequest( base = reqs[req_id]; } + VLOG(3) << base->Status2String(rpc_name); + // reference: // https://github.com/tensorflow/tensorflow/issues/5596 // https://groups.google.com/forum/#!topic/grpc-io/xftlRy-IQwM // https://groups.google.com/forum/#!topic/grpc-io/ywATt88Ef_I if (!ok) { LOG(WARNING) << "completion queue:" << rpc_name - << " recv no regular event:argument name[" - << base->GetReqName() << "]"; + << " recv no regular event" + << " context:" << base->Status2String(rpc_name); TryToRegisterNewOne(rpc_name, req_id); delete base; continue; } - VLOG(3) << "queue id:" << rpc_name << ", req_id:" << req_id - << ", status:" << base->Status(); - switch (base->Status()) { case PROCESS: { base->Process(); diff --git a/paddle/fluid/operators/distributed/variable_response.cc b/paddle/fluid/operators/distributed/variable_response.cc index 619890b193..45832c60bf 100644 --- a/paddle/fluid/operators/distributed/variable_response.cc +++ b/paddle/fluid/operators/distributed/variable_response.cc @@ -76,6 +76,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, if (total_written + size_to_write > length) { size_to_write = length - total_written; } + // This log is useful to see how long a internal block size is of rpc. + VLOG(7) << "copy " << size_to_write << " data to CUDAPlace"; memory::Copy(boost::get(place), reinterpret_cast(p), cpu, data, size_to_write, gpu_dev_ctx.stream()); @@ -103,6 +105,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, } // TODO(gongwb): can we avoid copy? platform::CPUPlace cpu; + // This log is useful to see how long a internal block size is of rpc. + VLOG(7) << "copy " << size_to_write << " data to CPUPlace"; memory::Copy(cpu, reinterpret_cast(p), cpu, data, size_to_write); p += size_to_write; From aa84b21e3b39f6aeb800f6a539f5a8972ee7a7c2 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Fri, 22 Jun 2018 15:42:46 +0800 Subject: [PATCH 21/28] fix unit tests --- paddle/fluid/operators/listen_and_serv_op.cc | 2 +- python/paddle/fluid/framework.py | 22 +++++++++++--------- python/paddle/fluid/layers/io.py | 6 +++--- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index f852bc14a3..f350e0f3f1 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -262,7 +262,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, auto optimize_blocks = Attr>(kOptimizeBlocks); - PADDLE_ENFORCE(optimize_blocks.size() > 1, + PADDLE_ENFORCE(optimize_blocks.size() >= 1, "optimize blocks should be 1 at least on the pserver side."); auto *program = optimize_blocks[0]->Program(); framework::Executor executor(dev_place); diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 1843072662..9f307f6cb4 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -558,19 +558,20 @@ class Operator(object): if (attr_name not in self.attrs) or ( self.attrs[attr_name] is None): continue - if isinstance(self.attrs[attr_name], Block): + attr_val = self.attrs[attr_name] + if isinstance(attr_val, Block): self.desc.set_block_attr(attr_name, self.attrs[attr_name].desc) - elif isinstance(self.attrs[attr_name], list) and \ - all(isinstance(v, Block) for v in self.attrs[attr_name]): - self.desc.set_blocks_attr( - attr_name, [v.desc for v in self.attrs[attr_name]]) - elif isinstance(self.attrs[attr_name], core.BlockDesc) or \ - isinstance(self.attrs[attr_name], core.ProgramDesc): + elif isinstance(attr_val, list) and attr_val and \ + all(isinstance(v, Block) for v in attr_val): + self.desc.set_blocks_attr(attr_name, + [v.desc for v in attr_val]) + elif isinstance(attr_val, core.BlockDesc) or \ + isinstance(attr_val, core.ProgramDesc): self.desc.set_serialized_attr( - attr_name, self.attrs[attr_name].serialize_to_string()) + attr_name, attr_val.serialize_to_string()) else: - self.desc.set_attr(attr_name, self.attrs[attr_name]) + self.desc.set_attr(attr_name, attr_val) self.desc.check_attrs() if self.has_kernel(type): self.desc.infer_var_type(self.block.desc) @@ -719,7 +720,8 @@ class Operator(object): self.attrs[name] = val if isinstance(val, Block): self.desc.set_block_attr(name, val.desc) - elif isinstance(val, list) and all(isinstance(v, Block) for v in val): + elif isinstance(val, list) and val and all( + isinstance(v, Block) for v in val): self.desc.set_blocks_attr(name, [v.desc for v in val]) elif isinstance(val, core.BlockDesc) or \ isinstance(val, core.ProgramDesc): diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 8d153b75cd..f3ab47c96b 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -186,7 +186,6 @@ class ListenAndServ(object): main_program = self.helper.main_program current_block = main_program.current_block() parent_block = self.parent_block() - empty_block = Program().global_block() parent_block.append_op( type='listen_and_serv', @@ -195,8 +194,9 @@ class ListenAndServ(object): attrs={ 'endpoint': self.endpoint, 'Fanin': self.fan_in, - 'OptimizeBlock': current_block, - 'PrefetchBlock': empty_block, + 'optimize_blocks': [ + current_block + ], # did not support multiple optimize blocks in layers 'sync_mode': True, # did not support async now in layers 'grad_to_block_id': [""] }) From 8cb494f79cd1853f1d5eb1c0d7abcc36c59563ea Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Fri, 22 Jun 2018 17:05:03 +0800 Subject: [PATCH 22/28] add blocks attr type in proto --- paddle/fluid/framework/framework.proto | 1 + paddle/fluid/operators/listen_and_serv_op.cc | 3 ++- paddle/fluid/pybind/protobuf.cc | 3 ++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/framework/framework.proto b/paddle/fluid/framework/framework.proto index 8f73b3d478..2cf14bd371 100644 --- a/paddle/fluid/framework/framework.proto +++ b/paddle/fluid/framework/framework.proto @@ -27,6 +27,7 @@ enum AttrType { BOOLEANS = 7; BLOCK = 8; LONG = 9; + BLOCKS = 10; } // OpDesc describes an instance of a C++ framework::OperatorBase diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index f350e0f3f1..d98bf807a9 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -342,7 +342,8 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker { .SetDefault({}); AddAttr("sync_mode", "if works at sync_mode or not").SetDefault(true); AddAttr>( - kOptimizeBlocks, "Optimize blocks to run on server side."); + kOptimizeBlocks, "Optimize blocks to run on server side.") + .SetDefault({}); AddAttr>(kPrefetchVarNameToBlockId, "prefetch blocks to run on server side.") .SetDefault({}); diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc index 2d44e1f63c..fcd3356d44 100644 --- a/paddle/fluid/pybind/protobuf.cc +++ b/paddle/fluid/pybind/protobuf.cc @@ -268,7 +268,8 @@ void BindOpDesc(pybind11::module *m) { .value("STRINGS", pd::proto::AttrType::STRINGS) .value("BOOL", pd::proto::AttrType::BOOLEAN) .value("BOOLS", pd::proto::AttrType::BOOLEANS) - .value("BLOCK", pd::proto::AttrType::BLOCK); + .value("BLOCK", pd::proto::AttrType::BLOCK) + .value("BLOCKS", pd::proto::AttrType::BLOCKS); pybind11::class_ op_desc(*m, "OpDesc", ""); op_desc From 50e750a2a183ed9dacf299f07af6d3181a59f54f Mon Sep 17 00:00:00 2001 From: Wojciech Uss Date: Fri, 22 Jun 2018 11:28:31 +0200 Subject: [PATCH 23/28] added cycling the cifar and flowers datasets --- python/paddle/v2/dataset/cifar.py | 27 ++++++++++------ python/paddle/v2/dataset/flowers.py | 50 ++++++++++++++++++++--------- 2 files changed, 52 insertions(+), 25 deletions(-) diff --git a/python/paddle/v2/dataset/cifar.py b/python/paddle/v2/dataset/cifar.py index 0a2a1ced11..662655c836 100644 --- a/python/paddle/v2/dataset/cifar.py +++ b/python/paddle/v2/dataset/cifar.py @@ -43,7 +43,7 @@ CIFAR100_URL = URL_PREFIX + 'cifar-100-python.tar.gz' CIFAR100_MD5 = 'eb9058c3a382ffc7106e4002c42a8d85' -def reader_creator(filename, sub_name): +def reader_creator(filename, sub_name, cycle=False): def read_batch(batch): data = batch['data'] labels = batch.get('labels', batch.get('fine_labels', None)) @@ -56,10 +56,13 @@ def reader_creator(filename, sub_name): names = (each_item.name for each_item in f if sub_name in each_item.name) - for name in names: - batch = cPickle.load(f.extractfile(name)) - for item in read_batch(batch): - yield item + while True: + for name in names: + batch = cPickle.load(f.extractfile(name)) + for item in read_batch(batch): + yield item + if not cycle: + break return reader @@ -94,34 +97,40 @@ def test100(): 'test') -def train10(): +def train10(cycle=False): """ CIFAR-10 training set creator. It returns a reader creator, each sample in the reader is image pixels in [0, 1] and label in [0, 9]. + :param cycle: whether to cycle through the dataset + :type cycle: bool :return: Training reader creator :rtype: callable """ return reader_creator( paddle.v2.dataset.common.download(CIFAR10_URL, 'cifar', CIFAR10_MD5), - 'data_batch') + 'data_batch', + cycle=cycle) -def test10(): +def test10(cycle=False): """ CIFAR-10 test set creator. It returns a reader creator, each sample in the reader is image pixels in [0, 1] and label in [0, 9]. + :param cycle: whether to cycle through the dataset + :type cycle: bool :return: Test reader creator. :rtype: callable """ return reader_creator( paddle.v2.dataset.common.download(CIFAR10_URL, 'cifar', CIFAR10_MD5), - 'test_batch') + 'test_batch', + cycle=cycle) def fetch(): diff --git a/python/paddle/v2/dataset/flowers.py b/python/paddle/v2/dataset/flowers.py index 357a4e9b00..db12076d54 100644 --- a/python/paddle/v2/dataset/flowers.py +++ b/python/paddle/v2/dataset/flowers.py @@ -76,7 +76,8 @@ def reader_creator(data_file, dataset_name, mapper, buffered_size=1024, - use_xmap=True): + use_xmap=True, + cycle=False): ''' 1. read images from tar file and merge images into batch files in 102flowers.tgz_batch/ @@ -96,6 +97,8 @@ def reader_creator(data_file, :type mapper: callable :param buffered_size: the size of buffer used to process images :type buffered_size: int + :param cycle: whether to cycle through the dataset + :type cycle: bool :return: data reader :rtype: callable ''' @@ -108,15 +111,18 @@ def reader_creator(data_file, file_list = batch_images_from_tar(data_file, dataset_name, img2label) def reader(): - for file in open(file_list): - file = file.strip() - batch = None - with open(file, 'r') as f: - batch = cPickle.load(f) - data = batch['data'] - labels = batch['label'] - for sample, label in itertools.izip(data, batch['label']): - yield sample, int(label) - 1 + while True: + for file in open(file_list): + file = file.strip() + batch = None + with open(file, 'r') as f: + batch = cPickle.load(f) + data = batch['data'] + labels = batch['label'] + for sample, label in itertools.izip(data, batch['label']): + yield sample, int(label) - 1 + if not cycle: + break if use_xmap: cpu_num = int(os.environ.get('CPU_NUM', cpu_count())) @@ -125,7 +131,7 @@ def reader_creator(data_file, return map_readers(mapper, reader) -def train(mapper=train_mapper, buffered_size=1024, use_xmap=True): +def train(mapper=train_mapper, buffered_size=1024, use_xmap=True, cycle=False): ''' Create flowers training set reader. It returns a reader, each sample in the reader is @@ -138,17 +144,23 @@ def train(mapper=train_mapper, buffered_size=1024, use_xmap=True): :type mapper: callable :param buffered_size: the size of buffer used to process images :type buffered_size: int + :param cycle: whether to cycle through the dataset + :type cycle: bool :return: train data reader :rtype: callable ''' return reader_creator( download(DATA_URL, 'flowers', DATA_MD5), download(LABEL_URL, 'flowers', LABEL_MD5), - download(SETID_URL, 'flowers', SETID_MD5), TRAIN_FLAG, mapper, - buffered_size, use_xmap) + download(SETID_URL, 'flowers', SETID_MD5), + TRAIN_FLAG, + mapper, + buffered_size, + use_xmap, + cycle=cycle) -def test(mapper=test_mapper, buffered_size=1024, use_xmap=True): +def test(mapper=test_mapper, buffered_size=1024, use_xmap=True, cycle=False): ''' Create flowers test set reader. It returns a reader, each sample in the reader is @@ -161,14 +173,20 @@ def test(mapper=test_mapper, buffered_size=1024, use_xmap=True): :type mapper: callable :param buffered_size: the size of buffer used to process images :type buffered_size: int + :param cycle: whether to cycle through the dataset + :type cycle: bool :return: test data reader :rtype: callable ''' return reader_creator( download(DATA_URL, 'flowers', DATA_MD5), download(LABEL_URL, 'flowers', LABEL_MD5), - download(SETID_URL, 'flowers', SETID_MD5), TEST_FLAG, mapper, - buffered_size, use_xmap) + download(SETID_URL, 'flowers', SETID_MD5), + TEST_FLAG, + mapper, + buffered_size, + use_xmap, + cycle=cycle) def valid(mapper=test_mapper, buffered_size=1024, use_xmap=True): From dda24f18b7c5aa754e7f707d259f2e15d15f29c8 Mon Sep 17 00:00:00 2001 From: Qiyang Min Date: Fri, 22 Jun 2018 05:20:18 -0500 Subject: [PATCH 24/28] Fix kill fail bug (#11635) * 1. Remove PYTHON_FLAGS from paddle_build.sh in paddlepaddle/paddle:latest-dev * 1. Add PYTHON_FLAGS back 2. Change SIGKILL to SIGINT and SIGTERM * 1. Add setup.py.in back * 1. add pip install open-cv in Dockerfile to avoid libusb_exit hanging up which is caused by the opencv-python package missing * 1. Add the && \ to line above * 1. Remove the notice comment --- Dockerfile | 3 ++- paddle/scripts/paddle_build.sh | 18 +++++++++--------- .../fluid/tests/unittests/CMakeLists.txt | 5 ++--- .../tests/unittests/test_listen_and_serv_op.py | 4 ++-- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/Dockerfile b/Dockerfile index 752fea5951..fc5069a6c0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -76,7 +76,8 @@ RUN easy_install -U pip && \ pip install sphinx-rtd-theme==0.1.9 recommonmark RUN pip install pre-commit 'ipython==5.3.0' && \ - pip install 'ipykernel==4.6.0' 'jupyter==1.0.0' + pip install 'ipykernel==4.6.0' 'jupyter==1.0.0' && \ + pip install opencv-python #For docstring checker RUN pip install pylint pytest astroid isort diff --git a/paddle/scripts/paddle_build.sh b/paddle/scripts/paddle_build.sh index e8b3053267..ff46c5f846 100755 --- a/paddle/scripts/paddle_build.sh +++ b/paddle/scripts/paddle_build.sh @@ -22,7 +22,7 @@ function print_usage() { echo -e "\n${RED}Usage${NONE}: ${BOLD}${SCRIPT_NAME}${NONE} [OPTION]" - + echo -e "\n${RED}Options${NONE}: ${BLUE}build${NONE}: run build for x86 platform ${BLUE}build_android${NONE}: run build for android platform @@ -198,7 +198,7 @@ function build_android() { fi ANDROID_STANDALONE_TOOLCHAIN=$ANDROID_TOOLCHAINS_DIR/$ANDROID_ARCH-android-$ANDROID_API - + cat < Date: Fri, 22 Jun 2018 17:27:26 +0800 Subject: [PATCH 25/28] add example of clone(for_test) --- python/paddle/fluid/framework.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index db21b1f3c0..4494e30eed 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -1387,7 +1387,11 @@ class Program(object): * Set for_test to True when we want to clone the program for testing. Notes: This API DOES NOT prune any operator. Use - :code:`clone(for_test=True)` before backward and optimization please. + :code:`clone(for_test=True)` before backward and optimization please. e.g. + + >>> test_program = fluid.default_main_program().clone(for_test=True) + >>> optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9) + >>> optimizer.minimize() Args: for_test(bool): True if change the :code:`is_test` attribute of From 2625178addd006a1b6a5292ff0aba310cce719b6 Mon Sep 17 00:00:00 2001 From: Yi Wang Date: Fri, 22 Jun 2018 19:19:24 -0700 Subject: [PATCH 26/28] No NCCL on macOS (#11652) * Make paddle no longer depend on boost * Update enforce.h --- paddle/fluid/platform/dynload/CMakeLists.txt | 9 +++++++-- paddle/fluid/platform/enforce.h | 7 +++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/platform/dynload/CMakeLists.txt b/paddle/fluid/platform/dynload/CMakeLists.txt index 364c4901b2..6dd19aaeff 100644 --- a/paddle/fluid/platform/dynload/CMakeLists.txt +++ b/paddle/fluid/platform/dynload/CMakeLists.txt @@ -1,11 +1,16 @@ cc_library(dynamic_loader SRCS dynamic_loader.cc DEPS glog gflags enforce) -list(APPEND CUDA_SRCS cublas.cc cudnn.cc curand.cc nccl.cc) +list(APPEND CUDA_SRCS cublas.cc cudnn.cc curand.cc) + +# There is no macOS version of NCCL. +if (NOT APPLE) + list(APPEND CUDA_SRCS nccl.cc) +endif() + if (TENSORRT_FOUND) list(APPEND CUDA_SRCS tensorrt.cc) endif() - configure_file(cupti_lib_path.h.in ${CMAKE_CURRENT_BINARY_DIR}/cupti_lib_path.h) if (CUPTI_FOUND) list(APPEND CUDA_SRCS cupti.cc) diff --git a/paddle/fluid/platform/enforce.h b/paddle/fluid/platform/enforce.h index 7b8c29e1e6..a34e4371cc 100644 --- a/paddle/fluid/platform/enforce.h +++ b/paddle/fluid/platform/enforce.h @@ -44,8 +44,10 @@ limitations under the License. */ #include "paddle/fluid/platform/dynload/cublas.h" #include "paddle/fluid/platform/dynload/cudnn.h" #include "paddle/fluid/platform/dynload/curand.h" +#ifndef __APPLE__ #include "paddle/fluid/platform/dynload/nccl.h" -#endif +#endif // __APPLE__ +#endif // PADDLE_WITH_CUDA namespace paddle { namespace platform { @@ -174,6 +176,7 @@ inline typename std::enable_if::type throw_on_error( throw std::runtime_error(err + string::Sprintf(args...)); } +#ifndef __APPLE__ template inline typename std::enable_if::type throw_on_error( ncclResult_t stat, const Args&... args) { @@ -184,7 +187,7 @@ inline typename std::enable_if::type throw_on_error( string::Sprintf(args...)); } } - +#endif // __APPLE__ #endif // PADDLE_WITH_CUDA template From aebf120959e25b9c30977e2791282f031bafcc35 Mon Sep 17 00:00:00 2001 From: guochaorong Date: Sun, 24 Jun 2018 10:58:58 +0800 Subject: [PATCH 27/28] anakin build adapt --- paddle/scripts/paddle_build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/scripts/paddle_build.sh b/paddle/scripts/paddle_build.sh index ff46c5f846..037688bde9 100755 --- a/paddle/scripts/paddle_build.sh +++ b/paddle/scripts/paddle_build.sh @@ -133,7 +133,7 @@ EOF -DWITH_FLUID_ONLY=${WITH_FLUID_ONLY:-OFF} \ -DCMAKE_EXPORT_COMPILE_COMMANDS=ON \ -DWITH_CONTRIB=${WITH_CONTRIB:-ON} \ - -DWITH_ANAKIN=ON + -DWITH_ANAKIN=${WITH_ANAKIN:-ON} } function abort(){ From 76e3ec600a050938d4f64b25e5ee271fdd897d8d Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Sun, 24 Jun 2018 12:18:22 +0800 Subject: [PATCH 28/28] fix cloned op --- .../fluid/transpiler/distribute_transpiler.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 676079144e..f003992f3c 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -396,7 +396,7 @@ class DistributeTranspiler(object): return varname return "" - def __clone_lr_op_sub_block__(op, program, new_block): + def __clone_lr_op_sub_block__(op, program, lr_block): if not op.has_attr('sub_block'): return @@ -405,17 +405,17 @@ class DistributeTranspiler(object): assert isinstance(origin_block, Block) # we put the new sub block to new block to follow the block # hierarchy of the original blocks - new_sub_block = program.create_block(new_block.idx) + new_sub_block = program.create_block(lr_block.idx) # clone vars for var in origin_block.vars: new_sub_block.clone_variable(var) # clone ops - for op in origin_block.ops: - self._clone_lr_op(program, new_sub_block, op) + for origin_op in origin_block.ops: + cloned_op = self._clone_lr_op(program, new_sub_block, origin_op) # clone sub_block of op - __clone_lr_op_sub_block__(op, program, new_sub_block) + __clone_lr_op_sub_block__(cloned_op, program, new_sub_block) # reset the block of op op.set_attr('sub_block', new_sub_block) @@ -429,9 +429,10 @@ class DistributeTranspiler(object): pserver_program.num_blocks - 1) optimize_blocks.append(lr_decay_block) for _, op in enumerate(lr_ops): - self._append_pserver_non_opt_ops(lr_decay_block, op) + cloned_op = self._append_pserver_non_opt_ops(lr_decay_block, op) # append sub blocks to pserver_program in lr_decay_op - __clone_lr_op_sub_block__(op, pserver_program, lr_decay_block) + __clone_lr_op_sub_block__(cloned_op, pserver_program, + lr_decay_block) # append op to the current block grad_to_block_id = [] @@ -1214,7 +1215,7 @@ class DistributeTranspiler(object): if var not in program.global_block().vars: block.clone_variable(var) - block.append_op( + return block.append_op( type=op.type, inputs=inputs, outputs=outputs, attrs=op.attrs) def _append_pserver_non_opt_ops(self, optimize_block, opt_op): @@ -1252,7 +1253,7 @@ class DistributeTranspiler(object): elif not program.global_block().vars.has_key(var.name): program.global_block().clone_variable(var) - optimize_block.append_op( + return optimize_block.append_op( type=opt_op.type, inputs=inputs, outputs=outputs,