From fa84ba23507f3211f48ca142619c83843cf2f106 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Tue, 30 Oct 2018 19:29:17 +0800 Subject: [PATCH 1/6] set en empty optimize block if pserver has no optimize block --- python/paddle/fluid/transpiler/distribute_transpiler.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 8daac0f43b..b71bd48baf 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -767,6 +767,13 @@ in a single call.") prefetch_var_name_to_block_id.extend( lookup_table_var_name_to_block_id) + if optimize_blocks.size() == 0: + pre_block_idx = pserver_program.num_blocks - 1 + empty_block = pserver_program._create_block(pre_block_idx) + optimize_blocks.append(empty_block) + + # In some case, some parameter server will have no parameter to optimize + # So we give an empty optimize block to parameter server. attrs = { "optimize_blocks": optimize_blocks, "endpoint": endpoint, From a11d4f300e39fbc0b167ab5c6a4a8f7beae02fde Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Wed, 31 Oct 2018 11:37:02 +0800 Subject: [PATCH 2/6] use len instead of size for python list --- python/paddle/fluid/transpiler/distribute_transpiler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index b71bd48baf..c10a1348ec 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -767,7 +767,7 @@ in a single call.") prefetch_var_name_to_block_id.extend( lookup_table_var_name_to_block_id) - if optimize_blocks.size() == 0: + if len(optimize_blocks) == 0: pre_block_idx = pserver_program.num_blocks - 1 empty_block = pserver_program._create_block(pre_block_idx) optimize_blocks.append(empty_block) From bf9764898d5844a8739189a31a29e8a7bdf2538a Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Wed, 31 Oct 2018 17:05:11 +0800 Subject: [PATCH 3/6] add TestEmptyPserverOptimizeBlocks --- .../tests/unittests/test_dist_transpiler.py | 25 +++++++++++++++++++ .../fluid/transpiler/distribute_transpiler.py | 3 ++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py index c4511a98b0..2b2769cc1b 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py +++ b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py @@ -405,6 +405,31 @@ class TestL2DecayWithPiecewise(TranspilerTest): ["sum", "scale", "scale", "elementwise_add", "momentum"]) +class TestEmptyPserverOptimizeBlocks(TranspilerTest): + def net_conf(self): + x = fluid.layers.data(name='x', shape=[1000], dtype='float32') + # only one parameter + y_predict = fluid.layers.fc(input=x, + size=1000, + act=None, + param_attr=fluid.ParamAttr(name='fc_w'), + bias_attr=False) + y = fluid.layers.data(name='y', shape=[1], dtype='float32') + cost = fluid.layers.square_error_cost(input=y_predict, label=y) + avg_cost = fluid.layers.mean(cost) + sgd_optimizer = fluid.optimizer.SGD(learning_rate=1.0) + sgd_optimizer.minimize(avg_cost) + + def transpiler_test_impl(self): + config = fluid.DistributeTranspilerConfig() + config.slice_var_up = False + + pserver, startup = self.get_pserver(ep=self.pserver2_ep, config=config) + + self.assertEqual(len(pserver.blocks), 2) + self.assertEqual(len(pserver.blocks[1].ops), 0) + + class TestDistLookupTableBase(TranspilerTest): def network_with_table(self, is_sparse, is_distributed): self.table_size = 1000 diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index c10a1348ec..fecae9898c 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -35,6 +35,7 @@ import sys import numpy as np import collections import six +import logging from .ps_dispatcher import RoundRobin, HashName, PSDispatcher from .. import core, framework @@ -768,6 +769,7 @@ in a single call.") lookup_table_var_name_to_block_id) if len(optimize_blocks) == 0: + logging.warn("pserver [" + str(endpoint) + "] has no optimize block!!") pre_block_idx = pserver_program.num_blocks - 1 empty_block = pserver_program._create_block(pre_block_idx) optimize_blocks.append(empty_block) @@ -1282,7 +1284,6 @@ to transpile() call.") } outputs = {"ParamOut": [param_var]} # only support sgd now - import logging logging.warn( "distribute lookup table only support sgd optimizer, change it's optimizer to sgd instead of " + table_opt_op.type) From f2a205c2f52d444bf30295c07f47489a589b0907 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Wed, 31 Oct 2018 17:23:57 +0800 Subject: [PATCH 4/6] add test_pserver_run_empty_optimize_block --- .../fluid/tests/unittests/CMakeLists.txt | 2 + .../test_pserver_run_empty_optimize_block.py | 117 ++++++++++++++++++ 2 files changed, 119 insertions(+) create mode 100644 python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index e53c49b13e..9a5b7d4850 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -15,6 +15,7 @@ if(NOT WITH_DISTRIBUTE) list(REMOVE_ITEM TEST_OPS test_dist_transpiler) list(REMOVE_ITEM TEST_OPS test_simple_dist_transpiler) list(REMOVE_ITEM TEST_OPS test_listen_and_serv_op) + list(REMOVE_ITEM TEST_OPS test_pserver_run_empty_optimize_block) LIST(REMOVE_ITEM TEST_OPS test_dist_mnist) LIST(REMOVE_ITEM TEST_OPS test_dist_word2vec) endif(NOT WITH_DISTRIBUTE) @@ -74,6 +75,7 @@ py_test_modules(test_warpctc_op MODULES test_warpctc_op ENVS FLAGS_warpctc_dir=$ if(WITH_DISTRIBUTE) py_test_modules(test_dist_train MODULES test_dist_train SERIAL) set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 20) + set_tests_properties(test_pserver_run_empty_optimize_block PROPERTIES TIMEOUT 20) if(NOT APPLE) set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 200) set_tests_properties(test_dist_word2vec PROPERTIES TIMEOUT 200) diff --git a/python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py b/python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py new file mode 100644 index 0000000000..197ce9de56 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py @@ -0,0 +1,117 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import paddle +import paddle.fluid as fluid +import os +import signal +import subprocess +import time +import unittest +from multiprocessing import Process +from op_test import OpTest + + +def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): + x = fluid.layers.data(name='x', shape=[1], dtype='float32') + y_predict = fluid.layers.fc(input=x, size=1, act=None, bias_attr=False) + y = fluid.layers.data(name='y', shape=[1], dtype='float32') + + # loss function + cost = fluid.layers.square_error_cost(input=y_predict, label=y) + avg_cost = fluid.layers.mean(cost) + + # optimizer + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) + sgd_optimizer.minimize(avg_cost) + + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + exe = fluid.Executor(place) + + ps1 = ip + ":" + str(int(port) + 1) + ps2 = ip + ":" + port + pserver_endpoints = ps1 + "," + ps2 + + config = fluid.DistributeTranspilerConfig() + config.slice_var_up = False + t = fluid.DistributeTranspiler(config=config) + t.transpile( + trainer_id, + pservers=pserver_endpoints, + trainers=trainers, + sync_mode=sync_mode) + pserver_prog = t.get_pserver_program(ps2) + + # pserver2 have no parameter + assert (len(pserver_prog.blocks), 2) + assert (len(pserver_prog.blocks[1].ops), 0) + + pserver_startup = t.get_startup_program(ps2, pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + + +class TestListenAndServOp(OpTest): + def setUp(self): + self.ps_timeout = 5 + self.ip = "127.0.0.1" + self.port = "0" + self.trainers = 1 + self.trainer_id = 0 + + def _start_pserver(self, use_cuda, sync_mode): + p = Process( + target=run_pserver, + args=(use_cuda, sync_mode, self.ip, self.port, self.trainers, + self.trainer_id)) + p.daemon = True + p.start() + return p + + def _wait_ps_ready(self, pid): + start_left_time = self.ps_timeout + sleep_time = 0.5 + while True: + assert start_left_time >= 0, "wait ps ready failed" + time.sleep(sleep_time) + try: + # the listen_and_serv_op would touch a file which contains the listen port + # on the /tmp directory until it was ready to process all the RPC call. + os.stat("/tmp/paddle.%d.port" % pid) + return + except os.error: + start_left_time -= sleep_time + + def test_handle_signal_in_serv_op(self): + # run pserver on CPU in sync mode + p1 = self._start_pserver(False, True) + self._wait_ps_ready(p1.pid) + + # raise SIGTERM to pserver + os.kill(p1.pid, signal.SIGINT) + p1.join() + + # run pserver on CPU in async mode + p2 = self._start_pserver(False, False) + self._wait_ps_ready(p2.pid) + + # raise SIGTERM to pserver + os.kill(p2.pid, signal.SIGTERM) + p2.join() + + +if __name__ == '__main__': + unittest.main() From ba8bbe159b99162ae28e36aff1bc2f81fcec5713 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Wed, 31 Oct 2018 17:32:04 +0800 Subject: [PATCH 5/6] add test pserver run empty block into test_listen_and_serv_op --- .../fluid/tests/unittests/CMakeLists.txt | 2 - .../unittests/test_listen_and_serv_op.py | 65 +++++++++- .../test_pserver_run_empty_optimize_block.py | 117 ------------------ 3 files changed, 61 insertions(+), 123 deletions(-) delete mode 100644 python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 9a5b7d4850..e53c49b13e 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -15,7 +15,6 @@ if(NOT WITH_DISTRIBUTE) list(REMOVE_ITEM TEST_OPS test_dist_transpiler) list(REMOVE_ITEM TEST_OPS test_simple_dist_transpiler) list(REMOVE_ITEM TEST_OPS test_listen_and_serv_op) - list(REMOVE_ITEM TEST_OPS test_pserver_run_empty_optimize_block) LIST(REMOVE_ITEM TEST_OPS test_dist_mnist) LIST(REMOVE_ITEM TEST_OPS test_dist_word2vec) endif(NOT WITH_DISTRIBUTE) @@ -75,7 +74,6 @@ py_test_modules(test_warpctc_op MODULES test_warpctc_op ENVS FLAGS_warpctc_dir=$ if(WITH_DISTRIBUTE) py_test_modules(test_dist_train MODULES test_dist_train SERIAL) set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 20) - set_tests_properties(test_pserver_run_empty_optimize_block PROPERTIES TIMEOUT 20) if(NOT APPLE) set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 200) set_tests_properties(test_dist_word2vec PROPERTIES TIMEOUT 200) diff --git a/python/paddle/fluid/tests/unittests/test_listen_and_serv_op.py b/python/paddle/fluid/tests/unittests/test_listen_and_serv_op.py index 48b52a5412..a0358f8b40 100644 --- a/python/paddle/fluid/tests/unittests/test_listen_and_serv_op.py +++ b/python/paddle/fluid/tests/unittests/test_listen_and_serv_op.py @@ -55,6 +55,46 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): exe.run(pserver_prog) +def run_pserver_with_empty_block(use_cuda, sync_mode, ip, port, trainers, + trainer_id): + x = fluid.layers.data(name='x', shape=[1], dtype='float32') + y_predict = fluid.layers.fc(input=x, size=1, act=None, bias_attr=False) + y = fluid.layers.data(name='y', shape=[1], dtype='float32') + + # loss function + cost = fluid.layers.square_error_cost(input=y_predict, label=y) + avg_cost = fluid.layers.mean(cost) + + # optimizer + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) + sgd_optimizer.minimize(avg_cost) + + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + exe = fluid.Executor(place) + + ps1 = ip + ":" + str(int(port) + 1) + ps2 = ip + ":" + port + pserver_endpoints = ps1 + "," + ps2 + + config = fluid.DistributeTranspilerConfig() + config.slice_var_up = False + t = fluid.DistributeTranspiler(config=config) + t.transpile( + trainer_id, + pservers=pserver_endpoints, + trainers=trainers, + sync_mode=sync_mode) + pserver_prog = t.get_pserver_program(ps2) + + # pserver2 have no parameter + assert (len(pserver_prog.blocks) == 2) + assert (len(pserver_prog.blocks[1].ops) == 0) + + pserver_startup = t.get_startup_program(ps2, pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + + class TestListenAndServOp(OpTest): def setUp(self): self.ps_timeout = 5 @@ -63,9 +103,9 @@ class TestListenAndServOp(OpTest): self.trainers = 1 self.trainer_id = 0 - def _start_pserver(self, use_cuda, sync_mode): + def _start_pserver(self, use_cuda, sync_mode, pserver_func): p = Process( - target=run_pserver, + target=pserver_func, args=(use_cuda, sync_mode, self.ip, self.port, self.trainers, self.trainer_id)) p.daemon = True @@ -92,7 +132,24 @@ class TestListenAndServOp(OpTest): def test_handle_signal_in_serv_op(self): # run pserver on CPU in sync mode - p1 = self._start_pserver(False, True) + p1 = self._start_pserver(False, True, run_pserver) + self._wait_ps_ready(p1.pid) + + # raise SIGTERM to pserver + os.kill(p1.pid, signal.SIGINT) + p1.join() + + # run pserver on CPU in async mode + p2 = self._start_pserver(False, False, run_pserver) + self._wait_ps_ready(p2.pid) + + # raise SIGTERM to pserver + os.kill(p2.pid, signal.SIGTERM) + p2.join() + + def test_list_and_serv_run_empty_optimize_block(self): + # run pserver on CPU in sync mode + p1 = self._start_pserver(False, True, run_pserver_with_empty_block) self._wait_ps_ready(p1.pid) # raise SIGTERM to pserver @@ -100,7 +157,7 @@ class TestListenAndServOp(OpTest): p1.join() # run pserver on CPU in async mode - p2 = self._start_pserver(False, False) + p2 = self._start_pserver(False, False, run_pserver_with_empty_block) self._wait_ps_ready(p2.pid) # raise SIGTERM to pserver diff --git a/python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py b/python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py deleted file mode 100644 index 197ce9de56..0000000000 --- a/python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py +++ /dev/null @@ -1,117 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import paddle -import paddle.fluid as fluid -import os -import signal -import subprocess -import time -import unittest -from multiprocessing import Process -from op_test import OpTest - - -def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): - x = fluid.layers.data(name='x', shape=[1], dtype='float32') - y_predict = fluid.layers.fc(input=x, size=1, act=None, bias_attr=False) - y = fluid.layers.data(name='y', shape=[1], dtype='float32') - - # loss function - cost = fluid.layers.square_error_cost(input=y_predict, label=y) - avg_cost = fluid.layers.mean(cost) - - # optimizer - sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) - sgd_optimizer.minimize(avg_cost) - - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - exe = fluid.Executor(place) - - ps1 = ip + ":" + str(int(port) + 1) - ps2 = ip + ":" + port - pserver_endpoints = ps1 + "," + ps2 - - config = fluid.DistributeTranspilerConfig() - config.slice_var_up = False - t = fluid.DistributeTranspiler(config=config) - t.transpile( - trainer_id, - pservers=pserver_endpoints, - trainers=trainers, - sync_mode=sync_mode) - pserver_prog = t.get_pserver_program(ps2) - - # pserver2 have no parameter - assert (len(pserver_prog.blocks), 2) - assert (len(pserver_prog.blocks[1].ops), 0) - - pserver_startup = t.get_startup_program(ps2, pserver_prog) - exe.run(pserver_startup) - exe.run(pserver_prog) - - -class TestListenAndServOp(OpTest): - def setUp(self): - self.ps_timeout = 5 - self.ip = "127.0.0.1" - self.port = "0" - self.trainers = 1 - self.trainer_id = 0 - - def _start_pserver(self, use_cuda, sync_mode): - p = Process( - target=run_pserver, - args=(use_cuda, sync_mode, self.ip, self.port, self.trainers, - self.trainer_id)) - p.daemon = True - p.start() - return p - - def _wait_ps_ready(self, pid): - start_left_time = self.ps_timeout - sleep_time = 0.5 - while True: - assert start_left_time >= 0, "wait ps ready failed" - time.sleep(sleep_time) - try: - # the listen_and_serv_op would touch a file which contains the listen port - # on the /tmp directory until it was ready to process all the RPC call. - os.stat("/tmp/paddle.%d.port" % pid) - return - except os.error: - start_left_time -= sleep_time - - def test_handle_signal_in_serv_op(self): - # run pserver on CPU in sync mode - p1 = self._start_pserver(False, True) - self._wait_ps_ready(p1.pid) - - # raise SIGTERM to pserver - os.kill(p1.pid, signal.SIGINT) - p1.join() - - # run pserver on CPU in async mode - p2 = self._start_pserver(False, False) - self._wait_ps_ready(p2.pid) - - # raise SIGTERM to pserver - os.kill(p2.pid, signal.SIGTERM) - p2.join() - - -if __name__ == '__main__': - unittest.main() From d78e8f23a6c469766315e863da7aa1531ab0d491 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Thu, 1 Nov 2018 10:17:47 +0800 Subject: [PATCH 6/6] code format test=develop --- python/paddle/fluid/transpiler/distribute_transpiler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index fecae9898c..7ae98b4920 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -769,7 +769,8 @@ in a single call.") lookup_table_var_name_to_block_id) if len(optimize_blocks) == 0: - logging.warn("pserver [" + str(endpoint) + "] has no optimize block!!") + logging.warn("pserver [" + str(endpoint) + + "] has no optimize block!!") pre_block_idx = pserver_program.num_blocks - 1 empty_block = pserver_program._create_block(pre_block_idx) optimize_blocks.append(empty_block)