From 0d3d4ae775ee822d8352ea6e7395383f9a298631 Mon Sep 17 00:00:00 2001
From: qiaolongfei
Date: Mon, 11 Jun 2018 14:59:17 +0800
Subject: [PATCH 1/9] refine prefetch logic

---
 paddle/fluid/operators/listen_and_serv_op.cc  |   7 +-
 paddle/fluid/operators/listen_and_serv_op.h   |   2 +-
 .../fluid/transpiler/distribute_transpiler.py | 107 ++++++++++--------
 3 files changed, 64 insertions(+), 52 deletions(-)

diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 66d31c8895..272d7905eb 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -248,7 +248,8 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
                               request_prefetch_handler_.get());
 
   auto *optimize_block = Attr<framework::BlockDesc *>(kOptimizeBlock);
-  auto *prefetch_block = Attr<framework::BlockDesc *>(kPrefetchBlock);
+  auto grad_to_block_id_str = Attr<std::vector<std::string>>(kPrefetchBlock);
+  framework::BlockDesc *prefetch_block = nullptr;
   auto *program = optimize_block->Program();
   framework::Executor executor(dev_place);
 
@@ -302,8 +303,8 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
     AddAttr<bool>("sync_mode", "if works at sync_mode or not").SetDefault(true);
     AddAttr<framework::BlockDesc *>(kOptimizeBlock,
                                     "BlockID to run on server side.");
-    AddAttr<framework::BlockDesc *>(kPrefetchBlock,
-                                    "prefetch block to run on server side.");
+    AddAttr<std::vector<std::string>>(kPrefetchBlock,
+                                      "prefetch block to run on server side.");
     AddAttr<int>("Fanin", "How many clients send to this server.")
         .SetDefault(1);
   }
