From eeed7af5c3b6d51399412ba3cd0cab2125b33e90 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Wed, 2 May 2018 20:19:58 +0800 Subject: [PATCH 01/16] add gen_nccl_id_op --- paddle/fluid/operators/CMakeLists.txt | 7 +- paddle/fluid/operators/gen_nccl_id_op.cc | 123 ++++++++++++++++++ .../fluid/operators/lookup_sparse_table_op.cc | 2 +- 3 files changed, 130 insertions(+), 2 deletions(-) create mode 100644 paddle/fluid/operators/gen_nccl_id_op.cc diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt index 256aded8ca..ad0732131c 100644 --- a/paddle/fluid/operators/CMakeLists.txt +++ b/paddle/fluid/operators/CMakeLists.txt @@ -184,6 +184,11 @@ endif() add_subdirectory(detail) if(WITH_DISTRIBUTE) + if(WITH_GPU) + op_library(gen_nccl_id_op DEPS nccl_common) + else() + set(DEPS_OPS ${DEPS_OPS} gen_nccl_id_op) + endif() set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf) set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") op_library(send_op DEPS ${DISTRIBUTE_DEPS}) @@ -201,7 +206,7 @@ if(WITH_DISTRIBUTE) set_source_files_properties(send_recv_op_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS prefetch_op send_op listen_and_serv_op sum_op executor) else() - set(DEPS_OPS ${DEPS_OPS} send_op prefetch_op recv_op listen_and_serv_op send_vars_op send_barrier_op) + set(DEPS_OPS ${DEPS_OPS} send_op prefetch_op recv_op listen_and_serv_op send_vars_op send_barrier_op gen_nccl_id_op) endif() op_library(cross_entropy_op DEPS cross_entropy) diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc new file mode 100644 index 0000000000..e75e045fcb --- /dev/null +++ b/paddle/fluid/operators/gen_nccl_id_op.cc @@ -0,0 +1,123 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/
+
+#include <nccl.h>
+#include <stdint.h>
+#include <ostream>
+#include <string>
+
+#include "paddle/fluid/framework/executor.h"
+#include "paddle/fluid/framework/lod_tensor.h"
+#include "paddle/fluid/framework/op_registry.h"
+#include "paddle/fluid/framework/threadpool.h"
+#include "paddle/fluid/operators/detail/grpc_client.h"
+#include "paddle/fluid/operators/detail/grpc_server.h"
+
+namespace paddle {
+namespace operators {
+
+class GenNCCLIdOp : public framework::OperatorBase {
+ public:
+  GenNCCLIdOp(const std::string& type,
+              const framework::VariableNameMap& inputs,
+              const framework::VariableNameMap& outputs,
+              const framework::AttributeMap& attrs)
+      : OperatorBase(type, inputs, outputs, attrs) {}
+
+  void RunImpl(const framework::Scope& scope,
+               const platform::Place& dev_place) const override {
+    platform::DeviceContextPool& pool =
+        platform::DeviceContextPool::Instance();
+    auto& dev_ctx = *pool.Get(dev_place);
+    int trainer_id = Attr<int>("trainer_id");
+    framework::Scope& local_scope = scope.NewScope();
+
+    if (trainer_id == 0) {
+      GenerateAndSend(&local_scope, dev_ctx);
+    } else {
+      GetIdByServer(&local_scope, dev_ctx);
+    }
+  }
+
+ private:
+  void GenerateAndSend(framework::Scope* scope,
+                       const platform::DeviceContext& dev_ctx) const {
+    auto var = scope->FindVar("NCCLID");
+    PADDLE_ENFORCE_NOT_NULL(var);
+    auto id = var->GetMutable<ncclUniqueId>();
+    ncclGetUniqueId(id);
+
+    std::vector<std::string> endpoint_list =
+        Attr<std::vector<std::string>>("endpoint_list");
+    detail::RPCClient client;
+    for (auto& ep : endpoint_list) {
+      client.AsyncSendVariable(ep, dev_ctx, *scope, "NCCLID");
+    }
+    client.Wait();
+  }
+
+  void GetIdByServer(framework::Scope* scope,
+                     const platform::DeviceContext& dev_ctx) const {
+    std::string endpoint = Attr<std::string>("endpoint");
+    rpc_service_.reset(new detail::AsyncGRPCServer(endpoint, true));
+    framework::ProgramDesc empty_program;
+    framework::Executor executor(dev_ctx.GetPlace());
+    rpc_service_->SetScope(scope);
+    rpc_service_->SetDevCtx(&dev_ctx);
+    rpc_service_->SetProgram(&empty_program);
+    rpc_service_->SetExecutor(&executor);
+
+    server_thread_.reset(new std::thread(std::bind(
+        &detail::AsyncGRPCServer::RunSyncUpdate, rpc_service_.get())));
+
+    auto recv = rpc_service_->Get();
+    rpc_service_->ShutDown();
+    // TODO(wuyi): reinit nccl communicators
+  }
+
+ protected:
+  mutable std::shared_ptr<detail::AsyncGRPCServer> rpc_service_;
+  mutable std::shared_ptr<std::thread> server_thread_;
+};
+
+class GenNCCLIdOpMaker : public framework::OpProtoAndCheckerMaker {
+ public:
+  GenNCCLIdOpMaker(OpProto* proto, OpAttrChecker* op_checker)
+      : GenNCCLIdOpMaker(proto, op_checker) {
+    AddOutput("NCCLID", "Raw variable contains a NCCL UniqueId instaces.");
+    AddComment(R"DOC(
+GenNCCLId operator
+
+For trainer 0: generate a new UniqueId and send it to all the other trainers.
+For trainer 1~n: start a gRPC server to get the UniqueId, once got, stop the server.
+)DOC");
+    AddAttr<std::string>("endpoint",
+                         "(string), e.g. 127.0.0.1:6175 "
+                         "current listen endpoint");
+    AddAttr<std::vector<std::string>>(
+        "endpoint_list",
+        "['trainer1_ip:port', 'trainer2_ip:port', ...] 
" + "list of trainer endpoints start from trainer 1") + .SetDefault({}); + AddAttr("trainer_id", + "(int default 0) " + "The index of the trainer in distributed training.") + .SetDefault(0); + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; + +REGISTER_OPERATOR(gen_nccl_id_op, ops::GenNCCLIdOp, ops::GenNCCLIdOpMaker); diff --git a/paddle/fluid/operators/lookup_sparse_table_op.cc b/paddle/fluid/operators/lookup_sparse_table_op.cc index f1839e456d..66b626ed79 100644 --- a/paddle/fluid/operators/lookup_sparse_table_op.cc +++ b/paddle/fluid/operators/lookup_sparse_table_op.cc @@ -62,7 +62,7 @@ class LookupSparseTableOp : public framework::OperatorBase { auto w_t = w_var->GetMutable(); std::vector keys; keys.resize(ids_t.numel()); - for (size_t i = 0; i < ids_t.numel(); ++i) { + for (int64_t i = 0; i < ids_t.numel(); ++i) { keys[i] = ids_t.data()[i]; } From 7237323c5dfb2bac5e55cdf183e27f631d4c1d3d Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Thu, 3 May 2018 12:22:21 +0800 Subject: [PATCH 02/16] fix compile --- paddle/fluid/operators/gen_nccl_id_op.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc index e75e045fcb..235995aeb4 100644 --- a/paddle/fluid/operators/gen_nccl_id_op.cc +++ b/paddle/fluid/operators/gen_nccl_id_op.cc @@ -92,7 +92,7 @@ class GenNCCLIdOp : public framework::OperatorBase { class GenNCCLIdOpMaker : public framework::OpProtoAndCheckerMaker { public: GenNCCLIdOpMaker(OpProto* proto, OpAttrChecker* op_checker) - : GenNCCLIdOpMaker(proto, op_checker) { + : OpProtoAndCheckerMaker(proto, op_checker) { AddOutput("NCCLID", "Raw variable contains a NCCL UniqueId instaces."); AddComment(R"DOC( GenNCCLId operator From d9320dcd944bba599135a711113d831551fa4814 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Fri, 4 May 2018 16:23:30 +0800 Subject: [PATCH 03/16] complete code --- paddle/fluid/framework/parallel_executor.cc | 10 ++++-- paddle/fluid/framework/parallel_executor.h | 3 +- paddle/fluid/operators/detail/send_recv.proto | 1 + .../operators/detail/sendrecvop_utils.cc | 21 +++++++++++- .../operators/detail/variable_response.cc | 11 ++++++- paddle/fluid/operators/gen_nccl_id_op.cc | 4 +-- paddle/fluid/platform/nccl_helper.h | 32 +++++++++++++++---- paddle/fluid/pybind/pybind.cc | 6 ++-- 8 files changed, 73 insertions(+), 15 deletions(-) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 4712efeff6..bd2a2cfba5 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -58,7 +58,7 @@ ParallelExecutor::ParallelExecutor( const std::unordered_set &bcast_vars, const ProgramDesc &main_program, const std::string &loss_var_name, Scope *scope, const std::vector &local_scopes, bool allow_op_delay, - bool use_default_grad_scale) + bool use_default_grad_scale, size_t num_trainers, size_t trainer_id) : member_(new ParallelExecutorPrivate(places)) { member_->global_scope_ = scope; @@ -80,7 +80,13 @@ ParallelExecutor::ParallelExecutor( // Bcast Parameters to all GPUs #ifdef PADDLE_WITH_CUDA - member_->nccl_ctxs_.reset(new platform::NCCLContextMap(member_->places_)); + auto *nccl_id_var = scope->FindVar("NCCLID"); + ncclUniqueId *nccl_id = nullptr; + if (nccl_id_var != nullptr) { + nccl_id = nccl_id_var->GetMutable(); + } + member_->nccl_ctxs_.reset(new platform::NCCLContextMap( + member_->places_, nccl_id, num_trainers, trainer_id)); #endif 
if (platform::is_gpu_place(places[0]) && member_->local_scopes_.size() != 1 && local_scopes.empty()) { // Is CUDA diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index ecd107d81f..306d2bdfaf 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -40,7 +40,8 @@ class ParallelExecutor { const ProgramDesc& main_program, const std::string& loss_var_name, Scope* scope, const std::vector& local_scopes, - bool allow_op_delay, bool use_default_grad_scale); + bool allow_op_delay, bool use_default_grad_scale, + size_t num_trainers = 0, size_t trainer_id = 0); ~ParallelExecutor(); diff --git a/paddle/fluid/operators/detail/send_recv.proto b/paddle/fluid/operators/detail/send_recv.proto index 02bb2b9ceb..3b343a7e56 100644 --- a/paddle/fluid/operators/detail/send_recv.proto +++ b/paddle/fluid/operators/detail/send_recv.proto @@ -32,6 +32,7 @@ service SendRecvService { enum VarType { LOD_TENSOR = 0; SELECTED_ROWS = 1; + NCCL_ID = 2; } // NOTICE(gongwb):don't modify this proto if you are not diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.cc b/paddle/fluid/operators/detail/sendrecvop_utils.cc index 766bcf1ac5..800d6d8bd2 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.cc +++ b/paddle/fluid/operators/detail/sendrecvop_utils.cc @@ -43,13 +43,16 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, void* buf = buffer.get(); void* payload = nullptr; - size_t payload_size; + size_t payload_size = 0; ProtoEncodeHelper e(static_cast(buf), 1024); e.WriteString(VarMsg::kVarnameFieldNumber, name); if (var->IsType()) { e.WriteUint64(VarMsg::kTypeFieldNumber, 0); } else if (var->IsType()) { e.WriteUint64(VarMsg::kTypeFieldNumber, 1); + } else if (var->IsType()) { + // NOTE: sendrecv only support RAW type for NCCL_ID + e.WriteUint64(VarMsg::kTypeFieldNumber, 2); } if (!out_name.empty()) { @@ -139,11 +142,27 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, payload_size = tensor->numel() * framework::SizeOfType(tensor->type()); e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size); } break; + case framework::proto::VarType_Type_RAW: { + e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, + NCCL_UNIQUE_ID_BYTES); + ncclUniqueId* uid = var->GetMutable(); + e.WriteRawBytes(std::string(uid->internal, NCCL_UNIQUE_ID_BYTES)); + } break; default: PADDLE_THROW("Serialize does not support type: %s", typeid(var->Type()).name()); break; } + + if (framework::ToVarType(var->Type()) == framework::proto::VarType_Type_RAW) { + // for serialize NCCL_ID + ::grpc::Slice slices(e.size()); + memcpy(const_cast(slices.begin()), e.data(), e.size()); + ::grpc::ByteBuffer tmp(&slices, 1); + msg->Swap(&tmp); + return; + } + // steal reference of tensor data ::grpc::Slice slices[4]; // metadata, tensor, rows meta, rows int num_slices = 2; // only SelectedRows have rows buffer diff --git a/paddle/fluid/operators/detail/variable_response.cc b/paddle/fluid/operators/detail/variable_response.cc index fbef8d02a4..81d755f5fc 100644 --- a/paddle/fluid/operators/detail/variable_response.cc +++ b/paddle/fluid/operators/detail/variable_response.cc @@ -367,9 +367,18 @@ int VariableResponse::Parse(Source* source) { } case sendrecv::VariableMessage::kSerializedFieldNumber: { PADDLE_ENFORCE((meta_.type() == sendrecv::SELECTED_ROWS || - meta_.type() == sendrecv::LOD_TENSOR) && + meta_.type() == sendrecv::LOD_TENSOR || + meta_.type() == 
sendrecv::NCCL_ID) && meta_.varname() != "", "meta info should be got first!"); + if (meta_.type() == sendrecv::NCCL_ID) { + auto* var = scope_->FindVar(meta_.varname()); + if (var != nullptr) { + ncclUniqueId* id = var->GetMutable(); + memcpy(id->internal, meta_.serialized().c_str(), + meta_.serialized().size()); + } + } int length = 0; if (wt != WIRETYPE_LENGTH_DELIMITED || diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc index 235995aeb4..afb228fa6f 100644 --- a/paddle/fluid/operators/gen_nccl_id_op.cc +++ b/paddle/fluid/operators/gen_nccl_id_op.cc @@ -54,7 +54,7 @@ class GenNCCLIdOp : public framework::OperatorBase { auto var = scope->FindVar("NCCLID"); PADDLE_ENFORCE_NOT_NULL(var); auto id = var->GetMutable(); - ncclGetUniqueId(id); + platform::dynload::ncclGetUniqueId(id); std::vector endpoint_list = Attr>("endpoint_list"); @@ -120,4 +120,4 @@ For trainer 1~n: start a gRPC server to get the UniqueId, once got, stop the ser namespace ops = paddle::operators; -REGISTER_OPERATOR(gen_nccl_id_op, ops::GenNCCLIdOp, ops::GenNCCLIdOpMaker); +REGISTER_OPERATOR(gen_nccl_id, ops::GenNCCLIdOp, ops::GenNCCLIdOpMaker); diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index 0013597fd5..3b52587a28 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -73,7 +73,9 @@ struct NCCLContextMap { std::unordered_map contexts_; std::vector order_; - explicit NCCLContextMap(const std::vector &places) { + explicit NCCLContextMap(const std::vector &places, + ncclUniqueId *nccl_id = nullptr, + size_t node_count = 0, size_t trainer_id = 0) { PADDLE_ENFORCE(!places.empty()); order_.reserve(places.size()); for (auto &p : places) { @@ -85,18 +87,36 @@ struct NCCLContextMap { order_.size(), contexts_.size(), "NCCL Context Map does not support contain two or more same device"); - if (places.size() > 1) { - std::unique_ptr comms(new ncclComm_t[order_.size()]); + if (places.size() <= 1) { + return; + } + std::unique_ptr comms(new ncclComm_t[order_.size()]); + // if pass nccl_id here, can assume we are doing multi node training + if (nccl_id == nullptr) { { std::lock_guard guard(NCCLGroupGuard::NCCLMutex()); PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( comms.get(), static_cast(order_.size()), order_.data())); } - int i = 0; - for (auto &dev_id : order_) { - contexts_.at(dev_id).comm_ = comms[i++]; + } else { + PADDLE_ENFORCE_GT(node_count, 0); + PADDLE_ENFORCE_EQ(node_count % places.size(), 0, + "must have same number of GPUs on each node"); + { + std::lock_guard guard(NCCLGroupGuard::NCCLMutex()); + int nranks = node_count * order_.size(); + for (auto &gpu_id : order_) { + int rank = trainer_id * order_.size() + gpu_id; + PADDLE_ENFORCE(cudaSetDevice(gpu_id)); + PADDLE_ENFORCE( + ncclCommInitRank(comms.get() + gpu_id, nranks, *nccl_id, rank)); + } } } + int i = 0; + for (auto &dev_id : order_) { + contexts_.at(dev_id).comm_ = comms[i++]; + } } NCCLContextMap(const NCCLContextMap &other) = delete; diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index c925686f83..827c2701ba 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -502,11 +502,13 @@ All parameter, weight, gradient are variables in Paddle. 
const std::unordered_set &bcast_vars, const ProgramDesc &main_program, const std::string &loss_var_name, Scope *scope, std::vector &local_scopes, - bool allow_op_delay, bool use_default_grad_scale) { + bool allow_op_delay, bool use_default_grad_scale, + size_t num_trainers, size_t trainer_id) { new (&self) ParallelExecutor( num_threads, use_event, places, params, bcast_vars, main_program, loss_var_name, scope, local_scopes, - allow_op_delay, use_default_grad_scale); + allow_op_delay, use_default_grad_scale, num_trainers, + trainer_id); }) .def("bcast_params", &ParallelExecutor::BCastParamsToGPUs) // NOTE: even we return a vec* to Python use reference policy. From 3667578ec2f820dddf5067bab5e918313d8bf383 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Sat, 5 May 2018 17:34:56 +0800 Subject: [PATCH 04/16] testing --- paddle/fluid/platform/nccl_helper.h | 4 ++-- python/paddle/fluid/parallel_executor.py | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index 3b52587a28..f3c4c92afa 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -108,8 +108,8 @@ struct NCCLContextMap { for (auto &gpu_id : order_) { int rank = trainer_id * order_.size() + gpu_id; PADDLE_ENFORCE(cudaSetDevice(gpu_id)); - PADDLE_ENFORCE( - ncclCommInitRank(comms.get() + gpu_id, nranks, *nccl_id, rank)); + PADDLE_ENFORCE(platform::dynload::ncclCommInitRank( + comms.get() + gpu_id, nranks, *nccl_id, rank)); } } } diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index f4128dcbe9..34899a54b6 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -30,7 +30,9 @@ class ParallelExecutor(object): num_threads=None, allow_op_delay=False, share_vars_from=None, - use_default_grad_scale=True): + use_default_grad_scale=True, + num_nodes=0, + trainer_id=0): """ ParallelExecutor can run program in parallel. @@ -129,7 +131,9 @@ class ParallelExecutor(object): scope, local_scopes, allow_op_delay, - use_default_grad_scale) + use_default_grad_scale, + num_nodes, + trainer_id) self.scope = scope def run(self, fetch_list, feed=None, feed_dict=None): From 0598a4b36627b219c6a80c721e133e269c3b8bc5 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Sat, 5 May 2018 18:27:07 +0800 Subject: [PATCH 05/16] fix ci --- paddle/fluid/operators/detail/sendrecvop_utils.cc | 1 + python/paddle/fluid/framework.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.cc b/paddle/fluid/operators/detail/sendrecvop_utils.cc index 800d6d8bd2..d754630fd7 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.cc +++ b/paddle/fluid/operators/detail/sendrecvop_utils.cc @@ -14,6 +14,7 @@ limitations under the License. 
*/ #include "paddle/fluid/operators/detail/sendrecvop_utils.h" +#include #include #include // NOLINT diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 2cdf010926..c6262d54ac 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -487,7 +487,7 @@ class Operator(object): 'rnn_memory_helper_grad', 'conditional_block', 'while', 'send', 'recv', 'listen_and_serv', 'parallel_do', 'save_combine', 'load_combine', 'ncclInit', 'channel_create', 'channel_close', - 'channel_send', 'channel_recv', 'select' + 'channel_send', 'channel_recv', 'select', 'gen_nccl_id' } if type not in no_kernel_op_set: self.desc.infer_var_type(self.block.desc) From 82c61dbde3cd496c5aeddb1d94eb0dd82d039268 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 7 May 2018 13:51:03 +0800 Subject: [PATCH 06/16] fix testing --- paddle/fluid/operators/detail/grpc_client.cc | 2 +- paddle/fluid/operators/detail/grpc_client.h | 6 +- paddle/fluid/operators/detail/grpc_server.cc | 2 +- paddle/fluid/operators/detail/grpc_server.h | 2 + .../operators/detail/sendrecvop_utils.cc | 169 +++++++++--------- .../operators/detail/variable_response.cc | 23 ++- paddle/fluid/operators/gen_nccl_id_op.cc | 14 +- 7 files changed, 117 insertions(+), 101 deletions(-) diff --git a/paddle/fluid/operators/detail/grpc_client.cc b/paddle/fluid/operators/detail/grpc_client.cc index 661dfa69fe..ae60ab1532 100644 --- a/paddle/fluid/operators/detail/grpc_client.cc +++ b/paddle/fluid/operators/detail/grpc_client.cc @@ -52,7 +52,7 @@ bool RPCClient::AsyncSendVariable(const std::string& ep, // stub context SendProcessor* s = new SendProcessor(ch); s->Prepare(var_h, time_out); - s->response_call_back_ = NULL; + s->response_call_back_ = nullptr; auto call = s->stub_g_.PrepareUnaryCall( s->context_.get(), "/sendrecv.SendRecvService/SendVariable", req, &cq_); diff --git a/paddle/fluid/operators/detail/grpc_client.h b/paddle/fluid/operators/detail/grpc_client.h index f6229b71bc..dabce7414d 100644 --- a/paddle/fluid/operators/detail/grpc_client.h +++ b/paddle/fluid/operators/detail/grpc_client.h @@ -57,7 +57,9 @@ void ProcGetResponse(const VarHandle& var_h, const grpc::ByteBuffer& msg); class BaseProcessor { public: - explicit BaseProcessor(std::shared_ptr ch) { context_ = NULL; } + explicit BaseProcessor(std::shared_ptr ch) { + context_ = nullptr; + } virtual ~BaseProcessor() {} @@ -105,7 +107,7 @@ class SendProcessor : public BaseProcessor { ::grpc::GenericStub stub_g_; ::grpc::ByteBuffer reply_; - RequestSendCallBack response_call_back_ = NULL; + RequestSendCallBack response_call_back_ = nullptr; }; typedef std::function diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index 7ca694886e..1cdfe01170 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -261,8 +261,8 @@ void AsyncGRPCServer::ShutdownQueue() { // This URL explains why shutdown is complicate: void AsyncGRPCServer::ShutDown() { is_shut_down_ = true; - ShutdownQueue(); server_->Shutdown(); + ShutdownQueue(); } void AsyncGRPCServer::TryToRegisterNewSendOne() { diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h index 99b87b8c6c..0e1592eed4 100644 --- a/paddle/fluid/operators/detail/grpc_server.h +++ b/paddle/fluid/operators/detail/grpc_server.h @@ -47,6 +47,8 @@ class AsyncGRPCServer final { explicit AsyncGRPCServer(const std::string &address, bool sync_mode) : address_(address), 
sync_mode_(sync_mode) {} + ~AsyncGRPCServer() {} + void RunSyncUpdate(); // functions to sync server barrier status. diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.cc b/paddle/fluid/operators/detail/sendrecvop_utils.cc index d754630fd7..207ea3cb8b 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.cc +++ b/paddle/fluid/operators/detail/sendrecvop_utils.cc @@ -53,109 +53,106 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, e.WriteUint64(VarMsg::kTypeFieldNumber, 1); } else if (var->IsType()) { // NOTE: sendrecv only support RAW type for NCCL_ID + VLOG(3) << "serilizing: setting var type nccl id"; e.WriteUint64(VarMsg::kTypeFieldNumber, 2); } if (!out_name.empty()) { e.WriteString(VarMsg::kOutVarnameFieldNumber, out_name); } - switch (framework::ToVarType(var->Type())) { - case framework::proto::VarType_Type_LOD_TENSOR: { - auto tensor = var->Get(); - e.WriteUint64(VarMsg::kDataTypeFieldNumber, - framework::ToDataType(tensor.type())); - for (auto& dim : framework::vectorize(tensor.dims())) { - e.WriteUint64(VarMsg::kDimsFieldNumber, dim); - } - auto lod = tensor.lod(); // std::vector> - if (lod.size() > 0) { - e.WriteUint64(VarMsg::kLodLevelFieldNumber, lod.size()); - - for (auto& each : lod) { - e.WriteVarlengthBeginning(VarMsg::kLodFieldNumber, - 2 + // tag + varintlength of submessage - 1 + // kLodDataFieldNumber - each.size()); - // auto copied from GPU - for (auto& d : each) { - e.WriteUint64(VarMsg::LodData::kLodDataFieldNumber, d); - } + if (var->IsType()) { + // ===========================Tensor================================== + auto tensor = var->Get(); + e.WriteUint64(VarMsg::kDataTypeFieldNumber, + framework::ToDataType(tensor.type())); + for (auto& dim : framework::vectorize(tensor.dims())) { + e.WriteUint64(VarMsg::kDimsFieldNumber, dim); + } + auto lod = tensor.lod(); // std::vector> + if (lod.size() > 0) { + e.WriteUint64(VarMsg::kLodLevelFieldNumber, lod.size()); + + for (auto& each : lod) { + e.WriteVarlengthBeginning(VarMsg::kLodFieldNumber, + 2 + // tag + varintlength of submessage + 1 + // kLodDataFieldNumber + each.size()); + // auto copied from GPU + for (auto& d : each) { + e.WriteUint64(VarMsg::LodData::kLodDataFieldNumber, d); } } - if (platform::is_gpu_place(ctx.GetPlace())) { + } + if (platform::is_gpu_place(ctx.GetPlace())) { #ifdef PADDLE_WITH_CUDA - PADDLE_ENFORCE(platform::is_gpu_place(tensor.place())); + PADDLE_ENFORCE(platform::is_gpu_place(tensor.place())); + platform::CPUPlace cpu; + auto& gpu_dev_ctx = static_cast(ctx); + auto copy_size = tensor.numel() * framework::SizeOfType(tensor.type()); + payload = memory::Alloc(cpu, copy_size); + + memory::Copy(cpu, payload, + boost::get(tensor.place()), + reinterpret_cast(tensor.data()), + copy_size, gpu_dev_ctx.stream()); + ctx.Wait(); + destroy_callback = [](void* backing) { platform::CPUPlace cpu; - auto& gpu_dev_ctx = - static_cast(ctx); - auto copy_size = tensor.numel() * framework::SizeOfType(tensor.type()); - payload = memory::Alloc(cpu, copy_size); - - memory::Copy(cpu, payload, - boost::get(tensor.place()), - reinterpret_cast(tensor.data()), - copy_size, gpu_dev_ctx.stream()); - ctx.Wait(); - destroy_callback = [](void* backing) { - platform::CPUPlace cpu; - memory::Free(cpu, backing); - }; + memory::Free(cpu, backing); + }; #endif - } else { - payload = tensor.data(); - } - payload_size = tensor.numel() * framework::SizeOfType(tensor.type()); - e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size); - } break; - case 
framework::proto::VarType_Type_SELECTED_ROWS: { - // TODO(typhoonzero): selectedrows implement should not use unique_ptr - auto* slr = var->GetMutable(); - e.WriteUint64(VarMsg::kDataTypeFieldNumber, - framework::ToDataType(slr->value().type())); - for (auto& dim : framework::vectorize(slr->value().dims())) { - e.WriteUint64(VarMsg::kDimsFieldNumber, dim); - } - e.WriteUint64(VarMsg::kLodLevelFieldNumber, 0); - e.WriteUint64(VarMsg::kSlrHeightFieldNumber, slr->height()); - auto* tensor = slr->mutable_value(); - if (platform::is_gpu_place(ctx.GetPlace())) { + } else { + payload = tensor.data(); + } + payload_size = tensor.numel() * framework::SizeOfType(tensor.type()); + e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size); + } else if (var->IsType()) { + // ===========================SELECTED + // ROWS================================== + // TODO(typhoonzero): selectedrows implement should not use unique_ptr + auto* slr = var->GetMutable(); + e.WriteUint64(VarMsg::kDataTypeFieldNumber, + framework::ToDataType(slr->value().type())); + for (auto& dim : framework::vectorize(slr->value().dims())) { + e.WriteUint64(VarMsg::kDimsFieldNumber, dim); + } + e.WriteUint64(VarMsg::kLodLevelFieldNumber, 0); + e.WriteUint64(VarMsg::kSlrHeightFieldNumber, slr->height()); + auto* tensor = slr->mutable_value(); + if (platform::is_gpu_place(ctx.GetPlace())) { #ifdef PADDLE_WITH_CUDA + platform::CPUPlace cpu; + auto& gpu_dev_ctx = static_cast(ctx); + auto copy_size = tensor->numel() * framework::SizeOfType(tensor->type()); + payload = memory::Alloc(cpu, copy_size); + memory::Copy(cpu, payload, + boost::get(tensor->place()), + reinterpret_cast(tensor->data()), + copy_size, gpu_dev_ctx.stream()); + ctx.Wait(); + destroy_callback = [](void* backing) { platform::CPUPlace cpu; - auto& gpu_dev_ctx = - static_cast(ctx); - auto copy_size = - tensor->numel() * framework::SizeOfType(tensor->type()); - payload = memory::Alloc(cpu, copy_size); - memory::Copy(cpu, payload, - boost::get(tensor->place()), - reinterpret_cast(tensor->data()), - copy_size, gpu_dev_ctx.stream()); - ctx.Wait(); - destroy_callback = [](void* backing) { - platform::CPUPlace cpu; - memory::Free(cpu, backing); - }; + memory::Free(cpu, backing); + }; #endif - } else { - payload = slr->mutable_value()->data(); - } - payload_size = tensor->numel() * framework::SizeOfType(tensor->type()); - e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size); - } break; - case framework::proto::VarType_Type_RAW: { - e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, - NCCL_UNIQUE_ID_BYTES); - ncclUniqueId* uid = var->GetMutable(); - e.WriteRawBytes(std::string(uid->internal, NCCL_UNIQUE_ID_BYTES)); - } break; - default: - PADDLE_THROW("Serialize does not support type: %s", - typeid(var->Type()).name()); - break; + } else { + payload = slr->mutable_value()->data(); + } + payload_size = tensor->numel() * framework::SizeOfType(tensor->type()); + e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size); + } else if (var->IsType()) { + // ===========================NCCL ID================================== + e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, + NCCL_UNIQUE_ID_BYTES); + ncclUniqueId* uid = var->GetMutable(); + e.WriteRawBytes(std::string(uid->internal, NCCL_UNIQUE_ID_BYTES)); + } else { + PADDLE_THROW("Serialize does not support type: %s", + typeid(var->Type()).name()); } - if (framework::ToVarType(var->Type()) == framework::proto::VarType_Type_RAW) { + if (var->IsType()) { // for serialize 
NCCL_ID ::grpc::Slice slices(e.size()); memcpy(const_cast(slices.begin()), e.data(), e.size()); diff --git a/paddle/fluid/operators/detail/variable_response.cc b/paddle/fluid/operators/detail/variable_response.cc index 81d755f5fc..64fd84736d 100644 --- a/paddle/fluid/operators/detail/variable_response.cc +++ b/paddle/fluid/operators/detail/variable_response.cc @@ -371,19 +371,26 @@ int VariableResponse::Parse(Source* source) { meta_.type() == sendrecv::NCCL_ID) && meta_.varname() != "", "meta info should be got first!"); + int length = 0; + if (wt != WIRETYPE_LENGTH_DELIMITED || + !ReadVarintSizeAsInt(&input, &length)) { + return tag; + } + if (meta_.type() == sendrecv::NCCL_ID) { + VLOG(3) << "parse nccl id request"; auto* var = scope_->FindVar(meta_.varname()); if (var != nullptr) { + VLOG(3) << "parse nccl id: length " << length; ncclUniqueId* id = var->GetMutable(); - memcpy(id->internal, meta_.serialized().c_str(), - meta_.serialized().size()); + if (!ReadRaw(&input, *dev_ctx_, platform::CPUPlace(), id->internal, + length)) { + return tag; + } + // memcpy(id->internal, meta_.serialized().c_str(), + // meta_.serialized().size()); } - } - - int length = 0; - if (wt != WIRETYPE_LENGTH_DELIMITED || - !ReadVarintSizeAsInt(&input, &length)) { - return tag; + break; } framework::DDim dims = GetDims(meta_.dims()); diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc index afb228fa6f..8d28be35a8 100644 --- a/paddle/fluid/operators/gen_nccl_id_op.cc +++ b/paddle/fluid/operators/gen_nccl_id_op.cc @@ -37,7 +37,8 @@ class GenNCCLIdOp : public framework::OperatorBase { void RunImpl(const framework::Scope& scope, const platform::Place& dev_place) const override { platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); - auto& dev_ctx = *pool.Get(dev_place); + // put nccl id in CPUPlace + auto& dev_ctx = *pool.Get(platform::CPUPlace()); int trainer_id = Attr("trainer_id"); framework::Scope& local_scope = scope.NewScope(); @@ -60,9 +61,11 @@ class GenNCCLIdOp : public framework::OperatorBase { Attr>("endpoint_list"); detail::RPCClient client; for (auto& ep : endpoint_list) { + VLOG(3) << "sending nccl id to " << ep; client.AsyncSendVariable(ep, dev_ctx, *scope, "NCCLID"); } client.Wait(); + VLOG(3) << "sending completed..."; } void GetIdByServer(framework::Scope* scope, @@ -78,9 +81,14 @@ class GenNCCLIdOp : public framework::OperatorBase { server_thread_.reset(new std::thread(std::bind( &detail::AsyncGRPCServer::RunSyncUpdate, rpc_service_.get()))); - + rpc_service_->SetCond(0); + VLOG(3) << "start getting nccl id from trainer 0..."; auto recv = rpc_service_->Get(); - rpc_service_->ShutDown(); + VLOG(3) << "got nccl id and stop server..."; + // rpc_service_->SetCond(1); + // rpc_service_->ShutDown(); + rpc_service->Push(LISTEN_TERMINATE_MESSAGE); + VLOG(3) << "rpc server stopped"; // TODO(wuyi): reinit nccl communicators } From 17009d0627a97274299249e3680f58d6e2270ff7 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 7 May 2018 17:10:53 +0800 Subject: [PATCH 07/16] workable version --- paddle/fluid/operators/CMakeLists.txt | 1 + paddle/fluid/operators/test_send_nccl_id.cc | 88 +++++++++++++++++++++ paddle/fluid/platform/nccl_helper.h | 7 +- python/paddle/fluid/parallel_executor.py | 5 ++ 4 files changed, 98 insertions(+), 3 deletions(-) create mode 100644 paddle/fluid/operators/test_send_nccl_id.cc diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt index ad0732131c..2b8df6c35f 100644 --- 
a/paddle/fluid/operators/CMakeLists.txt +++ b/paddle/fluid/operators/CMakeLists.txt @@ -205,6 +205,7 @@ if(WITH_DISTRIBUTE) set_source_files_properties(send_barrier_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(send_recv_op_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS prefetch_op send_op listen_and_serv_op sum_op executor) + cc_test(test_send_nccl_id SRCS test_send_nccl_id.cc DEPS send_op listen_and_serv_op executor) else() set(DEPS_OPS ${DEPS_OPS} send_op prefetch_op recv_op listen_and_serv_op send_vars_op send_barrier_op gen_nccl_id_op) endif() diff --git a/paddle/fluid/operators/test_send_nccl_id.cc b/paddle/fluid/operators/test_send_nccl_id.cc new file mode 100644 index 0000000000..7a8b425665 --- /dev/null +++ b/paddle/fluid/operators/test_send_nccl_id.cc @@ -0,0 +1,88 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include +#include // NOLINT + +#include "gtest/gtest.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/framework/program_desc.h" +#include "paddle/fluid/operators/detail/grpc_client.h" +#include "paddle/fluid/operators/listen_and_serv_op.h" +#include "paddle/fluid/operators/math/math_function.h" +#include "paddle/fluid/operators/math/selected_rows_functor.h" +#include "paddle/fluid/string/printf.h" + +USE_NO_KERNEL_OP(listen_and_serv); + +namespace f = paddle::framework; +namespace p = paddle::platform; +namespace m = paddle::operators::math; +namespace detail = paddle::operators::detail; +namespace string = paddle::string; + +std::unique_ptr rpc_service; + +void StartServer() { + f::Scope scope; + p::CPUPlace place; + scope.Var("NCCLID"); + p::DeviceContextPool& pool = p::DeviceContextPool::Instance(); + auto& dev_ctx = *pool.Get(p::CPUPlace()); + + rpc_service.reset(new detail::AsyncGRPCServer("127.0.0.1:0", true)); + + f::ProgramDesc empty_program; + f::Executor executor(dev_ctx.GetPlace()); + rpc_service->SetScope(&scope); + rpc_service->SetDevCtx(&dev_ctx); + rpc_service->SetProgram(&empty_program); + rpc_service->SetExecutor(&executor); + + std::thread server_thread( + std::bind(&detail::AsyncGRPCServer::RunSyncUpdate, rpc_service.get())); + rpc_service->SetCond(0); + auto recv = rpc_service->Get(); + LOG(INFO) << "got nccl id and stop server..."; + rpc_service->ShutDown(); + server_thread.join(); +} + +TEST(SendNcclId, Normal) { + std::thread server_thread(StartServer); + // wait server to start + sleep(2); + + f::Scope scope; + p::CPUPlace place; + p::DeviceContextPool& pool = p::DeviceContextPool::Instance(); + auto& dev_ctx = *pool.Get(p::CPUPlace()); + + auto var = scope.Var("NCCLID"); + // var->SetType(f::proto::VarType_Type_RAW); + auto id = var->GetMutable(); + p::dynload::ncclGetUniqueId(id); + + int port = rpc_service->GetSelectedPort(); + std::string ep = string::Sprintf("127.0.0.1:%d", port); + detail::RPCClient 
client; + + client.AsyncSendVariable(ep, dev_ctx, scope, "NCCLID"); + client.Wait(); + server_thread.join(); + auto* ptr = rpc_service.release(); + delete ptr; +} diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index f3c4c92afa..094c47007e 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -14,6 +14,7 @@ #pragma once +#include #include // NOLINT #include #include @@ -100,13 +101,13 @@ struct NCCLContextMap { } } else { PADDLE_ENFORCE_GT(node_count, 0); - PADDLE_ENFORCE_EQ(node_count % places.size(), 0, - "must have same number of GPUs on each node"); + // TODO(wuyi): need to ensure each node have same number of GPUs { - std::lock_guard guard(NCCLGroupGuard::NCCLMutex()); int nranks = node_count * order_.size(); + NCCLGroupGuard gurad; for (auto &gpu_id : order_) { int rank = trainer_id * order_.size() + gpu_id; + VLOG(3) << "init nccl rank: " << rank << " nranks: " << nranks; PADDLE_ENFORCE(cudaSetDevice(gpu_id)); PADDLE_ENFORCE(platform::dynload::ncclCommInitRank( comms.get() + gpu_id, nranks, *nccl_id, rank)); diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 50ec438bee..bd92ac548d 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -53,6 +53,11 @@ class ParallelExecutor(object): gradients of each device and scaled gradients would be aggregated. Otherwise, a customized scale value should be fed to the network. + num_nodes(int, default 0): If greater than 0, NCCL will be + initialized with multpile rank of nodes, each node should have + same number of GPUs. Distributed training will be enabled then. + trainer_id(int, default 0): Must use together with num_nodes. + trainer_id is the "rank" of current node starts from 0. Returns: A ParallelExecutor object. From 0f86397d812e89fc96c31ef573d0f938fac24b09 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 7 May 2018 19:11:50 +0800 Subject: [PATCH 08/16] fix build --- paddle/fluid/operators/detail/sendrecvop_utils.cc | 10 ++++++++-- paddle/fluid/operators/detail/variable_response.cc | 11 +++++++---- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.cc b/paddle/fluid/operators/detail/sendrecvop_utils.cc index 207ea3cb8b..bde418a45a 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.cc +++ b/paddle/fluid/operators/detail/sendrecvop_utils.cc @@ -14,7 +14,9 @@ limitations under the License. 
*/ #include "paddle/fluid/operators/detail/sendrecvop_utils.h" +#ifdef PADDLE_WITH_CUDA #include +#endif #include #include // NOLINT @@ -51,10 +53,12 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, e.WriteUint64(VarMsg::kTypeFieldNumber, 0); } else if (var->IsType()) { e.WriteUint64(VarMsg::kTypeFieldNumber, 1); +#ifdef PADDLE_WITH_CUDA } else if (var->IsType()) { // NOTE: sendrecv only support RAW type for NCCL_ID VLOG(3) << "serilizing: setting var type nccl id"; e.WriteUint64(VarMsg::kTypeFieldNumber, 2); +#endif } if (!out_name.empty()) { @@ -141,17 +145,19 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, } payload_size = tensor->numel() * framework::SizeOfType(tensor->type()); e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size); +#ifdef PADDLE_WITH_CUDA } else if (var->IsType()) { // ===========================NCCL ID================================== e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, NCCL_UNIQUE_ID_BYTES); ncclUniqueId* uid = var->GetMutable(); e.WriteRawBytes(std::string(uid->internal, NCCL_UNIQUE_ID_BYTES)); +#endif } else { PADDLE_THROW("Serialize does not support type: %s", typeid(var->Type()).name()); } - +#ifdef PADDLE_WITH_CUDA if (var->IsType()) { // for serialize NCCL_ID ::grpc::Slice slices(e.size()); @@ -160,7 +166,7 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, msg->Swap(&tmp); return; } - +#endif // steal reference of tensor data ::grpc::Slice slices[4]; // metadata, tensor, rows meta, rows int num_slices = 2; // only SelectedRows have rows buffer diff --git a/paddle/fluid/operators/detail/variable_response.cc b/paddle/fluid/operators/detail/variable_response.cc index 64fd84736d..9cf6dd90fc 100644 --- a/paddle/fluid/operators/detail/variable_response.cc +++ b/paddle/fluid/operators/detail/variable_response.cc @@ -17,6 +17,9 @@ #include #include #include +#ifdef PADDLE_WITH_CUDA +#include +#endif #include "paddle/fluid/operators/detail/send_recv.pb.h" #include "paddle/fluid/operators/detail/sendrecvop_utils.h" @@ -378,19 +381,19 @@ int VariableResponse::Parse(Source* source) { } if (meta_.type() == sendrecv::NCCL_ID) { - VLOG(3) << "parse nccl id request"; +#ifdef PADDLE_WITH_CUDA auto* var = scope_->FindVar(meta_.varname()); if (var != nullptr) { - VLOG(3) << "parse nccl id: length " << length; ncclUniqueId* id = var->GetMutable(); if (!ReadRaw(&input, *dev_ctx_, platform::CPUPlace(), id->internal, length)) { return tag; } - // memcpy(id->internal, meta_.serialized().c_str(), - // meta_.serialized().size()); } break; +#else + PADDLE_THROW("Not compiled with CUDA!"); +#endif } framework::DDim dims = GetDims(meta_.dims()); From f5840d89258b4d2dd8e3da77db8645d58bbf0d4d Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Fri, 11 May 2018 19:54:35 +0800 Subject: [PATCH 09/16] follow comments --- paddle/fluid/framework/parallel_executor.cc | 2 +- paddle/fluid/operators/CMakeLists.txt | 2 +- .../fluid/operators/detail/sendrecvop_utils.cc | 4 ++-- paddle/fluid/operators/gen_nccl_id_op.cc | 9 ++++++--- paddle/fluid/operators/test_send_nccl_id.cc | 6 +++--- paddle/fluid/platform/nccl_helper.h | 16 ++++++++-------- python/paddle/fluid/parallel_executor.py | 8 ++++---- 7 files changed, 25 insertions(+), 22 deletions(-) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index b080e516de..4d62edfef4 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ 
b/paddle/fluid/framework/parallel_executor.cc @@ -80,7 +80,7 @@ ParallelExecutor::ParallelExecutor( // Bcast Parameters to all GPUs #ifdef PADDLE_WITH_CUDA - auto *nccl_id_var = scope->FindVar("NCCLID"); + auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME); ncclUniqueId *nccl_id = nullptr; if (nccl_id_var != nullptr) { nccl_id = nccl_id_var->GetMutable(); diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt index 2b8df6c35f..48d0af1a5b 100644 --- a/paddle/fluid/operators/CMakeLists.txt +++ b/paddle/fluid/operators/CMakeLists.txt @@ -187,7 +187,7 @@ if(WITH_DISTRIBUTE) if(WITH_GPU) op_library(gen_nccl_id_op DEPS nccl_common) else() - set(DEPS_OPS ${DEPS_OPS} gen_nccl_id_op) + set(DEPS_OPS ${DEPS_OPS} gen_nccl_id_op) endif() set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf) set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.cc b/paddle/fluid/operators/detail/sendrecvop_utils.cc index 247130f571..f3ac149947 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.cc +++ b/paddle/fluid/operators/detail/sendrecvop_utils.cc @@ -162,8 +162,8 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, if (var->IsType()) { e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, NCCL_UNIQUE_ID_BYTES); - ncclUniqueId* uid = var->GetMutable(); - e.WriteRawBytes(std::string(uid->internal, NCCL_UNIQUE_ID_BYTES)); + ncclUniqueId& uid = var->Get(); + e.WriteRawBytes(std::string(uid.internal, NCCL_UNIQUE_ID_BYTES)); // for serialize NCCL_ID ::grpc::Slice slices(e.size()); diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc index cfdeaee00b..1cddc998e4 100644 --- a/paddle/fluid/operators/gen_nccl_id_op.cc +++ b/paddle/fluid/operators/gen_nccl_id_op.cc @@ -52,17 +52,17 @@ class GenNCCLIdOp : public framework::OperatorBase { private: void GenerateAndSend(framework::Scope* scope, const platform::DeviceContext& dev_ctx) const { - auto var = scope->FindVar("NCCLID"); + auto var = scope->FindVar(NCCL_ID_VARNAME); PADDLE_ENFORCE_NOT_NULL(var); auto id = var->GetMutable(); - platform::dynload::ncclGetUniqueId(id); + PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(id)); std::vector endpoint_list = Attr>("endpoint_list"); detail::RPCClient client; for (auto& ep : endpoint_list) { VLOG(3) << "sending nccl id to " << ep; - client.AsyncSendVariable(ep, dev_ctx, *scope, "NCCLID"); + client.AsyncSendVariable(ep, dev_ctx, *scope, NCCL_ID_VARNAME); } client.Wait(); VLOG(3) << "sending completed..."; @@ -71,6 +71,9 @@ class GenNCCLIdOp : public framework::OperatorBase { void GetIdByServer(framework::Scope* scope, const platform::DeviceContext& dev_ctx) const { std::string endpoint = Attr("endpoint"); + // NOTE: Can not use unique_ptr here because the default + // deleter will call GRPC Server's base class's dtor and + // that will cause a wired crash. 
rpc_service_ = new detail::AsyncGRPCServer(endpoint, true); framework::ProgramDesc empty_program; framework::Executor executor(dev_ctx.GetPlace()); diff --git a/paddle/fluid/operators/test_send_nccl_id.cc b/paddle/fluid/operators/test_send_nccl_id.cc index 6781f85c4a..2c3c5ea0a0 100644 --- a/paddle/fluid/operators/test_send_nccl_id.cc +++ b/paddle/fluid/operators/test_send_nccl_id.cc @@ -39,7 +39,7 @@ std::unique_ptr rpc_service; void StartServer() { f::Scope scope; p::CPUPlace place; - scope.Var("NCCLID"); + scope.Var(NCCL_ID_VARNAME); p::DeviceContextPool& pool = p::DeviceContextPool::Instance(); auto& dev_ctx = *pool.Get(p::CPUPlace()); @@ -71,7 +71,7 @@ TEST(SendNcclId, Normal) { p::DeviceContextPool& pool = p::DeviceContextPool::Instance(); auto& dev_ctx = *pool.Get(p::CPUPlace()); - auto var = scope.Var("NCCLID"); + auto var = scope.Var(NCCL_ID_VARNAME); // var->SetType(f::proto::VarType_Type_RAW); auto id = var->GetMutable(); p::dynload::ncclGetUniqueId(id); @@ -80,7 +80,7 @@ TEST(SendNcclId, Normal) { std::string ep = string::Sprintf("127.0.0.1:%d", port); detail::RPCClient client; - client.AsyncSendVariable(ep, dev_ctx, scope, "NCCLID"); + client.AsyncSendVariable(ep, dev_ctx, scope, NCCL_ID_VARNAME); client.Wait(); server_thread.join(); auto* ptr = rpc_service.release(); diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index 094c47007e..408721be8b 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -21,6 +21,8 @@ #include "paddle/fluid/platform/dynload/nccl.h" #include "paddle/fluid/platform/enforce.h" +#define NCCL_ID_VARNAME "NCCLID" + namespace paddle { namespace platform { @@ -76,7 +78,7 @@ struct NCCLContextMap { explicit NCCLContextMap(const std::vector &places, ncclUniqueId *nccl_id = nullptr, - size_t node_count = 0, size_t trainer_id = 0) { + size_t num_trainers = 0, size_t trainer_id = 0) { PADDLE_ENFORCE(!places.empty()); order_.reserve(places.size()); for (auto &p : places) { @@ -94,16 +96,14 @@ struct NCCLContextMap { std::unique_ptr comms(new ncclComm_t[order_.size()]); // if pass nccl_id here, can assume we are doing multi node training if (nccl_id == nullptr) { - { - std::lock_guard guard(NCCLGroupGuard::NCCLMutex()); - PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( - comms.get(), static_cast(order_.size()), order_.data())); - } + std::lock_guard guard(NCCLGroupGuard::NCCLMutex()); + PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( + comms.get(), static_cast(order_.size()), order_.data())); } else { - PADDLE_ENFORCE_GT(node_count, 0); + PADDLE_ENFORCE_GT(num_trainers, 0); // TODO(wuyi): need to ensure each node have same number of GPUs { - int nranks = node_count * order_.size(); + int nranks = num_trainers * order_.size(); NCCLGroupGuard gurad; for (auto &gpu_id : order_) { int rank = trainer_id * order_.size() + gpu_id; diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index bd92ac548d..4f6db7c2be 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -31,7 +31,7 @@ class ParallelExecutor(object): allow_op_delay=False, share_vars_from=None, use_default_grad_scale=True, - num_nodes=0, + num_trainers=0, trainer_id=0): """ ParallelExecutor can run program in parallel. @@ -53,10 +53,10 @@ class ParallelExecutor(object): gradients of each device and scaled gradients would be aggregated. Otherwise, a customized scale value should be fed to the network. 
- num_nodes(int, default 0): If greater than 0, NCCL will be + num_trainers(int, default 0): If greater than 0, NCCL will be initialized with multpile rank of nodes, each node should have same number of GPUs. Distributed training will be enabled then. - trainer_id(int, default 0): Must use together with num_nodes. + trainer_id(int, default 0): Must use together with num_trainers. trainer_id is the "rank" of current node starts from 0. Returns: @@ -137,7 +137,7 @@ class ParallelExecutor(object): local_scopes, allow_op_delay, use_default_grad_scale, - num_nodes, + num_trainers, trainer_id) self.scope = scope From 7a7d27b33e26e5008b905cc105ff9667f6b0ec2a Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Fri, 11 May 2018 19:56:21 +0800 Subject: [PATCH 10/16] update op --- paddle/fluid/operators/gen_nccl_id_op.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc index 1cddc998e4..77cd4a460a 100644 --- a/paddle/fluid/operators/gen_nccl_id_op.cc +++ b/paddle/fluid/operators/gen_nccl_id_op.cc @@ -102,8 +102,7 @@ class GenNCCLIdOp : public framework::OperatorBase { class GenNCCLIdOpMaker : public framework::OpProtoAndCheckerMaker { public: - GenNCCLIdOpMaker(OpProto* proto, OpAttrChecker* op_checker) - : OpProtoAndCheckerMaker(proto, op_checker) { + void Make() override { AddOutput("NCCLID", "Raw variable contains a NCCL UniqueId instaces."); AddComment(R"DOC( GenNCCLId operator From 6ef60de6f1cb4552b2ecb0b02e6282072b062b3a Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Sat, 12 May 2018 00:05:43 +0800 Subject: [PATCH 11/16] update --- .clang_format.hook | 2 +- paddle/fluid/operators/detail/sendrecvop_utils.cc | 2 +- paddle/fluid/operators/gen_nccl_id_op.cc | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.clang_format.hook b/.clang_format.hook index 1d92821686..edec286b77 100755 --- a/.clang_format.hook +++ b/.clang_format.hook @@ -1,7 +1,7 @@ #!/bin/bash set -e -readonly VERSION="3.8" +readonly VERSION="7.0" version=$(clang-format -version) diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.cc b/paddle/fluid/operators/detail/sendrecvop_utils.cc index f3ac149947..07c43554bc 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.cc +++ b/paddle/fluid/operators/detail/sendrecvop_utils.cc @@ -162,7 +162,7 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, if (var->IsType()) { e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, NCCL_UNIQUE_ID_BYTES); - ncclUniqueId& uid = var->Get(); + const ncclUniqueId& uid = var->Get(); e.WriteRawBytes(std::string(uid.internal, NCCL_UNIQUE_ID_BYTES)); // for serialize NCCL_ID diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc index 77cd4a460a..6d50bd13e8 100644 --- a/paddle/fluid/operators/gen_nccl_id_op.cc +++ b/paddle/fluid/operators/gen_nccl_id_op.cc @@ -23,6 +23,7 @@ limitations under the License. 
*/ #include "paddle/fluid/framework/threadpool.h" #include "paddle/fluid/operators/detail/grpc_client.h" #include "paddle/fluid/operators/detail/grpc_server.h" +#include "paddle/fluid/platform/nccl_helper.h" namespace paddle { namespace operators { From 6387a15be237031ff11847f7ee88f113bbd65b50 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Sat, 12 May 2018 00:19:01 +0800 Subject: [PATCH 12/16] update --- .clang_format.hook | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.clang_format.hook b/.clang_format.hook index edec286b77..1d92821686 100755 --- a/.clang_format.hook +++ b/.clang_format.hook @@ -1,7 +1,7 @@ #!/bin/bash set -e -readonly VERSION="7.0" +readonly VERSION="3.8" version=$(clang-format -version) From 5ae0c664b01d36a0fbdaf78886b08bc9e5d3b883 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Sat, 12 May 2018 09:02:39 +0800 Subject: [PATCH 13/16] fix build and merge develop --- paddle/fluid/operators/test_send_nccl_id.cc | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/paddle/fluid/operators/test_send_nccl_id.cc b/paddle/fluid/operators/test_send_nccl_id.cc index 2c3c5ea0a0..bbae1d54aa 100644 --- a/paddle/fluid/operators/test_send_nccl_id.cc +++ b/paddle/fluid/operators/test_send_nccl_id.cc @@ -24,6 +24,7 @@ limitations under the License. */ #include "paddle/fluid/operators/listen_and_serv_op.h" #include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/operators/math/selected_rows_functor.h" +#include "paddle/fluid/platform/nccl_helper.h" #include "paddle/fluid/string/printf.h" USE_NO_KERNEL_OP(listen_and_serv); @@ -36,7 +37,7 @@ namespace string = paddle::string; std::unique_ptr rpc_service; -void StartServer() { +void StartServer(std::atomic* initialized) { f::Scope scope; p::CPUPlace place; scope.Var(NCCL_ID_VARNAME); @@ -54,6 +55,7 @@ void StartServer() { std::thread server_thread( std::bind(&detail::AsyncGRPCServer::RunSyncUpdate, rpc_service.get())); + *initialized = true; rpc_service->SetCond(0); auto recv = rpc_service->Get(); LOG(INFO) << "got nccl id and stop server..."; @@ -62,9 +64,13 @@ void StartServer() { } TEST(SendNcclId, Normal) { - std::thread server_thread(StartServer); + std::atomic initialized{false}; + std::thread server_thread(StartServer, &initialized); + while (!initialized) { + } // wait server to start - rpc_service.WaitServerReady(); + // sleep(2); + rpc_service->WaitServerReady(); f::Scope scope; p::CPUPlace place; From 7b0c0273f4a5d0dc7a65248a1c2cb7a8ebf49909 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 14 May 2018 14:05:30 +0800 Subject: [PATCH 14/16] update by comments --- paddle/fluid/framework/parallel_executor.h | 2 +- paddle/fluid/operators/gen_nccl_id_op.cc | 24 +++++++++++----------- paddle/fluid/platform/nccl_helper.h | 4 ++-- python/paddle/fluid/parallel_executor.py | 4 ++-- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 7ab17de389..9e279876cf 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -42,7 +42,7 @@ class ParallelExecutor { const std::vector& local_scopes, bool allow_op_delay, bool use_default_grad_scale, bool balance_parameter_opt_between_cards, - size_t num_trainers = 0, size_t trainer_id = 0); + size_t num_trainers = 1, size_t trainer_id = 0); ~ParallelExecutor(); diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc index b4ff6b7598..36fc862213 100644 
--- a/paddle/fluid/operators/gen_nccl_id_op.cc +++ b/paddle/fluid/operators/gen_nccl_id_op.cc @@ -75,29 +75,29 @@ class GenNCCLIdOp : public framework::OperatorBase { // NOTE: Can not use unique_ptr here because the default // deleter will call GRPC Server's base class's dtor and // that will cause a wired crash. - rpc_service_ = new detail::AsyncGRPCServer(endpoint, true); + + detail::AsyncGRPCServer rpc_service(endpoint, true); framework::ProgramDesc empty_program; framework::Executor executor(dev_ctx.GetPlace()); - rpc_service_->SetScope(scope); - rpc_service_->SetDevCtx(&dev_ctx); - rpc_service_->SetProgram(&empty_program); - rpc_service_->SetExecutor(&executor); + rpc_service.SetScope(scope); + rpc_service.SetDevCtx(&dev_ctx); + rpc_service.SetProgram(&empty_program); + rpc_service.SetExecutor(&executor); std::thread server_thread( - std::bind(&detail::AsyncGRPCServer::RunSyncUpdate, rpc_service_)); - rpc_service_->SetCond(0); + std::bind(&detail::AsyncGRPCServer::RunSyncUpdate, &rpc_service)); + rpc_service.SetCond(0); VLOG(3) << "start getting nccl id from trainer 0..."; - auto recv = rpc_service_->Get(); + auto recv = rpc_service.Get(); VLOG(3) << "got nccl id and stop server..."; - rpc_service_->ShutDown(); + rpc_service.ShutDown(); VLOG(3) << "rpc server stopped"; // TODO(wuyi): reinit nccl communicators server_thread.join(); - delete rpc_service_; } - protected: - mutable detail::AsyncGRPCServer* rpc_service_ = nullptr; + // protected: + // mutable detail::AsyncGRPCServer* rpc_service_ = nullptr; }; class GenNCCLIdOpMaker : public framework::OpProtoAndCheckerMaker { diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index 408721be8b..e30c1a9ebf 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -78,7 +78,7 @@ struct NCCLContextMap { explicit NCCLContextMap(const std::vector &places, ncclUniqueId *nccl_id = nullptr, - size_t num_trainers = 0, size_t trainer_id = 0) { + size_t num_trainers = 1, size_t trainer_id = 0) { PADDLE_ENFORCE(!places.empty()); order_.reserve(places.size()); for (auto &p : places) { @@ -100,7 +100,7 @@ struct NCCLContextMap { PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( comms.get(), static_cast(order_.size()), order_.data())); } else { - PADDLE_ENFORCE_GT(num_trainers, 0); + PADDLE_ENFORCE_GT(num_trainers, 1); // TODO(wuyi): need to ensure each node have same number of GPUs { int nranks = num_trainers * order_.size(); diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 59294d9721..7358c4b60e 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -32,7 +32,7 @@ class ParallelExecutor(object): share_vars_from=None, use_default_grad_scale=True, balance_parameter_opt_between_cards=False, - num_trainers=0, + num_trainers=1, trainer_id=0): """ ParallelExecutor can run program in parallel. @@ -57,7 +57,7 @@ class ParallelExecutor(object): balance_parameter_opt_between_cards(bool, default True): Whether updating different gradients on different cards. Currently, it is not recommended. - num_trainers(int, default 0): If greater than 0, NCCL will be + num_trainers(int, default 1): If greater than 1, NCCL will be initialized with multpile rank of nodes, each node should have same number of GPUs. Distributed training will be enabled then. trainer_id(int, default 0): Must use together with num_trainers. 
From 373a2e66eb339f7b6d1bd9ef292ce1b693126ff5 Mon Sep 17 00:00:00 2001
From: typhoonzero
Date: Mon, 14 May 2018 15:47:11 +0800
Subject: [PATCH 15/16] remove comments

---
 paddle/fluid/operators/gen_nccl_id_op.cc | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc
index 36fc862213..804d43e4cd 100644
--- a/paddle/fluid/operators/gen_nccl_id_op.cc
+++ b/paddle/fluid/operators/gen_nccl_id_op.cc
@@ -92,12 +92,8 @@ class GenNCCLIdOp : public framework::OperatorBase {
     VLOG(3) << "got nccl id and stop server...";
     rpc_service.ShutDown();
     VLOG(3) << "rpc server stopped";
-    // TODO(wuyi): reinit nccl communicators
     server_thread.join();
   }
-
-  // protected:
-  // mutable detail::AsyncGRPCServer* rpc_service_ = nullptr;
 };
 
 class GenNCCLIdOpMaker : public framework::OpProtoAndCheckerMaker {

From 872e55bce5e51e466844367dc1f552d9031eef38 Mon Sep 17 00:00:00 2001
From: typhoonzero
Date: Mon, 14 May 2018 15:47:39 +0800
Subject: [PATCH 16/16] remove comments

---
 paddle/fluid/operators/gen_nccl_id_op.cc | 1 -
 1 file changed, 1 deletion(-)

diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc
index 804d43e4cd..a5678f6346 100644
--- a/paddle/fluid/operators/gen_nccl_id_op.cc
+++ b/paddle/fluid/operators/gen_nccl_id_op.cc
@@ -75,7 +75,6 @@ class GenNCCLIdOp : public framework::OperatorBase {
     // NOTE: Can not use unique_ptr here because the default
     // deleter will call GRPC Server's base class's dtor and
     // that will cause a wired crash.
-
     detail::AsyncGRPCServer rpc_service(endpoint, true);
     framework::ProgramDesc empty_program;
     framework::Executor executor(dev_ctx.GetPlace());
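
Taken together, the series lets trainer 0 generate an ncclUniqueId, broadcast it over gRPC through the gen_nccl_id op, and then build a multi-node NCCLContextMap inside ParallelExecutor. A rough, hypothetical Python sketch of how a trainer script might use the new keyword arguments; the use_cuda flag, the loss variable, and the step that runs gen_nccl_id before the executor is built are assumptions for illustration and are not part of this diff:

    import paddle.fluid as fluid

    # ... build the usual single-node network first and obtain a `loss`
    # variable (elided here) ...

    # Assumption: a transpiler or the user arranges for the gen_nccl_id op to
    # run in the startup phase so that the "NCCLID" variable exists in the
    # global scope; trainer 0 generates and sends the id, trainers 1..n
    # receive it over gRPC before ParallelExecutor is constructed.

    exe = fluid.ParallelExecutor(
        use_cuda=True,        # assumed flag, not shown in this diff
        loss_name=loss.name,  # hypothetical loss variable from the elided build step
        num_trainers=2,       # total trainer nodes; >1 enables multi-node NCCL init
        trainer_id=0)         # this node's rank, in [0, num_trainers)

Each trainer passes the same num_trainers and its own trainer_id, so the NCCL communicators on all nodes agree on the global rank assignment shown earlier.
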