From 33965527fde82a0fb0c70b50a8af0d03be1098bc Mon Sep 17 00:00:00 2001
From: chengduo <zhaochengduo@baidu.com>
Date: Thu, 21 Mar 2019 22:34:22 -0500
Subject: [PATCH] Add unit test for fuse all reduce (#16354)

* refine fused_all_reduce_op

* add unit test in test_parallel_executor_seresnext
test=develop
---
 .../details/fused_all_reduce_op_handle.cc     | 23 ++++-----
 .../test_parallel_executor_seresnext.py       | 48 ++++++++++++++++++-
 2 files changed, 58 insertions(+), 13 deletions(-)

diff --git a/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc b/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc
index b90655184b..644cd4e150 100644
--- a/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc
+++ b/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc
@@ -112,19 +112,20 @@ void FusedAllReduceOpHandle::RunImpl() {
         });
 
     for (size_t k = 1; k < g_tensor.size(); ++k) {
-      const void *pre_address = g_tensor.at(k - 1).second->data<void>();
+      const void *cur_address = g_tensor.at(k - 1).second->data<void>();
       int64_t len = g_tensor.at(k - 1).second->numel();
       auto offset = len * framework::SizeOfType(dtype);
-      void *next_address = reinterpret_cast<void *>(
-          reinterpret_cast<uintptr_t>(pre_address) + offset);
-      const void *cur_address = g_tensor.at(k).second->data<void>();
-      VLOG(10) << k << ", "
-               << " pre_address(" << g_tensor.at(k - 1).first
-               << "): " << pre_address << ", cur_address("
-               << g_tensor.at(k).first << "): " << cur_address
-               << ", offset:" << offset << ", " << next_address << ", "
-               << cur_address;
-      PADDLE_ENFORCE_EQ(next_address, cur_address);
+      void *infer_next_address = reinterpret_cast<void *>(
+          reinterpret_cast<uintptr_t>(cur_address) + offset);
+      const void *next_address = g_tensor.at(k).second->data<void>();
+
+      VLOG(10) << string::Sprintf(
+          "Input[%d](%s) address: 0X%02x, Input[%d](%s) address: 0X%02x, Infer "
+          "input[%d] address: 0X%02x. The offset: %d",
+          k - 1, g_tensor.at(k - 1).first, cur_address, k,
+          g_tensor.at(k).first, next_address, k, infer_next_address, offset);
+      PADDLE_ENFORCE_EQ(infer_next_address, next_address,
+                        "The address is not consistent.");
     }
   }
 
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
index 9548598d75..1f23fae92c 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
@@ -13,6 +13,9 @@
 # limitations under the License.
 
 from __future__ import print_function
+import os
+os.environ['FLAGS_fuse_parameter_memory_size'] = "131072"
+os.environ['FLAGS_fuse_parameter_groups_size'] = "3"
 
 import paddle.fluid as fluid
 import paddle.fluid.layers.ops as ops
@@ -22,7 +25,6 @@ import paddle.fluid.core as core
 from parallel_executor_test_base import TestParallelExecutorBase
 import unittest
 import math
-import os
 import numpy as np
 
 # FIXME(zcd): If the neural net has dropout_op, the output of ParallelExecutor
@@ -312,17 +314,59 @@ class TestResnet(TestParallelExecutorBase):
         self.assertAlmostEquals(
             np.mean(parallel_last_loss), single_last_loss[0], delta=delta2)
 
+    def _compare_with_fused_all_reduce(self,
+                                       model,
+                                       use_cuda,
+                                       iter=20,
+                                       delta2=1e-5):
+        if use_cuda and not core.is_compiled_with_cuda():
+            return
+
+        global remove_bn
+        remove_bn = True
+
+        img, label = self._init_data(batch_size=batch_size)
+        all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence(
+            model,
+            feed_dict={"image": img,
+                       "label": label},
+            iter=iter,
+            batch_size=batch_size,
+            use_cuda=use_cuda,
+            fuse_all_reduce_ops=False,
+            optimizer=optimizer)
+        reduce_first_loss, reduce_last_loss = self.check_network_convergence(
+            model,
+            feed_dict={"image": img,
+                       "label": label},
+            iter=iter,
+            batch_size=batch_size,
+            use_cuda=use_cuda,
+            fuse_all_reduce_ops=True,
+            optimizer=optimizer)
+
+        for loss in zip(all_reduce_first_loss, reduce_first_loss):
+            self.assertAlmostEquals(loss[0], loss[1], delta=1e-5)
+        for loss in zip(all_reduce_last_loss, reduce_last_loss):
+            self.assertAlmostEquals(loss[0], loss[1], delta=delta2)
+
     def test_seresnext_with_learning_rate_decay(self):
         self._check_resnet_convergence(model=SE_ResNeXt50Small, use_cuda=True)
         self._check_resnet_convergence(
             model=SE_ResNeXt50Small, use_cuda=False, iter=2, delta2=1e-3)
 
-    def test_seresnext_with_new_strategy(self):
+    def test_seresnext_with_reduce(self):
         self._compare_reduce_and_allreduce(
             model=SE_ResNeXt50Small, use_cuda=True, delta2=1e-2)
         self._compare_reduce_and_allreduce(
             model=SE_ResNeXt50Small, use_cuda=False, iter=5)
 
+    def test_seresnext_with_fused_all_reduce(self):
+        self._compare_with_fused_all_reduce(
+            model=SE_ResNeXt50Small, use_cuda=True, delta2=1e-3)
+        self._compare_with_fused_all_reduce(
+            model=SE_ResNeXt50Small, use_cuda=False, iter=2, delta2=1e-3)
+
 
 if __name__ == '__main__':
     unittest.main()