From 58f896c3f40602116f68f6bfc58c96228d0f48bd Mon Sep 17 00:00:00 2001
From: Yu Yang
Date: Tue, 18 Oct 2016 03:12:03 +0000
Subject: [PATCH] Speed up PyDP2, support numpy.float array (#207)

---
 cmake/flags.cmake                                  |  4 ++-
 demo/mnist/data/get_mnist_data.sh                  |  2 +-
 paddle/gserver/dataproviders/DataProvider.cpp      |  8 +++--
 paddle/gserver/dataproviders/DataProvider.h        |  5 +--
 .../gserver/dataproviders/PyDataProvider2.cpp      | 36 +++++++++++++++----
 paddle/utils/Queue.h                               | 15 ++++++++
 .../trainer_config_helpers/data_sources.py         |  1 +
 7 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/cmake/flags.cmake b/cmake/flags.cmake
index cc59309ee7..dbad6be3f4 100644
--- a/cmake/flags.cmake
+++ b/cmake/flags.cmake
@@ -64,7 +64,9 @@ set(COMMON_FLAGS
     -Wdelete-non-virtual-dtor
     -Wno-unused-parameter
     -Wno-error=literal-suffix
-    -Wno-error=unused-local-typedefs)
+    -Wno-error=unused-local-typedefs
+    -Wno-error=unused-function # Warnings in Numpy Header.
+)
 
 foreach(flag ${COMMON_FLAGS})
     safe_set_cflag(CMAKE_C_FLAGS ${flag})
diff --git a/demo/mnist/data/get_mnist_data.sh b/demo/mnist/data/get_mnist_data.sh
index c3ef994450..9099b5ab6f 100755
--- a/demo/mnist/data/get_mnist_data.sh
+++ b/demo/mnist/data/get_mnist_data.sh
@@ -1,6 +1,6 @@
 #!/usr/bin/env sh
 # This scripts downloads the mnist data and unzips it.
-
+set -e
 DIR="$( cd "$(dirname "$0")" ; pwd -P )"
 rm -rf "$DIR/raw_data"
 mkdir "$DIR/raw_data"
diff --git a/paddle/gserver/dataproviders/DataProvider.cpp b/paddle/gserver/dataproviders/DataProvider.cpp
index c3b4769f76..8cefbb30ad 100644
--- a/paddle/gserver/dataproviders/DataProvider.cpp
+++ b/paddle/gserver/dataproviders/DataProvider.cpp
@@ -57,7 +57,8 @@ void BufferBatch::clone(DataBatch* srcBatch, bool useGpu) {
   }
 }
 
-DoubleBuffer::DoubleBuffer(DataProvider* dataPool, bool useGpu,
+DoubleBuffer::DoubleBuffer(DataProvider *dataPool,
+                           bool useGpu,
                            int64_t batchSize) {
   batchSize_ = batchSize;
   dataPool_ = dataPool;
@@ -110,6 +111,9 @@ void DoubleBuffer::removeOneBatch(DataBatch* dataBatch) {
 }
 
 void DoubleBuffer::insertOneBatch(DataBatch* batch) {
+  while (!bufferQueue_->waitNotEmptyFor(2 /* seconds */)) {  // time out
+    if (stopping_) return;
+  }
   BufferBatch* bufBatch = bufferQueue_->dequeue();
   // clone and copy the data from an Threadlocal Variable
   bufBatch->clone(batch, useGpu_);
@@ -138,7 +142,7 @@ void DoubleBuffer::asyncLoadBatch() {
       actualSize = dataPool_->getNextBatchInternal(batchSize_, &newBatch);
     }
     insertOneBatch(&newBatch);
-  } while (actualSize > 0);
+  } while (actualSize > 0 && !stopping_);
   }
 }
diff --git a/paddle/gserver/dataproviders/DataProvider.h b/paddle/gserver/dataproviders/DataProvider.h
index c24546374a..112e45de1c 100644
--- a/paddle/gserver/dataproviders/DataProvider.h
+++ b/paddle/gserver/dataproviders/DataProvider.h
@@ -259,7 +259,9 @@ typedef Queue<BufferBatch*> BufferBatchQueue;
 
 class DoubleBuffer {
 public:
-  DoubleBuffer(DataProvider* dataPool, bool useGpu, int64_t batchSize = 0);
+  DoubleBuffer(DataProvider* dataPool,
+               bool useGpu,
+               int64_t batchSize = 0);
   virtual ~DoubleBuffer();
   void removeOneBatch(DataBatch* dataBatch);
 
@@ -349,7 +351,6 @@ public:
    */
   virtual void reset() {
     if (doubleBuffer_ != nullptr) {
-      LOG(INFO) << "the double-buffer is starting ...";
       doubleBuffer_->startAsyncLoad();
     }
   }
diff --git a/paddle/gserver/dataproviders/PyDataProvider2.cpp b/paddle/gserver/dataproviders/PyDataProvider2.cpp
index e3e472ac16..c464d01fde 100644
--- a/paddle/gserver/dataproviders/PyDataProvider2.cpp
+++ b/paddle/gserver/dataproviders/PyDataProvider2.cpp
@@ -18,9 +18,16 @@ limitations under the License. */
 #include <Python.h>
 #include <unordered_set>
 #include <list>
+#include <algorithm>
+#include <numpy/numpyconfig.h>
+#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
+#include <numpy/arrayobject.h>
 #include "DataProvider.h"
+
 #include "paddle/utils/PythonUtil.h"
+#include "paddle/utils/Locks.h"
+#include "paddle/utils/Stat.h"
 
 namespace paddle {
 
@@ -202,7 +209,10 @@ public:
   PyDataProvider2(const DataConfig& config,
                   const ModelConfig& modelConfig,
                   bool useGpu)
-      :DataProvider(config, useGpu), callingContextCreated_(2) {
+      :DataProvider(config, useGpu),
+       callingContextCreated_(2) {
+    if (PyArray_API == NULL)
+      import_array();
     auto& args = config.load_data_args();
     PyObjectPtr kwargs = PyObjectPtr(PyDict_New());
     if (!args.empty()) {
@@ -454,6 +464,7 @@ private:
   std::condition_variable pushCV_;
   std::condition_variable pullCV_;
   std::mutex mtx_;
+
   ThreadBarrier callingContextCreated_;
   std::unique_ptr<IPyDataProviderCache> cache_;
 
@@ -496,8 +507,8 @@ public:
    * Resetting the PyDataProvider. May start reading thread here.
    */
   virtual void reset() {
-    DataProvider::reset();
     resetImpl(true);
+    DataProvider::reset();
   }
 
   /**
@@ -518,6 +529,7 @@ public:
    * Loading a batch of data.
    */
   int64_t getNextBatchInternal(int64_t size_, DataBatch *batch) {
+    REGISTER_TIMER("PyDP2.getNextBatchInternal")
     CHECK_GE(size_, 0);
     size_t size = (size_t) size_;
     if (loadThread_) {  // loading from thread should wait for data pool ready.
@@ -698,10 +710,22 @@ public:
    */
   virtual void fill(Argument &argument, PyObject *obj) {
     real* dat = argument.value->getData() + height_ * headerPtr_->dim;
-    py::SequenceHelper s(obj);
-    // TODO(yuyang18): Here we can use AVX or SSE to accelerate memory copy.
-    for (size_t i=0; i < headerPtr_->dim; ++i) {
-      dat[i] = (real) s.getDouble(i);
+    if (PyArray_Check(obj)) {
+      auto dtype = PyArray_DTYPE((PyArrayObject*)obj);
+      if (dtype->type == 'f' && dtype->elsize == sizeof(real)) {
+        real * data = (real*)PyArray_DATA((PyArrayObject*)obj);
+        auto sz = PyArray_SIZE((PyArrayObject*)obj);
+        std::copy(data, data + sz, dat);
+      } else {
+        LOG(FATAL) << "You should yield float" << sizeof(real) * 8
+                   << " array";
+      }
+    } else {
+      py::SequenceHelper s(obj);
+      // TODO(yuyang18): Here we can use AVX or SSE to accelerate memory copy.
+      for (size_t i=0; i < headerPtr_->dim; ++i) {
+        dat[i] = (real) s.getDouble(i);
+      }
     }
     ++height_;
   }
diff --git a/paddle/utils/Queue.h b/paddle/utils/Queue.h
index d73f27d7fa..f952cf5877 100644
--- a/paddle/utils/Queue.h
+++ b/paddle/utils/Queue.h
@@ -135,6 +135,21 @@ public:
     queueCV_.wait(lock, [this]() { return numElements_ == 0; });
   }
 
+  /**
+   * @brief wait queue is not empty at most for some seconds.
+   * @param seconds wait time limit.
+   * @return true if queue is not empty. false if timeout.
+   */
+  bool waitNotEmptyFor(int seconds) {
+    std::unique_lock<std::mutex> lock(queueLock_);
+    return queueCV_.wait_for(
+        lock,
+        std::chrono::seconds(seconds),
+        [this] {
+          return numElements_ != 0;
+        });
+  }
+
 private:
   std::deque<T> elements_;
   int numElements_;
diff --git a/python/paddle/trainer_config_helpers/data_sources.py b/python/paddle/trainer_config_helpers/data_sources.py
index 8ada3903dc..3b5c17a271 100644
--- a/python/paddle/trainer_config_helpers/data_sources.py
+++ b/python/paddle/trainer_config_helpers/data_sources.py
@@ -84,6 +84,7 @@ def define_py_data_source(file_list, cls, module,
         data.load_data_module = load_data_module
         data.load_data_object = load_data_object
         data.load_data_args = load_data_args
+        data.async_load_data = True
         return data
 
     data_cls = py_data2
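
Usage note (editor's sketch, not part of the patch): with this change, a dense_vector slot in a PyDataProvider2 provider may yield a numpy array whose element type matches Paddle's `real` (float32 in the default build). The C++ side then copies the buffer with std::copy instead of converting the Python sequence element by element, which is where the speed-up comes from. The sketch below assumes the paddle.trainer.PyDataProvider2 decorator API of this release (provider, dense_vector, integer_value) and a hypothetical text file with one "pixels;label" sample per line.

    # Editor's sketch exercising the new numpy fast path; the file format and
    # dimensions are illustrative, not taken from the commit.
    import numpy

    from paddle.trainer.PyDataProvider2 import provider, dense_vector, integer_value


    @provider(input_types=[dense_vector(28 * 28), integer_value(10)])
    def process(settings, filename):
        for line in open(filename):
            pixels_str, label_str = line.rstrip('\n').split(';')
            # Yield a float32 numpy array; PyDataProvider2 now copies its buffer
            # directly instead of iterating over a Python sequence.
            pixels = numpy.fromstring(pixels_str, sep=' ', dtype=numpy.float32)
            yield pixels, int(label_str)

If the array's dtype does not match `real` (for example float64 arrays on a float32 build), the provider now fails fast with the "You should yield float32 array" fatal log rather than converting silently.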