!13119 fix get rank id error

From: @anancds
Reviewed-by: @cristoval
Signed-off-by:
pull/13119/MERGE
Authored by mindspore-ci-bot 4 years ago, committed by Gitee
commit afbfd35f27

@@ -21,12 +21,7 @@ namespace ps {
 namespace core {
 std::string Node::node_id() const { return node_info_.node_id_; }
-uint32_t Node::rank_id() const {
-  if (!is_ready_.load()) {
-    MS_LOG(EXCEPTION) << "The cluster is not ready yet to get rank id!";
-  }
-  return node_info_.rank_id_;
-}
+uint32_t Node::rank_id() const { return node_info_.rank_id_; }
 NodeRole Node::role() const { return node_info_.node_role_; }
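
(Note: the exception message above is presumably the "get rank id error" named in the PR title; the fix drops the is_ready_ guard so rank_id() becomes a plain accessor, and the call sites below are switched to read the rank on demand instead of caching it.)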

@@ -33,8 +33,7 @@ void ParameterServer::Run(const FuncGraphPtr &func_graph) {
   }
   Init(func_graph);
   server_node_->Start();
-  rank_id_ = server_node_->rank_id();
-  PSContext::instance()->SetPSRankId(rank_id_);
+  PSContext::instance()->SetPSRankId(server_node_->rank_id());
   thread_->join();
   SyncEmbeddingTables();
   MS_LOG(INFO) << "PServer finished updating models, starts finalizing...";
@@ -118,22 +117,22 @@ void ParameterServer::InitOptimInputsShape(const Keys &keys, const Values &value
   MS_EXCEPTION_IF_NULL(cnode);
   if (optim_name == kSparseAdam) {
     std::shared_ptr<PServerKernel> optimizer =
-      std::make_shared<kernel::ps::SparseApplyAdamPSKernel>(rank_id_, pserver_num_, worker_num_);
+      std::make_shared<kernel::ps::SparseApplyAdamPSKernel>(server_node_->rank_id(), pserver_num_, worker_num_);
     optimizer->InitKernel(cnode, optim_inputs_shape_[key]);
     optimizers_[key] = optimizer;
   } else if (optim_name == kSparseLazyAdam) {
     std::shared_ptr<PServerKernel> optimizer =
-      std::make_shared<kernel::ps::SparseApplyLazyAdamPSKernel>(rank_id_, pserver_num_, worker_num_);
+      std::make_shared<kernel::ps::SparseApplyLazyAdamPSKernel>(server_node_->rank_id(), pserver_num_, worker_num_);
     optimizer->InitKernel(cnode, optim_inputs_shape_[key]);
     optimizers_[key] = optimizer;
   } else if (optim_name == kApplyMomentum) {
     std::shared_ptr<PServerKernel> optimizer =
-      std::make_shared<kernel::ps::ApplyMomentumPSKernel>(rank_id_, pserver_num_, worker_num_);
+      std::make_shared<kernel::ps::ApplyMomentumPSKernel>(server_node_->rank_id(), pserver_num_, worker_num_);
     optimizer->InitKernel(cnode, optim_inputs_shape_[key]);
     optimizers_[key] = optimizer;
   } else if (optim_name == kSparseFtrl) {
     std::shared_ptr<PServerKernel> optimizer =
-      std::make_shared<kernel::ps::SparseApplyFtrlPSKernel>(rank_id_, pserver_num_, worker_num_);
+      std::make_shared<kernel::ps::SparseApplyFtrlPSKernel>(server_node_->rank_id(), pserver_num_, worker_num_);
     optimizer->InitKernel(cnode, optim_inputs_shape_[key]);
     optimizers_[key] = optimizer;
   }
@@ -144,7 +143,7 @@ void ParameterServer::InitOptimInputsShape(const Keys &keys, const Values &value
 void ParameterServer::InitWeight(const Key &key, const WeightPtr &weight) {
   MS_EXCEPTION_IF_NULL(weight);
   if ((weights_.count(key) == 0) || (is_embedding_[key] && weights_.count(key) != 0)) {
-    MS_LOG(INFO) << "Initializing weight for key " << key << ", server rank " << rank_id_;
+    MS_LOG(INFO) << "Initializing weight for key " << key << ", server rank " << server_node_->rank_id();
     weights_[key] = weight;
     tokens_[key] = 0;
     is_embedding_[key] = false;
@@ -165,7 +164,7 @@ void ParameterServer::InitEmbeddingTable(
   MS_EXCEPTION_IF_NULL(shapes);
   if (weights_.count(key) == 0) {
     std::shared_ptr<PServerKernel> lookup =
-      std::make_shared<kernel::ps::EmbeddingLookUpPSKernel>(rank_id_, pserver_num_, worker_num_);
+      std::make_shared<kernel::ps::EmbeddingLookUpPSKernel>(server_node_->rank_id(), pserver_num_, worker_num_);
     lookup->InitKernel(shapes);
     embedding_lookup_ops_[key] = lookup;
@@ -244,7 +243,7 @@ void ParameterServer::UpdateWeights() {
                      [](std::shared_ptr<std::vector<size_t>> input_shapes) -> std::vector<size_t> { return *input_shapes; });
     }
     optimizer->ReInit(shapes);
-    optim_info->ComputeMean(shapes, worker_num_, pserver_num_, rank_id_);
+    optim_info->ComputeMean(shapes, worker_num_, pserver_num_, server_node_->rank_id());
     optimizer->Execute(inputs, workspaces, outputs);
     optim_info->Reset();
   }
@@ -296,7 +295,6 @@ WeightPtr ParameterServer::weight(const Key &key) {
     MS_LOG(EXCEPTION) << "Invalid weight key " << key;
   }
   WeightPtr weight_ptr = weights_[key];
-  MS_LOG(DEBUG) << "The weight ptr size is:" << weight_ptr->size();
   MS_EXCEPTION_IF_NULL(weight_ptr);
   WeightPtr copy_weight_ptr = std::make_shared<std::vector<float>>(weight_ptr->size(), 0);
   MS_EXCEPTION_IF_NULL(copy_weight_ptr);
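
Beyond trimming log noise, dropping this DEBUG line removes a dereference that ran before the null check: weight_ptr->size() was evaluated one line ahead of MS_EXCEPTION_IF_NULL(weight_ptr). A reduced sketch of why the ordering matters (assumed simplification, not the MindSpore code):

```cpp
#include <cassert>
#include <iostream>
#include <memory>
#include <vector>

using WeightPtr = std::shared_ptr<std::vector<float>>;

int main() {
  WeightPtr weight_ptr = std::make_shared<std::vector<float>>(4, 0.0f);
  // Old order: log weight_ptr->size() first, then check for null. If the
  // pointer were null, the log itself would be undefined behavior and the
  // guard could never fire. Checking before any use keeps the guard useful.
  assert(weight_ptr != nullptr);  // stand-in for MS_EXCEPTION_IF_NULL
  WeightPtr copy_weight_ptr =
      std::make_shared<std::vector<float>>(weight_ptr->size(), 0.0f);
  std::cout << "copied " << copy_weight_ptr->size() << " floats" << std::endl;
  return 0;
}
```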

@@ -77,7 +77,6 @@ class ParameterServer {
   ParameterServer()
       : pserver_num_(0),
         worker_num_(0),
-        rank_id_(0),
         grad_accum_count_(0),
         handler_(nullptr),
         func_graph_(nullptr),
@@ -145,7 +144,6 @@ class ParameterServer {
   size_t pserver_num_;
   size_t worker_num_;
-  size_t rank_id_;
   size_t grad_accum_count_;
   std::unique_ptr<ServerHandler> handler_;
   FuncGraphPtr func_graph_;

@@ -306,6 +306,7 @@ void Worker::DoPSEmbeddingLookup(const Key &key, const std::vector<int> &lookup_
   int64_t single_id_len = SizeToLong(lookup_result->size() / lookup_ids.size());
   std::unordered_map<Key, std::shared_ptr<std::pair<float *, int64_t>>> id_addr_map;
   std::shared_ptr<std::vector<float>> values = std::make_shared<std::vector<float>>();
+  std::shared_ptr<std::vector<Key>> keys = std::make_shared<std::vector<Key>>();
   int64_t value_offset = 0;
   for (size_t i = 0; i < resp.size(); ++i) {
     KVMessage message;
@@ -315,12 +316,17 @@
     }
     for (auto k = 0; k < message.keys_size(); k++) {
       const Key &key = message.keys(k);
-      float *addr = values->data() + value_offset;
-      value_offset += single_id_len;
-      id_addr_map[key] = std::make_shared<std::pair<float *, int64_t>>(std::make_pair(addr, single_id_len));
+      keys->push_back(key);
     }
   }
+  for (size_t i = 0; i < keys->size(); i++) {
+    const Key &key = keys->at(i);
+    float *addr = values->data() + value_offset;
+    value_offset += single_id_len;
+    id_addr_map[key] = std::make_shared<std::pair<float *, int64_t>>(std::make_pair(addr, single_id_len));
+  }
   float *result_addr = lookup_result->data();
   MS_EXCEPTION_IF_NULL(result_addr);
   int64_t offset = 0;
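
The point of the two-pass rewrite: the loop over resp presumably keeps appending each response's payload to values (in lines elided from this hunk), so a pointer taken from values->data() in an early iteration can dangle once a later append reallocates the buffer. Collecting the keys first and computing addresses only after values has stopped growing avoids that. A reduced sketch of the hazard (illustration, not the MindSpore code):

```cpp
#include <cstdio>
#include <vector>

int main() {
  std::vector<float> values;
  values.reserve(2);  // small capacity so the second insert reallocates
  values.insert(values.end(), {1.0f, 2.0f});  // first response's payload
  float *early_addr = values.data();          // address taken too early
  values.insert(values.end(), {3.0f, 4.0f});  // growth moves the buffer
  // 'early_addr' may now dangle; dereferencing it is undefined behavior.
  (void)early_addr;

  // Safe pattern, mirroring the added second loop: finish every insert,
  // then compute all addresses from the final, stable buffer.
  float *addr = values.data();
  std::printf("%.1f %.1f\n", addr[0], addr[2]);  // prints 1.0 3.0
  return 0;
}
```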
