|
|
|
@ -35,9 +35,17 @@ void GRPCClient::InitEventLoop() {
|
|
|
|
|
client_thread_.reset(new std::thread(std::bind(&GRPCClient::Proceed, this)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void GRPCClient::SendComplete() {
|
|
|
|
|
void GRPCClient::SendBeginPass() {
|
|
|
|
|
for (auto& it : channels_) {
|
|
|
|
|
this->AsyncSendComplete(it.first);
|
|
|
|
|
VLOG(3) << "send begin pass to: " it.first;
|
|
|
|
|
this->AsyncSendBeginPass(it.first);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void GRPCClient::SendEndPass() {
|
|
|
|
|
for (auto& it : channels_) {
|
|
|
|
|
VLOG(3) << "send end pass to " << it.first;
|
|
|
|
|
this->AsyncSendEndPass(it.first);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -226,19 +234,32 @@ void GRPCClient::AsyncSendFetchBarrier(const std::string& ep,
|
|
|
|
|
req_count_++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void GRPCClient::AsyncSendComplete(const std::string& ep, int64_t time_out) {
|
|
|
|
|
void GRPCClient::AsyncSendBeginPass(const std::string& ep, int64_t time_out) {
|
|
|
|
|
const auto ch = GetChannel(ep);
|
|
|
|
|
|
|
|
|
|
BatchBarrierProcessor* s = new BatchBarrierProcessor(ch);
|
|
|
|
|
s->Prepare(time_out);
|
|
|
|
|
|
|
|
|
|
sendrecv::VariableMessage req;
|
|
|
|
|
req.set_varname(COMPLETE_MESSAGE);
|
|
|
|
|
req.set_varname(BEGIN_PASS_MESSAGE);
|
|
|
|
|
auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
|
|
|
|
|
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
|
|
|
|
|
req_count_++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void GRPCClient::AsyncSendEndPass(const std::string& ep, int64_t time_out) {
|
|
|
|
|
const auto ch = GetChannel(ep);
|
|
|
|
|
|
|
|
|
|
FetchBarrierProcessor* s = new FetchBarrierProcessor(ch);
|
|
|
|
|
s->Prepare(time_out);
|
|
|
|
|
|
|
|
|
|
sendrecv::VariableMessage req;
|
|
|
|
|
req.set_varname(END_PASS_MESSAGE);
|
|
|
|
|
auto rpc = s->stub_->AsyncGetVariable(s->context_.get(), req, &cq_);
|
|
|
|
|
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
|
|
|
|
|
req_count_++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void GRPCClient::AsyncCheckpointNotify(const std::string& ep,
|
|
|
|
|
const std::string& dir,
|
|
|
|
|
int64_t time_out) {
|
|
|
|
|