From 830172201ad07f08ba4c739bc8e66d30acad9637 Mon Sep 17 00:00:00 2001
From: ZPaC
Date: Fri, 21 Aug 2020 18:20:21 +0800
Subject: [PATCH] Fix multi server precision error.

---
 .../kernel_compiler/cpu/ps/pserver_kernel.h   | 19 +++++++++++++++
 .../cpu/ps/sparse_apply_adam_ps_kernel.cc     |  4 +++-
 .../cpu/ps/sparse_apply_ftrl_ps_kernel.cc     |  4 +++-
 .../ps/sparse_apply_lazy_adam_ps_kernel.cc    |  4 +++-
 .../frontend/parallel/ps/parameter_server.h   |  1 +
 mindspore/ccsrc/frontend/parallel/ps/util.cc  |  6 +++++
 mindspore/ccsrc/frontend/parallel/ps/util.h   |  3 +++
 .../ccsrc/frontend/parallel/ps/worker_proxy.h |  1 +
 mindspore/ccsrc/pipeline/jit/init.cc          |  6 +++++
 mindspore/parallel/_ps_utils.py               | 23 +++++++++++++++++++
 mindspore/train/callback/_checkpoint.py       |  3 +++
 11 files changed, 71 insertions(+), 3 deletions(-)
 create mode 100644 mindspore/parallel/_ps_utils.py

diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/pserver_kernel.h b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/pserver_kernel.h
index af0a1fe342..ba0f6c8f8f 100644
--- a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/pserver_kernel.h
+++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/pserver_kernel.h
@@ -45,6 +45,22 @@ class PServerKernel {
 
  protected:
   virtual void ReInit(const std::vector &) {}
+
+  void SetTotalRowCnt(size_t total_cnt) {
+    MS_LOG(INFO) << "Total row count of server " << rank_id_ << " is " << total_cnt;
+    total_row_cnt_ = total_cnt;
+  }
+
+  void CalOffset() {
+    size_t rem = total_row_cnt_ % pserver_num_;
+    if (rem == 0) {
+      row_offset_ = total_row_cnt_ / pserver_num_ * rank_id_;
+    } else {
+      row_offset_ = std::round((static_cast(total_row_cnt_)) / pserver_num_) * rank_id_;
+    }
+    MS_LOG(INFO) << "Row offset of server " << rank_id_ << " is " << row_offset_;
+  }
+
   void Shard(std::vector *shape, int axis) {
     (*shape)[axis] = Util::LocalShard((*shape)[axis], rank_id_, pserver_num_);
   }
@@ -52,6 +68,9 @@ class PServerKernel {
   size_t rank_id_;
   size_t pserver_num_;
   size_t worker_num_;
+
+  size_t total_row_cnt_;
+  size_t row_offset_;
 };
 }  // namespace ps
 }  // namespace kernel
diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_adam_ps_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_adam_ps_kernel.cc
index 4ae5745133..a9c423973d 100644
--- a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_adam_ps_kernel.cc
+++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_adam_ps_kernel.cc
@@ -31,6 +31,8 @@ void SparseApplyAdamPSKernel::InitKernel(
   const std::vector &grad_shape = *(shape_vec[9]);
   const std::vector &indices_shape = *(shape_vec[10]);
 
+  SetTotalRowCnt(var_shape[0]);
+  CalOffset();
   Shard(&var_shape, 0);
   Shard(&m_shape, 0);
   Shard(&v_shape, 0);
@@ -85,7 +87,7 @@ bool SparseApplyAdamPSKernel::Execute(const std::vector &inputs, con
   ReInit(inputs);
   int *indices = reinterpret_cast(inputs[10]->addr);
   for (size_t i = 0; i < inputs[10]->size / sizeof(int); i++) {
-    indices[i] -= rank_id_ * var_first_dim_size_;
+    indices[i] -= row_offset_;
   }
   return Launch(inputs, workspace, outputs);
 }
diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc
index b5f5f0259c..0e0ca08518 100644
--- a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc
+++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.cc
@@ -28,6 +28,8 @@ void SparseApplyFtrlPSKernel::InitKernel(
   std::vector grad_shape = *(shape_vec[3]);
   std::vector indices_shape = *(shape_vec[4]);
 
+  SetTotalRowCnt(var_shape[0]);
+  CalOffset();
   Shard(&var_shape, 0);
   Shard(&accum_shape, 0);
   Shard(&linear_shape, 0);
@@ -88,7 +90,7 @@ bool SparseApplyFtrlPSKernel::Execute(const std::vector &inputs, con
   ReInit(inputs);
   int *indices = reinterpret_cast(inputs[4]->addr);
   for (size_t i = 0; i < inputs[4]->size / sizeof(int); i++) {
-    indices[i] -= rank_id_ * var_first_dim_size_;
+    indices[i] -= row_offset_;
   }
   return Launch(inputs, workspace, outputs);
 }
diff --git a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc
index 0c5c5947a8..a148f0f3d8 100644
--- a/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc
+++ b/mindspore/ccsrc/backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc
@@ -31,6 +31,8 @@ void SparseApplyLazyAdamPSKernel::InitKernel(
   const std::vector &grad_shape = *(shape_vec[9]);
   const std::vector &indices_shape = *(shape_vec[10]);
 
+  SetTotalRowCnt(var_shape[0]);
+  CalOffset();
   Shard(&var_shape, 0);
   Shard(&m_shape, 0);
   Shard(&v_shape, 0);
@@ -86,7 +88,7 @@ bool SparseApplyLazyAdamPSKernel::Execute(const std::vector &inputs,
   ReInit(inputs);
   int *indices = reinterpret_cast(inputs[10]->addr);
   for (size_t i = 0; i < inputs[10]->size / sizeof(int); i++) {
-    indices[i] -= rank_id_ * var_first_dim_size_;
+    indices[i] -= row_offset_;
   }
   return Launch(inputs, workspace, outputs);
 }
diff --git a/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h b/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h
index 495866bd43..0de16b13f2 100644
--- a/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h
+++ b/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h
@@ -721,6 +721,7 @@ void ParameterServer::Run(const FuncGraphPtr &func_graph) {
     return;
   }
   Init(func_graph);
+  Util::SetRankId(rank_id_);
   thread_->join();
   ::ps::Finalize(0, true);
 }
diff --git a/mindspore/ccsrc/frontend/parallel/ps/util.cc b/mindspore/ccsrc/frontend/parallel/ps/util.cc
index c951006a8c..2d96c3f127 100644
--- a/mindspore/ccsrc/frontend/parallel/ps/util.cc
+++ b/mindspore/ccsrc/frontend/parallel/ps/util.cc
@@ -22,6 +22,8 @@
 namespace mindspore {
 namespace parallel {
 namespace ps {
+int Util::rank_id_ = -1;
+
 std::unordered_map Util::optimizer_to_ids{
   {kApplyMomentum, 0},
   {kSparseAdam, 1},
@@ -140,6 +142,10 @@ int Util::LocalShard(int first_dim, int rank_id, int server_num) {
     return first_dim - (shard_size * (server_num - 1));
   }
 }
+
+void Util::SetRankId(int rank_id) { rank_id_ = rank_id; }
+
+int Util::GetRankId() { return rank_id_; }
 }  // namespace ps
 }  // namespace parallel
 }  // namespace mindspore
diff --git a/mindspore/ccsrc/frontend/parallel/ps/util.h b/mindspore/ccsrc/frontend/parallel/ps/util.h
index c56ab8264e..242012912f 100644
--- a/mindspore/ccsrc/frontend/parallel/ps/util.h
+++ b/mindspore/ccsrc/frontend/parallel/ps/util.h
@@ -37,11 +37,14 @@ class Util {
   static std::string optimizer_node_name(int id);
   static bool is_optimizer(std::string name);
   static int LocalShard(int first_dim, int rank_id, int server_num);
+  static void SetRankId(int rank_id);
+  static int GetRankId();
 
  private:
   static std::unordered_map optimizer_to_ids;
   static std::unordered_map id_to_optimizers;
   static std::unordered_map id_to_optimizer_nodes;
+  static int rank_id_;
 };
 }  // namespace ps
 }  // namespace parallel
diff --git a/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h b/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h
index 244d5b4d08..f0a2adbc48 100644
--- a/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h
+++ b/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h
@@ -41,6 +41,7 @@ class WorkerProxy : public ::ps::KVWorker {
   explicit WorkerProxy(int app_id, int customer_id, int lookup_customer_id, int general_customer_id)
       : Worker(app_id, customer_id) {
     server_num_ = ::ps::NumServers();
+    Util::SetRankId(::ps::MyRank());
     using std::placeholders::_1;
     using std::placeholders::_2;
     using std::placeholders::_3;
diff --git a/mindspore/ccsrc/pipeline/jit/init.cc b/mindspore/ccsrc/pipeline/jit/init.cc
index 3d4e7c6bdc..8849fa39e5 100644
--- a/mindspore/ccsrc/pipeline/jit/init.cc
+++ b/mindspore/ccsrc/pipeline/jit/init.cc
@@ -33,6 +33,9 @@
 #else
 #include "runtime/device/gpu/distribution/collective_fake_init.h"
 #endif
+#if (ENABLE_CPU && (ENABLE_D || ENABLE_GPU))
+#include "frontend/parallel/ps/util.h"
+#endif
 namespace py = pybind11;
 
 using EnvInstance = mindspore::EnvInstance;
@@ -322,7 +325,10 @@ PYBIND11_MODULE(_c_expression, m) {
               "Init gpu collective communication mode.");
   (void)m.def("finalize_gpu_collective", &mindspore::device::gpu::CollectiveFakeInitializer::FinalizeCollective,
               "Finalize gpu collective communication mode.");
+#endif
+
+#if (ENABLE_CPU && (ENABLE_D || ENABLE_GPU))
+  (void)m.def("get_ps_mode_rank", &mindspore::parallel::ps::Util::GetRankId, "Get Worker and PServer rank id.");
 #endif
 
   (void)py::class_>(m, "OpInfoLoaderPy")
diff --git a/mindspore/parallel/_ps_utils.py b/mindspore/parallel/_ps_utils.py
new file mode 100644
index 0000000000..7a62d41f45
--- /dev/null
+++ b/mindspore/parallel/_ps_utils.py
@@ -0,0 +1,23 @@
+# Copyright 2020 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""Utils for parameter server training mode"""
+
+from mindspore._c_expression import get_ps_mode_rank
+
+def _get_ps_mode_rank():
+    ps_rank = get_ps_mode_rank()
+    if ps_rank == -1:
+        raise RuntimeError("The parameter server mode training is not launched yet.")
+    return ps_rank
diff --git a/mindspore/train/callback/_checkpoint.py b/mindspore/train/callback/_checkpoint.py
index 106fa736b2..c9926f7b1e 100644
--- a/mindspore/train/callback/_checkpoint.py
+++ b/mindspore/train/callback/_checkpoint.py
@@ -280,6 +280,9 @@ class ModelCheckpoint(Callback):
             if save_ckpt:
                 cur_ckpoint_file = self._prefix + "-" + str(cb_params.cur_epoch_num) + "_" \
                     + str(step_num_in_epoch) + ".ckpt"
+                if os.getenv("MS_ROLE") == "MS_PSERVER":
+                    from mindspore.parallel._ps_utils import _get_ps_mode_rank
+                    cur_ckpoint_file = "PServer_" + str(_get_ps_mode_rank()) + "_" + cur_ckpoint_file
                 # update checkpoint file list.
                 self._manager.update_ckpoint_filelist(self._directory, self._prefix)
                 # keep checkpoint files number equal max number.
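
The sketch below is not part of the commit; it only mirrors the row-offset arithmetic that CalOffset() introduces so the uneven-shard case behind the subject line can be checked by hand. The helper name compute_row_offset, the example sizes, and the float cast inside std::round are assumptions for illustration (the cast's template argument is not visible in the patch text above), and reading var_first_dim_size_ as the server-local row count is inferred from the Shard() calls in InitKernel.

// Standalone sketch, hypothetical names: reproduces the per-server row offset
// computed by CalOffset() in pserver_kernel.h.
#include <cmath>
#include <cstddef>
#include <iostream>

// When the rows do not divide evenly across servers, the offset is derived
// from the rounded shard size rather than each server's own local row count.
size_t compute_row_offset(size_t total_rows, size_t server_count, size_t rank_id) {
  if (total_rows % server_count == 0) {
    return total_rows / server_count * rank_id;
  }
  return static_cast<size_t>(std::round(static_cast<float>(total_rows) / server_count)) * rank_id;
}

int main() {
  const size_t total_rows = 10;   // e.g. an embedding table with 10 rows
  const size_t server_count = 3;  // sharded over 3 parameter servers
  for (size_t rank = 0; rank < server_count; ++rank) {
    // A global row index g maps to a server-local index as g - offset,
    // matching `indices[i] -= row_offset_;` in the Execute() hunks above.
    std::cout << "server " << rank << " row offset: "
              << compute_row_offset(total_rows, server_count, rank) << std::endl;
  }
  return 0;
}
// Expected offsets: 0, 3, 6. The removed expression rank_id_ * var_first_dim_size_
// scales by the server's own sharded row count (4 on the last server here),
// which would give 8 instead of 6 once the split is uneven.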