|
|
|
@ -43,7 +43,7 @@ void RPCServer::SavePort() const {
|
|
|
|
|
|
|
|
|
|
void RPCServer::WaitBarrier(const std::string& rpc_name) {
|
|
|
|
|
std::unique_lock<std::mutex> lock(this->mutex_);
|
|
|
|
|
barrier_cond_.wait(lock, [=] {
|
|
|
|
|
barrier_cond_.wait(lock, [this, &rpc_name] {
|
|
|
|
|
return (barrier_counter_[rpc_name] >= client_num_ || exit_flag_.load());
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
@ -53,19 +53,23 @@ void RPCServer::WaitBarrier(const std::string& rpc_name) {
|
|
|
|
|
void RPCServer::IncreaseBatchBarrier(const std::string rpc_name) {
|
|
|
|
|
VLOG(3) << "RPCServer begin IncreaseBatchBarrier " << rpc_name;
|
|
|
|
|
int b = 0;
|
|
|
|
|
{
|
|
|
|
|
std::unique_lock<std::mutex> lock(mutex_);
|
|
|
|
|
b = ++barrier_counter_[rpc_name];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
VLOG(3) << "RPCServer IncreaseBatchBarrier " << rpc_name
|
|
|
|
|
<< ", barrier_count:" << b << ", fan_in" << client_num_;
|
|
|
|
|
|
|
|
|
|
std::unique_lock<std::mutex> lock(mutex_);
|
|
|
|
|
b = ++barrier_counter_[rpc_name];
|
|
|
|
|
if (b >= client_num_) {
|
|
|
|
|
lock.unlock();
|
|
|
|
|
barrier_cond_.notify_all();
|
|
|
|
|
lock.lock();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void RPCServer::DecreaseClientNum() {
|
|
|
|
|
{
|
|
|
|
|
std::unique_lock<std::mutex> lock(mutex_);
|
|
|
|
|
client_num_--;
|
|
|
|
|
}
|
|
|
|
|
barrier_cond_.notify_all();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void RPCServer::ResetBarrierCounter() {
|
|
|
|
|
VLOG(3) << "RPCServer ResetBarrierCounter ";
|
|
|
|
|
std::unique_lock<std::mutex> lock(mutex_);
|
|
|
|
|