|
|
|
@ -543,7 +543,7 @@ std::unordered_set<int64_t> GeoSgdCommunicator::SparseIdsMerge(
|
|
|
|
|
const std::string &splited_var_name) {
|
|
|
|
|
// every batch has some sparse id, merge them into one unordered_set
|
|
|
|
|
VLOG(4) << "Sparse Ids merge var: " << var_name
|
|
|
|
|
<< " splited var: " << splited_var_name;
|
|
|
|
|
<< " split var: " << splited_var_name;
|
|
|
|
|
auto before_run_ids_merge_ = GetCurrentUS();
|
|
|
|
|
auto origin_var_name = DeltaVarToVar(var_name);
|
|
|
|
|
auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name);
|
|
|
|
@ -567,9 +567,8 @@ void GeoSgdCommunicator::SendUpdateDenseVars(
|
|
|
|
|
// var_name: param.delta
|
|
|
|
|
auto origin_var_name = DeltaVarToVar(var_name);
|
|
|
|
|
auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name);
|
|
|
|
|
VLOG(4) << "Dense var: " << var_name
|
|
|
|
|
<< " 's splited var: " << splited_var_name
|
|
|
|
|
<< " splited var index: " << splited_var_index;
|
|
|
|
|
VLOG(4) << "Dense var: " << var_name << " 's split var: " << splited_var_name
|
|
|
|
|
<< " split var index: " << splited_var_index;
|
|
|
|
|
auto before_run_send_dense = GetCurrentUS();
|
|
|
|
|
auto cpu_ctx = paddle::platform::CPUDeviceContext();
|
|
|
|
|
|
|
|
|
@ -592,7 +591,7 @@ void GeoSgdCommunicator::SendUpdateDenseVars(
|
|
|
|
|
begin_loc = absolute_section_[origin_var_name][splited_var_index];
|
|
|
|
|
dimension = total_element / vars_first_dimension_[origin_var_name];
|
|
|
|
|
total_element = section * dimension;
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name
|
|
|
|
|
<< " section: " << section << " dimension: " << dimension
|
|
|
|
|
<< " begin loc: " << begin_loc << " total_element "
|
|
|
|
|
<< total_element;
|
|
|
|
@ -600,12 +599,12 @@ void GeoSgdCommunicator::SendUpdateDenseVars(
|
|
|
|
|
|
|
|
|
|
auto *var_x_data = var_x_tensor.mutable_data<float>(var_x_tensor.place()) +
|
|
|
|
|
begin_loc * dimension;
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name << " var_x_data[0] "
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name << " var_x_data[0] "
|
|
|
|
|
<< var_x_data[0] << " var_x_data[end] "
|
|
|
|
|
<< var_x_data[total_element - 1];
|
|
|
|
|
auto *var_y_data = var_y_tensor.mutable_data<float>(var_y_tensor.place()) +
|
|
|
|
|
begin_loc * dimension;
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] "
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name << " var_y_data[0] "
|
|
|
|
|
<< var_y_data[0] << " var_y_data[end] "
|
|
|
|
|
<< var_y_data[total_element - 1];
|
|
|
|
|
|
|
|
|
@ -616,14 +615,14 @@ void GeoSgdCommunicator::SendUpdateDenseVars(
|
|
|
|
|
var_z_tensor->mutable_data<float>(dims, cpu_ctx.GetPlace());
|
|
|
|
|
auto *var_z_data = var_z_tensor->mutable_data<float>(cpu_ctx.GetPlace());
|
|
|
|
|
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name << "var_z_data[0] "
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name << "var_z_data[0] "
|
|
|
|
|
<< var_z_data[0] << " var_z_data[end] "
|
|
|
|
|
<< var_z_data[total_element - 1];
|
|
|
|
|
|
|
|
|
|
// calc sub = var_training - var_old
|
|
|
|
|
auto blas = math::GetBlas<paddle::platform::CPUDeviceContext, float>(cpu_ctx);
|
|
|
|
|
blas.VSUB(total_element, var_x_data, var_y_data, var_z_data);
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name << " var_z_data[0] "
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name << " var_z_data[0] "
|
|
|
|
|
<< var_z_data[0] << " var_z_data[end] "
|
|
|
|
|
<< var_z_data[total_element - 1];
|
|
|
|
|
|
|
|
|
@ -633,7 +632,7 @@ void GeoSgdCommunicator::SendUpdateDenseVars(
|
|
|
|
|
|
|
|
|
|
// calc var_old += var_delta
|
|
|
|
|
blas.VADD(total_element, var_y_data, var_z_data, var_y_data);
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] "
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name << " var_y_data[0] "
|
|
|
|
|
<< var_y_data[0] << " var_y_data[end] "
|
|
|
|
|
<< var_y_data[total_element - 1];
|
|
|
|
|
|
|
|
|
@ -763,7 +762,7 @@ void GeoSgdCommunicator::RecvUpdateDenseVars(
|
|
|
|
|
section = dims[0];
|
|
|
|
|
begin_loc = absolute_section_[origin_var_name][splited_var_index];
|
|
|
|
|
dimension = total_element / section;
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name
|
|
|
|
|
<< " section: " << section << " dimension: " << dimension
|
|
|
|
|
<< " begin loc: " << begin_loc << " total_element "
|
|
|
|
|
<< total_element;
|
|
|
|
@ -771,18 +770,18 @@ void GeoSgdCommunicator::RecvUpdateDenseVars(
|
|
|
|
|
|
|
|
|
|
auto *var_x_data = var_x_tensor.mutable_data<float>(var_x_tensor.place()) +
|
|
|
|
|
begin_loc * dimension;
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name << " var_x_data[0] "
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name << " var_x_data[0] "
|
|
|
|
|
<< var_x_data[0] << " var_x_data[end] "
|
|
|
|
|
<< var_x_data[total_element - 1];
|
|
|
|
|
|
|
|
|
|
auto *var_y_data = var_y_tensor.mutable_data<float>(var_y_tensor.place()) +
|
|
|
|
|
begin_loc * dimension;
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] "
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name << " var_y_data[0] "
|
|
|
|
|
<< var_y_data[0] << " var_y_data[end] "
|
|
|
|
|
<< var_y_data[total_element - 1];
|
|
|
|
|
|
|
|
|
|
auto *var_z_data = var_z_tensor.mutable_data<float>(cpu_ctx.GetPlace());
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name << " var_z_data[0] "
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name << " var_z_data[0] "
|
|
|
|
|
<< var_z_data[0] << " var_z_data[end] "
|
|
|
|
|
<< var_z_data[total_element - 1];
|
|
|
|
|
|
|
|
|
@ -793,7 +792,7 @@ void GeoSgdCommunicator::RecvUpdateDenseVars(
|
|
|
|
|
auto *var_y_sub_data =
|
|
|
|
|
var_y_sub_tensor->mutable_data<float>(cpu_ctx.GetPlace());
|
|
|
|
|
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_sub_data[0] "
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name << " var_y_sub_data[0] "
|
|
|
|
|
<< var_y_sub_data[0] << " var_y_sub_data[end] "
|
|
|
|
|
<< var_y_sub_data[total_element - 1];
|
|
|
|
|
|
|
|
|
@ -801,19 +800,19 @@ void GeoSgdCommunicator::RecvUpdateDenseVars(
|
|
|
|
|
|
|
|
|
|
// calc sub = pserver - old
|
|
|
|
|
blas.VSUB(total_element, var_z_data, var_y_data, var_y_sub_data);
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_sub_data[0] "
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name << " var_y_sub_data[0] "
|
|
|
|
|
<< var_y_sub_data[0] << " var_y_sub_data[end] "
|
|
|
|
|
<< var_y_sub_data[total_element - 1];
|
|
|
|
|
|
|
|
|
|
// calc train += sub
|
|
|
|
|
blas.VADD(total_element, var_x_data, var_y_sub_data, var_x_data);
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name << " var_x_data[0] "
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name << " var_x_data[0] "
|
|
|
|
|
<< var_x_data[0] << " var_x_data[end] "
|
|
|
|
|
<< var_x_data[total_element - 1];
|
|
|
|
|
|
|
|
|
|
// calc old = pserver
|
|
|
|
|
blas.VCOPY(total_element, var_z_data, var_y_data);
|
|
|
|
|
VLOG(4) << "Dense splited var: " << splited_var_name << " var_y_data[0] "
|
|
|
|
|
VLOG(4) << "Dense split var: " << splited_var_name << " var_y_data[0] "
|
|
|
|
|
<< var_y_data[0] << " var_y_data[end] "
|
|
|
|
|
<< var_y_data[total_element - 1];
|
|
|
|
|
|
|
|
|
@ -824,7 +823,7 @@ void GeoSgdCommunicator::RecvUpdateDenseVars(
|
|
|
|
|
|
|
|
|
|
void GeoSgdCommunicator::RecvUpdateSparseVars(
|
|
|
|
|
const std::string &var_name, const std::string &splited_var_name) {
|
|
|
|
|
// step 1: recv splited var from pserver
|
|
|
|
|
// step 1: recv split var from pserver
|
|
|
|
|
auto splited_var_index = GetSplitedVarIndex(var_name, splited_var_name);
|
|
|
|
|
auto origin_var_name = DeltaVarToVar(var_name);
|
|
|
|
|
auto origin_splited_var_name = DeltaVarToVar(splited_var_name);
|
|
|
|
|