From f43be75b82582ec5f81c2ceba45eb14128638478 Mon Sep 17 00:00:00 2001
From: typhoonzero
Date: Mon, 2 Apr 2018 20:25:11 +0800
Subject: [PATCH 01/18] multi stream thread pool

---
 paddle/fluid/framework/threadpool.cc         | 15 +++++++++++++++
 paddle/fluid/framework/threadpool.h          | 16 ++++++++++++++++
 paddle/fluid/operators/detail/grpc_client.cc | 12 +++++++-----
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/paddle/fluid/framework/threadpool.cc b/paddle/fluid/framework/threadpool.cc
index 9854d618d2..0a8377cc47 100644
--- a/paddle/fluid/framework/threadpool.cc
+++ b/paddle/fluid/framework/threadpool.cc
@@ -91,5 +91,20 @@ void ThreadPool::TaskLoop() {
   }
 }
 
+std::unique_ptr<ThreadPool> MultiStreamThreadPool::io_threadpool_(nullptr);
+std::once_flag MultiStreamThreadPool::io_init_flag_;
+
+MultiStreamThreadPool* MultiStreamThreadPool::GetInstanceIO() {
+  std::call_once(io_init_flag_, &MultiStreamThreadPool::InitIO);
+  return static_cast<MultiStreamThreadPool*>(io_threadpool_.get());
+}
+
+void MultiStreamThreadPool::InitIO() {
+  if (io_threadpool_.get() == nullptr) {
+    // TODO(typhoonzero1986): make this configurable
+    io_threadpool_.reset(new ThreadPool(100));
+  }
+}
+
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/threadpool.h b/paddle/fluid/framework/threadpool.h
index f9dce7105e..5d437594ab 100644
--- a/paddle/fluid/framework/threadpool.h
+++ b/paddle/fluid/framework/threadpool.h
@@ -135,6 +135,17 @@ class ThreadPool {
   std::condition_variable completed_;
 };
 
+class MultiStreamThreadPool : ThreadPool {
+ public:
+  static MultiStreamThreadPool* GetInstanceIO();
+  static void InitIO();
+
+ private:
+  // NOTE: threadpool in base will be inherited here.
+  static std::unique_ptr<ThreadPool> io_threadpool_;
+  static std::once_flag io_init_flag_;
+};
+
 // Run a function asynchronously.
 // NOTE: The function must return void. If the function needs to return a value,
 // you can use lambda to capture a value pointer.
@@ -143,5 +154,10 @@ std::future Async(Callback callback) { return ThreadPool::GetInstance()->Run(callback); } +template +std::future AsyncIO(Callback callback) { + return MultiStreamThreadPool::GetInstanceIO()->Run(callback); +} + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/operators/detail/grpc_client.cc b/paddle/fluid/operators/detail/grpc_client.cc index d79ba6d291..3f96ce3718 100644 --- a/paddle/fluid/operators/detail/grpc_client.cc +++ b/paddle/fluid/operators/detail/grpc_client.cc @@ -33,7 +33,8 @@ bool RPCClient::AsyncSendVariable(const std::string& ep, const framework::Scope* p_scope = &scope; const auto ch = GetChannel(ep_val); - framework::Async([var_name_val, p_ctx, ep_val, p_scope, time_out, ch, this] { + framework::AsyncIO([var_name_val, p_ctx, ep_val, p_scope, time_out, ch, + this] { auto* var = p_scope->FindVar(var_name_val); ::grpc::ByteBuffer req; @@ -88,7 +89,8 @@ bool RPCClient::AsyncGetVariable(const std::string& ep, const framework::Scope* p_scope = &scope; const auto ch = GetChannel(ep_val); - framework::Async([var_name_val, ep_val, p_scope, p_ctx, time_out, ch, this] { + framework::AsyncIO([var_name_val, ep_val, p_scope, p_ctx, time_out, ch, + this] { // prepare input sendrecv::VariableMessage req; req.set_varname(var_name_val); @@ -131,8 +133,8 @@ bool RPCClient::AsyncPrefetchVariable(const std::string& ep, const framework::Scope* p_scope = &scope; const auto ch = GetChannel(ep_val); - framework::Async([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx, - time_out, ch, this] { + framework::AsyncIO([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx, + time_out, ch, this] { auto* var = p_scope->FindVar(in_var_name_val); ::grpc::ByteBuffer req; @@ -195,7 +197,7 @@ bool RPCClient::Wait() { std::vector> waits(req_count_); for (int i = 0; i < req_count_; i++) { - waits[i] = framework::Async([i, &a, this] { a[i] = Proceed(); }); + waits[i] = framework::AsyncIO([i, &a, this] { a[i] = Proceed(); }); } for (int i = 0; i < req_count_; i++) { From b851c0739f29eebfb9d63db026c847733fa8d252 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 3 Apr 2018 10:02:34 +0800 Subject: [PATCH 02/18] update compile --- paddle/fluid/framework/threadpool.h | 32 ++++++++++---------- paddle/fluid/operators/detail/grpc_client.cc | 12 +++----- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/paddle/fluid/framework/threadpool.h b/paddle/fluid/framework/threadpool.h index 5d437594ab..0a60488d9f 100644 --- a/paddle/fluid/framework/threadpool.h +++ b/paddle/fluid/framework/threadpool.h @@ -28,6 +28,22 @@ limitations under the License. */ namespace paddle { namespace framework { +struct ExceptionHandler { + mutable std::future> future_; + explicit ExceptionHandler( + std::future>&& f) + : future_(std::move(f)) {} + void operator()() const { + auto ex = this->future_.get(); + if (ex != nullptr) { + LOG(FATAL) << "The exception is thrown inside the thread pool. You " + "should use RunAndGetException to handle the exception.\n" + "The default exception handler is LOG(FATAL)." + << ex->what(); + } + } +}; + // ThreadPool maintains a queue of tasks, and runs them using a fixed // number of threads. 
class ThreadPool { @@ -87,22 +103,6 @@ class ThreadPool { void Wait(); private: - struct ExceptionHandler { - mutable std::future> future_; - explicit ExceptionHandler( - std::future>&& f) - : future_(std::move(f)) {} - void operator()() const { - auto ex = this->future_.get(); - if (ex != nullptr) { - LOG(FATAL) << "The exception is thrown inside the thread pool. You " - "should use RunAndGetException to handle the exception.\n" - "The default exception handler is LOG(FATAL)." - << ex->what(); - } - } - }; - DISABLE_COPY_AND_ASSIGN(ThreadPool); // If the task queue is empty and avaialbe is equal to the number of diff --git a/paddle/fluid/operators/detail/grpc_client.cc b/paddle/fluid/operators/detail/grpc_client.cc index 3f96ce3718..d79ba6d291 100644 --- a/paddle/fluid/operators/detail/grpc_client.cc +++ b/paddle/fluid/operators/detail/grpc_client.cc @@ -33,8 +33,7 @@ bool RPCClient::AsyncSendVariable(const std::string& ep, const framework::Scope* p_scope = &scope; const auto ch = GetChannel(ep_val); - framework::AsyncIO([var_name_val, p_ctx, ep_val, p_scope, time_out, ch, - this] { + framework::Async([var_name_val, p_ctx, ep_val, p_scope, time_out, ch, this] { auto* var = p_scope->FindVar(var_name_val); ::grpc::ByteBuffer req; @@ -89,8 +88,7 @@ bool RPCClient::AsyncGetVariable(const std::string& ep, const framework::Scope* p_scope = &scope; const auto ch = GetChannel(ep_val); - framework::AsyncIO([var_name_val, ep_val, p_scope, p_ctx, time_out, ch, - this] { + framework::Async([var_name_val, ep_val, p_scope, p_ctx, time_out, ch, this] { // prepare input sendrecv::VariableMessage req; req.set_varname(var_name_val); @@ -133,8 +131,8 @@ bool RPCClient::AsyncPrefetchVariable(const std::string& ep, const framework::Scope* p_scope = &scope; const auto ch = GetChannel(ep_val); - framework::AsyncIO([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx, - time_out, ch, this] { + framework::Async([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx, + time_out, ch, this] { auto* var = p_scope->FindVar(in_var_name_val); ::grpc::ByteBuffer req; @@ -197,7 +195,7 @@ bool RPCClient::Wait() { std::vector> waits(req_count_); for (int i = 0; i < req_count_; i++) { - waits[i] = framework::AsyncIO([i, &a, this] { a[i] = Proceed(); }); + waits[i] = framework::Async([i, &a, this] { a[i] = Proceed(); }); } for (int i = 0; i < req_count_; i++) { From fbd3604cad8fdb3ad7fa2f6717395b1c40e6ecaf Mon Sep 17 00:00:00 2001 From: Liu Yiqun Date: Tue, 3 Apr 2018 05:31:52 +0000 Subject: [PATCH 03/18] Split Executor.Run to Executor.Prepare and Executor.RunPreparedContext for inference. 
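
Splitting Run() lets an inference program be prepared once and then executed many times with the same ExecutorPrepareContext. A minimal sketch of the intended call pattern, assuming only the Executor API added in this patch (place, inference_program, scope, feed_targets, fetch_targets and repeat are illustrative placeholders, not part of this change):

    paddle::framework::Executor executor(place);
    // Prepare block 0 of the program once; the returned context caches the
    // created operators.
    auto ctx = executor.Prepare(*inference_program, 0 /* block_id */);
    for (int i = 0; i < repeat; ++i) {
      // Reuse the prepared context; only the feed/fetch data changes per run.
      executor.RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets);
    }

If inference_program is changed, Prepare() has to be called again to obtain a fresh ExecutorPrepareContext (see the note added to test_helper.h below).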
--- paddle/fluid/framework/executor.cc | 94 ++++++++++++------- paddle/fluid/framework/executor.h | 7 ++ .../test_inference_image_classification.cc | 4 +- paddle/fluid/inference/tests/test_helper.h | 20 +++- 4 files changed, 85 insertions(+), 40 deletions(-) diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 64c06687b6..009d0fbeb8 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -129,13 +129,15 @@ static bool has_feed_operators( feed_count, feed_targets.size(), "The number of feed operators should match 'feed_targets'"); - // When feed operator are present, so should be feed_holder - auto var = block.FindVar(feed_holder_name); - PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable", - feed_holder_name); - PADDLE_ENFORCE_EQ(var->GetType(), proto::VarType::FEED_MINIBATCH, - "'%s' variable should be 'FEED_MINIBATCH' type", - feed_holder_name); + if (!feed_holder_name.empty()) { + // When feed operator are present, so should be feed_holder + auto var = block.FindVar(feed_holder_name); + PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable", + feed_holder_name); + PADDLE_ENFORCE_EQ(var->GetType(), proto::VarType::FEED_MINIBATCH, + "'%s' variable should be 'FEED_MINIBATCH' type", + feed_holder_name); + } } return feed_count > 0; @@ -169,13 +171,15 @@ static bool has_fetch_operators( fetch_count, fetch_targets.size(), "The number of fetch operators should match 'fetch_targets'"); - // When fetch operator are present, so should be fetch_holder - auto var = block.FindVar(fetch_holder_name); - PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable", - fetch_holder_name); - PADDLE_ENFORCE_EQ(var->GetType(), proto::VarType::FETCH_LIST, - "'%s' variable should be 'FETCH_LIST' type", - fetch_holder_name); + if (!fetch_holder_name.empty()) { + // When fetch operator are present, so should be fetch_holder + auto var = block.FindVar(fetch_holder_name); + PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable", + fetch_holder_name); + PADDLE_ENFORCE_EQ(var->GetType(), proto::VarType::FETCH_LIST, + "'%s' variable should be 'FETCH_LIST' type", + fetch_holder_name); + } } return fetch_count > 0; @@ -222,16 +226,6 @@ void Executor::Run(const ProgramDesc& program, Scope* scope, } } - // map the data of feed_targets to feed_holder - for (auto* op : global_block->AllOps()) { - if (op->Type() == kFeedOpType) { - std::string feed_target_name = op->Output("Out")[0]; - int idx = boost::get(op->GetAttr("col")); - SetFeedVariable(scope, *feed_targets[feed_target_name], feed_holder_name, - idx); - } - } - if (!has_fetch_ops) { // create fetch_holder variable auto* fetch_holder = global_block->Var(fetch_holder_name); @@ -255,17 +249,9 @@ void Executor::Run(const ProgramDesc& program, Scope* scope, } } - Run(*copy_program, scope, 0, create_vars, create_vars); - - // obtain the data of fetch_targets from fetch_holder - for (auto* op : global_block->AllOps()) { - if (op->Type() == kFetchOpType) { - std::string fetch_target_name = op->Input("X")[0]; - int idx = boost::get(op->GetAttr("col")); - *fetch_targets[fetch_target_name] = - GetFetchVariable(*scope, fetch_holder_name, idx); - } - } + auto ctx = Prepare(*copy_program, 0); + RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets, + feed_holder_name, fetch_holder_name, create_vars); } std::unique_ptr Executor::Prepare( @@ -343,5 +329,43 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* 
scope, } } +void Executor::RunPreparedContext( + ExecutorPrepareContext* ctx, Scope* scope, + std::map& feed_targets, + std::map& fetch_targets, + const std::string& feed_holder_name, const std::string& fetch_holder_name, + bool create_vars) { + auto& global_block = ctx->prog_.Block(ctx->block_id_); + + // map the data of feed_targets to feed_holder + for (auto* op : global_block.AllOps()) { + if (op->Type() == kFeedOpType) { + std::string feed_target_name = op->Output("Out")[0]; + PADDLE_ENFORCE(feed_targets.find(feed_target_name) != feed_targets.end(), + "Variable %s is not feeded."); + + int idx = boost::get(op->GetAttr("col")); + SetFeedVariable(scope, *feed_targets[feed_target_name], feed_holder_name, + idx); + } + } + + RunPreparedContext(ctx, scope, create_vars, create_vars); + + // obtain the data of fetch_targets from fetch_holder + for (auto* op : global_block.AllOps()) { + if (op->Type() == kFetchOpType) { + std::string fetch_target_name = op->Input("X")[0]; + PADDLE_ENFORCE( + fetch_targets.find(fetch_target_name) != fetch_targets.end(), + "Variable %s is not fetched."); + + int idx = boost::get(op->GetAttr("col")); + *fetch_targets[fetch_target_name] = + GetFetchVariable(*scope, fetch_holder_name, idx); + } + } +} + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index 7173c51c95..b0e64d5de0 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -65,6 +65,13 @@ class Executor { bool create_local_scope = true, bool create_vars = true); + void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, + std::map& feed_targets, + std::map& fetch_targets, + const std::string& feed_holder_name = "feed", + const std::string& fetch_holder_name = "fetch", + bool create_vars = true); + private: const platform::Place place_; }; diff --git a/paddle/fluid/inference/tests/book/test_inference_image_classification.cc b/paddle/fluid/inference/tests/book/test_inference_image_classification.cc index e9a27171f1..9126efb8c2 100644 --- a/paddle/fluid/inference/tests/book/test_inference_image_classification.cc +++ b/paddle/fluid/inference/tests/book/test_inference_image_classification.cc @@ -48,7 +48,7 @@ TEST(inference, image_classification) { // Run inference on CPU LOG(INFO) << "--- CPU Runs: ---"; - TestInference( + TestInference( dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat); LOG(INFO) << output1.dims(); @@ -59,7 +59,7 @@ TEST(inference, image_classification) { // Run inference on CUDA GPU LOG(INFO) << "--- GPU Runs: ---"; - TestInference( + TestInference( dirname, cpu_feeds, cpu_fetchs2, FLAGS_repeat); LOG(INFO) << output2.dims(); diff --git a/paddle/fluid/inference/tests/test_helper.h b/paddle/fluid/inference/tests/test_helper.h index dce541c097..d559cc7d03 100644 --- a/paddle/fluid/inference/tests/test_helper.h +++ b/paddle/fluid/inference/tests/test_helper.h @@ -88,7 +88,7 @@ void CheckError(paddle::framework::LoDTensor& output1, EXPECT_EQ(count, 0U) << "There are " << count << " different elements."; } -template +template void TestInference(const std::string& dirname, const std::vector& cpu_feeds, std::vector& cpu_fetchs, @@ -170,7 +170,14 @@ void TestInference(const std::string& dirname, // 6. 
Run the inference program { // Ignore the profiling results of the first run - executor.Run(*inference_program, scope, feed_targets, fetch_targets); + std::unique_ptr ctx; + if (PrepareContext) { + ctx = executor.Prepare(*inference_program, 0); + executor.RunPreparedContext( + ctx.get(), scope, feed_targets, fetch_targets); + } else { + executor.Run(*inference_program, scope, feed_targets, fetch_targets); + } // Enable the profiler paddle::platform::EnableProfiler(state); @@ -181,7 +188,14 @@ void TestInference(const std::string& dirname, "run_inference", paddle::platform::DeviceContextPool::Instance().Get(place)); - executor.Run(*inference_program, scope, feed_targets, fetch_targets); + if (PrepareContext) { + // Note: if you changed the inference_program, you need to call + // executor.Prepare() again to get a new ExecutorPrepareContext. + executor.RunPreparedContext( + ctx.get(), scope, feed_targets, fetch_targets); + } else { + executor.Run(*inference_program, scope, feed_targets, fetch_targets); + } } // Disable the profiler and print the timing information From a9e826ed495bcd5a5b625d4ce364c8c42d0d0b7d Mon Sep 17 00:00:00 2001 From: Liu Yiqun Date: Sun, 8 Apr 2018 06:32:30 +0000 Subject: [PATCH 04/18] Add the check of has_feed/fetch_operators back. --- paddle/fluid/framework/executor.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 8a0ab118d0..3edaede8d6 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -352,13 +352,17 @@ void Executor::RunPreparedContext( bool create_vars) { auto& global_block = ctx->prog_.Block(ctx->block_id_); + PADDLE_ENFORCE( + has_feed_operators(global_block, feed_targets, feed_holder_name), + "Program in ExecutorPrepareContext should has feed_ops."); + PADDLE_ENFORCE( + has_fetch_operators(global_block, fetch_targets, fetch_holder_name), + "Program in the prepared context should has fetch_ops."); + // map the data of feed_targets to feed_holder for (auto* op : global_block.AllOps()) { if (op->Type() == kFeedOpType) { std::string feed_target_name = op->Output("Out")[0]; - PADDLE_ENFORCE(feed_targets.find(feed_target_name) != feed_targets.end(), - "Variable %s is not feeded."); - int idx = boost::get(op->GetAttr("col")); SetFeedVariable(scope, *feed_targets[feed_target_name], feed_holder_name, idx); @@ -371,10 +375,6 @@ void Executor::RunPreparedContext( for (auto* op : global_block.AllOps()) { if (op->Type() == kFetchOpType) { std::string fetch_target_name = op->Input("X")[0]; - PADDLE_ENFORCE( - fetch_targets.find(fetch_target_name) != fetch_targets.end(), - "Variable %s is not fetched."); - int idx = boost::get(op->GetAttr("col")); *fetch_targets[fetch_target_name] = GetFetchVariable(*scope, fetch_holder_name, idx); From 972ae6e98ffbddac7b68242f946934b07b275e01 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Mon, 9 Apr 2018 14:27:19 +0800 Subject: [PATCH 05/18] random selected rows value --- paddle/fluid/operators/uniform_random_op.cc | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/operators/uniform_random_op.cc b/paddle/fluid/operators/uniform_random_op.cc index 87699362b2..a50add9739 100644 --- a/paddle/fluid/operators/uniform_random_op.cc +++ b/paddle/fluid/operators/uniform_random_op.cc @@ -24,7 +24,15 @@ template class CPUUniformRandomKernel : public framework::OpKernel { public: void Compute(const framework::ExecutionContext& ctx) const override { - auto* 
tensor = ctx.Output("Out"); + framework::Tensor* tensor(nullptr); + auto out_var = ctx.OutputVar("Out"); + if (out_var->IsType()) { + tensor = ctx.Output("Out"); + } else if (out_var->IsType()) { + tensor = ctx.Output("Out")->mutable_value(); + } else { + PADDLE_THROW("Only support LoDTensor and SelectedRows."); + } T* data = tensor->mutable_data(ctx.GetPlace()); unsigned int seed = static_cast(ctx.Attr("seed")); std::minstd_rand engine; @@ -36,6 +44,7 @@ class CPUUniformRandomKernel : public framework::OpKernel { static_cast(ctx.Attr("min")), static_cast(ctx.Attr("max"))); int64_t size = tensor->numel(); + VLOG(3) << "size = " << size; for (int64_t i = 0; i < size; ++i) { data[i] = dist(engine); } @@ -55,6 +64,7 @@ class UniformRandomOp : public framework::OperatorWithKernel { "uniform_random's min must less then max"); auto& shape = ctx->Attrs().Get>("shape"); std::vector temp; + VLOG(3) << "shape.size() = " << shape.size(); temp.reserve(shape.size()); for (auto dim : shape) { temp.push_back(static_cast(dim)); From f909ff1a3652697f63070cf1bc8cb425d1902417 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Mon, 9 Apr 2018 15:53:00 +0800 Subject: [PATCH 06/18] update unit test --- paddle/fluid/operators/uniform_random_op.cc | 5 +- paddle/fluid/operators/uniform_random_op.cu | 13 +++++- .../tests/unittests/test_uniform_random_op.py | 46 +++++++++++++++++-- 3 files changed, 56 insertions(+), 8 deletions(-) diff --git a/paddle/fluid/operators/uniform_random_op.cc b/paddle/fluid/operators/uniform_random_op.cc index a50add9739..d8b38fb7eb 100644 --- a/paddle/fluid/operators/uniform_random_op.cc +++ b/paddle/fluid/operators/uniform_random_op.cc @@ -29,11 +29,14 @@ class CPUUniformRandomKernel : public framework::OpKernel { if (out_var->IsType()) { tensor = ctx.Output("Out"); } else if (out_var->IsType()) { + auto shape = ctx.Attr>("shape"); tensor = ctx.Output("Out")->mutable_value(); + tensor->Resize(framework::make_ddim(shape)); } else { PADDLE_THROW("Only support LoDTensor and SelectedRows."); } T* data = tensor->mutable_data(ctx.GetPlace()); + data[0] = static_cast(1000); unsigned int seed = static_cast(ctx.Attr("seed")); std::minstd_rand engine; if (seed == 0) { @@ -44,7 +47,6 @@ class CPUUniformRandomKernel : public framework::OpKernel { static_cast(ctx.Attr("min")), static_cast(ctx.Attr("max"))); int64_t size = tensor->numel(); - VLOG(3) << "size = " << size; for (int64_t i = 0; i < size; ++i) { data[i] = dist(engine); } @@ -64,7 +66,6 @@ class UniformRandomOp : public framework::OperatorWithKernel { "uniform_random's min must less then max"); auto& shape = ctx->Attrs().Get>("shape"); std::vector temp; - VLOG(3) << "shape.size() = " << shape.size(); temp.reserve(shape.size()); for (auto dim : shape) { temp.push_back(static_cast(dim)); diff --git a/paddle/fluid/operators/uniform_random_op.cu b/paddle/fluid/operators/uniform_random_op.cu index 1232cd1eb3..115c859527 100644 --- a/paddle/fluid/operators/uniform_random_op.cu +++ b/paddle/fluid/operators/uniform_random_op.cu @@ -43,7 +43,18 @@ template class GPUUniformRandomKernel : public framework::OpKernel { public: void Compute(const framework::ExecutionContext& context) const override { - auto* tensor = context.Output("Out"); + framework::Tensor* tensor(nullptr); + auto out_var = ctx.OutputVar("Out"); + if (out_var->IsType()) { + tensor = ctx.Output("Out"); + } else if (out_var->IsType()) { + auto shape = ctx.Attr>("shape"); + tensor = ctx.Output("Out")->mutable_value(); + tensor->Resize(framework::make_ddim(shape)); + } else { + 
PADDLE_THROW("Only support LoDTensor and SelectedRows."); + } + T* data = tensor->mutable_data(context.GetPlace()); unsigned int seed = static_cast(context.Attr("seed")); if (seed == 0) { diff --git a/python/paddle/fluid/tests/unittests/test_uniform_random_op.py b/python/paddle/fluid/tests/unittests/test_uniform_random_op.py index 75ff85a55f..3331e99c36 100644 --- a/python/paddle/fluid/tests/unittests/test_uniform_random_op.py +++ b/python/paddle/fluid/tests/unittests/test_uniform_random_op.py @@ -15,6 +15,16 @@ import unittest import numpy as np from op_test import OpTest +import paddle.fluid.core as core +from paddle.fluid.op import Operator + + +def output_hist(out): + hist, _ = np.histogram(out, range=(-5, 10)) + hist = hist.astype("float32") + hist /= float(out.size) + prob = 0.1 * np.ones((10)) + return hist, prob class TestUniformRandomOp(OpTest): @@ -33,11 +43,37 @@ class TestUniformRandomOp(OpTest): self.check_output_customized(self.verify_output) def verify_output(self, outs): - tensor = outs[0] - hist, _ = np.histogram(outs[0], range=(-5, 10)) - hist = hist.astype("float32") - hist /= float(outs[0].size) - prob = 0.1 * np.ones((10)) + hist, prob = output_hist(outs[0]) + self.assertTrue( + np.allclose( + hist, prob, rtol=0, atol=0.01), "hist: " + str(hist)) + + +class TestUniformRandomOpSelectedRows(unittest.TestCase): + def get_places(self): + places = [core.CPUPlace()] + if core.is_compiled_with_cuda(): + places.append(core.CUDAPlace(0)) + return places + + def test_check_output(self): + for place in self.get_places(): + self.check_with_place(place) + + def check_with_place(self, place): + scope = core.Scope() + out = scope.var("X").get_selected_rows() + + op = Operator( + "uniform_random", + Out="X", + shape=[1000, 784], + min=-5.0, + max=10.0, + seed=10) + op.run(scope, place) + out_tensor = out.get_tensor() + hist, prob = output_hist(np.array(out_tensor)) self.assertTrue( np.allclose( hist, prob, rtol=0, atol=0.01), "hist: " + str(hist)) From 3f6fc10b9fc6da75961bab0f7a473dc388d07f51 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Tue, 10 Apr 2018 14:23:09 +0800 Subject: [PATCH 07/18] new op that init table value randomly --- .../operators/uniform_random_table_op.cc | 144 ++++++++++++++++++ .../unittests/test_uniform_random_table_op.py | 66 ++++++++ 2 files changed, 210 insertions(+) create mode 100644 paddle/fluid/operators/uniform_random_table_op.cc create mode 100644 python/paddle/fluid/tests/unittests/test_uniform_random_table_op.py diff --git a/paddle/fluid/operators/uniform_random_table_op.cc b/paddle/fluid/operators/uniform_random_table_op.cc new file mode 100644 index 0000000000..4664cc5d93 --- /dev/null +++ b/paddle/fluid/operators/uniform_random_table_op.cc @@ -0,0 +1,144 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#include "paddle/fluid/framework/data_type.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/operators/math/math_function.h" +#include "paddle/fluid/platform/device_context.h" + +namespace paddle { +namespace operators { + +class UniformRandomTableInferShape : public framework::InferShapeBase { + public: + void operator()(framework::InferShapeContext *ctx) const override { + VLOG(3) << "Infershape..."; + PADDLE_ENFORCE(ctx->HasOutput("Out"), + "Output(Out) of UniformRandomTableOp should not be null."); + + PADDLE_ENFORCE( + ctx->Attrs().Get("min") < ctx->Attrs().Get("max"), + "uniform_random's min must less then max"); + auto &shape = ctx->Attrs().Get>("shape"); + std::vector temp; + temp.reserve(shape.size()); + for (auto dim : shape) { + temp.push_back(static_cast(dim)); + } + ctx->SetOutputDim("Out", framework::make_ddim(temp)); + } +}; + +class UniformRandomTableOp : public framework::OperatorBase { + public: + using framework::OperatorBase::OperatorBase; + + private: + void RunImpl(const framework::Scope &scope, + const platform::Place &dev_place) const override { + VLOG(3) << "RunImpl..."; + auto out = + scope.FindVar(Output("Out"))->GetMutable(); + auto shard_cnt = Attr("shard_cnt"); + auto shard_id = Attr("shard_id"); + auto max_id = Attr("max_id"); + auto shape = Attr>("shape"); + + auto tensor = out->mutable_value(); + tensor->Resize(framework::make_ddim(shape)); + // Only allocate the memory of large table on CPU + auto cpu = platform::CPUPlace(); + float *data = tensor->mutable_data(cpu); + VLOG(3) << "generate seed"; + unsigned int seed = static_cast(Attr("seed")); + std::minstd_rand engine; + if (seed == 0) { + seed = std::random_device()(); + } + engine.seed(seed); + std::uniform_real_distribution dist(Attr("min"), + Attr("max")); + int64_t size = tensor->numel(); + for (int64_t i = 0; i < size; ++i) { + data[i] = dist(engine); + } + // initialize rows by round-robin + // TODO(Yancey1989): need to support other way to distribute Ids + VLOG(3) << "calculate rows_size..."; + int64_t rows_size = 0; + if (max_id % shard_cnt == 0) { + rows_size = max_id / shard_cnt; + } else { + rows_size = max_id / shard_cnt + 1; + } + auto *rows = out->mutable_rows(); + rows->resize(rows_size); + (*rows)[0] = shard_id; + for (int64_t idx = 1; idx < rows_size; ++idx) { + (*rows)[idx] = (*rows)[idx - 1] + shard_cnt; + } + out->set_height(max_id); + } +}; + +class UniformRandomTableOpMaker : public framework::OpProtoAndCheckerMaker { + public: + UniformRandomTableOpMaker(OpProto *proto, OpAttrChecker *op_checker) + : framework::OpProtoAndCheckerMaker(proto, op_checker) { + AddOutput("Out", + "(SelectedRows)" + "The output table of uniform random table op."); + AddComment(R"DOC( +Uniform random operator for initializing a table. + +This operator initializes a SelectedRows with random values sampled from a +uniform distribution. + +)DOC"); + AddAttr("max_id", + "(int, required)" + "The maximal Id for the table."); + AddAttr("shard_cnt", + "(int, required)" + "The count of shards for distributing the table."); + AddAttr("shard_id", "(int, required) The current shard ID."); + AddAttr>("shape", + "(vector) The shape of the output tensor"); + AddAttr("min", + "(float, default -1.0) " + "Minimum value of uniform random") + .SetDefault(-1.0f); + AddAttr("max", + "(float, default 1.0) " + "Maximun value of uniform random") + .SetDefault(1.0f); + AddAttr("seed", + "(int, default 0) " + "Random seed used for generating samples. " + "0 means use a seed generated by the system." 
+ "Note that if seed is not 0, this operator will always " + "generate the same random numbers every time.") + .SetDefault(0); + AddAttr("dtype", "(int, default 5(FP32)) Output tensor data type") + .SetDefault(framework::proto::VarType::FP32); + } +}; +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; +REGISTER_OPERATOR(uniform_random_table, ops::UniformRandomTableOp, + ops::UniformRandomTableInferShape, + ops::UniformRandomTableOpMaker, + paddle::framework::EmptyGradOpMaker); diff --git a/python/paddle/fluid/tests/unittests/test_uniform_random_table_op.py b/python/paddle/fluid/tests/unittests/test_uniform_random_table_op.py new file mode 100644 index 0000000000..0474c51e49 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_uniform_random_table_op.py @@ -0,0 +1,66 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import numpy as np +from op_test import OpTest +import paddle.fluid.core as core +from paddle.fluid.op import Operator + + +def output_hist(out): + hist, _ = np.histogram(out, range=(-5, 10)) + hist = hist.astype("float32") + hist /= float(out.size) + prob = 0.1 * np.ones((10)) + return hist, prob + + +class TestUniformRandomTableOp(unittest.TestCase): + def get_places(self): + places = [core.CPUPlace()] + if core.is_compiled_with_cuda(): + places.append(core.CUDAPlace(0)) + return places + + def test_check_output(self): + for place in self.get_places(): + self.check_with_place(place) + + def check_with_place(self, place): + scope = core.Scope() + out = scope.var("X").get_selected_rows() + + op = Operator( + "uniform_random_table", + Out="X", + shape=[4, 784], + min=-5.0, + max=10.0, + seed=10, + shard_cnt=3, + shard_id=1, + max_id=10) + op.run(scope, place) + self.assertEqual(out.rows(), [1, 4, 7, 10]) + self.assertEqual(out.height(), 10) + self.assertEqual(out.get_tensor().shape(), [4, 784]) + hist, prob = output_hist(np.array(out.get_tensor())) + self.assertTrue( + np.allclose( + hist, prob, rtol=0, atol=0.01), "hist: " + str(hist)) + + +if __name__ == "__main__": + unittest.main() From cb7bbf426c1be2d4a0989855f6440b0b8313f6b0 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Tue, 10 Apr 2018 14:28:35 +0800 Subject: [PATCH 08/18] revert uniform_random_op --- paddle/fluid/operators/uniform_random_op.cc | 13 +----- paddle/fluid/operators/uniform_random_op.cu | 13 +----- .../tests/unittests/test_uniform_random_op.py | 46 ++----------------- 3 files changed, 7 insertions(+), 65 deletions(-) diff --git a/paddle/fluid/operators/uniform_random_op.cc b/paddle/fluid/operators/uniform_random_op.cc index d8b38fb7eb..87699362b2 100644 --- a/paddle/fluid/operators/uniform_random_op.cc +++ b/paddle/fluid/operators/uniform_random_op.cc @@ -24,19 +24,8 @@ template class CPUUniformRandomKernel : public framework::OpKernel { public: void Compute(const framework::ExecutionContext& ctx) const override { - framework::Tensor* tensor(nullptr); - auto out_var = 
ctx.OutputVar("Out"); - if (out_var->IsType()) { - tensor = ctx.Output("Out"); - } else if (out_var->IsType()) { - auto shape = ctx.Attr>("shape"); - tensor = ctx.Output("Out")->mutable_value(); - tensor->Resize(framework::make_ddim(shape)); - } else { - PADDLE_THROW("Only support LoDTensor and SelectedRows."); - } + auto* tensor = ctx.Output("Out"); T* data = tensor->mutable_data(ctx.GetPlace()); - data[0] = static_cast(1000); unsigned int seed = static_cast(ctx.Attr("seed")); std::minstd_rand engine; if (seed == 0) { diff --git a/paddle/fluid/operators/uniform_random_op.cu b/paddle/fluid/operators/uniform_random_op.cu index 115c859527..1232cd1eb3 100644 --- a/paddle/fluid/operators/uniform_random_op.cu +++ b/paddle/fluid/operators/uniform_random_op.cu @@ -43,18 +43,7 @@ template class GPUUniformRandomKernel : public framework::OpKernel { public: void Compute(const framework::ExecutionContext& context) const override { - framework::Tensor* tensor(nullptr); - auto out_var = ctx.OutputVar("Out"); - if (out_var->IsType()) { - tensor = ctx.Output("Out"); - } else if (out_var->IsType()) { - auto shape = ctx.Attr>("shape"); - tensor = ctx.Output("Out")->mutable_value(); - tensor->Resize(framework::make_ddim(shape)); - } else { - PADDLE_THROW("Only support LoDTensor and SelectedRows."); - } - + auto* tensor = context.Output("Out"); T* data = tensor->mutable_data(context.GetPlace()); unsigned int seed = static_cast(context.Attr("seed")); if (seed == 0) { diff --git a/python/paddle/fluid/tests/unittests/test_uniform_random_op.py b/python/paddle/fluid/tests/unittests/test_uniform_random_op.py index 3331e99c36..75ff85a55f 100644 --- a/python/paddle/fluid/tests/unittests/test_uniform_random_op.py +++ b/python/paddle/fluid/tests/unittests/test_uniform_random_op.py @@ -15,16 +15,6 @@ import unittest import numpy as np from op_test import OpTest -import paddle.fluid.core as core -from paddle.fluid.op import Operator - - -def output_hist(out): - hist, _ = np.histogram(out, range=(-5, 10)) - hist = hist.astype("float32") - hist /= float(out.size) - prob = 0.1 * np.ones((10)) - return hist, prob class TestUniformRandomOp(OpTest): @@ -43,37 +33,11 @@ class TestUniformRandomOp(OpTest): self.check_output_customized(self.verify_output) def verify_output(self, outs): - hist, prob = output_hist(outs[0]) - self.assertTrue( - np.allclose( - hist, prob, rtol=0, atol=0.01), "hist: " + str(hist)) - - -class TestUniformRandomOpSelectedRows(unittest.TestCase): - def get_places(self): - places = [core.CPUPlace()] - if core.is_compiled_with_cuda(): - places.append(core.CUDAPlace(0)) - return places - - def test_check_output(self): - for place in self.get_places(): - self.check_with_place(place) - - def check_with_place(self, place): - scope = core.Scope() - out = scope.var("X").get_selected_rows() - - op = Operator( - "uniform_random", - Out="X", - shape=[1000, 784], - min=-5.0, - max=10.0, - seed=10) - op.run(scope, place) - out_tensor = out.get_tensor() - hist, prob = output_hist(np.array(out_tensor)) + tensor = outs[0] + hist, _ = np.histogram(outs[0], range=(-5, 10)) + hist = hist.astype("float32") + hist /= float(outs[0].size) + prob = 0.1 * np.ones((10)) self.assertTrue( np.allclose( hist, prob, rtol=0, atol=0.01), "hist: " + str(hist)) From 70500398b63cf8a80a6113ada9e06aa5e98a541e Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Thu, 12 Apr 2018 09:54:33 +0800 Subject: [PATCH 09/18] wip --- paddle/fluid/operators/detail/grpc_client.cc | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git 
a/paddle/fluid/operators/detail/grpc_client.cc b/paddle/fluid/operators/detail/grpc_client.cc index 8bbfd1f159..b546aa1d2f 100644 --- a/paddle/fluid/operators/detail/grpc_client.cc +++ b/paddle/fluid/operators/detail/grpc_client.cc @@ -35,7 +35,8 @@ bool RPCClient::AsyncSendVariable(const std::string& ep, const framework::Scope* p_scope = &scope; const auto ch = GetChannel(ep_val); - framework::Async([var_name_val, p_ctx, ep_val, p_scope, time_out, ch, this] { + framework::AsyncIO([var_name_val, p_ctx, ep_val, p_scope, time_out, ch, + this] { auto* var = p_scope->FindVar(var_name_val); ::grpc::ByteBuffer req; @@ -90,7 +91,8 @@ bool RPCClient::AsyncGetVariable(const std::string& ep, const framework::Scope* p_scope = &scope; const auto ch = GetChannel(ep_val); - framework::Async([var_name_val, ep_val, p_scope, p_ctx, time_out, ch, this] { + framework::AsyncIO([var_name_val, ep_val, p_scope, p_ctx, time_out, ch, + this] { // prepare input sendrecv::VariableMessage req; req.set_varname(var_name_val); @@ -133,8 +135,8 @@ bool RPCClient::AsyncPrefetchVariable(const std::string& ep, const framework::Scope* p_scope = &scope; const auto ch = GetChannel(ep_val); - framework::Async([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx, - time_out, ch, this] { + framework::AsyncIO([in_var_name_val, out_var_name_val, ep_val, p_scope, p_ctx, + time_out, ch, this] { auto* var = p_scope->FindVar(in_var_name_val); ::grpc::ByteBuffer req; @@ -197,7 +199,7 @@ bool RPCClient::Wait() { std::vector> waits(req_count_); for (int i = 0; i < req_count_; i++) { - waits[i] = framework::Async([i, &a, this] { a[i] = Proceed(); }); + waits[i] = framework::AsyncIO([i, &a, this] { a[i] = Proceed(); }); } for (int i = 0; i < req_count_; i++) { From 7132bbe6b7329914fefcd4fa9960afda495d3f89 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 12 Apr 2018 12:20:13 +0800 Subject: [PATCH 10/18] update by comment --- paddle/fluid/operators/uniform_random_op.cc | 12 +- paddle/fluid/operators/uniform_random_op.cu | 12 +- .../operators/uniform_random_table_op.cc | 144 ------------------ .../tests/unittests/test_uniform_random_op.py | 46 +++++- .../unittests/test_uniform_random_table_op.py | 66 -------- 5 files changed, 63 insertions(+), 217 deletions(-) delete mode 100644 paddle/fluid/operators/uniform_random_table_op.cc delete mode 100644 python/paddle/fluid/tests/unittests/test_uniform_random_table_op.py diff --git a/paddle/fluid/operators/uniform_random_op.cc b/paddle/fluid/operators/uniform_random_op.cc index 87699362b2..155690a6f4 100644 --- a/paddle/fluid/operators/uniform_random_op.cc +++ b/paddle/fluid/operators/uniform_random_op.cc @@ -24,7 +24,17 @@ template class CPUUniformRandomKernel : public framework::OpKernel { public: void Compute(const framework::ExecutionContext& ctx) const override { - auto* tensor = ctx.Output("Out"); + framework::Tensor* tensor(nullptr); + auto out_var = ctx.OutputVar("Out"); + if (out_var->IsType()) { + tensor = out_var->GetMutable(); + } else if (out_var->IsType()) { + auto shape = ctx.Attr>("shape"); + tensor = out_var->GetMutable()->mutable_value(); + tensor->Resize(framework::make_ddim(shape)); + } else { + PADDLE_THROW("Only support SelectedRows and Tensor"); + } T* data = tensor->mutable_data(ctx.GetPlace()); unsigned int seed = static_cast(ctx.Attr("seed")); std::minstd_rand engine; diff --git a/paddle/fluid/operators/uniform_random_op.cu b/paddle/fluid/operators/uniform_random_op.cu index 1232cd1eb3..33971be3e0 100644 --- a/paddle/fluid/operators/uniform_random_op.cu +++ 
b/paddle/fluid/operators/uniform_random_op.cu @@ -43,7 +43,17 @@ template class GPUUniformRandomKernel : public framework::OpKernel { public: void Compute(const framework::ExecutionContext& context) const override { - auto* tensor = context.Output("Out"); + framework::Tensor* tensor(nullptr); + auto out_var = ctx.OutputVar("Out"); + if (out_var->IsType()) { + tensor = out_var->GetMutable(); + } else if (out_var->IsType()) { + auto shape = ctx.Attr>("shape"); + tensor = out_var->GetMutable()->mutable_value(); + tensor->Resize(framework::make_ddim(shape)); + } else { + PADDLE_THROW("Only support SelectedRows and Tensor"); + } T* data = tensor->mutable_data(context.GetPlace()); unsigned int seed = static_cast(context.Attr("seed")); if (seed == 0) { diff --git a/paddle/fluid/operators/uniform_random_table_op.cc b/paddle/fluid/operators/uniform_random_table_op.cc deleted file mode 100644 index 4664cc5d93..0000000000 --- a/paddle/fluid/operators/uniform_random_table_op.cc +++ /dev/null @@ -1,144 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include "paddle/fluid/framework/data_type.h" -#include "paddle/fluid/framework/op_registry.h" -#include "paddle/fluid/operators/math/math_function.h" -#include "paddle/fluid/platform/device_context.h" - -namespace paddle { -namespace operators { - -class UniformRandomTableInferShape : public framework::InferShapeBase { - public: - void operator()(framework::InferShapeContext *ctx) const override { - VLOG(3) << "Infershape..."; - PADDLE_ENFORCE(ctx->HasOutput("Out"), - "Output(Out) of UniformRandomTableOp should not be null."); - - PADDLE_ENFORCE( - ctx->Attrs().Get("min") < ctx->Attrs().Get("max"), - "uniform_random's min must less then max"); - auto &shape = ctx->Attrs().Get>("shape"); - std::vector temp; - temp.reserve(shape.size()); - for (auto dim : shape) { - temp.push_back(static_cast(dim)); - } - ctx->SetOutputDim("Out", framework::make_ddim(temp)); - } -}; - -class UniformRandomTableOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope &scope, - const platform::Place &dev_place) const override { - VLOG(3) << "RunImpl..."; - auto out = - scope.FindVar(Output("Out"))->GetMutable(); - auto shard_cnt = Attr("shard_cnt"); - auto shard_id = Attr("shard_id"); - auto max_id = Attr("max_id"); - auto shape = Attr>("shape"); - - auto tensor = out->mutable_value(); - tensor->Resize(framework::make_ddim(shape)); - // Only allocate the memory of large table on CPU - auto cpu = platform::CPUPlace(); - float *data = tensor->mutable_data(cpu); - VLOG(3) << "generate seed"; - unsigned int seed = static_cast(Attr("seed")); - std::minstd_rand engine; - if (seed == 0) { - seed = std::random_device()(); - } - engine.seed(seed); - std::uniform_real_distribution dist(Attr("min"), - Attr("max")); - int64_t size = tensor->numel(); - for (int64_t i = 0; i < size; ++i) { - data[i] = dist(engine); - } - // initialize rows by 
round-robin - // TODO(Yancey1989): need to support other way to distribute Ids - VLOG(3) << "calculate rows_size..."; - int64_t rows_size = 0; - if (max_id % shard_cnt == 0) { - rows_size = max_id / shard_cnt; - } else { - rows_size = max_id / shard_cnt + 1; - } - auto *rows = out->mutable_rows(); - rows->resize(rows_size); - (*rows)[0] = shard_id; - for (int64_t idx = 1; idx < rows_size; ++idx) { - (*rows)[idx] = (*rows)[idx - 1] + shard_cnt; - } - out->set_height(max_id); - } -}; - -class UniformRandomTableOpMaker : public framework::OpProtoAndCheckerMaker { - public: - UniformRandomTableOpMaker(OpProto *proto, OpAttrChecker *op_checker) - : framework::OpProtoAndCheckerMaker(proto, op_checker) { - AddOutput("Out", - "(SelectedRows)" - "The output table of uniform random table op."); - AddComment(R"DOC( -Uniform random operator for initializing a table. - -This operator initializes a SelectedRows with random values sampled from a -uniform distribution. - -)DOC"); - AddAttr("max_id", - "(int, required)" - "The maximal Id for the table."); - AddAttr("shard_cnt", - "(int, required)" - "The count of shards for distributing the table."); - AddAttr("shard_id", "(int, required) The current shard ID."); - AddAttr>("shape", - "(vector) The shape of the output tensor"); - AddAttr("min", - "(float, default -1.0) " - "Minimum value of uniform random") - .SetDefault(-1.0f); - AddAttr("max", - "(float, default 1.0) " - "Maximun value of uniform random") - .SetDefault(1.0f); - AddAttr("seed", - "(int, default 0) " - "Random seed used for generating samples. " - "0 means use a seed generated by the system." - "Note that if seed is not 0, this operator will always " - "generate the same random numbers every time.") - .SetDefault(0); - AddAttr("dtype", "(int, default 5(FP32)) Output tensor data type") - .SetDefault(framework::proto::VarType::FP32); - } -}; -} // namespace operators -} // namespace paddle - -namespace ops = paddle::operators; -REGISTER_OPERATOR(uniform_random_table, ops::UniformRandomTableOp, - ops::UniformRandomTableInferShape, - ops::UniformRandomTableOpMaker, - paddle::framework::EmptyGradOpMaker); diff --git a/python/paddle/fluid/tests/unittests/test_uniform_random_op.py b/python/paddle/fluid/tests/unittests/test_uniform_random_op.py index 75ff85a55f..346a949b6e 100644 --- a/python/paddle/fluid/tests/unittests/test_uniform_random_op.py +++ b/python/paddle/fluid/tests/unittests/test_uniform_random_op.py @@ -15,6 +15,16 @@ import unittest import numpy as np from op_test import OpTest +import paddle.fluid.core as core +from paddle.fluid.op import Operator + + +def output_hist(out): + hist, _ = np.histogram(out, range=(-5, 10)) + hist = hist.astype("float32") + hist /= float(out.size) + prob = 0.1 * np.ones((10)) + return hist, prob class TestUniformRandomOp(OpTest): @@ -33,11 +43,37 @@ class TestUniformRandomOp(OpTest): self.check_output_customized(self.verify_output) def verify_output(self, outs): - tensor = outs[0] - hist, _ = np.histogram(outs[0], range=(-5, 10)) - hist = hist.astype("float32") - hist /= float(outs[0].size) - prob = 0.1 * np.ones((10)) + hist, prob = output_hist(np.array(outs[0])) + self.assertTrue( + np.allclose( + hist, prob, rtol=0, atol=0.01), "hist: " + str(hist)) + + +class TestUniformRandomOpSelectedRows(unittest.TestCase): + def get_places(self): + places = [core.CPUPlace()] + if core.is_compiled_with_cuda(): + places.append(core.CUDAPlace(0)) + return places + + def test_check_output(self): + for place in self.get_places(): + self.check_with_place(place) + + 
def check_with_place(self, place): + scope = core.Scope() + out = scope.var("X").get_selected_rows() + + op = Operator( + "uniform_random", + Out="X", + shape=[4, 784], + min=-5.0, + max=10.0, + seed=10) + op.run(scope, place) + self.assertEqual(out.get_tensor().shape(), [4, 784]) + hist, prob = output_hist(np.array(out.get_tensor())) self.assertTrue( np.allclose( hist, prob, rtol=0, atol=0.01), "hist: " + str(hist)) diff --git a/python/paddle/fluid/tests/unittests/test_uniform_random_table_op.py b/python/paddle/fluid/tests/unittests/test_uniform_random_table_op.py deleted file mode 100644 index 0474c51e49..0000000000 --- a/python/paddle/fluid/tests/unittests/test_uniform_random_table_op.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -import numpy as np -from op_test import OpTest -import paddle.fluid.core as core -from paddle.fluid.op import Operator - - -def output_hist(out): - hist, _ = np.histogram(out, range=(-5, 10)) - hist = hist.astype("float32") - hist /= float(out.size) - prob = 0.1 * np.ones((10)) - return hist, prob - - -class TestUniformRandomTableOp(unittest.TestCase): - def get_places(self): - places = [core.CPUPlace()] - if core.is_compiled_with_cuda(): - places.append(core.CUDAPlace(0)) - return places - - def test_check_output(self): - for place in self.get_places(): - self.check_with_place(place) - - def check_with_place(self, place): - scope = core.Scope() - out = scope.var("X").get_selected_rows() - - op = Operator( - "uniform_random_table", - Out="X", - shape=[4, 784], - min=-5.0, - max=10.0, - seed=10, - shard_cnt=3, - shard_id=1, - max_id=10) - op.run(scope, place) - self.assertEqual(out.rows(), [1, 4, 7, 10]) - self.assertEqual(out.height(), 10) - self.assertEqual(out.get_tensor().shape(), [4, 784]) - hist, prob = output_hist(np.array(out.get_tensor())) - self.assertTrue( - np.allclose( - hist, prob, rtol=0, atol=0.01), "hist: " + str(hist)) - - -if __name__ == "__main__": - unittest.main() From 9e9f5d8080995e71b3a7ef8fd20a0a02f33f107f Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 12 Apr 2018 12:43:16 +0800 Subject: [PATCH 11/18] fix ci --- paddle/fluid/operators/uniform_random_op.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/operators/uniform_random_op.cu b/paddle/fluid/operators/uniform_random_op.cu index 33971be3e0..00011bbe61 100644 --- a/paddle/fluid/operators/uniform_random_op.cu +++ b/paddle/fluid/operators/uniform_random_op.cu @@ -44,11 +44,11 @@ class GPUUniformRandomKernel : public framework::OpKernel { public: void Compute(const framework::ExecutionContext& context) const override { framework::Tensor* tensor(nullptr); - auto out_var = ctx.OutputVar("Out"); + auto out_var = context.OutputVar("Out"); if (out_var->IsType()) { tensor = out_var->GetMutable(); } else if (out_var->IsType()) { - auto shape = ctx.Attr>("shape"); + auto shape = context.Attr>("shape"); tensor = 
out_var->GetMutable()->mutable_value(); tensor->Resize(framework::make_ddim(shape)); } else { From 339be6254ea5e3432e4cbe44f35609bb45662e12 Mon Sep 17 00:00:00 2001 From: Liu Yiqun Date: Thu, 12 Apr 2018 05:58:26 +0000 Subject: [PATCH 12/18] Refine the order of arguments. --- paddle/fluid/framework/executor.cc | 5 ++--- paddle/fluid/framework/executor.h | 4 ++-- paddle/fluid/inference/tests/test_helper.h | 6 +++--- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 910012927b..34bba77f40 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -359,9 +359,8 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, void Executor::RunPreparedContext( ExecutorPrepareContext* ctx, Scope* scope, std::map& feed_targets, - std::map& fetch_targets, - const std::string& feed_holder_name, const std::string& fetch_holder_name, - bool create_vars) { + std::map& fetch_targets, bool create_vars, + const std::string& feed_holder_name, const std::string& fetch_holder_name) { auto& global_block = ctx->prog_.Block(ctx->block_id_); PADDLE_ENFORCE( diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index cbd70d9544..8b3ea01542 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -73,9 +73,9 @@ class Executor { void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, std::map& feed_targets, std::map& fetch_targets, + bool create_vars = true, const std::string& feed_holder_name = "feed", - const std::string& fetch_holder_name = "fetch", - bool create_vars = true); + const std::string& fetch_holder_name = "fetch"); private: const platform::Place place_; diff --git a/paddle/fluid/inference/tests/test_helper.h b/paddle/fluid/inference/tests/test_helper.h index 09fe344ec7..9875e43860 100644 --- a/paddle/fluid/inference/tests/test_helper.h +++ b/paddle/fluid/inference/tests/test_helper.h @@ -178,8 +178,8 @@ void TestInference(const std::string& dirname, std::unique_ptr ctx; if (PrepareContext) { ctx = executor.Prepare(*inference_program, 0); - executor.RunPreparedContext(ctx.get(), scope, feed_targets, - fetch_targets); + executor.RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets, + CreateVars); } else { executor.Run(*inference_program, scope, feed_targets, fetch_targets, CreateVars); @@ -198,7 +198,7 @@ void TestInference(const std::string& dirname, // Note: if you changed the inference_program, you need to call // executor.Prepare() again to get a new ExecutorPrepareContext. 
executor.RunPreparedContext(ctx.get(), scope, feed_targets, - fetch_targets); + fetch_targets, CreateVars); } else { executor.Run(*inference_program, scope, feed_targets, fetch_targets, CreateVars); From 26cfc634b9f4dc02b051b49f54e33b57938e5ff2 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Thu, 12 Apr 2018 14:48:26 +0800 Subject: [PATCH 13/18] multi stream thread pool --- paddle/fluid/framework/threadpool.cc | 10 +++++++--- paddle/fluid/framework/threadpool.h | 10 +++++----- paddle/fluid/operators/detail/grpc_server.cc | 2 +- .../paddle/fluid/tests/book/test_recognize_digits.py | 1 - 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/paddle/fluid/framework/threadpool.cc b/paddle/fluid/framework/threadpool.cc index 0a8377cc47..109c2c745c 100644 --- a/paddle/fluid/framework/threadpool.cc +++ b/paddle/fluid/framework/threadpool.cc @@ -14,8 +14,12 @@ #include "paddle/fluid/framework/threadpool.h" +#include "gflags/gflags.h" #include "paddle/fluid/platform/enforce.h" +DEFINE_int32(io_threadpool_size, 100, + "number of threads used for doing IO, default 100"); + namespace paddle { namespace framework { @@ -94,15 +98,15 @@ void ThreadPool::TaskLoop() { std::unique_ptr MultiStreamThreadPool::io_threadpool_(nullptr); std::once_flag MultiStreamThreadPool::io_init_flag_; -MultiStreamThreadPool* MultiStreamThreadPool::GetInstanceIO() { +ThreadPool* MultiStreamThreadPool::GetInstanceIO() { std::call_once(io_init_flag_, &MultiStreamThreadPool::InitIO); - return static_cast(io_threadpool_.get()); + return io_threadpool_.get(); } void MultiStreamThreadPool::InitIO() { if (io_threadpool_.get() == nullptr) { // TODO(typhoonzero1986): make this configurable - io_threadpool_.reset(new ThreadPool(100)); + io_threadpool_.reset(new ThreadPool(FLAGS_io_threadpool_size)); } } diff --git a/paddle/fluid/framework/threadpool.h b/paddle/fluid/framework/threadpool.h index 0a60488d9f..1cc058834c 100644 --- a/paddle/fluid/framework/threadpool.h +++ b/paddle/fluid/framework/threadpool.h @@ -14,12 +14,12 @@ limitations under the License. 
*/ #pragma once -#include +#include // NOLINT #include -#include -#include +#include // NOLINT +#include // NOLINT #include -#include +#include // NOLINT #include #include "glog/logging.h" #include "paddle/fluid/platform/enforce.h" @@ -137,7 +137,7 @@ class ThreadPool { class MultiStreamThreadPool : ThreadPool { public: - static MultiStreamThreadPool* GetInstanceIO(); + static ThreadPool* GetInstanceIO(); static void InitIO(); private: diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index d5fc163bc2..36dad5dd43 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -216,10 +216,10 @@ void AsyncGRPCServer::RunSyncUpdate() { std::function prefetch_register = std::bind(&AsyncGRPCServer::TryToRegisterNewPrefetchOne, this); + // TODO(wuyi): Run these "HandleRequest" in thread pool t_send_.reset( new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, cq_send_.get(), "cq_send", send_register))); - t_get_.reset( new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, cq_get_.get(), "cq_get", get_register))); diff --git a/python/paddle/fluid/tests/book/test_recognize_digits.py b/python/paddle/fluid/tests/book/test_recognize_digits.py index e4997b4069..5ec6890c1b 100644 --- a/python/paddle/fluid/tests/book/test_recognize_digits.py +++ b/python/paddle/fluid/tests/book/test_recognize_digits.py @@ -157,7 +157,6 @@ def train(nn_type, for ip in pserver_ips.split(","): eplist.append(':'.join([ip, port])) pserver_endpoints = ",".join(eplist) # ip:port,ip:port... - pserver_endpoints = os.getenv("PSERVERS") trainers = int(os.getenv("TRAINERS")) current_endpoint = os.getenv("POD_IP") + ":" + port trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) From 449bdde58accc9beb94d56c8ef33c0bde4c007b7 Mon Sep 17 00:00:00 2001 From: Liu Yiqun Date: Thu, 12 Apr 2018 06:15:24 +0000 Subject: [PATCH 14/18] Correct some typos. --- cmake/cblas.cmake | 2 +- paddle/fluid/framework/executor.cc | 19 +++++++++++-------- paddle/fluid/framework/executor.h | 3 +++ paddle/fluid/inference/io.cc | 2 +- paddle/fluid/inference/tests/test_helper.h | 2 +- 5 files changed, 17 insertions(+), 11 deletions(-) diff --git a/cmake/cblas.cmake b/cmake/cblas.cmake index 52a22c1fbf..e3b9d94215 100644 --- a/cmake/cblas.cmake +++ b/cmake/cblas.cmake @@ -78,7 +78,7 @@ if(NOT CMAKE_CROSSCOMPILING) /usr/lib/reference/ ) else() - # Diable the finding of reference cblas under host's system path + # Disable the finding of reference cblas under host's system path set(REFERENCE_CBLAS_INCLUDE_SEARCH_PATHS ${REFERENCE_CBLAS_ROOT}/include) set(REFERENCE_CBLAS_LIB_SEARCH_PATHS ${REFERENCE_CBLAS_ROOT}/lib) endif() diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 34bba77f40..513e720fd0 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -83,8 +83,8 @@ static void CheckTensorNANOrInf(const std::string& name, if (tensor.memory_size() == 0) { return; } - if (tensor.type().hash_code() != typeid(float).hash_code() && - tensor.type().hash_code() != typeid(double).hash_code()) { + if (tensor.type().hash_code() != typeid(float).hash_code() && // NOLINT + tensor.type().hash_code() != typeid(double).hash_code()) { // NOLINT return; } PADDLE_ENFORCE(!framework::TensorContainsInf(tensor), @@ -145,12 +145,13 @@ void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id, // Return true if the block has feed operators and holder of matching info. 
 static bool has_feed_operators(
     const BlockDesc& block,
-    std::map<std::string, const LoDTensor*>& feed_targets,
+    const std::map<std::string, const LoDTensor*>& feed_targets,
     const std::string& feed_holder_name) {
   size_t feed_count = 0;
   for (auto* op : block.AllOps()) {
     if (op->Type() == kFeedOpType) {
       feed_count++;
+      // The input variable's name of feed_op should be feed_holder_name.
       PADDLE_ENFORCE_EQ(op->Input("X")[0], feed_holder_name,
                         "Input to feed op should be '%s'", feed_holder_name);
       std::string feed_target_name = op->Output("Out")[0];
@@ -167,7 +168,7 @@ static bool has_feed_operators(
       "The number of feed operators should match 'feed_targets'");
 
   if (!feed_holder_name.empty()) {
-    // When feed operator are present, so should be feed_holder
+    // When feed operator are present, so should be feed_holder.
     auto var = block.FindVar(feed_holder_name);
     PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable",
                             feed_holder_name);
@@ -187,12 +188,14 @@ static bool has_feed_operators(
 // and fetch_holder_name. Raise exception when any mismatch is found.
 // Return true if the block has fetch operators and holder of matching info.
 static bool has_fetch_operators(
-    const BlockDesc& block, std::map<std::string, LoDTensor*>& fetch_targets,
+    const BlockDesc& block,
+    const std::map<std::string, LoDTensor*>& fetch_targets,
     const std::string& fetch_holder_name) {
   size_t fetch_count = 0;
   for (auto* op : block.AllOps()) {
     if (op->Type() == kFetchOpType) {
       fetch_count++;
+      // The output variable's name of fetch_op should be fetch_holder_name.
       PADDLE_ENFORCE_EQ(op->Output("Out")[0], fetch_holder_name,
                         "Output of fetch op should be '%s'", fetch_holder_name);
       std::string fetch_target_name = op->Input("X")[0];
@@ -209,7 +212,7 @@ static bool has_fetch_operators(
       "The number of fetch operators should match 'fetch_targets'");
 
   if (!fetch_holder_name.empty()) {
-    // When fetch operator are present, so should be fetch_holder
+    // When fetch operator are present, so should be fetch_holder.
     auto var = block.FindVar(fetch_holder_name);
     PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable",
                             fetch_holder_name);
@@ -287,8 +290,8 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
   }
 
   auto ctx = Prepare(*copy_program, 0);
-  RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets,
-                     feed_holder_name, fetch_holder_name, create_vars);
+  RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets, create_vars,
+                     feed_holder_name, fetch_holder_name);
 }
 
 std::unique_ptr<ExecutorPrepareContext> Executor::Prepare(
diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h
index 8b3ea01542..43defdacf2 100644
--- a/paddle/fluid/framework/executor.h
+++ b/paddle/fluid/framework/executor.h
@@ -14,6 +14,9 @@ limitations under the License. */
 
 #pragma once
 
+#include <map>
+#include <string>
+#include <vector>
 #include "paddle/fluid/framework/op_info.h"
 #include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/scope.h"
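
The RunPreparedContext reorder in executor.cc above moves create_vars ahead of
the two holder names, so the string arguments with defaults stay last. A
minimal call-site sketch, assumed rather than taken from the patch; scope,
program, feed_targets and fetch_targets are placeholders set up by the caller:

    // Hypothetical caller after this change.
    auto ctx = executor.Prepare(program, /*block_id=*/0);
    executor.RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets,
                                /*create_vars=*/true,
                                /*feed_holder_name=*/"feed",
                                /*fetch_holder_name=*/"fetch");
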
diff --git a/paddle/fluid/inference/io.cc b/paddle/fluid/inference/io.cc
index a29d457b6f..3b58019db6 100644
--- a/paddle/fluid/inference/io.cc
+++ b/paddle/fluid/inference/io.cc
@@ -23,7 +23,7 @@ limitations under the License. */
 namespace paddle {
 namespace inference {
 
-// Temporarilly add this function for exposing framework::InitDevices() when
+// Temporarily add this function for exposing framework::InitDevices() when
 // linking the inference shared library.
 void Init(bool init_p2p) { framework::InitDevices(init_p2p); }
 
diff --git a/paddle/fluid/inference/tests/test_helper.h b/paddle/fluid/inference/tests/test_helper.h
index 9875e43860..c3a8d0889c 100644
--- a/paddle/fluid/inference/tests/test_helper.h
+++ b/paddle/fluid/inference/tests/test_helper.h
@@ -195,7 +195,7 @@ void TestInference(const std::string& dirname,
       paddle::platform::DeviceContextPool::Instance().Get(place));
 
   if (PrepareContext) {
-    // Note: if you changed the inference_program, you need to call
+    // Note: if you change the inference_program, you need to call
     // executor.Prepare() again to get a new ExecutorPrepareContext.
     executor.RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets,
                                 CreateVars);

From 8eac2a46f7f6945cf2c553d8716be02b96791813 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Thu, 12 Apr 2018 19:52:40 +0800
Subject: [PATCH 15/18] update by comment

---
 paddle/fluid/operators/uniform_random_op.cc | 6 ++++--
 paddle/fluid/operators/uniform_random_op.cu | 6 ++++--
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/paddle/fluid/operators/uniform_random_op.cc b/paddle/fluid/operators/uniform_random_op.cc
index 155690a6f4..acaefaacda 100644
--- a/paddle/fluid/operators/uniform_random_op.cc
+++ b/paddle/fluid/operators/uniform_random_op.cc
@@ -24,7 +24,7 @@ template <typename T>
 class CPUUniformRandomKernel : public framework::OpKernel<T> {
  public:
   void Compute(const framework::ExecutionContext& ctx) const override {
-    framework::Tensor* tensor(nullptr);
+    framework::Tensor* tensor = nullptr;
     auto out_var = ctx.OutputVar("Out");
     if (out_var->IsType<framework::LoDTensor>()) {
       tensor = out_var->GetMutable<framework::LoDTensor>();
@@ -33,7 +33,9 @@ class CPUUniformRandomKernel : public framework::OpKernel<T> {
       tensor = out_var->GetMutable<framework::SelectedRows>()->mutable_value();
       tensor->Resize(framework::make_ddim(shape));
     } else {
-      PADDLE_THROW("Only support SelectedRows and Tensor");
+      PADDLE_THROW(
+          "uniform_random_op's output only "
+          "supports SelectedRows and Tensor");
     }
     T* data = tensor->mutable_data<T>(ctx.GetPlace());
     unsigned int seed = static_cast<unsigned int>(ctx.Attr<int>("seed"));
diff --git a/paddle/fluid/operators/uniform_random_op.cu b/paddle/fluid/operators/uniform_random_op.cu
index 00011bbe61..e1c7323a30 100644
--- a/paddle/fluid/operators/uniform_random_op.cu
+++ b/paddle/fluid/operators/uniform_random_op.cu
@@ -43,7 +43,7 @@ template <typename T>
 class GPUUniformRandomKernel : public framework::OpKernel<T> {
  public:
   void Compute(const framework::ExecutionContext& context) const override {
-    framework::Tensor* tensor(nullptr);
+    framework::Tensor* tensor = nullptr;
     auto out_var = context.OutputVar("Out");
     if (out_var->IsType<framework::LoDTensor>()) {
       tensor = out_var->GetMutable<framework::LoDTensor>();
@@ -52,7 +52,9 @@ class GPUUniformRandomKernel : public framework::OpKernel<T> {
       tensor = out_var->GetMutable<framework::SelectedRows>()->mutable_value();
       tensor->Resize(framework::make_ddim(shape));
     } else {
-      PADDLE_THROW("Only support SelectedRows and Tensor");
+      PADDLE_THROW(
+          "uniform_random_op's output only "
+          "supports SelectedRows and Tensor");
     }
     T* data = tensor->mutable_data<T>(context.GetPlace());
     unsigned int seed = static_cast<unsigned int>(context.Attr<int>("seed"));

From c20cc2bd8a018f078e3916e01579df8faab66f92 Mon Sep 17 00:00:00 2001
From: fengjiayi
Date: Fri, 13 Apr 2018 05:45:52 +0000
Subject: [PATCH 16/18] Add Wait() for reshape_op

---
 paddle/fluid/operators/reshape_op.h | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/paddle/fluid/operators/reshape_op.h b/paddle/fluid/operators/reshape_op.h
index 9abc78421a..8320c257c9 100644
--- a/paddle/fluid/operators/reshape_op.h
+++ b/paddle/fluid/operators/reshape_op.h
@@ -147,6 +147,7 @@ class ReshapeKernel : public framework::OpKernel<T> {
     if (!inplace) {
       out->mutable_data<T>(ctx.GetPlace());
       framework::TensorCopy(*in, ctx.GetPlace(), ctx.device_context(), out);
+      ctx.device_context().Wait();
       // TensorCopy will resize to in_dims.
       out->Resize(out_dims);
     } else {
@@ -169,6 +170,7 @@ class ReshapeGradKernel : public framework::OpKernel<T> {
     auto in_dims = d_x->dims();
     if (!inplace) {
       framework::TensorCopy(*d_out, ctx.GetPlace(), ctx.device_context(), d_x);
+      ctx.device_context().Wait();
       d_x->Resize(in_dims);
     } else {
       d_x->ShareDataWith(*d_out);
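
The Wait() calls added above matter because TensorCopy may enqueue the copy
asynchronously on the device context's stream, so the destination tensor is
only safe to resize and reuse after that stream has drained. A condensed
sketch of the pattern, with placeholder names and not tied to reshape_op:

    // Copy, then block on the same device context before touching dst.
    framework::TensorCopy(src, place, dev_ctx, &dst);
    dev_ctx.Wait();           // cheap on CPU, synchronizes the stream on GPU
    dst.Resize(target_dims);  // now safe: the copy has completed
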
From a08bf76f74cbdd4db4a773a4557b4ad6551ce679 Mon Sep 17 00:00:00 2001
From: typhoonzero
Date: Fri, 13 Apr 2018 13:52:39 +0800
Subject: [PATCH 17/18] refine name

---
 paddle/fluid/framework/threadpool.cc | 10 +++++-----
 paddle/fluid/framework/threadpool.h  |  4 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/paddle/fluid/framework/threadpool.cc b/paddle/fluid/framework/threadpool.cc
index 109c2c745c..f26f212d4d 100644
--- a/paddle/fluid/framework/threadpool.cc
+++ b/paddle/fluid/framework/threadpool.cc
@@ -95,15 +95,15 @@ void ThreadPool::TaskLoop() {
   }
 }
 
-std::unique_ptr<ThreadPool> MultiStreamThreadPool::io_threadpool_(nullptr);
-std::once_flag MultiStreamThreadPool::io_init_flag_;
+std::unique_ptr<ThreadPool> ThreadPoolIO::io_threadpool_(nullptr);
+std::once_flag ThreadPoolIO::io_init_flag_;
 
-ThreadPool* MultiStreamThreadPool::GetInstanceIO() {
-  std::call_once(io_init_flag_, &MultiStreamThreadPool::InitIO);
+ThreadPool* ThreadPoolIO::GetInstanceIO() {
+  std::call_once(io_init_flag_, &ThreadPoolIO::InitIO);
   return io_threadpool_.get();
 }
 
-void MultiStreamThreadPool::InitIO() {
+void ThreadPoolIO::InitIO() {
   if (io_threadpool_.get() == nullptr) {
     // TODO(typhoonzero1986): make this configurable
     io_threadpool_.reset(new ThreadPool(FLAGS_io_threadpool_size));
   }
 }
 
diff --git a/paddle/fluid/framework/threadpool.h b/paddle/fluid/framework/threadpool.h
index 1cc058834c..94111ee335 100644
--- a/paddle/fluid/framework/threadpool.h
+++ b/paddle/fluid/framework/threadpool.h
@@ -135,7 +135,7 @@ class ThreadPool {
   std::condition_variable completed_;
 };
 
-class MultiStreamThreadPool : ThreadPool {
+class ThreadPoolIO : ThreadPool {
  public:
   static ThreadPool* GetInstanceIO();
   static void InitIO();
@@ -156,7 +156,7 @@ std::future<void> Async(Callback callback) {
 
 template <typename Callback>
 std::future<void> AsyncIO(Callback callback) {
-  return MultiStreamThreadPool::GetInstanceIO()->Run(callback);
+  return ThreadPoolIO::GetInstanceIO()->Run(callback);
 }
 
 }  // namespace framework
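
With the rename in place, call sites keep using the framework::AsyncIO entry
point and only the backing singleton changes. A minimal usage sketch, assumed
for illustration; the lambda body is a placeholder for blocking RPC or disk
work:

    // Work submitted here runs on the IO pool sized by --io_threadpool_size,
    // separate from the compute pool behind framework::Async.
    std::future<void> f = framework::AsyncIO([] {
      // e.g. send a variable over gRPC or read a file; this may block.
    });
    f.wait();  // join only when the caller actually needs completion
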
From 3fa0ef3d7102615848f8793c1151c6ec069cd296 Mon Sep 17 00:00:00 2001
From: fengjiayi
Date: Fri, 13 Apr 2018 06:40:50 +0000
Subject: [PATCH 18/18] Refine double_buffer code

---
 .../reader/create_double_buffer_reader_op.cc | 62 +++++++------------
 1 file changed, 22 insertions(+), 40 deletions(-)

diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
index 33a50b5ceb..0b7c1d6af7 100644
--- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
@@ -33,28 +33,14 @@ static constexpr size_t kChannelSize = 0;  // kCacheSize - 2
 
 class DoubleBufferReader : public framework::DecoratedReader {
  public:
-  struct Item {
-    Item() : ctx_(nullptr) {}
-    Item(Item&& b) {
-      payloads_ = std::move(b.payloads_);
-      ctx_ = std::move(b.ctx_);
-    }
-    Item& operator=(Item&& b) {
-      payloads_ = std::move(b.payloads_);
-      ctx_ = std::move(b.ctx_);
-      return *this;
-    }
-
-    std::vector<framework::LoDTensor> payloads_;
-    platform::DeviceContext* ctx_;
-  };
-
   explicit DoubleBufferReader(
       ReaderBase* reader, platform::Place target_place = platform::CPUPlace())
       : DecoratedReader(reader), place_(target_place) {
+    cpu_tensor_cache_.resize(kCacheSize);
+    gpu_tensor_cache_.resize(kCacheSize);
 #ifdef PADDLE_WITH_CUDA
-    for (size_t i = 0; i < kCacheSize; ++i) {
-      if (platform::is_gpu_place(place_)) {
+    if (platform::is_gpu_place(place_)) {
+      for (size_t i = 0; i < kCacheSize; ++i) {
         ctxs_.emplace_back(new platform::CUDADeviceContext(
             boost::get<platform::CUDAPlace>(place_)));
       }
@@ -72,7 +58,7 @@ class DoubleBufferReader : public framework::DecoratedReader {
   bool HasNext() const;
 
   void StartPrefetcher() {
-    channel_ = framework::MakeChannel<Item>(kChannelSize);
+    channel_ = framework::MakeChannel<size_t>(kChannelSize);
     prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
   }
 
@@ -88,8 +74,10 @@ class DoubleBufferReader : public framework::DecoratedReader {
   void PrefetchThreadFunc();
 
   std::thread prefetcher_;
-  framework::Channel<Item>* channel_;
+  framework::Channel<size_t>* channel_;
   platform::Place place_;
+  std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache_;
+  std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache_;
   std::vector<std::unique_ptr<platform::CUDADeviceContext>> ctxs_;
 };
 
@@ -153,11 +141,14 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
 void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
   out->clear();
   if (HasNext()) {
-    Item batch;
-    channel_->Receive(&batch);
-    *out = batch.payloads_;
-    if (batch.ctx_) {
-      batch.ctx_->Wait();
+    size_t cached_tensor_id;
+    channel_->Receive(&cached_tensor_id);
+    if (platform::is_gpu_place(place_)) {
+      *out = gpu_tensor_cache_[cached_tensor_id];
+      ctxs_[cached_tensor_id]->Wait();
+    } else {
+      // CPU place
+      *out = cpu_tensor_cache_[cached_tensor_id];
     }
   }
 }
@@ -176,42 +167,33 @@ bool DoubleBufferReader::HasNext() const {
 
 void DoubleBufferReader::PrefetchThreadFunc() {
   VLOG(5) << "A new prefetch thread starts.";
-  std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache(kCacheSize);
-  std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache(kCacheSize);
   size_t cached_tensor_id = 0;
-
   while (true) {
-    Item batch;
-    auto& cpu_batch = cpu_tensor_cache[cached_tensor_id];
+    auto& cpu_batch = cpu_tensor_cache_[cached_tensor_id];
     reader_->ReadNext(&cpu_batch);
     if (cpu_batch.empty()) {
       // The underlying reader have no next data.
      break;
     }
     if (platform::is_gpu_place(place_)) {
-      auto& gpu_batch = gpu_tensor_cache[cached_tensor_id];
+      auto& gpu_batch = gpu_tensor_cache_[cached_tensor_id];
       auto* gpu_ctx = ctxs_[cached_tensor_id].get();
       gpu_batch.resize(cpu_batch.size());
       for (size_t i = 0; i < cpu_batch.size(); ++i) {
         framework::TensorCopy(cpu_batch[i], place_, *gpu_ctx, &gpu_batch[i]);
         gpu_batch[i].set_lod(cpu_batch[i].lod());
       }
-      batch.payloads_ = gpu_batch;
-      batch.ctx_ = gpu_ctx;
-    } else {
-      // CPUPlace
-      batch.payloads_ = cpu_batch;
     }
-    ++cached_tensor_id;
-    cached_tensor_id %= kCacheSize;
-
     try {
-      channel_->Send(&batch);
+      size_t tmp = cached_tensor_id;
+      channel_->Send(&tmp);
     } catch (paddle::platform::EnforceNotMet e) {
       VLOG(5) << "WARNING: The double buffer channel has been closed. The "
                  "prefetch thread will terminate.";
       break;
     }
+    ++cached_tensor_id;
+    cached_tensor_id %= kCacheSize;
   }
   channel_->Close();
   VLOG(5) << "Prefetch thread terminates.";