update grpc_variable_response

f7c96f079b
Qiao Longfei 7 years ago
parent d827881502
commit 686d15c8e0

@ -175,6 +175,7 @@ VarHandlePtr GRPCClient::AsyncPrefetchVar(const std::string& ep,
const std::string ep_val = ep;
const std::string in_var_name_val = in_var_name;
const std::string out_var_name_val = out_var_name;
const std::string table_name_val = table_name;
const framework::Scope* p_scope = &scope;
const auto ch = GetChannel(ep_val);
GetProcessor* s = new GetProcessor(ch);
@ -185,12 +186,12 @@ VarHandlePtr GRPCClient::AsyncPrefetchVar(const std::string& ep,
s->Prepare(h, time_out);
framework::AsyncIO([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx,
s, method, h, table_name, this] {
s, method, h, table_name_val, this] {
auto* var = p_scope->FindVar(in_var_name_val);
::grpc::ByteBuffer req;
SerializeToByteBuffer(in_var_name_val, var, *p_ctx, &req, out_var_name_val,
0, table_name);
0, table_name_val);
VLOG(30) << s->GetVarHandlePtr()->String() << " begin";

@ -130,7 +130,8 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) {
math::set_constant(ctx, tensor, 31.9);
::grpc::ByteBuffer msg;
operators::distributed::SerializeToByteBuffer("myvar", &var, ctx, &msg);
operators::distributed::SerializeToByteBuffer("myvar", &var, ctx, &msg,
"outvar", 0, "table_name");
EXPECT_GT(msg.Length(), static_cast<size_t>(0));
// deserialize

@ -301,6 +301,20 @@ int GRPCVariableResponse::Parse(Source* source) {
meta_.set_trainer_id(trainer_id);
break;
}
case sendrecv::VariableMessage::kTableNameFieldNumber: {
uint32_t length;
if ((wt != WIRETYPE_LENGTH_DELIMITED) || !input.ReadVarint32(&length)) {
return tag;
}
std::string temp;
if (!input.ReadString(&temp, length)) {
return tag;
}
meta_.set_table_name(temp);
break;
}
default: {
// Unknown tag, return unknown error.
return -1;

Loading…
Cancel
Save