fix large scale memory (#30035)

* memory holder optimize

Change-Id: Ic91af8ac6f2853336d28a9fbbc5e8d0c57b5d05e

* memory holder optimize

Change-Id: I2fd1c14ecc17f5d5ce88b87890381ea801e6367f

* fix large scale memory holder

Change-Id: Ief0992b02b00220e16c72cc637a56e7b5788140f

* fix large scale memory holder

Change-Id: I910142a3952ead643a5604f8f80955f3e6efe655
revert-31562-mean
tangwei12 5 years ago committed by GitHub
parent 08dc5bc27e
commit 7d4bdff07d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -114,18 +114,18 @@ void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
}
int64_t SaveToText(std::ostream* os, std::shared_ptr<ValueBlock> block,
const std::vector<std::string>& saved_names,
const int mode) {
for (auto value : block->values_) {
std::vector<std::vector<float>*> vss = value.second->get(saved_names);
auto* vs = value.second->data_.data();
std::stringstream ss;
auto id = value.first;
ss << id << "\t";
for (int i = 0; i < static_cast<int>(vss.size()); i++) {
auto& vs = vss[i];
ss << paddle::string::join_strings((*vs), ',');
ss << "\t";
for (int i = 0; i < block->value_length_; i++) {
ss << vs[i];
ss << ",";
}
ss << "\n";
os->write(ss.str().c_str(), sizeof(char) * ss.str().size());
@ -159,62 +159,13 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,
std::vector<std::vector<float>> kvalues;
ProcessALine(values, meta, &kvalues);
block->Init(id, &kvalues, 1);
// warning: need fix
block->Init(id);
}
return 0;
}
void SaveShard(std::shared_ptr<ValueBlock> block, const std::string& dirname,
const CommonAccessorParameter& common, const int mode,
const int pserver_id, const int shard_id) {
auto varname = common.table_name();
std::string var_store = string::Sprintf("%s/%s", dirname, varname);
VLOG(3) << "save " << varname << " in dir: " << var_store << " begin";
MkDirRecursively(var_store.c_str());
std::string shard_var_pre =
string::Sprintf("%s.block%d.%d", varname, pserver_id, shard_id);
std::string meta_ = string::Sprintf("%s/%s.meta", var_store, shard_var_pre);
std::string value_ = string::Sprintf("%s/%s.txt", var_store, shard_var_pre);
// save values
std::vector<std::string> params(common.params().begin(),
common.params().end());
std::unique_ptr<std::ofstream> value_out(new std::ofstream(value_));
SaveToText(value_out.get(), block, params, mode);
// save meta
std::stringstream stream;
stream << "param=" << common.table_name() << "\n";
stream << "server_id=" << pserver_id << "\n";
stream << "shard_id=" << shard_id << "\n";
stream << "row_names=" << paddle::string::join_strings(common.params(), ',')
<< "\n";
stream << "row_dims=" << paddle::string::join_strings(common.dims(), ',')
<< "\n";
stream << "count=" << block->values_.size() << "\n";
std::unique_ptr<std::ofstream> meta_out(new std::ofstream(meta_));
meta_out->write(stream.str().c_str(), sizeof(char) * stream.str().size());
meta_out->close();
VLOG(3) << "save " << varname << " in dir: " << var_store << " done";
}
void CommonSparseTable::create_initializer(const std::string& attr,
const std::string& name) {
auto slices = string::split_string<std::string>(attr, "&");
if (slices[0] == "gaussian_random") {
initializers_[name] = new GaussianInitializer(slices);
} else if (slices[0] == "fill_constant") {
initializers_[name] = new FillConstantInitializer(slices);
} else if (slices[0] == "uniform_random") {
initializers_[name] = new UniformInitializer(slices);
} else {
PADDLE_THROW(
platform::errors::InvalidArgument("%s can not be supported", name));
}
}
int32_t CommonSparseTable::initialize() {
_shards_task_pool.resize(task_pool_size_);
for (int i = 0; i < _shards_task_pool.size(); ++i) {
@ -224,31 +175,44 @@ int32_t CommonSparseTable::initialize() {
sync = _config.common().sync();
VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync;
initialize_value();
initialize_optimizer();
initialize_recorder();
return 0;
}
int32_t CommonSparseTable::initialize_recorder() { return 0; }
int32_t CommonSparseTable::initialize_value() {
auto common = _config.common();
int size = static_cast<int>(common.params().size());
size_t offset = 0;
for (int x = 0; x < size; ++x) {
auto& varname = common.params()[x];
auto& dim = common.dims()[x];
value_idx_[varname] = x;
value_names_.push_back(varname);
value_dims_.push_back(dim);
value_offsets_.push_back(offset);
initializer_attrs_.push_back(common.initializers()[x]);
if (varname == "Param") {
param_dim_ = dim;
param_offset_ = offset;
}
auto& initializer = common.initializers()[x];
create_initializer(initializer, varname);
offset += dim;
}
initialize_value();
initialize_optimizer();
initialize_recorder();
return 0;
}
int32_t CommonSparseTable::initialize_recorder() { return 0; }
int32_t CommonSparseTable::initialize_value() {
shard_values_.reserve(task_pool_size_);
for (int x = 0; x < task_pool_size_; ++x) {
auto shard = std::make_shared<ValueBlock>(common, &initializers_);
auto shard =
std::make_shared<ValueBlock>(value_names_, value_dims_, value_offsets_,
value_idx_, initializer_attrs_, "none");
shard_values_.emplace_back(shard);
}
@ -281,14 +245,16 @@ int32_t CommonSparseTable::initialize_value() {
int32_t CommonSparseTable::initialize_optimizer() {
auto common = _config.common();
auto name = common.name();
auto attrs = common.attributes();
if (name == "sgd") {
optimizer_ = std::make_shared<SSGD>(common);
optimizer_ = std::make_shared<SSGD>(value_names_, value_dims_,
value_offsets_, value_idx_);
} else if (name == "adam") {
optimizer_ = std::make_shared<SAdam>(common);
optimizer_ = std::make_shared<SAdam>(value_names_, value_dims_,
value_offsets_, value_idx_);
} else if (name == "sum") {
optimizer_ = std::make_shared<SSUM>(common);
optimizer_ = std::make_shared<SSUM>(value_names_, value_dims_,
value_offsets_, value_idx_);
} else {
VLOG(0) << "init optimizer failed";
}
@ -330,8 +296,7 @@ int32_t CommonSparseTable::save(const std::string& dirname,
int64_t total_ins = 0;
for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
// save values
total_ins +=
SaveToText(value_out.get(), shard_values_[shard_id], params, mode);
total_ins += SaveToText(value_out.get(), shard_values_[shard_id], mode);
}
value_out->close();
@ -391,10 +356,6 @@ int32_t CommonSparseTable::pour() {
int32_t CommonSparseTable::pull_sparse(float* pull_values, const uint64_t* keys,
size_t num) {
rwlock_->RDLock();
std::vector<std::string> value_names;
for (auto name : _config.common().params()) {
value_names.push_back(name);
}
std::vector<std::vector<uint64_t>> offset_bucket;
offset_bucket.resize(task_pool_size_);
@ -408,20 +369,18 @@ int32_t CommonSparseTable::pull_sparse(float* pull_values, const uint64_t* keys,
for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
[this, shard_id, &keys, &offset_bucket, &value_names,
&pull_values]() -> int {
[this, shard_id, &keys, &offset_bucket, &pull_values]() -> int {
auto& block = shard_values_[shard_id];
auto& offsets = offset_bucket[shard_id];
for (int i = 0; i < offsets.size(); ++i) {
auto offset = offsets[i];
auto id = keys[offset];
block->InitFromInitializer(id, value_names);
auto values = block->Get(id, {"Param"});
auto dim = values[0]->size();
std::copy(values[0]->begin(), values[0]->end(),
pull_values + dim * offset);
auto* value = block->InitFromInitializer(id);
std::copy_n(value + param_offset_, param_dim_,
pull_values + param_dim_ * offset);
}
return 0;
});
}
@ -492,10 +451,6 @@ int32_t CommonSparseTable::push_sparse(const uint64_t* keys,
int32_t CommonSparseTable::push_sparse_param(const uint64_t* keys,
const float* values, size_t num) {
rwlock_->RDLock();
std::vector<std::string> value_names;
for (auto name : _config.common().params()) {
value_names.push_back(name);
}
std::vector<std::vector<uint64_t>> offset_bucket;
offset_bucket.resize(task_pool_size_);
@ -509,18 +464,16 @@ int32_t CommonSparseTable::push_sparse_param(const uint64_t* keys,
for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
[this, shard_id, &keys, &offset_bucket, &value_names,
&values]() -> int {
[this, shard_id, &keys, &offset_bucket, &values]() -> int {
auto& block = shard_values_[shard_id];
auto& offsets = offset_bucket[shard_id];
for (int i = 0; i < offsets.size(); ++i) {
auto offset = offsets[i];
auto id = keys[offset];
block->InitFromInitializer(id, value_names);
auto values_ = block->Get(id, {"Param"});
auto dim = values_[0]->size();
std::copy_n(values + dim * offset, dim, values_[0]->data());
auto* value = block->InitFromInitializer(id);
std::copy_n(values + param_dim_ * offset, param_dim_,
value + param_offset_);
}
return 0;
});

@ -50,8 +50,6 @@ class CommonSparseTable : public SparseTable {
virtual int32_t initialize();
virtual int32_t initialize_shard() { return 0; }
virtual void create_initializer(const std::string& attr,
const std::string& name);
virtual int32_t initialize_value();
virtual int32_t initialize_optimizer();
virtual int32_t initialize_recorder();
@ -86,8 +84,15 @@ class CommonSparseTable : public SparseTable {
bool sync = false;
int param_dim_ = 0;
int param_offset_ = 0;
std::unordered_map<std::string, int> value_idx_;
std::vector<std::string> value_names_;
std::vector<int> value_dims_;
std::vector<int> value_offsets_;
std::vector<std::string> initializer_attrs_;
std::shared_ptr<SparseOptimizer> optimizer_;
std::unordered_map<std::string, Initializer*> initializers_;
std::vector<std::shared_ptr<ValueBlock>> shard_values_;
std::unordered_map<uint64_t, ReservoirValue<float>> pull_reservoir_;
std::unique_ptr<framework::RWLock> rwlock_{nullptr};

File diff suppressed because it is too large Load Diff

@ -19,6 +19,7 @@
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
@ -30,25 +31,38 @@ namespace distributed {
class SparseOptimizer {
public:
SparseOptimizer() {}
explicit SparseOptimizer(const CommonAccessorParameter& common) {}
explicit SparseOptimizer(
const std::vector<std::string>& value_names,
const std::vector<int>& value_dims, const std::vector<int>& value_offsets,
const std::unordered_map<std::string, int>& value_idx)
: value_names_(value_names),
value_dims_(value_dims),
value_offsets_(value_offsets),
value_idx_(value_idx) {}
virtual void update(const uint64_t* keys, const float* update_values,
size_t num, const std::vector<uint64_t>& offsets,
ValueBlock* block) = 0;
const std::vector<std::string>& value_names_;
const std::vector<int>& value_dims_;
const std::vector<int>& value_offsets_;
const std::unordered_map<std::string, int>& value_idx_;
int param_offset = 0;
int update_numel = 0;
};
// sum calc for sparse tensor
class SSUM : public SparseOptimizer {
public:
SSUM(){};
explicit SSUM(const CommonAccessorParameter& common) {
auto& names = common.params();
for (int x = 0; x < static_cast<int>(names.size()); ++x) {
if (names[x] == "Param") {
param_idx = x;
update_numel = common.dims()[x];
}
}
explicit SSUM(const std::vector<std::string>& value_names,
const std::vector<int>& value_dims,
const std::vector<int>& value_offsets,
const std::unordered_map<std::string, int>& value_idx)
: SparseOptimizer(value_names, value_dims, value_offsets, value_idx) {
auto idx = value_idx.at("Param");
param_offset = value_offsets.at(idx);
update_numel = value_dims.at(idx);
}
void update(const uint64_t* keys, const float* update_values, size_t num,
@ -57,35 +71,27 @@ class SSUM : public SparseOptimizer {
auto blas = GetBlas<float>();
for (auto x : offsets) {
auto id = keys[x];
auto values = block->Get(id);
float* param = values[param_idx]->data();
std::vector<float> delta;
delta.resize(update_numel);
blas.VCOPY(update_numel, update_values + x * update_numel, delta.data());
blas.VADD(update_numel, delta.data(), param, param);
auto* value = block->Get(id);
float* param = value + param_offset;
blas.VADD(update_numel, update_values + x * update_numel, param, param);
}
}
int param_idx;
int update_numel;
};
// sgd optimzer for sparse tensor
class SSGD : public SparseOptimizer {
public:
SSGD(){};
explicit SSGD(const CommonAccessorParameter& common) {
auto& names = common.params();
for (int x = 0; x < static_cast<int>(names.size()); ++x) {
if (names[x] == "LearningRate") {
learning_rate_idx = x;
}
if (names[x] == "Param") {
param_idx = x;
update_numel = common.dims()[x];
}
}
explicit SSGD(const std::vector<std::string>& value_names,
const std::vector<int>& value_dims,
const std::vector<int>& value_offsets,
const std::unordered_map<std::string, int>& value_idx)
: SparseOptimizer(value_names, value_dims, value_offsets, value_idx) {
auto idx = value_idx.at("Param");
param_offset = value_offsets.at(idx);
update_numel = value_dims.at(idx);
idx = value_idx.at("LearningRate");
lr_offset = value_offsets.at(idx);
}
void update(const uint64_t* keys, const float* update_values, size_t num,
@ -94,9 +100,10 @@ class SSGD : public SparseOptimizer {
auto blas = GetBlas<float>();
for (auto x : offsets) {
auto id = keys[x];
auto values = block->Get(id);
float* learning_rate = values[learning_rate_idx]->data();
float* param = values[param_idx]->data();
auto* value = block->Get(id);
float* learning_rate = value + lr_offset;
float* param = value + param_offset;
std::vector<float> grads;
grads.resize(update_numel);
@ -106,38 +113,35 @@ class SSGD : public SparseOptimizer {
}
}
int learning_rate_idx;
int param_idx;
int update_numel;
int lr_offset;
};
// adam optimzer for sparse tensor
class SAdam : public SparseOptimizer {
public:
SAdam() {}
explicit SAdam(const CommonAccessorParameter& common) {
auto& names = common.params();
for (int x = 0; x < static_cast<int>(names.size()); ++x) {
if (names[x] == "LearningRate") {
learning_rate_idx = x;
}
if (names[x] == "Param") {
param_idx = x;
update_numel = common.dims()[x];
}
if (names[x] == "Moment1") {
moment1_idx = x;
}
if (names[x] == "Moment2") {
moment2_idx = x;
}
if (names[x] == "Beta1Pow") {
beta1_pow_idx = x;
}
if (names[x] == "Beta2Pow") {
beta2_pow_idx = x;
}
}
explicit SAdam(const std::vector<std::string>& value_names,
const std::vector<int>& value_dims,
const std::vector<int>& value_offsets,
const std::unordered_map<std::string, int>& value_idx)
: SparseOptimizer(value_names, value_dims, value_offsets, value_idx) {
auto idx = value_idx.at("Param");
param_offset = value_offsets.at(idx);
update_numel = value_dims.at(idx);
idx = value_idx.at("LearningRate");
lr_offset = value_offsets.at(idx);
idx = value_idx.at("Moment1");
m1_offset = value_offsets.at(idx);
idx = value_idx.at("Moment2");
m2_offset = value_offsets.at(idx);
idx = value_idx.at("Beta1Pow");
beta1_pow_offset = value_offsets.at(idx);
idx = value_idx.at("Beta2Pow");
beta2_pow_offset = value_offsets.at(idx);
// add attr later
beta1 = 0.9;
@ -151,13 +155,13 @@ class SAdam : public SparseOptimizer {
auto blas = GetBlas<float>();
for (auto x : offsets) {
auto id = keys[x];
auto values = block->Get(id);
float* learning_rate = values[learning_rate_idx]->data();
float* param = values[param_idx]->data();
float* moment1 = values[moment1_idx]->data();
float* moment2 = values[moment2_idx]->data();
float* beta1_pow = values[beta1_pow_idx]->data();
float* beta2_pow = values[beta2_pow_idx]->data();
auto* values = block->Get(id);
float* learning_rate = values + lr_offset;
float* param = values + param_offset;
float* moment1 = values + m1_offset;
float* moment2 = values + m2_offset;
float* beta1_pow = values + beta1_pow_offset;
float* beta2_pow = values + beta2_pow_offset;
beta1_pow[0] = beta1_pow[0] * beta1;
beta2_pow[0] = beta2_pow[0] * beta2;
@ -194,16 +198,15 @@ class SAdam : public SparseOptimizer {
}
}
int learning_rate_idx;
int param_idx;
int moment1_idx;
int moment2_idx;
int beta1_pow_idx;
int beta2_pow_idx;
int lr_offset;
int m1_offset;
int m2_offset;
int beta1_pow_offset;
int beta2_pow_offset;
float beta1;
float beta2;
float epsilon;
int update_numel;
};
} // namespace distributed

Loading…
Cancel
Save