fix several sparse table issues (#20686)

* no longer need to define the embedding layers of every slot (previously not a single one could be omitted) in each program; make trainer_param repeated in ps.proto.
* add find_distributed_lookup_table_grads instead of hard-coding GRAD.
* support embedding stop_gradient; push sparse raised an error before this fix.
* fix fill sparse: skip slots which do not have an embedding. Before this fix, each slot's embedding in a sparse table had to be used in every training program.
* fix pull sparse: skip slots which do not have an embedding.
* fix collect feasign label info: skip slots which do not have an embedding.
* support multiple sparse tables in one or more training programs; each program can pull/push its own related sparse tables instead of all sparse tables (see the sketch after this list).
* test=develop
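
The following is a minimal, self-contained Python sketch of the last point above (hypothetical names such as WorkerDesc and pull_sparse_for_program; this is not PaddlePaddle API): every training program gets its own worker descriptor that lists only the sparse tables it uses, and pull/push iterate over that per-program list instead of over every table.

# Minimal sketch of per-program sparse table selection (hypothetical names,
# not PaddlePaddle code). Each program id maps to a worker descriptor that
# declares only the sparse tables that program uses.

class WorkerDesc(object):
    """Stand-in for a per-program trainer parameter descriptor."""

    def __init__(self, sparse_table_ids):
        self.sparse_table_ids = list(sparse_table_ids)


def pull_sparse_for_program(program_id, program_id_to_worker, all_tables):
    """Pull only the sparse tables declared by this program's worker."""
    worker = program_id_to_worker[program_id]
    return {tid: all_tables[tid] for tid in worker.sparse_table_ids}


if __name__ == "__main__":
    all_tables = {0: "emb_table_0", 1: "emb_table_1", 2: "emb_table_2"}
    program_id_to_worker = {
        "prog_a": WorkerDesc([0, 1]),  # program A uses tables 0 and 1
        "prog_b": WorkerDesc([2]),     # program B only uses table 2
    }
    print(pull_sparse_for_program("prog_a", program_id_to_worker, all_tables))
    print(pull_sparse_for_program("prog_b", program_id_to_worker, all_tables))

In the actual diff this selection is driven by opt_info["program_id_to_worker"] in device_worker.py and by the now-repeated trainer_param field in ps.proto.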
yaoxuefeng
xujiaqi01 committed 6 years ago via GitHub
parent fa67e6e83e
commit 48669aa8f0

@@ -211,6 +211,8 @@ class DownpourWorker : public HogwildWorker {
   std::map<uint64_t, std::vector<std::string>> sparse_grad_names_;
   std::map<uint64_t, std::vector<std::string>> dense_value_names_;
   std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
+  // actually pushed feasign of each table
+  std::map<uint64_t, std::vector<uint64_t>> sparse_push_keys_;
   // feasign
   std::map<uint64_t, std::vector<uint64_t>> features_;

@@ -44,6 +44,7 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
       sparse_grad_names_[table_id][j] = table.sparse_grad_name(j);
     }
     label_var_name_[table_id] = table.label_var_name();
+    sparse_push_keys_[table_id] = std::vector<uint64_t>();
   }

   for (int i = 0; i < param_.dense_table_size(); ++i) {
@@ -191,6 +192,14 @@ void DownpourWorker::CollectLabelInfo(size_t table_idx) {
     LoDTensor* tensor = fea_var->GetMutable<LoDTensor>();
     CHECK(tensor != nullptr) << "tensor of var "
                              << sparse_key_names_[table_id][i] << " is null";
+    // skip slots which do not have embedding
+    Variable* emb_var =
+        thread_scope_->FindVar(sparse_value_names_[table_id][i]);
+    if (emb_var == nullptr) {
+      continue;
+    }
     int64_t* ids = tensor->data<int64_t>();
     size_t fea_idx = 0;
     // tensor->lod()[0].size() == batch_size + 1
@@ -237,6 +246,9 @@ void DownpourWorker::FillSparseValue(size_t table_idx) {
     int64_t* ids = tensor->data<int64_t>();
     int len = tensor->numel();
     Variable* var_emb = thread_scope_->FindVar(emb_slot_name);
+    if (var_emb == nullptr) {
+      continue;
+    }
     LoDTensor* tensor_emb = var_emb->GetMutable<LoDTensor>();
     float* ptr = tensor_emb->mutable_data<float>({len, table.emb_dim()},
                                                  platform::CPUPlace());
@@ -422,9 +434,9 @@ void DownpourWorker::TrainFilesWithProfiler() {
         }
       }
       timeline.Start();
-      fleet_ptr_->PullSparseVarsSync(*thread_scope_, tid,
-                                     sparse_key_names_[tid], &features_[tid],
-                                     &feature_values_[tid], table.fea_dim());
+      fleet_ptr_->PullSparseVarsSync(
+          *thread_scope_, tid, sparse_key_names_[tid], &features_[tid],
+          &feature_values_[tid], table.fea_dim(), sparse_value_names_[tid]);
       timeline.Pause();
       pull_sparse_time += timeline.ElapsedSec();
       total_time += timeline.ElapsedSec();
@@ -504,7 +516,7 @@ void DownpourWorker::TrainFilesWithProfiler() {
           *thread_scope_, tid, features_[tid], feature_labels_[tid],
           sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
           &feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
-          dump_slot_);
+          dump_slot_, &sparse_push_keys_[tid]);
       timeline.Pause();
       push_sparse_time += timeline.ElapsedSec();
       total_time += timeline.ElapsedSec();
@@ -646,9 +658,9 @@ void DownpourWorker::TrainFiles() {
           break;
         }
       }
-      fleet_ptr_->PullSparseVarsSync(*thread_scope_, tid,
-                                     sparse_key_names_[tid], &features_[tid],
-                                     &feature_values_[tid], table.fea_dim());
+      fleet_ptr_->PullSparseVarsSync(
+          *thread_scope_, tid, sparse_key_names_[tid], &features_[tid],
+          &feature_values_[tid], table.fea_dim(), sparse_value_names_[tid]);
       CollectLabelInfo(i);
       FillSparseValue(i);
       auto nid_iter = std::find(sparse_value_names_[tid].begin(),
@@ -707,7 +719,7 @@ void DownpourWorker::TrainFiles() {
             *thread_scope_, tid, features_[tid], feature_labels_[tid],
             sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
             &feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
-            dump_slot_);
+            dump_slot_, &sparse_push_keys_[tid]);
       }
     }

@@ -159,14 +159,16 @@ void FleetWrapper::CreateClient2ClientConnection() {
 void FleetWrapper::PullSparseVarsSync(
     const Scope& scope, const uint64_t table_id,
     const std::vector<std::string>& var_names, std::vector<uint64_t>* fea_keys,
-    std::vector<std::vector<float>>* fea_values, int fea_value_dim) {
+    std::vector<std::vector<float>>* fea_values, int fea_value_dim,
+    const std::vector<std::string>& var_emb_names) {
 #ifdef PADDLE_WITH_PSLIB
   std::vector<::std::future<int32_t>> pull_sparse_status;
   pull_sparse_status.resize(0);
   fea_keys->clear();
   fea_keys->resize(0);
   fea_keys->reserve(MAX_FEASIGN_NUM);
-  for (auto name : var_names) {
+  for (size_t var_index = 0; var_index < var_names.size(); ++var_index) {
+    const std::string& name = var_names[var_index];
     Variable* var = scope.FindVar(name);
     if (var == nullptr) {
       continue;
@@ -175,6 +177,14 @@ void FleetWrapper::PullSparseVarsSync(
     CHECK(tensor != nullptr) << "tensor of var " << name << " is null";
     int64_t* ids = tensor->data<int64_t>();
     int len = tensor->numel();
+    // skip slots which do not have embedding
+    const std::string& emb_name = var_emb_names[var_index];
+    Variable* emb_var = scope.FindVar(emb_name);
+    if (emb_var == nullptr) {
+      continue;
+    }
     for (auto i = 0u; i < len; ++i) {
       if (ids[i] == 0u) {
         continue;
@@ -314,7 +324,8 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
     const std::vector<std::string>& sparse_grad_names, const int emb_dim,
     std::vector<std::vector<float>>* push_values,
     std::vector<::std::future<int32_t>>* push_sparse_status,
-    const int batch_size, const bool use_cvm, const bool dump_slot) {
+    const int batch_size, const bool use_cvm, const bool dump_slot,
+    std::vector<uint64_t>* sparse_push_keys) {
 #ifdef PADDLE_WITH_PSLIB
   int offset = 2;
   int slot_offset = 0;
@@ -332,12 +343,15 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
   }
   CHECK_GE(grad_dim, 0);

+  sparse_push_keys->clear();
+  sparse_push_keys->reserve(fea_keys.size() + 1);
   push_values->resize(fea_keys.size() + 1);
   for (auto& t : *push_values) {
     t.resize(emb_dim + offset + slot_offset);
   }
   uint64_t fea_idx = 0u;
-  for (size_t i = 0; i < sparse_key_names.size(); ++i) {
+  for (size_t i = 0;
+       i < sparse_key_names.size() && i < sparse_grad_names.size(); ++i) {
     Variable* var = scope.FindVar(sparse_key_names[i]);
     if (var == nullptr) {
       continue;
@@ -376,6 +390,7 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
         g += emb_dim;
         continue;
       }
+      sparse_push_keys->push_back(ids[id_idx]);
       CHECK(fea_idx < (*push_values).size());
       CHECK(fea_idx < fea_labels.size());
@@ -396,17 +411,43 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
       fea_idx++;
     }
   }
-  CHECK(fea_idx == fea_keys.size()) << "fea_idx: " << fea_idx
-                                    << " features size: " << fea_keys.size();
+  // slots whose embedding has been stop gradient or
+  // not involved in forward-backward
+  uint64_t no_grad_fea_num = 0u;
+  for (size_t i = sparse_grad_names.size(); i < sparse_key_names.size(); ++i) {
+    Variable* var = scope.FindVar(sparse_key_names[i]);
+    if (var == nullptr) {
+      continue;
+    }
+    LoDTensor* tensor = var->GetMutable<LoDTensor>();
+    if (tensor == nullptr) {
+      LOG(ERROR) << "tensor of var[" << sparse_key_names[i] << "] is null";
+      exit(-1);
+    }
+    int len = tensor->numel();
+    int64_t* ids = tensor->data<int64_t>();
+    for (auto id_idx = 0u; id_idx < len; ++id_idx) {
+      if (ids[id_idx] == 0) {
+        continue;
+      }
+      ++no_grad_fea_num;
+    }
+  }
+  CHECK(fea_idx + no_grad_fea_num == fea_keys.size())
+      << "fea_idx: " << fea_idx << " no_grad_fea_num: " << no_grad_fea_num
+      << " features size: " << fea_keys.size();
+  CHECK(fea_idx == sparse_push_keys->size());
+  if (fea_idx == 0) {
+    return;
+  }
   std::vector<float*> push_g_vec;
-  for (auto i = 0u; i < fea_keys.size(); ++i) {
+  for (auto i = 0u; i < sparse_push_keys->size(); ++i) {
     push_g_vec.push_back((*push_values)[i].data());
   }
   auto status = pslib_ptr_->_worker_ptr->push_sparse(
-      table_id, fea_keys.data(), (const float**)push_g_vec.data(),
-      fea_keys.size());
+      table_id, sparse_push_keys->data(), (const float**)push_g_vec.data(),
+      sparse_push_keys->size());
   push_sparse_status->push_back(std::move(status));
 #endif
 }
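
To make the push-side accounting above easier to follow, here is an illustrative plain-Python sketch (hypothetical helper split_push_keys; not the C++ implementation). It assumes the gradient-bearing slots come first, which is the ordering add_sparse_table establishes further down: non-zero keys from those slots are pushed, non-zero keys from the trailing stop-gradient slots are only counted, and the two counts must add up to the number of pulled keys, mirroring the CHECK.

def split_push_keys(slot_keys, num_grad_slots):
    """slot_keys: one list of feasign keys per slot; the first num_grad_slots
    slots have gradients. Zero keys are padding and are skipped, just as the
    pull path skips them."""
    pushed, no_grad_fea_num = [], 0
    for slot_idx, keys in enumerate(slot_keys):
        for key in keys:
            if key == 0:                      # padding feasign, never pulled
                continue
            if slot_idx < num_grad_slots:     # slot has a gradient: push it
                pushed.append(key)
            else:                             # stop-gradient slot: count only
                no_grad_fea_num += 1
    return pushed, no_grad_fea_num


if __name__ == "__main__":
    slot_keys = [[11, 0, 12], [21, 22], [31, 0]]   # last slot: stop_gradient
    pulled = sum(1 for keys in slot_keys for k in keys if k != 0)
    pushed, no_grad_fea_num = split_push_keys(slot_keys, num_grad_slots=2)
    assert len(pushed) + no_grad_fea_num == pulled  # mirrors the CHECK above
    print(pushed, no_grad_fea_num)                  # [11, 12, 21, 22] 1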

@@ -77,7 +77,8 @@ class FleetWrapper {
                           const std::vector<std::string>& var_names,
                           std::vector<uint64_t>* fea_keys,
                           std::vector<std::vector<float>>* fea_values,
-                          int fea_dim);
+                          int fea_dim,
+                          const std::vector<std::string>& var_emb_names);

   void PullDenseVarsSync(const Scope& scope, const uint64_t table_id,
                          const std::vector<std::string>& var_names);
@@ -115,7 +116,8 @@ class FleetWrapper {
       const std::vector<std::string>& sparse_grad_names, const int emb_dim,
       std::vector<std::vector<float>>* push_values,
       std::vector<::std::future<int32_t>>* push_sparse_status,
-      const int batch_size, const bool use_cvm, const bool dump_slot);
+      const int batch_size, const bool use_cvm, const bool dump_slot,
+      std::vector<uint64_t>* sparse_push_keys);

   // Push sparse variables to server in Async mode
   // Param<In>: scope, table_id, fea_keys, sparse_grad_names

@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""Defination of device workers."""

 __all__ = ['DeviceWorker', 'Hogwild', 'DownpourSGD', 'Section']
@@ -23,9 +24,7 @@ class DeviceWorker(object):
     """

     def __init__(self):
-        """
-        Init.
-        """
+        """Init."""
         self._program = None
         self._infer = None
@@ -75,9 +74,7 @@ class Hogwild(DeviceWorker):
     """

     def __init__(self):
-        """
-        Init.
-        """
+        """Init."""
         super(Hogwild, self).__init__()

     def _gen_worker_desc(self, trainer_desc):
@@ -140,23 +137,29 @@ class DownpourSGD(DeviceWorker):
         trainer_desc.device_worker_name = "DownpourWorker"
         pull_thread = trainer_desc.pull_dense_param
         pull_thread.device_num = trainer_desc.thread_num
-        for i in self._fleet_desc.trainer_param.dense_table:
+        if opt_info.get("program_id_to_worker") is None:
+            raise ValueError("opt_info must have program_id_to_worker")
+        prog_id_to_worker = opt_info["program_id_to_worker"]
+        if prog_id_to_worker.get(program_id) is None:
+            raise ValueError("%s not found in program_id_to_worker" %
+                             program_id)
+        worker = opt_info["program_id_to_worker"][program_id]
+        for i in worker.get_desc().dense_table:
             if i.table_id in dense_table_set:
                 dense_table = pull_thread.dense_table.add()
                 dense_table.dense_value_name.extend(i.dense_variable_name)
                 dense_table.table_id = \
                     i.table_id
-        sparse_len = len(self._fleet_desc.trainer_param.sparse_table)
+        sparse_len = len(worker.get_desc().sparse_table)
         for i in range(sparse_len):
             sparse_table = downpour.sparse_table.add()
-            sparse_table.table_id = \
-                self._fleet_desc.trainer_param.sparse_table[i].table_id
-            sparse_table.sparse_key_name.extend(
-                self._fleet_desc.trainer_param.sparse_table[i].slot_key)
-            sparse_table.sparse_value_name.extend(
-                self._fleet_desc.trainer_param.sparse_table[i].slot_value)
-            sparse_table.sparse_grad_name.extend(
-                self._fleet_desc.trainer_param.sparse_table[i].slot_gradient)
+            sparse_table.table_id = worker.get_desc().sparse_table[i].table_id
+            sparse_table.sparse_key_name.extend(worker.get_desc().sparse_table[
+                i].slot_key)
+            sparse_table.sparse_value_name.extend(worker.get_desc()
+                                                  .sparse_table[i].slot_value)
+            sparse_table.sparse_grad_name.extend(worker.get_desc().sparse_table[
+                i].slot_gradient)
         if opt_info["use_cvm"]:
             sparse_table.emb_dim = \
                 self._fleet_desc.server_param.downpour_server_param.downpour_table_param[
@@ -173,28 +176,24 @@ class DownpourSGD(DeviceWorker):
         for i in opt_info["stat_var_names"]:
             downpour.stat_var_names.extend([i])

-        for i in self._fleet_desc.trainer_param.dense_table:
+        for i in worker.get_desc().dense_table:
             if i.table_id in dense_table_set:
                 dense_table = downpour.dense_table.add()
                 dense_table.table_id = i.table_id
                 dense_table.dense_value_name.extend(i.dense_variable_name)
                 dense_table.dense_grad_name.extend(
                     i.dense_gradient_variable_name)
-        downpour.skip_ops.extend(self._fleet_desc.trainer_param.skip_op)
+        downpour.skip_ops.extend(worker.get_desc().skip_op)
         if self._infer:
             downpour.push_dense = False
             downpour.push_sparse = False


 class Section(DeviceWorker):
-    """
-    SectionWorker
-    """
+    """SectionWorker."""

     def __init__(self):
-        """
-        Init.
-        """
+        """Init."""
         super(Section, self).__init__()

     def _gen_worker_desc(self, trainer_desc):

@@ -10,6 +10,7 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
+"""Defination of PSLib."""
 import os
 import sys
@@ -25,6 +26,8 @@ from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker

 class PSLib(Fleet):
+    """PSLib class."""
+
     def __init__(self):
         super(PSLib, self).__init__(Mode.PSLIB)
         self._opt_info = None
@@ -89,7 +92,10 @@ class PSLib(Fleet):
         # barrier for init model
         self._role_maker._barrier_worker()
         if self._role_maker.is_first_worker():
-            tables = self._dist_desc.trainer_param.dense_table
+            tables = []
+            for tp in self._dist_desc.trainer_param:
+                for i in tp.dense_table:
+                    tables.append(i)
             for prog, scope in zip(self._main_programs, self._scopes):
                 prog_id = str(id(prog))
                 prog_conf = self._opt_info['program_configs'][prog_id]
@@ -244,7 +250,9 @@ class PSLib(Fleet):
                 3 means save batch model.

         Example:
-            >>> fleet.save_persistables(dirname="/you/path/to/model", mode = 0)
+            .. code-block:: python
+
+              fleet.save_persistables(dirname="/you/path/to/model", mode = 0)

         """
         mode = kwargs.get("mode", 0)
@@ -260,15 +268,20 @@ class PSLib(Fleet):
         when using fleet, it will save sparse cache table

         Args:
+            executor(Executor): fluid executor
             dirname(str): save path. It can be hdfs/afs path or local path
             main_program(Program): fluid program, default None
             kwargs: use define property, current support following
                 mode(int): define for feature extension in the future,
                     currently no use, will pass a default value 0

+        Returns:
+            feasign_num(int): cache feasign num
+
         Example:
             .. code-block:: python
-              >>> fleet.save_cache_model(None, dirname="/you/path/to/model", mode = 0)
+
+              fleet.save_cache_model(None, dirname="/you/path/to/model", mode = 0)

         """
         mode = kwargs.get("mode", 0)
@@ -304,8 +317,12 @@ class PSLib(Fleet):
         """
         self._role_maker._barrier_worker()
         if self._role_maker.is_first_worker():
-            for i in self._opt_info["fleet_desc"].trainer_param.sparse_table:
-                self._fleet_ptr.shrink_sparse_table(i.table_id)
+            tables = []
+            for tp in self._opt_info["fleet_desc"].trainer_param:
+                for i in tp.sparse_table:
+                    tables.append(i.table_id)
+            for i in list(set(tables)):
+                self._fleet_ptr.shrink_sparse_table(i)
         self._role_maker._barrier_worker()

     def shrink_dense_table(self, decay, emb_dim=11, scope=None, table_id=None):
@@ -330,7 +347,8 @@ class PSLib(Fleet):
             scope = fluid.global_scope()
         self._role_maker._barrier_worker()
         if self._role_maker.is_first_worker():
-            for i in self._opt_info["fleet_desc"].trainer_param.dense_table:
-                if table_id is not None and table_id != i.table_id:
-                    continue
-                var_list = [var for var in i.dense_variable_name]
+            for tp in self._opt_info["fleet_desc"].trainer_param:
+                for i in tp.dense_table:
+                    if table_id is not None and table_id != i.table_id:
+                        continue
+                    var_list = [var for var in i.dense_variable_name]
@@ -341,8 +359,8 @@ class PSLib(Fleet):
                             break
                     if skip:
                         continue
-                self._fleet_ptr.shrink_dense_table(i.table_id, scope, var_list,
-                                                   decay, emb_dim)
+                    self._fleet_ptr.shrink_dense_table(i.table_id, scope,
+                                                       var_list, decay, emb_dim)
         self._role_maker._barrier_worker()

     def clear_model(self):
@@ -476,7 +494,8 @@ class PSLib(Fleet):
             if ret != 0:
                 raise RuntimeError("download model proto file failed")
             model_proto_file = dest
-        for i in self._opt_info["fleet_desc"].trainer_param.dense_table:
-            if table_id is not None and table_id != i.table_id:
-                continue
-            table_var_names = [var for var in i.dense_variable_name]
+        for tp in self._opt_info["fleet_desc"].trainer_param:
+            for i in tp.dense_table:
+                if table_id is not None and table_id != i.table_id:
+                    continue
+                table_var_names = [var for var in i.dense_variable_name]
@@ -488,8 +507,8 @@ class PSLib(Fleet):
                 if skip:
                     continue
                 self._fleet_ptr.load_from_paddle_model(
-                    scope, table_id, var_names, model_path, model_proto_file,
-                    table_var_names, load_combine)
+                    scope, table_id, var_names, model_path,
+                    model_proto_file, table_var_names, load_combine)
         self._role_maker._barrier_worker()

     def _set_opt_info(self, opt_info):

@@ -10,13 +10,15 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
+"""Defination of Server and Worker."""
 from . import ps_pb2 as pslib


 class Server(object):
     """
-    A Server basic class.
+    A Server basic class
+    it's a base class, does not have implementation
     """

     def __init__(self):
@@ -26,6 +28,7 @@ class Server(object):
 class Worker(object):
     """
     A Worker basic class.
+    it's a base class, does not have implementation
     """

     def __init__(self):
@@ -169,7 +172,10 @@ class DownpourServer(Server):
         """
         Args:
             table_id(int): id of sparse params table
-            strategy(dict): the dense config dict.
+            param_var(list): param vars
+            grad_var(list): param grad vars
+            strategy(dict): the dense config dict
+            sparse_table_names(list): sparse table names
         Returns:
             return None
         """
@@ -230,7 +236,11 @@ class DownpourServer(Server):
         """
         Args:
             table_id(int): id of datanorm table
-            strategy(dict): the datanorm config dict.
+            learning_rate(float): the learning rate used to update parameters
+            param_var(list): param vars
+            grad_var(list): param grad vars
+            strategy(dict): the datanorm config dict
+            sparse_table_names(list): sparse table names
         Returns:
             return None
         """
@@ -296,43 +306,60 @@ class DownpourWorker(Worker):
         self.window = window
         self._worker = pslib.DownpourTrainerParameter()

-    def add_sparse_table(self, table_id, slot_key_vars, slot_value_vars):
+    def add_sparse_table(self,
+                         table_id,
+                         slot_key_vars,
+                         slot_value_vars,
+                         slot_value_grads=None):
         """
         Args:
             table_id(int): id of sparse params table
-            slot_key_vars(string): slot key id
-            slot_value_var(string): slot key value after embedding
+            slot_key_vars(list): slot key id
+            slot_value_vars(list): slot key value after embedding
+            slot_value_grads(list): grad of all params, default is None
         Returns:
             return None
         """
+        if slot_value_grads is None:
+            slot_value_grad_names = \
+                [var.name + "@GRAD" for var in slot_value_vars]
+        else:
+            value_to_key = {}
+            for i in range(len(slot_key_vars)):
+                value_to_key[slot_value_vars[i].name] = slot_key_vars[i]
+            slot_value_grad_names = []
+            all_grad_names = [var.name for var in slot_value_grads]
+            for var in slot_value_vars:
+                if var.name + "@GRAD" in all_grad_names:
+                    slot_value_grad_names.append(var.name + "@GRAD")
+            sorted_slot_value_vars = [i for i in slot_value_vars if \
+                i.name + "@GRAD" in slot_value_grad_names]
+            sorted_slot_value_vars += [i for i in slot_value_vars if \
+                i.name + "@GRAD" not in slot_value_grad_names]
+            sorted_slot_key_vars = \
+                [value_to_key[v.name] for v in sorted_slot_value_vars]
+        target_table = None
         for table in self._worker.sparse_table:
             if table.table_id == table_id:
-                if [var.name for var in slot_key_vars
-                    ] == self._worker.sparse_table[table_id].slot_key:
-                    if [var.name for var in slot_value_vars
-                        ] == self._worker.sparse_table[table_id].slot_value:
-                        if [
-                                var.name + "@GRAD" for var in slot_value_vars
-                        ] == self._worker.sparse_table[table_id].slot_gradient:
-                            return
-                        else:
-                            raise ValueError(
-                                "sparse table %s slot_gradient error" %
-                                table_id)
-                    else:
-                        raise ValueError("sparse table %s slot_value error" %
-                                         table_id)
-                else:
-                    raise ValueError("sparse table %s slot_key error" %
-                                     table_id)
+                keys = self._worker.sparse_table[table_id].slot_key
+                key_names = [var.name for var in sorted_slot_key_vars]
+                for key_name in key_names:
+                    if key_name not in keys:
+                        raise ValueError("sparse table %s slot_key error" %
+                                         table_id)
+                target_table = table
+                break
+        table = target_table
+        if table is not None:
+            self._worker.sparse_table.remove(table)
         table = self._worker.sparse_table.add()
         table.table_id = table_id
-        table.slot_key.extend([var.name for var in slot_key_vars])
-        table.slot_value.extend([var.name for var in slot_value_vars])
-        table.slot_gradient.extend(
-            [var.name + "@GRAD" for var in slot_value_vars])
+        table.slot_key.extend([var.name for var in sorted_slot_key_vars])
+        table.slot_value.extend([var.name for var in sorted_slot_value_vars])
+        table.slot_gradient.extend(slot_value_grad_names)

     def add_dense_table(self, table_id, learning_rate, param_vars, grad_vars,
                         dense_start_table_id, sparse_table_names):
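
The reordering inside add_sparse_table above is what makes the push-side loop bound of sparse_grad_names.size() safe. Below is a small standalone sketch of the same idea (hypothetical Var class and sort_slots_by_grad helper; not the fleet API): value vars whose "@GRAD" counterpart exists are placed first, the rest are appended, and the key vars are reordered so they stay aligned with their values.

class Var(object):
    def __init__(self, name):
        self.name = name


def sort_slots_by_grad(slot_key_vars, slot_value_vars, slot_value_grads):
    """Return key vars, value vars and grad names with gradient-bearing
    slots ordered first, mirroring the sorting done in add_sparse_table."""
    grad_names = set(v.name for v in slot_value_grads)
    value_to_key = {v.name: k for k, v in zip(slot_key_vars, slot_value_vars)}
    has_grad = [v for v in slot_value_vars if v.name + "@GRAD" in grad_names]
    no_grad = [v for v in slot_value_vars if v.name + "@GRAD" not in grad_names]
    sorted_values = has_grad + no_grad
    sorted_keys = [value_to_key[v.name] for v in sorted_values]
    grad_var_names = [v.name + "@GRAD" for v in has_grad]
    return sorted_keys, sorted_values, grad_var_names


if __name__ == "__main__":
    keys = [Var("slot_a"), Var("slot_b"), Var("slot_c")]
    values = [Var("emb_a"), Var("emb_b"), Var("emb_c")]
    grads = [Var("emb_a@GRAD"), Var("emb_c@GRAD")]   # emb_b is stop_gradient
    k, v, g = sort_slots_by_grad(keys, values, grads)
    print([x.name for x in v])   # ['emb_a', 'emb_c', 'emb_b']
    print(g)                     # ['emb_a@GRAD', 'emb_c@GRAD']

Keeping the gradient-bearing slots first is what lets PushSparseVarsWithLabelAsync iterate only up to sparse_grad_names.size() and treat the remaining slots as no-gradient feasigns.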
@@ -341,8 +368,10 @@ class DownpourWorker(Worker):
             table_id(int): id of sparse params table
             learning_rate(float): the learning rate used to update parameters. \
                 Can be a float value
-            param_var(list): all dense param. it is a list.
-            grad_var(list): all dense grad parm it is a list.
+            param_vars(list): all dense param. it is a list.
+            grad_vars(list): all dense grad parm it is a list.
+            dense_start_table_id(int): dense table start index
+            sparse_table_names(list): sparse table names
         Returns:
             return None
         """
@@ -365,21 +394,19 @@ class DownpourWorker(Worker):
         for table in self._worker.dense_table:
             if table.table_id == table_id:
-                desc_dense_param_name = list(self._worker.dense_table[
-                    table_id - dense_start_table_id].dense_variable_name)
+                desc_dense_param_name = list(table.dense_variable_name)
                 desc_dense_param_name.sort()
                 if dense_param_name == desc_dense_param_name:
-                    desc_dense_grad_name = list(self._worker.dense_table[
-                        table_id - dense_start_table_id]
-                                                .dense_gradient_variable_name)
+                    desc_dense_grad_name = list(
+                        table.dense_gradient_variable_name)
                     desc_dense_grad_name.sort()
                     if dense_grad_name == desc_dense_grad_name:
                         return
                     else:
                         raise ValueError(
-                            "dense table %s dense_gradient_variable_name error"
-                            % table_id)
+                            "dense table %s dense_gradient_variable_name "
+                            "error" % table_id)
                 else:
                     raise ValueError(
                         "dense table %s dense_variable_name error" % table_id)

File diff suppressed because one or more lines are too long

@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""Testcases for Downpour."""

 from __future__ import print_function
@@ -25,15 +26,19 @@ import sys
 from op_test import OpTest
 from paddle.fluid.trainer_desc import DistMultiTrainer
 from paddle.fluid.device_worker import DownpourSGD
+from paddle.fluid.incubate.fleet.parameter_server.pslib.node import DownpourWorker
 from google.protobuf import text_format
 import paddle.fluid.incubate.fleet.parameter_server.pslib.ps_pb2 as pslib


 class TestListenAndServOp(OpTest):
+    """TestListenAndServOp."""
+
     def setUp(self):
         pass

     def test_device_work_use_cvm(self):
+        """test device work use_cvm."""
         if sys.platform == 'win32' or sys.platform == 'sys.platform':
             pass
         else:
@@ -77,6 +82,9 @@ class TestListenAndServOp(OpTest):
             opt_info["scale_datanorm"] = -1
             opt_info["dump_slot"] = False
             opt_info["stat_var_names"] = []
+            worker = DownpourWorker(None)
+            worker.get_desc().CopyFrom(ps_param.trainer_param[0])
+            opt_info["program_id_to_worker"] = {program_id: worker}
             main_program._fleet_opt = opt_info

             trainer = DistMultiTrainer()
@@ -90,6 +98,7 @@ class TestListenAndServOp(OpTest):
             os.system(cmd)

     def test_device_work(self):
+        """test devicve worker."""
         if sys.platform == 'win32' or sys.platform == 'sys.platform':
             pass
         else:
@@ -133,6 +142,9 @@ class TestListenAndServOp(OpTest):
             opt_info["scale_datanorm"] = -1
             opt_info["dump_slot"] = False
             opt_info["stat_var_names"] = []
+            worker = DownpourWorker(None)
+            worker.get_desc().CopyFrom(ps_param.trainer_param[0])
+            opt_info["program_id_to_worker"] = {program_id: worker}
             main_program._fleet_opt = opt_info

             trainer = DistMultiTrainer()
