From e896926b9c3c8bed544f92ad82d42daac9fac0c0 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Tue, 5 Jun 2018 15:42:29 +0800
Subject: [PATCH 1/7] add unit test for dist mnist

---
 .../fluid/tests/unittests/CMakeLists.txt      |   1 +
 .../fluid/tests/unittests/test_dist_mnist.py  | 208 ++++++++++++++++++
 2 files changed, 209 insertions(+)
 create mode 100644 python/paddle/fluid/tests/unittests/test_dist_mnist.py

diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index c33539f6b5..2f2e9c96c1 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -52,3 +52,4 @@ py_test_modules(test_dist_train MODULES test_dist_train SERIAL)
 # since load cudnn libraries, so we use a longer timeout to make this
 # unit test stability.
 set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 30)
+set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 60)
diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist.py b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
new file mode 100644
index 0000000000..e6bc56cce1
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
@@ -0,0 +1,208 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy as np
+import argparse
+import time
+
+import paddle
+import paddle.fluid as fluid
+import paddle.fluid.profiler as profiler
+from paddle.fluid import core
+import unittest
+from multiprocessing import Process
+import os
+import signal
+
+SEED = 1
+DTYPE = "float32"
+
+
+# the random seed must be set before configuring the network.
+# fluid.default_startup_program().random_seed = SEED
+def cnn_model(data):
+    conv_pool_1 = fluid.nets.simple_img_conv_pool(
+        input=data,
+        filter_size=5,
+        num_filters=20,
+        pool_size=2,
+        pool_stride=2,
+        act="relu")
+    conv_pool_2 = fluid.nets.simple_img_conv_pool(
+        input=conv_pool_1,
+        filter_size=5,
+        num_filters=50,
+        pool_size=2,
+        pool_stride=2,
+        act="relu")
+
+    # TODO(dzhwinter) : refine the initializer and random seed setting
+    SIZE = 10
+    input_shape = conv_pool_2.shape
+    param_shape = [reduce(lambda a, b: a * b, input_shape[1:], 1)] + [SIZE]
+    scale = (2.0 / (param_shape[0]**2 * SIZE))**0.5
+
+    predict = fluid.layers.fc(
+        input=conv_pool_2,
+        size=SIZE,
+        act="softmax",
+        param_attr=fluid.param_attr.ParamAttr(
+            initializer=fluid.initializer.NormalInitializer(
+                loc=0.0, scale=scale)))
+    return predict
+
+
+def get_model(batch_size):
+    # Input data
+    images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
+    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+
+    # Train program
+    predict = cnn_model(images)
+    cost = fluid.layers.cross_entropy(input=predict, label=label)
+    avg_cost = fluid.layers.mean(x=cost)
+
+    # Evaluator
+    batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
+    batch_acc = fluid.layers.accuracy(
+        input=predict, label=label, total=batch_size_tensor)
+
+    inference_program = fluid.default_main_program().clone()
+    # Optimization
+    opt = fluid.optimizer.AdamOptimizer(
+        learning_rate=0.001, beta1=0.9, beta2=0.999)
+
+    # Reader
+    train_reader = paddle.batch(
+        paddle.dataset.mnist.train(), batch_size=batch_size)
+    test_reader = paddle.batch(
+        paddle.dataset.mnist.test(), batch_size=batch_size)
+    opt.minimize(avg_cost)
+    return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict
+
+
+def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers):
+    t = fluid.DistributeTranspiler()
+    t.transpile(
+        trainer_id=trainer_id,
+        program=main_program,
+        pservers=pserver_endpoints,
+        trainers=trainers)
+    return t
+
+
+def run_pserver(pserver_endpoints, trainers, current_endpoint):
+    get_model(batch_size=20)
+    t = get_transpiler(0,
+                       fluid.default_main_program(), pserver_endpoints,
+                       trainers)
+    pserver_prog = t.get_pserver_program(current_endpoint)
+    startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
+
+    place = fluid.CPUPlace()
+    exe = fluid.Executor(place)
+    exe.run(startup_prog)
+
+    exe.run(pserver_prog)
+
+
+class TestDistMnist(unittest.TestCase):
+    def setUp(self):
+        self._trainers = 1
+        self._pservers = 1
+        self._ps_endpoints = "127.0.0.1:9123"
+
+    def start_pserver(self, endpoint):
+        p = Process(
+            target=run_pserver,
+            args=(self._ps_endpoints, self._trainers, endpoint))
+        p.start()
+        return p.pid
+
+    def _wait_ps_ready(self, pid):
+        retry_times = 5
+        while True:
+            assert retry_times >= 0, "wait ps ready failed"
+            time.sleep(1)
+            try:
+                # the listen_and_serv_op touches a file under /tmp that contains the
+                # listen port once it is ready to process all the RPC calls.
+                os.stat("/tmp/paddle.%d.port" % pid)
+                return
+            except os.error:
+                retry_times -= 1
+
+    def stop_pserver(self, pid):
+        os.kill(pid, signal.SIGTERM)
+
+    def test_with_place(self):
+        p = fluid.CUDAPlace() if core.is_compiled_with_cuda(
+        ) else fluid.CPUPlace()
+
+        pserver_pid = self.start_pserver(self._ps_endpoints)
+        self._wait_ps_ready(pserver_pid)
+
+        self.run_trainer(p, 0)
+
+        self.stop_pserver(pserver_pid)
+
+    def run_trainer(self, place, trainer_id):
+        test_program, avg_cost, train_reader, test_reader, batch_acc, predict = get_model(
+            batch_size=20)
+        t = get_transpiler(trainer_id,
+                           fluid.default_main_program(), self._ps_endpoints,
+                           self._trainers)
+
+        trainer_prog = t.get_trainer_program()
+
+        exe = fluid.Executor(place)
+        exe.run(fluid.default_startup_program())
+
+        feed_var_list = [
+            var for var in trainer_prog.global_block().vars.itervalues()
+            if var.is_data
+        ]
+
+        feeder = fluid.DataFeeder(feed_var_list, place)
+        for pass_id in xrange(10):
+            for batch_id, data in enumerate(train_reader()):
+                exe.run(trainer_prog, feed=feeder.feed(data))
+
+                if (batch_id + 1) % 10 == 0:
+                    acc_set = []
+                    avg_loss_set = []
+                    for test_data in test_reader():
+                        acc_np, avg_loss_np = exe.run(
+                            program=test_program,
+                            feed=feeder.feed(test_data),
+                            fetch_list=[batch_acc, avg_cost])
+                        acc_set.append(float(acc_np))
+                        avg_loss_set.append(float(avg_loss_np))
+                    # get test acc and loss
+                    acc_val = np.array(acc_set).mean()
+                    avg_loss_val = np.array(avg_loss_set).mean()
+                    if float(acc_val
+                             ) > 0.2:  # Smaller value to increase CI speed
+                        return
+                    else:
+                        print(
+                            'PassID {0:1}, BatchID {1:04}, Test Loss {2:2.2}, Acc {3:2.2}'.
+                            format(pass_id, batch_id + 1,
+                                   float(avg_loss_val), float(acc_val)))
+                        if math.isnan(float(avg_loss_val)):
+                            assert False, "got Nan loss, training failed."
+
+
+if __name__ == "__main__":
+    unittest.main()