diff --git a/paddle/fluid/operators/listen_and_serv_op.h b/paddle/fluid/operators/listen_and_serv_op.h
index 87952cb0e6..db3582d9b4 100644
--- a/paddle/fluid/operators/listen_and_serv_op.h
+++ b/paddle/fluid/operators/listen_and_serv_op.h
@@ -30,7 +30,7 @@ namespace paddle {
 namespace operators {
 
 constexpr char kOptimizeBlock[] = "OptimizeBlock";
-constexpr char kPrefetchBlock[] = "PrefetchBlock";
+constexpr char kPrefetchBlock[] = "prefetch_var_name_to_block_id";
 
 void RunServer(std::shared_ptr<detail::RPCServer> service);
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index c7ab300e0f..dfe990a728 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -515,21 +515,20 @@ class DistributeTranspiler:
                                                  grad_to_block_id, None)
 
         # process distributed lookup_table
-        prefetch_block = None
+        prefetch_var_name_to_block_id = []
         if self.has_distributed_lookup_table:
             pserver_index = self.pserver_endpoints.index(endpoint)
             table_opt_block = self._create_table_optimize_block(
                 pserver_index, pserver_program, pre_block_idx, grad_to_block_id)
-            prefetch_block = self._create_prefetch_block(
+            prefetch_var_name_to_block_id = self._create_prefetch_block(
                 pserver_index, pserver_program, table_opt_block)
 
         # NOTE: if has_distributed_lookup_table is False, then prefetch_block will
         # not be executed, so it's safe to use optimize_block to hold the place
         if self.has_distributed_lookup_table:
-            assert prefetch_block is not None
+            assert len(prefetch_var_name_to_block_id) > 0
         else:
-            assert prefetch_block is None
-            prefetch_block = pserver_program.global_block()
+            assert len(prefetch_var_name_to_block_id) == 0
 
         # step5 append the listen_and_serv op
         pserver_program.global_block().append_op(
             type="listen_and_serv",
             inputs={'X': recv_inputs},
             outputs={},
             attrs={
                 "OptimizeBlock": pserver_program.block(1),
                 "endpoint": endpoint,
                 "Fanin": self.trainer_num,
-                "PrefetchBlock": prefetch_block,
+                "prefetch_var_name_to_block_id": prefetch_var_name_to_block_id,
                 "sync_mode": self.sync_mode,
                 "grad_to_block_id": grad_to_block_id
             })
@@ -608,8 +607,15 @@ class DistributeTranspiler:
 
     def _replace_lookup_table_op_with_prefetch(self, program,
                                                pserver_endpoints):
         # 1. replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op
-        self.prefetch_input_vars = None
-        self.prefetch_output_vars = None
+        # self.all_prefetch_input_vars =
+        #       [[var0_prefetch_in_pserver0, var0_prefetch_in_pserver1]
+        #        [var1_prefetch_in_pserver0, var1_prefetch_in_pserver1]]
+        self.all_prefetch_input_vars = []
+
+        # self.all_prefetch_output_vars =
+        #       [[var0_prefetch_out_pserver0, var0_prefetch_out_pserver1]
+        #        [var1_prefetch_out_pserver0, var1_prefetch_out_pserver1]]
+        self.all_prefetch_output_vars = []
 
         continue_search_lookup_table_op = True
         while continue_search_lookup_table_op:
@@ -623,18 +629,19 @@ class DistributeTranspiler:
                     ids_name = op.input("Ids")
                     out_name = op.output("Out")
 
-                    if self.prefetch_input_vars is None:
-                        ids_var = program.global_block().vars[ids_name[0]]
-                        self.prefetch_input_vars = self.create_splited_vars(
-                            source_var=ids_var,
-                            block=program.global_block(),
-                            tag="_prefetch_in_")
-                    if self.prefetch_output_vars is None:
-                        out_var = program.global_block().vars[out_name[0]]
-                        self.prefetch_output_vars = self.create_splited_vars(
-                            source_var=out_var,
-                            block=program.global_block(),
-                            tag="_prefetch_out_")
+                    ids_var = program.global_block().vars[ids_name[0]]
+                    prefetch_input_vars = self.create_splited_vars(
+                        source_var=ids_var,
+                        block=program.global_block(),
+                        tag="_prefetch_in_")
+                    self.all_prefetch_input_vars.append(prefetch_input_vars)
+
+                    out_var = program.global_block().vars[out_name[0]]
+                    prefetch_output_vars = self.create_splited_vars(
+                        source_var=out_var,
+                        block=program.global_block(),
+                        tag="_prefetch_out_")
+                    self.all_prefetch_output_vars.append(prefetch_output_vars)
 
                     # insert split_ids_op
                     program.global_block().insert_op(
@@ -646,14 +653,14 @@ class DistributeTranspiler:
                                 for varname in ids_name
                             ]
                         },
-                        outputs={"Out": self.prefetch_input_vars})
+                        outputs={"Out": prefetch_input_vars})
 
                     # insert prefetch_op
                     program.global_block().insert_op(
                         index=op_index + 1,
                         type="prefetch",
-                        inputs={'X': self.prefetch_input_vars},
-                        outputs={"Out": self.prefetch_output_vars},
+                        inputs={'X': prefetch_input_vars},
+                        outputs={"Out": prefetch_output_vars},
                         attrs={
                             "epmap": pserver_endpoints,
                             RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
@@ -663,7 +670,7 @@ class DistributeTranspiler:
                     program.global_block().insert_op(
                         index=op_index + 2,
                         type="concat",
-                        inputs={'X': self.prefetch_output_vars},
+                        inputs={'X': prefetch_output_vars},
                         outputs={
                             "Out": [
                                 program.global_block().vars[varname]
@@ -709,30 +716,34 @@ class DistributeTranspiler:
                                  optimize_block):
         # STEP: create prefetch block
         table_var = pserver_program.global_block().vars[self.table_name]
-        prefetch_block = pserver_program.create_block(optimize_block.idx)
-        trainer_ids = self.prefetch_input_vars[pserver_index]
-        pserver_ids = pserver_program.global_block().create_var(
-            name=trainer_ids.name,
-            type=trainer_ids.type,
-            shape=trainer_ids.shape,
-            dtype=trainer_ids.dtype)
-        trainer_out = self.prefetch_output_vars[pserver_index]
-        pserver_out = pserver_program.global_block().create_var(
-            name=trainer_out.name,
-            type=trainer_out.type,
-            shape=trainer_out.shape,
-            dtype=trainer_out.dtype)
-        prefetch_block.append_op(
-            type="lookup_sparse_table",
-            inputs={'Ids': pserver_ids,
-                    "W": table_var},
-            outputs={"Out": pserver_out},
-            attrs={
-                "is_sparse": True,  # has no effect on lookup_table op
-                "is_distributed": True,
"padding_idx": -1 - }) - return prefetch_block + prefetch_var_name_to_block_id = [] + for index in range(len(self.all_prefetch_input_vars)): + prefetch_block = pserver_program.create_block(optimize_block.idx) + trainer_ids = self.all_prefetch_input_vars[index][pserver_index] + pserver_ids = pserver_program.global_block().create_var( + name=trainer_ids.name, + type=trainer_ids.type, + shape=trainer_ids.shape, + dtype=trainer_ids.dtype) + trainer_out = self.all_prefetch_output_vars[index][pserver_index] + pserver_out = pserver_program.global_block().create_var( + name=trainer_out.name, + type=trainer_out.type, + shape=trainer_out.shape, + dtype=trainer_out.dtype) + prefetch_block.append_op( + type="lookup_sparse_table", + inputs={'Ids': pserver_ids, + "W": table_var}, + outputs={"Out": pserver_out}, + attrs={ + "is_sparse": True, # has no effect on lookup_table op + "is_distributed": True, + "padding_idx": -1 + }) + prefetch_var_name_to_block_id.append(trainer_ids.name + ":" + str( + prefetch_block.idx)) + return prefetch_var_name_to_block_id def _create_table_optimize_block(self, pserver_index, pserver_program, pre_block_idx, grad_to_block_id): From 4e36c0ecab5ea9c8b3445f475e289532938e48ac Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Mon, 11 Jun 2018 16:58:51 +0800 Subject: [PATCH 2/9] update prefetch logic in grpc_server --- paddle/fluid/operators/detail/grpc_server.cc | 12 +-- .../operators/detail/grpc_server_test.cc | 10 ++- .../fluid/operators/detail/request_handler.h | 17 ++-- .../operators/detail/request_handler_impl.cc | 3 +- paddle/fluid/operators/listen_and_serv_op.cc | 85 ++++++++++++------- paddle/fluid/operators/listen_and_serv_op.h | 5 +- 6 files changed, 86 insertions(+), 46 deletions(-) diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index 57867aad4d..5c2979222a 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -155,16 +155,18 @@ class RequestPrefetch final : public RequestBase { void Process() override { // prefetch process... 
-    std::string varname = request_->OutVarname();
-    VLOG(3) << "RequestPrefetch " << varname;
+    std::string in_var_name = request_->Varname();
+    std::string out_var_name = request_->OutVarname();
+    VLOG(3) << "in_var_name: " << in_var_name
+            << " RequestPrefetch: " << out_var_name;
 
     auto scope = request_->GetMutableLocalScope();
-    auto invar = scope->FindVar(varname);
+    auto invar = scope->FindVar(in_var_name);
     framework::Variable* outvar = nullptr;
 
-    request_handler_->Handle(varname, scope, invar, &outvar);
+    request_handler_->Handle(in_var_name, scope, invar, &outvar);
 
-    SerializeToByteBuffer(varname, outvar, *request_handler_->dev_ctx(),
+    SerializeToByteBuffer(out_var_name, outvar, *request_handler_->dev_ctx(),
                           &reply_);
     responder_.Finish(reply_, ::grpc::Status::OK,
                       reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
diff --git a/paddle/fluid/operators/detail/grpc_server_test.cc b/paddle/fluid/operators/detail/grpc_server_test.cc
index a1f9ba15e6..e6a33903ad 100644
--- a/paddle/fluid/operators/detail/grpc_server_test.cc
+++ b/paddle/fluid/operators/detail/grpc_server_test.cc
@@ -99,11 +99,17 @@ void StartServer() {
   framework::Executor exe(place);
   platform::CPUDeviceContext ctx(place);
   auto* block = AppendPrefetchBlcok(&program);
-  auto prepared = exe.Prepare(program, block->ID());
+  std::string in_var_name("ids");
+  std::vector<int> prefetch_block_ids{block->ID()};
+  auto prepared = exe.Prepare(program, prefetch_block_ids);
   InitTensorsOnServer(&scope, &place, 10);
 
+  std::unordered_map<std::string,
+                     std::shared_ptr<framework::ExecutorPrepareContext>>
+      prefetch_var_name_to_prepared;
+  prefetch_var_name_to_prepared[in_var_name] = prepared[0];
   g_req_handler->SetProgram(&program);
-  g_req_handler->SetPrefetchPreparedCtx(std::move(prepared));
+  g_req_handler->SetPrefetchPreparedCtx(&prefetch_var_name_to_prepared);
   g_req_handler->SetDevCtx(&ctx);
   g_req_handler->SetScope(&scope);
   g_req_handler->SetExecutor(&exe);
diff --git a/paddle/fluid/operators/detail/request_handler.h b/paddle/fluid/operators/detail/request_handler.h
index d74206aaba..373a6aaa09 100644
--- a/paddle/fluid/operators/detail/request_handler.h
+++ b/paddle/fluid/operators/detail/request_handler.h
@@ -57,9 +57,12 @@ class RequestHandler {
   void SetDevCtx(const platform::DeviceContext* dev_ctx) { dev_ctx_ = dev_ctx; }
   void SetProgram(framework::ProgramDesc* program) { program_ = program; }
   void SetExecutor(framework::Executor* executor) { executor_ = executor; }
+
+  // Used for dist lookup table prefetch
   void SetPrefetchPreparedCtx(
-      std::unique_ptr<framework::ExecutorPrepareContext> prepared) {
-    prefetch_ctx_.reset(prepared.release());
+      std::unordered_map<
+          std::string, std::shared_ptr<framework::ExecutorPrepareContext>>* g) {
+    prefetch_var_name_to_prepared_ctx_ = g;
   }
 
   // Used for async.
@@ -75,9 +78,6 @@ class RequestHandler {
   bool sync_mode() { return sync_mode_; }
   framework::Scope* scope() { return scope_; }
   const platform::DeviceContext* dev_ctx() { return dev_ctx_; }
-  framework::ExecutorPrepareContext* prefetch_ctx() {
-    return prefetch_ctx_.get();
-  }
   framework::ProgramDesc* program() { return program_; }
   framework::Executor* executor() { return executor_; }
 
@@ -106,12 +106,17 @@ class RequestHandler {
   framework::Executor* executor_;
   framework::Scope* scope_;
   framework::ProgramDesc* program_;
-  std::unique_ptr<framework::ExecutorPrepareContext> prefetch_ctx_;
+
+  // used for distribute lookup table prefetch
+  std::unordered_map<std::string,
+                     std::shared_ptr<framework::ExecutorPrepareContext>>*
+      prefetch_var_name_to_prepared_ctx_;
 
   // Used for async.
   std::unordered_map<std::string,
                      std::shared_ptr<framework::ExecutorPrepareContext>>*
       grad_to_prepared_ctx_;
+
   RPCServer* rpc_server_;
 };
 
diff --git a/paddle/fluid/operators/detail/request_handler_impl.cc b/paddle/fluid/operators/detail/request_handler_impl.cc
index 9473dce550..dc28740bf0 100644
--- a/paddle/fluid/operators/detail/request_handler_impl.cc
+++ b/paddle/fluid/operators/detail/request_handler_impl.cc
@@ -111,7 +111,8 @@ bool RequestPrefetchHandler::Handle(const std::string& varname,
   auto var_desc = program_->Block(0).FindVar(varname);
   *outvar = scope->FindVar(varname);
   InitializeVariable(*outvar, var_desc->GetType());
-  executor_->RunPreparedContext(prefetch_ctx_.get(), scope);
+  executor_->RunPreparedContext(
+      (*prefetch_var_name_to_prepared_ctx_)[varname].get(), scope);
 
   return true;
 }
diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 272d7905eb..4d35caff2c 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -89,16 +89,19 @@ void ListenAndServOp::SavePort() const {
   rpc_service_->SavePort();
 }
 
-void ListenAndServOp::RunSyncLoop(framework::Executor *executor,
-                                  framework::ProgramDesc *program,
-                                  framework::Scope *recv_scope,
-                                  framework::BlockDesc *prefetch_block) const {
+void ListenAndServOp::RunSyncLoop(
+    framework::Executor *executor, framework::ProgramDesc *program,
+    framework::Scope *recv_scope,
+    const std::vector<int> &prefetch_block_id_list) const {
+  // FIXME(qiao) run should not run the block to do prefetch, currently prefetch
+  // block
+  // can only be at the last blocks of the program
   size_t num_blocks = program->Size();
   PADDLE_ENFORCE_GE(num_blocks, 2,
                     "server program should have at least 2 blocks");
 
   std::vector<int> block_list;
-  for (size_t blkid = 1; blkid < num_blocks; ++blkid) {
+  for (size_t blkid = 1; blkid < prefetch_block_id_list[0]; ++blkid) {
     block_list.push_back(blkid);
   }
   auto optimize_prepared = executor->Prepare(*program, block_list);
@@ -128,16 +131,14 @@ void ListenAndServOp::RunSyncLoop(
     std::vector<size_t> parallel_blkids;
     parallel_blkids.push_back(1);
     double ts = detail::GetTimestamp();
-    for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
-      if (blkid != static_cast<size_t>(prefetch_block->ID())) {
-        if (program->Block(blkid).Parent() != last_parent_blkid) {
-          ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared,
-                                program, recv_scope);
-          parallel_blkids.clear();
-          last_parent_blkid = program->Block(blkid).Parent();
-        }
-        parallel_blkids.push_back(blkid);
+    for (size_t blkid = 2; blkid < prefetch_block_id_list[0]; ++blkid) {
+      if (program->Block(blkid).Parent() != last_parent_blkid) {
+        ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared,
+                              program, recv_scope);
+        parallel_blkids.clear();
+        last_parent_blkid = program->Block(blkid).Parent();
       }
+      parallel_blkids.push_back(blkid);
     }
     ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, program,
                           recv_scope);
@@ -203,18 +204,19 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor,
   }  // while(true)
 }
 
-static void FillRequestCtx(detail::RequestHandler *h, framework::Scope *scope,
-                           platform::DeviceContext *dev_ctx,
-                           framework::Executor *executor,
-                           framework::ProgramDesc *program,
-                           framework::ExecutorPrepareContext *prefetch_ctx,
-                           detail::RPCServer *rpc_server) {
+static void FillRequestCtx(
+    detail::RequestHandler *h, framework::Scope *scope,
+    platform::DeviceContext *dev_ctx, framework::Executor *executor,
+    framework::ProgramDesc *program,
+    std::unordered_map<std::string,
+                       std::shared_ptr<framework::ExecutorPrepareContext>>
+        *prefetch_ctx,
+    detail::RPCServer *rpc_server) {
   h->SetScope(scope);
   h->SetDevCtx(dev_ctx);
   h->SetExecutor(executor);
   h->SetProgram(program);
-  h->SetPrefetchPreparedCtx(
-      std::unique_ptr<framework::ExecutorPrepareContext>(prefetch_ctx));
+  h->SetPrefetchPreparedCtx(prefetch_ctx);
   h->SetRPCServer(rpc_server);
 }
 
@@ -248,18 +250,41 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
                               request_prefetch_handler_.get());
 
   auto *optimize_block = Attr<framework::BlockDesc *>(kOptimizeBlock);
-  auto grad_to_block_id_str = Attr<std::vector<std::string>>(kPrefetchBlock);
-  framework::BlockDesc *prefetch_block = nullptr;
   auto *program = optimize_block->Program();
   framework::Executor executor(dev_place);
 
   // prepare for prefetch
-  VLOG(3) << "prefetch block id is " << prefetch_block->ID();
-  auto prefetch_prepared = executor.Prepare(*program, prefetch_block->ID());
+  std::vector<int> prefetch_block_id_list;
+  std::unordered_map<int, std::string> block_id_to_prefetch_var_name;
+
+  auto prefetch_var_name_to_block_id_str =
+      Attr<std::vector<std::string>>(kPrefetchVarNameToBlockId);
+  for (const auto &prefetch_var_name_and_id :
+       prefetch_var_name_to_block_id_str) {
+    std::vector<std::string> pieces;
+    split(prefetch_var_name_and_id, ':', &pieces);
+    VLOG(3) << "after split, grad = " << pieces[0] << ", id=" << pieces[1];
+    PADDLE_ENFORCE_EQ(pieces.size(), 2);
+
+    int block_id = std::stoi(pieces[1]);
+    prefetch_block_id_list.push_back(block_id);
+    block_id_to_prefetch_var_name[block_id] = pieces[0];
+  }
+
+  auto prefetch_prepared = executor.Prepare(*program, prefetch_block_id_list);
+
+  std::unordered_map<std::string,
+                     std::shared_ptr<framework::ExecutorPrepareContext>>
+      prefetch_var_name_to_prepared_ctx;
+  for (int i = 0; i < prefetch_block_id_list.size(); ++i) {
+    auto block_id = prefetch_block_id_list[i];
+    auto prefetch_var_name = block_id_to_prefetch_var_name[block_id];
+    prefetch_var_name_to_prepared_ctx[prefetch_var_name] = prefetch_prepared[i];
+  }
 
   auto f = std::bind(FillRequestCtx, std::placeholders::_1, &recv_scope,
-                     &dev_ctx, &executor, program, prefetch_prepared.release(),
-                     rpc_service_.get());
+                     &dev_ctx, &executor, program,
+                     &prefetch_var_name_to_prepared_ctx, rpc_service_.get());
 
   f(request_send_handler_.get());
   f(request_get_handler_.get());
@@ -277,7 +302,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
   // Write to a file of server selected port for python use.
   SavePort();
   if (sync_mode) {
-    RunSyncLoop(&executor, program, &recv_scope, prefetch_block);
+    RunSyncLoop(&executor, program, &recv_scope, prefetch_block_id_list);
   } else {
     RunAsyncLoop(&executor, program);
   }
@@ -303,7 +328,7 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
     AddAttr<bool>("sync_mode", "if works at sync_mode or not").SetDefault(true);
     AddAttr<framework::BlockDesc *>(kOptimizeBlock,
                                     "BlockID to run on server side.");
-    AddAttr<std::vector<std::string>>(kPrefetchBlock,
+    AddAttr<std::vector<std::string>>(kPrefetchVarNameToBlockId,
                                       "prefetch block to run on server side.");
     AddAttr<int>("Fanin", "How many clients send to this server.")
         .SetDefault(1);
diff --git a/paddle/fluid/operators/listen_and_serv_op.h b/paddle/fluid/operators/listen_and_serv_op.h
index db3582d9b4..46c3a19e20 100644
--- a/paddle/fluid/operators/listen_and_serv_op.h
+++ b/paddle/fluid/operators/listen_and_serv_op.h
@@ -18,6 +18,7 @@ limitations under the License. */
 #include <stdint.h>
 #include <atomic>
 #include <string>
+#include <vector>
 
 #include "paddle/fluid/framework/executor.h"
 #include "paddle/fluid/framework/lod_tensor.h"
@@ -30,7 +31,7 @@ namespace paddle {
 namespace operators {
 
 constexpr char kOptimizeBlock[] = "OptimizeBlock";
-constexpr char kPrefetchBlock[] = "prefetch_var_name_to_block_id";
+constexpr char kPrefetchVarNameToBlockId[] = "prefetch_var_name_to_block_id";
 
 void RunServer(std::shared_ptr<detail::RPCServer> service);
@@ -46,7 +47,7 @@ class ListenAndServOp : public framework::OperatorBase {
   void RunSyncLoop(framework::Executor* executor,
                    framework::ProgramDesc* program,
                    framework::Scope* recv_scope,
-                   framework::BlockDesc* prefetch_block) const;
+                   const std::vector<int>& prefetch_block_id_list) const;
 
   void RunAsyncLoop(framework::Executor* executor,
                     framework::ProgramDesc* program) const;

From 8fb78f6c07db44e5bece996ca34e9429669e3466 Mon Sep 17 00:00:00 2001
From: qiaolongfei
Date: Mon, 11 Jun 2018 17:39:50 +0800
Subject: [PATCH 3/9] fix grpc_server_test

---
 paddle/fluid/operators/detail/grpc_server.cc         |  7 ++++---
 paddle/fluid/operators/detail/request_handler.h      |  4 ++--
 .../fluid/operators/detail/request_handler_impl.cc   | 12 +++++++-----
 paddle/fluid/operators/detail/request_handler_impl.h |  9 ++++++---
 4 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc
index 5c2979222a..82d422d110 100644
--- a/paddle/fluid/operators/detail/grpc_server.cc
+++ b/paddle/fluid/operators/detail/grpc_server.cc
@@ -158,13 +158,14 @@ class RequestPrefetch final : public RequestBase {
     std::string in_var_name = request_->Varname();
     std::string out_var_name = request_->OutVarname();
     VLOG(3) << "in_var_name: " << in_var_name
+            << "out_var_name: " << out_var_name
             << " RequestPrefetch: " << out_var_name;
 
     auto scope = request_->GetMutableLocalScope();
     auto invar = scope->FindVar(in_var_name);
-    framework::Variable* outvar = nullptr;
+    framework::Variable* outvar = scope->FindVar(out_var_name);
 
-    request_handler_->Handle(in_var_name, scope, invar, &outvar);
+    request_handler_->Handle(in_var_name, scope, invar, &outvar, out_var_name);
 
     SerializeToByteBuffer(out_var_name, outvar, *request_handler_->dev_ctx(),
                           &reply_);
@@ -284,7 +285,7 @@ void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name,
   } else if (rpc_name == kRequestPrefetch) {
     b = new RequestPrefetch(&service_, cq.get(), handler, req_id);
   } else {
-    PADDLE_ENFORCE(false, "not surpported rpc");
+    PADDLE_ENFORCE(false, "not supported rpc");
   }
 
   reqs[req_id] = b;
diff --git a/paddle/fluid/operators/detail/request_handler.h b/paddle/fluid/operators/detail/request_handler.h
index 373a6aaa09..e133df4896 100644
--- a/paddle/fluid/operators/detail/request_handler.h
+++ b/paddle/fluid/operators/detail/request_handler.h
@@ -96,8 +96,8 @@ class RequestHandler {
   //     *request_handler_->dev_ctx(), &reply_);
   // }
   virtual bool Handle(const std::string& varname, framework::Scope* scope,
-                      framework::Variable* var,
-                      framework::Variable** outvar) = 0;
+                      framework::Variable* var, framework::Variable** outvar,
+                      const std::string& out_var_name = "") = 0;
 
  protected:
   const bool sync_mode_;
diff --git a/paddle/fluid/operators/detail/request_handler_impl.cc b/paddle/fluid/operators/detail/request_handler_impl.cc
index dc28740bf0..0f42daa5bc 100644
--- a/paddle/fluid/operators/detail/request_handler_impl.cc
+++ b/paddle/fluid/operators/detail/request_handler_impl.cc
@@ -33,7 +33,8 @@ namespace detail {
 
 bool RequestSendHandler::Handle(const std::string& varname,
                                 framework::Scope* scope,
                                 framework::Variable* invar,
-                                framework::Variable** outvar) {
+                                framework::Variable** outvar,
+                                const std::string& out_var_name) {
   VLOG(4) << "RequestSendHandler:" << varname;
 
   // Async
@@ -82,7 +83,8 @@ void RequestSendHandler::ResetSparseVarRecorder() {
 bool RequestGetHandler::Handle(const std::string& varname,
                                framework::Scope* scope,
                                framework::Variable* invar,
-                               framework::Variable** outvar) {
+                               framework::Variable** outvar,
+                               const std::string& out_var_name) {
   VLOG(4) << "RequestGetHandler:" << varname;
 
   if (varname != FETCH_BARRIER_MESSAGE) {
@@ -105,11 +107,11 @@ bool RequestGetHandler::Handle(const std::string& varname,
 bool RequestPrefetchHandler::Handle(const std::string& varname,
                                     framework::Scope* scope,
                                     framework::Variable* invar,
-                                    framework::Variable** outvar) {
+                                    framework::Variable** outvar,
+                                    const std::string& out_var_name) {
   VLOG(4) << "RequestPrefetchHandler " << varname;
 
-  auto var_desc = program_->Block(0).FindVar(varname);
-  *outvar = scope->FindVar(varname);
+  auto var_desc = program_->Block(0).FindVar(out_var_name);
   InitializeVariable(*outvar, var_desc->GetType());
   executor_->RunPreparedContext(
       (*prefetch_var_name_to_prepared_ctx_)[varname].get(), scope);
diff --git a/paddle/fluid/operators/detail/request_handler_impl.h b/paddle/fluid/operators/detail/request_handler_impl.h
index 443d951914..67bf277b24 100644
--- a/paddle/fluid/operators/detail/request_handler_impl.h
+++ b/paddle/fluid/operators/detail/request_handler_impl.h
@@ -40,7 +40,8 @@ class RequestSendHandler final : public RequestHandler {
   explicit RequestSendHandler(bool sync_mode) : RequestHandler(sync_mode) {}
   virtual ~RequestSendHandler() {}
   bool Handle(const std::string& varname, framework::Scope* scope,
-              framework::Variable* var, framework::Variable** outvar) override;
+              framework::Variable* var, framework::Variable** outvar,
+              const std::string& out_var_name = "") override;
   void ResetSparseVarRecorder();
 
  private:
@@ -53,7 +54,8 @@ class RequestGetHandler final : public RequestHandler {
   explicit RequestGetHandler(bool sync_mode) : RequestHandler(sync_mode) {}
   virtual ~RequestGetHandler() {}
   bool Handle(const std::string& varname, framework::Scope* scope,
-              framework::Variable* var, framework::Variable** outvar) override;
+              framework::Variable* var, framework::Variable** outvar,
+              const std::string& out_var_name = "") override;
 };
 
 class RequestPrefetchHandler final : public RequestHandler {
@@ -61,7 +63,8 @@ class RequestPrefetchHandler final : public RequestHandler {
   explicit RequestPrefetchHandler(bool sync_mode) : RequestHandler(sync_mode) {}
   virtual ~RequestPrefetchHandler() {}
   bool Handle(const std::string& varname, framework::Scope* scope,
-              framework::Variable* var, framework::Variable** outvar) override;
+              framework::Variable* var, framework::Variable** outvar,
+              const std::string& out_var_name = "") override;
 };
 
 }  // namespace detail

From 5aba10b5851d8a777a37c2f6fc95912f3f59cdf2 Mon Sep 17 00:00:00 2001
From: qiaolongfei
Date: Mon, 11 Jun 2018 18:52:35 +0800
Subject: [PATCH 4/9] set the thread pool of prefetch to 1 to fix a bug

---
 paddle/fluid/operators/detail/grpc_server.cc | 1 -
 paddle/fluid/operators/listen_and_serv_op.cc | 5 +++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc
index 82d422d110..487abffefe 100644
--- a/paddle/fluid/operators/detail/grpc_server.cc
+++ b/paddle/fluid/operators/detail/grpc_server.cc
@@ -158,7 +158,6 @@ class RequestPrefetch final : public RequestBase {
     std::string in_var_name = request_->Varname();
     std::string out_var_name = request_->OutVarname();
     VLOG(3) << "in_var_name: " << in_var_name
-            << "out_var_name: " << out_var_name
             << " RequestPrefetch: " << out_var_name;
 
     auto scope = request_->GetMutableLocalScope();
diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 4d35caff2c..5cd95f8efe 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -247,7 +247,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
   rpc_service_->RegisterRPC(detail::kRequestSend, request_send_handler_.get());
   rpc_service_->RegisterRPC(detail::kRequestGet, request_get_handler_.get());
   rpc_service_->RegisterRPC(detail::kRequestPrefetch,
-                            request_prefetch_handler_.get());
+                            request_prefetch_handler_.get(), 1);
 
   auto *optimize_block = Attr<framework::BlockDesc *>(kOptimizeBlock);
   auto *program = optimize_block->Program();
@@ -263,7 +263,8 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
        prefetch_var_name_to_block_id_str) {
     std::vector<std::string> pieces;
     split(prefetch_var_name_and_id, ':', &pieces);
-    VLOG(3) << "after split, grad = " << pieces[0] << ", id=" << pieces[1];
+    VLOG(3) << "after split, prefetch_var = " << pieces[0]
+            << ", id=" << pieces[1];
     PADDLE_ENFORCE_EQ(pieces.size(), 2);
 
     int block_id = std::stoi(pieces[1]);

From 7f4b9656a481617e832935abd30eaf0a2d13e100 Mon Sep 17 00:00:00 2001
From: qiaolongfei
Date: Mon, 11 Jun 2018 19:57:49 +0800
Subject: [PATCH 5/9] set status before Finish in prefetch process

---
 paddle/fluid/operators/detail/grpc_server.cc | 2 +-
 paddle/fluid/operators/listen_and_serv_op.cc | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc
index 487abffefe..16c30da0a7 100644
--- a/paddle/fluid/operators/detail/grpc_server.cc
+++ b/paddle/fluid/operators/detail/grpc_server.cc
@@ -168,9 +168,9 @@ class RequestPrefetch final : public RequestBase {
     SerializeToByteBuffer(out_var_name, outvar, *request_handler_->dev_ctx(),
                           &reply_);
 
+    status_ = FINISH;
     responder_.Finish(reply_, ::grpc::Status::OK,
                       reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
-    status_ = FINISH;
   }
 
  protected:
diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 5cd95f8efe..dfa4504741 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -247,7 +247,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
   rpc_service_->RegisterRPC(detail::kRequestSend, request_send_handler_.get());
   rpc_service_->RegisterRPC(detail::kRequestGet, request_get_handler_.get());
   rpc_service_->RegisterRPC(detail::kRequestPrefetch,
-                            request_prefetch_handler_.get(), 1);
+                            request_prefetch_handler_.get());

From ea106c91e0184d30e547e2ecb305c401fe0f2c92 Mon Sep 17 00:00:00 2001
From: qiaolongfei
Date: Mon, 11 Jun 2018 20:11:59 +0800
Subject: [PATCH 6/9] optimize comment and code

---
 paddle/fluid/operators/listen_and_serv_op.cc | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index dfa4504741..84a1d5e3a5 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -93,15 +93,16 @@ void ListenAndServOp::RunSyncLoop(
     framework::Executor *executor, framework::ProgramDesc *program,
     framework::Scope *recv_scope,
     const std::vector<int> &prefetch_block_id_list) const {
-  // FIXME(qiao) run should not run the block to do prefetch, currently prefetch
-  // block
-  // can only be at the last blocks of the program
+  // FIXME(qiao) ParallelExecuteBlocks should only execute optimize blocks.
+  // the prefetch blocks should not be executed. Currently we put prefetch
+  // blocks
+  // at the end of programs. This may be misused.
   size_t num_blocks = program->Size();
   PADDLE_ENFORCE_GE(num_blocks, 2,
                     "server program should have at least 2 blocks");
 
   std::vector<int> block_list;
-  for (size_t blkid = 1; blkid < prefetch_block_id_list[0]; ++blkid) {
+  for (int blkid = 1; blkid < prefetch_block_id_list[0]; ++blkid) {
     block_list.push_back(blkid);
   }
   auto optimize_prepared = executor->Prepare(*program, block_list);
@@ -131,7 +132,7 @@ void ListenAndServOp::RunSyncLoop(
     std::vector<size_t> parallel_blkids;
     parallel_blkids.push_back(1);
     double ts = detail::GetTimestamp();
-    for (size_t blkid = 2; blkid < prefetch_block_id_list[0]; ++blkid) {
+    for (int blkid = 2; blkid < prefetch_block_id_list[0]; ++blkid) {
       if (program->Block(blkid).Parent() != last_parent_blkid) {
         ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared,
                               program, recv_scope);
@@ -256,7 +256,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
 
   // prepare for prefetch
   std::vector<int> prefetch_block_id_list;
-  std::unordered_map<int, std::string> block_id_to_prefetch_var_name;
+  std::unordered_map<int32_t, std::string> block_id_to_prefetch_var_name;
 
   auto prefetch_var_name_to_block_id_str =
       Attr<std::vector<std::string>>(kPrefetchVarNameToBlockId);
@@ -277,7 +278,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
 
   std::unordered_map<std::string,
                      std::shared_ptr<framework::ExecutorPrepareContext>>
       prefetch_var_name_to_prepared_ctx;
-  for (int i = 0; i < prefetch_block_id_list.size(); ++i) {
+  for (size_t i = 0; i < prefetch_block_id_list.size(); ++i) {
     auto block_id = prefetch_block_id_list[i];
     auto prefetch_var_name = block_id_to_prefetch_var_name[block_id];
     prefetch_var_name_to_prepared_ctx[prefetch_var_name] = prefetch_prepared[i];

From 506fc8d9e82b9520ce43d47570ae719ca7932d68 Mon Sep 17 00:00:00 2001
From: qiaolongfei
Date: Mon, 11 Jun 2018 20:29:17 +0800
Subject: [PATCH 7/9] optimize code

---
 paddle/fluid/operators/listen_and_serv_op.cc | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 84a1d5e3a5..362bc3ae11 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -93,19 +93,18 @@ void ListenAndServOp::RunSyncLoop(
     framework::Executor *executor, framework::ProgramDesc *program,
     framework::Scope *recv_scope,
     const std::vector<int> &prefetch_block_id_list) const {
-  // FIXME(qiao) ParallelExecuteBlocks should only execute optimize blocks.
-  // the prefetch blocks should not be executed. Currently we put prefetch
-  // blocks
-  // at the end of programs. This may be misused.
   size_t num_blocks = program->Size();
   PADDLE_ENFORCE_GE(num_blocks, 2,
                     "server program should have at least 2 blocks");
 
-  std::vector<int> block_list;
-  for (int blkid = 1; blkid < prefetch_block_id_list[0]; ++blkid) {
-    block_list.push_back(blkid);
+  std::vector<int> optimize_block_id_list;
+  for (int blkid = 1; blkid < num_blocks; ++blkid) {
+    if (std::find(prefetch_block_id_list.begin(), prefetch_block_id_list.end(),
+                  blkid) == prefetch_block_id_list.end()) {
+      optimize_block_id_list.push_back(blkid);
+    }
   }
-  auto optimize_prepared = executor->Prepare(*program, block_list);
+  auto optimize_prepared = executor->Prepare(*program, optimize_block_id_list);
   // Insert placeholder for block0 which holds current op itself.
   optimize_prepared.insert(
       optimize_prepared.begin(),
@@ -132,7 +131,10 @@ void ListenAndServOp::RunSyncLoop(
     std::vector<size_t> parallel_blkids;
     parallel_blkids.push_back(1);
     double ts = detail::GetTimestamp();
-    for (int blkid = 2; blkid < prefetch_block_id_list[0]; ++blkid) {
+    for (size_t i = 1; i < optimize_block_id_list.size(); ++i) {
+      // skip the first optimize block because it is already in the
+      // parallel_blkids.
+      int blkid = optimize_block_id_list[i];
       if (program->Block(blkid).Parent() != last_parent_blkid) {
         ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared,
                               program, recv_scope);

From 83a577e8ce4114640a9fb2189befc577091581ee Mon Sep 17 00:00:00 2001
From: qiaolongfei
Date: Mon, 11 Jun 2018 21:02:43 +0800
Subject: [PATCH 8/9] fix build problem

---
 paddle/fluid/framework/details/ssa_graph_checker.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/paddle/fluid/framework/details/ssa_graph_checker.h b/paddle/fluid/framework/details/ssa_graph_checker.h
index 542c4a1728..304b221e7e 100644
--- a/paddle/fluid/framework/details/ssa_graph_checker.h
+++ b/paddle/fluid/framework/details/ssa_graph_checker.h
@@ -19,7 +19,7 @@ namespace paddle {
 namespace framework {
 namespace details {
-class SSAGraph;
+struct SSAGraph;
 
 class SSAGraghBuilderWithChecker : public SSAGraphBuilder {
  public:

From 2b9ff39f5f66663a60d4d33bdc2ee1da0c1ff364 Mon Sep 17 00:00:00 2001
From: qiaolongfei
Date: Tue, 12 Jun 2018 10:25:25 +0800
Subject: [PATCH 9/9] fix the default value of prefetch_var_name_to_block_id

---
 paddle/fluid/operators/listen_and_serv_op.cc  |  3 ++-
 .../fluid/transpiler/distribute_transpiler.py | 20 +++++++++++--------
 2 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 4cf2c8daa5..4d12278799 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -340,7 +340,8 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
     AddAttr<framework::BlockDesc *>(kOptimizeBlock,
                                     "BlockID to run on server side.");
     AddAttr<std::vector<std::string>>(kPrefetchVarNameToBlockId,
-                                      "prefetch block to run on server side.");
+                                      "prefetch blocks to run on server side.")
+        .SetDefault({});
     AddAttr<int>("Fanin", "How many clients send to this server.")
         .SetDefault(1);
   }
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index 924e5ba4f6..2480d4e76a 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -530,19 +530,23 @@ class DistributeTranspiler:
         else:
             assert len(prefetch_var_name_to_block_id) == 0
 
+        attrs = {
+            "OptimizeBlock": pserver_program.block(1),
+            "endpoint": endpoint,
+            "Fanin": self.trainer_num,
"sync_mode": self.sync_mode, + "grad_to_block_id": grad_to_block_id + } + if len(prefetch_var_name_to_block_id) > 0: + attrs['prefetch_var_name_to_block_id'] \ + = prefetch_var_name_to_block_id + # step5 append the listen_and_serv op pserver_program.global_block().append_op( type="listen_and_serv", inputs={'X': recv_inputs}, outputs={}, - attrs={ - "OptimizeBlock": pserver_program.block(1), - "endpoint": endpoint, - "Fanin": self.trainer_num, - "prefetch_var_name_to_block_id": prefetch_var_name_to_block_id, - "sync_mode": self.sync_mode, - "grad_to_block_id": grad_to_block_id - }) + attrs=attrs) pserver_program.sync_with_cpp() return pserver_program