@@ -192,8 +192,8 @@ void AsyncCommunicator::SendThread() {
         auto send_task = [this, &var_name, &var_queue] {
           VLOG(3) << var_name << " merge and send";
           std::vector<std::shared_ptr<Variable>> vars;
-          size_t merged_var_num = 0;
-          size_t wait_times = 0;
+          int merged_var_num = 0;
+          int wait_times = 0;
           while (merged_var_num < FLAGS_communicator_max_merge_var_num) {
             if (var_queue->Size() == 0) {
               VLOG(3) << "wait_times -> " << wait_times;
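The pattern here (and in the `wait_times` change in a later hunk) is a signedness fix: the counters are compared against `FLAGS_communicator_max_merge_var_num`, which appears to be a signed integer flag, so keeping them as `int` keeps both sides of the comparison signed. A minimal sketch of the idea, with a stand-in constant for the flag (the real flag is defined elsewhere in Paddle and is assumed here to be a plain `int`):

```cpp
#include <cstdio>

// Stand-in for FLAGS_communicator_max_merge_var_num; the real flag is a
// gflags-defined integer, assumed here to be plain int.
static const int kMaxMergeVarNum = 20;

int main() {
  // size_t merged_var_num = 0;  // unsigned counter vs. signed bound -> -Wsign-compare
  int merged_var_num = 0;        // same signedness as the bound, no warning
  int wait_times = 0;
  while (merged_var_num < kMaxMergeVarNum) {
    ++merged_var_num;
    ++wait_times;  // placeholder for the queue-wait bookkeeping in SendThread
  }
  std::printf("merged %d vars, waited %d times\n", merged_var_num, wait_times);
  return 0;
}
```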
@@ -254,7 +254,7 @@ void AsyncCommunicator::SendThread() {
 void AsyncCommunicator::RecvThread() {
   VLOG(3) << "RecvThread start!";
   while (running_) {
-    auto grad_num = grad_num_.load();
+    int grad_num = grad_num_.load();
     if (grad_num > FLAGS_communicator_min_send_grad_num_before_recv) {
       VLOG(1) << "current grad num " << grad_num;
       RecvAll();
@@ -538,7 +538,7 @@ void GeoSgdCommunicator::Send(const std::vector<std::string> &sparse_var_names,
     int element_number = var_tensor.numel();
     int *var_mutable_data = var_tensor.mutable_data<int>(var_tensor.place());
     // insert ids which has not been record
-    for (size_t j = 0; j < element_number; j++) {
+    for (int j = 0; j < element_number; j++) {
       auto ep_idx = GetSectionIndex(var_mutable_data[j],
                                     absolute_section_[sparse_var_tables[i]]);
       ids_table->at(sparse_var_tables[i])[ep_idx].insert(var_mutable_data[j]);
@@ -559,7 +559,7 @@ void GeoSgdCommunicator::SendThread() {
     std::vector<std::future<void>> task_futures;
     task_futures.reserve(send_varname_to_ctx_.size());
 
-    size_t wait_times = 0;
+    int wait_times = 0;
     while (ids_send_vec_.size() < geo_need_push_nums_) {
       VLOG(4) << "ids_send_vec_ Size: " << ids_send_vec_.size();
       if (need_push_queue_->Size() > 0) {
@@ -747,7 +747,7 @@ void GeoSgdCommunicator::SendUpdateSparseVars(
   auto cpu_ctx = paddle::platform::CPUDeviceContext();
   auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
   float avg = 1 / static_cast<float>(trainer_nums_);
-  for (int y = 0; y < new_rows.size(); y++) {
+  for (size_t y = 0; y < new_rows.size(); y++) {
     auto ids = new_rows[y];
 
     float *x_val = x_value + ids * row_numel;
@@ -876,7 +876,7 @@ void GeoSgdCommunicator::RecvUpdateSparseVars(
 
   auto cpu_ctx = paddle::platform::CPUDeviceContext();
   auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
-  for (int y = 0; y < new_rows.size(); y++) {
+  for (size_t y = 0; y < new_rows.size(); y++) {
     std::vector<float> row_delta(row_numel, 0);
 
     auto ids = new_rows[y];
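The last two hunks go the other way: the loop bound is `new_rows.size()`, which returns `size_t`, so the index `y` becomes `size_t` to keep the comparison unsigned on both sides. A minimal, self-contained sketch of the same pattern (the `new_rows` contents and the row width are hypothetical):

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
  std::vector<int64_t> new_rows = {3, 7, 11};  // hypothetical sparse row ids
  const int row_numel = 4;                     // hypothetical row width
  std::vector<float> x_value(16 * row_numel, 1.0f);

  // for (int y = 0; ...) would compare a signed index against the unsigned
  // result of size(), triggering -Wsign-compare; size_t keeps both unsigned.
  for (size_t y = 0; y < new_rows.size(); y++) {
    auto ids = new_rows[y];
    float *x_val = x_value.data() + ids * row_numel;  // row pointer, as in the hunk
    std::printf("row %zu -> id %lld, first value %.1f\n", y,
                static_cast<long long>(ids), *x_val);
  }
  return 0;
}
```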