From a158bd9173d745b4969e39fcc1c00681a791d251 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Tue, 5 Jun 2018 17:15:18 +0800
Subject: [PATCH 2/7] fix ci

---
 python/paddle/fluid/tests/unittests/test_dist_mnist.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist.py b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
index e6bc56cce1..e4d39c8b3f 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
@@ -27,6 +27,7 @@ import signal
 
 SEED = 1
 DTYPE = "float32"
+paddle.dataset.mnist.fetch()
 
 
 # the random seed must be set before configuring the network.
@@ -147,7 +148,7 @@ class TestDistMnist(unittest.TestCase):
         os.kill(pid, signal.SIGTERM)
 
     def test_with_place(self):
-        p = fluid.CUDAPlace() if core.is_compiled_with_cuda(
+        p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
         ) else fluid.CPUPlace()
 
         pserver_pid = self.start_pserver(self._ps_endpoints)

From df7a1471fd1c2c003a8df1af152cd1da28f7f568 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Tue, 5 Jun 2018 19:21:35 +0800
Subject: [PATCH 3/7] fix fetch mnist dataset failed

---
 python/paddle/v2/dataset/mnist.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/paddle/v2/dataset/mnist.py b/python/paddle/v2/dataset/mnist.py
index 9f675bed89..2b959c48e4 100644
--- a/python/paddle/v2/dataset/mnist.py
+++ b/python/paddle/v2/dataset/mnist.py
@@ -112,7 +112,7 @@ def fetch():
     paddle.v2.dataset.common.download(TRAIN_IMAGE_URL, 'mnist', TRAIN_IMAGE_MD5)
     paddle.v2.dataset.common.download(TRAIN_LABEL_URL, 'mnist', TRAIN_LABEL_MD5)
     paddle.v2.dataset.common.download(TEST_IMAGE_URL, 'mnist', TEST_IMAGE_MD5)
