From af8108c9e179c859b36ed551734df1d8fce2df5f Mon Sep 17 00:00:00 2001 From: chenjianping Date: Thu, 4 Jun 2020 17:20:47 +0800 Subject: [PATCH] support host reduce scatter and allgather --- build.sh | 3 +- cmake/mind_expression.cmake | 4 +- cmake/package.cmake | 15 +- mindspore/ccsrc/CMakeLists.txt | 10 +- mindspore/ccsrc/device/CMakeLists.txt | 14 +- .../device/ascend/ascend_kernel_runtime.cc | 14 +- mindspore/ccsrc/device/cpu/mpi/mpi_adapter.cc | 191 ++++++++++++++++++ mindspore/ccsrc/device/cpu/mpi/mpi_adapter.h | 55 +++++ .../gpu/distribution/collective_fake_init.h | 1 - .../ccsrc/device/gpu/mpi/mpi_initializer.cc | 18 ++ .../ccsrc/device/gpu/mpi/mpi_initializer.h | 2 +- mindspore/ccsrc/kernel/CMakeLists.txt | 5 + .../ccsrc/kernel/cpu/allgather_cpu_kernel.cc | 62 ++++++ .../ccsrc/kernel/cpu/allgather_cpu_kernel.h | 45 +++++ .../kernel/cpu/reduce_scatter_cpu_kernel.cc | 62 ++++++ .../kernel/cpu/reduce_scatter_cpu_kernel.h | 46 +++++ tests/st/ops/cpu/test_reduce_scatter.py | 70 +++++++ 17 files changed, 597 insertions(+), 20 deletions(-) create mode 100644 mindspore/ccsrc/device/cpu/mpi/mpi_adapter.cc create mode 100644 mindspore/ccsrc/device/cpu/mpi/mpi_adapter.h create mode 100644 mindspore/ccsrc/kernel/cpu/allgather_cpu_kernel.cc create mode 100644 mindspore/ccsrc/kernel/cpu/allgather_cpu_kernel.h create mode 100644 mindspore/ccsrc/kernel/cpu/reduce_scatter_cpu_kernel.cc create mode 100644 mindspore/ccsrc/kernel/cpu/reduce_scatter_cpu_kernel.h create mode 100644 tests/st/ops/cpu/test_reduce_scatter.py diff --git a/build.sh b/build.sh index dd909e9f51..0c4d1ff90c 100755 --- a/build.sh +++ b/build.sh @@ -86,7 +86,7 @@ checkopts() ENABLE_DUMPE2E="off" ENABLE_DUMP_IR="on" COMPILE_MINDDATA="on" - ENABLE_MPI="on" + ENABLE_MPI="off" CUDA_VERSION="9.2" COMPILE_PREDICT="off" USE_GLOG="on" @@ -168,6 +168,7 @@ checkopts() if [[ "X$OPTARG" == "Xgpu" ]]; then ENABLE_GPU="on" ENABLE_CPU="on" + ENABLE_MPI="on" elif [[ "X$OPTARG" == "Xd" || "X$OPTARG" == "Xascend" ]]; then 
ENABLE_D="on" ENABLE_CPU="on" diff --git a/cmake/mind_expression.cmake b/cmake/mind_expression.cmake index f20683a2d8..242e9c2136 100644 --- a/cmake/mind_expression.cmake +++ b/cmake/mind_expression.cmake @@ -26,6 +26,9 @@ include_directories(${Python3_INCLUDE_DIRS}) include_directories(${CMAKE_SOURCE_DIR}/third_party) if (ENABLE_CPU) include(${CMAKE_SOURCE_DIR}/cmake/external_libs/mkl_dnn.cmake) + if (ENABLE_MPI) + include(${CMAKE_SOURCE_DIR}/cmake/external_libs/ompi.cmake) + endif() endif() if (ENABLE_GPU) @@ -36,7 +39,6 @@ if (ENABLE_GPU) if (ENABLE_MPI) include(${CMAKE_SOURCE_DIR}/cmake/external_libs/nccl.cmake) - include(${CMAKE_SOURCE_DIR}/cmake/external_libs/ompi.cmake) endif() endif() diff --git a/cmake/package.cmake b/cmake/package.cmake index 875ba5217d..653f26ffa5 100644 --- a/cmake/package.cmake +++ b/cmake/package.cmake @@ -109,19 +109,20 @@ if (ENABLE_CPU) ) endif () +if (ENABLE_MPI) + install( + TARGETS _ms_mpi + DESTINATION ${INSTALL_BASE_DIR} + COMPONENT mindspore + ) +endif () + if (ENABLE_GPU) - if (ENABLE_MPI) - install( - TARGETS _ms_mpi - DESTINATION ${INSTALL_BASE_DIR} - COMPONENT mindspore - ) install( TARGETS gpu_collective DESTINATION ${INSTALL_LIB_DIR} COMPONENT mindspore ) - endif () install( TARGETS gpu_queue DESTINATION ${INSTALL_LIB_DIR} diff --git a/mindspore/ccsrc/CMakeLists.txt b/mindspore/ccsrc/CMakeLists.txt index 1e1c650239..2085cad397 100644 --- a/mindspore/ccsrc/CMakeLists.txt +++ b/mindspore/ccsrc/CMakeLists.txt @@ -8,6 +8,10 @@ if (CMAKE_SYSTEM_NAME MATCHES "Windows") add_compile_definitions(BUILDING_DLL) endif() +if (ENABLE_MPI) + add_compile_definitions(ENABLE_MPI) +endif () + if(ENABLE_GPU) find_package(CUDA REQUIRED) find_package(Threads) @@ -120,7 +124,11 @@ endforeach () set_property(SOURCE ${SUB_OBJECTS_SRC} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_ME) add_library(mindspore STATIC ${SUB_OBJECTS_SRC}) target_link_libraries(mindspore proto_input) -target_link_libraries(mindspore securec 
mindspore::flatbuffers) +if (ENABLE_CPU AND ENABLE_MPI) + target_link_libraries(mindspore securec mindspore::flatbuffers mindspore::ompi) +else () + target_link_libraries(mindspore securec mindspore::flatbuffers) +endif () if (NOT WIN32) target_link_libraries(mindspore dl) endif() diff --git a/mindspore/ccsrc/device/CMakeLists.txt b/mindspore/ccsrc/device/CMakeLists.txt index 2ade0f0ef3..9f64d0e02b 100644 --- a/mindspore/ccsrc/device/CMakeLists.txt +++ b/mindspore/ccsrc/device/CMakeLists.txt @@ -14,6 +14,15 @@ endif () if (ENABLE_CPU) file(GLOB_RECURSE CPU_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "cpu/*.cc") + if (ENABLE_MPI) + # _ms_mpi + set_property(SOURCE "gpu/mpi/mpi_initializer.cc" + PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_DEVICE) + pybind11_add_module(_ms_mpi "gpu/mpi/mpi_initializer.cc") + target_link_libraries(_ms_mpi PRIVATE mindspore::pybind11_module mindspore::ompi) + else () + list(REMOVE_ITEM CPU_SRC_LIST "cpu/mpi/mpi_adapter.cc") + endif () endif () # gpu @@ -39,11 +48,6 @@ if (ENABLE_GPU) set_property(SOURCE ${GPU_COLLECTIVE_SRCS} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_DEVICE) add_library(gpu_collective SHARED ${GPU_COLLECTIVE_SRCS}) - # _ms_mpi - set_property(SOURCE "gpu/mpi/mpi_initializer.cc" - PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_DEVICE) - pybind11_add_module(_ms_mpi "gpu/mpi/mpi_initializer.cc") - target_link_libraries(_ms_mpi PRIVATE mindspore::pybind11_module mindspore::ompi) target_link_libraries(gpu_collective PRIVATE mindspore::ompi mindspore::nccl) endif () diff --git a/mindspore/ccsrc/device/ascend/ascend_kernel_runtime.cc b/mindspore/ccsrc/device/ascend/ascend_kernel_runtime.cc index b0442884d4..9ceb8676b2 100644 --- a/mindspore/ccsrc/device/ascend/ascend_kernel_runtime.cc +++ b/mindspore/ccsrc/device/ascend/ascend_kernel_runtime.cc @@ -15,7 +15,6 @@ */ #include "device/ascend/ascend_kernel_runtime.h" - #include #include #include @@ -24,6 
+23,7 @@ #include #include "device/ascend/ascend_device_address.h" +#include "device/cpu/mpi/mpi_adapter.h" #include "utils/context/ms_context.h" #include "device/ascend/profiling/profiling_manager.h" #include "hccl/hcom.h" @@ -510,11 +510,19 @@ bool AscendKernelRuntime::HcclInit() { MS_LOG(ERROR) << "file path " << config_path_str << " does not exist"; return false; } - +#ifdef ENABLE_MPI + int rank_id = device::cpu::MPIAdapter::Instance().GetRankId(); + const char *offset = std::getenv("RANK_OFFSET"); + if (offset != nullptr) { + int rank_offset = std::stoi(offset); + rank_id += rank_offset; + } + const char *identify = reinterpret_cast(std::to_string(rank_id).c_str()); +#else const char *identify = std::getenv("RANK_ID"); +#endif if (identify == nullptr) { MS_LOG(ERROR) << "get hccl rankid failed, please set env RANK_ID"; - free(full_path); return false; } MS_LOG(INFO) << "MINDSPORE_HCCL_CONFIG_PATH : " << full_path << ", RANK_ID: " << identify; diff --git a/mindspore/ccsrc/device/cpu/mpi/mpi_adapter.cc b/mindspore/ccsrc/device/cpu/mpi/mpi_adapter.cc new file mode 100644 index 0000000000..283885e9ba --- /dev/null +++ b/mindspore/ccsrc/device/cpu/mpi/mpi_adapter.cc @@ -0,0 +1,191 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "device/cpu/mpi/mpi_adapter.h" +#include +#include "utils/log_adapter.h" + +namespace mindspore { +namespace device { +namespace cpu { +namespace { +MPI_Op GetMpiOp(const std::string &op_type) { + if (op_type == "sum") { + return MPI_SUM; + } else if (op_type == "max") { + return MPI_MAX; + } else if (op_type == "min") { + return MPI_MIN; + } else if (op_type == "prod") { + return MPI_PROD; + } + MS_LOG(EXCEPTION) << "unsupport op_type:" << op_type; + return MPI_SUM; +} +} // namespace + +MPIAdapter::MPIAdapter() : rank_id_(0), rank_size_(0), comm_group_world_(MPI_GROUP_NULL) { Init(); } + +MPIAdapter::~MPIAdapter() { + for (auto iter = ranks_group_.begin(); iter != ranks_group_.end(); ++iter) { + MPI_Group_free(&iter->second); + } + if (comm_group_world_ != MPI_GROUP_NULL) { + MPI_Group_free(&comm_group_world_); + } + int finalized; + MPI_Finalized(&finalized); + if (finalized == 0) { + MPI_Finalize(); + } +} + +MPIAdapter &MPIAdapter::Instance() { + static MPIAdapter instance; + return instance; +} + +int MPIAdapter::GetRankId() const { return rank_id_; } + +void MPIAdapter::Init() { + static bool init = false; + if (init) { + return; + } + int init_flag = 0; + if (MPI_Initialized(&init_flag) != MPI_SUCCESS) { + MS_LOG(EXCEPTION) << "Check mpi initialized fail!"; + } + if (init_flag == 0) { + auto ret = MPI_Init(nullptr, nullptr); + if (ret != MPI_SUCCESS) { + MS_LOG(EXCEPTION) << "Failed to init mpi!"; + } + } + + MPI_Comm_group(MPI_COMM_WORLD, &comm_group_world_); + if (comm_group_world_ == MPI_GROUP_NULL) { + MS_LOG(EXCEPTION) << "comm_group_world_ init fail!"; + } + auto ret = MPI_Comm_rank(MPI_COMM_WORLD, &rank_id_); + if (ret != MPI_SUCCESS) { + MS_LOG(EXCEPTION) << "Failed to init mpi rank id!"; + } + + ret = MPI_Comm_size(MPI_COMM_WORLD, &rank_size_); + if (ret != MPI_SUCCESS) { + MS_LOG(EXCEPTION) << "Failed to init mpi rank size!rankid:" << rank_id_; + } + init = true; +} + +MPI_Group MPIAdapter::AddGroup(const std::vector &ranks) { + 
if (ranks.size() > static_cast(rank_size_) || ranks.empty()) { + MS_LOG(EXCEPTION) << "input rank size: " << ranks.size() << ", max rank size: " << rank_size_; + } + + if (std::find(ranks.begin(), ranks.end(), rank_id_) == ranks.end()) { + MS_LOG(ERROR) << "rankid:" << rank_id_ << " is not in the input group."; + return MPI_GROUP_NULL; + } + std::lock_guard lock(group_mutex_); + auto iter = ranks_group_.find(ranks); + if (iter != ranks_group_.end()) { + return iter->second; + } + const auto ranks_size = ranks.size(); + std::vector ranks_input(ranks_size, 0); + for (size_t i = 0; i < ranks_size; ++i) { + ranks_input[i] = ranks[i]; + } + + MPI_Group group = MPI_GROUP_NULL; + MPI_Group_incl(comm_group_world_, ranks.size(), ranks_input.data(), &group); + if (group == MPI_GROUP_NULL) { + MS_LOG(EXCEPTION) << "create mpi group fail!rankid:" << rank_id_; + } + + ranks_group_[ranks] = group; + MS_LOG(INFO) << "rank:" << rank_id_ << " add group:" << group; + return group; +} + +bool MPIAdapter::ReduceScatter(float *input, float *output, const std::vector &ranks_group, size_t data_num, + const std::string &op_type) { + if (ranks_group.empty()) { + MS_LOG(ERROR) << "input rank group is empty!"; + return false; + } + + auto group = AddGroup(ranks_group); + if (group == MPI_GROUP_NULL) { + MS_LOG(EXCEPTION) << "Get mpi group fail!rankid:" << rank_id_; + } + MPI_Comm comm; + MPI_Comm_create_group(MPI_COMM_WORLD, group, 0, &comm); + if (comm == MPI_COMM_NULL) { + MS_LOG(EXCEPTION) << "create mpi comm fail!rankid:" << rank_id_; + } + std::vector receive_count(ranks_group.size(), 0); + for (size_t i = 0; i < ranks_group.size(); ++i) { + receive_count[i] = data_num; + } + + auto op = GetMpiOp(op_type); + auto ret = MPI_Reduce_scatter(input, output, receive_count.data(), MPI_FLOAT, op, comm); + bool result = true; + if (ret != MPI_SUCCESS) { + MS_LOG(ERROR) << "mpi reduce_scatter fail!ret = " << ret << ", rankid:" << rank_id_; + result = false; + } + + ret = MPI_Comm_free(&comm); + 
if (ret != MPI_SUCCESS) { + MS_LOG(WARNING) << "mpi comm free fail! ret = " << ret << ", rankid:" << rank_id_; + } + return result; +} + +bool MPIAdapter::AllGather(float *input, float *output, const std::vector &ranks_group, size_t data_num) { + if (ranks_group.empty()) { + MS_LOG(ERROR) << "input rank group is empty!"; + return false; + } + auto group = AddGroup(ranks_group); + if (group == MPI_GROUP_NULL) { + MS_LOG(EXCEPTION) << "Get mpi group fail! rankid:" << rank_id_; + } + MPI_Comm comm; + MPI_Comm_create_group(MPI_COMM_WORLD, group, 0, &comm); + if (comm == MPI_COMM_NULL) { + MS_LOG(EXCEPTION) << "create mpi comm fail! rankid:" << rank_id_; + } + + auto ret = MPI_Allgather(input, data_num, MPI_FLOAT, output, data_num, MPI_FLOAT, comm); + bool result = true; + if (ret != MPI_SUCCESS) { + MS_LOG(ERROR) << "mpi allgater fail!ret = " << ret << ", rankid:" << rank_id_; + result = false; + } + ret = MPI_Comm_free(&comm); + if (ret != MPI_SUCCESS) { + MS_LOG(WARNING) << "mpi comm free fail!ret = " << ret << ",rankid:" << rank_id_; + } + return result; +} +} // namespace cpu +} // namespace device +} // namespace mindspore diff --git a/mindspore/ccsrc/device/cpu/mpi/mpi_adapter.h b/mindspore/ccsrc/device/cpu/mpi/mpi_adapter.h new file mode 100644 index 0000000000..c2ed3192a9 --- /dev/null +++ b/mindspore/ccsrc/device/cpu/mpi/mpi_adapter.h @@ -0,0 +1,55 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef MINDSPORE_CCSRC_DEVICE_CPU_MPI_MPI_ADAPTER_H_ +#define MINDSPORE_CCSRC_DEVICE_CPU_MPI_MPI_ADAPTER_H_ +#ifdef ENABLE_MPI +#include +#include +#include +#include +#include + +namespace mindspore { +namespace device { +namespace cpu { +constexpr auto kOpTypeSum = "sum"; +class MPIAdapter { + public: + ~MPIAdapter(); + static MPIAdapter &Instance(); + int GetRankId() const; + bool ReduceScatter(float *input, float *output, const std::vector &ranks_group, size_t data_num, + const std::string &op_type = kOpTypeSum); + bool AllGather(float *input, float *output, const std::vector &ranks_group, size_t data_num); + + private: + MPIAdapter(); + void Init(); + MPI_Group AddGroup(const std::vector &ranks); + + int rank_id_; + int rank_size_; + MPI_Group comm_group_world_; + // key:ranks group, value: mpi group + std::map, MPI_Group> ranks_group_; + std::mutex group_mutex_; +}; +} // namespace cpu +} // namespace device +} // namespace mindspore +#endif // ENABLE_MPI +#endif // MINDSPORE_CCSRC_DEVICE_CPU_MPI_MPI_ADAPTER_H_ diff --git a/mindspore/ccsrc/device/gpu/distribution/collective_fake_init.h b/mindspore/ccsrc/device/gpu/distribution/collective_fake_init.h index 65467139c0..c8405f12f6 100644 --- a/mindspore/ccsrc/device/gpu/distribution/collective_fake_init.h +++ b/mindspore/ccsrc/device/gpu/distribution/collective_fake_init.h @@ -20,7 +20,6 @@ namespace mindspore { namespace device { namespace gpu { - class CollectiveFakeInitializer { public: CollectiveFakeInitializer() = default; diff --git a/mindspore/ccsrc/device/gpu/mpi/mpi_initializer.cc b/mindspore/ccsrc/device/gpu/mpi/mpi_initializer.cc index f2dbd4491b..bcad74e5b5 100644 --- a/mindspore/ccsrc/device/gpu/mpi/mpi_initializer.cc +++ b/mindspore/ccsrc/device/gpu/mpi/mpi_initializer.cc @@ -24,10 +24,28 @@ namespace mindspore { namespace device { namespace gpu { MPIInitializer::MPIInitializer() { + int init_flag = 0; + if (MPI_Initialized(&init_flag) != MPI_SUCCESS) { + return; + } + if (init_flag == 0) 
{ + auto ret = MPI_Init(nullptr, nullptr); + if (ret != MPI_SUCCESS) { + return; + } + } MPI_Comm_rank(MPI_COMM_WORLD, &rank_id_); MPI_Comm_size(MPI_COMM_WORLD, &rank_size_); } +MPIInitializer::~MPIInitializer() { + int finalized_flag = 0; + (void)MPI_Finalized(&finalized_flag); + if (finalized_flag == 0) { + (void)MPI_Finalize(); + } +} + MPIInitializer &MPIInitializer::GetInstance() { static MPIInitializer instance; return instance; diff --git a/mindspore/ccsrc/device/gpu/mpi/mpi_initializer.h b/mindspore/ccsrc/device/gpu/mpi/mpi_initializer.h index 00f3b9d713..bd0a4aa948 100644 --- a/mindspore/ccsrc/device/gpu/mpi/mpi_initializer.h +++ b/mindspore/ccsrc/device/gpu/mpi/mpi_initializer.h @@ -30,7 +30,7 @@ class MPIInitializer { private: MPIInitializer(); - ~MPIInitializer() = default; + ~MPIInitializer(); int rank_id_; int rank_size_; diff --git a/mindspore/ccsrc/kernel/CMakeLists.txt b/mindspore/ccsrc/kernel/CMakeLists.txt index 76e1631d57..226692314c 100644 --- a/mindspore/ccsrc/kernel/CMakeLists.txt +++ b/mindspore/ccsrc/kernel/CMakeLists.txt @@ -21,6 +21,11 @@ if (ENABLE_CPU) file(GLOB_RECURSE CPU_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "cpu/*.cc" ) + + if (NOT ENABLE_MPI) + list(REMOVE_ITEM CPU_SRC_LIST "cpu/allgather_cpu_kernel.cc") + list(REMOVE_ITEM CPU_SRC_LIST "cpu/reduce_scatter_cpu_kernel.cc") + endif () endif () if (ENABLE_GPU) diff --git a/mindspore/ccsrc/kernel/cpu/allgather_cpu_kernel.cc b/mindspore/ccsrc/kernel/cpu/allgather_cpu_kernel.cc new file mode 100644 index 0000000000..043fb7a0df --- /dev/null +++ b/mindspore/ccsrc/kernel/cpu/allgather_cpu_kernel.cc @@ -0,0 +1,62 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "kernel/cpu/allgather_cpu_kernel.h" +#include "device/cpu/cpu_device_address.h" +#include "device/cpu/mpi/mpi_adapter.h" +#include "ir/primitive.h" +#include "utils/log_adapter.h" + +namespace mindspore { +namespace kernel { +namespace { +constexpr auto kRanksGroup = "group"; +constexpr auto kAllGatherInputNum = 1; +} // namespace + +AllGatherCPUKernel::AllGatherCPUKernel() : input_data_number_(0) {} + +void AllGatherCPUKernel::InitKernel(const CNodePtr &kernel_node) { + size_t input_num = AnfAlgo::GetInputTensorNum(kernel_node); + if (input_num != kAllGatherInputNum) { + MS_LOG(EXCEPTION) << "allgather input num:" << input_num; + } + for (size_t i = 0; i < input_num; ++i) { + auto shape = AnfAlgo::GetPrevNodeOutputInferShape(kernel_node, i); + size_t count = 1; + for (size_t j = 0; j < shape.size(); j++) { + count *= IntToSize(shape[j]); + } + input_data_number_ += count; + } + + auto ranks_group = AnfAlgo::GetCNodePrimitive(kernel_node)->GetAttr(kRanksGroup); + if (ranks_group != nullptr) { + ranks_group_ = GetValue>(ranks_group); + } else { + MS_LOG(EXCEPTION) << "Miss attribute " << kRanksGroup; + } +} + +bool AllGatherCPUKernel::Launch(const std::vector &inputs, + const std::vector & /*workspace*/, + const std::vector &outputs) { + auto input_addr = reinterpret_cast(inputs[0]->addr); + auto output_addr = reinterpret_cast(outputs[0]->addr); + + return device::cpu::MPIAdapter::Instance().AllGather(input_addr, output_addr, ranks_group_, input_data_number_); +} +} // namespace kernel +} // namespace mindspore diff --git 
a/mindspore/ccsrc/kernel/cpu/allgather_cpu_kernel.h b/mindspore/ccsrc/kernel/cpu/allgather_cpu_kernel.h new file mode 100644 index 0000000000..6c06d3e851 --- /dev/null +++ b/mindspore/ccsrc/kernel/cpu/allgather_cpu_kernel.h @@ -0,0 +1,45 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MINDSPORE_CCSRC_KERNEL_CPU_REDUCE_SCATTER_CPU_KERNEL_H_ +#define MINDSPORE_CCSRC_KERNEL_CPU_REDUCE_SCATTER_CPU_KERNEL_H_ +#include +#include +#include "kernel/cpu/cpu_kernel.h" +#include "kernel/cpu/cpu_kernel_factory.h" + +namespace mindspore { +namespace kernel { +class AllGatherCPUKernel : public CPUKernel { + public: + AllGatherCPUKernel(); + ~AllGatherCPUKernel() override = default; + + void InitKernel(const CNodePtr &kernel_node) override; + + bool Launch(const std::vector &inputs, const std::vector &workspace, + const std::vector &outputs) override; + + private: + size_t input_data_number_; + std::vector ranks_group_; +}; + +MS_REG_CPU_KERNEL(HostAllGather, KernelAttr().AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32), + AllGatherCPUKernel); +} // namespace kernel +} // namespace mindspore + +#endif // MINDSPORE_CCSRC_KERNEL_CPU_REDUCE_SCATTER_CPU_KERNEL_H_ diff --git a/mindspore/ccsrc/kernel/cpu/reduce_scatter_cpu_kernel.cc b/mindspore/ccsrc/kernel/cpu/reduce_scatter_cpu_kernel.cc new file mode 100644 index 0000000000..69cea98a47 --- /dev/null +++ 
b/mindspore/ccsrc/kernel/cpu/reduce_scatter_cpu_kernel.cc @@ -0,0 +1,62 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "kernel/cpu/reduce_scatter_cpu_kernel.h" +#include "device/cpu/cpu_device_address.h" +#include "device/cpu/mpi/mpi_adapter.h" +#include "ir/primitive.h" + +namespace mindspore { +namespace kernel { +namespace { +constexpr auto kRanksGroup = "group"; +} // namespace + +ReduceScatterCPUKernel::ReduceScatterCPUKernel() : output_data_number_(0), op_type_(device::cpu::kOpTypeSum) {} + +void ReduceScatterCPUKernel::InitKernel(const CNodePtr &kernel_node) { + size_t output_num = AnfAlgo::GetOutputTensorNum(kernel_node); + for (size_t i = 0; i < output_num; ++i) { + auto shape = AnfAlgo::GetOutputInferShape(kernel_node, i); + size_t size = 1; + for (size_t j = 0; j < shape.size(); j++) { + size *= IntToSize(shape[j]); + } + output_data_number_ += size; + } + auto op = AnfAlgo::GetCNodePrimitive(kernel_node)->GetAttr("op"); + if (op != nullptr) { + op_type_ = GetValue(op); + } + + auto ranks_group = AnfAlgo::GetCNodePrimitive(kernel_node)->GetAttr(kRanksGroup); + if (ranks_group != nullptr) { + ranks_group_ = GetValue>(ranks_group); + } else { + MS_LOG(EXCEPTION) << "Miss attribute " << kRanksGroup; + } +} + +bool ReduceScatterCPUKernel::Launch(const std::vector &inputs, + const std::vector & /*workspace*/, + const std::vector &outputs) { + auto input_addr = 
reinterpret_cast(inputs[0]->addr); + auto output_addr = reinterpret_cast(outputs[0]->addr); + + return device::cpu::MPIAdapter::Instance().ReduceScatter(input_addr, output_addr, ranks_group_, output_data_number_, + op_type_); +} +} // namespace kernel +} // namespace mindspore diff --git a/mindspore/ccsrc/kernel/cpu/reduce_scatter_cpu_kernel.h b/mindspore/ccsrc/kernel/cpu/reduce_scatter_cpu_kernel.h new file mode 100644 index 0000000000..cebf649954 --- /dev/null +++ b/mindspore/ccsrc/kernel/cpu/reduce_scatter_cpu_kernel.h @@ -0,0 +1,46 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef MINDSPORE_CCSRC_KERNEL_CPU_REDUCE_SCATTER_CPU_KERNEL_H_ +#define MINDSPORE_CCSRC_KERNEL_CPU_REDUCE_SCATTER_CPU_KERNEL_H_ +#include +#include +#include "kernel/cpu/cpu_kernel.h" +#include "kernel/cpu/cpu_kernel_factory.h" + +namespace mindspore { +namespace kernel { +class ReduceScatterCPUKernel : public CPUKernel { + public: + ReduceScatterCPUKernel(); + ~ReduceScatterCPUKernel() override = default; + + void InitKernel(const CNodePtr &kernel_node) override; + + bool Launch(const std::vector &inputs, const std::vector &workspace, + const std::vector &outputs) override; + + private: + size_t output_data_number_; + std::string op_type_; + std::vector ranks_group_; +}; + +MS_REG_CPU_KERNEL(HostReduceScatter, KernelAttr().AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32), + ReduceScatterCPUKernel); +} // namespace kernel +} // namespace mindspore + +#endif // MINDSPORE_CCSRC_KERNEL_CPU_REDUCE_SCATTER_CPU_KERNEL_H_ diff --git a/tests/st/ops/cpu/test_reduce_scatter.py b/tests/st/ops/cpu/test_reduce_scatter.py new file mode 100644 index 0000000000..2e1f019af9 --- /dev/null +++ b/tests/st/ops/cpu/test_reduce_scatter.py @@ -0,0 +1,70 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ + +import numpy as np +import pytest + +import mindspore.context as context +import mindspore.nn as nn +from mindspore import Tensor +from mindspore.common import dtype as mstype +from mindspore.ops import operations as P +import mindspore._ms_mpi as mpi +# run command: +# mpirun -np 3 python test_reduce_scatter.py + +context.set_context(mode=context.GRAPH_MODE, device_target='CPU') + +class Net(nn.Cell): + def __init__(self): + super(Net, self).__init__() + self.op = "sum" + + self.reducescatter = P.HostReduceScatter(op=self.op, group=[0,1,2]) + + def construct(self, x): + return self.reducescatter(x) + +class AllGatherNet(nn.Cell): + def __init__(self): + super(AllGatherNet, self).__init__() + self.hostallgather = P.HostAllGather(group=(0, 1, 2)) + + def construct(self, x): + return self.hostallgather(x) + +def test_net_reduce_scatter(): + x = np.ones(12).astype(np.float32) * 0.1 + + reducescatter = Net() + rankid = mpi.get_rank_id() + print("self rankid:", rankid) + output = reducescatter(Tensor(x, mstype.float32)) + print("output:\n", output) + expect_result = np.ones(4).astype(np.float32) * 0.3 + diff = abs(output.asnumpy() - expect_result) + error = np.ones(shape=expect_result.shape) * 1.0e-6 + assert np.all(diff < error) + + allgather = AllGatherNet() + allgather_output = allgather(output) + print("allgather result:\n", allgather_output) + expect_allgather_result = np.ones(12).astype(np.float32) * 0.3 + diff = abs(allgather_output.asnumpy() - expect_allgather_result) + error = np.ones(shape=expect_allgather_result.shape) * 1.0e-6 + assert np.all(diff < error) + +if __name__ == '__main__': + test_net_reduce_scatter()