|
|
|
@ -68,7 +68,7 @@ class SendOp : public framework::OperatorBase {
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < ins.size(); i++) {
|
|
|
|
|
if (NeedSend(scope, ins[i])) {
|
|
|
|
|
VLOG(3) << "sending " << ins[i] << " to " << epmap[i];
|
|
|
|
|
VLOG(2) << "sending " << ins[i] << " to " << epmap[i];
|
|
|
|
|
rpc_client->AsyncSendVariable(epmap[i], ctx, scope, ins[i]);
|
|
|
|
|
} else {
|
|
|
|
|
VLOG(3) << "don't send no-initialied variable: " << ins[i];
|
|
|
|
@ -77,20 +77,20 @@ class SendOp : public framework::OperatorBase {
|
|
|
|
|
PADDLE_ENFORCE(rpc_client->Wait());
|
|
|
|
|
|
|
|
|
|
for (auto& ep : endpoints) {
|
|
|
|
|
VLOG(3) << "batch barrier, ep: " << ep;
|
|
|
|
|
VLOG(2) << "batch barrier, ep: " << ep;
|
|
|
|
|
rpc_client->AsyncSendBatchBarrier(ep);
|
|
|
|
|
}
|
|
|
|
|
PADDLE_ENFORCE(rpc_client->Wait());
|
|
|
|
|
|
|
|
|
|
if (outs.size() > 0) {
|
|
|
|
|
for (size_t i = 0; i < outs.size(); i++) {
|
|
|
|
|
VLOG(3) << "getting " << outs[i] << " from " << epmap[i];
|
|
|
|
|
VLOG(2) << "getting " << outs[i] << " from " << epmap[i];
|
|
|
|
|
rpc_client->AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
|
|
|
|
|
}
|
|
|
|
|
PADDLE_ENFORCE(rpc_client->Wait());
|
|
|
|
|
// tell pservers that the current trainer has called fetch
|
|
|
|
|
for (auto& ep : endpoints) {
|
|
|
|
|
VLOG(3) << "send fetch barrier, ep: " << ep;
|
|
|
|
|
VLOG(2) << "send fetch barrier, ep: " << ep;
|
|
|
|
|
rpc_client->AsyncSendFetchBarrier(ep);
|
|
|
|
|
}
|
|
|
|
|
PADDLE_ENFORCE(rpc_client->Wait());
|
|
|
|
|