From bcc67401113f04cbd52438d9a861f03725ad9a1a Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 23 Jan 2018 19:32:44 +0800 Subject: [PATCH 1/8] WIP python binding of send recv --- paddle/operators/recv_op.cc | 18 ++-- python/paddle/v2/fluid/layers/io.py | 101 +++++++++++++++++++ python/paddle/v2/fluid/tests/test_recv_op.py | 45 +++++++++ 3 files changed, 155 insertions(+), 9 deletions(-) create mode 100644 python/paddle/v2/fluid/tests/test_recv_op.py diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc index 593c35879a..e3c86966b8 100644 --- a/paddle/operators/recv_op.cc +++ b/paddle/operators/recv_op.cc @@ -49,7 +49,7 @@ static void CreateTensorFromMessageType(framework::Variable *var, var->GetMutable(); } else { PADDLE_THROW( - "VariableMessage type %d is not in " + "VraibleMessage type %d is not in " "[LoDTensor, SelectedRows]", var_type); } @@ -121,17 +121,17 @@ class RecvOp : public framework::OperatorBase { if (it != grad_list.end()) { param_var_name = param_list[it - grad_list.begin()]; } else { - LOG(ERROR) << "grad has no paired param:" << grad_var_name; + LOG(ERROR) << "grad have no paired param:" << grad_var_name; } - VLOG(3) << "received grad: " << grad_var_name + VLOG(3) << "recved grad: " << grad_var_name << " updating param: " << param_var_name; if (fan_in > 1) { grad_var_name = this->GetGradVarNameForTrainer(grad_var_name); } auto *var = recv_scope.FindVar(grad_var_name); if (var == nullptr) { - LOG(ERROR) << "Can not find server side var: " << grad_var_name; - PADDLE_THROW("Can not find server side var"); + LOG(ERROR) << "can not find server side var: " << grad_var_name; + PADDLE_THROW("can not find server side var"); } detail::DeserializeFromMessage(v.second, dev_ctx, var); } @@ -161,11 +161,11 @@ class RecvOpMaker : public framework::OpProtoAndCheckerMaker { public: RecvOpMaker(OpProto *proto, OpAttrChecker *op_checker) : OpProtoAndCheckerMaker(proto, op_checker) { - AddInput("RX", "(Tensor) Input tensor to be optimized").AsDuplicable(); + // AddInput("RX", "(Tensor) Input tensor to be optimized").AsDuplicable(); AddComment(R"DOC( Recv operator -This operator will recieve tensor from send_op +This operator will recv tensor from send_op )DOC"); AddAttr("endpoint", "(string, default 127.0.0.1:6164)" @@ -176,11 +176,11 @@ This operator will recieve tensor from send_op kOptimizeBlock, "Serialized ProgramDesc string for recv to run."); AddAttr>( "ParamList", "type list of string", - "grad->param name mapping to find which parameters to optimize.") + "grad->param name mapping to find which param to optimize.") .SetDefault({}); AddAttr>( "GradList", "type list of string", - "grad->param name mapping to find which parameters to optimize.") + "grad->param name mapping to find which param to optimize.") .SetDefault({}); AddAttr("Fanin", "type int", "Number of trainers in the current cluster job") diff --git a/python/paddle/v2/fluid/layers/io.py b/python/paddle/v2/fluid/layers/io.py index 9af00e7de5..6a6c561641 100644 --- a/python/paddle/v2/fluid/layers/io.py +++ b/python/paddle/v2/fluid/layers/io.py @@ -74,3 +74,104 @@ def data(name, type=type, stop_gradient=stop_gradient, lod_level=lod_level) + + +class BlockGuardServ(BlockGuard): + """ + BlockGuardServ class. + + BlockGuardServ class is used to create an op with a block in a program. 
+ """ + + def __init__(self, server): + if not (isinstance(server, ListenAndServ)): + raise TypeError("BlockGuardServ takes a ListenAndServ") + super(BlockGuardServ, self).__init__(server.helper.main_program) + self.server = server + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + return False + + self.server.complete_op() + return super(BlockGuardServ, self).__exit__(exc_type, exc_val, exc_tb) + + +class ListenAndServ(object): + """ + ListenAndServ class. + + ListenAndServ class is used to wrap listen_and_serv op to create a server + which can receive variables from clients and run a block. + """ + + def __init__(self, endpoint, fan_in=1): + self.helper = LayerHelper("recv", name=name) + self.inputs = [] + self.outputs = [] + self.endpoint = endpoint + self.fan_in = fan_in + + def do(self): + return BlockGuardServ(self) + + def get_params_and_grads(self): + main_program = self.helper.main_program + current_block = main_program.current_block() + parent_block = self.parent_block() + # params and grads in the same order. + params = list() + grads = list() + for op in current_block.ops: + # FIXME(typhoonzero): op.inputs is None if it's cloned. + if "Grad" in op.inputs and "Param" in op.inputs: + params.append(op.inputs["Param"].name) + grads.append(op.inputs["Grad"].name) + + return params, grads + + def complete_op(self): + main_program = self.helper.main_program + current_block = main_program.current_block() + parent_block = self.parent_block() + + params, grads = self.get_params_and_grads() + parent_block.append_op( + type='recv', + inputs={}, + outputs={}, + attrs={ + 'endpoint': self.endpoint, + 'Fanin': self.fan_in, + 'ParamList': params, + 'GradList': grads, + 'OptimizeBlock': current_block + }) + + +def Send(endpoints, send_vars, get_vars): + """ + Send layer + + Args: + endpoints: comma seperated IP:PORT pairs in the order + of send_vars to send + send_vars: vars to send + get_vars: vars to get from server after send completes. + + Send variables to the server side, and get vars from server + side when server have finished running server side program. + """ + assert (type(send_vars) == list) + assert (type(get_vars) == list) + + epmap = endpoints.split(",") + endpoints = set(epmap) + + helper = LayerHelper("Send", **locals()) + helper.append_op( + type="send", + inputs={"X": send_vars}, + outputs={"Out": get_vars}, + attrs={"endpoints": endpoints, + "epmap": epmap}) diff --git a/python/paddle/v2/fluid/tests/test_recv_op.py b/python/paddle/v2/fluid/tests/test_recv_op.py new file mode 100644 index 0000000000..fbd182a716 --- /dev/null +++ b/python/paddle/v2/fluid/tests/test_recv_op.py @@ -0,0 +1,45 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest + +import paddle.v2.fluid as fluid +import paddle.v2.fluid.layers as layers +import numpy + + +class TestRecvOp(unittest.TestCase): + def run_test(self): + # Run init_serv in a thread + pass + + def init_serv(self, place): + main = fluid.Program() + with fluid.program_guard(main): + x = layers.data(shape=[32, 32], dtype='float32', name='X') + serv = fluid.ListenAndServ("127.0.0.1:6174") + with serv.do(): + layers.scale(input=x, scale=10) + exe = fluid.Executor(place) + exe.run(main) + + def init_client(self, place): + main = fluid.Program() + with fluid.program_guard(main): + x = layers.data(shape=[32, 32], dtype='float32', name='X') + i = fluid.initializer.Constant(x=1.0) + i(x, main.global_block()) + layers.Send("127.0.0.1:6174", [x], [x]) + exe = fluid.Executor(place) + exe.run(main) From 0e850c7417084675dcc997768c7f854333625bfe Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 23 Jan 2018 20:26:00 +0800 Subject: [PATCH 2/8] WIP --- python/paddle/v2/fluid/layers/io.py | 23 +++++++++++++++----- python/paddle/v2/fluid/tests/test_recv_op.py | 21 +++++++++++++----- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/python/paddle/v2/fluid/layers/io.py b/python/paddle/v2/fluid/layers/io.py index 6a6c561641..be581531d1 100644 --- a/python/paddle/v2/fluid/layers/io.py +++ b/python/paddle/v2/fluid/layers/io.py @@ -14,8 +14,10 @@ from .. import core from ..layer_helper import LayerHelper +from control_flow import BlockGuard +from ..layer_helper import LayerHelper -__all__ = ['data'] +__all__ = ['data', 'BlockGuardServ', 'ListenAndServ', 'Send'] def data(name, @@ -105,12 +107,14 @@ class ListenAndServ(object): which can receive variables from clients and run a block. """ - def __init__(self, endpoint, fan_in=1): - self.helper = LayerHelper("recv", name=name) + def __init__(self, endpoint, fan_in=1, optimizer_mode=True): + self.helper = LayerHelper("recv") self.inputs = [] self.outputs = [] self.endpoint = endpoint self.fan_in = fan_in + # FIXME(typhoonzero): Add this switch is stupid + self.optimizer_mode = optimizer_mode def do(self): return BlockGuardServ(self) @@ -124,9 +128,16 @@ class ListenAndServ(object): grads = list() for op in current_block.ops: # FIXME(typhoonzero): op.inputs is None if it's cloned. - if "Grad" in op.inputs and "Param" in op.inputs: - params.append(op.inputs["Param"].name) - grads.append(op.inputs["Grad"].name) + if self.optimizer_mode: + if "Grad" in op.inputs and "Param" in op.inputs: + params.append(op.inputs["Param"].name) + grads.append(op.inputs["Grad"].name) + else: + # simple recv mode, recv operators inputs. 
+ for iname in op.input_names: + for in_var_name in op.input(iname): + params.append(parent_block.var(name)) + grads.append(parent_block.var(name)) return params, grads diff --git a/python/paddle/v2/fluid/tests/test_recv_op.py b/python/paddle/v2/fluid/tests/test_recv_op.py index fbd182a716..e06f468648 100644 --- a/python/paddle/v2/fluid/tests/test_recv_op.py +++ b/python/paddle/v2/fluid/tests/test_recv_op.py @@ -17,20 +17,27 @@ import unittest import paddle.v2.fluid as fluid import paddle.v2.fluid.layers as layers import numpy +import threading class TestRecvOp(unittest.TestCase): - def run_test(self): + def test_send(self): # Run init_serv in a thread - pass + place = fluid.CPUPlace() + t = threading.Thread(target=self.init_serv, args=(place, )) + t.start() + self.init_client(place) + t.join() def init_serv(self, place): main = fluid.Program() with fluid.program_guard(main): x = layers.data(shape=[32, 32], dtype='float32', name='X') - serv = fluid.ListenAndServ("127.0.0.1:6174") + i = fluid.initializer.Constant(value=1.0) + y = i(x, main.global_block()) + serv = layers.ListenAndServ("127.0.0.1:6174") with serv.do(): - layers.scale(input=x, scale=10) + layers.scale(input=y, scale=10.0) exe = fluid.Executor(place) exe.run(main) @@ -38,8 +45,12 @@ class TestRecvOp(unittest.TestCase): main = fluid.Program() with fluid.program_guard(main): x = layers.data(shape=[32, 32], dtype='float32', name='X') - i = fluid.initializer.Constant(x=1.0) + i = fluid.initializer.Constant(value=1.0) i(x, main.global_block()) layers.Send("127.0.0.1:6174", [x], [x]) exe = fluid.Executor(place) exe.run(main) + + +if __name__ == "__main__": + unittest.main() From 9a8517fd8b909baddac7945f403763ec1e7bd0c7 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Wed, 24 Jan 2018 14:43:56 +0800 Subject: [PATCH 3/8] daemonize the server process --- python/paddle/v2/fluid/layers/io.py | 19 ++++++--- python/paddle/v2/fluid/tests/test_recv_op.py | 44 ++++++++++++++------ 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/python/paddle/v2/fluid/layers/io.py b/python/paddle/v2/fluid/layers/io.py index be581531d1..bc804a4043 100644 --- a/python/paddle/v2/fluid/layers/io.py +++ b/python/paddle/v2/fluid/layers/io.py @@ -136,17 +136,26 @@ class ListenAndServ(object): # simple recv mode, recv operators inputs. 
for iname in op.input_names: for in_var_name in op.input(iname): - params.append(parent_block.var(name)) - grads.append(parent_block.var(name)) + params.append(parent_block.var(in_var_name)) + grads.append(parent_block.var(in_var_name)) return params, grads + def parent_block(self): + prog = self.helper.main_program + parent_idx = prog.current_block().parent_idx + assert parent_idx >= 0 + parent_block = prog.block(parent_idx) + return parent_block + def complete_op(self): main_program = self.helper.main_program current_block = main_program.current_block() parent_block = self.parent_block() params, grads = self.get_params_and_grads() + param_names = [p.name for p in params] + grad_names = [g.name for g in grads] parent_block.append_op( type='recv', inputs={}, @@ -154,8 +163,8 @@ class ListenAndServ(object): attrs={ 'endpoint': self.endpoint, 'Fanin': self.fan_in, - 'ParamList': params, - 'GradList': grads, + 'ParamList': param_names, + 'GradList': grad_names, 'OptimizeBlock': current_block }) @@ -177,7 +186,7 @@ def Send(endpoints, send_vars, get_vars): assert (type(get_vars) == list) epmap = endpoints.split(",") - endpoints = set(epmap) + endpoints = list(set(epmap)) helper = LayerHelper("Send", **locals()) helper.append_op( diff --git a/python/paddle/v2/fluid/tests/test_recv_op.py b/python/paddle/v2/fluid/tests/test_recv_op.py index e06f468648..6ebb58ed33 100644 --- a/python/paddle/v2/fluid/tests/test_recv_op.py +++ b/python/paddle/v2/fluid/tests/test_recv_op.py @@ -17,40 +17,60 @@ import unittest import paddle.v2.fluid as fluid import paddle.v2.fluid.layers as layers import numpy -import threading +from multiprocessing import Process +import os, sys class TestRecvOp(unittest.TestCase): def test_send(self): # Run init_serv in a thread place = fluid.CPUPlace() - t = threading.Thread(target=self.init_serv, args=(place, )) - t.start() + p = Process(target=self.init_serv, args=(place, )) + p.daemon = True + p.start() self.init_client(place) - t.join() + # FIXME(typhoonzero): find a way to gracefully shutdown the server. 
+ os.system("kill -9 %d" % p.pid) + p.join() def init_serv(self, place): main = fluid.Program() with fluid.program_guard(main): - x = layers.data(shape=[32, 32], dtype='float32', name='X') - i = fluid.initializer.Constant(value=1.0) - y = i(x, main.global_block()) - serv = layers.ListenAndServ("127.0.0.1:6174") + x = layers.data( + shape=[32, 32], + dtype='float32', + name="X", + append_batch_size=False) + fluid.initializer.Constant(value=1.0)(x, main.global_block()) + serv = layers.ListenAndServ("127.0.0.1:6174", optimizer_mode=False) with serv.do(): - layers.scale(input=y, scale=10.0) + o = layers.scale(x=x, scale=10.0) + main.global_block().create_var( + name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape) + print main exe = fluid.Executor(place) exe.run(main) def init_client(self, place): main = fluid.Program() with fluid.program_guard(main): - x = layers.data(shape=[32, 32], dtype='float32', name='X') - i = fluid.initializer.Constant(value=1.0) - i(x, main.global_block()) + x = layers.data( + shape=[32, 32], + dtype='float32', + name='X', + append_batch_size=False) + fluid.initializer.Constant(value=1.0)(x, main.global_block()) layers.Send("127.0.0.1:6174", [x], [x]) + print main exe = fluid.Executor(place) exe.run(main) if __name__ == "__main__": unittest.main() + # test = TestRecvOp() + # place = fluid.CPUPlace() + # if sys.argv[1] == "server": + # test.init_serv(place) + # else: + # test.init_client(place) From b9d9b11c804c5f5efe645acd10fde7bb6641a317 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Wed, 24 Jan 2018 15:00:46 +0800 Subject: [PATCH 4/8] remove recv_op input --- paddle/operators/recv_op.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc index e3c86966b8..381890d30b 100644 --- a/paddle/operators/recv_op.cc +++ b/paddle/operators/recv_op.cc @@ -161,7 +161,6 @@ class RecvOpMaker : public framework::OpProtoAndCheckerMaker { public: RecvOpMaker(OpProto *proto, OpAttrChecker *op_checker) : OpProtoAndCheckerMaker(proto, op_checker) { - // AddInput("RX", "(Tensor) Input tensor to be optimized").AsDuplicable(); AddComment(R"DOC( Recv operator From e206c636fd154435973f9eeb5a8800e500bea0fa Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Wed, 24 Jan 2018 16:39:35 +0800 Subject: [PATCH 5/8] update by comment, add WITH_DISTRIBUTE switch --- python/paddle/v2/fluid/tests/CMakeLists.txt | 5 +++++ python/paddle/v2/fluid/tests/test_recv_op.py | 8 -------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/python/paddle/v2/fluid/tests/CMakeLists.txt b/python/paddle/v2/fluid/tests/CMakeLists.txt index 8305316082..628ce60b40 100644 --- a/python/paddle/v2/fluid/tests/CMakeLists.txt +++ b/python/paddle/v2/fluid/tests/CMakeLists.txt @@ -1,5 +1,10 @@ file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py") string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") + +if(NOT WITH_DISTRIBUTE) + list(REMOVE_ITEM TEST_OPS test_recv_op) +endif(NOT WITH_DISTRIBUTE) + foreach(src ${TEST_OPS}) py_test(${src} SRCS ${src}.py) endforeach() diff --git a/python/paddle/v2/fluid/tests/test_recv_op.py b/python/paddle/v2/fluid/tests/test_recv_op.py index 6ebb58ed33..5c4cec028d 100644 --- a/python/paddle/v2/fluid/tests/test_recv_op.py +++ b/python/paddle/v2/fluid/tests/test_recv_op.py @@ -47,7 +47,6 @@ class TestRecvOp(unittest.TestCase): o = layers.scale(x=x, scale=10.0) main.global_block().create_var( name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape) - print main exe = fluid.Executor(place) exe.run(main) @@ -61,16 
+60,9 @@ class TestRecvOp(unittest.TestCase): append_batch_size=False) fluid.initializer.Constant(value=1.0)(x, main.global_block()) layers.Send("127.0.0.1:6174", [x], [x]) - print main exe = fluid.Executor(place) exe.run(main) if __name__ == "__main__": unittest.main() - # test = TestRecvOp() - # place = fluid.CPUPlace() - # if sys.argv[1] == "server": - # test.init_serv(place) - # else: - # test.init_client(place) From 35b4d42ab69ee598df0c973c8884a053c42adb21 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Wed, 24 Jan 2018 16:54:35 +0800 Subject: [PATCH 6/8] merge doc fixes --- paddle/operators/recv_op.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc index 381890d30b..5d1df566af 100644 --- a/paddle/operators/recv_op.cc +++ b/paddle/operators/recv_op.cc @@ -49,7 +49,7 @@ static void CreateTensorFromMessageType(framework::Variable *var, var->GetMutable(); } else { PADDLE_THROW( - "VraibleMessage type %d is not in " + "VariableMessage type %d is not in " "[LoDTensor, SelectedRows]", var_type); } @@ -121,17 +121,17 @@ class RecvOp : public framework::OperatorBase { if (it != grad_list.end()) { param_var_name = param_list[it - grad_list.begin()]; } else { - LOG(ERROR) << "grad have no paired param:" << grad_var_name; + LOG(ERROR) << "grad has no paired param:" << grad_var_name; } - VLOG(3) << "recved grad: " << grad_var_name + VLOG(3) << "received grad: " << grad_var_name << " updating param: " << param_var_name; if (fan_in > 1) { grad_var_name = this->GetGradVarNameForTrainer(grad_var_name); } auto *var = recv_scope.FindVar(grad_var_name); if (var == nullptr) { - LOG(ERROR) << "can not find server side var: " << grad_var_name; - PADDLE_THROW("can not find server side var"); + LOG(ERROR) << "Can not find server side var: " << grad_var_name; + PADDLE_THROW("Can not find server side var"); } detail::DeserializeFromMessage(v.second, dev_ctx, var); } @@ -164,7 +164,7 @@ class RecvOpMaker : public framework::OpProtoAndCheckerMaker { AddComment(R"DOC( Recv operator -This operator will recv tensor from send_op +This operator will recieve tensor from send_op )DOC"); AddAttr("endpoint", "(string, default 127.0.0.1:6164)" @@ -175,11 +175,11 @@ This operator will recv tensor from send_op kOptimizeBlock, "Serialized ProgramDesc string for recv to run."); AddAttr>( "ParamList", "type list of string", - "grad->param name mapping to find which param to optimize.") + "grad->param name mapping to find which parameters to optimize.") .SetDefault({}); AddAttr>( "GradList", "type list of string", - "grad->param name mapping to find which param to optimize.") + "grad->param name mapping to find which parameters to optimize.") .SetDefault({}); AddAttr("Fanin", "type int", "Number of trainers in the current cluster job") From dc7073ded23126bde7a1e7d514b9174cce23a19c Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Thu, 25 Jan 2018 10:52:25 +0800 Subject: [PATCH 7/8] update by comment. 
--- python/paddle/v2/fluid/layers/io.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/paddle/v2/fluid/layers/io.py b/python/paddle/v2/fluid/layers/io.py index bc804a4043..b7b2cf2296 100644 --- a/python/paddle/v2/fluid/layers/io.py +++ b/python/paddle/v2/fluid/layers/io.py @@ -113,7 +113,8 @@ class ListenAndServ(object): self.outputs = [] self.endpoint = endpoint self.fan_in = fan_in - # FIXME(typhoonzero): Add this switch is stupid + # FIXME(typhoonzero): add optimizer_mode is stupid, should make it more + # general. self.optimizer_mode = optimizer_mode def do(self): From 82d924984f75cecce2626ecb7376f2424b50aaae Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 29 Jan 2018 14:45:10 +0800 Subject: [PATCH 8/8] update dist transpiler --- python/paddle/v2/fluid/distribute_transpiler.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index abcad899bf..4e54ab806b 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -221,7 +221,7 @@ class DistributeTranspiler: if len(splited_vars) <= 1: continue orig_var = program.global_block().vars[varname] - if orig_var == core.VarDesc.VarType.SELECTED_ROWS: + if orig_var.type == core.VarDesc.VarType.SELECTED_ROWS: height_sections = [] for v in splited_vars: height_sections.append(v.shape[0]) @@ -230,7 +230,7 @@ class DistributeTranspiler: inputs={"X": orig_var}, outputs={"Out": splited_vars}, attrs={"height_sections": height_sections}) - elif orig_var == core.VarDesc.VarType.LOD_TENSOR: + elif orig_var.type == core.VarDesc.VarType.LOD_TENSOR: sections = [] for v in splited_vars: sections.append(v.shape[0]) @@ -470,8 +470,7 @@ class DistributeTranspiler: # Append the recv op pserver_program.global_block().append_op( type="recv", - inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"] - }, # grads to recv + inputs={}, outputs={}, attrs={ "OptimizeBlock": optimize_sub_program.global_block(),
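
A minimal usage sketch of the ListenAndServ / Send layers introduced in this series, modeled on python/paddle/v2/fluid/tests/test_recv_op.py above. This is an editor's illustration, not part of any patch: the endpoint, tensor shape, and import paths are taken from the diff, while the server()/client() helper names and the __main__ driver are hypothetical glue.

# Sketch of driving the new listen_and_serv/send pair, based on TestRecvOp.
import os
from multiprocessing import Process

import paddle.v2.fluid as fluid
import paddle.v2.fluid.layers as layers


def server(place, endpoint="127.0.0.1:6174"):
    # Server side: listen on the endpoint and run a small block
    # (scale the received tensor by 10) for each incoming variable.
    main = fluid.Program()
    with fluid.program_guard(main):
        x = layers.data(
            shape=[32, 32], dtype='float32', name="X",
            append_batch_size=False)
        fluid.initializer.Constant(value=1.0)(x, main.global_block())
        serv = layers.ListenAndServ(endpoint, optimizer_mode=False)
        with serv.do():
            out = layers.scale(x=x, scale=10.0)
            # As in the test, mirror the block's output variable into the
            # server program's global block so the client can fetch it.
            main.global_block().create_var(
                name=out.name, persistable=False, dtype=out.dtype,
                shape=out.shape)
    fluid.Executor(place).run(main)


def client(place, endpoint="127.0.0.1:6174"):
    # Client side: send a constant 32x32 tensor and receive the result
    # back into the same variable once the server block has run.
    main = fluid.Program()
    with fluid.program_guard(main):
        x = layers.data(
            shape=[32, 32], dtype='float32', name='X',
            append_batch_size=False)
        fluid.initializer.Constant(value=1.0)(x, main.global_block())
        layers.Send(endpoint, [x], [x])
    fluid.Executor(place).run(main)


if __name__ == "__main__":
    # Mirror TestRecvOp.test_send: run the server in a child process, drive
    # it from the client, then kill the server explicitly, since (per the
    # FIXME in the test) there is no graceful shutdown yet.
    place = fluid.CPUPlace()
    p = Process(target=server, args=(place, ))
    p.daemon = True
    p.start()
    client(place)
    os.system("kill -9 %d" % p.pid)
    p.join()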