add slot to sparse table (#18686)

This change includes two things:

1. Previously, saving the delta model and shrinking the table were controlled by the same parameter; delete_after_unseen_days is now added to control table shrinking.
2. Previously, values in the sparse table carried no slot; a slot field is now added to the sparse table, and DownpourCtrAccessor is added to support the new meta (see the layout sketch below).
test=develop
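
To make change 2 concrete, here is a minimal Python sketch (not part of the patch; the function name is illustrative) of how the per-feature push vector is laid out for each combination of use_cvm and dump_slot, mirroring the offset bookkeeping added to FleetWrapper::PushSparseVarsWithLabelAsync below.

def push_value_layout(emb_dim, use_cvm=False, dump_slot=False):
    # Illustrative only: mirrors the offsets in fleet_wrapper.cc.
    offset = 0 if use_cvm else 2         # show/click are written explicitly when CVM is off
    slot_offset = 1 if dump_slot else 0  # slot id takes position 0 when dumped
    show_index = 1 if dump_slot else 0
    click_index = 2 if dump_slot else 1
    layout = [None] * (emb_dim + offset + slot_offset)
    if dump_slot:
        layout[0] = "slot"
    if not use_cvm:
        layout[show_index] = "show"
        layout[click_index] = "click"
    for d in range(emb_dim):
        layout[offset + slot_offset + d] = "g%d" % d
    return layout

# push_value_layout(4, use_cvm=False, dump_slot=True)
# -> ['slot', 'show', 'click', 'g0', 'g1', 'g2', 'g3']

With use_cvm=True, offset is 0, so only the optional slot id is prepended to the gradient.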
Thunderbrook 6 years ago committed by GitHub
parent f0cfc3c3fc
commit d8396281ef

@@ -179,6 +179,7 @@ class DownpourWorker : public HogwildWorker {
private:
bool need_to_push_dense_;
bool dump_slot_;
bool need_to_push_sparse_;
DownpourWorkerParameter param_;
// just save the value in param_ for easy access
@@ -285,7 +286,6 @@ class SectionWorker : public DeviceWorker {
int section_num_;
int pipeline_num_;
int thread_id_;
// This worker will consume scope from in_scope_queue_
// and produce scope to out_scope_queue_
ScopeQueue* in_scope_queue_ = nullptr;

@@ -64,6 +64,7 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
fleet_ptr_ = FleetWrapper::GetInstance();
fetch_config_ = desc.fetch_config();
use_cvm_ = desc.use_cvm();
dump_slot_ = desc.dump_slot();
}
void DownpourWorker::CollectLabelInfo(size_t table_idx) {
@@ -282,7 +283,8 @@ void DownpourWorker::TrainFilesWithProfiler() {
fleet_ptr_->PushSparseVarsWithLabelAsync(
*thread_scope_, tid, features_[tid], feature_labels_[tid],
sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
&feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_);
&feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
dump_slot_);
timeline.Pause();
push_sparse_time += timeline.ElapsedSec();
total_time += timeline.ElapsedSec();
@@ -454,7 +456,8 @@ void DownpourWorker::TrainFiles() {
fleet_ptr_->PushSparseVarsWithLabelAsync(
*thread_scope_, tid, features_[tid], feature_labels_[tid],
sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
&feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_);
&feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
dump_slot_);
}
}

@@ -288,19 +288,27 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
const std::vector<std::string>& sparse_grad_names, const int emb_dim,
std::vector<std::vector<float>>* push_values,
std::vector<::std::future<int32_t>>* push_sparse_status,
const int batch_size, const bool use_cvm) {
const int batch_size, const bool use_cvm, const bool dump_slot) {
#ifdef PADDLE_WITH_PSLIB
int offset = 2;
int slot_offset = 0;
int grad_dim = emb_dim;
int show_index = 0;
int click_index = 1;
if (use_cvm) {
offset = 0;
grad_dim = emb_dim - 2;
}
if (dump_slot) {
slot_offset = 1;
show_index = 1;
click_index = 2;
}
CHECK_GE(grad_dim, 0);
push_values->resize(fea_keys.size() + 1);
for (auto& t : *push_values) {
t.resize(emb_dim + offset);
t.resize(emb_dim + offset + slot_offset);
}
uint64_t fea_idx = 0u;
for (size_t i = 0; i < sparse_key_names.size(); ++i) {
@@ -315,7 +323,10 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
}
int len = tensor->numel();
int64_t* ids = tensor->data<int64_t>();
int slot = 0;
if (dump_slot) {
slot = boost::lexical_cast<int>(sparse_key_names[i]);
}
Variable* g_var = scope.FindVar(sparse_grad_names[i]);
CHECK(g_var != nullptr) << "var[" << sparse_grad_names[i] << "] not found";
LoDTensor* g_tensor = g_var->GetMutable<LoDTensor>();
@@ -339,14 +350,19 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
}
CHECK(fea_idx < (*push_values).size());
CHECK(fea_idx < fea_labels.size());
if (use_cvm) {
memcpy((*push_values)[fea_idx].data() + offset, g,
memcpy((*push_values)[fea_idx].data() + offset + slot_offset, g,
sizeof(float) * emb_dim);
} else {
memcpy((*push_values)[fea_idx].data() + offset, g,
memcpy((*push_values)[fea_idx].data() + offset + slot_offset, g,
sizeof(float) * emb_dim);
(*push_values)[fea_idx][0] = 1.0f;
(*push_values)[fea_idx][1] = static_cast<float>(fea_labels[fea_idx]);
(*push_values)[fea_idx][show_index] = 1.0f;
(*push_values)[fea_idx][click_index] =
static_cast<float>(fea_labels[fea_idx]);
}
if (dump_slot) {
(*push_values)[fea_idx][0] = static_cast<float>(slot);
}
g += emb_dim;
fea_idx++;
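
Note the new boost::lexical_cast above: when dump_slot is enabled, each sparse key (input variable) name is expected to be the numeric slot id itself, otherwise the cast would throw. A rough Python equivalent of that step (parse_slot_id is a hypothetical helper, not in the patch):

def parse_slot_id(sparse_key_name, dump_slot):
    # Mirrors boost::lexical_cast<int>(sparse_key_names[i]): with dump_slot on,
    # the variable name must be numeric, e.g. "6048"; a name like "emb" fails.
    return int(sparse_key_name) if dump_slot else 0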

@@ -100,7 +100,7 @@ class FleetWrapper {
const std::vector<std::string>& sparse_grad_names, const int emb_dim,
std::vector<std::vector<float>>* push_values,
std::vector<::std::future<int32_t>>* push_sparse_status,
const int batch_size, const bool use_cvm);
const int batch_size, const bool use_cvm, const bool dump_slot);
// Push sparse variables to server in Async mode
// Param<In>: scope, table_id, fea_keys, sparse_grad_names

@@ -33,6 +33,7 @@ message TrainerDesc {
optional bool debug = 6 [ default = false ];
optional FetchConfig fetch_config = 7;
optional bool use_cvm = 8 [ default = false ];
optional bool dump_slot = 9 [ default = false ];
// device worker parameters
optional HogwildWorkerParameter hogwild_param = 101;

@@ -75,7 +75,7 @@ class DownpourServer(Server):
table.type = pslib.PS_SPARSE_TABLE
table.compress_in_save = True
table.shard_num = 1000
table.accessor.accessor_class = "DownpourFeatureValueAccessor"
table.accessor.accessor_class = "DownpourCtrAccessor"
table.accessor.sparse_sgd_param.learning_rate = learning_rate
table.accessor.sparse_sgd_param.initial_g2sum = 3
table.accessor.sparse_sgd_param.initial_range = 1e-4
@@ -88,7 +88,8 @@ class DownpourServer(Server):
table.accessor.downpour_accessor_param.click_coeff = 2
table.accessor.downpour_accessor_param.base_threshold = 0.2
table.accessor.downpour_accessor_param.delta_threshold = 0.15
table.accessor.downpour_accessor_param.delta_keep_days = 31
table.accessor.downpour_accessor_param.delta_keep_days = 16
table.accessor.downpour_accessor_param.delete_after_unseen_days = 30
table.accessor.downpour_accessor_param.show_click_decay_rate = 0.999
table.accessor.downpour_accessor_param.delete_threshold = 0.8
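
Per the summary at the top, delta_keep_days now only affects which features are written when saving the delta model, while the new delete_after_unseen_days controls which features are dropped when the table is shrunk. A rough sketch of that separation, assuming pslib keeps a per-feature unseen_days counter (the function names and exact comparisons here are illustrative, not the pslib API):

def keep_in_delta_model(unseen_days, delta_keep_days=16):
    # Recently seen features are still saved into the delta model.
    return unseen_days <= delta_keep_days

def delete_on_shrink(unseen_days, delete_after_unseen_days=30):
    # Stale features are removed from the sparse table by the shrink pass,
    # independently of the delta-model threshold above.
    return unseen_days > delete_after_unseen_days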

@@ -162,6 +162,10 @@ class DistributedAdam(DistributedOptimizerImplBase):
opt_info["fleet_desc"] = ps_param
opt_info["worker_skipped_ops"] = worker_skipped_ops
opt_info["use_cvm"] = strategy.get("use_cvm", False)
opt_info["dump_slot"] = False
if server._server.downpour_server_param.downpour_table_param[
0].accessor.accessor_class == "DownpourCtrAccessor":
opt_info["dump_slot"] = True
for loss in losses:
loss.block.program._fleet_opt = opt_info
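
For reference, a condensed sketch of the opt_info dict this optimizer attaches to each program after the change; values other than dump_slot are placeholders:

accessor_class = "DownpourCtrAccessor"   # read from the first downpour table's config
opt_info = {
    "fleet_desc": None,                  # placeholder for the parsed ps_param proto
    "worker_skipped_ops": [],            # placeholder
    "use_cvm": False,
    # dump_slot follows the accessor: DownpourCtrAccessor stores the slot id,
    # so the pushed gradients must carry it.
    "dump_slot": accessor_class == "DownpourCtrAccessor",
}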

File diff suppressed because one or more lines are too long

@@ -17,8 +17,12 @@ from os import path
__all__ = ['TrainerDesc', 'MultiTrainer', 'DistMultiTrainer', 'PipelineTrainer']
# can be initialized from train_desc,
class TrainerDesc(object):
'''
Set proto from python to c++.
Can be initialized from train_desc.
'''
def __init__(self):
'''
self.proto_desc = data_feed_pb2.DataFeedDesc()
@@ -71,6 +75,9 @@ class TrainerDesc(object):
def _set_use_cvm(self, use_cvm=False):
self.proto_desc.use_cvm = use_cvm
def _set_dump_slot(self, dump_slot):
self.proto_desc.dump_slot = dump_slot
def _desc(self):
from google.protobuf import text_format
return self.proto_desc.SerializeToString()
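
A small usage sketch of the new setter, mirroring what TrainerFactory does below (these private setters are normally called by the framework; the module path is assumed from this file's location):

from paddle.fluid.trainer_desc import MultiTrainer  # assumed import path

trainer = MultiTrainer()
trainer._set_use_cvm(False)
trainer._set_dump_slot(True)   # fills TrainerDesc.dump_slot in the proto
desc_str = trainer._desc()     # serialized proto handed to the C++ trainer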
@@ -81,6 +88,11 @@ class TrainerDesc(object):
class MultiTrainer(TrainerDesc):
'''
Implement of MultiTrainer.
Can be init from TrainerDesc.
'''
def __init__(self):
super(MultiTrainer, self).__init__()
pass

@@ -39,5 +39,6 @@ class TrainerFactory(object):
device_worker._set_fleet_desc(opt_info["fleet_desc"])
trainer._set_fleet_desc(opt_info["fleet_desc"])
trainer._set_use_cvm(opt_info["use_cvm"])
trainer._set_dump_slot(opt_info["dump_slot"])
trainer._set_device_worker(device_worker)
return trainer
