From 5030681c36e9e9497f3c45cdbd451c8739bdba1f Mon Sep 17 00:00:00 2001 From: Luo Tao Date: Thu, 8 Mar 2018 20:41:31 +0800 Subject: [PATCH 01/32] add MKL for fluid static and shared library --- cmake/external/mklml.cmake | 2 +- cmake/inference_lib.cmake | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/cmake/external/mklml.cmake b/cmake/external/mklml.cmake index 739a910c7c..f24cb2d11b 100644 --- a/cmake/external/mklml.cmake +++ b/cmake/external/mklml.cmake @@ -34,7 +34,7 @@ SET(MKLML_DOWNLOAD_DIR "${MKLML_SOURCE_DIR}/src/${MKLML_PROJECT}") SET(MKLML_DST_DIR "mklml") SET(MKLML_INSTALL_ROOT "${THIRD_PARTY_PATH}/install") SET(MKLML_INSTALL_DIR ${MKLML_INSTALL_ROOT}/${MKLML_DST_DIR}) -SET(MKLML_ROOT ${MKLML_INSTALL_DIR}/${MKLML_VER}) +SET(MKLML_ROOT ${MKLML_INSTALL_DIR}) SET(MKLML_INC_DIR ${MKLML_ROOT}/include) SET(MKLML_LIB_DIR ${MKLML_ROOT}/lib) SET(MKLML_LIB ${MKLML_LIB_DIR}/libmklml_intel.so) diff --git a/cmake/inference_lib.cmake b/cmake/inference_lib.cmake index 6b2237b858..fb81498fd6 100644 --- a/cmake/inference_lib.cmake +++ b/cmake/inference_lib.cmake @@ -69,6 +69,12 @@ if(NOT CBLAS_FOUND) SRCS ${CBLAS_INSTALL_DIR}/lib ${CBLAS_INSTALL_DIR}/include DSTS ${dst_dir} ${dst_dir} ) +else() + set(dst_dir "${CMAKE_INSTALL_PREFIX}/third_party/install/mklml") + copy(mklml_lib + SRCS ${MKLML_LIB_DIR} ${MKLML_INC_DIR} + DSTS ${dst_dir} ${dst_dir} + ) endif() # paddle fluid module From bc0cfb2283633b65669be1d8f7a7f2040d6726f2 Mon Sep 17 00:00:00 2001 From: Luo Tao Date: Thu, 8 Mar 2018 20:42:16 +0800 Subject: [PATCH 02/32] remove PADDLE_USE_ATLAS --- paddle/fluid/operators/math/math_function.h | 7 ------- paddle/math/MathFunctions.cpp | 15 ++++----------- paddle/math/MathFunctions.h | 2 +- 3 files changed, 5 insertions(+), 19 deletions(-) diff --git a/paddle/fluid/operators/math/math_function.h b/paddle/fluid/operators/math/math_function.h index 47e2386d05..cdbc7bfb37 100644 --- a/paddle/fluid/operators/math/math_function.h +++ b/paddle/fluid/operators/math/math_function.h @@ -19,13 +19,6 @@ limitations under the License. */ #include #endif -#ifdef PADDLE_USE_ATLAS -extern "C" { -#include -#include -} -#endif - #ifdef PADDLE_USE_OPENBLAS #include #include diff --git a/paddle/math/MathFunctions.cpp b/paddle/math/MathFunctions.cpp index b2ff4bc323..de404cad89 100644 --- a/paddle/math/MathFunctions.cpp +++ b/paddle/math/MathFunctions.cpp @@ -59,17 +59,10 @@ void* lapack_dso_handle = nullptr; } __name; // struct DynLoad__##__name #endif -#ifdef PADDLE_USE_ATLAS - #define PADDLE_SGETRF clapack_sgetrf - #define PADDLE_DGETRF clapack_dgetrf - #define PADDLE_SGETRI clapack_sgetri - #define PADDLE_DGETRI clapack_dgetri -#else - #define PADDLE_SGETRF LAPACKE_sgetrf - #define PADDLE_DGETRF LAPACKE_dgetrf - #define PADDLE_SGETRI LAPACKE_sgetri - #define PADDLE_DGETRI LAPACKE_dgetri -#endif +#define PADDLE_SGETRF LAPACKE_sgetrf +#define PADDLE_DGETRF LAPACKE_dgetrf +#define PADDLE_SGETRI LAPACKE_sgetri +#define PADDLE_DGETRI LAPACKE_dgetri #define LAPACK_ROUTINE_EACH(__macro) \ __macro(PADDLE_SGETRF) \ diff --git a/paddle/math/MathFunctions.h b/paddle/math/MathFunctions.h index f4cf6bd6c2..f3d8b1a39e 100644 --- a/paddle/math/MathFunctions.h +++ b/paddle/math/MathFunctions.h @@ -21,7 +21,7 @@ limitations under the License. */ #include #endif -#if defined(PADDLE_USE_ATLAS) || defined(PADDLE_USE_VECLIB) +#if defined(PADDLE_USE_VECLIB) extern "C" { #include #include From e42b8f8a11c344173c6d276fbdfdef1f13c17d19 Mon Sep 17 00:00:00 2001 From: Luo Tao Date: Tue, 13 Mar 2018 16:03:26 +0800 Subject: [PATCH 03/32] fix mklml install path --- cmake/external/mklml.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/external/mklml.cmake b/cmake/external/mklml.cmake index f24cb2d11b..df3f0c7f0c 100644 --- a/cmake/external/mklml.cmake +++ b/cmake/external/mklml.cmake @@ -46,7 +46,7 @@ INCLUDE_DIRECTORIES(${MKLML_INC_DIR}) FILE(WRITE ${MKLML_DOWNLOAD_DIR}/CMakeLists.txt "PROJECT(MKLML)\n" "cmake_minimum_required(VERSION 3.0)\n" - "install(DIRECTORY ${MKLML_VER}\n" + "install(DIRECTORY ${MKLML_VER}/include ${MKLML_VER}/lib \n" " DESTINATION ${MKLML_DST_DIR})\n") ExternalProject_Add( From 236b7dd2bde254f83479ca632756b4dfaa1b8bdc Mon Sep 17 00:00:00 2001 From: chengduoZH Date: Tue, 20 Mar 2018 14:28:07 +0800 Subject: [PATCH 04/32] add pinned memory --- .../fluid/memory/detail/system_allocator.cc | 41 ++++++++++++++ paddle/fluid/memory/detail/system_allocator.h | 12 +++++ paddle/fluid/memory/memory.cc | 53 ++++++++++++++++--- paddle/fluid/memory/memory.h | 12 +++-- 4 files changed, 107 insertions(+), 11 deletions(-) diff --git a/paddle/fluid/memory/detail/system_allocator.cc b/paddle/fluid/memory/detail/system_allocator.cc index 8ac8978120..df9d28ede8 100644 --- a/paddle/fluid/memory/detail/system_allocator.cc +++ b/paddle/fluid/memory/detail/system_allocator.cc @@ -119,6 +119,47 @@ void GPUAllocator::Free(void* p, size_t size, size_t index) { bool GPUAllocator::UseGpu() const { return true; } +void* CUDAPinnedAllocator::Alloc(size_t& index, size_t size) { + if (size <= 0) return nullptr; + void* p; + // NOTE: here, we use GpuMaxAllocSize() as the maximum memory size + // of host fallback allocation. Allocates too much would reduce + // the amount of memory available to the underlying system for paging. + + size_t usable = paddle::platform::GpuMaxAllocSize() - fallback_alloc_size_; + + if (size > usable) return nullptr; + + cudaError_t result = cudaMallocHost(&p, size); + if (result == cudaSuccess) { + index = 1; + fallback_alloc_size_ += size; + return p; + } + + return nullptr; +} + +void CUDAPinnedAllocator::Free(void* p, size_t size, size_t index) { + cudaError_t err; + PADDLE_ASSERT(index == 1); + + PADDLE_ASSERT(fallback_alloc_size_ >= size); + fallback_alloc_size_ -= size; + err = cudaFreeHost(p); + + // Purposefully allow cudaErrorCudartUnloading, because + // that is returned if you ever call cudaFree after the + // driver has already shutdown. This happens only if the + // process is terminating, in which case we don't care if + // cudaFree succeeds. + if (err != cudaErrorCudartUnloading) { + PADDLE_ENFORCE(err, "cudaFreeHost failed in GPUPinnedAllocator::Free."); + } +} + +bool CUDAPinnedAllocator::UseGpu() const { return true; } + #endif } // namespace detail diff --git a/paddle/fluid/memory/detail/system_allocator.h b/paddle/fluid/memory/detail/system_allocator.h index e93c2c1e32..3e024125fa 100644 --- a/paddle/fluid/memory/detail/system_allocator.h +++ b/paddle/fluid/memory/detail/system_allocator.h @@ -51,6 +51,18 @@ class GPUAllocator : public SystemAllocator { size_t gpu_alloc_size_ = 0; size_t fallback_alloc_size_ = 0; }; + +class CUDAPinnedAllocator : public SystemAllocator { + public: + virtual void* Alloc(size_t& index, size_t size); + virtual void Free(void* p, size_t size, size_t index); + virtual bool UseGpu() const; + + private: + size_t gpu_alloc_size_ = + 0; // TODO(zcd): how to define the upper limit of CUDAPinnedMemory? + size_t fallback_alloc_size_ = 0; +}; #endif } // namespace detail diff --git a/paddle/fluid/memory/memory.cc b/paddle/fluid/memory/memory.cc index d07f89439a..c5577587aa 100644 --- a/paddle/fluid/memory/memory.cc +++ b/paddle/fluid/memory/memory.cc @@ -38,7 +38,8 @@ BuddyAllocator* GetCPUBuddyAllocator() { } template <> -void* Alloc(platform::CPUPlace place, size_t size) { +void* Alloc(platform::CPUPlace place, size_t size, + bool use_pinned) { VLOG(10) << "Allocate " << size << " bytes on " << platform::Place(place); void* p = GetCPUBuddyAllocator()->Alloc(size); VLOG(10) << " pointer=" << p; @@ -46,7 +47,8 @@ void* Alloc(platform::CPUPlace place, size_t size) { } template <> -void Free(platform::CPUPlace place, void* p) { +void Free(platform::CPUPlace place, void* p, + bool use_pinned) { VLOG(10) << "Free pointer=" << p << " on " << platform::Place(place); GetCPUBuddyAllocator()->Free(p); } @@ -82,15 +84,47 @@ BuddyAllocator* GetGPUBuddyAllocator(int gpu_id) { return as[gpu_id]; } +BuddyAllocator* GetCUDAPinnedBuddyAllocator(int gpu_id) { + static BuddyAllocator** as = NULL; + if (as == NULL) { + int gpu_num = platform::GetCUDADeviceCount(); + as = new BuddyAllocator*[gpu_num]; + for (int gpu = 0; gpu < gpu_num; gpu++) { + as[gpu] = nullptr; + } + } + platform::SetDeviceId(gpu_id); + if (!as[gpu_id]) { + as[gpu_id] = new BuddyAllocator(new detail::CUDAPinnedAllocator, + platform::GpuMinChunkSize(), + platform::GpuMaxChunkSize()); + VLOG(10) << "\n\nNOTE: each GPU device use " + << FLAGS_fraction_of_gpu_memory_to_use * 100 + << "% of GPU memory.\n" + << "You can set GFlags environment variable '" + << "FLAGS_fraction_of_gpu_memory_to_use" + << "' to change the fraction of GPU usage.\n\n"; + } + return as[gpu_id]; +} + template <> size_t Used(platform::CUDAPlace place) { return GetGPUBuddyAllocator(place.device)->Used(); } template <> -void* Alloc(platform::CUDAPlace place, size_t size) { - auto* buddy_allocator = GetGPUBuddyAllocator(place.device); - auto* ptr = buddy_allocator->Alloc(size); +void* Alloc(platform::CUDAPlace place, size_t size, + bool use_pinned) { + void* ptr; + if (use_pinned) { + auto* buddy_allocator = GetCUDAPinnedBuddyAllocator(place.device); + ptr = buddy_allocator->Alloc(size); + } else { + auto* buddy_allocator = GetGPUBuddyAllocator(place.device); + ptr = buddy_allocator->Alloc(size); + } + if (ptr == nullptr) { int cur_dev = platform::GetCurrentDeviceId(); platform::SetDeviceId(place.device); @@ -108,8 +142,13 @@ void* Alloc(platform::CUDAPlace place, size_t size) { } template <> -void Free(platform::CUDAPlace place, void* p) { - GetGPUBuddyAllocator(place.device)->Free(p); +void Free(platform::CUDAPlace place, void* p, + bool use_pinned) { + if (use_pinned) { + GetCUDAPinnedBuddyAllocator(place.device)->Free(p); + } else { + GetGPUBuddyAllocator(place.device)->Free(p); + } } #endif diff --git a/paddle/fluid/memory/memory.h b/paddle/fluid/memory/memory.h index 7c5db815d6..9bc48ac68f 100644 --- a/paddle/fluid/memory/memory.h +++ b/paddle/fluid/memory/memory.h @@ -33,7 +33,7 @@ namespace memory { * address is valid or not. */ template -void* Alloc(Place place, size_t size); +void* Alloc(Place place, size_t size, bool use_pinned = false); /** * \brief Free memory block in one place. @@ -43,7 +43,7 @@ void* Alloc(Place place, size_t size); * */ template -void Free(Place place, void* ptr); +void Free(Place place, void* ptr, bool use_pinned = false); /** * \brief Total size of used memory in one place. @@ -74,11 +74,15 @@ class PODDeleter { static_assert(std::is_pod::value, "T must be POD"); public: - explicit PODDeleter(Place place) : place_(place) {} - void operator()(T* ptr) { Free(place_, static_cast(ptr)); } + explicit PODDeleter(Place place, bool use_pinned = false) + : place_(place), use_pinned_(use_pinned) {} + void operator()(T* ptr) { + Free(place_, static_cast(ptr), use_pinned_); + } private: Place place_; + bool use_pinned_; }; /** From eaa90d38ad121ae019688f024380526cf7d504c8 Mon Sep 17 00:00:00 2001 From: chengduoZH Date: Tue, 20 Mar 2018 15:12:15 +0800 Subject: [PATCH 05/32] add use_pinned --- paddle/fluid/framework/tensor.h | 32 +++++++++++++++++++--------- paddle/fluid/framework/tensor_impl.h | 23 ++++++++++++-------- 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/paddle/fluid/framework/tensor.h b/paddle/fluid/framework/tensor.h index 6f878541e6..aa8f44ea30 100644 --- a/paddle/fluid/framework/tensor.h +++ b/paddle/fluid/framework/tensor.h @@ -45,10 +45,11 @@ class Tensor { friend struct EigenVector; public: - Tensor() : offset_(0) {} + Tensor() : offset_(0), use_pinned_(false) {} /*! Constructor with place should only be used in pybind. */ - explicit Tensor(const platform::Place& place) : offset_(0) { + explicit Tensor(const platform::Place& place) + : offset_(0), use_pinned_(false) { holder_->set_place(place); } @@ -69,11 +70,12 @@ class Tensor { * @note If not exist, then allocation. */ template - inline T* mutable_data(platform::Place place); + inline T* mutable_data(platform::Place place, bool use_pinned = false); - inline void* mutable_data(platform::Place place, std::type_index type); + inline void* mutable_data(platform::Place place, std::type_index type, + bool use_pinned = false); - inline void* mutable_data(platform::Place place); + inline void* mutable_data(platform::Place place, bool use_pinned = false); /** * @brief Return a pointer to mutable memory block. @@ -84,7 +86,8 @@ class Tensor { * @note If not exist, then allocation. */ template - inline T* mutable_data(DDim dims, platform::Place place); + inline T* mutable_data(DDim dims, platform::Place place, + bool use_pinned = false); /*! Return the dimensions of the memory block. */ inline const DDim& dims() const; @@ -92,6 +95,9 @@ class Tensor { /*! Return the numel of the memory block. */ inline int64_t numel() const; + /*! Return the numel of the memory block. */ + inline bool isPinned() const; + /*! Resize the dimensions of the memory block. */ inline Tensor& Resize(const DDim& dims); @@ -146,12 +152,14 @@ class Tensor { template struct PlaceholderImpl : public Placeholder { - PlaceholderImpl(Place place, size_t size, std::type_index type) - : ptr_(static_cast(memory::Alloc(place, size)), - memory::PODDeleter(place)), + PlaceholderImpl(Place place, size_t size, std::type_index type, + bool use_pinned = false) + : ptr_(static_cast(memory::Alloc(place, size, use_pinned)), + memory::PODDeleter(place, use_pinned)), place_(place), size_(size), - type_(type) { + type_(type), + use_pinned_(use_pinned) { PADDLE_ENFORCE_NOT_NULL(ptr_, "Insufficient %s memory to allocation.", (is_cpu_place(place_) ? "CPU" : "GPU")); } @@ -174,6 +182,9 @@ class Tensor { /* the current type of memory */ std::type_index type_; + + /*! use pinned memory or not. */ + bool use_pinned_; }; /*! holds the memory block if allocated. */ @@ -208,6 +219,7 @@ class Tensor { * PlaceHolder::ptr_ and where the tensor data really begins. */ size_t offset_; + bool use_pinned_; }; inline void Tensor::switch_place(platform::Place new_place) { diff --git a/paddle/fluid/framework/tensor_impl.h b/paddle/fluid/framework/tensor_impl.h index 638bd0db9d..e882cce69e 100644 --- a/paddle/fluid/framework/tensor_impl.h +++ b/paddle/fluid/framework/tensor_impl.h @@ -101,19 +101,21 @@ inline T* Tensor::data() { } template -inline T* Tensor::mutable_data(DDim dims, platform::Place place) { +inline T* Tensor::mutable_data(DDim dims, platform::Place place, + bool use_pinned) { static_assert(std::is_pod::value, "T must be POD"); Resize(dims); - return mutable_data(place); + return mutable_data(place, use_pinned); } template -inline T* Tensor::mutable_data(platform::Place place) { +inline T* Tensor::mutable_data(platform::Place place, bool use_pinned) { static_assert(std::is_pod::value, "T must be POD"); - return reinterpret_cast(mutable_data(place, typeid(T))); + return reinterpret_cast(mutable_data(place, typeid(T), use_pinned)); } -inline void* Tensor::mutable_data(platform::Place place, std::type_index type) { +inline void* Tensor::mutable_data(platform::Place place, std::type_index type, + bool use_pinned) { if (holder_ != nullptr) { holder_->set_type(type); } @@ -127,26 +129,27 @@ inline void* Tensor::mutable_data(platform::Place place, std::type_index type) { holder_->size() < size + offset_) { if (platform::is_cpu_place(place)) { holder_.reset(new PlaceholderImpl( - boost::get(place), size, type)); + boost::get(place), size, type, use_pinned)); } else if (platform::is_gpu_place(place)) { #ifndef PADDLE_WITH_CUDA PADDLE_THROW("'CUDAPlace' is not supported in CPU only device."); } #else holder_.reset(new PlaceholderImpl( - boost::get(place), size, type)); + boost::get(place), size, type, use_pinned)); } #endif offset_ = 0; + use_pinned_ = use_pinned; } return reinterpret_cast(reinterpret_cast(holder_->ptr()) + offset_); } -inline void* Tensor::mutable_data(platform::Place place) { +inline void* Tensor::mutable_data(platform::Place place, bool use_pinned) { PADDLE_ENFORCE(this->holder_ != nullptr, "Cannot invoke mutable data if current hold nothing"); - return mutable_data(place, holder_->type()); + return mutable_data(place, holder_->type(), use_pinned); } inline Tensor& Tensor::ShareDataWith(const Tensor& src) { @@ -188,6 +191,8 @@ inline const DDim& Tensor::dims() const { return dims_; } inline int64_t Tensor::numel() const { return product(dims_); } +inline bool Tensor::isPinned() const { return use_pinned_; } + inline Tensor ReshapeToMatrix(const Tensor& src, int num_col_dims) { Tensor res; res.ShareDataWith(src); From ba9f4c787393c57e8f29477e01a3c6b3f43e3fa2 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 22 Mar 2018 20:07:26 +0800 Subject: [PATCH 06/32] fix test_recv_op --- python/paddle/fluid/layers/io.py | 17 ++++++++--------- .../fluid/tests/unittests/test_recv_op.py | 17 +++++++++-------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index bc5e291ad8..bd7e9c30fe 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -113,9 +113,9 @@ class ListenAndServ(object): which can receive variables from clients and run a block. """ - def __init__(self, endpoint, fan_in=1, optimizer_mode=True): + def __init__(self, endpoint, inputs, fan_in=1, optimizer_mode=True): self.helper = LayerHelper("listen_and_serv") - self.inputs = [] + self.inputs = inputs self.outputs = [] self.endpoint = endpoint self.fan_in = fan_in @@ -160,18 +160,13 @@ class ListenAndServ(object): current_block = main_program.current_block() parent_block = self.parent_block() - params, grads = self.get_params_and_grads() - param_names = [p.name for p in params] - grad_names = [g.name for g in grads] parent_block.append_op( type='listen_and_serv', - inputs={}, + inputs={"X": self.inputs}, outputs={}, attrs={ 'endpoint': self.endpoint, 'Fanin': self.fan_in, - 'ParamList': param_names, - 'GradList': grad_names, 'OptimizeBlock': current_block }) @@ -196,10 +191,14 @@ def Send(endpoints, send_vars, get_vars): endpoints = list(set(epmap)) helper = LayerHelper("Send", **locals()) + rpc_client_var = default_main_program().global_block().create_var( + name="RPC_CLIENT_VAR", persistable=True, type=core.VarDesc.VarType.RAW) + helper.append_op( type="send", inputs={"X": send_vars}, - outputs={"Out": get_vars}, + outputs={"Out": get_vars, + "RPCClient": rpc_client_var}, attrs={"endpoints": endpoints, "epmap": epmap}) diff --git a/python/paddle/fluid/tests/unittests/test_recv_op.py b/python/paddle/fluid/tests/unittests/test_recv_op.py index 985d892c56..f8b7724039 100644 --- a/python/paddle/fluid/tests/unittests/test_recv_op.py +++ b/python/paddle/fluid/tests/unittests/test_recv_op.py @@ -32,20 +32,21 @@ class TestRecvOp(unittest.TestCase): time.sleep(1) self.init_client(place) # FIXME(typhoonzero): find a way to gracefully shutdown the server. - os.system("kill -9 %d" % p.pid) + # os.system("kill -9 %d" % p.pid) p.join() def init_serv(self, place): main = fluid.Program() with fluid.program_guard(main): - x = layers.data( - shape=[32, 32], - dtype='float32', - name="X", - append_batch_size=False) - fluid.initializer.Constant(value=1.0)(x, main.global_block()) - serv = layers.ListenAndServ("127.0.0.1:6174", optimizer_mode=False) + serv = layers.ListenAndServ( + "127.0.0.1:6174", ["X"], optimizer_mode=False) with serv.do(): + x = layers.data( + shape=[32, 32], + dtype='float32', + name="X", + append_batch_size=False) + fluid.initializer.Constant(value=1.0)(x, main.global_block()) o = layers.scale(x=x, scale=10.0) main.global_block().create_var( name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape) From 6cebbd7bcb9d9a88aa482efd38ecfc3a5d4e9fa9 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 22 Mar 2018 20:16:24 +0800 Subject: [PATCH 07/32] update --- python/paddle/fluid/tests/unittests/test_recv_op.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/fluid/tests/unittests/test_recv_op.py b/python/paddle/fluid/tests/unittests/test_recv_op.py index f8b7724039..854238c627 100644 --- a/python/paddle/fluid/tests/unittests/test_recv_op.py +++ b/python/paddle/fluid/tests/unittests/test_recv_op.py @@ -32,7 +32,7 @@ class TestRecvOp(unittest.TestCase): time.sleep(1) self.init_client(place) # FIXME(typhoonzero): find a way to gracefully shutdown the server. - # os.system("kill -9 %d" % p.pid) + os.system("kill -9 %d" % p.pid) p.join() def init_serv(self, place): From 14ba67c0ef3bcff13d95788406518bb132fe4a28 Mon Sep 17 00:00:00 2001 From: Tomasz Patejko Date: Thu, 22 Mar 2018 08:46:20 -0400 Subject: [PATCH 08/32] Function for running MKLDNN primitive added. Unittest added for is_test attribute --- paddle/fluid/operators/lrn_mkldnn_op.cc | 23 +++++++++++-------- paddle/fluid/operators/lrn_op.cc | 2 +- .../fluid/tests/unittests/test_lrn_op.py | 19 +++++++++++++++ 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/paddle/fluid/operators/lrn_mkldnn_op.cc b/paddle/fluid/operators/lrn_mkldnn_op.cc index 3bead16ce4..0a18882e81 100644 --- a/paddle/fluid/operators/lrn_mkldnn_op.cc +++ b/paddle/fluid/operators/lrn_mkldnn_op.cc @@ -36,6 +36,14 @@ std::shared_ptr insert_to_context(const std::string& key, return p; } + +template +void run_primitive(Args&&... args) { + auto forward_op = mkldnn::lrn_forward{args...}; + + std::vector pipeline = {forward_op}; + mkldnn::stream(mkldnn::stream::kind::eager).submit(pipeline).wait(); +} } // namespace template @@ -87,8 +95,6 @@ class LRNMKLDNNOpKernel : public paddle::framework::OpKernel { auto dst_memory = mkldnn::memory{{dst_md, mkldnn_engine}, static_cast(output_data)}; - std::unique_ptr forward_op = nullptr; - if (!is_test) { const std::string key = ctx.op().Output("Out"); const std::string key_src_memory = key + "@lrn_src_memory"; @@ -108,9 +114,7 @@ class LRNMKLDNNOpKernel : public paddle::framework::OpKernel { key_workspace_memory, dev_ctx, forward_pd->workspace_primitive_desc()); - forward_op.reset(new mkldnn::lrn_forward{*forward_pd, *src_memory, - *workspace_memory, dst_memory}); - + run_primitive(*forward_pd, *src_memory, *workspace_memory, dst_memory); } else { auto forward_pd = mkldnn::lrn_forward::primitive_desc{forward_desc, mkldnn_engine}; @@ -119,12 +123,8 @@ class LRNMKLDNNOpKernel : public paddle::framework::OpKernel { auto workspace_memory = mkldnn::memory{forward_pd.workspace_primitive_desc()}; - forward_op.reset(new mkldnn::lrn_forward{forward_pd, src_memory, - workspace_memory, dst_memory}); + run_primitive(forward_pd, src_memory, workspace_memory, dst_memory); } - - std::vector pipeline = {*forward_op}; - mkldnn::stream(mkldnn::stream::kind::eager).submit(pipeline).wait(); } }; @@ -136,6 +136,9 @@ class LRNMKLDNNGradOpKernel : public paddle::framework::OpKernel { "MKLDNN LRN must use float data."); PADDLE_ENFORCE(paddle::platform::is_cpu_place(ctx.GetPlace()), "MKLDNN LRN must use CPUPlace."); + PADDLE_ENFORCE( + !ctx.Attr("is_test"), + "is_test attribute should be set to False in training phase."); auto x = ctx.Input("X"); diff --git a/paddle/fluid/operators/lrn_op.cc b/paddle/fluid/operators/lrn_op.cc index 2b1947a187..b36b5c3a33 100644 --- a/paddle/fluid/operators/lrn_op.cc +++ b/paddle/fluid/operators/lrn_op.cc @@ -155,8 +155,8 @@ class LRNOp : public framework::OperatorWithKernel { PADDLE_ENFORCE_EQ(x_dim.size(), 4, "Input(X)'rank of LRNOp should be 4."); ctx->SetOutputDim("Out", x_dim); - ctx->SetOutputDim("MidOut", x_dim); ctx->ShareLoD("X", /*->*/ "Out"); + ctx->SetOutputDim("MidOut", x_dim); } framework::OpKernelType GetExpectedKernelType( diff --git a/python/paddle/fluid/tests/unittests/test_lrn_op.py b/python/paddle/fluid/tests/unittests/test_lrn_op.py index 2268eafdbd..8fa480b9bc 100644 --- a/python/paddle/fluid/tests/unittests/test_lrn_op.py +++ b/python/paddle/fluid/tests/unittests/test_lrn_op.py @@ -97,5 +97,24 @@ class TestLRNMKLDNNOp(TestLRNOp): self.check_output(atol=0.002) +class TestLRNMKLDNNOpWithIsTest(TestLRNMKLDNNOp): + def get_attrs(self): + attrs = TestLRNMKLDNNOp.get_attrs(self) + attrs['is_test'] = True + return attrs + + def test_check_grad_normal(self): + def check_raise_is_test(): + try: + self.check_grad(['X'], 'Out', max_relative_error=0.01) + except Exception as e: + t = \ + "is_test attribute should be set to False in training phase." + if t in str(e): + raise AttributeError + + self.assertRaises(AttributeError, check_raise_is_test) + + if __name__ == "__main__": unittest.main() From 904fa05f4692eebdcebd8b3966a09c162ccd1da4 Mon Sep 17 00:00:00 2001 From: Xin Pan Date: Sun, 25 Mar 2018 02:29:02 -0700 Subject: [PATCH 09/32] Improve layer_norm speed transfomer on a single device step time reduces from 0.157 to 0.125 --- paddle/fluid/operators/layer_norm_op.h | 137 +++++++++++++++++++++---- 1 file changed, 116 insertions(+), 21 deletions(-) diff --git a/paddle/fluid/operators/layer_norm_op.h b/paddle/fluid/operators/layer_norm_op.h index 605b5c258c..63561aaa31 100644 --- a/paddle/fluid/operators/layer_norm_op.h +++ b/paddle/fluid/operators/layer_norm_op.h @@ -22,6 +22,99 @@ limitations under the License. */ namespace paddle { namespace operators { +// Wrap RowwiseMean and ColwiseMean. +// Reuse the cpu codes and replace the gpu codes with cublas_gemv, which is +// significantly faster. Unlike the RowwiseMean and ColwiseMean, the +// implementation only considers 2D. +template +struct RowwiseMean2D { + RowwiseMean2D(int left, int right, const platform::DeviceContext& dev_ctx); + + void operator()(const platform::DeviceContext& context, + const framework::Tensor& input, framework::Tensor* vec); +}; + +template +class RowwiseMean2D { + public: + RowwiseMean2D(int left, int right, const platform::DeviceContext& dev_ctx) + : left_(left), right_(right) { + framework::DDim ones_dim({right_}); + divisor_.mutable_data(ones_dim, dev_ctx.GetPlace()); + math::set_constant(dev_ctx, &divisor_, 1.0 / right); + } + void operator()(const platform::CUDADeviceContext& context, + const framework::Tensor& input, framework::Tensor* out) { + math::gemv( + context, false, left_, right_, 1., input.data(), divisor_.data(), + 0., out->data()); + } + + private: + int left_; + int right_; + framework::Tensor divisor_; +}; + +template +class RowwiseMean2D { + public: + RowwiseMean2D(int left, int right, const platform::DeviceContext& dev_ctx) {} + + void operator()(const platform::CPUDeviceContext& context, + const framework::Tensor& input, framework::Tensor* out) { + row_mean_(context, input, out); + } + + private: + math::RowwiseMean row_mean_; +}; + +template +struct ColwiseSum2D { + ColwiseSum2D(int left, int right, const platform::DeviceContext& dev_ctx); + + void operator()(const platform::DeviceContext& context, + const framework::Tensor& input, framework::Tensor* vec); +}; + +template +class ColwiseSum2D { + public: + ColwiseSum2D(int left, int right, const platform::DeviceContext& dev_ctx) + : left_(left), right_(right) { + framework::DDim ones_dim({left_}); + divisor_.mutable_data(ones_dim, dev_ctx.GetPlace()); + math::set_constant(dev_ctx, &divisor_, 1.0); + } + + void operator()(const platform::CUDADeviceContext& context, + const framework::Tensor& input, framework::Tensor* out) { + math::gemv( + context, true, left_, right_, 1., input.data(), divisor_.data(), + 0., out->data()); + } + + private: + int left_; + int right_; + framework::Tensor divisor_; +}; + +template +class ColwiseSum2D { + public: + ColwiseSum2D(int left, int right, const platform::DeviceContext& dev_ctx) {} + + void operator()(const platform::CPUDeviceContext& context, + const framework::Tensor& input, framework::Tensor* out) { + col_wise_(context, input, out); + } + + private: + math::ColwiseSum col_wise_; +}; + template struct SubAndSquareFunctor { inline HOSTDEVICE T operator()(T a, T b) const { return (a - b) * (a - b); } @@ -67,15 +160,15 @@ using DataLayout = framework::DataLayout; template class LayerNormKernel : public framework::OpKernel { public: - void Compute(const framework::ExecutionContext &ctx) const override { + void Compute(const framework::ExecutionContext& ctx) const override { const float epsilon = ctx.Attr("epsilon"); - auto *scale = ctx.Input("Scale"); - auto *bias = ctx.Input("Bias"); + auto* scale = ctx.Input("Scale"); + auto* bias = ctx.Input("Bias"); auto x = *ctx.Input("X"); - auto *y = ctx.Output("Y"); - auto *mean = ctx.Output("Mean"); - auto *var = ctx.Output("Variance"); + auto* y = ctx.Output("Y"); + auto* mean = ctx.Output("Mean"); + auto* var = ctx.Output("Variance"); const auto begin_norm_axis = ctx.Attr("begin_norm_axis"); const auto x_dims = x.dims(); @@ -94,8 +187,8 @@ class LayerNormKernel : public framework::OpKernel { out.ShareDataWith(*y); out.Resize(matrix_shape); - auto &dev_ctx = ctx.template device_context(); - math::RowwiseMean row_mean; + auto& dev_ctx = ctx.template device_context(); + RowwiseMean2D row_mean(left, right, ctx.device_context()); // get mean row_mean(dev_ctx, x, mean); @@ -126,31 +219,32 @@ class LayerNormKernel : public framework::OpKernel { template class LayerNormGradKernel : public framework::OpKernel { public: - void Compute(const framework::ExecutionContext &ctx) const override { + void Compute(const framework::ExecutionContext& ctx) const override { const float epsilon = ctx.Attr("epsilon"); auto x = *ctx.Input("X"); - auto *y = ctx.Input("Y"); - auto *mean = ctx.Input("Mean"); - auto *var = ctx.Input("Variance"); - auto *scale = ctx.Input("Scale"); - auto *bias = ctx.Input("Bias"); + auto* y = ctx.Input("Y"); + auto* mean = ctx.Input("Mean"); + auto* var = ctx.Input("Variance"); + auto* scale = ctx.Input("Scale"); + auto* bias = ctx.Input("Bias"); auto d_y = *ctx.Input(framework::GradVarName("Y")); const auto begin_norm_axis = ctx.Attr("begin_norm_axis"); // init output - auto *d_x = ctx.Output(framework::GradVarName("X")); - auto *d_scale = ctx.Output(framework::GradVarName("Scale")); - auto *d_bias = ctx.Output(framework::GradVarName("Bias")); + auto* d_x = ctx.Output(framework::GradVarName("X")); + auto* d_scale = ctx.Output(framework::GradVarName("Scale")); + auto* d_bias = ctx.Output(framework::GradVarName("Bias")); - const auto &x_dims = x.dims(); + const auto& x_dims = x.dims(); auto matrix_dim = framework::flatten_to_2d(x_dims, begin_norm_axis); int left = static_cast(matrix_dim[0]); int right = static_cast(matrix_dim[1]); framework::DDim matrix_shape({left, right}); d_y.Resize(matrix_shape); - auto &dev_ctx = ctx.template device_context(); - math::ColwiseSum colwise_sum; + auto& dev_ctx = ctx.template device_context(); + ColwiseSum2D colwise_sum(left, right, + ctx.device_context()); Tensor temp; Tensor temp_norm; @@ -190,7 +284,8 @@ class LayerNormGradKernel : public framework::OpKernel { Tensor temp_vec; temp_vec.mutable_data(vec_shape, ctx.GetPlace()); - math::RowwiseMean row_mean; + RowwiseMean2D row_mean(left, right, + ctx.device_context()); if (d_scale) { // dy_dx From 1a4be55a476e2d02dc35fc945220f9aa9c205808 Mon Sep 17 00:00:00 2001 From: Xin Pan Date: Sun, 25 Mar 2018 02:46:59 -0700 Subject: [PATCH 10/32] Pass cpu build --- paddle/fluid/operators/layer_norm_op.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/paddle/fluid/operators/layer_norm_op.h b/paddle/fluid/operators/layer_norm_op.h index 63561aaa31..7b84ba0a7d 100644 --- a/paddle/fluid/operators/layer_norm_op.h +++ b/paddle/fluid/operators/layer_norm_op.h @@ -34,6 +34,7 @@ struct RowwiseMean2D { const framework::Tensor& input, framework::Tensor* vec); }; +#ifdef PADDLE_WITH_CUDA template class RowwiseMean2D { public: @@ -55,6 +56,7 @@ class RowwiseMean2D { int right_; framework::Tensor divisor_; }; +#endif template class RowwiseMean2D { @@ -78,6 +80,7 @@ struct ColwiseSum2D { const framework::Tensor& input, framework::Tensor* vec); }; +#ifdef PADDLE_WITH_CUDA template class ColwiseSum2D { public: @@ -100,6 +103,7 @@ class ColwiseSum2D { int right_; framework::Tensor divisor_; }; +#endif template class ColwiseSum2D { From efd7ee8521986e7789ea88ec0e9a2c7ff5c83ca9 Mon Sep 17 00:00:00 2001 From: m3ngyang Date: Sun, 25 Mar 2018 19:35:20 +0800 Subject: [PATCH 11/32] translate Cluster Training and Prediction --- doc/v2/faq/cluster/index_en.rst | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/doc/v2/faq/cluster/index_en.rst b/doc/v2/faq/cluster/index_en.rst index 855b7e8e53..7cbcaeefcb 100644 --- a/doc/v2/faq/cluster/index_en.rst +++ b/doc/v2/faq/cluster/index_en.rst @@ -2,4 +2,15 @@ Cluster Training and Prediction ############################### -TBD +.. contents:: + +1. Network connection errors in the log during muliti-node cluster training +------------------------------------------------ +The errors in the log belong to network connection during mulilti-node cluster training, for example, :code:`Connection reset by peer`. +This kind of error is usually caused by the abnormal exit of the training process in some node, and the others cannot connect with this node any longer. Steps to troubleshoot the problem as follows: + +* Find the first error in the :code:`train.log`, :code:`server.log`, check whether other fault casued the problem, such as FPE, lacking of memory or disk. + +* If network connection gave rise to the first error in the log, this may be caused by the port conflict of the non-exclusive execution. Connect with the operator to check if the current MPI cluster supports jobs submitted with parameter :code:`resource=full`. If so, change the port of job. + +* If the currnet MPI cluster does not support exclusive pattern, ask the operator to replace or update the current cluster. From f96f2860f9ca88a9967c73179c7d3f198ea778a7 Mon Sep 17 00:00:00 2001 From: wanglun Date: Mon, 26 Mar 2018 09:42:07 +0800 Subject: [PATCH 12/32] Fix typo of Softmax document --- python/paddle/trainer_config_helpers/activations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/trainer_config_helpers/activations.py b/python/paddle/trainer_config_helpers/activations.py index 00efc01c05..3683968262 100644 --- a/python/paddle/trainer_config_helpers/activations.py +++ b/python/paddle/trainer_config_helpers/activations.py @@ -77,7 +77,7 @@ class SoftmaxActivation(BaseActivation): .. math:: - P(y=j|x) = \\frac{e^{x_j}} {\\sum^K_{k=1} e^{x_j} } + P(y=j|x) = \\frac{e^{x_j}} {\\sum^K_{k=1} e^{x_k} } """ def __init__(self): From d573195dde9dfe64724b536654760e2f954f42b3 Mon Sep 17 00:00:00 2001 From: Luo Tao Date: Mon, 26 Mar 2018 12:46:50 +0800 Subject: [PATCH 13/32] rm libmklml_gnu.so --- cmake/inference_lib.cmake | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmake/inference_lib.cmake b/cmake/inference_lib.cmake index fb81498fd6..0323cd9698 100644 --- a/cmake/inference_lib.cmake +++ b/cmake/inference_lib.cmake @@ -69,11 +69,11 @@ if(NOT CBLAS_FOUND) SRCS ${CBLAS_INSTALL_DIR}/lib ${CBLAS_INSTALL_DIR}/include DSTS ${dst_dir} ${dst_dir} ) -else() +elseif (WITH_MKLML) set(dst_dir "${CMAKE_INSTALL_PREFIX}/third_party/install/mklml") copy(mklml_lib - SRCS ${MKLML_LIB_DIR} ${MKLML_INC_DIR} - DSTS ${dst_dir} ${dst_dir} + SRCS ${MKLML_LIB} ${MKLML_IOMP_LIB} ${MKLML_INC_DIR} + DSTS ${dst_dir}/lib ${dst_dir}/lib ${dst_dir} ) endif() From 54a85b7bfd1836585ed6f257ed67651e0d516557 Mon Sep 17 00:00:00 2001 From: dragonwarrior Date: Mon, 26 Mar 2018 13:24:10 +0800 Subject: [PATCH 14/32] Add lrn layer (#9157) * add LRN layer for fluid * add LRN layer for fluid * add documentation for LRN layer * add paper reference for LRN layer * add seperate documentation for LRN layer * rm lrn.py in doc/fluid/dev/src * change code style in lrn * fix style of comments in lrn --- python/paddle/fluid/layers/nn.py | 71 +++++++++++++++++++ .../fluid/tests/unittests/test_layers.py | 7 ++ 2 files changed, 78 insertions(+) diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index 679de6ce2a..2db4e5d27d 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -74,6 +74,7 @@ __all__ = [ 'one_hot', 'autoincreased_step_counter', 'lod_reset', + 'lrn', ] @@ -3410,3 +3411,73 @@ def lod_reset(x, y=None, target_lod=None): raise ValueError("y and target_lod should not be both None.") return out + + +def lrn(input, n=5, k=1.0, alpha=1e-4, beta=0.75, name=None): + """ + Local Response Normalization Layer. This layer performs a type of + "lateral inhibition" by normalizing over local input regions. + + The formula is as follows: + + .. math:: + + Output(i, x, y) = Input(i, x, y) / \left( + k + \alpha \sum\limits^{\min(C, c + n/2)}_{j = \max(0, c - n/2)} + (Input(j, x, y))^2 \right)^{\beta} + + In the above equation: + + * :math:`n`: The number of channels to sum over. + * :math:`k`: The offset (avoid being divided by 0). + * :math:`alpha`: The scaling parameter. + * :math:`beta`: The exponent parameter. + + Refer to `ImageNet Classification with Deep Convolutional Neural Networks + `_ + + Args: + input (Variable): The input tensor of this layer, and the dimension of input tensor must be 4. + n (int, default 5): The number of channels to sum over. + k (float, default 1.0): An offset (usually positive to avoid dividing by 0). + alpha (float, default 1e-4): The scaling parameter. + beta (float, default 0.75): The exponent. + name (str, default None): A name for this operation. + + Raises: + ValueError: If rank of the input tensor is not 4. + + Returns: + A tensor variable storing the transformation result. + + Examples: + .. code-block:: python + + data = fluid.layers.data(name="data", shape=[3, 112, 112], dtype="float32") + lrn = fluid.layers.lrn(input=data) + """ + helper = LayerHelper('lrn', **locals()) + dtype = helper.input_dtype() + input_shape = input.shape + dims = len(input_shape) + + if dims != 4: + raise ValueError( + "dims of input must be 4(not %d), and it's order must be NCHW" % + (dims)) + + mid_out = helper.create_tmp_variable(dtype=dtype, stop_gradient=True) + lrn_out = helper.create_tmp_variable(dtype) + helper.append_op( + type="lrn", + inputs={"X": input}, + outputs={ + "Out": lrn_out, + "MidOut": mid_out, + }, + attrs={"n": n, + "k": k, + "alpha": alpha, + "beta": beta}) + + return lrn_out diff --git a/python/paddle/fluid/tests/unittests/test_layers.py b/python/paddle/fluid/tests/unittests/test_layers.py index b5fd59cf3a..2179826d81 100644 --- a/python/paddle/fluid/tests/unittests/test_layers.py +++ b/python/paddle/fluid/tests/unittests/test_layers.py @@ -231,6 +231,13 @@ class TestBook(unittest.TestCase): self.assertIsNotNone(layers.softmax(hid)) print(str(program)) + def test_lrn(self): + program = Program() + with program_guard(program): + data = layers.data(name='data', shape=[6, 2, 2], dtype='float32') + self.assertIsNotNone(layers.lrn(data)) + print(str(program)) + def test_get_places(self): program = Program() with program_guard(program): From 39004080f4f5358890dc7dcf1be1339ba0efd7b4 Mon Sep 17 00:00:00 2001 From: chengduoZH Date: Mon, 26 Mar 2018 16:52:30 +0800 Subject: [PATCH 15/32] replace use_pinned with is_pinned --- paddle/fluid/framework/tensor.h | 24 +++++++++---------- paddle/fluid/framework/tensor_impl.h | 22 ++++++++--------- .../fluid/memory/detail/system_allocator.cc | 7 +++--- paddle/fluid/memory/memory.cc | 12 +++++----- paddle/fluid/memory/memory.h | 14 +++++------ 5 files changed, 39 insertions(+), 40 deletions(-) diff --git a/paddle/fluid/framework/tensor.h b/paddle/fluid/framework/tensor.h index aa8f44ea30..f7a6b5ba84 100644 --- a/paddle/fluid/framework/tensor.h +++ b/paddle/fluid/framework/tensor.h @@ -45,11 +45,11 @@ class Tensor { friend struct EigenVector; public: - Tensor() : offset_(0), use_pinned_(false) {} + Tensor() : offset_(0), is_pinned_(false) {} /*! Constructor with place should only be used in pybind. */ explicit Tensor(const platform::Place& place) - : offset_(0), use_pinned_(false) { + : offset_(0), is_pinned_(false) { holder_->set_place(place); } @@ -70,12 +70,12 @@ class Tensor { * @note If not exist, then allocation. */ template - inline T* mutable_data(platform::Place place, bool use_pinned = false); + inline T* mutable_data(platform::Place place, bool is_pinned = false); inline void* mutable_data(platform::Place place, std::type_index type, - bool use_pinned = false); + bool is_pinned = false); - inline void* mutable_data(platform::Place place, bool use_pinned = false); + inline void* mutable_data(platform::Place place, bool is_pinned = false); /** * @brief Return a pointer to mutable memory block. @@ -87,7 +87,7 @@ class Tensor { */ template inline T* mutable_data(DDim dims, platform::Place place, - bool use_pinned = false); + bool is_pinned = false); /*! Return the dimensions of the memory block. */ inline const DDim& dims() const; @@ -153,13 +153,13 @@ class Tensor { template struct PlaceholderImpl : public Placeholder { PlaceholderImpl(Place place, size_t size, std::type_index type, - bool use_pinned = false) - : ptr_(static_cast(memory::Alloc(place, size, use_pinned)), - memory::PODDeleter(place, use_pinned)), + bool is_pinned = false) + : ptr_(static_cast(memory::Alloc(place, size, is_pinned)), + memory::PODDeleter(place, is_pinned)), place_(place), size_(size), type_(type), - use_pinned_(use_pinned) { + is_pinned_(is_pinned) { PADDLE_ENFORCE_NOT_NULL(ptr_, "Insufficient %s memory to allocation.", (is_cpu_place(place_) ? "CPU" : "GPU")); } @@ -184,7 +184,7 @@ class Tensor { std::type_index type_; /*! use pinned memory or not. */ - bool use_pinned_; + bool is_pinned_; }; /*! holds the memory block if allocated. */ @@ -219,7 +219,7 @@ class Tensor { * PlaceHolder::ptr_ and where the tensor data really begins. */ size_t offset_; - bool use_pinned_; + bool is_pinned_; }; inline void Tensor::switch_place(platform::Place new_place) { diff --git a/paddle/fluid/framework/tensor_impl.h b/paddle/fluid/framework/tensor_impl.h index e882cce69e..08e2f1a95b 100644 --- a/paddle/fluid/framework/tensor_impl.h +++ b/paddle/fluid/framework/tensor_impl.h @@ -102,20 +102,20 @@ inline T* Tensor::data() { template inline T* Tensor::mutable_data(DDim dims, platform::Place place, - bool use_pinned) { + bool is_pinned) { static_assert(std::is_pod::value, "T must be POD"); Resize(dims); - return mutable_data(place, use_pinned); + return mutable_data(place, is_pinned); } template -inline T* Tensor::mutable_data(platform::Place place, bool use_pinned) { +inline T* Tensor::mutable_data(platform::Place place, bool is_pinned) { static_assert(std::is_pod::value, "T must be POD"); - return reinterpret_cast(mutable_data(place, typeid(T), use_pinned)); + return reinterpret_cast(mutable_data(place, typeid(T), is_pinned)); } inline void* Tensor::mutable_data(platform::Place place, std::type_index type, - bool use_pinned) { + bool is_pinned) { if (holder_ != nullptr) { holder_->set_type(type); } @@ -129,27 +129,27 @@ inline void* Tensor::mutable_data(platform::Place place, std::type_index type, holder_->size() < size + offset_) { if (platform::is_cpu_place(place)) { holder_.reset(new PlaceholderImpl( - boost::get(place), size, type, use_pinned)); + boost::get(place), size, type, is_pinned)); } else if (platform::is_gpu_place(place)) { #ifndef PADDLE_WITH_CUDA PADDLE_THROW("'CUDAPlace' is not supported in CPU only device."); } #else holder_.reset(new PlaceholderImpl( - boost::get(place), size, type, use_pinned)); + boost::get(place), size, type, is_pinned)); } #endif offset_ = 0; - use_pinned_ = use_pinned; + is_pinned_ = is_pinned; } return reinterpret_cast(reinterpret_cast(holder_->ptr()) + offset_); } -inline void* Tensor::mutable_data(platform::Place place, bool use_pinned) { +inline void* Tensor::mutable_data(platform::Place place, bool is_pinned) { PADDLE_ENFORCE(this->holder_ != nullptr, "Cannot invoke mutable data if current hold nothing"); - return mutable_data(place, holder_->type(), use_pinned); + return mutable_data(place, holder_->type(), is_pinned); } inline Tensor& Tensor::ShareDataWith(const Tensor& src) { @@ -191,7 +191,7 @@ inline const DDim& Tensor::dims() const { return dims_; } inline int64_t Tensor::numel() const { return product(dims_); } -inline bool Tensor::isPinned() const { return use_pinned_; } +inline bool Tensor::isPinned() const { return is_pinned_; } inline Tensor ReshapeToMatrix(const Tensor& src, int num_col_dims) { Tensor res; diff --git a/paddle/fluid/memory/detail/system_allocator.cc b/paddle/fluid/memory/detail/system_allocator.cc index df9d28ede8..62a75c8196 100644 --- a/paddle/fluid/memory/detail/system_allocator.cc +++ b/paddle/fluid/memory/detail/system_allocator.cc @@ -123,8 +123,9 @@ void* CUDAPinnedAllocator::Alloc(size_t& index, size_t size) { if (size <= 0) return nullptr; void* p; // NOTE: here, we use GpuMaxAllocSize() as the maximum memory size - // of host fallback allocation. Allocates too much would reduce + // of host pinned allocation. Allocates too much would reduce // the amount of memory available to the underlying system for paging. + // Because the memory is in CPU side, other device can access it too. size_t usable = paddle::platform::GpuMaxAllocSize() - fallback_alloc_size_; @@ -149,10 +150,10 @@ void CUDAPinnedAllocator::Free(void* p, size_t size, size_t index) { err = cudaFreeHost(p); // Purposefully allow cudaErrorCudartUnloading, because - // that is returned if you ever call cudaFree after the + // that is returned if you ever call cudaFreeHost after the // driver has already shutdown. This happens only if the // process is terminating, in which case we don't care if - // cudaFree succeeds. + // cudaFreeHost succeeds. if (err != cudaErrorCudartUnloading) { PADDLE_ENFORCE(err, "cudaFreeHost failed in GPUPinnedAllocator::Free."); } diff --git a/paddle/fluid/memory/memory.cc b/paddle/fluid/memory/memory.cc index c5577587aa..f2d5f250bf 100644 --- a/paddle/fluid/memory/memory.cc +++ b/paddle/fluid/memory/memory.cc @@ -39,7 +39,7 @@ BuddyAllocator* GetCPUBuddyAllocator() { template <> void* Alloc(platform::CPUPlace place, size_t size, - bool use_pinned) { + bool is_pinned) { VLOG(10) << "Allocate " << size << " bytes on " << platform::Place(place); void* p = GetCPUBuddyAllocator()->Alloc(size); VLOG(10) << " pointer=" << p; @@ -48,7 +48,7 @@ void* Alloc(platform::CPUPlace place, size_t size, template <> void Free(platform::CPUPlace place, void* p, - bool use_pinned) { + bool is_pinned) { VLOG(10) << "Free pointer=" << p << " on " << platform::Place(place); GetCPUBuddyAllocator()->Free(p); } @@ -115,9 +115,9 @@ size_t Used(platform::CUDAPlace place) { template <> void* Alloc(platform::CUDAPlace place, size_t size, - bool use_pinned) { + bool is_pinned) { void* ptr; - if (use_pinned) { + if (is_pinned) { auto* buddy_allocator = GetCUDAPinnedBuddyAllocator(place.device); ptr = buddy_allocator->Alloc(size); } else { @@ -143,8 +143,8 @@ void* Alloc(platform::CUDAPlace place, size_t size, template <> void Free(platform::CUDAPlace place, void* p, - bool use_pinned) { - if (use_pinned) { + bool is_pinned) { + if (is_pinned) { GetCUDAPinnedBuddyAllocator(place.device)->Free(p); } else { GetGPUBuddyAllocator(place.device)->Free(p); diff --git a/paddle/fluid/memory/memory.h b/paddle/fluid/memory/memory.h index 9bc48ac68f..062bfc880e 100644 --- a/paddle/fluid/memory/memory.h +++ b/paddle/fluid/memory/memory.h @@ -33,7 +33,7 @@ namespace memory { * address is valid or not. */ template -void* Alloc(Place place, size_t size, bool use_pinned = false); +void* Alloc(Place place, size_t size, bool is_pinned = false); /** * \brief Free memory block in one place. @@ -43,7 +43,7 @@ void* Alloc(Place place, size_t size, bool use_pinned = false); * */ template -void Free(Place place, void* ptr, bool use_pinned = false); +void Free(Place place, void* ptr, bool is_pinned = false); /** * \brief Total size of used memory in one place. @@ -74,15 +74,13 @@ class PODDeleter { static_assert(std::is_pod::value, "T must be POD"); public: - explicit PODDeleter(Place place, bool use_pinned = false) - : place_(place), use_pinned_(use_pinned) {} - void operator()(T* ptr) { - Free(place_, static_cast(ptr), use_pinned_); - } + explicit PODDeleter(Place place, bool is_pinned = false) + : place_(place), is_pinned_(is_pinned) {} + void operator()(T* ptr) { Free(place_, static_cast(ptr), is_pinned_); } private: Place place_; - bool use_pinned_; + bool is_pinned_; }; /** From 9e99446e250e071c3d086e0c945374c4498e5aeb Mon Sep 17 00:00:00 2001 From: chengduoZH Date: Mon, 26 Mar 2018 18:19:24 +0800 Subject: [PATCH 16/32] Add note for cudaMallocHost --- paddle/fluid/memory/detail/system_allocator.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/memory/detail/system_allocator.cc b/paddle/fluid/memory/detail/system_allocator.cc index 62a75c8196..71d28dcbad 100644 --- a/paddle/fluid/memory/detail/system_allocator.cc +++ b/paddle/fluid/memory/detail/system_allocator.cc @@ -119,18 +119,20 @@ void GPUAllocator::Free(void* p, size_t size, size_t index) { bool GPUAllocator::UseGpu() const { return true; } +// PINNED memory allows direct DMA transfers by the GPU to and from system +// memory. It’s locked to a physical address. void* CUDAPinnedAllocator::Alloc(size_t& index, size_t size) { if (size <= 0) return nullptr; void* p; // NOTE: here, we use GpuMaxAllocSize() as the maximum memory size // of host pinned allocation. Allocates too much would reduce // the amount of memory available to the underlying system for paging. - // Because the memory is in CPU side, other device can access it too. size_t usable = paddle::platform::GpuMaxAllocSize() - fallback_alloc_size_; if (size > usable) return nullptr; + // PINNED memory is visible to all CUDA contexts. cudaError_t result = cudaMallocHost(&p, size); if (result == cudaSuccess) { index = 1; From f3dc3112cce45bbe30d292ffcc9103105222f05c Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Mon, 26 Mar 2018 20:17:16 +0800 Subject: [PATCH 17/32] add split ids op (#9370) * add split_ids_op * add TestSplitIdsOp * fix comment * add test for empty tensor * clean code * rm unused code --- paddle/fluid/operators/split_ids_op.cc | 76 +++++++++++++++++++ paddle/fluid/operators/split_ids_op.h | 65 ++++++++++++++++ .../tests/unittests/test_split_ids_op.py | 35 +++++++++ 3 files changed, 176 insertions(+) create mode 100644 paddle/fluid/operators/split_ids_op.cc create mode 100644 paddle/fluid/operators/split_ids_op.h create mode 100644 python/paddle/fluid/tests/unittests/test_split_ids_op.py diff --git a/paddle/fluid/operators/split_ids_op.cc b/paddle/fluid/operators/split_ids_op.cc new file mode 100644 index 0000000000..a54f8a2878 --- /dev/null +++ b/paddle/fluid/operators/split_ids_op.cc @@ -0,0 +1,76 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/operators/split_ids_op.h" + +namespace paddle { +namespace operators { + +class SplitIdsOpMaker : public framework::OpProtoAndCheckerMaker { + public: + SplitIdsOpMaker(OpProto *proto, OpAttrChecker *op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddInput("Ids", "(LoDTensor) the input ids with shape{batch_num, 1}"); + AddOutput("Out", "(LoDTensor) The outputs of the input Ids.") + .AsDuplicable(); + + AddComment(R"DOC( +Split a LoDTensor of Ids into multi LoDTensors, the number is pserver's number +Example: + Input: + X = [1,2,3,4,5,6] + + Out(3 output): + out0 = [3, 6] + out1 = [1, 4] + out2 = [2, 5] +)DOC"); + } +}; + +class SplitIdsOp : public framework::OperatorWithKernel { + public: + using framework::OperatorWithKernel::OperatorWithKernel; + + void InferShape(framework::InferShapeContext *ctx) const override { + PADDLE_ENFORCE(ctx->HasInput("Ids"), "SplitIdsOp must has input Ids."); + PADDLE_ENFORCE(ctx->HasOutputs("Out"), "SplitIdsOp must has output Out."); + + auto ids_var_type = ctx->GetInputsVarType("Ids").front(); + PADDLE_ENFORCE_EQ(ids_var_type, framework::proto::VarType::LOD_TENSOR); + + auto ids_dims = ctx->GetInputDim("Ids"); + PADDLE_ENFORCE_EQ(ids_dims.size(), 2); + PADDLE_ENFORCE_EQ(ids_dims[1], 1); + } +}; + +class SplitIdsOpInferVarType : public framework::VarTypeInference { + public: + void operator()(const framework::OpDesc &op_desc, + framework::BlockDesc *block) const override { + for (auto &out_var : op_desc.Output("Out")) { + block->Var(out_var)->SetType(framework::proto::VarType::LOD_TENSOR); + } + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; +REGISTER_OPERATOR(split_ids, ops::SplitIdsOp, ops::SplitIdsOpMaker, + ops::SplitIdsOpInferVarType); +REGISTER_OP_CPU_KERNEL( + split_ids, ops::SplitIdsOpKernel); diff --git a/paddle/fluid/operators/split_ids_op.h b/paddle/fluid/operators/split_ids_op.h new file mode 100644 index 0000000000..3e750ed2d1 --- /dev/null +++ b/paddle/fluid/operators/split_ids_op.h @@ -0,0 +1,65 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#include +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/operators/math/selected_rows_functor.h" + +namespace paddle { +namespace operators { + +template +class SplitIdsOpKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext& ctx) const override { + auto place = ctx.GetPlace(); + if (!platform::is_cpu_place(place)) { + PADDLE_THROW("SplitIds do not support GPU kernel"); + } + + const auto* ids_t = ctx.Input("Ids"); + auto& ids_dims = ids_t->dims(); + auto outs = ctx.MultiOutput("Out"); + + const T* ids = ids_t->data(); + + const size_t shard_num = outs.size(); + + std::vector> out_ids; + out_ids.resize(outs.size()); + + // split id by their shard_num. + for (size_t i = 0; i < ids_dims[0]; ++i) { + T id = ids[i]; + size_t shard_id = static_cast(id) % shard_num; + out_ids[shard_id].push_back(id); + } + + // create tensor for each shard and send to parameter server + for (size_t i = 0; i < out_ids.size(); ++i) { + auto* shard_t = outs[i]; + std::vector ids = out_ids[i]; + auto* shard_data = shard_t->mutable_data( + framework::make_ddim({static_cast(ids.size()), 1}), place); + for (size_t i = 0; i < ids.size(); ++i) { + shard_data[i] = ids[i]; + } + } + } +}; + +} // namespace operators +} // namespace paddle diff --git a/python/paddle/fluid/tests/unittests/test_split_ids_op.py b/python/paddle/fluid/tests/unittests/test_split_ids_op.py new file mode 100644 index 0000000000..e9f0a06a56 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_split_ids_op.py @@ -0,0 +1,35 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import numpy as np +from op_test import OpTest + + +class TestSplitIdsOp(OpTest): + def setUp(self): + self.op_type = "split_ids" + ids = np.array([[0], [2], [2], [3], [5], [5], [6]]).astype('int64') + out0 = np.array([[0], [3], [6]]).astype('int64') + out1 = np.array([[]]).astype('int64') + out2 = np.array([[2], [2], [5], [5]]).astype('int64') + self.inputs = {'Ids': ids} + self.outputs = {'Out': [('out0', out0), ('out1', out1), ('out2', out2)]} + + def test_check_output(self): + self.check_output() + + +if __name__ == '__main__': + unittest.main() From 6a97c02e56120893ed0c4ca0dfbd45c1a358935e Mon Sep 17 00:00:00 2001 From: legend06hvl Date: Tue, 27 Mar 2018 02:41:41 +0800 Subject: [PATCH 18/32] Update index_en.rst (#9321) * Update index_en.rst New file * Update index_en.rst Fix refer to suggestions --- doc/v2/dev/index_en.rst | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/doc/v2/dev/index_en.rst b/doc/v2/dev/index_en.rst index 549f5fa9aa..36516b7953 100644 --- a/doc/v2/dev/index_en.rst +++ b/doc/v2/dev/index_en.rst @@ -1,9 +1,27 @@ Development ------------ + +PaddlePaddle adheres to the following three sections of code and document specifications. + + +PaddlePaddle uses git for version control and Docker is used for building and testing environment. The code includes Cuda, C++, Python, Shell and other programming languages,which comply with Google C++ Style, Pep-8, and the code base includes style checking by an automatic inspection tool. Code comments need to follow the Doxygen specification. The code that does not meet the style requirements will fail to compile. We provide the following guidelines for the use of Git, build tests and code development. .. toctree:: :maxdepth: 1 contribute_to_paddle_en.md + + +PaddlePaddle is well documented in English and Chinese. We recommend using the English version of the documents and problem description. The design documents focus on problem descriptions, backgrounds, and are followed by solutions. As documents are generated by Sphinx, code comments should comply with the Sphinx documentation standard. We recommend to use the paddlepaddle.org tool to compile and generate and preview documents locally. Please refer to: + +.. toctree:: + :maxdepth: 1 + write_docs_en.rst + +PaddlePaddle V2 defines new operations by adding new Layers. You can implement various complex layers by combining basic APIs to satisfy most applications. If you want to customize layer, please refer to the following, and welcome to propose patch. + +.. toctree:: + :maxdepth: 1 + new_layer_en.rst From f4925755dbf6c5470a6f0436b80acbdd32cf74b1 Mon Sep 17 00:00:00 2001 From: Xi Chen Date: Mon, 26 Mar 2018 16:10:16 -0700 Subject: [PATCH 19/32] fix submit_local's paddle pip name issue --- paddle/scripts/submit_local.sh.in | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/paddle/scripts/submit_local.sh.in b/paddle/scripts/submit_local.sh.in index 80fa0c72af..1283de9d95 100755 --- a/paddle/scripts/submit_local.sh.in +++ b/paddle/scripts/submit_local.sh.in @@ -153,9 +153,15 @@ if [ $? -ne 0 ]; then exit 1 fi -INSTALLED_VERSION=`pip freeze 2>/dev/null | grep '^paddle' | sed 's/.*==//g'` +if [ "@WITH_GPU@" == "ON" ]; then + PADDLE_NAME="paddlepaddle-gpu" +else + PADDLE_NAME="paddlepaddle" +fi + +INSTALLED_VERSION=`pip freeze 2>/dev/null | grep "^${PADDLE_NAME}==" | sed 's/.*==//g'` -if [ -z ${INSTALLED_VERSION} ]; then +if [ -z "${INSTALLED_VERSION}" ]; then INSTALLED_VERSION="0.0.0" # not installed fi cat < Date: Mon, 26 Mar 2018 17:17:40 -0700 Subject: [PATCH 20/32] Create go_op design doc (#9389) * Create go_op design doc --- doc/fluid/design/concurrent/go_op.md | 231 +++++++++++++++++++++++++++ 1 file changed, 231 insertions(+) create mode 100644 doc/fluid/design/concurrent/go_op.md diff --git a/doc/fluid/design/concurrent/go_op.md b/doc/fluid/design/concurrent/go_op.md new file mode 100644 index 0000000000..c18b788e80 --- /dev/null +++ b/doc/fluid/design/concurrent/go_op.md @@ -0,0 +1,231 @@ +# go_op Design + +## Introduction + +The **go_op** allows user's of PaddlePaddle to run program blocks on a detached +thread. It works in conjuction with CSP operators (channel_send, +channel_receive, channel_open, channel_close, and select) to allow users to +concurrently process data and communicate easily between different threads. + +## How to use it + +``` +channel = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR) + +with fluid.Go(): + # Send a tensor of value 99 to "channel" on a detached thread + tensor = fill_constant(shape=[1], dtype='int', value=99) + tensor.stop_gradient = True + fluid.channel_send(channel, tensor) + +# Receive sent tensor from "channel" on the main thread +result = fill_constant(shape=[1], dtype='int', value=-1) +fluid.channel_recv(ch, result) +``` + +The go operator can be accessed by using the fluid.Go() control flow. This +will create a new sub block, where the user can add additional operators +to be ran on the thread. + +**Note:** Since back propegation is currently not support in the go_op, users +should ensure that operators in the go block does not require gradient +calculations. + +## How it Works + +Similar to other control blocks, go_op will create a sub block and add it +as a child to the current block. Operators and variables defined in this +block will be added to the go sub_block. + +In addition, the go operator will create a new child scope whose parent is +the global scope. Please refer to [block captures](#block-captures) for more +information. + +When Paddle executor runs go_op, go_op will take the sub_block and pass it to +the executor.run method (along with a newly created local scope) on a detached +thread. + +An example of the generated program description is shown below. Take note of +the **go_op** in particular. It is added as an operator in the current +block (in this example, block0). The **go_op** contains a `sub_block` +attribute, which points to the id of the block that will be executed in a +detached thread. + +``` +blocks { + idx: 0 + parent_idx: -1 + vars { + name: "return_value" + type { + type: LOD_TENSOR + lod_tensor { + tensor { + data_type: INT64 + } + } + } + } + vars { + name: "status_recv" + type { + type: LOD_TENSOR + lod_tensor { + tensor { + data_type: BOOL + } + } + } + } + ... + ops { + outputs { + parameter: "Out" + arguments: "channel" + } + type: "channel_create" + attrs { + name: "data_type" + type: INT + i: 7 + } + attrs { + name: "capacity" + type: INT + i: 0 + } + } + ops { + inputs { + parameter: "X" + arguments: "channel" + } + type: "go" + attrs { + name: "sub_block" + type: BLOCK + block_idx: 1 + } + } + ops { + inputs { + parameter: "Channel" + arguments: "channel" + } + outputs { + parameter: "Out" + arguments: "return_value" + } + outputs { + parameter: "Status" + arguments: "status_recv" + } + type: "channel_recv" + } + ... +} + +blocks { + idx: 1 + parent_idx: 0 + vars { + name: "status" + type { + type: LOD_TENSOR + lod_tensor { + tensor { + data_type: BOOL + } + } + } + } + ... + + ops { + outputs { + parameter: "Out" + arguments: "fill_constant_1.tmp_0" + } + type: "fill_constant" + attrs { + name: "force_cpu" + type: BOOLEAN + b: false + } + attrs { + name: "value" + type: FLOAT + f: 99.0 + } + attrs { + name: "shape" + type: INTS + ints: 1 + } + attrs { + name: "dtype" + type: INT + i: 3 + } + } + ops { + inputs { + parameter: "Channel" + arguments: "channel" + } + inputs { + parameter: "X" + arguments: "fill_constant_1.tmp_0" + } + outputs { + parameter: "Status" + arguments: "status" + } + type: "channel_send" + attrs { + name: "copy" + type: BOOLEAN + b: false + } + } +``` + +## Current Limitations + +#### Scopes and block captures: + +Paddle utilizes [scopes](./../concepts/scope.md) to store variables used in a +block. When a block is executed, a new local scope is created from the parent +scope (ie: scope derived from the parent block) and associated with the new +child block. After the block finishes executing, then the local scope and +all associated variables in the scope is deleted. + +This works well in a single threaded scenario, however with introduction of +go_op, a child block may continue to execute even after the parent block has +exited. If the go_op tries to access variables located in the parent block's +scope, it may receive a segmentation fault because the parent scope may have +been deleted. + +We need to implement block closures in order to prevent access to parent +scope variables from causing a segmentation fault. As a temporary workaround, +please ensure that all variables accessed in the go block is not destructed +before it is being accessed. Currently, the go_op will explicitly enforce +this requirement and raise an exception if a variable could not be found in +the scope. + +Please refer to [Closure issue](https://github.com/PaddlePaddle/Paddle/issues/8502) +for more details. + +#### Green Threads + +Golang utilizes `green threads`, which is a mechnism for the runtime library to +manage multiple threads (instead of natively by the OS). Green threads usually +allows for faster thread creation and switching, as there is less overhead +when spawning these threads. For the first version of CSP, we only support +OS threads. + + +#### Backward Propegation: + +go_op currently does not support backwards propagation. Please use go_op with +non training operators. From 65534c47625239ce68b5e5c02ae72c3bb1532214 Mon Sep 17 00:00:00 2001 From: Abhinav Arora Date: Mon, 26 Mar 2018 19:11:54 -0700 Subject: [PATCH 21/32] Fluid channels should match the semantics of Go Channels (#9265) * Fluid Channel should match Go Channel in Semantics * Fix Python channel_send * Address code rveiew feedback * Fix open_files_op.cc * Add description to Channel Asserts --- paddle/fluid/framework/channel.h | 93 +++++++++++-------- paddle/fluid/framework/channel_impl.h | 35 ++++--- paddle/fluid/framework/channel_test.cc | 93 +++++++++++++++---- paddle/fluid/operators/channel_send_op.cc | 25 +---- .../operators/concurrency/channel_util.cc | 14 +-- .../operators/concurrency/channel_util.h | 2 +- .../reader/create_double_buffer_reader_op.cc | 4 +- .../fluid/operators/reader/open_files_op.cc | 9 +- python/paddle/fluid/concurrency.py | 15 +-- 9 files changed, 172 insertions(+), 118 deletions(-) diff --git a/paddle/fluid/framework/channel.h b/paddle/fluid/framework/channel.h index adfaba26ac..019bea600f 100644 --- a/paddle/fluid/framework/channel.h +++ b/paddle/fluid/framework/channel.h @@ -34,7 +34,7 @@ class Channel { public: virtual bool CanSend() = 0; virtual bool CanReceive() = 0; - virtual bool Send(T*) = 0; + virtual void Send(T*) = 0; virtual bool Receive(T*) = 0; virtual size_t Cap() = 0; virtual void Lock() = 0; @@ -84,69 +84,81 @@ class ChannelHolder { } template - bool Send(T* data) { - if (!IsInitialized()) return false; - PADDLE_ENFORCE_EQ(holder_->Type(), std::type_index(typeid(T))); + void Send(T* data) { + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); + PADDLE_ENFORCE_EQ( + holder_->Type(), std::type_index(typeid(T)), + "Channel type is not same as the type of the data being sent"); // Static cast should be safe because we have ensured that types are same Channel* channel = static_cast*>(holder_->Ptr()); - return channel != nullptr ? channel->Send(data) : false; + PADDLE_ENFORCE_EQ(channel != nullptr, true, "Channel should not be null."); + channel->Send(data); } template bool Receive(T* data) { - if (!IsInitialized()) return false; - PADDLE_ENFORCE_EQ(holder_->Type(), std::type_index(typeid(T))); + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); + PADDLE_ENFORCE_EQ( + holder_->Type(), std::type_index(typeid(T)), + "Channel type is not same as the type of the data being sent"); Channel* channel = static_cast*>(holder_->Ptr()); - return channel != nullptr ? channel->Receive(data) : false; + PADDLE_ENFORCE_EQ(channel != nullptr, true, "Channel should not be null."); + return channel->Receive(data); } bool IsClosed() { - if (IsInitialized()) { - return holder_->IsClosed(); - } - return false; + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); + return holder_->IsClosed(); } bool CanSend() { - if (IsInitialized()) { - return holder_->CanSend(); - } - return false; + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); + return holder_->CanSend(); } bool CanReceive() { - if (IsInitialized()) { - return holder_->CanReceive(); - } - return false; + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); + return holder_->CanReceive(); } void close() { - if (IsInitialized()) holder_->Close(); + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); + holder_->Close(); } size_t Cap() { - if (IsInitialized()) return holder_->Cap(); - return -1; + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); + return holder_->Cap(); } void Lock() { - if (IsInitialized()) holder_->Lock(); + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); + holder_->Lock(); } void Unlock() { - if (IsInitialized()) holder_->Unlock(); + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); + holder_->Unlock(); } template void AddToSendQ(const void* referrer, T* data, std::shared_ptr cond, std::function cb) { - if (IsInitialized()) { - Channel* channel = static_cast*>(holder_->Ptr()); - if (channel != nullptr) { - channel->AddToSendQ(referrer, data, cond, cb); - } + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); + Channel* channel = static_cast*>(holder_->Ptr()); + if (channel != nullptr) { + channel->AddToSendQ(referrer, data, cond, cb); } } @@ -154,26 +166,31 @@ class ChannelHolder { void AddToReceiveQ(const void* referrer, T* data, std::shared_ptr cond, std::function cb) { - if (IsInitialized()) { - Channel* channel = static_cast*>(holder_->Ptr()); - if (channel != nullptr) { - channel->AddToReceiveQ(referrer, data, cond, cb); - } + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); + Channel* channel = static_cast*>(holder_->Ptr()); + if (channel != nullptr) { + channel->AddToReceiveQ(referrer, data, cond, cb); } } void RemoveFromSendQ(const void* referrer) { - if (IsInitialized()) holder_->RemoveFromSendQ(referrer); + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); + holder_->RemoveFromSendQ(referrer); } void RemoveFromReceiveQ(const void* referrer) { - if (IsInitialized()) holder_->RemoveFromReceiveQ(referrer); + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); + holder_->RemoveFromReceiveQ(referrer); } inline bool IsInitialized() const { return holder_ != nullptr; } inline const std::type_index Type() { - PADDLE_ENFORCE_EQ(IsInitialized(), true); + PADDLE_ENFORCE_EQ(IsInitialized(), true, + "The Channel hasn't been initialized"); return holder_->Type(); } diff --git a/paddle/fluid/framework/channel_impl.h b/paddle/fluid/framework/channel_impl.h index 457abbf373..378a0bab1c 100644 --- a/paddle/fluid/framework/channel_impl.h +++ b/paddle/fluid/framework/channel_impl.h @@ -31,7 +31,7 @@ class ChannelImpl : public paddle::framework::Channel { public: virtual bool CanSend(); virtual bool CanReceive(); - virtual bool Send(T *); + virtual void Send(T *); virtual bool Receive(T *); virtual size_t Cap() { return cap_; } virtual void Lock(); @@ -76,10 +76,9 @@ class ChannelImpl : public paddle::framework::Channel { } }; - bool send_return(bool value) { + void send_return() { send_ctr--; destructor_cond_.notify_all(); - return value; } bool recv_return(bool value) { @@ -118,15 +117,15 @@ bool ChannelImpl::CanReceive() { } template -bool ChannelImpl::Send(T *item) { +void ChannelImpl::Send(T *item) { send_ctr++; std::unique_lock lock{mu_}; - // If channel is closed, do nothing + // If channel is closed, throw exception if (closed_) { lock.unlock(); - // TODO(abhinavarora) Should panic on closed channel - return send_return(false); + send_return(); + PADDLE_THROW("Cannot send on closed channel"); } // If there is a receiver, directly pass the value we want @@ -143,7 +142,7 @@ bool ChannelImpl::Send(T *item) { if (m->callback != nullptr) do_send = m->callback(ChannelAction::SEND); if (do_send) *(m->data) = std::move(*item); - else + else { // We cannot do the data transfer because // this QueueMessage was added by Select // and some other case was executed. @@ -151,12 +150,17 @@ bool ChannelImpl::Send(T *item) { // We do not care about notifying other // because they would have been notified // by the executed select case. - return send_return(Send(item)); + lock.unlock(); + Send(item); + send_return(); + return; + } // Wake up the blocked process and unlock m->Notify(); lock.unlock(); - return send_return(true); + send_return(); + return; } // Unbuffered channel will always bypass this @@ -167,7 +171,8 @@ bool ChannelImpl::Send(T *item) { buf_.push_back(std::move(*item)); // Release lock and return true lock.unlock(); - return send_return(true); + send_return(); + return; } // Block on channel, because some receiver will complete @@ -175,8 +180,12 @@ bool ChannelImpl::Send(T *item) { auto m = std::make_shared(item); sendq.push_back(m); m->Wait(lock); - // TODO(abhinavarora) Should panic on closed channel - return send_return(!m->chan_closed); + if (m->chan_closed) { + lock.unlock(); + send_return(); + PADDLE_THROW("Cannot send on closed channel"); + } + send_return(); } template diff --git a/paddle/fluid/framework/channel_test.cc b/paddle/fluid/framework/channel_test.cc index 73be5cdbe2..e2380bb54b 100644 --- a/paddle/fluid/framework/channel_test.cc +++ b/paddle/fluid/framework/channel_test.cc @@ -16,7 +16,6 @@ limitations under the License. */ #include #include - #include "gtest/gtest.h" using paddle::framework::Channel; @@ -41,7 +40,7 @@ void RecevingOrderEqualToSendingOrder(Channel *ch) { unsigned sum_send = 0; std::thread t([&]() { for (int i = 0; i < 5; i++) { - EXPECT_EQ(ch->Send(&i), true); + ch->Send(&i); sum_send += i; } }); @@ -61,7 +60,7 @@ TEST(Channel, SufficientBufferSizeDoesntBlock) { const size_t buffer_size = 10; auto ch = MakeChannel(buffer_size); for (size_t i = 0; i < buffer_size; ++i) { - EXPECT_EQ(ch->Send(&i), true); // should not block + ch->Send(&i); } size_t out; @@ -82,7 +81,7 @@ void SendReceiveWithACloseChannelShouldPanic(Channel *ch) { const size_t data = 5; std::thread send_thread{[&]() { size_t i = data; - EXPECT_EQ(ch->Send(&i), true); // should not block + ch->Send(&i); // should not block }}; std::thread recv_thread{[&]() { @@ -94,12 +93,18 @@ void SendReceiveWithACloseChannelShouldPanic(Channel *ch) { send_thread.join(); recv_thread.join(); - // After closing send should return false. Receive should - // also return false as there is no data in queue. + // After closing send should panic. Receive should + // also false as there is no data in queue. CloseChannel(ch); send_thread = std::thread{[&]() { size_t i = data; - EXPECT_EQ(ch->Send(&i), false); // should return false + bool is_exception = false; + try { + ch->Send(&i); + } catch (paddle::platform::EnforceNotMet e) { + is_exception = true; + } + EXPECT_EQ(is_exception, true); }}; recv_thread = std::thread{[&]() { size_t i; @@ -129,7 +134,7 @@ TEST(Channel, ReceiveFromBufferedChannelReturnResidualValuesTest) { auto ch = MakeChannel(buffer_size); for (size_t i = 0; i < buffer_size; ++i) { - EXPECT_EQ(ch->Send(&i), true); // sending should not block + ch->Send(&i); // sending should not block } size_t out; @@ -160,9 +165,16 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) { // Try to write more than buffer size. for (size_t i = 0; i < 2 * buffer_size; ++i) { if (i < buffer_size) - EXPECT_EQ(ch->Send(&i), true); // should block after 10 iterations - else - EXPECT_EQ(ch->Send(&i), false); + ch->Send(&i); // should block after 10 iterations + else { + bool is_exception = false; + try { + ch->Send(&i); + } catch (paddle::platform::EnforceNotMet e) { + is_exception = true; + } + EXPECT_EQ(is_exception, true); + } } }); std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait 0.2 sec @@ -231,7 +243,13 @@ void ChannelCloseUnblocksSendersTest(Channel *ch, bool isBuffered) { t[i] = std::thread( [&](bool *ended, bool *success) { int data = 10; - *success = ch->Send(&data); + bool is_exception = false; + try { + ch->Send(&data); + } catch (paddle::platform::EnforceNotMet e) { + is_exception = true; + } + *success = !is_exception; *ended = true; }, &thread_ended[i], &send_success[i]); @@ -316,8 +334,11 @@ TEST(Channel, UnbufferedLessReceiveMoreSendTest) { // Try to send more number of times // than receivers for (int i = 0; i < 4; i++) { - ch->Send(&i); - sum_send += i; + try { + ch->Send(&i); + sum_send += i; + } catch (paddle::platform::EnforceNotMet e) { + } } }); for (int i = 0; i < 3; i++) { @@ -382,7 +403,13 @@ void ChannelDestroyUnblockSenders(Channel *ch, bool isBuffered) { t[i] = std::thread( [&](bool *ended, bool *success) { int data = 10; - *success = ch->Send(&data); + bool is_exception = false; + try { + ch->Send(&data); + } catch (paddle::platform::EnforceNotMet e) { + is_exception = true; + } + *success = !is_exception; *ended = true; }, &thread_ended[i], &send_success[i]); @@ -508,7 +535,7 @@ void ChannelHolderSendReceive(ChannelHolder *ch) { unsigned sum_send = 0; std::thread t([&]() { for (int i = 0; i < 5; i++) { - EXPECT_EQ(ch->Send(&i), true); + ch->Send(&i); sum_send += i; } }); @@ -541,8 +568,22 @@ TEST(ChannelHolder, ChannelUninitializedTest) { ChannelHolder *ch = new ChannelHolder(); EXPECT_EQ(ch->IsInitialized(), false); int i = 10; - EXPECT_EQ(ch->Send(&i), false); - EXPECT_EQ(ch->Receive(&i), false); + bool send_exception = false; + try { + ch->Send(&i); + } catch (paddle::platform::EnforceNotMet e) { + send_exception = true; + } + EXPECT_EQ(send_exception, true); + + bool recv_exception = false; + try { + ch->Receive(&i); + } catch (paddle::platform::EnforceNotMet e) { + recv_exception = true; + } + EXPECT_EQ(recv_exception, true); + bool is_exception = false; try { ch->Type(); @@ -669,7 +710,13 @@ void ChannelHolderCloseUnblocksSendersTest(ChannelHolder *ch, bool isBuffered) { t[i] = std::thread( [&](bool *ended, bool *success) { int data = 10; - *success = ch->Send(&data); + bool is_exception = false; + try { + ch->Send(&data); + } catch (paddle::platform::EnforceNotMet e) { + is_exception = true; + } + *success = !is_exception; *ended = true; }, &thread_ended[i], &send_success[i]); @@ -760,7 +807,13 @@ void ChannelHolderDestroyUnblockSenders(ChannelHolder *ch, bool isBuffered) { t[i] = std::thread( [&](bool *ended, bool *success) { int data = 10; - *success = ch->Send(&data); + bool is_exception = false; + try { + ch->Send(&data); + } catch (paddle::platform::EnforceNotMet e) { + is_exception = true; + } + *success = !is_exception; *ended = true; }, &thread_ended[i], &send_success[i]); diff --git a/paddle/fluid/operators/channel_send_op.cc b/paddle/fluid/operators/channel_send_op.cc index 47cf7d7efc..66d33617ed 100644 --- a/paddle/fluid/operators/channel_send_op.cc +++ b/paddle/fluid/operators/channel_send_op.cc @@ -23,21 +23,10 @@ limitations under the License. */ static constexpr char Channel[] = "Channel"; static constexpr char X[] = "X"; -static constexpr char Status[] = "Status"; -static constexpr char copy[] = "copy"; namespace paddle { namespace operators { -void SetSendStatus(const platform::Place &dev_place, - framework::Variable &status_var, bool status) { - auto cpu = platform::CPUPlace(); - auto status_tensor = - status_var.GetMutable()->mutable_data({1}, - cpu); - status_tensor[0] = status; -} - class ChannelSendOp : public framework::OperatorBase { public: ChannelSendOp(const std::string &type, @@ -51,9 +40,6 @@ class ChannelSendOp : public framework::OperatorBase { "Input(Channel) of ChannelSendOp should not be null."); PADDLE_ENFORCE(ctx->HasInput(X), "Input(X) of ChannelSendOp should not be null."); - PADDLE_ENFORCE(ctx->HasOutput(Status), - "Output(Status) of ChannelSendOp should not be null."); - ctx->SetOutputDim("Status", {1}); } private: @@ -65,10 +51,7 @@ class ChannelSendOp : public framework::OperatorBase { auto input_var = scope.FindVar(Input(X)); // Send the input data through the channel. - bool ok = concurrency::ChannelSend(ch, input_var); - - // Set the status output of the `ChannelSend` call. - SetSendStatus(dev_place, *scope.FindVar(Output(Status)), ok); + concurrency::ChannelSend(ch, input_var); } }; @@ -82,12 +65,6 @@ class ChannelSendOpMaker : public framework::OpProtoAndCheckerMaker { .AsDuplicable(); AddInput(X, "(Variable) The value which gets sent by the channel.") .AsDuplicable(); - AddOutput(Status, - "(Tensor) An LoD Tensor that returns a boolean status of the" - "result of the send operation.") - .AsDuplicable(); - AddAttr(copy, "(bool, default false) Should copy before send") - .SetDefault(false); AddComment(R"DOC( )DOC"); } diff --git a/paddle/fluid/operators/concurrency/channel_util.cc b/paddle/fluid/operators/concurrency/channel_util.cc index a483af7aff..246c99489c 100644 --- a/paddle/fluid/operators/concurrency/channel_util.cc +++ b/paddle/fluid/operators/concurrency/channel_util.cc @@ -17,20 +17,20 @@ limitations under the License. */ namespace poc = paddle::operators::concurrency; -bool poc::ChannelSend(framework::ChannelHolder *ch, framework::Variable *var) { +void poc::ChannelSend(framework::ChannelHolder *ch, framework::Variable *var) { auto type = framework::ToVarType(var->Type()); if (type == framework::proto::VarType_Type_LOD_TENSOR) - return ch->Send(var->GetMutable()); + ch->Send(var->GetMutable()); else if (type == framework::proto::VarType_Type_LOD_RANK_TABLE) - return ch->Send(var->GetMutable()); + ch->Send(var->GetMutable()); else if (type == framework::proto::VarType_Type_LOD_TENSOR_ARRAY) - return ch->Send(var->GetMutable()); + ch->Send(var->GetMutable()); else if (type == framework::proto::VarType_Type_SELECTED_ROWS) - return ch->Send(var->GetMutable()); + ch->Send(var->GetMutable()); else if (type == framework::proto::VarType_Type_READER) - return ch->Send(var->GetMutable()); + ch->Send(var->GetMutable()); else if (type == framework::proto::VarType_Type_CHANNEL) - return ch->Send(var->GetMutable()); + ch->Send(var->GetMutable()); else PADDLE_THROW("ChannelSend:Unsupported type"); } diff --git a/paddle/fluid/operators/concurrency/channel_util.h b/paddle/fluid/operators/concurrency/channel_util.h index c3674bd981..cd18ca78c6 100644 --- a/paddle/fluid/operators/concurrency/channel_util.h +++ b/paddle/fluid/operators/concurrency/channel_util.h @@ -21,7 +21,7 @@ namespace paddle { namespace operators { namespace concurrency { -bool ChannelSend(framework::ChannelHolder *ch, framework::Variable *var); +void ChannelSend(framework::ChannelHolder *ch, framework::Variable *var); bool ChannelReceive(framework::ChannelHolder *ch, framework::Variable *var); void ChannelAddToSendQ(framework::ChannelHolder *ch, const void *referrer, diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index 76cdb794cc..141a3eb935 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -166,7 +166,9 @@ void DoubleBufferReader::PrefetchThreadFunc() { std::swap(gpu_batch, batch.payloads_); } - if (!buffer_->Send(&batch)) { + try { + buffer_->Send(&batch); + } catch (paddle::platform::EnforceNotMet e) { VLOG(5) << "WARNING: The double buffer channel has been closed. The " "prefetch thread will terminate."; break; diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index 414c76fea0..b6ac7b21d5 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -146,14 +146,19 @@ void MultipleReader::PrefetchThreadFunc(std::string file_name, while (reader->HasNext()) { std::vector ins; reader->ReadNext(&ins); - if (!buffer_->Send(&ins)) { + try { + buffer_->Send(&ins); + } catch (paddle::platform::EnforceNotMet e) { VLOG(5) << "WARNING: The buffer channel has been closed. The prefetch " "thread of file '" << file_name << "' will terminate."; break; } } - if (!available_thread_idx_->Send(&thread_idx)) { + + try { + available_thread_idx_->Send(&thread_idx); + } catch (paddle::platform::EnforceNotMet e) { VLOG(5) << "WARNING: The available_thread_idx_ channel has been closed. " "Fail to send thread_idx."; } diff --git a/python/paddle/fluid/concurrency.py b/python/paddle/fluid/concurrency.py index d65e1a6858..a0f5ef2329 100644 --- a/python/paddle/fluid/concurrency.py +++ b/python/paddle/fluid/concurrency.py @@ -339,11 +339,6 @@ def channel_send(channel, value, is_copy=False): main_program = helper.main_program channel_send_block = main_program.current_block() - status = helper.create_variable( - name=unique_name.generate('status'), - type=core.VarDesc.VarType.LOD_TENSOR, - dtype=core.VarDesc.VarType.BOOL) - X = value if is_copy is True: @@ -359,15 +354,11 @@ def channel_send(channel, value, is_copy=False): type="assign_op", inputs={"X": value}, outputs={"Out": copied_X}) X = copied_X - channel_send_op = channel_send_block.append_op( - type="channel_send", - inputs={ + channel_send_block.append_op( + type="channel_send", inputs={ "Channel": channel, "X": X, - }, - outputs={"Status": status}) - - return status + }) def channel_recv(channel, return_value): From c7bf77d0e14ca1ec8caac53badb4f80adb8b02d1 Mon Sep 17 00:00:00 2001 From: Thuan Nguyen Date: Mon, 26 Mar 2018 19:18:21 -0700 Subject: [PATCH 22/32] Add in is_copy attribute to SelectCase. (#9393) This is a temporary solution to allowing for variables to be copied during a channel send operations. Also fixed issue with is_copy for "channel_send" method, and also updated unit tests. --- python/paddle/fluid/concurrency.py | 41 ++++++++++++++----- python/paddle/fluid/tests/test_concurrency.py | 23 ++--------- 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/python/paddle/fluid/concurrency.py b/python/paddle/fluid/concurrency.py index a0f5ef2329..470dd0df52 100644 --- a/python/paddle/fluid/concurrency.py +++ b/python/paddle/fluid/concurrency.py @@ -82,11 +82,14 @@ class SelectCase(object): RECEIVE = 2 def __init__(self, + select, case_idx, case_to_execute, channel_action_fn=None, channel=None, - value=None): + value=None, + is_copy=False): + self.select = select self.helper = LayerHelper('conditional_block') self.main_program = self.helper.main_program self.is_scalar_condition = True @@ -99,7 +102,24 @@ class SelectCase(object): self.action = (self.SEND if channel_action_fn.__name__ == ('channel_send') else self.RECEIVE) if channel_action_fn else self.DEFAULT - self.value = value + + X = value + if self.action == self.SEND and is_copy: + # We create of copy of the data we want to send + copied_X = self.select.parent_block.create_var( + name=unique_name.generate(value.name + '_copy'), + type=value.type, + dtype=value.dtype, + shape=value.shape, + lod_level=value.lod_level, + capacity=value.capacity + if hasattr(value, 'capacity') else None, ) + + self.select.parent_block.append_op( + type="assign", inputs={"X": value}, outputs={"Out": copied_X}) + X = copied_X + + self.value = X self.channel = channel def __enter__(self): @@ -173,6 +193,7 @@ class SelectCase(object): class Select(BlockGuard): def __init__(self, name=None): self.helper = LayerHelper('select', name=name) + self.parent_block = self.helper.main_program.current_block() self.cases = [] super(Select, self).__init__(self.helper.main_program) @@ -183,12 +204,12 @@ class Select(BlockGuard): super(Select, self).__enter__() return self - def case(self, channel_action_fn, channel, value): + def case(self, channel_action_fn, channel, value, is_copy=False): """Create a new block for this condition. """ - select_case = SelectCase( - len(self.cases), self.case_to_execute, channel_action_fn, channel, - value) + select_case = SelectCase(self, + len(self.cases), self.case_to_execute, + channel_action_fn, channel, value, is_copy) self.cases.append(select_case) @@ -197,7 +218,7 @@ class Select(BlockGuard): def default(self): """Create a default case block for this condition. """ - default_case = SelectCase(len(self.cases), self.case_to_execute) + default_case = SelectCase(self, len(self.cases), self.case_to_execute) self.cases.append(default_case) @@ -341,17 +362,17 @@ def channel_send(channel, value, is_copy=False): X = value - if is_copy is True: + if is_copy: copied_X = helper.create_variable( name=unique_name.generate(value.name + '_copy'), type=value.type, dtype=value.dtype, shape=value.shape, lod_level=value.lod_level, - capacity=value.capacity) + capacity=value.capacity if hasattr(value, 'capacity') else None) assign_op = channel_send_block.append_op( - type="assign_op", inputs={"X": value}, outputs={"Out": copied_X}) + type="assign", inputs={"X": value}, outputs={"Out": copied_X}) X = copied_X channel_send_block.append_op( diff --git a/python/paddle/fluid/tests/test_concurrency.py b/python/paddle/fluid/tests/test_concurrency.py index 924895a9af..e8f6cfb4a9 100644 --- a/python/paddle/fluid/tests/test_concurrency.py +++ b/python/paddle/fluid/tests/test_concurrency.py @@ -173,16 +173,10 @@ class TestRoutineOp(unittest.TestCase): with while_op.block(): result2 = fill_constant( shape=[1], dtype=core.VarDesc.VarType.INT32, value=0) - x_to_send_tmp = fill_constant( - shape=[1], dtype=core.VarDesc.VarType.INT32, value=0) - - # TODO(abhinav): Need to perform copy when doing a channel send. - # Once this is complete, we can remove these lines - assign(input=x, output=x_to_send_tmp) with fluid.Select() as select: - with select.case(fluid.channel_send, channel, - x_to_send_tmp): + with select.case( + fluid.channel_send, channel, x, is_copy=True): assign(input=x, output=x_tmp) assign(input=y, output=x) assign(elementwise_add(x=x_tmp, y=y), output=y) @@ -230,21 +224,12 @@ class TestRoutineOp(unittest.TestCase): core.VarDesc.VarType.LOD_TENSOR, core.VarDesc.VarType.FP64) - pong_result = self._create_tensor('pong_return_value', - core.VarDesc.VarType.LOD_TENSOR, - core.VarDesc.VarType.FP64) - def ping(ch, message): - message_to_send_tmp = fill_constant( - shape=[1], dtype=core.VarDesc.VarType.FP64, value=0) - - assign(input=message, output=message_to_send_tmp) - fluid.channel_send(ch, message_to_send_tmp) + fluid.channel_send(ch, message, is_copy=True) def pong(ch1, ch2): fluid.channel_recv(ch1, ping_result) - assign(input=ping_result, output=pong_result) - fluid.channel_send(ch2, pong_result) + fluid.channel_send(ch2, ping_result, is_copy=True) pings = fluid.make_channel( dtype=core.VarDesc.VarType.LOD_TENSOR, capacity=1) From e0b5691e41f8dd28bdbf8d4ca7140824f918bec8 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Tue, 27 Mar 2018 11:10:53 +0800 Subject: [PATCH 23/32] Add drop_out_op unit test (#9364) --- paddle/fluid/operators/CMakeLists.txt | 1 + paddle/fluid/operators/dropout_op.cu | 5 +- paddle/fluid/operators/dropout_op_test.cc | 96 +++++++++++++++++++++++ 3 files changed, 99 insertions(+), 3 deletions(-) create mode 100644 paddle/fluid/operators/dropout_op_test.cc diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt index 9a11e1be70..8341170d68 100644 --- a/paddle/fluid/operators/CMakeLists.txt +++ b/paddle/fluid/operators/CMakeLists.txt @@ -264,3 +264,4 @@ cc_test(strided_memcpy_test SRCS strided_memcpy_test.cc DEPS tensor paddle_memor cc_test(save_load_op_test SRCS save_load_op_test.cc DEPS save_op load_op) cc_test(save_load_combine_op_test SRCS save_load_combine_op_test.cc DEPS save_combine_op load_combine_op) nv_test(nccl_op_test SRCS nccl_op_test.cu.cc DEPS nccl_op gpu_info device_context) +nv_test(dropout_op_test SRCS dropout_op_test.cc DEPS dropout_op tensor) diff --git a/paddle/fluid/operators/dropout_op.cu b/paddle/fluid/operators/dropout_op.cu index 94382739b5..184c095e48 100644 --- a/paddle/fluid/operators/dropout_op.cu +++ b/paddle/fluid/operators/dropout_op.cu @@ -55,9 +55,6 @@ class GPUDropoutKernel : public framework::OpKernel { y->mutable_data(context.GetPlace()); float dropout_prob = context.Attr("dropout_prob"); - auto X = EigenMatrix::Reshape(*x, 1); - auto Y = EigenMatrix::Reshape(*y, 1); - auto& place = *context.template device_context().eigen_device(); if (!context.Attr("is_test")) { auto* mask = context.Output("Mask"); @@ -76,6 +73,8 @@ class GPUDropoutKernel : public framework::OpKernel { T><<>>( size, seed, dropout_prob, x_data, mask_data, y_data); } else { + auto X = EigenMatrix::Reshape(*x, 1); + auto Y = EigenMatrix::Reshape(*y, 1); Y.device(place) = X * static_cast(1.0f - dropout_prob); } } diff --git a/paddle/fluid/operators/dropout_op_test.cc b/paddle/fluid/operators/dropout_op_test.cc new file mode 100644 index 0000000000..db97ba4f64 --- /dev/null +++ b/paddle/fluid/operators/dropout_op_test.cc @@ -0,0 +1,96 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include +#include + +#include "gtest/gtest.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/framework/program_desc.h" +#include "paddle/fluid/operators/dropout_op.h" +#include "paddle/fluid/operators/math/math_function.h" +#include "paddle/fluid/string/printf.h" + +namespace f = paddle::framework; +namespace p = paddle::platform; +namespace m = paddle::operators::math; + +USE_OP(dropout); + +void Compare(f::Scope& scope, p::DeviceContext& ctx) { + // init + auto var = scope.Var("X"); + auto tensor = var->GetMutable(); + tensor->Resize({10, 10}); + + std::vector init; + for (int64_t i = 0; i < 10 * 10; ++i) { + init.push_back(1.0); + } + + TensorFromVector(init, ctx, tensor); + + auto place = ctx.GetPlace(); + auto out_var = scope.Var("Out"); + auto out_tensor = out_var->GetMutable(); + out_tensor->Resize({10, 10}); + out_tensor->mutable_data(place); // allocate + + auto mask_var = scope.Var("Mask"); + auto mask_tensor = mask_var->GetMutable(); + mask_tensor->Resize({10, 10}); + mask_tensor->mutable_data(place); // allocate + + // run + f::AttributeMap attrs; + float dropout_prob = 0.5; + attrs.insert({"fix_seed", 1}); + attrs.insert({"seed", 3}); + attrs.insert({"dropout_prob", dropout_prob}); + auto dropout_op = f::OpRegistry::CreateOp( + "dropout", {{"X", {"X"}}}, {{"Out", {"Out"}}, {"Mask", {"Mask"}}}, attrs); + + dropout_op->Run(scope, place); + + std::vector out_vec; + TensorToVector(*out_tensor, ctx, &out_vec); + + std::vector std_out = { + 0, 0, 1, 1, 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1, + 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1, 0, + 1, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 0, 0, 1, 1, + 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, + 1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1}; + + EXPECT_EQ(out_vec.size(), std_out.size()); + for (uint32_t i = 0; i < out_vec.size(); i++) { + EXPECT_EQ(out_vec[i], std_out[i]); + } +} + +TEST(Dropout, CPUDense) { + f::Scope scope; + p::CPUPlace place; + p::CPUDeviceContext ctx(place); + Compare(scope, ctx); +} + +TEST(Dropout, GPUDense) { + f::Scope scope; + p::CUDAPlace place; + p::CUDADeviceContext ctx(place); + Compare(scope, ctx); +} From 123cf165fb031e8e0e9170c17ba59deb95e9dc76 Mon Sep 17 00:00:00 2001 From: qingqing01 Date: Tue, 27 Mar 2018 11:11:24 +0800 Subject: [PATCH 24/32] Set stop_gradient=True for some variables in SSD API. (#9396) --- python/paddle/fluid/layers/detection.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/paddle/fluid/layers/detection.py b/python/paddle/fluid/layers/detection.py index cd519e1ee0..3e649dc5fd 100644 --- a/python/paddle/fluid/layers/detection.py +++ b/python/paddle/fluid/layers/detection.py @@ -134,6 +134,7 @@ def detection_output(loc, scores = nn.softmax(input=scores) scores = ops.reshape(x=scores, shape=old_shape) scores = nn.transpose(scores, perm=[0, 2, 1]) + scores.stop_gradient = True nmsed_outs = helper.create_tmp_variable(dtype=decoded_box.dtype) helper.append_op( type="multiclass_nms", @@ -148,6 +149,7 @@ def detection_output(loc, 'score_threshold': score_threshold, 'nms_eta': 1.0 }) + nmsed_outs.stop_gradient = True return nmsed_outs @@ -837,4 +839,6 @@ def multi_box_head(inputs, mbox_locs_concat = tensor.concat(mbox_locs, axis=1) mbox_confs_concat = tensor.concat(mbox_confs, axis=1) + box.stop_gradient = True + var.stop_gradient = True return mbox_locs_concat, mbox_confs_concat, box, var From 68c199432b67049e39be585979c0af35c9f06c10 Mon Sep 17 00:00:00 2001 From: m3ngyang Date: Tue, 27 Mar 2018 12:31:02 +0800 Subject: [PATCH 25/32] fix typo --- doc/v2/faq/cluster/index_en.rst | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/doc/v2/faq/cluster/index_en.rst b/doc/v2/faq/cluster/index_en.rst index 7cbcaeefcb..fa942a0962 100644 --- a/doc/v2/faq/cluster/index_en.rst +++ b/doc/v2/faq/cluster/index_en.rst @@ -4,13 +4,13 @@ Cluster Training and Prediction .. contents:: -1. Network connection errors in the log during muliti-node cluster training +1. Network connection errors in the log during multi-node cluster training ------------------------------------------------ -The errors in the log belong to network connection during mulilti-node cluster training, for example, :code:`Connection reset by peer`. -This kind of error is usually caused by the abnormal exit of the training process in some node, and the others cannot connect with this node any longer. Steps to troubleshoot the problem as follows: +There are maybe some errors in the log belonging to network connection problem during multi-node cluster training, for example, :code:`Connection reset by peer`. +This kind of error is usually caused by the abnormal exit of a training process in some node, and the other nodes cannot connect with this node any longer. Steps to troubleshoot the problem are as follows: * Find the first error in the :code:`train.log`, :code:`server.log`, check whether other fault casued the problem, such as FPE, lacking of memory or disk. -* If network connection gave rise to the first error in the log, this may be caused by the port conflict of the non-exclusive execution. Connect with the operator to check if the current MPI cluster supports jobs submitted with parameter :code:`resource=full`. If so, change the port of job. +* If the first error in server.log says "Address already used", this may be caused by the port conflict of the non-exclusive execution. Connect the sys-admin to check if the current MPI cluster supports jobs submitted with parameter :code:`resource=full`. If the current MPI cluster does not support this parameter, change the server port and try agian. -* If the currnet MPI cluster does not support exclusive pattern, ask the operator to replace or update the current cluster. +* If the current MPI cluster does not support exclusive pattern which allows a process to occupy the whole node, ask the administrator to replace or update the this cluster. From 25317bd312124cb3f26a2248c04215591d4e8446 Mon Sep 17 00:00:00 2001 From: qingqing01 Date: Tue, 27 Mar 2018 16:32:31 +0800 Subject: [PATCH 26/32] Make the first device share data with the global scope in parallel_do_op. (#9398) --- paddle/fluid/operators/parallel_do_op.cc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/operators/parallel_do_op.cc b/paddle/fluid/operators/parallel_do_op.cc index 4001b9a130..b28c16b13f 100644 --- a/paddle/fluid/operators/parallel_do_op.cc +++ b/paddle/fluid/operators/parallel_do_op.cc @@ -144,7 +144,12 @@ class ParallelDoOp : public framework::OperatorBase { PADDLE_ENFORCE(scope.FindVar(param)->IsType(), "Only support parameter type as LoDTensor"); auto &src = scope.FindVar(param)->Get(); - for (size_t i = 0; i < sub_scopes.size(); ++i) { + + auto *sub_scope0 = sub_scopes[0]; + auto *dst0 = sub_scope0->Var(param)->GetMutable(); + dst0->ShareDataWith(src); + + for (size_t i = 1; i < sub_scopes.size(); ++i) { auto &place = places[i]; auto *sub_scope = sub_scopes[i]; auto *dst = sub_scope->Var(param)->GetMutable(); From 587781153eb21ad69e571d012002dd97b93d9a88 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 27 Mar 2018 20:41:21 +0800 Subject: [PATCH 27/32] fix slr deser --- paddle/fluid/operators/detail/variable_response.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/paddle/fluid/operators/detail/variable_response.cc b/paddle/fluid/operators/detail/variable_response.cc index 12e8eb0b4d..d0f103c455 100644 --- a/paddle/fluid/operators/detail/variable_response.cc +++ b/paddle/fluid/operators/detail/variable_response.cc @@ -153,6 +153,7 @@ bool VariableResponse::CopySelectRowsData( const platform::DeviceContext& ctx, int length) { auto var = scope_->FindVar(meta_.varname()); auto* slr = var->GetMutable(); + slr->mutable_rows()->resize(length / 8); int64_t* rows_data = slr->mutable_rows()->data(); // copy rows CPU data, GPU data will be copied lazily. From 094d5096899344206892cc2f82b85bfe2bae2bac Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 27 Mar 2018 20:41:33 +0800 Subject: [PATCH 28/32] fix slr deser --- paddle/fluid/operators/detail/variable_response.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/operators/detail/variable_response.cc b/paddle/fluid/operators/detail/variable_response.cc index d0f103c455..3787b139a5 100644 --- a/paddle/fluid/operators/detail/variable_response.cc +++ b/paddle/fluid/operators/detail/variable_response.cc @@ -153,7 +153,7 @@ bool VariableResponse::CopySelectRowsData( const platform::DeviceContext& ctx, int length) { auto var = scope_->FindVar(meta_.varname()); auto* slr = var->GetMutable(); - slr->mutable_rows()->resize(length / 8); + slr->mutable_rows()->resize(length / 8); // int64 int64_t* rows_data = slr->mutable_rows()->data(); // copy rows CPU data, GPU data will be copied lazily. From cc1c6afbbf6df880b2954b61cf1afdc9c368597d Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 27 Mar 2018 23:17:30 +0800 Subject: [PATCH 29/32] fix slr serde --- .../operators/detail/variable_response.cc | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/paddle/fluid/operators/detail/variable_response.cc b/paddle/fluid/operators/detail/variable_response.cc index 3787b139a5..bdda570343 100644 --- a/paddle/fluid/operators/detail/variable_response.cc +++ b/paddle/fluid/operators/detail/variable_response.cc @@ -48,6 +48,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, void* dest, int size) { const void* data = NULL; int size_to_write = 0; + int length = size; + int total_written = 0; if (platform::is_gpu_place(place)) { #ifdef PADDLE_WITH_CUDA @@ -56,16 +58,21 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, platform::CPUPlace cpu; char* p = reinterpret_cast(dest); - while (size > 0) { + while (total_written < length) { if (!input->GetDirectBufferPointer(&data, &size_to_write)) { return false; } - + // NOTE: if raw buffer is large and have two neighbor fields of raw + // buffers GetDirectBufferPointer can get all of them, use length to + // truncate it. + if (total_written + size_to_write > length) { + size_to_write = length - total_written; + } memory::Copy(boost::get(place), reinterpret_cast(p), cpu, data, size_to_write, gpu_dev_ctx.stream()); p += size_to_write; - size -= size_to_write; + total_written += size_to_write; input->Skip(size_to_write); } @@ -77,16 +84,21 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, } char* p = reinterpret_cast(dest); - while (size > 0) { + while (total_written < length) { if (!input->GetDirectBufferPointer(&data, &size_to_write)) { return false; } + // NOTE: if raw buffer is large and have two neighbor fields of raw buffers + // GetDirectBufferPointer can get all of them, use length to truncate it. + if (total_written + size_to_write > length) { + size_to_write = length - total_written; + } // TODO(gongwb): can we avoid copy? platform::CPUPlace cpu; memory::Copy(cpu, reinterpret_cast(p), cpu, data, size_to_write); p += size_to_write; - size -= size_to_write; + total_written += size_to_write; input->Skip(size_to_write); } @@ -234,7 +246,6 @@ int VariableResponse::Parse(Source* source) { if (tag != 0) { return -1; } - return 0; } From 54a8c04fab9310ef78f0b000ae411fd7ae706ee7 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Tue, 27 Mar 2018 22:09:43 +0000 Subject: [PATCH 30/32] add inplace attr to bn --- python/paddle/fluid/layers/nn.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index 2db4e5d27d..0332556f62 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -1483,6 +1483,7 @@ def batch_norm(input, param_attr=None, bias_attr=None, data_layout='NCHW', + in_place=False, name=None, moving_mean_name=None, moving_variance_name=None): @@ -1538,7 +1539,7 @@ def batch_norm(input, saved_mean = helper.create_tmp_variable(dtype=dtype, stop_gradient=True) saved_variance = helper.create_tmp_variable(dtype=dtype, stop_gradient=True) - batch_norm_out = helper.create_tmp_variable(dtype) + batch_norm_out = input if in_place else helper.create_tmp_variable(dtype) helper.append_op( type="batch_norm", From f34f2d40267ce7334af6092242c7eef83e3f33aa Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Tue, 27 Mar 2018 22:10:32 +0000 Subject: [PATCH 31/32] make bn inplace in img_conv_group by default --- python/paddle/fluid/nets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/fluid/nets.py b/python/paddle/fluid/nets.py index 3b2e1a3073..bbedf6fde0 100644 --- a/python/paddle/fluid/nets.py +++ b/python/paddle/fluid/nets.py @@ -98,7 +98,7 @@ def img_conv_group(input, use_mkldnn=use_mkldnn) if conv_with_batchnorm[i]: - tmp = layers.batch_norm(input=tmp, act=conv_act) + tmp = layers.batch_norm(input=tmp, act=conv_act, in_place=True) drop_rate = conv_batchnorm_drop_rate[i] if abs(drop_rate) > 1e-5: tmp = layers.dropout(x=tmp, dropout_prob=drop_rate) From f707a83c80311f792aac594f3f401743d90cd687 Mon Sep 17 00:00:00 2001 From: Yang Yu Date: Wed, 28 Mar 2018 17:09:42 +0800 Subject: [PATCH 32/32] Add link --- doc/design/parallel_executor.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/design/parallel_executor.md b/doc/design/parallel_executor.md index 076c55d281..9aed3b059a 100644 --- a/doc/design/parallel_executor.md +++ b/doc/design/parallel_executor.md @@ -8,7 +8,7 @@ The executor is a very naive interpreter. It runs operators one by one. We can u We want a `ProgramDesc` can be run on different nodes. It is better not to contain device information in `ProgramDesc`. However, we can write a high-performance interpreter, which can hold an alternative intermediate representation of `ProgramDesc`, to take full usage of Multi-GPUs. -ParallelExecutor is an interpreter of `ProgramDesc` which will [out-of-order execute](Out-of-order execution) `Program` in data parallelism mode and maximise the utility of Multi-GPUs. +ParallelExecutor is an interpreter of `ProgramDesc` which will [out-of-order execute](https://en.wikipedia.org/wiki/Out-of-order_execution) `Program` in data parallelism mode and maximise the utility of Multi-GPUs. ## Overview of MultiGPUs logic