From dbca7f166ddb62fbe3bd5dc50230f6347f0863ca Mon Sep 17 00:00:00 2001
From: gongweibao <weibao.gong@gmail.com>
Date: Fri, 22 Jun 2018 02:37:53 -0500
Subject: [PATCH] tune logs (#11649)

---
 .../operators/distributed/grpc_client.cc      | 15 ++++++++--
 .../fluid/operators/distributed/grpc_client.h |  6 +++-
 .../operators/distributed/grpc_server.cc      | 28 +++++++++++++------
 .../distributed/variable_response.cc          |  4 +++
 4 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/paddle/fluid/operators/distributed/grpc_client.cc b/paddle/fluid/operators/distributed/grpc_client.cc
index 65d63784c1..52f931188d 100644
--- a/paddle/fluid/operators/distributed/grpc_client.cc
+++ b/paddle/fluid/operators/distributed/grpc_client.cc
@@ -18,6 +18,7 @@ limitations under the License. */
 
 #include <limits>
 
+#include "glog/logging.h"  // For VLOG
 #include "paddle/fluid/framework/threadpool.h"
 #include "paddle/fluid/operators/distributed/request_handler.h"
 #include "paddle/fluid/platform/profiler.h"
@@ -75,6 +76,9 @@ bool GRPCClient::AsyncSendVar(const std::string& ep,
     var_h.scope = p_scope;
     var_h.name = var_name_val;
     var_h.ctx = p_ctx;
+    var_h.method = "Send";
+
+    VLOG(3) << var_h.String() << " begin";
 
     // stub context
     SendProcessor* s = new SendProcessor(ch);
@@ -129,6 +133,9 @@ bool GRPCClient::AsyncGetVar(const std::string& ep,
     var_h.scope = p_scope;
     var_h.name = var_name_val;
     var_h.ctx = p_ctx;
+    var_h.method = "Get";
+
+    VLOG(3) << var_h.String() << " begin";
 
     // stub context
     GetProcessor* s = new GetProcessor(ch);
@@ -172,6 +179,9 @@ bool GRPCClient::AsyncPrefetchVar(const std::string& ep,
     var_h.scope = p_scope;
     var_h.name = out_var_name_val;
     var_h.ctx = p_ctx;
+    var_h.method = "Prefetch";
+
+    VLOG(3) << var_h.String() << " begin";
 
     // stub context
     GetProcessor* s = new GetProcessor(ch);
@@ -243,10 +253,11 @@ void GRPCClient::Proceed() {
     GPR_ASSERT(ok);
     PADDLE_ENFORCE(c);
     if (c->status_.ok()) {
+      VLOG(3) << c->var_h_.String() << " process";
       c->Process();
     } else {
-      LOG(FATAL) << "var: " << c->var_h_.String()
-                 << " grpc error:" << c->status_.error_message();
+      LOG(FATAL) << c->var_h_.String()
+                 << " meets grpc error:" << c->status_.error_message();
     }
     delete c;
     {
diff --git a/paddle/fluid/operators/distributed/grpc_client.h b/paddle/fluid/operators/distributed/grpc_client.h
index a6efa7dfd1..7875939ff5 100644
--- a/paddle/fluid/operators/distributed/grpc_client.h
+++ b/paddle/fluid/operators/distributed/grpc_client.h
@@ -47,14 +47,18 @@ namespace operators {
 namespace distributed {
 
 struct VarHandle {
+  // RPC endpoint.
   std::string ep;
   const platform::DeviceContext* ctx;
   const framework::Scope* scope;
+  // Variable name.
   std::string name;
+  // RPC method name.
+  std::string method;
 
   std::string String() const {
     std::ostringstream s;
-    s << "name:[" << name << "] ep:[" << ep << "]";
+    s << method << " name:[" << name << "], ep:[" << ep << "]";
     return s.str();
   }
 };
diff --git a/paddle/fluid/operators/distributed/grpc_server.cc b/paddle/fluid/operators/distributed/grpc_server.cc
index 707f665a29..b9a9b12cec 100644
--- a/paddle/fluid/operators/distributed/grpc_server.cc
+++ b/paddle/fluid/operators/distributed/grpc_server.cc
@@ -41,6 +41,19 @@ class RequestBase {
   virtual ~RequestBase() {}
   virtual void Process() = 0;
 
+  std::string Status2String(const std::string& method) {
+    std::string status = "Process";
+    if (status_ == FINISH) {
+      status = "Finish";
+    }
+
+    std::ostringstream s;
+    s << method << " name:[" << GetReqName() << "]"
+      << ", ep:[" << ctx_.peer() << "]"
+      << " " << status << " using req_id:" << req_id_;
+    return s.str();
+  }
+
   CallStatus Status() const {
     std::lock_guard<std::mutex> l(status_mu_);
     return status_;
@@ -272,7 +285,7 @@ void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name,
                                           int req_id) {
   std::unique_lock<std::mutex> lock(cq_mutex_);
   if (is_shut_down_) {
-    VLOG(3) << "shutdown, do not TryToRegisterNewSendOne";
+    LOG(WARNING) << "shutdown, do not TryToRegisterNewSendOne";
     return;
   }
 
@@ -306,14 +319,14 @@ void AsyncGRPCServer::HandleRequest(
   bool ok = false;
 
   while (true) {
-    VLOG(3) << "HandleRequest " << rpc_name << " wait next";
+    VLOG(4) << "HandleRequest " << rpc_name << " wait next";
     if (!cq->Next(&tag, &ok)) {
       LOG(INFO) << "CompletionQueue " << rpc_name << " shutdown!";
       break;
     }
 
     int req_id = static_cast<int>(reinterpret_cast<intptr_t>(tag));
-    VLOG(3) << "HandleRequest " << rpc_name << ", req_id:" << req_id
+    VLOG(4) << "HandleRequest " << rpc_name << ", req_id:" << req_id
             << " get next";
 
     auto& reqs = rpc_reqs_[rpc_name];
@@ -324,22 +337,21 @@ void AsyncGRPCServer::HandleRequest(
       base = reqs[req_id];
     }
 
+    VLOG(3) << base->Status2String(rpc_name);
+
     // reference:
     // https://github.com/tensorflow/tensorflow/issues/5596
     // https://groups.google.com/forum/#!topic/grpc-io/xftlRy-IQwM
     // https://groups.google.com/forum/#!topic/grpc-io/ywATt88Ef_I
     if (!ok) {
       LOG(WARNING) << "completion queue:" << rpc_name
-                   << " recv no regular event:argument name["
-                   << base->GetReqName() << "]";
+                   << " recv no regular event"
+                   << " context:" << base->Status2String(rpc_name);
       TryToRegisterNewOne(rpc_name, req_id);
       delete base;
       continue;
     }
 
-    VLOG(3) << "queue id:" << rpc_name << ", req_id:" << req_id
-            << ", status:" << base->Status();
-
     switch (base->Status()) {
       case PROCESS: {
         base->Process();
diff --git a/paddle/fluid/operators/distributed/variable_response.cc b/paddle/fluid/operators/distributed/variable_response.cc
index 619890b193..45832c60bf 100644
--- a/paddle/fluid/operators/distributed/variable_response.cc
+++ b/paddle/fluid/operators/distributed/variable_response.cc
@@ -76,6 +76,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
       if (total_written + size_to_write > length) {
         size_to_write = length - total_written;
       }
+      // This log is useful to see how long a internal block size is of rpc.
+      VLOG(7) << "copy " << size_to_write << " data to CUDAPlace";
       memory::Copy(boost::get<platform::CUDAPlace>(place),
                    reinterpret_cast<void*>(p), cpu, data, size_to_write,
                    gpu_dev_ctx.stream());
@@ -103,6 +105,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
     }
     // TODO(gongwb): can we avoid copy?
     platform::CPUPlace cpu;
+    // This log is useful to see how long a internal block size is of rpc.
+    VLOG(7) << "copy " << size_to_write << " data to CPUPlace";
     memory::Copy(cpu, reinterpret_cast<void*>(p), cpu, data, size_to_write);
 
     p += size_to_write;