diff --git a/cmake/external/boost.cmake b/cmake/external/boost.cmake index 137f11da7f..c70d83b3f4 100644 --- a/cmake/external/boost.cmake +++ b/cmake/external/boost.cmake @@ -15,9 +15,9 @@ include(ExternalProject) set(BOOST_PROJECT "extern_boost") -set(BOOST_VER "1.66.0") -set(BOOST_TAR "boost_1_66_0") -set(BOOST_URL "https://dl.bintray.com/boostorg/release/${BOOST_VER}/source/${BOOST_TAR}.tar.gz") +set(BOOST_VER "1.41.0") +set(BOOST_TAR "boost_1_41_0") +set(BOOST_URL "http://sourceforge.net/projects/boost/files/boost/${BOOST_VER}/${BOOST_TAR}.tar.gz") set(BOOST_SOURCES_DIR ${THIRD_PARTY_PATH}/boost) set(BOOST_DOWNLOAD_DIR "${BOOST_SOURCES_DIR}/src/${BOOST_PROJECT}") set(BOOST_INCLUDE_DIR "${BOOST_DOWNLOAD_DIR}/${BOOST_TAR}" CACHE PATH "boost include directory." FORCE) diff --git a/doc/design/support_new_device.md b/doc/design/support_new_device.md index 4c5f10e2ec..b154f01d32 100644 --- a/doc/design/support_new_device.md +++ b/doc/design/support_new_device.md @@ -2,9 +2,9 @@ ## Background -Deep learning has a high demand for computing resources. New high-performance devices and computing libraries are appearing very frequently. Deep learning frameworks have to integrate these high-performance devices and computing libraries flexibly and efficiently. +Deep learning has a high demand for computing resources. New high-performance devices and computing libraries are appearing very frequently. Deep learning frameworks have to integrate these high-performance devices and computing libraries in a flexible and efficient manner. -On one hand, hardware and computing libraries usually do not have a one-to-one correspondence. For example,Intel CPUs support Eigen and MKL computing libraries while Nvidia GPUs support Eigen and cuDNN computing libraries. We have to implement operator specific kernels for each computing library. +On one hand, hardware and computing libraries usually do not have a one-to-one correspondence. For example, Intel CPUs support Eigen and MKL computing libraries while Nvidia GPUs support Eigen and cuDNN computing libraries. We have to implement operator specific kernels for each computing library. On the other hand, users usually do not want to care about the low-level hardware and computing libraries when writing a neural network configuration. In Fluid, `Layer` is exposed in `Python`, and `Operator` is exposed in `C++`. Both `Layer` and `Operator` are hardware independent. @@ -17,7 +17,7 @@ For a general overview of fluid, please refer to the [overview doc](https://gith There are mainly three parts that we have to consider while integrating a new device/library: -- Place and DeviceContext: indicates the device id and manages hardware resources +- Place and DeviceContext: indicate the device id and manage hardware resources - Memory and Tensor: malloc/free data on certain device @@ -25,10 +25,10 @@ There are mainly three parts that we have to consider while integrating a new de ### Place and DeviceContext -Please remind that device and computing library are not one-to-one corresponding. A device can have a lot of computing libraries and a computing library can also support several devices. +Please note that device and computing library are not one-to-one corresponding. A device can have a lot of computing libraries and a computing library can also support several devices. #### Place -Fluid uses class [Place](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/platform/place.h#L55) to represent the device memory where data is located. If we add another device, we have to add corresponding `DevicePlace`. +Fluid uses class [Place](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/platform/place.h#L55) to represent the device memory where data is located. If we add another device, we have to add the corresponding `DevicePlace`. ``` | CPUPlace @@ -144,7 +144,7 @@ class Tensor { }; ``` -`Placeholder` is used to delay memory allocation; that is, we can first define a tensor, using `Resize` to configure its shape, and then call `mutuable_data` to allocate the actual memory. +`Placeholder` is used to delay memory allocation; that is, we can first define a tensor, using `Resize` to configurate its shape, and then call `mutuable_data` to allocate the actual memory. ```cpp paddle::framework::Tensor t; @@ -163,7 +163,7 @@ Fluid implements computing units based on different DeviceContexts. Some computi Let's take [MaxOutFunctor](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/operators/math/maxouting.h#L27) as an example: -The interface is defined in header file. +The interface is defined in the header file. ``` template @@ -203,7 +203,7 @@ class MaxOutFunctor { ``` -We get computing handle from a concrete DeviceContext, and make compution on tensors. +We first obtain the computing handle from a concrete DeviceContext, and then compute on tensors. The implemention of `OpKernel` is similar to math functors, the extra thing we need to do is to register the OpKernel in a global map. @@ -231,7 +231,7 @@ REGISTER_OP_CUDA_KERNEL( ## Advanced topics: How to switch between different Device/Library -Generally, we will impelement OpKernel for all Device/Library of an Operator. We can easily train a Convolutional Neural Network in GPU. However, some OpKernel is not sutibale on a specific Device. For example, crf operator can only run on CPU, whereas most other operators can run at GPU. To achieve high performance in such circumstance, we have to switch between different Device/Library. +Generally, we will implement OpKernel for all Device/Library of an Operator. We can easily train a Convolutional Neural Network in GPU. However, some OpKernel is not suitable on a specific Device. For example, crf operator can only run on CPU, whereas most other operators can run on GPU. To achieve high performance in such circumstance, we have to switch between different Device/Library. For more details, please refer to following docs: diff --git a/paddle/framework/CMakeLists.txt b/paddle/framework/CMakeLists.txt index 8d9260811a..2804969842 100644 --- a/paddle/framework/CMakeLists.txt +++ b/paddle/framework/CMakeLists.txt @@ -74,8 +74,10 @@ cc_library(backward SRCS backward.cc DEPS net_op) cc_test(backward_test SRCS backward_test.cc DEPS backward recurrent_op device_context fill_constant_op) cc_library(lod_rank_table SRCS lod_rank_table.cc DEPS lod_tensor) +cc_library(feed_fetch_method SRCS feed_fetch_method.cc DEPS lod_tensor scope glog) + cc_library(executor SRCS executor.cc DEPS op_registry device_context scope -framework_proto backward glog lod_rank_table profiler) +framework_proto backward glog lod_rank_table profiler feed_fetch_method) cc_library(prune SRCS prune.cc DEPS framework_proto) cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context) diff --git a/paddle/framework/channel.h b/paddle/framework/channel.h new file mode 100644 index 0000000000..9ba0fc5c55 --- /dev/null +++ b/paddle/framework/channel.h @@ -0,0 +1,87 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once +#include +#include +#include + +namespace paddle { +namespace framework { + +template +class Channel { + public: + explicit Channel(std::size_t capacity) : capacity_(capacity) {} + + void Send(T* channel_element) { + std::unique_lock lock(mu_); + + if (IsBounded()) { + full_cond_var_.wait(lock, [this]() { + bool capacity_valid = capacity_ > 0 ? !IsCapacityFull() : true; + return capacity_valid; + }); + } + channel_.push_back(std::move(*channel_element)); + + lock.unlock(); + empty_cond_var_.notify_one(); + } + + T* Receive() { + std::unique_lock lock(mu_); + empty_cond_var_.wait(lock, [this]() { return !channel_.empty(); }); + + T* channel_element = std::move(channel_.front()); + channel_.pop_front(); + + NotifyAllSenders(&lock); + return channel_element; + } + + size_t Size() { + std::unique_lock lock(mu_); + return channel_.size(); + } + + void Clear() { + std::unique_lock lock(mu_); + channel_.clear(); + + NotifyAllSenders(&lock); + } + + private: + std::size_t capacity_; + std::mutex mu_; + std::condition_variable empty_cond_var_; + std::condition_variable full_cond_var_; + std::deque channel_; + + private: + void NotifyAllSenders(std::unique_lock* lock) { + if (IsBounded()) { + lock->unlock(); + full_cond_var_.notify_one(); + } + } + + bool IsBounded() const { return capacity_ > 0; } + + bool IsCapacityFull() const { return channel_.size() >= capacity_; } +}; + +} // namespace operator +} // namespace paddle diff --git a/paddle/framework/data_type.h b/paddle/framework/data_type.h index 6a372ac32e..98eb3e857d 100644 --- a/paddle/framework/data_type.h +++ b/paddle/framework/data_type.h @@ -79,5 +79,33 @@ inline void VisitDataType(proto::DataType type, Visitor visitor) { } } +inline std::string DataTypeToString(const proto::DataType type) { + using namespace paddle::framework::proto; + switch (type) { + case DataType::FP16: + return "float16"; + case DataType::FP32: + return "float32"; + case DataType::FP64: + return "float64"; + case DataType::INT16: + return "int16"; + case DataType::INT32: + return "int32"; + case DataType::INT64: + return "int64"; + case DataType::BOOL: + return "bool"; + default: + PADDLE_THROW("Not support type %d", type); + } +} + +inline std::ostream& operator<<(std::ostream& out, + const proto::DataType& type) { + out << DataTypeToString(type); + return out; +} + } // namespace framework } // namespace paddle diff --git a/paddle/framework/executor.cc b/paddle/framework/executor.cc index c28ffefdd0..cbf3ec7526 100644 --- a/paddle/framework/executor.cc +++ b/paddle/framework/executor.cc @@ -17,6 +17,7 @@ limitations under the License. */ #include #include "gflags/gflags.h" +#include "paddle/framework/feed_fetch_method.h" #include "paddle/framework/feed_fetch_type.h" #include "paddle/framework/lod_rank_table.h" #include "paddle/framework/lod_tensor_array.h" @@ -149,5 +150,164 @@ void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id, } } +// Check whether the block already has feed operators and feed_holder. +// Return false if the block does not have any feed operators. +// If some feed operators have been prepended to the block, check that +// the info contained in these feed operators matches the feed_targets +// and feed_holder_name. Raise exception when any mismatch is found. +// Return true if the block has feed operators and holder of matching info. +static bool has_feed_operators( + BlockDesc* block, std::map& feed_targets, + const std::string& feed_holder_name) { + size_t feed_count = 0; + for (auto* op : block->AllOps()) { + if (op->Type() == kFeedOpType) { + feed_count++; + PADDLE_ENFORCE_EQ(op->Input("X")[0], feed_holder_name, + "Input to feed op should be '%s'", feed_holder_name); + std::string feed_target_name = op->Output("Out")[0]; + PADDLE_ENFORCE( + feed_targets.find(feed_target_name) != feed_targets.end(), + "Feed operator output name '%s' cannot be found in 'feed_targets'", + feed_target_name); + } + } + + if (feed_count > 0) { + PADDLE_ENFORCE_EQ( + feed_count, feed_targets.size(), + "The number of feed operators should match 'feed_targets'"); + + // When feed operator are present, so should be feed_holder + auto var = block->FindVar(feed_holder_name); + PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable", + feed_holder_name); + PADDLE_ENFORCE_EQ(var->GetType(), proto::VarDesc::FEED_MINIBATCH, + "'%s' variable should be 'FEED_MINIBATCH' type", + feed_holder_name); + } + + return feed_count > 0; +} + +// Check whether the block already has fetch operators and fetch_holder. +// Return false if the block does not have any fetch operators. +// If some fetch operators have been appended to the block, check that +// the info contained in these fetch operators matches the fetch_targets +// and fetch_holder_name. Raise exception when any mismatch is found. +// Return true if the block has fetch operators and holder of matching info. +static bool has_fetch_operators( + BlockDesc* block, std::map& fetch_targets, + const std::string& fetch_holder_name) { + size_t fetch_count = 0; + for (auto* op : block->AllOps()) { + if (op->Type() == kFetchOpType) { + fetch_count++; + PADDLE_ENFORCE_EQ(op->Output("Out")[0], fetch_holder_name, + "Output of fetch op should be '%s'", fetch_holder_name); + std::string fetch_target_name = op->Input("X")[0]; + PADDLE_ENFORCE( + fetch_targets.find(fetch_target_name) != fetch_targets.end(), + "Fetch operator input name '%s' cannot be found in 'fetch_targets'", + fetch_target_name); + } + } + + if (fetch_count > 0) { + PADDLE_ENFORCE_EQ( + fetch_count, fetch_targets.size(), + "The number of fetch operators should match 'fetch_targets'"); + + // When fetch operator are present, so should be fetch_holder + auto var = block->FindVar(fetch_holder_name); + PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable", + fetch_holder_name); + PADDLE_ENFORCE_EQ(var->GetType(), proto::VarDesc::FETCH_LIST, + "'%s' variable should be 'FETCH_LIST' type", + fetch_holder_name); + } + + return fetch_count > 0; +} + +void Executor::Run(const ProgramDesc& program, Scope* scope, + std::map& feed_targets, + std::map& fetch_targets, + const std::string& feed_holder_name, + const std::string& fetch_holder_name) { + auto* copy_program = new ProgramDesc(program); + auto* global_block = copy_program->MutableBlock(0); + + if (!has_feed_operators(global_block, feed_targets, feed_holder_name)) { + // create feed_holder variable + auto* feed_holder = global_block->Var(feed_holder_name); + feed_holder->SetType(proto::VarDesc::FEED_MINIBATCH); + feed_holder->SetPersistable(true); + + int i = 0; + for (auto& feed_target : feed_targets) { + std::string var_name = feed_target.first; + VLOG(3) << "feed target's name: " << var_name; + + // prepend feed op + auto* op = global_block->PrependOp(); + op->SetType(kFeedOpType); + op->SetInput("X", {feed_holder_name}); + op->SetOutput("Out", {var_name}); + op->SetAttr("col", {static_cast(i)}); + op->CheckAttrs(); + + i++; + } + } + + // map the data of feed_targets to feed_holder + for (auto* op : global_block->AllOps()) { + if (op->Type() == kFeedOpType) { + std::string feed_target_name = op->Output("Out")[0]; + int idx = boost::get(op->GetAttr("col")); + SetFeedVariable(scope, *feed_targets[feed_target_name], feed_holder_name, + idx); + } + } + + if (!has_fetch_operators(global_block, fetch_targets, fetch_holder_name)) { + // create fetch_holder variable + auto* fetch_holder = global_block->Var(fetch_holder_name); + fetch_holder->SetType(proto::VarDesc::FETCH_LIST); + fetch_holder->SetPersistable(true); + + int i = 0; + for (auto& fetch_target : fetch_targets) { + std::string var_name = fetch_target.first; + VLOG(3) << "fetch target's name: " << var_name; + + // append fetch op + auto* op = global_block->AppendOp(); + op->SetType(kFetchOpType); + op->SetInput("X", {var_name}); + op->SetOutput("Out", {fetch_holder_name}); + op->SetAttr("col", {static_cast(i)}); + op->CheckAttrs(); + + i++; + } + } + + Run(*copy_program, scope, 0, true, true); + + // obtain the data of fetch_targets from fetch_holder + for (auto* op : global_block->AllOps()) { + if (op->Type() == kFetchOpType) { + std::string fetch_target_name = op->Input("X")[0]; + int idx = boost::get(op->GetAttr("col")); + *fetch_targets[fetch_target_name] = + GetFetchVariable(*scope, fetch_holder_name, idx); + } + } + + delete copy_program; +} + } // namespace framework } // namespace paddle diff --git a/paddle/framework/executor.h b/paddle/framework/executor.h index d869e18901..035ff48a52 100644 --- a/paddle/framework/executor.h +++ b/paddle/framework/executor.h @@ -41,6 +41,12 @@ class Executor { void Run(const ProgramDesc&, Scope*, int, bool create_local_scope = true, bool create_vars = true); + void Run(const ProgramDesc& program, Scope* scope, + std::map& feed_targets, + std::map& fetch_targets, + const std::string& feed_holder_name = "feed", + const std::string& fetch_holder_name = "fetch"); + private: const platform::Place place_; }; diff --git a/paddle/framework/feed_fetch_method.cc b/paddle/framework/feed_fetch_method.cc new file mode 100644 index 0000000000..21201b6755 --- /dev/null +++ b/paddle/framework/feed_fetch_method.cc @@ -0,0 +1,56 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/framework/feed_fetch_method.h" +#include "glog/logging.h" +#include "paddle/framework/variable.h" + +namespace paddle { +namespace framework { + +void SetFeedVariable(Scope* scope, const LoDTensor& input, + const std::string& var_name, size_t index) { + // If var_name Variable is not found in GlobalScope, a new variable will + // be created. + VLOG(3) << "SetFeedVariable name=" << var_name << " index=" << index; + Variable* g_feed_value = scope->Var(var_name); + auto& feed_inputs = + *(g_feed_value->GetMutable>()); + if (index >= feed_inputs.size()) { + feed_inputs.resize(index + 1); + } + // shared data with input tensor + feed_inputs[index].ShareDataWith(input); + // set lod + feed_inputs[index].set_lod(input.lod()); +} + +LoDTensor& GetFetchVariable(const Scope& scope, const std::string& var_name, + size_t index) { + // Since we want to fetch LodTensor from a variable, the variable must + // be created alreadly. + Variable* g_fetch_value = scope.FindVar(var_name); + PADDLE_ENFORCE(g_fetch_value->IsType(), + "Only %s can be invoked by GetFetchVariable", + typeid(FeedFetchList).name()); + auto& fetch_outputs = *g_fetch_value->GetMutable(); + auto& tensor = fetch_outputs[index]; + VLOG(3) << "Fetch " << var_name << " with index " << index + << " shape= " << tensor.dims(); + PADDLE_ENFORCE_LT(index, fetch_outputs.size()); + return tensor; +} + +} // namespace framework +} // namespace paddle diff --git a/paddle/framework/feed_fetch_method.h b/paddle/framework/feed_fetch_method.h index 7feacb1e24..b71945fcc8 100644 --- a/paddle/framework/feed_fetch_method.h +++ b/paddle/framework/feed_fetch_method.h @@ -13,46 +13,18 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once -#include "glog/logging.h" + #include "paddle/framework/feed_fetch_type.h" #include "paddle/framework/scope.h" -#include "paddle/framework/variable.h" namespace paddle { namespace framework { void SetFeedVariable(Scope* scope, const LoDTensor& input, - const std::string& var_name, size_t index) { - // If var_name Variable is not found in GlobalScope, a new variable will - // be created. - VLOG(3) << "SetFeedVariable name=" << var_name << " index=" << index; - Variable* g_feed_value = scope->Var(var_name); - auto& feed_inputs = - *(g_feed_value->GetMutable>()); - if (index >= feed_inputs.size()) { - feed_inputs.resize(index + 1); - } - // shared data with input tensor - feed_inputs[index].ShareDataWith(input); - // set lod - feed_inputs[index].set_lod(input.lod()); -} + const std::string& var_name, size_t index); LoDTensor& GetFetchVariable(const Scope& scope, const std::string& var_name, - size_t index) { - // Since we want to fetch LodTensor from a variable, the variable must - // be created alreadly. - Variable* g_fetch_value = scope.FindVar(var_name); - PADDLE_ENFORCE(g_fetch_value->IsType(), - "Only %s can be invoked by GetFetchVariable", - typeid(FeedFetchList).name()); - auto& fetch_outputs = *g_fetch_value->GetMutable(); - auto& tensor = fetch_outputs[index]; - VLOG(3) << "Fetch " << var_name << " with index " << index - << " shape= " << tensor.dims(); - PADDLE_ENFORCE_LT(index, fetch_outputs.size()); - return tensor; -} + size_t index); } // namespace framework } // namespace paddle diff --git a/paddle/framework/op_kernel_type_test.cc b/paddle/framework/op_kernel_type_test.cc index 649afeee8a..cb23bbde01 100644 --- a/paddle/framework/op_kernel_type_test.cc +++ b/paddle/framework/op_kernel_type_test.cc @@ -26,9 +26,9 @@ TEST(OpKernelType, ToString) { OpKernelType op_kernel_type(DataType::FP32, CPUPlace(), DataLayout::kNCHW, LibraryType::kCUDNN); - ASSERT_EQ( - paddle::framework::KernelTypeToString(op_kernel_type), - "data_type[5]:data_layout[NCHW]:place[CPUPlace]:library_type[CUDNN]"); + ASSERT_EQ(paddle::framework::KernelTypeToString(op_kernel_type), + "data_type[float32]:data_layout[NCHW]:place[CPUPlace]:library_type[" + "CUDNN]"); } TEST(OpKernelType, Hash) { diff --git a/paddle/framework/threadpool.cc b/paddle/framework/threadpool.cc index 109a7e7dc4..b2f5ae4a96 100644 --- a/paddle/framework/threadpool.cc +++ b/paddle/framework/threadpool.cc @@ -1,24 +1,93 @@ /* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ #include "paddle/framework/threadpool.h" namespace paddle { namespace framework { -std::unique_ptr ThreadPool::threadpool(nullptr); -std::once_flag ThreadPool::init_flag; +std::unique_ptr ThreadPool::threadpool_(nullptr); +std::once_flag ThreadPool::init_flag_; + +ThreadPool* ThreadPool::GetInstance() { + std::call_once(init_flag_, &ThreadPool::Init); + return threadpool_.get(); +} + +void ThreadPool::Init() { + if (threadpool_.get() == nullptr) { + // TODO(Yancey1989): specify the max threads number + int num_threads = std::thread::hardware_concurrency(); + PADDLE_ENFORCE_GT(num_threads, 0); + threadpool_.reset(new ThreadPool(num_threads)); + } +} + +ThreadPool::ThreadPool(int num_threads) + : total_threads_(num_threads), idle_threads_(num_threads), running_(true) { + threads_.resize(num_threads); + for (auto& thread : threads_) { + // TODO(Yancey1989): binding the thread on the specify CPU number + thread.reset(new std::thread(std::bind(&ThreadPool::TaskLoop, this))); + } +} + +ThreadPool::~ThreadPool() { + { + // notify all threads to stop running + running_ = false; + scheduled_.notify_all(); + } + + for (auto& t : threads_) { + t->join(); + t.reset(nullptr); + } +} + +void ThreadPool::Wait() { + std::unique_lock lock(mutex_); + completed_.wait(lock, [=] { return Done() == true; }); +} + +void ThreadPool::TaskLoop() { + while (running_) { + std::unique_lock lock(mutex_); + scheduled_.wait(lock, [=] { return !tasks_.empty() || !running_; }); + + if (!running_) { + break; + } + // pop a task from the task queue + auto task = std::move(tasks_.front()); + tasks_.pop(); + + --idle_threads_; + lock.unlock(); + + // run the task + task(); + + { + std::unique_lock lock(mutex_); + ++idle_threads_; + if (Done()) { + completed_.notify_all(); + } + } + } +} } // namespace framework } // namespace paddle diff --git a/paddle/framework/threadpool.h b/paddle/framework/threadpool.h index 3ac345851c..8912b1a43a 100644 --- a/paddle/framework/threadpool.h +++ b/paddle/framework/threadpool.h @@ -20,52 +20,36 @@ limitations under the License. */ #include #include #include +#include #include "paddle/platform/enforce.h" namespace paddle { namespace framework { +// ThreadPool maintains a queue of tasks, and runs them using a fixed +// number of threads. class ThreadPool { public: typedef std::packaged_task Task; - /** - * @brief Get a instance of threadpool, the thread number will - * be specified as the number of hardware thread contexts - */ - static ThreadPool* GetInstance() { - std::call_once(init_flag, &ThreadPool::Init); - return threadpool.get(); - } + // Returns the singleton of ThreadPool. + static ThreadPool* GetInstance(); - ~ThreadPool() { - { - // notify all threads to stop running - running_ = false; - scheduled_.notify_all(); - } - - for (auto& t : threads_) { - t->join(); - t.reset(nullptr); - } - } + ~ThreadPool(); - int GetNumThreads() const { return num_threads_; } + // Returns the number of threads created by the constructor. + size_t Threads() const { return total_threads_; } - int GetAvailable() { + // Returns the number of currently idle threads. + size_t IdleThreads() { std::unique_lock lock(mutex_); - return available_; + return idle_threads_; } - /** - * @brief Push a function to the queue, and will be scheduled and - * executed if a thread is available. - * @param[in] Task, will be pushed to the task queue. - * @return std::future, we could wait for the task finished by - * f.wait(). - */ + // Run pushes a function to the task queue and returns a std::future + // object. To wait for the completion of the task, call + // std::future::wait(). template std::future Run(Callback fn) { std::unique_lock lock(mutex_); @@ -77,84 +61,40 @@ class ThreadPool { return f; } - /** - * @brief Wait until all the tasks are completed. - */ - void Wait() { - std::unique_lock lock(mutex_); - completed_.wait(lock, [=] { return Done() == true; }); - } + // Wait until all the tasks are completed. + void Wait(); private: DISABLE_COPY_AND_ASSIGN(ThreadPool); - explicit ThreadPool(int num_threads) - : num_threads_(num_threads), available_(num_threads), running_(true) { - threads_.resize(num_threads); - for (auto& thread : threads_) { - // TODO(Yancey1989): binding the thread on the specify CPU number - thread.reset(new std::thread(std::bind(&ThreadPool::TaskLoop, this))); - } - } + explicit ThreadPool(int num_threads); - /** - * @brief If the task queue is empty and avaialbe - * is equal to the number of threads, means that - * all tasks are completed. - * - * Note: this function is not thread-safe. - * - * @return true if all tasks are completed. - */ - bool Done() { return tasks_.empty() && available_ == num_threads_; } - - void TaskLoop() { - while (running_) { - std::unique_lock lock(mutex_); - scheduled_.wait(lock, [=] { return !tasks_.empty() || !running_; }); - - if (!running_) { - break; - } - // pop a task from the task queue - auto task = std::move(tasks_.front()); - tasks_.pop(); - - --available_; - lock.unlock(); - - // run the task - task(); - - { - std::unique_lock lock(mutex_); - ++available_; - if (Done()) { - completed_.notify_all(); - } - } - } - } + // If the task queue is empty and avaialbe is equal to the number of + // threads, means that all tasks are completed. Note: this function + // is not thread-safe. Returns true if all tasks are completed. + // Note: don't delete the data member total_threads_ and use + // threads_.size() instead; because you'd need to lock the mutex + // before accessing threads_. + bool Done() { return tasks_.empty() && idle_threads_ == total_threads_; } - static void Init() { - if (threadpool.get() == nullptr) { - // TODO(Yancey1989): specify the max threads number - int num_threads = std::thread::hardware_concurrency(); - PADDLE_ENFORCE_GT(num_threads, 0); - threadpool.reset(new ThreadPool(num_threads)); - } - } + // The constructor starts threads to run TaskLoop, which retrieves + // and runs tasks from the queue. + void TaskLoop(); + + // Init is called by GetInstance. + static void Init(); private: - static std::unique_ptr threadpool; - static std::once_flag init_flag; + static std::unique_ptr threadpool_; + static std::once_flag init_flag_; - int num_threads_; - int available_; - bool running_; - std::queue tasks_; std::vector> threads_; + const size_t total_threads_; + size_t idle_threads_; + + std::queue tasks_; std::mutex mutex_; + bool running_; std::condition_variable scheduled_; std::condition_variable completed_; }; diff --git a/paddle/framework/threadpool_test.cc b/paddle/framework/threadpool_test.cc index 50b6238cd8..3fbfe7efc8 100644 --- a/paddle/framework/threadpool_test.cc +++ b/paddle/framework/threadpool_test.cc @@ -22,11 +22,7 @@ namespace framework = paddle::framework; void do_sum(framework::ThreadPool* pool, std::atomic& sum, int cnt) { std::vector> fs; for (int i = 0; i < cnt; ++i) { - auto f = pool->Run([&sum]() { sum.fetch_add(1); }); - fs.push_back(std::move(f)); - } - for (auto& f : fs) { - f.wait(); + fs.push_back(framework::Async([&sum]() { sum.fetch_add(1); })); } } diff --git a/paddle/inference/inference.cc b/paddle/inference/inference.cc index 09268ffb3a..b43c359ed1 100644 --- a/paddle/inference/inference.cc +++ b/paddle/inference/inference.cc @@ -15,7 +15,6 @@ limitations under the License. */ #include "inference.h" #include #include "paddle/framework/executor.h" -#include "paddle/framework/feed_fetch_method.h" #include "paddle/framework/init.h" #include "paddle/framework/scope.h" @@ -154,7 +153,7 @@ void InferenceEngine::Execute(const std::vector& feeds, LOG(FATAL) << "Please initialize the program_ and load_program_ first."; } - if (feeds.size() < feed_var_names_.size()) { + if (feeds.size() != feed_var_names_.size()) { LOG(FATAL) << "Please feed " << feed_var_names_.size() << " input Tensors."; } @@ -165,19 +164,22 @@ void InferenceEngine::Execute(const std::vector& feeds, executor->Run(*load_program_, scope, 0, true, true); + std::map feed_targets; + std::map fetch_targets; + // set_feed_variable for (size_t i = 0; i < feed_var_names_.size(); ++i) { - framework::SetFeedVariable(scope, feeds[i], "feed", i); + feed_targets[feed_var_names_[i]] = &feeds[i]; } - executor->Run(*program_, scope, 0, true, true); - // get_fetch_variable fetchs.resize(fetch_var_names_.size()); for (size_t i = 0; i < fetch_var_names_.size(); ++i) { - fetchs[i] = framework::GetFetchVariable(*scope, "fetch", i); + fetch_targets[fetch_var_names_[i]] = &fetchs[i]; } + executor->Run(*program_, scope, feed_targets, fetch_targets); + delete place; delete scope; delete executor; diff --git a/paddle/operators/activation_op.h b/paddle/operators/activation_op.h index 88c3d1c597..c0809abc05 100644 --- a/paddle/operators/activation_op.h +++ b/paddle/operators/activation_op.h @@ -323,7 +323,7 @@ template struct FloorFunctor : public BaseActivationFunctor { template void operator()(Device d, X x, Out out) const { - out.device(d) = x.ceil(); + out.device(d) = x.floor(); } }; diff --git a/paddle/operators/detail/grpc_client.cc b/paddle/operators/detail/grpc_client.cc index d699dabf2f..c433945542 100644 --- a/paddle/operators/detail/grpc_client.cc +++ b/paddle/operators/detail/grpc_client.cc @@ -101,8 +101,8 @@ bool RPCClient::Wait() { if (req_count_ <= 0) { return true; } - - std::vector a(req_count_); + const size_t kReqCnt = req_count_; + bool a[kReqCnt]; std::vector> waits(req_count_); for (int i = 0; i < req_count_; i++) { diff --git a/paddle/operators/gru_op.cc b/paddle/operators/gru_op.cc index 76f2adefed..fb901b6394 100644 --- a/paddle/operators/gru_op.cc +++ b/paddle/operators/gru_op.cc @@ -135,14 +135,14 @@ class GRUOpMaker : public framework::OpProtoAndCheckerMaker { AddComment(R"DOC( GRU Operator implements part calculations of the complete GRU as following: -\f[ -update \ gate: u_t = actGate(xu_t + W_u * h_{t-1} + b_u) \\ -reset \ gate: r_t = actGate(xr_t + W_r * h_{t-1} + b_r) \\ -output \ candidate: {h}_t = actNode(xc_t + W_c * dot(r_t, h_{t-1}) + b_c) \\ +$$ +update\_gate: u_t = actGate(xu_t + W_u * h_{t-1} + b_u) \\ +reset\_gate: r_t = actGate(xr_t + W_r * h_{t-1} + b_r) \\ +output\_candidate: {h}_t = actNode(xc_t + W_c * dot(r_t, h_{t-1}) + b_c) \\ output: h_t = dot((1 - u_t), h_{t-1}) + dot(u_t, {h}_t) -\f] +$$ -@note To implement the complete GRU, fully-connected operator must be used +@note To implement the complete GRU, fully-connected operator must be used before to feed xu, xr and xc as the Input of GRU operator. )DOC"); } diff --git a/paddle/operators/one_hot_op.cc b/paddle/operators/one_hot_op.cc new file mode 100644 index 0000000000..e78b7468de --- /dev/null +++ b/paddle/operators/one_hot_op.cc @@ -0,0 +1,95 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/operators/one_hot_op.h" +#include "paddle/framework/framework.pb.h" + +namespace paddle { +namespace operators { + +class OneHotOp : public framework::OperatorWithKernel { + public: + using framework::OperatorWithKernel::OperatorWithKernel; + void InferShape(framework::InferShapeContext* ctx) const override { + PADDLE_ENFORCE(ctx->HasInput("X"), + "Input(X) of OneHotOp should not be null."); + PADDLE_ENFORCE(ctx->HasOutput("Out"), + "Output(Out) of OneHotOp should not be null."); + + auto x_dims = ctx->GetInputDim("X"); + PADDLE_ENFORCE_GE(x_dims.size(), 2, + "Rank of Input(X) should be at least 2."); + PADDLE_ENFORCE_GE(x_dims[x_dims.size() - 1], 1U, + "Last dimension of Input(X) should be 1."); + + int depth = ctx->Attrs().Get("depth"); + + PADDLE_ENFORCE_GT(depth, 0, "Should provide a positive depth (%d).", depth); + + framework::DDim out_dims(x_dims); + out_dims[out_dims.size() - 1] = depth; + ctx->SetOutputDim("Out", out_dims); + ctx->ShareLoD("X", /* --> */ "Out"); + } +}; + +class OneHotOpMaker : public framework::OpProtoAndCheckerMaker { + public: + OneHotOpMaker(OpProto* proto, OpAttrChecker* op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddInput("X", + "(LoDTensor, LoDTensor) Input variable with rank at least 2. " + "The last dimension of X should be 1. Each value of X is an index " + "to indicate the position."); + AddOutput("Out", + "(Tensor, Tensor) Output tensor with same rank as X. " + "The tensor consists of one-hot representations of values in X."); + AddAttr("depth", + "A positive integer to specify the length of one-hot vector."); + AddAttr("dtype", + "An integer to specify the data type of one-hot " + "vector. The default value is FP32.") + .SetDefault(paddle::framework::proto::DataType::FP32); + AddComment(R"DOC( +One Hot Operator. This operator creates the one-hot representations for input +index values. The following example will help to explain the function of this +operator: + +X is a LoDTensor: + X.lod = [[0, 1, 4]] + X.shape = [4, 1] + X.data = [[1], [1], [3], [0]] + +set depth = 4 + +Out is a LoDTensor: + Out.lod = [[0, 1, 4]] + Out.shape = [4, 4] + Out.data = [[0., 1., 0., 0.], + [0., 1., 0., 0.], + [0., 0., 0., 1.], + [1., 0., 0., 0.]] +)DOC"); + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; +REGISTER_OPERATOR(one_hot, ops::OneHotOp, ops::OneHotOpMaker, + paddle::framework::EmptyGradOpMaker); +REGISTER_OP_CPU_KERNEL( + one_hot, ops::OneHotKernel, + ops::OneHotKernel); diff --git a/paddle/operators/one_hot_op.cu b/paddle/operators/one_hot_op.cu new file mode 100644 index 0000000000..16f6d9433e --- /dev/null +++ b/paddle/operators/one_hot_op.cu @@ -0,0 +1,80 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/operators/one_hot_op.h" +#include "paddle/platform/cuda_helper.h" +#include "paddle/platform/gpu_info.h" + +namespace paddle { +namespace operators { +using platform::PADDLE_CUDA_NUM_THREADS; + +template +__global__ void FillOutputKernel(const InT* p_in_data, OutT* p_out_data, + const int64_t numel, const int depth) { + int idx = blockIdx.x * blockDim.x + threadIdx.x; + if (idx < numel) { + *(p_out_data + (idx * depth) + p_in_data[idx]) = 1.0; + } +} + +template +struct OneHotOpCUDAFunctor { + const framework::LoDTensor* in_; + framework::LoDTensor* out_; + const DeviceContext& ctx_; + int depth_; + + OneHotOpCUDAFunctor(const framework::LoDTensor* in, framework::LoDTensor* out, + int depth, const DeviceContext& ctx) + : in_(in), out_(out), depth_(depth), ctx_(ctx) {} + + template + void operator()() const { + auto* p_in_data = in_->data(); + auto numel = in_->numel(); + auto* p_out_data = out_->mutable_data(ctx_.GetPlace()); + auto stream = ctx_.stream(); + math::set_constant(ctx_, out_, 0.0); + + FillOutputKernel<<<(numel + PADDLE_CUDA_NUM_THREADS - 1) / + PADDLE_CUDA_NUM_THREADS, + PADDLE_CUDA_NUM_THREADS, 0, stream>>>( + p_in_data, p_out_data, numel, depth_); + } +}; + +using LoDTensor = framework::LoDTensor; +template +class OneHotCUDAKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext& context) const override { + auto* in = context.Input("X"); + auto* out = context.Output("Out"); + int depth = context.Attr("depth"); + + framework::VisitDataType( + static_cast(context.Attr("dtype")), + OneHotOpCUDAFunctor( + in, out, depth, context.template device_context())); + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; +REGISTER_OP_CUDA_KERNEL( + one_hot, ops::OneHotCUDAKernel, + ops::OneHotCUDAKernel); diff --git a/paddle/operators/one_hot_op.h b/paddle/operators/one_hot_op.h new file mode 100644 index 0000000000..12031ede2c --- /dev/null +++ b/paddle/operators/one_hot_op.h @@ -0,0 +1,68 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include "paddle/framework/op_registry.h" +#include "paddle/operators/math/math_function.h" + +namespace paddle { +namespace operators { + +template +struct OneHotOpFunctor { + const framework::LoDTensor* in_; + framework::LoDTensor* out_; + int depth_; + const DeviceContext& ctx_; + + OneHotOpFunctor(const framework::LoDTensor* in, framework::LoDTensor* out, + int depth, const DeviceContext& ctx) + : in_(in), out_(out), depth_(depth), ctx_(ctx) {} + + template + void operator()() const { + auto* p_in_data = in_->data(); + auto numel = in_->numel(); + auto* p_out_data = out_->mutable_data(ctx_.GetPlace()); + math::set_constant(ctx_, out_, 0.0); + + for (int i = 0; i < numel; ++i) { + PADDLE_ENFORCE_GE(p_in_data[i], 0, + "Illegal index value, should be at least 0."); + PADDLE_ENFORCE_LT(p_in_data[i], depth_, + "Illegal index value, should be less than depth (%d).", + depth_); + *(p_out_data + i * depth_ + p_in_data[i]) = 1.0; + } + } +}; + +using LoDTensor = framework::LoDTensor; +template +class OneHotKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext& context) const override { + auto* in = context.Input("X"); + auto* out = context.Output("Out"); + int depth = context.Attr("depth"); + + framework::VisitDataType( + static_cast(context.Attr("dtype")), + OneHotOpFunctor( + in, out, depth, context.template device_context())); + } +}; + +} // namespace operators +} // namespace paddle diff --git a/paddle/pybind/CMakeLists.txt b/paddle/pybind/CMakeLists.txt index e78673e0ba..de53fea0dd 100644 --- a/paddle/pybind/CMakeLists.txt +++ b/paddle/pybind/CMakeLists.txt @@ -1,7 +1,7 @@ if(WITH_PYTHON) cc_library(paddle_pybind SHARED SRCS pybind.cc exception.cc protobuf.cc const_value.cc - DEPS pybind python backward proto_desc paddle_memory executor prune init profiler + DEPS pybind python backward proto_desc paddle_memory executor prune init profiler feed_fetch_method ${GLOB_OP_LIB}) if(NOT APPLE AND NOT ANDROID) target_link_libraries(paddle_pybind rt) diff --git a/paddle/pybind/pybind.cc b/paddle/pybind/pybind.cc index b4fd2a8989..490397afdd 100644 --- a/paddle/pybind/pybind.cc +++ b/paddle/pybind/pybind.cc @@ -424,7 +424,9 @@ All parameter, weight, gradient are variables in Paddle. py::class_(m, "Executor") .def(py::init()) - .def("run", &Executor::Run); + .def("run", + (void (Executor::*)(const ProgramDesc &, Scope *, int, bool, bool)) & + Executor::Run); m.def("unique_integer", UniqueIntegerGenerator); m.def("init_gflags", framework::InitGflags); diff --git a/python/paddle/v2/fluid/tests/book/test_rnn_encoder_decoder.py b/python/paddle/v2/fluid/tests/book/test_rnn_encoder_decoder.py index 3fd3dbaf77..fdc6086176 100644 --- a/python/paddle/v2/fluid/tests/book/test_rnn_encoder_decoder.py +++ b/python/paddle/v2/fluid/tests/book/test_rnn_encoder_decoder.py @@ -49,7 +49,11 @@ def bi_lstm_encoder(input_seq, hidden_size): size=hidden_size * 4, is_reverse=True, use_peepholes=USE_PEEPHOLES) - return forward, backward + + forward_last = fluid.layers.sequence_last_step(input=forward) + backward_first = fluid.layers.sequence_first_step(input=backward) + + return forward_last, backward_first # FIXME(peterzhang2029): Replace this function with the lstm_unit_op. @@ -115,16 +119,13 @@ def seq_to_seq_net(): size=[source_dict_dim, embedding_dim], dtype='float32') - src_forward, src_backward = bi_lstm_encoder( + src_forward_last, src_backward_first = bi_lstm_encoder( input_seq=src_embedding, hidden_size=encoder_size) - src_forward_last = fluid.layers.sequence_last_step(input=src_forward) - src_backward_first = fluid.layers.sequence_first_step(input=src_backward) - encoded_vector = fluid.layers.concat( input=[src_forward_last, src_backward_first], axis=1) - decoder_boot = fluid.layers.fc(input=encoded_vector, + decoder_boot = fluid.layers.fc(input=src_backward_first, size=decoder_size, bias_attr=False, act='tanh') diff --git a/python/paddle/v2/fluid/tests/book_distribute/notest_dist_image_classification.py b/python/paddle/v2/fluid/tests/book_distribute/notest_dist_image_classification.py index 218dea31e1..298ecfc386 100644 --- a/python/paddle/v2/fluid/tests/book_distribute/notest_dist_image_classification.py +++ b/python/paddle/v2/fluid/tests/book_distribute/notest_dist_image_classification.py @@ -1,21 +1,19 @@ -#Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. # -#Licensed under the Apache License, Version 2.0 (the "License"); -#you may not use this file except in compliance with the License. -#You may obtain a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -#Unless required by applicable law or agreed to in writing, software -#distributed under the License is distributed on an "AS IS" BASIS, -#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -#See the License for the specific language governing permissions and -#limitations under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from __future__ import print_function -import sys - import paddle.v2 as paddle import paddle.v2.fluid as fluid import os @@ -106,10 +104,10 @@ if len(sys.argv) >= 2: net_type = sys.argv[1] if net_type == "vgg": - print("train vgg net") + print("training vgg net") net = vgg16_bn_drop(images) elif net_type == "resnet": - print("train resnet") + print("training resnet") net = resnet_cifar10(images, 32) else: raise ValueError("%s network is not supported" % net_type) @@ -129,6 +127,7 @@ train_reader = paddle.batch( batch_size=BATCH_SIZE) place = fluid.CPUPlace() +feeder = fluid.DataFeeder(place=place, feed_list=[images, label]) exe = fluid.Executor(place) t = fluid.DistributeTranspiler() @@ -146,17 +145,14 @@ if training_role == "PSERVER": if not current_endpoint: print("need env SERVER_ENDPOINT") exit(1) - print("start pserver at:", current_endpoint) pserver_prog = t.get_pserver_program(current_endpoint) pserver_startup = t.get_startup_program(current_endpoint, pserver_prog) exe.run(pserver_startup) exe.run(pserver_prog) - print("pserver run end") elif training_role == "TRAINER": - print("start trainer") trainer_prog = t.get_trainer_program() - feeder = fluid.DataFeeder(place=place, feed_list=[images, label]) exe.run(fluid.default_startup_program()) + for pass_id in range(PASS_NUM): accuracy.reset(exe) for data in train_reader(): @@ -164,9 +160,10 @@ elif training_role == "TRAINER": feed=feeder.feed(data), fetch_list=[avg_cost] + accuracy.metrics) pass_acc = accuracy.eval(exe) - print("loss:" + str(loss) + " acc:" + str(acc) + " pass_acc:" + str( - pass_acc)) - # this model is slow, so if we can train two mini batch, we think it works properly. + print("pass_id:" + str(pass_id) + "loss:" + str(loss) + " pass_acc:" + + str(pass_acc)) + # this model is slow, so if we can train two mini batches, + # we think it works properly. print("trainer run end") else: print("environment var TRAINER_ROLE should be TRAINER os PSERVER") diff --git a/python/paddle/v2/fluid/tests/book_distribute/notest_understand_sentiment_dynamic_lstm.py b/python/paddle/v2/fluid/tests/book_distribute/notest_understand_sentiment_dynamic_lstm.py new file mode 100644 index 0000000000..bff376a0e2 --- /dev/null +++ b/python/paddle/v2/fluid/tests/book_distribute/notest_understand_sentiment_dynamic_lstm.py @@ -0,0 +1,135 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import os +import paddle.v2 as paddle +import paddle.v2.fluid as fluid + + +def stacked_lstm_net(data, + label, + input_dim, + class_dim=2, + emb_dim=128, + hid_dim=512, + stacked_num=3): + assert stacked_num % 2 == 1 + + emb = fluid.layers.embedding(input=data, size=[input_dim, emb_dim]) + # add bias attr + + # TODO(qijun) linear act + fc1 = fluid.layers.fc(input=emb, size=hid_dim) + lstm1, cell1 = fluid.layers.dynamic_lstm(input=fc1, size=hid_dim) + + inputs = [fc1, lstm1] + + for i in range(2, stacked_num + 1): + fc = fluid.layers.fc(input=inputs, size=hid_dim) + lstm, cell = fluid.layers.dynamic_lstm( + input=fc, size=hid_dim, is_reverse=(i % 2) == 0) + inputs = [fc, lstm] + + fc_last = fluid.layers.sequence_pool(input=inputs[0], pool_type='max') + lstm_last = fluid.layers.sequence_pool(input=inputs[1], pool_type='max') + + prediction = fluid.layers.fc(input=[fc_last, lstm_last], + size=class_dim, + act='softmax') + cost = fluid.layers.cross_entropy(input=prediction, label=label) + avg_cost = fluid.layers.mean(x=cost) + adam_optimizer = fluid.optimizer.Adam(learning_rate=0.002) + optimize_ops, params_grads = adam_optimizer.minimize(avg_cost) + accuracy = fluid.evaluator.Accuracy(input=prediction, label=label) + return avg_cost, accuracy, accuracy.metrics[0], optimize_ops, params_grads + + +def to_lodtensor(data, place): + seq_lens = [len(seq) for seq in data] + cur_len = 0 + lod = [cur_len] + for l in seq_lens: + cur_len += l + lod.append(cur_len) + flattened_data = np.concatenate(data, axis=0).astype("int64") + flattened_data = flattened_data.reshape([len(flattened_data), 1]) + res = fluid.LoDTensor() + res.set(flattened_data, place) + res.set_lod([lod]) + return res + + +def main(): + BATCH_SIZE = 100 + PASS_NUM = 5 + + word_dict = paddle.dataset.imdb.word_dict() + print "loaded word dict successfully" + dict_dim = len(word_dict) + class_dim = 2 + + data = fluid.layers.data( + name="words", shape=[1], dtype="int64", lod_level=1) + label = fluid.layers.data(name="label", shape=[1], dtype="int64") + cost, accuracy, acc_out, optimize_ops, params_grads = stacked_lstm_net( + data, label, input_dim=dict_dim, class_dim=class_dim) + + train_data = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.imdb.train(word_dict), buf_size=1000), + batch_size=BATCH_SIZE) + place = fluid.CPUPlace() + exe = fluid.Executor(place) + feeder = fluid.DataFeeder(feed_list=[data, label], place=place) + + t = fluid.DistributeTranspiler() + # all parameter server endpoints list for spliting parameters + pserver_endpoints = os.getenv("PSERVERS") + # server endpoint for current node + current_endpoint = os.getenv("SERVER_ENDPOINT") + # run as trainer or parameter server + training_role = os.getenv( + "TRAINING_ROLE", "TRAINER") # get the training role: trainer/pserver + t.transpile( + optimize_ops, params_grads, pservers=pserver_endpoints, trainers=2) + + if training_role == "PSERVER": + if not current_endpoint: + print("need env SERVER_ENDPOINT") + exit(1) + pserver_prog = t.get_pserver_program(current_endpoint) + pserver_startup = t.get_startup_program(current_endpoint, pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + elif training_role == "TRAINER": + exe.run(fluid.default_startup_program()) + trainer_prog = t.get_trainer_program() + for pass_id in xrange(PASS_NUM): + accuracy.reset(exe) + for data in train_data(): + cost_val, acc_val = exe.run(trainer_prog, + feed=feeder.feed(data), + fetch_list=[cost, acc_out]) + pass_acc = accuracy.eval(exe) + print("cost=" + str(cost_val) + " acc=" + str(acc_val) + + " pass_acc=" + str(pass_acc)) + if cost_val < 1.0 and acc_val > 0.8: + exit(0) + else: + print("environment var TRAINER_ROLE should be TRAINER os PSERVER") + + +if __name__ == '__main__': + main() diff --git a/python/paddle/v2/fluid/tests/test_activation_op.py b/python/paddle/v2/fluid/tests/test_activation_op.py index 18605e6065..1de5d446b8 100644 --- a/python/paddle/v2/fluid/tests/test_activation_op.py +++ b/python/paddle/v2/fluid/tests/test_activation_op.py @@ -186,8 +186,7 @@ class TestFloor(OpTest): self.op_type = "floor" x = np.random.uniform(-1, 1, [4, 4]).astype("float32") self.inputs = {'X': x} - # numpy floor need +1 - self.outputs = {'Out': np.floor(self.inputs['X']) + 1.0} + self.outputs = {'Out': np.floor(self.inputs['X'])} def test_check_output(self): self.check_output() diff --git a/python/paddle/v2/fluid/tests/test_one_hot_op.py b/python/paddle/v2/fluid/tests/test_one_hot_op.py new file mode 100644 index 0000000000..e51ea27d14 --- /dev/null +++ b/python/paddle/v2/fluid/tests/test_one_hot_op.py @@ -0,0 +1,110 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import numpy as np +import math +from op_test import OpTest +import paddle.v2.fluid as fluid +import paddle.v2.fluid.core as core +import paddle.v2.fluid.framework as framework +from paddle.v2.fluid.framework import Program, program_guard + + +class TestOneHotOp(OpTest): + def setUp(self): + self.op_type = 'one_hot' + depth = 10 + dimension = 12 + x_lod = [[0, 4, 5, 8, 11]] + x = [np.random.randint(0, depth - 1) for i in xrange(x_lod[0][-1])] + x = np.array(x).astype('int').reshape([x_lod[0][-1], 1]) + + out = np.zeros(shape=(np.product(x.shape[:-1]), + depth)).astype('float32') + + for i in xrange(np.product(x.shape)): + out[i, x[i]] = 1.0 + + self.inputs = {'X': (x, x_lod)} + self.attrs = {'depth': depth, 'dtype': int(core.DataType.FP32)} + self.outputs = {'Out': (out, x_lod)} + + def test_check_output(self): + self.check_output() + + +class TestOneHotOp_default_dtype(OpTest): + def setUp(self): + self.op_type = 'one_hot' + depth = 10 + dimension = 12 + x_lod = [[0, 4, 5, 8, 11]] + x = [np.random.randint(0, depth - 1) for i in xrange(x_lod[0][-1])] + x = np.array(x).astype('int').reshape([x_lod[0][-1], 1]) + + out = np.zeros(shape=(np.product(x.shape[:-1]), + depth)).astype('float32') + + for i in xrange(np.product(x.shape)): + out[i, x[i]] = 1.0 + + self.inputs = {'X': (x, x_lod)} + self.attrs = {'depth': depth} + self.outputs = {'Out': (out, x_lod)} + + def test_check_output(self): + self.check_output() + + +class TestOneHotOp_exception(OpTest): + def setUp(self): + self.op_type = 'one_hot' + self.depth = 10 + self.place = core.CPUPlace() + self.dimension = 12 + self.x = core.LoDTensor() + x_lod = [[0, 4, 5, 8, 11]] + data = [np.random.randint(11, 20) for i in xrange(x_lod[0][-1])] + data = np.array(data).astype('int').reshape([x_lod[0][-1], 1]) + self.x.set(data, self.place) + self.x.set_lod(x_lod) + + def test_check_output(self): + program = Program() + with program_guard(program): + x = fluid.layers.data( + name='x', shape=[self.dimension], dtype='float32', lod_level=1) + block = program.current_block() + one_hot_out = block.create_var( + name="one_hot_out", + type=core.VarDesc.VarType.LOD_TENSOR, + dtype='float32') + block.append_op( + type='one_hot', + inputs={'X': x}, + attrs={'depth': self.depth}, + outputs={'Out': one_hot_out}) + exe = fluid.Executor(self.place) + + def run(): + exe.run(feed={'x': self.x}, + fetch_list=[one_hot_out], + return_numpy=False) + + self.assertRaises(core.EnforceNotMet, run) + + +if __name__ == '__main__': + unittest.main()