|
|
|
@ -174,13 +174,13 @@ void AsyncGRPCServer::ShutdownQueue() {
|
|
|
|
|
std::unique_lock<std::mutex> lock(cq_mutex_);
|
|
|
|
|
cq_send_->Shutdown();
|
|
|
|
|
cq_get_->Shutdown();
|
|
|
|
|
is_shut_down_ = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// This URL explains why shutdown is complicate:
|
|
|
|
|
void AsyncGRPCServer::ShutDown() {
|
|
|
|
|
server_->Shutdown();
|
|
|
|
|
is_shut_down_ = true;
|
|
|
|
|
ShutdownQueue();
|
|
|
|
|
server_->Shutdown();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void AsyncGRPCServer::TryToRegisterNewSendOne() {
|
|
|
|
@ -213,14 +213,14 @@ void AsyncGRPCServer::HandleRequest(::grpc::ServerCompletionQueue* cq,
|
|
|
|
|
bool ok = false;
|
|
|
|
|
while (true) {
|
|
|
|
|
if (!cq->Next(&tag, &ok)) {
|
|
|
|
|
LOG(INFO) << cq_name << " get CompletionQueue shutdown!";
|
|
|
|
|
LOG(INFO) << cq_name << " CompletionQueue shutdown!";
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
PADDLE_ENFORCE(tag);
|
|
|
|
|
// FIXME(typhoonzero): de-couple the barriers with recv_op
|
|
|
|
|
if (cq_name == "cq_get") WaitCond(1);
|
|
|
|
|
if (cq_name == "cq_send") WaitCond(0);
|
|
|
|
|
if (!is_shut_down_ && cq_name == "cq_get") WaitCond(1);
|
|
|
|
|
if (!is_shut_down_ && cq_name == "cq_send") WaitCond(0);
|
|
|
|
|
|
|
|
|
|
RequestBase* base = (RequestBase*)tag;
|
|
|
|
|
// reference:
|
|
|
|
|