From a57e8a43380fe6e302853b5a2d78422837f3dfb1 Mon Sep 17 00:00:00 2001
From: chengduoZH <zhaochengduo@163.com>
Date: Mon, 11 Jun 2018 11:07:21 +0800
Subject: [PATCH] Add CPU tests for ParallelExecutor
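
Enable the CPU execution path in the ParallelExecutor tests:

- all_reduce_op_handle.cc: attach a message to the PADDLE_ENFORCE on
  nccl_ctxs_ so a missing NCCL context fails with a readable error.
- parallel_executor.py: use 4 threads per place on GPU; on CPU, pin
  num_threads to 1 for now.
- parallel_executor_test_base.py: when use_cuda is False, scale
  batch_size by CPU_NUM (defaulting to multiprocessing.cpu_count())
  instead of the CUDA device count.
- test_parallel_executor_mnist.py, test_parallel_executor_seresnext.py:
  pass use_cuda through the convergence checks and run the SE-ResNeXt
  CPU tests with a reduced iteration count (iter=5).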

---
 .../fluid/framework/details/all_reduce_op_handle.cc   |  2 +-
 python/paddle/fluid/parallel_executor.py              |  7 +++----
 .../tests/unittests/parallel_executor_test_base.py    |  6 +++++-
 .../tests/unittests/test_parallel_executor_mnist.py   |  8 +++++---
 .../unittests/test_parallel_executor_seresnext.py     | 11 ++++++-----
 5 files changed, 20 insertions(+), 14 deletions(-)
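
To exercise the new CPU coverage locally (example invocation; the test
base reads CPU_NUM, defaulting to multiprocessing.cpu_count(), to scale
batch_size):

    cd python/paddle/fluid/tests/unittests
    CPU_NUM=4 python test_parallel_executor_mnist.py
    CPU_NUM=4 python test_parallel_executor_seresnext.py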

diff --git a/paddle/fluid/framework/details/all_reduce_op_handle.cc b/paddle/fluid/framework/details/all_reduce_op_handle.cc
index 74801e2a0d..f4c21c7958 100644
--- a/paddle/fluid/framework/details/all_reduce_op_handle.cc
+++ b/paddle/fluid/framework/details/all_reduce_op_handle.cc
@@ -67,7 +67,7 @@ void AllReduceOpHandle::RunImpl() {
 
     if (platform::is_gpu_place(lod_tensors[0]->place())) {
 #ifdef PADDLE_WITH_CUDA
-      PADDLE_ENFORCE(nccl_ctxs_);
+      PADDLE_ENFORCE(nccl_ctxs_, "nccl_ctxs should not be nullptr.");
       int dtype = -1;
       size_t numel = 0;
       std::vector<std::function<void()>> all_reduce_calls;
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index d5445f0e17..11e4353d44 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -119,11 +119,10 @@ class ParallelExecutor(object):
             if use_cuda:
                 # Experiments on se-resnext show that too many threads hurt
                 # performance. Worth tuning for other models in the future.
-                exec_strategy.num_threads = len(self._places) * 2
+                exec_strategy.num_threads = len(self._places) * 4
             else:
-                cpu_num = int(
-                    os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
-                exec_strategy.num_threads = min(len(self._places) * 2, cpu_num)
+                # Currently num_threads must be 1.
+                exec_strategy.num_threads = 1
 
         if build_strategy is None:
             build_strategy = BuildStrategy()
diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
index 566b676777..829c5a1a5f 100644
--- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
+++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import multiprocessing
+import os
 import unittest
 import paddle.fluid as fluid
 import time
@@ -73,7 +75,9 @@ class TestParallelExecutorBase(unittest.TestCase):
                 exe = fluid.Executor(place=place)
 
             if batch_size is not None:
-                batch_size *= fluid.core.get_cuda_device_count()
+                batch_size *= (fluid.core.get_cuda_device_count() if use_cuda
+                               else int(os.environ.get(
+                                   'CPU_NUM', multiprocessing.cpu_count())))
             begin = time.time()
             first_loss, = run_executor(
                 exe=exe, feed=feed_dict, fetch_list=[loss.name])
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
index 9b11a69912..a801d99aa1 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
@@ -104,8 +104,9 @@ class TestMNIST(TestParallelExecutorBase):
     def check_simple_fc_convergence(self,
                                     balance_parameter_opt_between_cards,
                                     use_cuda=True):
-        self.check_network_convergence(simple_fc_net)
-        self.check_network_convergence(simple_fc_net, allow_op_delay=True)
+        self.check_network_convergence(simple_fc_net, use_cuda=use_cuda)
+        self.check_network_convergence(
+            simple_fc_net, use_cuda=use_cuda, allow_op_delay=True)
 
         img = np.zeros(shape=[32, 784], dtype='float32')
         label = np.ones(shape=[32, 1], dtype='int64')
@@ -142,6 +143,7 @@ class TestMNIST(TestParallelExecutorBase):
             seed=1000,
             feed_dict={"image": img,
                        "label": label},
+            use_cuda=use_cuda,
             use_parallel_executor=True,
             balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
         )
@@ -161,7 +163,7 @@ class TestMNIST(TestParallelExecutorBase):
 
     def check_batchnorm_fc_convergence(
             self, balance_parameter_opt_between_cards, use_cuda):
-        self.check_network_convergence(fc_with_batchnorm)
+        self.check_network_convergence(fc_with_batchnorm, use_cuda=use_cuda)
         img = np.zeros(shape=[32, 784], dtype='float32')
         label = np.ones(shape=[32, 1], dtype='int64')
         self.check_network_convergence(
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
index e3e497c899..d178b77bed 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
@@ -133,27 +133,28 @@ def SE_ResNeXt50Small(batch_size=2, use_feed=False):
 class TestResnet(TestParallelExecutorBase):
     def check_resnet_convergence(self,
                                  balance_parameter_opt_between_cards,
-                                 use_cuda=True):
+                                 use_cuda=True,
+                                 iter=20):
         import functools
         batch_size = 2
         self.check_network_convergence(
             functools.partial(
                 SE_ResNeXt50Small, batch_size=batch_size),
-            iter=20,
+            iter=iter,
             batch_size=batch_size,
             use_cuda=use_cuda,
             balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
         )
 
     def test_resnet(self):
-        # os.environ['CPU_NUM'] = str(4)
+        os.environ['CPU_NUM'] = str(4)
         self.check_resnet_convergence(False, use_cuda=True)
-        # self.check_resnet_convergence(False,use_cuda=False)
+        self.check_resnet_convergence(False, use_cuda=False, iter=5)
 
     def test_resnet_with_new_strategy(self):
         os.environ['CPU_NUM'] = str(4)
         self.check_resnet_convergence(True, use_cuda=True)
-        self.check_resnet_convergence(True, use_cuda=False)
+        self.check_resnet_convergence(True, use_cuda=False, iter=5)
 
 
 if __name__ == '__main__':