From d9bcdac3dc614194d2a3de441c6d6179488eed7c Mon Sep 17 00:00:00 2001
From: ZPaC
Date: Mon, 25 May 2020 14:46:08 +0800
Subject: [PATCH] Fix result error when calling AllReduce serially.

---
 .../ccsrc/device/gpu/gpu_stream_assign.cc | 30 ++++++------
 .../ccsrc/device/gpu/gpu_stream_assign.h  |  2 +-
 tests/st/nccl/test_nccl_all_reduce_op.py  | 46 +++++++++++++++++++
 3 files changed, 64 insertions(+), 14 deletions(-)

diff --git a/mindspore/ccsrc/device/gpu/gpu_stream_assign.cc b/mindspore/ccsrc/device/gpu/gpu_stream_assign.cc
index 2550b543ec..3594081cc7 100644
--- a/mindspore/ccsrc/device/gpu/gpu_stream_assign.cc
+++ b/mindspore/ccsrc/device/gpu/gpu_stream_assign.cc
@@ -40,21 +40,24 @@ void AssignGpuStream(const std::shared_ptr<session::KernelGraph> &kernel_graph)
     }
   }
   if (allreduce_kernels.size() > 1) {
-    DeviceStream comm_stream = nullptr;
-    GPUDeviceManager::GetInstance().CreateStream(&comm_stream);
-    std::transform(
-      allreduce_kernels.begin(), allreduce_kernels.end(), allreduce_kernels.begin(), [&](CNodePtr allreduce_kernel) {
-        AnfAlgo::SetNodeAttr("stream_id", MakeValue(reinterpret_cast<uintptr_t>(comm_stream)), allreduce_kernel);
-        return allreduce_kernel;
-      });
-
+    // Assign multiple streams only when there's Recv node for AllReduce.
     std::vector<SendRecvPair> send_recv_pairs;
-    FindAllReduceStreamSwitchPos(kernel_graph, &send_recv_pairs);
-    InsertStreamSwitchNode(kernel_graph, send_recv_pairs);
+    if (FindAllReduceStreamSwitchPos(kernel_graph, &send_recv_pairs)) {
+      DeviceStream comm_stream = nullptr;
+      GPUDeviceManager::GetInstance().CreateStream(&comm_stream);
+      std::transform(
+        allreduce_kernels.begin(), allreduce_kernels.end(), allreduce_kernels.begin(), [&](CNodePtr allreduce_kernel) {
+          AnfAlgo::SetNodeAttr("stream_id", MakeValue(reinterpret_cast<uintptr_t>(comm_stream)), allreduce_kernel);
+          return allreduce_kernel;
+        });
+      InsertStreamSwitchNode(kernel_graph, send_recv_pairs);
+    } else {
+      return;
+    }
   }
 }
 
-void FindAllReduceStreamSwitchPos(const std::shared_ptr<session::KernelGraph> &kernel_graph,
+bool FindAllReduceStreamSwitchPos(const std::shared_ptr<session::KernelGraph> &kernel_graph,
                                   std::vector<SendRecvPair> *send_recv_pairs) {
   auto execution_kernels = kernel_graph->execution_order();
   std::vector<CNodePtr>::iterator iter, iter_begin;
@@ -77,14 +80,15 @@
       std::vector<CNodePtr>::iterator mock_recv_node_iter =
         FindRecvNodePos(iter, iter_end, *iter, kAllReduceStreamSwitch);
       if (mock_recv_node_iter == iter_end) {
-        MS_LOG(WARNING) << "Can't find send node place before AllReduce node.";
-        continue;
+        MS_LOG(WARNING) << "Can't find recv node place after AllReduce node.";
+        return false;
       }
       SendRecvPair pair2 = {kAllReduceStreamSwitch, *iter, *mock_recv_node_iter, IntToSize(iter - iter_begin + 1),
                            IntToSize(mock_recv_node_iter - iter_begin)};
       send_recv_pairs->push_back(pair2);
     }
   }
+  return true;
 }
 
 std::vector<CNodePtr>::iterator FindSendNodePos(std::vector<CNodePtr>::iterator begin,
diff --git a/mindspore/ccsrc/device/gpu/gpu_stream_assign.h b/mindspore/ccsrc/device/gpu/gpu_stream_assign.h
index e3d98d68da..f8041878b2 100644
--- a/mindspore/ccsrc/device/gpu/gpu_stream_assign.h
+++ b/mindspore/ccsrc/device/gpu/gpu_stream_assign.h
@@ -48,7 +48,7 @@ struct StreamSwitchNode {
   }
 };
 void AssignGpuStream(const std::shared_ptr<session::KernelGraph> &kernel_graph);
-void FindAllReduceStreamSwitchPos(const std::shared_ptr<session::KernelGraph> &kernel_graph,
+bool FindAllReduceStreamSwitchPos(const std::shared_ptr<session::KernelGraph> &kernel_graph,
                                   std::vector<SendRecvPair> *send_recv_pairs);
 // Find Send node position according to "mock" recv node.
 // "mock" recv node is a gpu kernel node after a real Recv node, e.g. AllReduce node.
diff --git a/tests/st/nccl/test_nccl_all_reduce_op.py b/tests/st/nccl/test_nccl_all_reduce_op.py
index 6dee522e3f..13df46254c 100644
--- a/tests/st/nccl/test_nccl_all_reduce_op.py
+++ b/tests/st/nccl/test_nccl_all_reduce_op.py
@@ -75,3 +75,49 @@ def test_AllReduce():
     error2 = np.ones(shape=expect2.shape) * 1.0e-5
     assert np.all(diff2 < error2)
     assert output[2].shape() == expect2.shape
+
+
+class Net2(nn.Cell):
+    def __init__(self):
+        super(Net2, self).__init__()
+        self.x1 = Parameter(initializer(Tensor(x), x.shape), name='x1')
+
+        self.op0 = "sum"
+        self.op1 = "sum"
+        self.op2 = "sum"
+
+        self.all_reduce1 = P.AllReduce(self.op0, group=NCCL_WORLD_COMM_GROUP)
+        self.all_reduce2 = P.AllReduce(self.op1, group=NCCL_WORLD_COMM_GROUP)
+        self.all_reduce3 = P.AllReduce(self.op2, group=NCCL_WORLD_COMM_GROUP)
+
+    def construct(self):
+        x = self.all_reduce1(self.x1)
+        y = self.all_reduce2(x)
+        z = self.all_reduce3(y)
+        return (x, y, z)
+
+
+def test_AllReduce2():
+    all_reduce = Net2()
+    output = all_reduce()
+
+    expect0 = np.ones([3, 1, 3, 3]).astype(np.float32) * 0
+    for i in range(size):
+        part = np.ones([3, 1, 3, 3]).astype(np.float32) * 0.01 * (i + 1)
+        expect0 += part
+    diff0 = abs(output[0].asnumpy() - expect0)
+    error0 = np.ones(shape=expect0.shape) * 1.0e-5
+    assert np.all(diff0 < error0)
+    assert output[0].shape() == expect0.shape
+
+    expect1 = expect0 * size
+    diff1 = abs(output[1].asnumpy() - expect1)
+    error1 = np.ones(shape=expect1.shape) * 1.0e-5
+    assert np.all(diff1 < error1)
+    assert output[1].shape() == expect1.shape
+
+    expect2 = expect1 * size
+    diff2 = abs(output[2].asnumpy() - expect2)
+    error2 = np.ones(shape=expect2.shape) * 1.0e-5
+    assert np.all(diff2 < error2)
+    assert output[2].shape() == expect2.shape
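
For reference, the new test_AllReduce2 case only exercises the serial-AllReduce path when it runs across multiple GPU ranks. A minimal launcher sketch is given below; it is not part of this patch, and the mpirun/pytest invocation and the 8-process count are assumptions modeled on how the NCCL st tests are commonly launched, so the exact command may differ in a given environment.

    # Hypothetical launcher (not from this patch): run the serial AllReduce test
    # across 8 GPU ranks, assuming an OpenMPI + pytest setup is available.
    import subprocess

    subprocess.run(
        ["mpirun", "-n", "8", "pytest", "-s",
         "tests/st/nccl/test_nccl_all_reduce_op.py::test_AllReduce2"],
        check=True,  # fail loudly if any rank's assertions fail
    )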