Optimize PS code.

pull/6079/head
ZPaC committed 4 years ago
parent c882e4863c
commit 4c1f9983d7

@ -42,8 +42,8 @@ void EmbeddingLookUpProxyKernel::InitKernel(const CNodePtr &kernel_node) {
<< ", indices_shape:" << indices_shape << ", output_shape:" << output_shape;
std::vector<int> lens{SizeToInt(input_shape.size()), SizeToInt(indices_shape.size()), SizeToInt(output_shape.size())};
if (mindspore::parallel::ps::Util::IsRoleOfWorker()) {
parallel::ps::Worker<float>::GetInstance().AddEmbeddingTable(key_, input_shape[axis]);
parallel::ps::Worker<float>::GetInstance().InitPSEmbeddingTable(keys, values, lens);
parallel::ps::worker.AddEmbeddingTable(key_, input_shape[axis]);
parallel::ps::worker.InitPSEmbeddingTable(keys, values, lens);
}
}
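Note: the recurring edit in this commit is replacing the verbose `parallel::ps::Worker<float>::GetInstance()` at call sites with the shorter `parallel::ps::worker` reference introduced near the end of worker.h (the hunk that adds `static Worker<float> &worker = Worker<float>::GetInstance();`). A minimal, hedged sketch of that accessor pattern follows; the Worker body here is a placeholder, and only the names Worker, GetInstance, AddEmbeddingTable and the worker reference come from the diff.

```cpp
// Sketch only: a Meyers-singleton Worker plus the namespace-scope reference
// this commit adds, so call sites can write worker.Foo() instead of
// Worker<float>::GetInstance().Foo(). The class body is a stand-in.
#include <cstddef>

namespace parallel {
namespace ps {

template <typename T>
class Worker {
 public:
  static Worker &GetInstance() {
    static Worker instance;  // constructed once, on first use
    return instance;
  }
  void AddEmbeddingTable(size_t key, size_t row_count) { /* placeholder */ }

 private:
  Worker() = default;
  Worker(const Worker &) = delete;
  Worker &operator=(const Worker &) = delete;
};

// As added at the end of worker.h in this diff:
static Worker<float> &worker = Worker<float>::GetInstance();

}  // namespace ps
}  // namespace parallel
```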
@ -64,8 +64,8 @@ bool EmbeddingLookUpProxyKernel::Launch(const std::vector<kernel::AddressPtr> &i
if (ret != EOK) {
MS_LOG(EXCEPTION) << "Lookup id memcpy failed.";
}
parallel::ps::Worker<float>::GetInstance().DoPSEmbeddingLookup({key_}, lookup_ids, lengths, &lookup_result,
parallel::ps::kEmbeddingLookupCmd);
parallel::ps::worker.DoPSEmbeddingLookup({key_}, lookup_ids, lengths, &lookup_result,
parallel::ps::kEmbeddingLookupCmd);
auto ret2 = memcpy_s(output_addr, output_size, lookup_result.data(), output_size);
if (ret2 != EOK) {

@ -17,6 +17,7 @@
#include "backend/kernel_compiler/cpu/ps/embedding_look_up_ps_kernel.h"
#include <vector>
#include <memory>
#include <functional>
#include "backend/kernel_compiler/common_utils.h"
#include "frontend/parallel/ps/util.h"
@ -54,9 +55,8 @@ void EmbeddingLookUpPSKernel::InitKernel(
output_size_list_.emplace_back(output_size);
}
void EmbeddingLookUpPSKernel::ReInit(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes) {
const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
const auto &indices_shape = *(shape_vec[0]);
void EmbeddingLookUpPSKernel::ReInit(const std::vector<std::vector<size_t>> &shapes) {
const auto &indices_shape = shapes[0];
indices_lens_ = indices_shape[0];
size_t output_size = sizeof(float) * indices_lens_;

@ -31,7 +31,7 @@ class EmbeddingLookUpPSKernel : public EmbeddingLookUpCPUKernel, public PServerK
~EmbeddingLookUpPSKernel() override = default;
void InitKernel(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) override;
void ReInit(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) override;
void ReInit(const std::vector<std::vector<size_t>> &) override;
bool Execute(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &workspace,
const std::vector<AddressPtr> &outputs) override;
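The ReInit overloads change from taking `const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &` to a flat `const std::vector<std::vector<size_t>> &`. As an illustration (the helper name BuildLookupShapes is invented for this sketch), this is roughly how a caller now assembles that argument, mirroring ParameterServer::DoEmbeddingLookup later in the diff, where shapes[0] carries the indices shape.

```cpp
// Illustrative helper (not in the diff): assemble the flat shape list that
// the new ReInit(const std::vector<std::vector<size_t>> &) overloads expect.
#include <cstddef>
#include <vector>

std::vector<std::vector<size_t>> BuildLookupShapes(size_t lookup_id_count) {
  std::vector<std::vector<size_t>> shapes;
  shapes.push_back({lookup_id_count});  // shapes[0] is the indices shape
  return shapes;
}
```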

@ -14,8 +14,14 @@
* limitations under the License.
*/
#include "backend/kernel_compiler/cpu/ps/pserver_kernel.h"
namespace mindspore {
namespace kernel {
namespace ps {} // namespace ps
namespace ps {
void PServerKernel::Shard(std::vector<size_t> *shape, int axis) {
(*shape)[axis] = Util::LocalShard((*shape)[axis], rank_id_, pserver_num_);
}
} // namespace ps
} // namespace kernel
} // namespace mindspore

@ -35,7 +35,7 @@ class PServerKernel {
virtual void InitKernel(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) {}
virtual void InitKernel(const CNodePtr &cnode,
const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) {}
virtual void ReInit(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) {}
virtual void ReInit(const std::vector<std::vector<size_t>> &) {}
virtual bool Execute(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &workspace,
const std::vector<AddressPtr> &outputs) = 0;
@ -45,32 +45,11 @@ class PServerKernel {
protected:
virtual void ReInit(const std::vector<AddressPtr> &) {}
void SetTotalRowCnt(size_t total_cnt) {
MS_LOG(INFO) << "Total row count of server " << rank_id_ << " is " << total_cnt;
total_row_cnt_ = total_cnt;
}
void CalOffset() {
size_t rem = total_row_cnt_ % pserver_num_;
if (rem == 0) {
row_offset_ = total_row_cnt_ / pserver_num_ * rank_id_;
} else {
row_offset_ = std::round((static_cast<float>(total_row_cnt_)) / pserver_num_) * rank_id_;
}
MS_LOG(INFO) << "Row offset of server " << rank_id_ << " is " << row_offset_;
}
void Shard(std::vector<size_t> *shape, int axis) {
(*shape)[axis] = Util::LocalShard((*shape)[axis], rank_id_, pserver_num_);
}
void Shard(std::vector<size_t> *shape, int axis);
size_t rank_id_;
size_t pserver_num_;
size_t worker_num_;
size_t total_row_cnt_;
size_t row_offset_;
};
} // namespace ps
} // namespace kernel
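SetTotalRowCnt, CalOffset and the total_row_cnt_/row_offset_ members are removed outright. For reference, the row-offset arithmetic the deleted CalOffset() computed, reconstructed from the removed lines above as a standalone function (illustrative only, not part of the new code):

```cpp
// Reconstruction of the deleted CalOffset() logic: each of the pserver_num
// servers owns roughly total_row_cnt / pserver_num rows, and the offset of
// server rank_id is that per-server count times rank_id.
#include <cmath>
#include <cstddef>

size_t RowOffset(size_t total_row_cnt, size_t rank_id, size_t pserver_num) {
  size_t rem = total_row_cnt % pserver_num;
  if (rem == 0) {
    return total_row_cnt / pserver_num * rank_id;
  }
  return static_cast<size_t>(
             std::round(static_cast<float>(total_row_cnt) / pserver_num)) *
         rank_id;
}
```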

@ -29,11 +29,11 @@ namespace kernel {
template <typename T>
class PullKernel : public CPUKernel {
public:
PullKernel() : keys_size_(sizeof(size_t)), var_size_(sizeof(size_t)) {}
PullKernel() : key_(UINT64_MAX), keys_size_(sizeof(size_t)), var_size_(sizeof(size_t)) {}
~PullKernel() override = default;
bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &, const std::vector<AddressPtr> &) {
bool init_in_server = mindspore::parallel::ps::Worker<float>::GetInstance().GetParamInitInServer(param_name_);
bool init_in_server = parallel::ps::worker.GetParamInitInServer(param_name_);
// If init_in_server, forward kernel should run in server too.
if (!init_in_server) {
parallel::ps::Worker<T>::GetInstance().Pull(key_, inputs[1]->addr, inputs[1]->size);

@ -58,7 +58,7 @@ class PushKernel : public CPUKernel {
MS_LOG(INFO) << "Only init shape indices are " << only_shape_indices;
for (size_t i = 0; i < optim_input_shapes.size(); i++) {
auto shape = optim_input_shapes[i];
mindspore::parallel::ps::Worker<float>::GetInstance().SetOptimInputShapes(key_, shape);
parallel::ps::worker.SetOptimInputShapes(key_, shape);
if (std::count(only_shape_indices.begin(), only_shape_indices.end(), i) == 0) {
size_t size = sizeof(T);
for (size_t j = 0; j < shape.size(); j++) {

@ -31,8 +31,6 @@ void SparseApplyAdamPSKernel::InitKernel(
const std::vector<size_t> &grad_shape = *(shape_vec[9]);
const std::vector<size_t> &indices_shape = *(shape_vec[10]);
SetTotalRowCnt(var_shape[0]);
CalOffset();
Shard(&var_shape, 0);
Shard(&m_shape, 0);
Shard(&v_shape, 0);
@ -67,9 +65,8 @@ void SparseApplyAdamPSKernel::InitKernel(
workspace_size_list_.emplace_back(var_first_dim_size_ * var_outer_dim_size_ * sizeof(float) * worker_num_);
}
void SparseApplyAdamPSKernel::ReInit(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes) {
const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
const std::vector<size_t> &indices_shape = *(shape_vec[0]);
void SparseApplyAdamPSKernel::ReInit(const std::vector<std::vector<size_t>> &shapes) {
const std::vector<size_t> &indices_shape = shapes[0];
indices_size_ = indices_shape[0];
workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float) * worker_num_;
workspace_size_list_[1] = indices_size_ * sizeof(int) * worker_num_;

@ -33,7 +33,7 @@ class SparseApplyAdamPSKernel : public SparseApplyAdamCPUKernel, public PServerK
void InitKernel(const CNodePtr &cnode,
const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) override;
void ReInit(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) override;
void ReInit(const std::vector<std::vector<size_t>> &) override;
bool Execute(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &workspace,
const std::vector<AddressPtr> &outputs) override;

@ -28,8 +28,6 @@ void SparseApplyFtrlPSKernel::InitKernel(
std::vector<size_t> grad_shape = *(shape_vec[3]);
std::vector<size_t> indices_shape = *(shape_vec[4]);
SetTotalRowCnt(var_shape[0]);
CalOffset();
Shard(&var_shape, 0);
Shard(&accum_shape, 0);
Shard(&linear_shape, 0);
@ -74,9 +72,8 @@ void SparseApplyFtrlPSKernel::InitKernel(
workspace_size_list_.emplace_back(indices_size_ * sizeof(int) * worker_num_);
}
void SparseApplyFtrlPSKernel::ReInit(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes) {
const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
std::vector<size_t> indices_shape = *(shape_vec[0]);
void SparseApplyFtrlPSKernel::ReInit(const std::vector<std::vector<size_t>> &shapes) {
const std::vector<size_t> &indices_shape = shapes[0];
indices_size_ = indices_shape[0];
workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float) * worker_num_;
workspace_size_list_[1] = indices_size_ * sizeof(int) * worker_num_;

@ -33,7 +33,7 @@ class SparseApplyFtrlPSKernel : public SparseApplyFtrlCPUKernel, public PServerK
void InitKernel(const CNodePtr &cnode,
const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) override;
void ReInit(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) override;
void ReInit(const std::vector<std::vector<size_t>> &) override;
bool Execute(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &workspace,
const std::vector<AddressPtr> &outputs) override;

@ -31,8 +31,6 @@ void SparseApplyLazyAdamPSKernel::InitKernel(
const std::vector<size_t> &grad_shape = *(shape_vec[9]);
const std::vector<size_t> &indices_shape = *(shape_vec[10]);
SetTotalRowCnt(var_shape[0]);
CalOffset();
Shard(&var_shape, 0);
Shard(&m_shape, 0);
Shard(&v_shape, 0);
@ -66,10 +64,8 @@ void SparseApplyLazyAdamPSKernel::InitKernel(
workspace_size_list_.emplace_back(indices_size_ * sizeof(int) * worker_num_);
}
void SparseApplyLazyAdamPSKernel::ReInit(
const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes) {
const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
const std::vector<size_t> &indices_shape = *(shape_vec[0]);
void SparseApplyLazyAdamPSKernel::ReInit(const std::vector<std::vector<size_t>> &shapes) {
const std::vector<size_t> &indices_shape = shapes[0];
indices_size_ = indices_shape[0];
workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float) * worker_num_;
workspace_size_list_[1] = indices_size_ * sizeof(int) * worker_num_;

@ -33,7 +33,7 @@ class SparseApplyLazyAdamPSKernel : public SparseApplyLazyAdamCPUKernel, public
void InitKernel(const CNodePtr &cnode,
const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) override;
void ReInit(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) override;
void ReInit(const std::vector<std::vector<size_t>> &) override;
bool Execute(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &workspace,
const std::vector<AddressPtr> &outputs) override;

@ -1393,7 +1393,7 @@ void SessionBasic::AssignParamKey(const KernelGraphPtr &kernel_graph) {
if (AnfAlgo::GetCNodeName(node) == kEmbeddingLookupOpName) {
size_t embedding_table_idx = 0;
auto embedding_table = AnfAlgo::GetInputNode(node->cast<CNodePtr>(), embedding_table_idx);
size_t key = parallel::ps::Worker<float>::GetInstance().SetParamKey(embedding_table->fullname_with_scope());
size_t key = parallel::ps::worker.SetParamKey(embedding_table->fullname_with_scope());
AnfAlgo::SetNodeAttr(kAttrPsKey, MakeValue(key), node);
} else if (AnfAlgo::GetCNodeName(node) == kPushOpName) {
auto pull_node = FindPullNode(node, node_list);
@ -1404,12 +1404,12 @@ void SessionBasic::AssignParamKey(const KernelGraphPtr &kernel_graph) {
// Second input of Pull node is the trainable parameter.
size_t parameter_index = 1;
auto parameter_node = AnfAlgo::GetInputNode(pull_node->cast<CNodePtr>(), parameter_index);
size_t key = parallel::ps::Worker<float>::GetInstance().SetParamKey(parameter_node->fullname_with_scope());
size_t key = parallel::ps::worker.SetParamKey(parameter_node->fullname_with_scope());
AnfAlgo::SetNodeAttr(kAttrPsKey, MakeValue(key), node);
AnfAlgo::SetNodeAttr(kAttrPsKey, MakeValue(key), pull_node);
std::string optimizer_name = AnfAlgo::GetNodeAttr<std::string>(node, kAttrOptimizerType);
parallel::ps::Worker<float>::GetInstance().SetKeyOptimId(key, optimizer_name);
parallel::ps::worker.SetKeyOptimId(key, optimizer_name);
}
}
}
@ -1440,7 +1440,7 @@ void SessionBasic::InitPSParamAndOptim(const KernelGraphPtr &kernel_graph,
MS_EXCEPTION_IF_NULL(input_node);
if (input_node->isa<Parameter>() && AnfAlgo::OutputAddrExist(input_node, 0)) {
auto pk_node = input_node->cast<ParameterPtr>();
mindspore::parallel::ps::Worker<float>::GetInstance().InitPSParamAndOptim(pk_node->fullname_with_scope(), tensor);
parallel::ps::worker.InitPSParamAndOptim(pk_node->fullname_with_scope(), tensor);
}
}
}

@ -15,6 +15,7 @@
*/
#include "frontend/parallel/ps/optimizer_info.h"
#include <map>
#include <memory>
#include "frontend/parallel/ps/util.h"
@ -37,13 +38,6 @@ size_t OptimizerInfo::grad_index() { return 0; }
size_t OptimizerInfo::indices_index() { return 0; }
void OptimizerInfo::UpdateWeight(const WeightPtr &weight) {
AddressPtr weight_addr = std::make_shared<kernel::Address>();
weight_addr->addr = weight->data();
weight_addr->size = weight->size();
inputs_[0] = weight_addr;
}
void DenseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
float *accum_grad_data = reinterpret_cast<float *>(gradient()->addr);
size_t size = gradient()->size / sizeof(float);
@ -60,8 +54,7 @@ void DenseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
}
}
void DenseOptimInfo::ComputeMean(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &, size_t n,
size_t server_num, size_t rank_id) {
void DenseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &, size_t n, size_t, size_t) {
if (n > 1) {
float *accum_grad_data = reinterpret_cast<float *>(gradient()->addr);
size_t size = gradient()->size / sizeof(float);
@ -88,6 +81,7 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
auto ret = memcpy_s(accum_grad_data + grads_offset_, incr_grad_size, incr_grad_data, incr_grad_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
grads_offset_ += lengths[grad_index];
gradient()->size += incr_grad_size;
@ -107,13 +101,14 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
memcpy_s(accum_indices_data + indices_offset_, incr_indice_data_size, incr_indice_data, incr_indice_data_size);
if (ret2 != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret2 << ")";
return;
}
indices_offset_ += lengths[indices_index];
indices()->size += incr_indice_data_size;
}
void SparseOptimInfo::ComputeMean(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes,
size_t n, size_t server_num, size_t rank_id) {
void SparseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &shapes, size_t n, size_t server_num,
size_t rank_id) {
size_t indices_size = static_cast<size_t>(indices()->size / sizeof(int));
int segment_size = gradient()->size / indices()->size;
@ -121,16 +116,15 @@ void SparseOptimInfo::ComputeMean(const std::shared_ptr<std::vector<std::shared_
std::vector<int> new_indices(indices_size);
mindspore::kernel::SparseGradient<int> unique_sparse_grad({new_grad.data(), new_indices.data(), indices_size});
const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
if (shape_vec.size() < 2 || shape_vec[1] == nullptr) {
// const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
if (shapes.size() < 2 || shapes[1].empty()) {
MS_LOG(EXCEPTION) << "No input shape found";
}
auto input_shapes = shape_vec.size() > 0 ? shape_vec[1] : nullptr;
MS_EXCEPTION_IF_NULL(input_shapes);
if (input_shapes->size() == 0) {
auto input_shapes = shapes[1];
if (input_shapes.size() == 0) {
MS_LOG(EXCEPTION) << "Invalid input shapes";
}
int first_dim_size = input_shapes->front();
int first_dim_size = input_shapes.front();
int outer_dim_size = segment_size;
if (first_dim_size == 0 || outer_dim_size == 0) {
@ -140,7 +134,7 @@ void SparseOptimInfo::ComputeMean(const std::shared_ptr<std::vector<std::shared_
float *grad_data = reinterpret_cast<float *>(gradient()->addr);
int *indices_data = reinterpret_cast<int *>(indices()->addr);
size_t original_row_count = input_shapes->front();
size_t original_row_count = input_shapes.front();
if (original_row_count > 0) {
size_t offset = 0;
std::map<int, int> rank_dims = Util::AllRankLocalShard(original_row_count, rank_id, server_num);
@ -162,11 +156,13 @@ void SparseOptimInfo::ComputeMean(const std::shared_ptr<std::vector<std::shared_
auto ret = memcpy_s(gradient()->addr, reduced_grad_size, unique_sparse_grad.value_, reduced_grad_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
int reduced_indice_size = unique_sparse_grad.indices_size_ * sizeof(int);
ret = memcpy_s(indices()->addr, reduced_indice_size, unique_sparse_grad.indices_, reduced_indice_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
gradient()->size = reduced_grad_size;
@ -197,11 +193,12 @@ MomentumOptimInfo::MomentumOptimInfo(const AddressPtr &weight, const AddressPtr
}
void MomentumOptimInfo::Update(const Values &values, const Lengths &lens) {
size_t lr_offset = 0;
const size_t lr_offset = 0;
float *lr = values.data() + lr_offset;
auto ret = memcpy_s(inputs_[2]->addr, sizeof(float), lr, sizeof(float));
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
}
@ -243,6 +240,7 @@ void SparseAdamOptimInfo::Update(const Values &values, const Lengths &lens) {
auto ret = memcpy_s(beta1_power->addr, size * bytes, data_ptr + offset, size * bytes);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
offset += size;
@ -251,6 +249,7 @@ void SparseAdamOptimInfo::Update(const Values &values, const Lengths &lens) {
ret = memcpy_s(beta2_power->addr, size * bytes, data_ptr + offset, size * bytes);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
offset += size;
@ -259,6 +258,7 @@ void SparseAdamOptimInfo::Update(const Values &values, const Lengths &lens) {
ret = memcpy_s(lr->addr, size * bytes, data_ptr + offset, size * bytes);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
offset += size;
@ -267,6 +267,7 @@ void SparseAdamOptimInfo::Update(const Values &values, const Lengths &lens) {
ret = memcpy_s(beta1->addr, size * bytes, data_ptr + offset, size * bytes);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
offset += size;
@ -275,6 +276,7 @@ void SparseAdamOptimInfo::Update(const Values &values, const Lengths &lens) {
ret = memcpy_s(beta2->addr, size * bytes, data_ptr + offset, size * bytes);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
offset += size;
@ -283,6 +285,7 @@ void SparseAdamOptimInfo::Update(const Values &values, const Lengths &lens) {
ret = memcpy_s(epsilon->addr, size * bytes, data_ptr + offset, size * bytes);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
}

@ -18,7 +18,6 @@
#define MINDSPORE_CCSRC_FRONTEND_PARALLEL_PS_OPTIMIZER_INFO_H_
#include <vector>
#include <memory>
#include "backend/kernel_compiler/kernel.h"
#include "frontend/parallel/ps/common.h"
@ -32,10 +31,9 @@ class OptimizerInfo {
virtual ~OptimizerInfo() = default;
virtual void Update(const Values &values, const Lengths &lengths) {}
virtual void UpdateWeight(const WeightPtr &weight);
virtual void Accumulate(const Values &values, const Lengths &lengths) = 0;
virtual void ComputeMean(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes, size_t n,
size_t server_num, size_t rank_id) {}
virtual void ComputeMean(const std::vector<std::vector<size_t>> &shapes, size_t n, size_t server_num,
size_t rank_id) {}
virtual void Reset() {}
void AddWorkspace(const AddressPtr &workspace);
@ -62,8 +60,8 @@ class DenseOptimInfo : public OptimizerInfo {
~DenseOptimInfo() override = default;
void Accumulate(const Values &values, const Lengths &lens) override;
void ComputeMean(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes, size_t n,
size_t server_num, size_t rank_id) override;
void ComputeMean(const std::vector<std::vector<size_t>> &shapes, size_t n, size_t server_num,
size_t rank_id) override;
void Reset() override;
};
@ -73,8 +71,8 @@ class SparseOptimInfo : public OptimizerInfo {
~SparseOptimInfo() override = default;
void Accumulate(const Values &values, const Lengths &lens) override;
void ComputeMean(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes, size_t n,
size_t server_num, size_t rank_id) override;
void ComputeMean(const std::vector<std::vector<size_t>> &shapes, size_t n, size_t server_num,
size_t rank_id) override;
void Reset() override;
const size_t indice_size() const override;

@ -51,11 +51,13 @@ OptimizerInfo *MomentumOptimInfoBuilder::BuildInputs(const WeightPtr &weight, co
AddressPtr weight_addr = std::make_shared<kernel::Address>();
weight_addr->addr = weight->data();
weight_addr->size = weight->size() * sizeof(float);
void *data_ptr = values.data();
void *copy_data_ptr = new float[values.size()];
float *data_ptr = values.data();
float *copy_data_ptr = new float[values.size()];
auto ret = memcpy_s(copy_data_ptr, values.size() * sizeof(float), data_ptr, values.size() * sizeof(float));
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
delete[] copy_data_ptr;
return nullptr;
}
AddressPtr accumulate = std::make_shared<kernel::Address>();
accumulate->addr = new float[weight->size()];
@ -86,6 +88,8 @@ OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
int ret = memset_s(m->addr, m->size, 0x00, m->size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
delete[] reinterpret_cast<float *>(m->addr);
return nullptr;
}
AddressPtr v = std::make_shared<kernel::Address>();
v->addr = new float[weight->size()];
@ -93,13 +97,20 @@ OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
ret = memset_s(v->addr, v->size, 0x00, v->size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
delete[] reinterpret_cast<float *>(v->addr);
delete[] reinterpret_cast<float *>(m->addr);
return nullptr;
}
void *data_ptr = values.data();
void *copy_data_ptr = new float[values.size()];
float *data_ptr = values.data();
float *copy_data_ptr = new float[values.size()];
ret = memcpy_s(copy_data_ptr, values.size() * sizeof(float), data_ptr, values.size() * sizeof(float));
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
delete[] copy_data_ptr;
delete[] reinterpret_cast<float *>(v->addr);
delete[] reinterpret_cast<float *>(m->addr);
return nullptr;
}
AddressPtr beta1_power = std::make_shared<kernel::Address>();
@ -134,6 +145,11 @@ OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
lens[6] * sizeof(float));
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
delete[] reinterpret_cast<float *>(grad->addr);
delete[] copy_data_ptr;
delete[] reinterpret_cast<float *>(v->addr);
delete[] reinterpret_cast<float *>(m->addr);
return nullptr;
}
grad->size = lens[6] * sizeof(float);
@ -147,6 +163,12 @@ OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
ret = memcpy_s(indices->addr, indices_data_size, indices_data, indices_data_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
delete[] reinterpret_cast<int *>(indices->addr);
delete[] reinterpret_cast<float *>(grad->addr);
delete[] copy_data_ptr;
delete[] reinterpret_cast<float *>(v->addr);
delete[] reinterpret_cast<float *>(m->addr);
return nullptr;
}
indices->size = indices_data_size;
@ -173,6 +195,8 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
int ret = memset_s(linear->addr, weight->size() * sizeof(float), 0x00, weight->size() * sizeof(float));
if (ret != 0) {
MS_LOG(EXCEPTION) << "memset_s error, errorno(" << ret << ")";
delete[] reinterpret_cast<float *>(linear->addr);
return nullptr;
}
linear->size = weight->size() * sizeof(float);
@ -183,6 +207,9 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
ret = memcpy_s(grad->addr, lens[0] * sizeof(float), values.data(), lens[0] * sizeof(float));
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
delete[] reinterpret_cast<float *>(grad->addr);
delete[] reinterpret_cast<float *>(linear->addr);
return nullptr;
}
grad->size = lens[0] * sizeof(float);
@ -196,6 +223,10 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
ret = memcpy_s(indices->addr, indices_data_size, indices_data, indices_data_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
delete[] reinterpret_cast<int *>(indices->addr);
delete[] reinterpret_cast<float *>(grad->addr);
delete[] reinterpret_cast<float *>(linear->addr);
return nullptr;
}
indices->size = indices_data_size;
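The BuildInputs hunks above add explicit delete[] calls on every failure path so the raw new[] buffers do not leak when memset_s/memcpy_s fails. Purely as an illustration of the same concern, and not what the commit does, here is a sketch of the buffer setup with std::unique_ptr so cleanup is automatic on any early return; the Address struct is simplified and assumed to merely borrow the pointer.

```cpp
// Sketch of an alternative ownership scheme, under the assumption that
// kernel::Address only borrows the buffer it points at.
#include <cstddef>
#include <memory>

struct Address {
  void *addr = nullptr;
  size_t size = 0;
};

std::unique_ptr<float[]> MakeZeroedBuffer(size_t count, Address *out) {
  auto buf = std::make_unique<float[]>(count);  // value-initialized to 0.0f
  out->addr = buf.get();
  out->size = count * sizeof(float);
  return buf;  // caller keeps ownership; no delete[] needed on error paths
}
```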

@ -55,14 +55,14 @@ class MomentumOptimInfoBuilder : public OptimizerInfoBuilder {
class SparseAdamOptimInfoBuilder : public OptimizerInfoBuilder {
public:
OptimizerInfo *BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values, const Lengths &lens,
const InputsShapePtr &inputs_shpae, size_t worker_num,
const InputsShapePtr &inputs_shape, size_t worker_num,
const std::shared_ptr<PServerKernel> &pserver_kernel) override;
};
class SparseFtrlOptimInfoBuilder : public OptimizerInfoBuilder {
public:
OptimizerInfo *BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values, const Lengths &lens,
const InputsShapePtr &inputs_shpae, size_t worker_num,
const InputsShapePtr &inputs_shape, size_t worker_num,
const std::shared_ptr<PServerKernel> &pserver_kernel) override;
};
} // namespace ps

@ -31,6 +31,7 @@
#include <utility>
#include <list>
#include <map>
#include <functional>
#include "ir/func_graph.h"
#include "backend/session/session_basic.h"
#include "backend/session/anf_runtime_algorithm.h"
@ -85,6 +86,7 @@ class ParameterServer {
class ServerHandler {
public:
explicit ServerHandler(ParameterServer *ps) : ps_(ps) {}
~ServerHandler() = default;
void Init();
void operator()(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data, ::ps::KVServer<T> *server);
@ -124,7 +126,6 @@ class ParameterServer {
void AccumGrad(const Keys &key, const Values &values, const Lengths &lengths);
WeightPtr weight(const Key &key);
void DoEmbeddingLookup(Key key, const LookupIds &lookup_ids, ::ps::KVPairs<T> *res);
int SumOfShapes(const std::vector<int> &shapes) const;
bool ReadyForUpdateWeights();
bool ReadyForPush(const Key &key);
bool ReadyForPull(const Key &key);
@ -460,11 +461,7 @@ void ParameterServer<T>::InitEmbeddingTable(
// Init embedding weight
const std::vector<size_t> &input_shapes = lookup->input_sizes();
size_t total_dims = 1;
for (auto shape : input_shapes) {
total_dims *= shape;
}
size_t total_dims = std::accumulate(input_shapes.begin(), input_shapes.end(), 1, std::multiplies<size_t>());
WeightPtr embedding = std::make_shared<Weight>(total_dims, 0);
T *embedding_data = embedding->data();
std::default_random_engine engine;
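Several hunks replace hand-rolled sum/product loops with std::accumulate (here, and again in Worker<T>::Push and WorkerProxy<T>::SparseSlicer). A small self-contained example of the idiom follows; note that std::accumulate accumulates in the type of the initial value, so a plain 0 or 1 literal accumulates in int even when the range holds size_t values.

```cpp
// Standalone illustration of the std::accumulate idiom used in this commit.
#include <cstddef>
#include <functional>
#include <iostream>
#include <numeric>
#include <vector>

int main() {
  std::vector<size_t> input_shapes = {4, 16, 32};

  // Product of dimensions; size_t{1} keeps the accumulation in size_t.
  size_t total_dims = std::accumulate(input_shapes.begin(), input_shapes.end(),
                                      size_t{1}, std::multiplies<size_t>());

  // Sum of lengths, same idea with std::plus.
  size_t total_size = std::accumulate(input_shapes.begin(), input_shapes.end(),
                                      size_t{0}, std::plus<size_t>());

  std::cout << total_dims << " " << total_size << "\n";  // 2048 52
  return 0;
}
```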
@ -517,15 +514,14 @@ void ParameterServer<T>::UpdateWeights() {
const std::vector<kernel::AddressPtr> &workspaces = optim_info->workspaces();
const std::vector<kernel::AddressPtr> &outputs = optim_info->outputs();
std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> shapes =
std::make_shared<std::vector<std::shared_ptr<std::vector<size_t>>>>();
std::shared_ptr<std::vector<size_t>> indices_shape = std::make_shared<std::vector<size_t>>();
indices_shape->emplace_back(optim_info->indice_size());
shapes->push_back(indices_shape);
std::vector<std::vector<size_t>> shapes = {};
std::vector<size_t> indices_shape = {};
indices_shape.emplace_back(optim_info->indice_size());
shapes.push_back(indices_shape);
if (original_optim_inputs_shape_.count(key) != 0) {
for (auto &input_shapes : *(original_optim_inputs_shape_[key])) {
shapes->push_back(input_shapes);
for (auto input_shapes : *(original_optim_inputs_shape_[key])) {
shapes.push_back(*input_shapes);
}
}
optimizer->ReInit(shapes);
@ -604,11 +600,10 @@ void ParameterServer<T>::DoEmbeddingLookup(Key key, const LookupIds &lookup_ids,
std::shared_ptr<PServerKernel> table_lookup_op = embedding_lookup_ops_[key];
// Update shapes of lookup operator
std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> shapes =
std::make_shared<std::vector<std::shared_ptr<std::vector<size_t>>>>();
std::shared_ptr<std::vector<size_t>> indices_shape = std::make_shared<std::vector<size_t>>();
indices_shape->emplace_back(lookup_ids.size());
shapes->push_back(indices_shape);
std::vector<std::vector<size_t>> shapes = {};
std::vector<size_t> indices_shape = {};
indices_shape.emplace_back(lookup_ids.size());
shapes.push_back(indices_shape);
table_lookup_op->ReInit(shapes);
const std::vector<size_t> output_shapes = table_lookup_op->output_sizes();
@ -641,15 +636,6 @@ void ParameterServer<T>::DoEmbeddingLookup(Key key, const LookupIds &lookup_ids,
res->lens.push_back(res->vals.size());
}
template <typename T>
int ParameterServer<T>::SumOfShapes(const std::vector<int> &shapes) const {
int sum = 1;
for (auto shape : shapes) {
sum *= shape;
}
return sum;
}
template <typename T>
inline bool ParameterServer<T>::ReadyForUpdateWeights() {
return grads_accum_counter_.size() > 0 && grad_accum_count_ == grads_accum_counter_.size();
@ -726,6 +712,7 @@ void ParameterServer<T>::SyncEmbeddingTables() {
int ret = memcpy_s(new_tensor_data_ptr, new_tensor_size, weights_[key]->data(), embedding_table_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
auto paramter_tensor_ptr = embedding_table.second->default_param();

@ -21,6 +21,8 @@
#include <memory>
#include <vector>
#include <string>
#include <numeric>
#include <functional>
#include <map>
#include "ps/ps.h"
#include "utils/log_adapter.h"
@ -124,10 +126,7 @@ void Worker<T>::Push(const std::vector<size_t> &keys, std::vector<uintptr_t> add
indice_index = 1;
}
size_t total_size = 0;
for (auto size : sizes) {
total_size += size;
}
size_t total_size = std::accumulate(sizes.begin(), sizes.end(), 0, std::plus<int>());
::ps::SArray<T> total_buffer(total_size, 0);
size_t offset = 0;
for (size_t i = 0; i < sizes.size(); i++) {
@ -135,6 +134,7 @@ void Worker<T>::Push(const std::vector<size_t> &keys, std::vector<uintptr_t> add
reinterpret_cast<void *>(addrs[i]), sizes[i] * sizeof(T));
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
offset += sizes[i] * sizeof(T);
}
@ -147,10 +147,7 @@ void Worker<T>::Push(const std::vector<size_t> &keys, std::vector<uintptr_t> add
} else {
std::vector<int> &var_shape = key_to_optim_shapes_[key][0];
int first_dim_size = var_shape[0];
int outer_dim_size = 1;
for (size_t i = 1; i < var_shape.size(); ++i) {
outer_dim_size *= var_shape[i];
}
int outer_dim_size = std::accumulate(var_shape.begin() + 1, var_shape.end(), 1, std::multiplies<int>());
kv_worker_->PushSparseData(::ps::SArray<::ps::Key>(keys), total_buffer, ::ps::SArray<int>(sizes), grad_index,
indice_index, first_dim_size, outer_dim_size);
}
@ -166,6 +163,7 @@ void Worker<T>::Pull(const size_t key, void *dev_addr, const size_t size) {
auto ret = memcpy_s(dev_addr, size, variables.data(), size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
}
@ -349,6 +347,8 @@ void Worker<T>::AddEmbeddingTable(const ::ps::Key &key, const size_t &row_count)
}
kv_worker_->AddEmbeddingTable(key, row_count);
}
static Worker<float> &worker = Worker<float>::GetInstance();
} // namespace ps
} // namespace parallel
} // namespace mindspore

@ -18,6 +18,8 @@
#define MINDSPORE_CCSRC_FRONTEND_PARALLEL_PS_WORKER_PROXY_H_
#include <map>
#include <numeric>
#include <functional>
#include <unordered_map>
#include <unordered_set>
#include <algorithm>
@ -247,7 +249,7 @@ void WorkerProxy<T>::PushSparseData(const ::ps::SArray<::ps::Key> &keys, const :
kvs.keys = keys;
kvs.vals = vals;
kvs.lens = lens;
int cmd = 0;
const int cmd = 0;
if (embedding_table_ranges_.count(keys[0])) {
std::map<int, int> attrs{{0, grad_index}, {1, indice_index}, {2, first_dim_size}, {3, outer_dim_size}};
Send(general_customer_.get(), ts, true, false, cmd, kvs, sparse_slicer_, attrs);
@ -319,6 +321,7 @@ int WorkerProxy<T>::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps:
auto ret = memcpy_s(result_addr + offset, size, pair->first, size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
offset += pair->second;
}
@ -493,10 +496,7 @@ void WorkerProxy<T>::SparseSlicer(int timestamp, const ::ps::KVPairs<T> &send, c
reduced_lens[indice_index] = unique_sparse_grad.indices_size_;
// Build the sparse value to be sent
size_t total_size = 0;
for (auto size : reduced_lens) {
total_size += size;
}
size_t total_size = std::accumulate(reduced_lens.begin(), reduced_lens.end(), 0, std::plus<int>());
::ps::SArray<T> reduced_data(total_size, 0);
BuildSparseValue(reduced_lens, grad_index, indice_index, data, unique_sparse_grad.value_,
unique_sparse_grad.indices_, &reduced_data);
@ -536,6 +536,7 @@ void WorkerProxy<T>::PrepareSparseGradient(const size_t begin, const size_t end,
auto ret = memcpy_s(gradient + offset, segment_data_size, pair.second, segment_data_size);
if (ret != 0) {
MS_LOG(ERROR) << "memcpy_s error, errorno(" << ret << ")";
return;
}
offset += segment_size;
}
@ -566,6 +567,7 @@ void WorkerProxy<T>::BuildSparseValue(const ::ps::SArray<int> &lengths, const si
auto ret = memcpy_s(reduced_data->data() + grad_offset, data_size, grads, data_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
// Fill the reduced indice
@ -575,6 +577,7 @@ void WorkerProxy<T>::BuildSparseValue(const ::ps::SArray<int> &lengths, const si
ret = memcpy_s(indice_data, data_size, indices, data_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
}
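Most hunks in this file and in optimizer_info.cc follow the same hardening pattern: check the memcpy_s/memset_s return value, log, and bail out. Below is a standalone sketch of that "checked copy, log, return" shape using only standard facilities; memcpy_s itself comes from the securec library in the real code, so CheckedCopy here is an illustrative stand-in, not the library call.

```cpp
// Illustrative stand-in for a bounds-checked copy that reports failure with
// a nonzero code, mirroring the errno_t convention of memcpy_s.
#include <cstddef>
#include <cstring>
#include <iostream>

int CheckedCopy(void *dst, size_t dst_size, const void *src, size_t count) {
  if (dst == nullptr || src == nullptr || count > dst_size) {
    return 1;  // failure, like a nonzero memcpy_s return
  }
  std::memcpy(dst, src, count);
  return 0;
}

// Usage mirroring the call sites in this diff:
//   if (CheckedCopy(out, out_size, in, in_size) != 0) {
//     std::cerr << "memcpy_s error\n";
//     return;
//   }
```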

@ -404,7 +404,7 @@ bool ExecuteAction(const ResourcePtr &res) {
#if (ENABLE_CPU && (ENABLE_D || ENABLE_GPU))
bool StartPSWorkerAction(const ResourcePtr &res) {
parallel::ps::Worker<float>::GetInstance().Run();
parallel::ps::worker.Run();
return true;
}

@ -996,9 +996,9 @@ void ClearResAtexit() {
pynative::ClearPyNativeSession();
session::ClearPythonParasMap();
#if (ENABLE_CPU && (ENABLE_D || ENABLE_GPU))
if (mindspore::parallel::ps::Util::IsParamServerMode()) {
if (parallel::ps::Util::IsParamServerMode()) {
if (parallel::ps::Util::IsRoleOfWorker()) {
parallel::ps::Worker<float>::GetInstance().Finalize();
parallel::ps::worker.Finalize();
}
}
#endif

@ -81,7 +81,7 @@ export RANK_TABLE_FILE=$PATH1
export MS_COMM_TYPE=zmq
export MS_SCHED_NUM=1
export MS_WORKER_NUM=$RANK_SIZE
export MS_SERVER_NUM=1
export MS_SERVER_NUM=8
export MS_SCHED_HOST=127.0.0.1
export MS_SCHED_PORT=8081

@ -73,7 +73,7 @@ export RANK_SIZE=8
export MS_COMM_TYPE=zmq
export MS_SCHED_NUM=1
export MS_WORKER_NUM=8
export MS_SERVER_NUM=1
export MS_SERVER_NUM=8
export MS_SCHED_HOST=127.0.0.1
export MS_SCHED_PORT=8081

Some files were not shown because too many files have changed in this diff.