-    paddle.v2.dataset.common.download(TEST_LABEL_URL, 'mnist', TRAIN_LABEL_MD5)
+    paddle.v2.dataset.common.download(TEST_LABEL_URL, 'mnist', TEST_LABEL_MD5)
 
 
 def convert(path):

From cdb705d19360cdf2ad4de0bbe4f129a70f6ca28c Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Wed, 6 Jun 2018 13:27:22 +0800
Subject: [PATCH 4/7] fix mnist dataset md5

---
 python/paddle/dataset/mnist.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/paddle/dataset/mnist.py b/python/paddle/dataset/mnist.py
index 6a1b8b5fac..9d05aeeb95 100644
--- a/python/paddle/dataset/mnist.py
+++ b/python/paddle/dataset/mnist.py
@@ -111,7 +111,7 @@ def fetch():
     paddle.dataset.common.download(TRAIN_IMAGE_URL, 'mnist', TRAIN_IMAGE_MD5)
     paddle.dataset.common.download(TRAIN_LABEL_URL, 'mnist', TRAIN_LABEL_MD5)
     paddle.dataset.common.download(TEST_IMAGE_URL, 'mnist', TEST_IMAGE_MD5)
-    paddle.dataset.common.download(TEST_LABEL_URL, 'mnist', TRAIN_LABEL_MD5)
+    paddle.dataset.common.download(TEST_LABEL_URL, 'mnist', TEST_LABEL_MD5)
 
 
 def convert(path):

From 05b6aa180594e379adfa4d5ba44d3e9ac937f5b2 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Wed, 6 Jun 2018 14:02:32 +0800
Subject: [PATCH 5/7] increase dist unit test timeout

---
 python/paddle/fluid/tests/unittests/CMakeLists.txt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 2f2e9c96c1..32176fc7e5 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -51,5 +51,5 @@ py_test_modules(test_dist_train MODULES test_dist_train SERIAL)
 # FIXME(Yancey1989): this test would cost much more time on CUDAPlace
 # since load cudnn libraries, so we use a longer timeout to make this
 # unit test stability.
-set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 30)
-set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 60)
+set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 60)
+set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 180)

From c45a4b8567ee6b7b30bbe4a5733775970e831a88 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Wed, 27 Jun 2018 14:15:35 +0800
Subject: [PATCH 6/7] use sigkill to stop pserver

---
 python/paddle/fluid/tests/unittests/test_dist_mnist.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist.py b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
index e4d39c8b3f..450ec414d1 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
@@ -145,7 +145,7 @@ class TestDistMnist(unittest.TestCase):
                 retry_times -= 1
 
     def stop_pserver(self, pid):
-        os.kill(pid, signal.SIGTERM)
+        os.kill(pid, signal.SIGKILL)
 
     def test_with_place(self):
         p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(

From 7d9c9a013be761f7d9827823fda106670fe1e899 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Wed, 27 Jun 2018 16:33:28 +0800
Subject: [PATCH 7/7] update by comment

---
 python/paddle/fluid/tests/unittests/test_dist_mnist.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist.py b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
index 450ec414d1..ad2d57f7c5 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
@@ -15,6 +15,7 @@
 import numpy as np
 import argparse
 import time
+import math
 
 import paddle
 import paddle.fluid as fluid
@@ -145,7 +146,7 @@ class TestDistMnist(unittest.TestCase):
                 retry_times -= 1
 
     def stop_pserver(self, pid):
-        os.kill(pid, signal.SIGKILL)
+        os.kill(pid, signal.SIGTERM)
 
     def test_with_place(self):
         p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
@@ -194,7 +195,7 @@ class TestDistMnist(unittest.TestCase):
                     acc_val = np.array(acc_set).mean()
                     avg_loss_val = np.array(avg_loss_set).mean()
                     if float(acc_val
-                             ) > 0.2:  # Smaller value to increase CI speed
+                             ) > 0.8:  # Smaller value to increase CI speed
                         return
                     else:
                         print(