From fc3e7341fc500f5459138027773b2143985350bb Mon Sep 17 00:00:00 2001 From: Luo Tao Date: Fri, 6 Jul 2018 15:36:27 +0800 Subject: [PATCH 01/22] fix compile warning in inference related codes --- cmake/external/anakin.cmake | 12 +++++++++++- .../inference/test_paddle_inference_api_impl.cc | 2 +- .../analysis/fluid_to_data_flow_graph_pass_tester.cc | 2 +- .../inference/analysis/subgraph_splitter_tester.cc | 2 +- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/cmake/external/anakin.cmake b/cmake/external/anakin.cmake index d205e39582..fb3d8ef8d5 100644 --- a/cmake/external/anakin.cmake +++ b/cmake/external/anakin.cmake @@ -7,7 +7,17 @@ set(ANAKIN_INSTALL_DIR "${THIRD_PARTY_PATH}/install/anakin" CACHE PATH set(ANAKIN_INCLUDE "${ANAKIN_INSTALL_DIR}" CACHE STRING "root of Anakin header files") set(ANAKIN_LIBRARY "${ANAKIN_INSTALL_DIR}" CACHE STRING "path of Anakin library") -set(ANAKIN_COMPILE_EXTRA_FLAGS -Wno-error=unused-variable -Wno-error=format-extra-args -Wno-error=comment -Wno-error=format -Wno-error=switch -Wno-error=return-type -Wno-error=non-virtual-dtor -Wno-reorder -Wno-error=cpp) +set(ANAKIN_COMPILE_EXTRA_FLAGS + -Wno-error=unused-variable -Wno-unused-variable + -Wno-error=format-extra-args -Wno-format-extra-args + -Wno-error=comment -Wno-comment + -Wno-error=format -Wno-format + -Wno-error=switch -Wno-switch + -Wno-error=return-type -Wno-return-type + -Wno-error=non-virtual-dtor -Wno-non-virtual-dtor + -Wno-sign-compare + -Wno-reorder + -Wno-error=cpp) set(ANAKIN_LIBRARY_URL "https://github.com/pangge/Anakin/releases/download/3.0/anakin_release_simple.tar.gz") diff --git a/paddle/contrib/inference/test_paddle_inference_api_impl.cc b/paddle/contrib/inference/test_paddle_inference_api_impl.cc index 88c4e665a3..c3649dcb96 100644 --- a/paddle/contrib/inference/test_paddle_inference_api_impl.cc +++ b/paddle/contrib/inference/test_paddle_inference_api_impl.cc @@ -249,7 +249,7 @@ void MainThreadsImageClassification(bool use_gpu) { const size_t len = local_outputs[0].data.length(); float* data = static_cast(local_outputs[0].data.data()); float* ref_data = refs[tid].data(); - EXPECT_EQ(refs[tid].numel(), len / sizeof(float)); + EXPECT_EQ((size_t)refs[tid].numel(), len / sizeof(float)); for (int i = 0; i < refs[tid].numel(); ++i) { EXPECT_NEAR(ref_data[i], data[i], 1e-3); } diff --git a/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass_tester.cc b/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass_tester.cc index cfbbc284e4..cbca5abdd5 100644 --- a/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass_tester.cc +++ b/paddle/fluid/inference/analysis/fluid_to_data_flow_graph_pass_tester.cc @@ -27,7 +27,7 @@ TEST_F(DFG_Tester, Init) { DataFlowGraph graph; pass.Run(&graph); // Analysis is sensitive to ProgramDesc, careful to change the original model. - ASSERT_EQ(graph.nodes.size(), 37); + ASSERT_EQ(graph.nodes.size(), 37UL); pass.Finalize(); LOG(INFO) << '\n' << graph.DotString(); } diff --git a/paddle/fluid/inference/analysis/subgraph_splitter_tester.cc b/paddle/fluid/inference/analysis/subgraph_splitter_tester.cc index 8134494f8b..67dd4da54b 100644 --- a/paddle/fluid/inference/analysis/subgraph_splitter_tester.cc +++ b/paddle/fluid/inference/analysis/subgraph_splitter_tester.cc @@ -82,7 +82,7 @@ TEST_F(DFG_Tester, Fuse) { // At least one nodes should be deleted. ASSERT_EQ(dfg.nodes.size(), count0 + 1); // added a new FunctionBlock - ASSERT_EQ(6UL, count1); + ASSERT_EQ(6, count1); } } // namespace analysis From 28b7f87ccf80883967b0774c64597d5f70bcc328 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Fri, 6 Jul 2018 19:07:45 +0800 Subject: [PATCH 02/22] fix mac build and whl pkg --- python/setup.py.in | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/python/setup.py.in b/python/setup.py.in index 5506443733..8f80502412 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -126,10 +126,11 @@ if '${WITH_MKL}' == 'ON': shutil.copy('${MKLML_IOMP_LIB}', libs_path) package_data['paddle.libs']+=['libmklml_intel.so','libiomp5.so'] if '${WITH_MKLDNN}' == 'ON': - # change rpath of libmkldnn.so.0, add $ORIGIN/ to it. - # The reason is that all thirdparty libraries in the same directory, - # thus, libmkldnn.so.0 will find libmklml_intel.so and libiomp5.so. - command = "patchelf --set-rpath '$ORIGIN/' ${MKLDNN_SHARED_LIB}" + if "@APPLE@" != "1": + # change rpath of libmkldnn.so.0, add $ORIGIN/ to it. + # The reason is that all thirdparty libraries in the same directory, + # thus, libmkldnn.so.0 will find libmklml_intel.so and libiomp5.so. + command = "patchelf --set-rpath '$ORIGIN/' ${MKLDNN_SHARED_LIB}" if os.system(command) != 0: raise Exception("patchelf --set-rpath for libmkldnn.so.0 fails") package_data['paddle.libs']+=['libmkldnn.so.0'] @@ -142,7 +143,10 @@ package_dir['paddle.libs']=libs_path # The reason is that libwarpctc.so, libiomp5.so etc are in paddle.libs, and # core.so is in paddle.fluid, thus paddle/fluid/../libs will pointer to above libraries. # This operation will fix https://github.com/PaddlePaddle/Paddle/issues/3213 -command = "patchelf --set-rpath '$ORIGIN/../libs/' ${PADDLE_BINARY_DIR}/python/paddle/fluid/core.so" +if "@APPLE@" == "1": + command = "install_name_tool -id \"@loader_path/../libs/\" ${PADDLE_BINARY_DIR}/python/paddle/fluid/core.so" +else: + command = "patchelf --set-rpath '$ORIGIN/../libs/' ${PADDLE_BINARY_DIR}/python/paddle/fluid/core.so" if os.system(command) != 0: raise Exception("patchelf --set-rpath for core.so fails") From 2bbe5f77e79d33bf5cc9fb907abc3230c31ea0bb Mon Sep 17 00:00:00 2001 From: yuyang18 Date: Sat, 7 Jul 2018 14:20:22 +0800 Subject: [PATCH 03/22] Add GetEndPoints of Reader. We can get endpoints of a reader chain. --- paddle/fluid/framework/CMakeLists.txt | 1 + paddle/fluid/framework/reader.cc | 30 ++++++++++++++++ paddle/fluid/framework/reader.h | 17 +++++++++ paddle/fluid/framework/reader_test.cc | 50 +++++++++++++++++++++++++++ 4 files changed, 98 insertions(+) create mode 100644 paddle/fluid/framework/reader_test.cc diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 397c9f7394..ec252929d5 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -27,6 +27,7 @@ cc_test(lod_tensor_test SRCS lod_tensor_test.cc DEPS lod_tensor memory) nv_test(lod_tensor_gpu_test SRCS lod_tensor_test.cu DEPS lod_tensor) cc_library(reader SRCS reader.cc DEPS lod_tensor ddim) +cc_test(reader_test SRCS reader_test.cc DEPS reader) cc_test(variable_test SRCS variable_test.cc) diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index 0b36f1116d..2e2aa1cba1 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -13,11 +13,40 @@ // limitations under the License. #include "paddle/fluid/framework/reader.h" +#include namespace paddle { namespace framework { ReaderBase::~ReaderBase() {} +void ReaderBase::InsertDecoratedReader(ReaderBase *decorated_reader) { + decorated_readers_.emplace(decorated_reader); +} +void ReaderBase::EraseDecoratedReader(ReaderBase *decorated_reader) { + auto it = decorated_readers_.find(decorated_reader); + PADDLE_ENFORCE(it != decorated_readers_.end(), + "Cannot find the decorated reader to erase"); + decorated_readers_.erase(it); +} +std::unordered_set ReaderBase::GetEndPoints() { + std::unordered_set result; + std::deque queue; + queue.emplace_back(this); + while (!queue.empty()) { // BFS search + auto *front = queue.front(); + queue.pop_front(); + if (front->decorated_readers_.empty()) { + result.emplace(front); + } else { + for (ReaderBase *reader : front->decorated_readers_) { + queue.emplace_back(reader); + } + } + } + + return result; +} + FileReader::FileReader(const std::vector &dims) : dims_(dims) {} void FileReader::ReadNext(std::vector *out) { @@ -37,5 +66,6 @@ void FileReader::ReadNext(std::vector *out) { } } } +DecoratedReader::~DecoratedReader() { reader_->EraseDecoratedReader(this); } } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 64d4ceab62..2a65c58e3f 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include "paddle/fluid/framework/ddim.h" @@ -31,6 +32,19 @@ class ReaderBase { virtual void ReInit() = 0; virtual ~ReaderBase(); + + // Return the readers which are the end of decorating chain. Basically + // they are readers just before read op. + std::unordered_set GetEndPoints(); + + private: + friend class DecoratedReader; + // These methods can be only invoked inside DecoratedReader to record the + // decorating chain. + void InsertDecoratedReader(ReaderBase* decorated_reader); + void EraseDecoratedReader(ReaderBase* decorated_reader); + // A set of which readers that decorated this reader. + std::unordered_set decorated_readers_; }; class DecoratedReader : public ReaderBase { @@ -38,8 +52,11 @@ class DecoratedReader : public ReaderBase { explicit DecoratedReader(const std::shared_ptr& reader) : ReaderBase(), reader_(reader) { PADDLE_ENFORCE_NOT_NULL(reader_); + reader_->InsertDecoratedReader(this); } + ~DecoratedReader(); + void ReInit() override { reader_->ReInit(); } protected: diff --git a/paddle/fluid/framework/reader_test.cc b/paddle/fluid/framework/reader_test.cc new file mode 100644 index 0000000000..c763fe18d6 --- /dev/null +++ b/paddle/fluid/framework/reader_test.cc @@ -0,0 +1,50 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/reader.h" +#include +#include "gtest/gtest.h" + +class StubDecoratedReader : public paddle::framework::DecoratedReader { + public: + explicit StubDecoratedReader(const std::shared_ptr &reader) + : DecoratedReader(reader) {} + + void ReadNext(std::vector *out) override {} +}; + +class StubRootReader : public paddle::framework::ReaderBase { + public: + void ReadNext(std::vector *out) override {} + void ReInit() override {} +}; + +TEST(READER, decorate_chain) { + auto root = std::make_shared(); + auto end_point1 = StubDecoratedReader(root); + auto end_point2 = StubDecoratedReader(root); + + { + auto endpoints = root->GetEndPoints(); + ASSERT_EQ(endpoints.size(), 2U); + ASSERT_NE(endpoints.count(&end_point1), 0); + ASSERT_NE(endpoints.count(&end_point2), 0); + } + + { + auto end_point3 = StubDecoratedReader(root); + ASSERT_EQ(root->GetEndPoints().size(), 3U); + } + { ASSERT_EQ(root->GetEndPoints().size(), 2U); } +} From c48c586acacc31274cd3a50965d8e65e8213e6f1 Mon Sep 17 00:00:00 2001 From: yuyang18 Date: Sun, 8 Jul 2018 12:19:05 +0800 Subject: [PATCH 04/22] Use weak_ptr to implement DecoratedReaderChain --- paddle/fluid/framework/reader.cc | 19 +++++------ paddle/fluid/framework/reader.h | 34 ++++++++++++++----- paddle/fluid/framework/reader_test.cc | 13 ++++--- .../reader/create_batch_reader_op.cc | 4 +-- .../reader/create_custom_reader_op.cc | 8 ++--- .../reader/create_double_buffer_reader_op.cc | 3 +- .../reader/create_multi_pass_reader_op.cc | 3 +- .../operators/reader/create_py_reader_op.cc | 2 +- .../reader/create_random_data_generator_op.cc | 4 +-- .../reader/create_recordio_file_reader_op.cc | 2 +- .../reader/create_shuffle_reader_op.cc | 5 ++- .../reader/create_threaded_reader_op.cc | 3 +- .../fluid/operators/reader/open_files_op.cc | 6 ++-- 13 files changed, 62 insertions(+), 44 deletions(-) diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index 2e2aa1cba1..e1d2ac79cf 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -19,15 +19,11 @@ namespace paddle { namespace framework { ReaderBase::~ReaderBase() {} -void ReaderBase::InsertDecoratedReader(ReaderBase *decorated_reader) { - decorated_readers_.emplace(decorated_reader); -} -void ReaderBase::EraseDecoratedReader(ReaderBase *decorated_reader) { - auto it = decorated_readers_.find(decorated_reader); - PADDLE_ENFORCE(it != decorated_readers_.end(), - "Cannot find the decorated reader to erase"); - decorated_readers_.erase(it); +void ReaderBase::InsertDecoratedReader( + const std::shared_ptr &decorated_reader) { + decorated_readers_.emplace_back(decorated_reader); } + std::unordered_set ReaderBase::GetEndPoints() { std::unordered_set result; std::deque queue; @@ -38,8 +34,10 @@ std::unordered_set ReaderBase::GetEndPoints() { if (front->decorated_readers_.empty()) { result.emplace(front); } else { - for (ReaderBase *reader : front->decorated_readers_) { - queue.emplace_back(reader); + for (auto &reader : front->decorated_readers_) { + if (auto *reader_ptr = reader.lock().get()) { + queue.emplace_back(reader_ptr); + } } } } @@ -66,6 +64,5 @@ void FileReader::ReadNext(std::vector *out) { } } } -DecoratedReader::~DecoratedReader() { reader_->EraseDecoratedReader(this); } } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 2a65c58e3f..730e3faace 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -41,24 +41,26 @@ class ReaderBase { friend class DecoratedReader; // These methods can be only invoked inside DecoratedReader to record the // decorating chain. - void InsertDecoratedReader(ReaderBase* decorated_reader); - void EraseDecoratedReader(ReaderBase* decorated_reader); + void InsertDecoratedReader( + const std::shared_ptr& decorated_reader); // A set of which readers that decorated this reader. - std::unordered_set decorated_readers_; + std::vector> decorated_readers_; }; -class DecoratedReader : public ReaderBase { +class DecoratedReader : public ReaderBase, + public std::enable_shared_from_this { public: explicit DecoratedReader(const std::shared_ptr& reader) : ReaderBase(), reader_(reader) { PADDLE_ENFORCE_NOT_NULL(reader_); - reader_->InsertDecoratedReader(this); } - ~DecoratedReader(); - void ReInit() override { reader_->ReInit(); } + void RegisterDecorateChain() { + reader_->InsertDecoratedReader(shared_from_this()); + } + protected: std::shared_ptr reader_; }; @@ -80,9 +82,14 @@ class FileReader : public ReaderBase { // making it easier to access different type reader in Variables. class ReaderHolder { public: - void Reset(ReaderBase* reader) { reader_.reset(reader); } + template + void Reset(const std::shared_ptr& reader) { + auto reader_base = std::dynamic_pointer_cast(reader); + PADDLE_ENFORCE_NOT_NULL(reader_base); + reader_ = reader_base; + } - std::shared_ptr Get() const { return reader_; } + const std::shared_ptr& Get() const { return reader_; } void ReadNext(std::vector* out) { PADDLE_ENFORCE_NOT_NULL(reader_); @@ -93,9 +100,18 @@ class ReaderHolder { reader_->ReInit(); } + operator const std::shared_ptr&() const { return this->reader_; } + private: std::shared_ptr reader_; }; +template +inline std::shared_ptr MakeDecoratedReader(ARGS&&... args) { + std::shared_ptr reader(new T(std::forward(args)...)); + reader->RegisterDecorateChain(); + return reader; +} + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/reader_test.cc b/paddle/fluid/framework/reader_test.cc index c763fe18d6..c05be86706 100644 --- a/paddle/fluid/framework/reader_test.cc +++ b/paddle/fluid/framework/reader_test.cc @@ -32,18 +32,21 @@ class StubRootReader : public paddle::framework::ReaderBase { TEST(READER, decorate_chain) { auto root = std::make_shared(); - auto end_point1 = StubDecoratedReader(root); - auto end_point2 = StubDecoratedReader(root); + auto end_point1 = + paddle::framework::MakeDecoratedReader(root); + auto end_point2 = + paddle::framework::MakeDecoratedReader(root); { auto endpoints = root->GetEndPoints(); ASSERT_EQ(endpoints.size(), 2U); - ASSERT_NE(endpoints.count(&end_point1), 0); - ASSERT_NE(endpoints.count(&end_point2), 0); + ASSERT_NE(endpoints.count(end_point1.get()), 0); + ASSERT_NE(endpoints.count(end_point2.get()), 0); } { - auto end_point3 = StubDecoratedReader(root); + auto end_point3 = + paddle::framework::MakeDecoratedReader(root); ASSERT_EQ(root->GetEndPoints().size(), 3U); } { ASSERT_EQ(root->GetEndPoints().size(), 2U); } diff --git a/paddle/fluid/operators/reader/create_batch_reader_op.cc b/paddle/fluid/operators/reader/create_batch_reader_op.cc index ecbae3894d..41c3d37903 100644 --- a/paddle/fluid/operators/reader/create_batch_reader_op.cc +++ b/paddle/fluid/operators/reader/create_batch_reader_op.cc @@ -46,8 +46,8 @@ class CreateBatchReaderOp : public framework::OperatorBase { } const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) ->Get(); - out->Reset( - new BatchReader(underlying_reader.Get(), Attr("batch_size"))); + out->Reset(framework::MakeDecoratedReader( + underlying_reader, Attr("batch_size"))); } }; diff --git a/paddle/fluid/operators/reader/create_custom_reader_op.cc b/paddle/fluid/operators/reader/create_custom_reader_op.cc index a75c6d4c56..81a1aa7f9c 100644 --- a/paddle/fluid/operators/reader/create_custom_reader_op.cc +++ b/paddle/fluid/operators/reader/create_custom_reader_op.cc @@ -60,10 +60,10 @@ class CreateCustomReaderOp : public framework::OperatorBase { } const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) ->Get(); - out->Reset( - new CustomReader(underlying_reader.Get(), *sub_block, - Attr>("source_var_names"), - Attr>("sink_var_names"))); + out->Reset(framework::MakeDecoratedReader( + underlying_reader, *sub_block, + Attr>("source_var_names"), + Attr>("sink_var_names"))); } }; diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index 5f734489a8..9382046954 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -109,7 +109,8 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase { place = platform::CUDAPlace(static_cast(num)); } - out->Reset(new DoubleBufferReader(underlying_reader.Get(), place)); + out->Reset(framework::MakeDecoratedReader( + underlying_reader, place)); } }; diff --git a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc index 19b54110b9..69b3400a84 100644 --- a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc +++ b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc @@ -60,7 +60,8 @@ class CreateMultiPassReaderOp : public framework::OperatorBase { const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) ->Get(); int pass_num = Attr("pass_num"); - out->Reset(new MultiPassReader(underlying_reader.Get(), pass_num)); + out->Reset(framework::MakeDecoratedReader( + underlying_reader, pass_num)); } }; diff --git a/paddle/fluid/operators/reader/create_py_reader_op.cc b/paddle/fluid/operators/reader/create_py_reader_op.cc index 36587360f7..0b3578570b 100644 --- a/paddle/fluid/operators/reader/create_py_reader_op.cc +++ b/paddle/fluid/operators/reader/create_py_reader_op.cc @@ -58,7 +58,7 @@ class CreatePyReaderOp : public framework::OperatorBase { auto* queue_holder = queue_holder_var->template GetMutable(); - out->Reset(new PyReader(queue_holder->GetQueue())); + out->Reset(std::make_shared(queue_holder->GetQueue())); } }; diff --git a/paddle/fluid/operators/reader/create_random_data_generator_op.cc b/paddle/fluid/operators/reader/create_random_data_generator_op.cc index 5b7e8a063a..1c3de3feab 100644 --- a/paddle/fluid/operators/reader/create_random_data_generator_op.cc +++ b/paddle/fluid/operators/reader/create_random_data_generator_op.cc @@ -79,8 +79,8 @@ class CreateRandomDataGeneratorOp : public framework::OperatorBase { std::vector shapes = RestoreShapes(shape_concat, ranks); auto* out = scope.FindVar(Output("Out")) ->template GetMutable(); - out->Reset(new RandomDataGenerator(shapes, Attr("low"), - Attr("high"))); + out->Reset(std::make_shared>( + shapes, Attr("low"), Attr("high"))); } }; diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc index 559827f084..c457cb3fb4 100644 --- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc +++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc @@ -70,7 +70,7 @@ class CreateRecordIOReaderOp : public framework::OperatorBase { auto* out = scope.FindVar(Output("Out")) ->template GetMutable(); - out->Reset(new RecordIOFileReader( + out->Reset(std::make_shared>( filename, RestoreShapes(shape_concat, ranks))); } }; diff --git a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc index 57e8e21214..75adabdaa9 100644 --- a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc +++ b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc @@ -86,9 +86,8 @@ class CreateShuffleReaderOp : public framework::OperatorBase { } const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) ->Get(); - out->Reset( - new ShuffleReader(underlying_reader.Get(), - static_cast(Attr("buffer_size")))); + out->Reset(framework::MakeDecoratedReader( + underlying_reader, static_cast(Attr("buffer_size")))); } }; diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc index 3798015146..81d75cdd33 100644 --- a/paddle/fluid/operators/reader/create_threaded_reader_op.cc +++ b/paddle/fluid/operators/reader/create_threaded_reader_op.cc @@ -49,7 +49,8 @@ class CreateThreadedReaderOp : public framework::OperatorBase { } const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) ->Get(); - out->Reset(new ThreadedReader(underlying_reader.Get())); + out->Reset( + framework::MakeDecoratedReader(underlying_reader)); } }; diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index 31e5d81e55..e382066be5 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -180,9 +180,9 @@ class OpenFilesOp : public framework::OperatorBase { auto* out = scope.FindVar(Output("Out")) ->template GetMutable(); - out->Reset(new MultiFileReader(file_names, - RestoreShapes(shape_concat, ranks), - thread_num, buffer_size)); + out->Reset(std::make_shared( + file_names, RestoreShapes(shape_concat, ranks), thread_num, + buffer_size)); } }; From de9a411f1cdca85f127f0715e1f9d25ef4c76195 Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Sun, 8 Jul 2018 15:59:27 +0800 Subject: [PATCH 05/22] adjust readers' inheritance relationships 1. Make PyReader and RandomDataGenerator inherited from FileReader. 2. Remove the memeber variable 'dims_' and realated checks in FileReader. --- paddle/fluid/framework/reader.cc | 20 +------------------ paddle/fluid/framework/reader.h | 5 +---- .../reader/create_double_buffer_reader_op.cc | 2 +- .../operators/reader/create_py_reader_op.cc | 7 ++++--- .../reader/create_random_data_generator_op.cc | 6 +++--- .../reader/create_recordio_file_reader_op.cc | 17 +++------------- .../fluid/operators/reader/open_files_op.cc | 9 +++------ .../operators/reader/reader_op_registry.cc | 4 ++-- .../operators/reader/reader_op_registry.h | 11 +++++----- 9 files changed, 23 insertions(+), 58 deletions(-) diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index 0b36f1116d..f288b90b4d 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -18,24 +18,6 @@ namespace paddle { namespace framework { ReaderBase::~ReaderBase() {} -FileReader::FileReader(const std::vector &dims) : dims_(dims) {} - -void FileReader::ReadNext(std::vector *out) { - ReadNextImpl(out); - if (out->empty()) { - return; - } - - PADDLE_ENFORCE_EQ(out->size(), dims_.size()); - for (size_t i = 0; i < dims_.size(); ++i) { - auto &actual = (*out)[i].dims(); - auto &expect = dims_[i]; - - PADDLE_ENFORCE_EQ(actual.size(), expect.size()); - for (int j = 0; j < actual.size(); ++j) { - // PADDLE_ENFORCE(actual[i] == expect[i] || expect[i] == -1); - } - } -} +void FileReader::ReadNext(std::vector *out) { ReadNextImpl(out); } } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 64d4ceab62..823d58af5e 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -48,15 +48,12 @@ class DecoratedReader : public ReaderBase { class FileReader : public ReaderBase { public: - explicit FileReader(const std::vector& dims); + FileReader() : ReaderBase() {} void ReadNext(std::vector* out) override; protected: virtual void ReadNextImpl(std::vector* out) = 0; - - private: - std::vector dims_; }; // The ReaderHolder is used as reader' unified wrapper, diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index 5f734489a8..0d2ff2e8e4 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -151,8 +151,8 @@ void DoubleBufferReader::ReadNext(std::vector* out) { } void DoubleBufferReader::ReInit() { - reader_->ReInit(); EndPrefetcher(); + reader_->ReInit(); StartPrefetcher(); } diff --git a/paddle/fluid/operators/reader/create_py_reader_op.cc b/paddle/fluid/operators/reader/create_py_reader_op.cc index 36587360f7..84ea72379b 100644 --- a/paddle/fluid/operators/reader/create_py_reader_op.cc +++ b/paddle/fluid/operators/reader/create_py_reader_op.cc @@ -19,14 +19,15 @@ namespace paddle { namespace operators { namespace reader { -class PyReader : public framework::ReaderBase { +class PyReader : public framework::FileReader { public: - explicit PyReader(const std::shared_ptr& queue) { + explicit PyReader(const std::shared_ptr& queue) + : framework::FileReader() { PADDLE_ENFORCE(queue != nullptr, "LoDTensorBlockingQueue must not be null"); queue_ = queue; } - void ReadNext(std::vector* out) override { + void ReadNextImpl(std::vector* out) override { bool success; *out = queue_->Pop(&success); if (!success) out->clear(); diff --git a/paddle/fluid/operators/reader/create_random_data_generator_op.cc b/paddle/fluid/operators/reader/create_random_data_generator_op.cc index 5b7e8a063a..7cbc2882fd 100644 --- a/paddle/fluid/operators/reader/create_random_data_generator_op.cc +++ b/paddle/fluid/operators/reader/create_random_data_generator_op.cc @@ -19,11 +19,11 @@ namespace operators { namespace reader { template -class RandomDataGenerator : public framework::ReaderBase { +class RandomDataGenerator : public framework::FileReader { public: RandomDataGenerator(const std::vector& shapes, float low, float high) - : framework::ReaderBase(), low_(low), high_(high), shapes_(shapes) { + : framework::FileReader(), low_(low), high_(high), shapes_(shapes) { PADDLE_ENFORCE_LE(low, high, "'low' shouldn't be greater than 'high'.(%f vs %f)", low, high); @@ -32,7 +32,7 @@ class RandomDataGenerator : public framework::ReaderBase { dist_ = std::uniform_real_distribution(low_, high_); } - void ReadNext(std::vector* out) override { + void ReadNextImpl(std::vector* out) override { out->clear(); out->reserve(shapes_.size()); for (const framework::DDim& shape : shapes_) { diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc index 559827f084..c032acdffd 100644 --- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc +++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc @@ -21,9 +21,8 @@ namespace reader { template class RecordIOFileReader : public framework::FileReader { public: - explicit RecordIOFileReader(const std::string& filename, - const std::vector& dims) - : FileReader(dims), + explicit RecordIOFileReader(const std::string& filename) + : FileReader(), scanner_(filename), dev_ctx_(*platform::DeviceContextPool::Instance().Get( platform::CPUPlace())) { @@ -58,20 +57,10 @@ class CreateRecordIOReaderOp : public framework::OperatorBase { private: void RunImpl(const framework::Scope& scope, const platform::Place& dev_place) const override { - const auto& shape_concat = Attr>("shape_concat"); - const auto& ranks = Attr>("ranks"); - PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty()); - PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0), - static_cast(shape_concat.size()), - "The accumulate of all ranks should be equal to the " - "shape concat's length."); std::string filename = Attr("filename"); - auto* out = scope.FindVar(Output("Out")) ->template GetMutable(); - - out->Reset(new RecordIOFileReader( - filename, RestoreShapes(shape_concat, ranks))); + out->Reset(new RecordIOFileReader(filename)); } }; diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index 31e5d81e55..6e34a5bd00 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -23,13 +23,12 @@ namespace reader { class MultiFileReader : public framework::ReaderBase { public: - MultiFileReader(const std::vector& file_names, - const std::vector& dims, size_t thread_num, + MultiFileReader(const std::vector& file_names, size_t thread_num, size_t buffer_size) : buffer_size_(buffer_size) { readers_.reserve(file_names.size()); for (const std::string& f_name : file_names) { - readers_.emplace_back(CreateReaderByFileName(f_name, dims)); + readers_.emplace_back(CreateReaderByFileName(f_name)); } prefetchers_.resize(thread_num); StartNewScheduler(); @@ -180,9 +179,7 @@ class OpenFilesOp : public framework::OperatorBase { auto* out = scope.FindVar(Output("Out")) ->template GetMutable(); - out->Reset(new MultiFileReader(file_names, - RestoreShapes(shape_concat, ranks), - thread_num, buffer_size)); + out->Reset(new MultiFileReader(file_names, thread_num, buffer_size)); } }; diff --git a/paddle/fluid/operators/reader/reader_op_registry.cc b/paddle/fluid/operators/reader/reader_op_registry.cc index e11256a49f..b82aab1214 100644 --- a/paddle/fluid/operators/reader/reader_op_registry.cc +++ b/paddle/fluid/operators/reader/reader_op_registry.cc @@ -39,7 +39,7 @@ std::unordered_map& FileReaderRegistry() { } std::unique_ptr CreateReaderByFileName( - const std::string& file_name, const std::vector& dims) { + const std::string& file_name) { size_t separator_pos = file_name.find_last_of(kFileFormatSeparator); PADDLE_ENFORCE_NE(separator_pos, std::string::npos, "File name illegal! A legal file name should be like: " @@ -49,7 +49,7 @@ std::unique_ptr CreateReaderByFileName( auto itor = FileReaderRegistry().find(filetype); PADDLE_ENFORCE(itor != FileReaderRegistry().end(), "No file reader registered for '%s' format.", filetype); - framework::ReaderBase* reader = (itor->second)(file_name, dims); + framework::ReaderBase* reader = (itor->second)(file_name); return std::unique_ptr(reader); } diff --git a/paddle/fluid/operators/reader/reader_op_registry.h b/paddle/fluid/operators/reader/reader_op_registry.h index 244bf15f06..25c3e7d77b 100644 --- a/paddle/fluid/operators/reader/reader_op_registry.h +++ b/paddle/fluid/operators/reader/reader_op_registry.h @@ -25,22 +25,21 @@ namespace reader { static constexpr char kFileFormatSeparator[] = "."; -using FileReaderCreator = std::function&)>; +using FileReaderCreator = + std::function; std::unordered_map& FileReaderRegistry(); template int RegisterFileReader(const std::string& filetype) { - FileReaderRegistry()[filetype] = []( - const std::string& fn, const std::vector& dims) { - return new Reader(fn, dims); + FileReaderRegistry()[filetype] = [](const std::string& fn) { + return new Reader(fn); }; return 0; } std::unique_ptr CreateReaderByFileName( - const std::string& file_name, const std::vector& dims); + const std::string& file_name); extern std::vector RestoreShapes( const std::vector& shape_concat, const std::vector& ranks); From 5528f599001d5d8def973ac92cffa6a7bb9611be Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Sun, 8 Jul 2018 17:57:18 +0800 Subject: [PATCH 06/22] Split ReInit() to Shutdown() and Start() --- paddle/fluid/framework/reader.cc | 23 +++++++++- paddle/fluid/framework/reader.h | 43 +++++++++++++++---- .../reader/create_batch_reader_op.cc | 5 ++- .../reader/create_custom_reader_op.cc | 8 ++-- .../reader/create_double_buffer_reader_op.cc | 25 ++++++----- .../reader/create_multi_pass_reader_op.cc | 15 ++++--- .../operators/reader/create_py_reader_op.cc | 8 +++- .../reader/create_random_data_generator_op.cc | 3 +- .../reader/create_recordio_file_reader_op.cc | 5 ++- .../reader/create_shuffle_reader_op.cc | 15 ++++++- .../reader/create_threaded_reader_op.cc | 21 ++++++--- .../fluid/operators/reader/open_files_op.cc | 27 +++++------- paddle/fluid/pybind/pybind.cc | 2 +- 13 files changed, 136 insertions(+), 64 deletions(-) diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index f288b90b4d..0f2f4387aa 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -16,8 +16,29 @@ namespace paddle { namespace framework { + +void ReaderBase::ReadNext(std::vector *out) { + if (status_ != ReaderStatus::kRunning) { + PADDLE_THROW("The reader is not at the status of 'running'."); + } + ReadNextImpl(out); +} + +void ReaderBase::Shutdown() { + if (status_ != ReaderStatus::kStopped) { + ShutdownImpl(); + status_ = ReaderStatus::kStopped; + } +} + +void ReaderBase::Start() { + if (status_ != ReaderStatus::kRunning) { + StartImpl(); + status_ = ReaderStatus::kRunning; + } +} + ReaderBase::~ReaderBase() {} -void FileReader::ReadNext(std::vector *out) { ReadNextImpl(out); } } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 823d58af5e..6b62d11802 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -24,13 +24,26 @@ namespace paddle { namespace framework { +enum ReaderStatus { kRunning, kStopped }; + class ReaderBase { public: - virtual void ReadNext(std::vector* out) = 0; + void ReadNext(std::vector* out); + + void Shutdown(); - virtual void ReInit() = 0; + void Start(); virtual ~ReaderBase(); + + protected: + virtual void ReadNextImpl(std::vector* out) = 0; + + virtual void ShutdownImpl() = 0; + + virtual void StartImpl() = 0; + + std::atomic status_{kStopped}; }; class DecoratedReader : public ReaderBase { @@ -40,9 +53,11 @@ class DecoratedReader : public ReaderBase { PADDLE_ENFORCE_NOT_NULL(reader_); } - void ReInit() override { reader_->ReInit(); } - protected: + void ShutdownImpl() override { reader_->Shutdown(); } + + void StartImpl() override { reader_->Start(); } + std::shared_ptr reader_; }; @@ -50,10 +65,10 @@ class FileReader : public ReaderBase { public: FileReader() : ReaderBase() {} - void ReadNext(std::vector* out) override; - protected: - virtual void ReadNextImpl(std::vector* out) = 0; + void ShutdownImpl() override {} + + void StartImpl() override {} }; // The ReaderHolder is used as reader' unified wrapper, @@ -68,9 +83,19 @@ class ReaderHolder { PADDLE_ENFORCE_NOT_NULL(reader_); reader_->ReadNext(out); } - void ReInit() { + + void ResetAll() { + // TODO(fengjiayi): The interface of reseting all. + } + + void Shutdown() { + PADDLE_ENFORCE_NOT_NULL(reader_); + reader_->Shutdown(); + } + + void Start() { PADDLE_ENFORCE_NOT_NULL(reader_); - reader_->ReInit(); + reader_->Start(); } private: diff --git a/paddle/fluid/operators/reader/create_batch_reader_op.cc b/paddle/fluid/operators/reader/create_batch_reader_op.cc index ecbae3894d..429313a339 100644 --- a/paddle/fluid/operators/reader/create_batch_reader_op.cc +++ b/paddle/fluid/operators/reader/create_batch_reader_op.cc @@ -23,9 +23,10 @@ class BatchReader : public framework::DecoratedReader { BatchReader(const std::shared_ptr& reader, int batch_size) : DecoratedReader(reader), batch_size_(batch_size) { buffer_.reserve(batch_size_); + Start(); } - void ReadNext(std::vector* out) override; + void ReadNextImpl(std::vector* out) override; private: int batch_size_; @@ -66,7 +67,7 @@ class CreateBatchReaderOpMaker : public DecoratedReaderMakerBase { } }; -void BatchReader::ReadNext(std::vector* out) { +void BatchReader::ReadNextImpl(std::vector* out) { buffer_.clear(); buffer_.reserve(batch_size_); for (int i = 0; i < batch_size_; ++i) { diff --git a/paddle/fluid/operators/reader/create_custom_reader_op.cc b/paddle/fluid/operators/reader/create_custom_reader_op.cc index a75c6d4c56..334ba4cf6e 100644 --- a/paddle/fluid/operators/reader/create_custom_reader_op.cc +++ b/paddle/fluid/operators/reader/create_custom_reader_op.cc @@ -31,9 +31,11 @@ class CustomReader : public framework::DecoratedReader { sub_block_id_(sub_block.ID()), exe_(framework::Executor(platform::CPUPlace())), source_var_names_(source_var_names), - sink_var_names_(sink_var_names) {} + sink_var_names_(sink_var_names) { + Start(); + } - void ReadNext(std::vector* out) override; + void ReadNextImpl(std::vector* out) override; private: const framework::ProgramDesc program_; @@ -143,7 +145,7 @@ class CustomReaderInferVarType : public framework::VarTypeInference { } }; -void CustomReader::ReadNext(std::vector* out) { +void CustomReader::ReadNextImpl(std::vector* out) { out->clear(); std::vector underlying_outs; reader_->ReadNext(&underlying_outs); diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index 0d2ff2e8e4..88c34187ea 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -47,15 +47,24 @@ class DoubleBufferReader : public framework::DecoratedReader { } } #endif - StartPrefetcher(); + Start(); } - void ReadNext(std::vector* out) override; - void ReInit() override; + void ReadNextImpl(std::vector* out) override; - ~DoubleBufferReader() { EndPrefetcher(); } + ~DoubleBufferReader() { Shutdown(); } private: + void ShutdownImpl() override { + EndPrefetcher(); + reader_->Shutdown(); + } + + void StartImpl() override { + reader_->Start(); + StartPrefetcher(); + } + void StartPrefetcher() { channel_ = new reader::BlockingQueue(kChannelSize); prefetcher_ = std::thread([this] { PrefetchThreadFunc(); }); @@ -136,7 +145,7 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase { } }; -void DoubleBufferReader::ReadNext(std::vector* out) { +void DoubleBufferReader::ReadNextImpl(std::vector* out) { size_t cached_tensor_id; if (channel_->Receive(&cached_tensor_id)) { if (platform::is_gpu_place(place_)) { @@ -150,12 +159,6 @@ void DoubleBufferReader::ReadNext(std::vector* out) { } } -void DoubleBufferReader::ReInit() { - EndPrefetcher(); - reader_->ReInit(); - StartPrefetcher(); -} - void DoubleBufferReader::PrefetchThreadFunc() { VLOG(5) << "A new prefetch thread starts."; size_t cached_tensor_id = 0; diff --git a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc index 19b54110b9..7c8aa975a1 100644 --- a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc +++ b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc @@ -22,25 +22,28 @@ namespace reader { class MultiPassReader : public framework::DecoratedReader { public: MultiPassReader(const std::shared_ptr& reader, int pass_num) - : DecoratedReader(reader), pass_num_(pass_num), pass_count_(0) {} + : DecoratedReader(reader), pass_num_(pass_num) { + Start(); + } - void ReadNext(std::vector* out) override { + void ReadNextImpl(std::vector* out) override { reader_->ReadNext(out); if (out->empty()) { ++pass_count_; if (pass_count_ < pass_num_) { - reader_->ReInit(); + reader_->Shutdown(); + reader_->Start(); reader_->ReadNext(out); } } } - void ReInit() override { + private: + void StartImpl() override { pass_count_ = 0; - reader_->ReInit(); + reader_->Start(); } - private: int pass_num_; mutable int pass_count_; }; diff --git a/paddle/fluid/operators/reader/create_py_reader_op.cc b/paddle/fluid/operators/reader/create_py_reader_op.cc index 84ea72379b..9b4c6412e6 100644 --- a/paddle/fluid/operators/reader/create_py_reader_op.cc +++ b/paddle/fluid/operators/reader/create_py_reader_op.cc @@ -33,9 +33,13 @@ class PyReader : public framework::FileReader { if (!success) out->clear(); } - void ReInit() override {} - private: + void ShutdownImpl() override { /* TODO */ + } + + void StartImpl() override { /* TODO */ + } + std::shared_ptr queue_; }; diff --git a/paddle/fluid/operators/reader/create_random_data_generator_op.cc b/paddle/fluid/operators/reader/create_random_data_generator_op.cc index 7cbc2882fd..c92a8b49b5 100644 --- a/paddle/fluid/operators/reader/create_random_data_generator_op.cc +++ b/paddle/fluid/operators/reader/create_random_data_generator_op.cc @@ -30,6 +30,7 @@ class RandomDataGenerator : public framework::FileReader { unsigned int seed = std::random_device()(); engine_.seed(seed); dist_ = std::uniform_real_distribution(low_, high_); + Start(); } void ReadNextImpl(std::vector* out) override { @@ -51,8 +52,6 @@ class RandomDataGenerator : public framework::FileReader { } } - void ReInit() override { return; } - private: float low_; float high_; diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc index c032acdffd..7a44bd14eb 100644 --- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc +++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc @@ -30,10 +30,9 @@ class RecordIOFileReader : public framework::FileReader { mutex_.reset(new std::mutex()); } LOG(INFO) << "Creating file reader" << filename; + Start(); } - void ReInit() override { scanner_.Reset(); } - protected: void ReadNextImpl(std::vector* out) override { if (ThreadSafe) { @@ -44,6 +43,8 @@ class RecordIOFileReader : public framework::FileReader { } } + void ShutdownImpl() override { scanner_.Reset(); } + private: std::unique_ptr mutex_; recordio::Scanner scanner_; diff --git a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc index 57e8e21214..3cee9bfd64 100644 --- a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc +++ b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc @@ -31,10 +31,10 @@ class ShuffleReader : public framework::DecoratedReader { std::random_device device; seed_ = device(); } - ReloadBuffer(); + Start(); } - void ReadNext(std::vector* out) override { + void ReadNextImpl(std::vector* out) override { out->clear(); if (iteration_pos_ >= buffer_.size()) { VLOG(10) << "Resetting shuffle buffer"; @@ -47,6 +47,17 @@ class ShuffleReader : public framework::DecoratedReader { } private: + void ShutdownImpl() override { + buffer_.clear(); + iteration_pos_ = 0; + reader_->Shutdown(); + } + + void StartImpl() override { + reader_->Start(); + ReloadBuffer(); + } + void ReloadBuffer() { buffer_.clear(); buffer_.reserve(buffer_size_); diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc index 3798015146..76b853527c 100644 --- a/paddle/fluid/operators/reader/create_threaded_reader_op.cc +++ b/paddle/fluid/operators/reader/create_threaded_reader_op.cc @@ -22,16 +22,26 @@ namespace reader { class ThreadedReader : public framework::DecoratedReader { public: explicit ThreadedReader(const std::shared_ptr& reader) - : DecoratedReader(reader) {} + : DecoratedReader(reader) { + Start(); + } - void ReadNext(std::vector* out) override { + void ReadNextImpl(std::vector* out) override { std::lock_guard lock(mutex_); reader_->ReadNext(out); } - void ReInit() override { reader_->ReInit(); } - private: + void ShutdownImpl() override { + std::lock_guard lock(mutex_); + reader_->Shutdown(); + } + + void StartImpl() override { + std::lock_guard lock(mutex_); + reader_->Start(); + } + std::mutex mutex_; }; @@ -62,9 +72,6 @@ class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase { This operator creates a threaded reader. A threaded reader's 'ReadNext()' can be invoked by several threads at the same time. - When the attribute 'safe_mode' is true, the threaded reader's - 'ReInit()' is disabled to avoid unexpected bugs in multi-thread - environment. )DOC"); } }; diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index 6e34a5bd00..85127d93b2 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -31,17 +31,16 @@ class MultiFileReader : public framework::ReaderBase { readers_.emplace_back(CreateReaderByFileName(f_name)); } prefetchers_.resize(thread_num); - StartNewScheduler(); + Start(); } - void ReadNext(std::vector* out) override; - void ReInit() override; + void ReadNextImpl(std::vector* out) override; - ~MultiFileReader() { EndScheduler(); } + ~MultiFileReader() { Shutdown(); } private: - void StartNewScheduler(); - void EndScheduler(); + void StartImpl() override; + void ShutdownImpl() override; void ScheduleThreadFunc(); void PrefetchThreadFunc(size_t reader_idx, size_t thread_idx); @@ -54,18 +53,13 @@ class MultiFileReader : public framework::ReaderBase { reader::BlockingQueue>* buffer_; }; -void MultiFileReader::ReadNext(std::vector* out) { +void MultiFileReader::ReadNextImpl(std::vector* out) { if (!buffer_->Receive(out)) { out->clear(); } } -void MultiFileReader::ReInit() { - EndScheduler(); - StartNewScheduler(); -} - -void MultiFileReader::StartNewScheduler() { +void MultiFileReader::StartImpl() { size_t thread_num = prefetchers_.size(); waiting_reader_idx_ = new reader::BlockingQueue(readers_.size()); available_thread_idx_ = new reader::BlockingQueue(thread_num); @@ -83,7 +77,7 @@ void MultiFileReader::StartNewScheduler() { scheduler_ = std::thread([this] { ScheduleThreadFunc(); }); } -void MultiFileReader::EndScheduler() { +void MultiFileReader::ShutdownImpl() { available_thread_idx_->Close(); buffer_->Close(); waiting_reader_idx_->Close(); @@ -119,7 +113,7 @@ void MultiFileReader::ScheduleThreadFunc() { } } } - // If users invoke ReInit() when scheduler is running, it will close the + // If users invoke Shutdown() when scheduler is running, it will close the // 'avaiable_thread_idx_' and prefecther threads have no way to tell scheduler // to release their resource. So a check is needed before scheduler ends. for (auto& p : prefetchers_) { @@ -137,7 +131,8 @@ void MultiFileReader::PrefetchThreadFunc(size_t reader_idx, size_t thread_idx) { std::vector ins; reader->ReadNext(&ins); if (ins.empty()) { - reader->ReInit(); + reader->Shutdown(); + reader->Start(); break; } try { diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 7a8bb71245..0c523b6f17 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -296,7 +296,7 @@ All parameter, weight, gradient are variables in Paddle. py::return_value_policy::reference); py::class_(m, "Reader", "") - .def("reset", &framework::ReaderHolder::ReInit); + .def("reset", &framework::ReaderHolder::ResetAll); using LoDTensorBlockingQueue = ::paddle::operators::reader::LoDTensorBlockingQueue; From 6fc6cc2f4c18e5443e57035fe4d5b7368a36cd42 Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Sun, 8 Jul 2018 18:28:10 +0800 Subject: [PATCH 07/22] Some updates on readers 1. Shrink DoubleBufferReader's buffer size to 3. 2. Add BatchReader an option to discard leftover instances. 3. Fix a MultiPassReader bug on pass count. --- .../reader/create_batch_reader_op.cc | 19 +++++++++++++++---- .../reader/create_double_buffer_reader_op.cc | 4 ++-- .../reader/create_multi_pass_reader_op.cc | 10 ++++------ 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/paddle/fluid/operators/reader/create_batch_reader_op.cc b/paddle/fluid/operators/reader/create_batch_reader_op.cc index 429313a339..4d16b82e67 100644 --- a/paddle/fluid/operators/reader/create_batch_reader_op.cc +++ b/paddle/fluid/operators/reader/create_batch_reader_op.cc @@ -20,8 +20,11 @@ namespace reader { class BatchReader : public framework::DecoratedReader { public: - BatchReader(const std::shared_ptr& reader, int batch_size) - : DecoratedReader(reader), batch_size_(batch_size) { + BatchReader(const std::shared_ptr& reader, int batch_size, + bool discard_leftover) + : DecoratedReader(reader), + batch_size_(batch_size), + discard_leftover_(discard_leftover) { buffer_.reserve(batch_size_); Start(); } @@ -30,6 +33,7 @@ class BatchReader : public framework::DecoratedReader { private: int batch_size_; + bool discard_leftover_; std::vector> buffer_; }; @@ -47,8 +51,8 @@ class CreateBatchReaderOp : public framework::OperatorBase { } const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) ->Get(); - out->Reset( - new BatchReader(underlying_reader.Get(), Attr("batch_size"))); + out->Reset(new BatchReader(underlying_reader.Get(), Attr("batch_size"), + Attr("discard_leftover"))); } }; @@ -58,6 +62,10 @@ class CreateBatchReaderOpMaker : public DecoratedReaderMakerBase { AddAttr("batch_size", "How many instances the batch reader yields each time.") .GreaterThan(0); + AddAttr("discard_leftover", + "If true, the leftover instances that are not enough for a " + "new batch will be discarded.") + .SetDefault(true); AddComment(R"DOC( CreateBatchReader Operator @@ -78,6 +86,9 @@ void BatchReader::ReadNextImpl(std::vector* out) { break; } } + if (discard_leftover_ && buffer_.size() < batch_size_) { + buffer_.clear(); + } // Concat instances out->clear(); if (buffer_.empty()) { diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index 88c34187ea..efca6fe225 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -23,13 +23,13 @@ namespace reader { // 'Double buffer' means we shall maintain two batches of input data at the same // time. So the kCacheSize shoul be at least 2. -static constexpr size_t kCacheSize = 5; +static constexpr size_t kCacheSize = 3; // There will be two bacthes out of the channel during training: // 1. the one waiting to be sent to the channel // 2. the one just be received from the channel, which is also being used by // subsequent operators. // So the channel size should be kChacheSize - 2 -static constexpr size_t kChannelSize = 3; // kCacheSize - 2 +static constexpr size_t kChannelSize = 1; // kCacheSize - 2 class DoubleBufferReader : public framework::DecoratedReader { public: diff --git a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc index 7c8aa975a1..82331cb272 100644 --- a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc +++ b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc @@ -28,13 +28,11 @@ class MultiPassReader : public framework::DecoratedReader { void ReadNextImpl(std::vector* out) override { reader_->ReadNext(out); - if (out->empty()) { + if (out->empty() && pass_count_ < pass_num_ - 1) { + reader_->Shutdown(); + reader_->Start(); + reader_->ReadNext(out); ++pass_count_; - if (pass_count_ < pass_num_) { - reader_->Shutdown(); - reader_->Start(); - reader_->ReadNext(out); - } } } From 55e6e2c4a817518b9a18471c0e44fb05c9f27901 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 9 Jul 2018 10:38:42 +0800 Subject: [PATCH 08/22] follow commonts --- python/setup.py.in | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/python/setup.py.in b/python/setup.py.in index 8f80502412..efbf9eeffc 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -126,13 +126,15 @@ if '${WITH_MKL}' == 'ON': shutil.copy('${MKLML_IOMP_LIB}', libs_path) package_data['paddle.libs']+=['libmklml_intel.so','libiomp5.so'] if '${WITH_MKLDNN}' == 'ON': - if "@APPLE@" != "1": - # change rpath of libmkldnn.so.0, add $ORIGIN/ to it. - # The reason is that all thirdparty libraries in the same directory, - # thus, libmkldnn.so.0 will find libmklml_intel.so and libiomp5.so. - command = "patchelf --set-rpath '$ORIGIN/' ${MKLDNN_SHARED_LIB}" + # TODO(typhoonzero): use install_name_tool to patch mkl libs once + # we can support mkl on mac. + # + # change rpath of libmkldnn.so.0, add $ORIGIN/ to it. + # The reason is that all thirdparty libraries in the same directory, + # thus, libmkldnn.so.0 will find libmklml_intel.so and libiomp5.so. + command = "patchelf --set-rpath '$ORIGIN/' ${MKLDNN_SHARED_LIB}" if os.system(command) != 0: - raise Exception("patchelf --set-rpath for libmkldnn.so.0 fails") + raise Exception("patch libmkldnn.so failed, command: %s" % command) package_data['paddle.libs']+=['libmkldnn.so.0'] shutil.copy('${MKLDNN_SHARED_LIB}', libs_path) # remove unused paddle/libs/__init__.py @@ -148,7 +150,7 @@ if "@APPLE@" == "1": else: command = "patchelf --set-rpath '$ORIGIN/../libs/' ${PADDLE_BINARY_DIR}/python/paddle/fluid/core.so" if os.system(command) != 0: - raise Exception("patchelf --set-rpath for core.so fails") + raise Exception("patch core.so failed, command: %s" % command) setup(name='${PACKAGE_NAME}', version='${PADDLE_VERSION}', From 75988275f53b8e078fe19c0b78f3415646c2e746 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 9 Jul 2018 11:18:51 +0800 Subject: [PATCH 09/22] fix MKL flag auto detect --- cmake/external/mkldnn.cmake | 1 + cmake/external/mklml.cmake | 1 + 2 files changed, 2 insertions(+) diff --git a/cmake/external/mkldnn.cmake b/cmake/external/mkldnn.cmake index 20dda35c5c..9dc73d239e 100644 --- a/cmake/external/mkldnn.cmake +++ b/cmake/external/mkldnn.cmake @@ -28,6 +28,7 @@ IF(WIN32 OR APPLE) "Windows or Mac is not supported with MKLDNN in Paddle yet." "Force WITH_MKLDNN=OFF") SET(WITH_MKLDNN OFF CACHE STRING "Disable MKLDNN in Windows and MacOS" FORCE) + SET(WITH_MKL OFF CACHE STRING "Disable MKL for later scripts" FORCE) return() ENDIF() diff --git a/cmake/external/mklml.cmake b/cmake/external/mklml.cmake index 82c424fb79..30e7b4a8b1 100644 --- a/cmake/external/mklml.cmake +++ b/cmake/external/mklml.cmake @@ -21,6 +21,7 @@ IF(WIN32 OR APPLE) "Windows or Mac is not supported with MKLML in Paddle yet." "Force WITH_MKLML=OFF") SET(WITH_MKLML OFF CACHE STRING "Disable MKLML package in Windows and MacOS" FORCE) + SET(WITH_MKL OFF CACHE STRING "Disable MKL for later scripts" FORCE) return() ENDIF() From 62c1133f42055211db82a2e075768fb438f8c0c4 Mon Sep 17 00:00:00 2001 From: yuyang18 Date: Mon, 9 Jul 2018 11:26:10 +0800 Subject: [PATCH 10/22] Add mutex for decorated_chain --- paddle/fluid/framework/reader.cc | 1 + paddle/fluid/framework/reader.h | 1 + 2 files changed, 2 insertions(+) diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index e1d2ac79cf..9884e94121 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -21,6 +21,7 @@ ReaderBase::~ReaderBase() {} void ReaderBase::InsertDecoratedReader( const std::shared_ptr &decorated_reader) { + std::lock_guard guard(decorated_readers_mtx_); decorated_readers_.emplace_back(decorated_reader); } diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 730e3faace..01ef349300 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -45,6 +45,7 @@ class ReaderBase { const std::shared_ptr& decorated_reader); // A set of which readers that decorated this reader. std::vector> decorated_readers_; + std::mutex decorated_readers_mtx_; }; class DecoratedReader : public ReaderBase, From d9e182e260948dc86243e30a440ea676afbac18a Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 9 Jul 2018 11:26:38 +0800 Subject: [PATCH 11/22] update --- cmake/external/mkldnn.cmake | 4 +++- cmake/external/mklml.cmake | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cmake/external/mkldnn.cmake b/cmake/external/mkldnn.cmake index 9dc73d239e..5c09507163 100644 --- a/cmake/external/mkldnn.cmake +++ b/cmake/external/mkldnn.cmake @@ -28,7 +28,9 @@ IF(WIN32 OR APPLE) "Windows or Mac is not supported with MKLDNN in Paddle yet." "Force WITH_MKLDNN=OFF") SET(WITH_MKLDNN OFF CACHE STRING "Disable MKLDNN in Windows and MacOS" FORCE) - SET(WITH_MKL OFF CACHE STRING "Disable MKL for later scripts" FORCE) + IF ((NOT ${WITH_MKLDNN}) AND (NOT ${WITH_MKLML})) + SET(WITH_MKL OFF CACHE STRING "Disable MKL for later scripts" FORCE) + ENDIF() return() ENDIF() diff --git a/cmake/external/mklml.cmake b/cmake/external/mklml.cmake index 30e7b4a8b1..ba00f0fe63 100644 --- a/cmake/external/mklml.cmake +++ b/cmake/external/mklml.cmake @@ -21,7 +21,9 @@ IF(WIN32 OR APPLE) "Windows or Mac is not supported with MKLML in Paddle yet." "Force WITH_MKLML=OFF") SET(WITH_MKLML OFF CACHE STRING "Disable MKLML package in Windows and MacOS" FORCE) - SET(WITH_MKL OFF CACHE STRING "Disable MKL for later scripts" FORCE) + IF ((NOT ${WITH_MKLDNN}) AND (NOT ${WITH_MKLML})) + SET(WITH_MKL OFF CACHE STRING "Disable MKL for later scripts" FORCE) + ENDIF() return() ENDIF() From b4f0e579564d246b43388b872c34e7ef9baccfeb Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Sun, 8 Jul 2018 22:30:33 +0800 Subject: [PATCH 12/22] fix errors --- paddle/fluid/framework/reader.h | 3 ++- .../operators/reader/create_batch_reader_op.cc | 1 - .../operators/reader/create_custom_reader_op.cc | 4 +--- .../reader/create_double_buffer_reader_op.cc | 4 ++-- .../reader/create_multi_pass_reader_op.cc | 4 +--- .../reader/create_random_data_generator_op.cc | 1 - .../reader/create_recordio_file_reader_op.cc | 1 - .../operators/reader/create_shuffle_reader_op.cc | 2 +- .../reader/create_threaded_reader_op.cc | 4 +--- paddle/fluid/operators/reader/open_files_op.cc | 16 ++++++++++------ 10 files changed, 18 insertions(+), 22 deletions(-) diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 6b62d11802..91108544ac 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -43,7 +44,7 @@ class ReaderBase { virtual void StartImpl() = 0; - std::atomic status_{kStopped}; + std::atomic status_{kRunning}; }; class DecoratedReader : public ReaderBase { diff --git a/paddle/fluid/operators/reader/create_batch_reader_op.cc b/paddle/fluid/operators/reader/create_batch_reader_op.cc index 4d16b82e67..e5b69dabcb 100644 --- a/paddle/fluid/operators/reader/create_batch_reader_op.cc +++ b/paddle/fluid/operators/reader/create_batch_reader_op.cc @@ -26,7 +26,6 @@ class BatchReader : public framework::DecoratedReader { batch_size_(batch_size), discard_leftover_(discard_leftover) { buffer_.reserve(batch_size_); - Start(); } void ReadNextImpl(std::vector* out) override; diff --git a/paddle/fluid/operators/reader/create_custom_reader_op.cc b/paddle/fluid/operators/reader/create_custom_reader_op.cc index 334ba4cf6e..a53dceced3 100644 --- a/paddle/fluid/operators/reader/create_custom_reader_op.cc +++ b/paddle/fluid/operators/reader/create_custom_reader_op.cc @@ -31,9 +31,7 @@ class CustomReader : public framework::DecoratedReader { sub_block_id_(sub_block.ID()), exe_(framework::Executor(platform::CPUPlace())), source_var_names_(source_var_names), - sink_var_names_(sink_var_names) { - Start(); - } + sink_var_names_(sink_var_names) {} void ReadNextImpl(std::vector* out) override; diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index efca6fe225..1bf6a86a5a 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -47,12 +47,12 @@ class DoubleBufferReader : public framework::DecoratedReader { } } #endif - Start(); + StartPrefetcher(); } void ReadNextImpl(std::vector* out) override; - ~DoubleBufferReader() { Shutdown(); } + ~DoubleBufferReader() { EndPrefetcher(); } private: void ShutdownImpl() override { diff --git a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc index 82331cb272..f26a470c25 100644 --- a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc +++ b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc @@ -22,9 +22,7 @@ namespace reader { class MultiPassReader : public framework::DecoratedReader { public: MultiPassReader(const std::shared_ptr& reader, int pass_num) - : DecoratedReader(reader), pass_num_(pass_num) { - Start(); - } + : DecoratedReader(reader), pass_num_(pass_num), pass_count_(0) {} void ReadNextImpl(std::vector* out) override { reader_->ReadNext(out); diff --git a/paddle/fluid/operators/reader/create_random_data_generator_op.cc b/paddle/fluid/operators/reader/create_random_data_generator_op.cc index c92a8b49b5..9f7e3fd2d8 100644 --- a/paddle/fluid/operators/reader/create_random_data_generator_op.cc +++ b/paddle/fluid/operators/reader/create_random_data_generator_op.cc @@ -30,7 +30,6 @@ class RandomDataGenerator : public framework::FileReader { unsigned int seed = std::random_device()(); engine_.seed(seed); dist_ = std::uniform_real_distribution(low_, high_); - Start(); } void ReadNextImpl(std::vector* out) override { diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc index 7a44bd14eb..66f209b04e 100644 --- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc +++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc @@ -30,7 +30,6 @@ class RecordIOFileReader : public framework::FileReader { mutex_.reset(new std::mutex()); } LOG(INFO) << "Creating file reader" << filename; - Start(); } protected: diff --git a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc index 3cee9bfd64..1d3d85b9e4 100644 --- a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc +++ b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc @@ -31,7 +31,7 @@ class ShuffleReader : public framework::DecoratedReader { std::random_device device; seed_ = device(); } - Start(); + ReloadBuffer(); } void ReadNextImpl(std::vector* out) override { diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc index 76b853527c..88a2bcab8d 100644 --- a/paddle/fluid/operators/reader/create_threaded_reader_op.cc +++ b/paddle/fluid/operators/reader/create_threaded_reader_op.cc @@ -22,9 +22,7 @@ namespace reader { class ThreadedReader : public framework::DecoratedReader { public: explicit ThreadedReader(const std::shared_ptr& reader) - : DecoratedReader(reader) { - Start(); - } + : DecoratedReader(reader) {} void ReadNextImpl(std::vector* out) override { std::lock_guard lock(mutex_); diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index 85127d93b2..c657ffc535 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -31,16 +31,20 @@ class MultiFileReader : public framework::ReaderBase { readers_.emplace_back(CreateReaderByFileName(f_name)); } prefetchers_.resize(thread_num); - Start(); + StartNewScheduler(); } void ReadNextImpl(std::vector* out) override; - ~MultiFileReader() { Shutdown(); } + ~MultiFileReader() { EndScheduler(); } private: - void StartImpl() override; - void ShutdownImpl() override; + void ShutdownImpl() override { EndScheduler(); } + + void StartImpl() override { StartNewScheduler(); } + + void StartNewScheduler(); + void EndScheduler(); void ScheduleThreadFunc(); void PrefetchThreadFunc(size_t reader_idx, size_t thread_idx); @@ -59,7 +63,7 @@ void MultiFileReader::ReadNextImpl(std::vector* out) { } } -void MultiFileReader::StartImpl() { +void MultiFileReader::StartNewScheduler() { size_t thread_num = prefetchers_.size(); waiting_reader_idx_ = new reader::BlockingQueue(readers_.size()); available_thread_idx_ = new reader::BlockingQueue(thread_num); @@ -77,7 +81,7 @@ void MultiFileReader::StartImpl() { scheduler_ = std::thread([this] { ScheduleThreadFunc(); }); } -void MultiFileReader::ShutdownImpl() { +void MultiFileReader::EndScheduler() { available_thread_idx_->Close(); buffer_->Close(); waiting_reader_idx_->Close(); From 0d2ccfbd3c7f89007cb2c014279af8ede2dc7b42 Mon Sep 17 00:00:00 2001 From: yuyang18 Date: Mon, 9 Jul 2018 14:03:22 +0800 Subject: [PATCH 13/22] Remove atomic --- paddle/fluid/framework/reader.cc | 4 +--- paddle/fluid/framework/reader.h | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index 0f2f4387aa..3231b2ab27 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -18,9 +18,7 @@ namespace paddle { namespace framework { void ReaderBase::ReadNext(std::vector *out) { - if (status_ != ReaderStatus::kRunning) { - PADDLE_THROW("The reader is not at the status of 'running'."); - } + PADDLE_ENFORCE_EQ(status_, ReaderStatus::kRunning); ReadNextImpl(out); } diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 91108544ac..9dc5fce4aa 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include @@ -44,7 +43,7 @@ class ReaderBase { virtual void StartImpl() = 0; - std::atomic status_{kRunning}; + ReaderStatus status_{kRunning}; }; class DecoratedReader : public ReaderBase { From e8ee9dc7f86e3828c48641b4bff7a1253548edab Mon Sep 17 00:00:00 2001 From: yuyang18 Date: Mon, 9 Jul 2018 14:28:22 +0800 Subject: [PATCH 14/22] Several Polish --- paddle/fluid/framework/reader.cc | 2 +- paddle/fluid/framework/reader.h | 15 ++++----------- .../fluid/operators/reader/create_py_reader_op.cc | 4 ++-- .../reader/create_recordio_file_reader_op.cc | 5 ++--- 4 files changed, 9 insertions(+), 17 deletions(-) diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index 3231b2ab27..924bbd32bd 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -36,7 +36,7 @@ void ReaderBase::Start() { } } -ReaderBase::~ReaderBase() {} +ReaderBase::~ReaderBase() { Shutdown(); } } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 9dc5fce4aa..51fc887f34 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -39,9 +39,9 @@ class ReaderBase { protected: virtual void ReadNextImpl(std::vector* out) = 0; - virtual void ShutdownImpl() = 0; + virtual void ShutdownImpl() {} - virtual void StartImpl() = 0; + virtual void StartImpl() {} ReaderStatus status_{kRunning}; }; @@ -61,15 +61,8 @@ class DecoratedReader : public ReaderBase { std::shared_ptr reader_; }; -class FileReader : public ReaderBase { - public: - FileReader() : ReaderBase() {} - - protected: - void ShutdownImpl() override {} - - void StartImpl() override {} -}; +// FileReader is just a conceptual class. +class FileReader : public ReaderBase {}; // The ReaderHolder is used as reader' unified wrapper, // making it easier to access different type reader in Variables. diff --git a/paddle/fluid/operators/reader/create_py_reader_op.cc b/paddle/fluid/operators/reader/create_py_reader_op.cc index 9b4c6412e6..fb7a96cb73 100644 --- a/paddle/fluid/operators/reader/create_py_reader_op.cc +++ b/paddle/fluid/operators/reader/create_py_reader_op.cc @@ -56,8 +56,8 @@ class CreatePyReaderOp : public framework::OperatorBase { const std::string& queue_name = Input("blocking_queue"); auto* queue_holder_var = scope.FindVar(queue_name); - PADDLE_ENFORCE( - queue_holder_var != nullptr, + PADDLE_ENFORCE_NOT_NULL( + queue_holder_var, "No LoDTensorBlockingQueueHolder variable with name %s found", queue_name); auto* queue_holder = diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc index 66f209b04e..61d94cfbd3 100644 --- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc +++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc @@ -22,8 +22,7 @@ template class RecordIOFileReader : public framework::FileReader { public: explicit RecordIOFileReader(const std::string& filename) - : FileReader(), - scanner_(filename), + : scanner_(filename), dev_ctx_(*platform::DeviceContextPool::Instance().Get( platform::CPUPlace())) { if (ThreadSafe) { @@ -42,7 +41,7 @@ class RecordIOFileReader : public framework::FileReader { } } - void ShutdownImpl() override { scanner_.Reset(); } + void StartImpl() override { scanner_.Reset(); } private: std::unique_ptr mutex_; From 0e9f1e2790e992734820db9d013f7ca38b162441 Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Mon, 9 Jul 2018 15:01:10 +0800 Subject: [PATCH 15/22] Make ReaderBase thread safe and remove ThreadedReader --- paddle/fluid/framework/reader.cc | 3 + paddle/fluid/framework/reader.h | 2 + paddle/fluid/operators/reader/CMakeLists.txt | 1 - .../reader/create_threaded_reader_op.cc | 84 ------------------- python/paddle/fluid/layers/io.py | 8 -- 5 files changed, 5 insertions(+), 93 deletions(-) delete mode 100644 paddle/fluid/operators/reader/create_threaded_reader_op.cc diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index 3231b2ab27..567b0ee99f 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -18,11 +18,13 @@ namespace paddle { namespace framework { void ReaderBase::ReadNext(std::vector *out) { + std::lock_guard lock(mu_); PADDLE_ENFORCE_EQ(status_, ReaderStatus::kRunning); ReadNextImpl(out); } void ReaderBase::Shutdown() { + std::lock_guard lock(mu_); if (status_ != ReaderStatus::kStopped) { ShutdownImpl(); status_ = ReaderStatus::kStopped; @@ -30,6 +32,7 @@ void ReaderBase::Shutdown() { } void ReaderBase::Start() { + std::lock_guard lock(mu_); if (status_ != ReaderStatus::kRunning) { StartImpl(); status_ = ReaderStatus::kRunning; diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 9dc5fce4aa..8e7f43cdf9 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -44,6 +44,8 @@ class ReaderBase { virtual void StartImpl() = 0; ReaderStatus status_{kRunning}; + + mutable std::mutex mu_; }; class DecoratedReader : public ReaderBase { diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt index a39c8a0053..9dbcc35e6f 100644 --- a/paddle/fluid/operators/reader/CMakeLists.txt +++ b/paddle/fluid/operators/reader/CMakeLists.txt @@ -22,7 +22,6 @@ reader_library(create_batch_reader_op SRCS create_batch_reader_op.cc) reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_op.cc) reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc) reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc) -reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc) reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc) reader_library(create_py_reader_op SRCS create_py_reader_op.cc) diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc deleted file mode 100644 index 88a2bcab8d..0000000000 --- a/paddle/fluid/operators/reader/create_threaded_reader_op.cc +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/operators/detail/safe_ref.h" -#include "paddle/fluid/operators/reader/reader_op_registry.h" - -namespace paddle { -namespace operators { -namespace reader { - -class ThreadedReader : public framework::DecoratedReader { - public: - explicit ThreadedReader(const std::shared_ptr& reader) - : DecoratedReader(reader) {} - - void ReadNextImpl(std::vector* out) override { - std::lock_guard lock(mutex_); - reader_->ReadNext(out); - } - - private: - void ShutdownImpl() override { - std::lock_guard lock(mutex_); - reader_->Shutdown(); - } - - void StartImpl() override { - std::lock_guard lock(mutex_); - reader_->Start(); - } - - std::mutex mutex_; -}; - -class CreateThreadedReaderOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - auto* out = detail::Ref(scope.FindVar(Output("Out"))) - .GetMutable(); - if (out->Get() != nullptr) { - return; - } - const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) - ->Get(); - out->Reset(new ThreadedReader(underlying_reader.Get())); - } -}; - -class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase { - protected: - void Apply() override { - AddComment(R"DOC( - CreateThreadedReader Operator - - This operator creates a threaded reader. A threaded reader's - 'ReadNext()' can be invoked by several threads at the same - time. - )DOC"); - } -}; - -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace reader = paddle::operators::reader; -REGISTER_DECORATED_READER_OPERATOR(create_threaded_reader, - reader::CreateThreadedReaderOp, - reader::CreateThreadedReaderOpMaker); diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index f33ae76aea..2346252658 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -529,9 +529,6 @@ def open_files(filenames, main_prog_reader = multi_pass( reader=main_prog_reader, pass_num=pass_num) - if for_parallel: - main_prog_reader = parallel(reader=main_prog_reader) - return monkey_patch_reader_methods(main_prog_reader) @@ -647,11 +644,6 @@ def multi_pass(reader, pass_num): 'create_multi_pass_reader', reader, {'pass_num': int(pass_num)}) -def parallel(reader): - return __create_shared_decorated_reader__('create_threaded_reader', reader, - {}) - - def read_file(reader): """ Execute the given reader and get data via it. From 869ccea565e5f08bb945743a0f90c1402754637c Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 9 Jul 2018 18:13:23 +0800 Subject: [PATCH 16/22] fix inference deps --- paddle/contrib/inference/CMakeLists.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/paddle/contrib/inference/CMakeLists.txt b/paddle/contrib/inference/CMakeLists.txt index c30eff5010..80ae87d36a 100644 --- a/paddle/contrib/inference/CMakeLists.txt +++ b/paddle/contrib/inference/CMakeLists.txt @@ -48,8 +48,10 @@ cc_library(paddle_inference_api # Here the shared library doesn't depend on other fluid libraries, or double free will occur. cc_library(paddle_inference_api_shared SHARED - SRCS paddle_inference_api.cc paddle_inference_api_impl.cc) + SRCS paddle_inference_api.cc paddle_inference_api_impl.cc + DEPS ${FLUID_CORE_MODULES} ${GLOB_OP_LIB}) set_target_properties(paddle_inference_api_shared PROPERTIES OUTPUT_NAME paddle_inference_api) + if(NOT APPLE) set(LINK_FLAGS "-fPIC -fvisibility=hidden") set_target_properties(paddle_inference_api_shared PROPERTIES LINK_FLAGS "${LINK_FLAGS}") From d55919c656e19cd9600e1d009e6cdff878b5e28e Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Mon, 9 Jul 2018 16:53:42 +0800 Subject: [PATCH 17/22] Impl ResetAll and fix errors --- paddle/fluid/framework/reader.cc | 2 +- paddle/fluid/framework/reader.h | 8 +++++++- paddle/fluid/framework/reader_test.cc | 5 ++--- python/paddle/fluid/layers/io.py | 3 --- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index f8877e5cb0..5897d320a8 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -26,7 +26,7 @@ void ReaderBase::ReadNext(std::vector *out) { void ReaderBase::InsertDecoratedReader( const std::shared_ptr &decorated_reader) { - std::lock_guard guard(mu_)); + std::lock_guard guard(mu_); decorated_readers_.emplace_back(decorated_reader); } diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 93cd6243ff..6c4432cb7a 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -104,7 +104,13 @@ class ReaderHolder { } void ResetAll() { - // TODO(fengjiayi): The interface of reseting all. + auto end_readers = reader_->GetEndPoints(); + for (auto* reader : end_readers) { + reader->Shutdown(); + } + for (auto* reader : end_readers) { + reader->Start(); + } } void Shutdown() { diff --git a/paddle/fluid/framework/reader_test.cc b/paddle/fluid/framework/reader_test.cc index c05be86706..f0d07cb7c1 100644 --- a/paddle/fluid/framework/reader_test.cc +++ b/paddle/fluid/framework/reader_test.cc @@ -21,13 +21,12 @@ class StubDecoratedReader : public paddle::framework::DecoratedReader { explicit StubDecoratedReader(const std::shared_ptr &reader) : DecoratedReader(reader) {} - void ReadNext(std::vector *out) override {} + void ReadNextImpl(std::vector *out) override {} }; class StubRootReader : public paddle::framework::ReaderBase { public: - void ReadNext(std::vector *out) override {} - void ReInit() override {} + void ReadNextImpl(std::vector *out) override {} }; TEST(READER, decorate_chain) { diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 2346252658..977abde21f 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -375,9 +375,6 @@ def open_recordio_file(filename, if pass_num > 1: main_prog_var = multi_pass(reader=main_prog_var, pass_num=pass_num) - if for_parallel: - main_prog_var = parallel(reader=main_prog_var) - return monkey_patch_reader_methods(main_prog_var) From b2b371f31d006273dbc64aeec4a5cacc48a37a86 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 10 Jul 2018 10:40:21 +0800 Subject: [PATCH 18/22] update --- paddle/contrib/inference/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paddle/contrib/inference/CMakeLists.txt b/paddle/contrib/inference/CMakeLists.txt index 80ae87d36a..98c2f68a6c 100644 --- a/paddle/contrib/inference/CMakeLists.txt +++ b/paddle/contrib/inference/CMakeLists.txt @@ -48,8 +48,8 @@ cc_library(paddle_inference_api # Here the shared library doesn't depend on other fluid libraries, or double free will occur. cc_library(paddle_inference_api_shared SHARED - SRCS paddle_inference_api.cc paddle_inference_api_impl.cc - DEPS ${FLUID_CORE_MODULES} ${GLOB_OP_LIB}) + SRCS paddle_inference_api.cc paddle_inference_api_impl.cc) +add_dependencies(paddle_inference_api_shared ${FLUID_CORE_MODULES} ${GLOB_OP_LIB}) set_target_properties(paddle_inference_api_shared PROPERTIES OUTPUT_NAME paddle_inference_api) if(NOT APPLE) From 6a74e2547f5175e3762f9bc74af5b40cb94702e5 Mon Sep 17 00:00:00 2001 From: skylarch Date: Tue, 10 Jul 2018 11:48:44 +0800 Subject: [PATCH 19/22] Update hyperlinks in workflow_of_capi_cn.md --- doc/v2/howto/capi/workflow_of_capi_cn.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/doc/v2/howto/capi/workflow_of_capi_cn.md b/doc/v2/howto/capi/workflow_of_capi_cn.md index 3acdbae28e..db1568a2af 100644 --- a/doc/v2/howto/capi/workflow_of_capi_cn.md +++ b/doc/v2/howto/capi/workflow_of_capi_cn.md @@ -28,9 +28,9 @@ ### 准备预测模型 -准备预测模型部分,我们以手写数字识别任务为例进行介绍。手写数字识别任务定义了一个含有[两个隐层的简单全连接网络](https://github.com/PaddlePaddle/book/blob/develop/02.recognize_digits/README.cn.md#softmax回归softmax-regression),网络接受一幅图片作为输入,将图片分类到 0 ~ 9 类别标签之一。完整代码可以查看[此目录](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/capi/examples/model_inference/dense) 中的相关脚本。 +准备预测模型部分,我们以手写数字识别任务为例进行介绍。手写数字识别任务定义了一个含有[两个隐层的简单全连接网络](https://github.com/PaddlePaddle/book/blob/develop/02.recognize_digits/README.cn.md#softmax回归softmax-regression),网络接受一幅图片作为输入,将图片分类到 0 ~ 9 类别标签之一。完整代码可以查看[此目录](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/legacy/capi/examples/model_inference/dense) 中的相关脚本。 -调用C-API开发预测程序需要一个训练好的模型,运行[MNIST手写数字识别目录](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/capi/examples/model_inference/dense)下的[mnist_v2.py](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/capi/examples/model_inference/dense/mnist_v2.py)脚本,在终端执行`python mnist_v2.py`,会使用 PaddlePaddle 内置的 [MNIST 数据集](http://yann.lecun.com/exdb/mnist/)进行训练。训练好的模型默认保存在当前运行目录下的`models`目录中。 +调用C-API开发预测程序需要一个训练好的模型,运行[MNIST手写数字识别目录](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/legacy/capi/examples/model_inference/dense)下的[mnist_v2.py](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/legacy/capi/examples/model_inference/dense/mnist_v2.py)脚本,在终端执行`python mnist_v2.py`,会使用 PaddlePaddle 内置的 [MNIST 数据集](http://yann.lecun.com/exdb/mnist/)进行训练。训练好的模型默认保存在当前运行目录下的`models`目录中。 下面,我们将训练结束后存储下来的模型转换成预测模型。 @@ -48,7 +48,7 @@ dump_v2_config(predict, "trainer_config.bin", True) ``` - 对[手写数字识别](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/capi/examples/model_inference/dense)这个示例,[`mnist_v2.py`](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/capi/examples/model_inference/dense/mnist_v2.py)脚本集成了序列化神经网络结构的过程,可以直接运行 `python mnist_v2.py --task dump_config` 对神经网络结构进行序列化,结果会写入当前运行目录下的`trainer_config.bin`文件中。 + 对[手写数字识别](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/legacy/capi/examples/model_inference/dense)这个示例,[`mnist_v2.py`](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/legacy/capi/examples/model_inference/dense/mnist_v2.py)脚本集成了序列化神经网络结构的过程,可以直接运行 `python mnist_v2.py --task dump_config` 对神经网络结构进行序列化,结果会写入当前运行目录下的`trainer_config.bin`文件中。 使用这种方式,需要**在运行时将神经网络的多个可学习参数放在同一个目录中**,C-API可以通过分别指定序列化后的网络结构文件和参数目录来加载训练好的模型。 @@ -68,7 +68,7 @@ merge_v2_model(net, param_file, output_file) ``` - 对[手写数字识别](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/capi/examples/model_inference/dense)这个示例,可直接运行 `python` [merge_v2_model.py](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/capi/examples/model_inference/dense/merge_v2_model.py)。序列化结果会写入当前运行目录下的`output.paddle.model`文件中。使用这种方式,运行时C-API可以通过指定`output.paddle.model`文件的路径来加载预测模型。 + 对[手写数字识别](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/legacy/capi/examples/model_inference/dense)这个示例,可直接运行 `python` [merge_v2_model.py](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/legacy/capi/examples/model_inference/dense/merge_v2_model.py)。序列化结果会写入当前运行目录下的`output.paddle.model`文件中。使用这种方式,运行时C-API可以通过指定`output.paddle.model`文件的路径来加载预测模型。 #### 注意事项 1. 为使用C-API,在调用`dump_v2_config`序列化神经网络结构时,参数`binary`必须指定为`True`。 @@ -77,10 +77,10 @@ ### 编写预测代码 -预测代码更多详细示例代码请参考[C-API使用示例](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/capi/examples/model_inference) 目录下的代码示例。这一节对图1中预测代码编写的5个步骤进行介绍和说明。 +预测代码更多详细示例代码请参考[C-API使用示例](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/legacy/capi/examples/model_inference) 目录下的代码示例。这一节对图1中预测代码编写的5个步骤进行介绍和说明。 #### step 1. 初始化PaddlePaddle运行环境 -第一步需调用[`paddle_init`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/capi/main.h#L27) 初始化PaddlePaddle运行环境,该接口接受两个参数:参数的个数和参数列表。 +第一步需调用[`paddle_init`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/legacy/capi/main.h#L27) 初始化PaddlePaddle运行环境,该接口接受两个参数:参数的个数和参数列表。 #### step2. 加载模型 @@ -88,8 +88,8 @@ 概念上,在 PaddlePaddle 内部,一个GradientMachine类的对象管理着一组计算层(PaddlePaddle Layers)来完成前向和反向计算,并处理与之相关的所有细节。在调用C-API预测时,只需进行前向计算而无需调用反向计算。这篇文档之后部分会使用`gradient machine`来特指调用PaddlePaddle C-API创建的GradientMachine类的对象。每一个 `gradient machine` 都会管理维护一份训练好的模型,下面是C-API提供的,两种常用的模型加载方式: -1. 调用[`paddle_gradient_machine_load_parameter_from_disk`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/capi/gradient_machine.h#L61)接口,从磁盘加载预测模型。这时`gradient machine`会独立拥有一份训练好的模型; -1. 调用[`paddle_gradient_machine_create_shared_param`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/capi/gradient_machine.h#L88)接口,与其它`gradient machine`的共享已经加载的预测模型。这种情况多出现在使用多线程预测时,通过多个线程共享同一个模型来减少内存开销。可参考[此示例](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/capi/examples/model_inference/multi_thread/main.c)。 +1. 调用[`paddle_gradient_machine_load_parameter_from_disk`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/legacy/capi/gradient_machine.h#L61)接口,从磁盘加载预测模型。这时`gradient machine`会独立拥有一份训练好的模型; +1. 调用[`paddle_gradient_machine_create_shared_param`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/legacy/capi/gradient_machine.h#L88)接口,与其它`gradient machine`的共享已经加载的预测模型。这种情况多出现在使用多线程预测时,通过多个线程共享同一个模型来减少内存开销。可参考[此示例](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/legacy/capi/examples/model_inference/multi_thread/main.c)。 - 注意事项 @@ -117,7 +117,7 @@ C-API支持的所有输入数据类型和他们的组织方式,请参考“输 #### step 4. 前向计算 -完成上述准备之后,通过调用 [`paddle_gradient_machine_forward`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/capi/gradient_machine.h#L73) 接口完成神经网络的前向计算。 +完成上述准备之后,通过调用 [`paddle_gradient_machine_forward`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/legacy/capi/gradient_machine.h#L73) 接口完成神经网络的前向计算。 #### step 5. 清理 From 10fbb831edd0225d34639b5de476453a5ed0c1e0 Mon Sep 17 00:00:00 2001 From: qingqing01 Date: Tue, 10 Jul 2018 13:07:43 +0800 Subject: [PATCH 20/22] Skip BatchNorm when feature only has 1 element. (#11578) * Fix batch norm when only 1 elements in normzalize dimension during training. --- paddle/fluid/operators/batch_norm_op.cc | 21 ++++- paddle/fluid/operators/batch_norm_op.cu.cc | 77 +++++++++++-------- paddle/fluid/operators/cross_entropy_op.cc | 3 +- .../unittests/test_fake_dequantize_op.py | 1 - .../fluid/tests/unittests/test_parallel_op.py | 4 +- 5 files changed, 66 insertions(+), 40 deletions(-) diff --git a/paddle/fluid/operators/batch_norm_op.cc b/paddle/fluid/operators/batch_norm_op.cc index 693bf973c2..5912a1a17c 100644 --- a/paddle/fluid/operators/batch_norm_op.cc +++ b/paddle/fluid/operators/batch_norm_op.cc @@ -216,6 +216,18 @@ class BatchNormKernel saved_mean_e.setZero(); saved_variance_e.setZero(); + EigenVectorArrayMap running_mean_arr( + mean_out->mutable_data(ctx.GetPlace()), C); + EigenVectorArrayMap running_var_arr( + variance_out->mutable_data(ctx.GetPlace()), C); + + if ((N * sample_size) == 1) { + LOG(WARNING) << "Only 1 element in normalization dimension, " + << "we skip the batch norm calculation, let y = x."; + framework::TensorCopySync(*x, ctx.GetPlace(), y); + return; + } + switch (data_layout) { case DataLayout::kNCHW: { ConstEigenArrayMap x_arr(x->data(), sample_size, N * C); @@ -247,10 +259,6 @@ class BatchNormKernel PADDLE_THROW("Unknown storage order: %s", data_layout_str); } - EigenVectorArrayMap running_mean_arr( - mean_out->mutable_data(ctx.GetPlace()), C); - EigenVectorArrayMap running_var_arr( - variance_out->mutable_data(ctx.GetPlace()), C); running_mean_arr = running_mean_arr * momentum + saved_mean_e * (1. - momentum); running_var_arr = @@ -427,6 +435,11 @@ class BatchNormGradKernel d_bias_arr.setZero(); d_scale_arr.setZero(); + if ((N * sample_size) == 1) { + framework::TensorCopySync(*d_y, ctx.GetPlace(), d_x); + return; + } + const auto scale_inv_var_nhw = scale_arr * inv_var_arr / (N * sample_size); switch (data_layout) { diff --git a/paddle/fluid/operators/batch_norm_op.cu.cc b/paddle/fluid/operators/batch_norm_op.cu.cc index 550dd32d36..ca6cd86693 100644 --- a/paddle/fluid/operators/batch_norm_op.cu.cc +++ b/paddle/fluid/operators/batch_norm_op.cu.cc @@ -72,6 +72,9 @@ class BatchNormKernel int N, C, H, W, D; ExtractNCWHD(x_dims, data_layout, &N, &C, &H, &W, &D); + auto *y = ctx.Output("Y"); + y->mutable_data(ctx.GetPlace()); + // ------------------- cudnn descriptors --------------------- cudnnTensorDescriptor_t data_desc_; cudnnTensorDescriptor_t bn_param_desc_; @@ -93,7 +96,7 @@ class BatchNormKernel mode_ = CUDNN_BATCHNORM_SPATIAL; #endif - VLOG(1) << "Setting descriptors."; + VLOG(3) << "Setting descriptors."; std::vector dims; std::vector strides; if (data_layout == DataLayout::kNCHW) { @@ -113,11 +116,6 @@ class BatchNormKernel const auto *scale = ctx.Input("Scale"); const auto *bias = ctx.Input("Bias"); - auto *y = ctx.Output("Y"); - - // alloc memory - y->mutable_data(ctx.GetPlace()); - auto &dev_ctx = ctx.template device_context(); auto handle = dev_ctx.cudnn_handle(); @@ -162,22 +160,28 @@ class BatchNormKernel functor(dev_ctx, saved_mean, static_cast>(0)); functor(dev_ctx, saved_variance, static_cast>(0)); - double this_factor = 1. - momentum; - - CUDNN_ENFORCE(platform::dynload::cudnnBatchNormalizationForwardTraining( - handle, mode_, CudnnDataType::kOne(), CudnnDataType::kZero(), - data_desc_, x->template data(), data_desc_, - y->template mutable_data(ctx.GetPlace()), bn_param_desc_, - scale->template data>(), - bias->template data>(), this_factor, - mean_out->template mutable_data>( - ctx.GetPlace()), - variance_out->template mutable_data>( - ctx.GetPlace()), - epsilon, saved_mean->template mutable_data>( - ctx.GetPlace()), - saved_variance->template mutable_data>( - ctx.GetPlace()))); + if ((N * H * W * D) == 1) { + LOG(WARNING) << "Only 1 element in normalization dimension, " + << "we skip the batch norm calculation, let y = x."; + framework::TensorCopySync(*x, ctx.GetPlace(), y); + } else { + double this_factor = 1. - momentum; + + CUDNN_ENFORCE(platform::dynload::cudnnBatchNormalizationForwardTraining( + handle, mode_, CudnnDataType::kOne(), CudnnDataType::kZero(), + data_desc_, x->template data(), data_desc_, + y->template mutable_data(ctx.GetPlace()), bn_param_desc_, + scale->template data>(), + bias->template data>(), this_factor, + mean_out->template mutable_data>( + ctx.GetPlace()), + variance_out->template mutable_data>( + ctx.GetPlace()), + epsilon, saved_mean->template mutable_data>( + ctx.GetPlace()), + saved_variance->template mutable_data>( + ctx.GetPlace()))); + } } // clean when exit. @@ -209,6 +213,25 @@ class BatchNormGradKernel int N, C, H, W, D; ExtractNCWHD(x_dims, data_layout, &N, &C, &H, &W, &D); + // init output + auto *d_x = ctx.Output(framework::GradVarName("X")); + auto *d_scale = ctx.Output(framework::GradVarName("Scale")); + auto *d_bias = ctx.Output(framework::GradVarName("Bias")); + + d_x->mutable_data(ctx.GetPlace()); + d_scale->mutable_data(ctx.GetPlace()); + d_bias->mutable_data(ctx.GetPlace()); + + auto &dev_ctx = ctx.template device_context(); + if ((N * H * W * D) == 1) { + framework::TensorCopySync(*d_y, ctx.GetPlace(), d_x); + math::SetConstant> + functor; + functor(dev_ctx, d_scale, static_cast>(0)); + functor(dev_ctx, d_bias, static_cast>(0)); + return; + } + PADDLE_ENFORCE_EQ(scale->dims().size(), 1UL); PADDLE_ENFORCE_EQ(scale->dims()[0], C); @@ -247,21 +270,11 @@ class BatchNormGradKernel CUDNN_ENFORCE(platform::dynload::cudnnDeriveBNTensorDescriptor( bn_param_desc_, data_desc_, mode_)); - // init output - auto *d_x = ctx.Output(framework::GradVarName("X")); - auto *d_scale = ctx.Output(framework::GradVarName("Scale")); - auto *d_bias = ctx.Output(framework::GradVarName("Bias")); - - d_x->mutable_data(ctx.GetPlace()); - d_scale->mutable_data(ctx.GetPlace()); - d_bias->mutable_data(ctx.GetPlace()); - const auto *saved_mean = ctx.Input("SavedMean"); const auto *saved_var = ctx.Input("SavedVariance"); const void *saved_mean_data = saved_mean->template data(); const void *saved_var_data = saved_var->template data(); - auto &dev_ctx = ctx.template device_context(); CUDNN_ENFORCE(platform::dynload::cudnnBatchNormalizationBackward( dev_ctx.cudnn_handle(), mode_, CudnnDataType::kOne(), CudnnDataType::kZero(), CudnnDataType::kOne(), diff --git a/paddle/fluid/operators/cross_entropy_op.cc b/paddle/fluid/operators/cross_entropy_op.cc index d5e095f9ca..a3bec3da45 100644 --- a/paddle/fluid/operators/cross_entropy_op.cc +++ b/paddle/fluid/operators/cross_entropy_op.cc @@ -124,8 +124,7 @@ class CrossEntropyOpMaker : public framework::OpProtoAndCheckerMaker { "Tensor with shape [N x D]."); AddOutput("Y", "(Tensor, default Tensor), a 2-D tensor with shape " - "[N x 1]. The cross entropy loss.") - .Reuse("X"); + "[N x 1]. The cross entropy loss."); AddAttr("soft_label", "(bool, default false), a flag indicating whether to " "interpretate the given labels as soft labels.") diff --git a/python/paddle/fluid/tests/unittests/test_fake_dequantize_op.py b/python/paddle/fluid/tests/unittests/test_fake_dequantize_op.py index 281068e945..026ac2112b 100644 --- a/python/paddle/fluid/tests/unittests/test_fake_dequantize_op.py +++ b/python/paddle/fluid/tests/unittests/test_fake_dequantize_op.py @@ -40,7 +40,6 @@ class TestFakeDequantizeMaxAbsOp(OpTest): self.op_type = "fake_dequantize_max_abs" x = np.random.randn(31, 65).astype("float32") yq, scale = quantize_max_abs(x, self.num_bits) - print 'scale ', scale ydq = dequantize_max_abs(yq, self.num_bits, scale) self.inputs = {'X': yq} diff --git a/python/paddle/fluid/tests/unittests/test_parallel_op.py b/python/paddle/fluid/tests/unittests/test_parallel_op.py index 79bea148f9..9ba5f988f3 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_op.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_op.py @@ -113,7 +113,9 @@ class BaseParallelForTest(unittest.TestCase): generator = callback() # Automatically insert parallel do if use_parallel = True if use_parallel: - places = fluid.layers.get_places() + thread_num = fluid.core.get_cuda_device_count( + ) if use_gpu else 8 + places = fluid.layers.get_places(thread_num) pd = fluid.layers.ParallelDo(places, use_nccl=use_nccl) data = next(generator) From ab9a49b8285b62e73e311cfceb8d4c2721b44e52 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 10 Jul 2018 13:31:57 +0800 Subject: [PATCH 21/22] follow comments --- CMakeLists.txt | 5 +++++ cmake/external/mkldnn.cmake | 3 --- cmake/external/mklml.cmake | 3 --- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 23bb27e77b..3169086044 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -103,6 +103,11 @@ if(ANDROID OR IOS) add_definitions(-DPADDLE_MOBILE_INFERENCE) endif() +if (APPLE) + set(WITH_MKL OFF CACHE STRING + "Disable MKL for building on mac" FORCE) +endif() + set(THIRD_PARTY_PATH "${CMAKE_BINARY_DIR}/third_party" CACHE STRING "A path setting third party libraries download & build directories.") diff --git a/cmake/external/mkldnn.cmake b/cmake/external/mkldnn.cmake index 5c09507163..20dda35c5c 100644 --- a/cmake/external/mkldnn.cmake +++ b/cmake/external/mkldnn.cmake @@ -28,9 +28,6 @@ IF(WIN32 OR APPLE) "Windows or Mac is not supported with MKLDNN in Paddle yet." "Force WITH_MKLDNN=OFF") SET(WITH_MKLDNN OFF CACHE STRING "Disable MKLDNN in Windows and MacOS" FORCE) - IF ((NOT ${WITH_MKLDNN}) AND (NOT ${WITH_MKLML})) - SET(WITH_MKL OFF CACHE STRING "Disable MKL for later scripts" FORCE) - ENDIF() return() ENDIF() diff --git a/cmake/external/mklml.cmake b/cmake/external/mklml.cmake index ba00f0fe63..82c424fb79 100644 --- a/cmake/external/mklml.cmake +++ b/cmake/external/mklml.cmake @@ -21,9 +21,6 @@ IF(WIN32 OR APPLE) "Windows or Mac is not supported with MKLML in Paddle yet." "Force WITH_MKLML=OFF") SET(WITH_MKLML OFF CACHE STRING "Disable MKLML package in Windows and MacOS" FORCE) - IF ((NOT ${WITH_MKLDNN}) AND (NOT ${WITH_MKLML})) - SET(WITH_MKL OFF CACHE STRING "Disable MKL for later scripts" FORCE) - ENDIF() return() ENDIF() From 81d382936c084cca7fa0c244cca92708780e31f3 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 10 Jul 2018 13:40:13 +0800 Subject: [PATCH 22/22] disable mkl for windows --- CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 3169086044..db3c3b8e20 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -103,9 +103,9 @@ if(ANDROID OR IOS) add_definitions(-DPADDLE_MOBILE_INFERENCE) endif() -if (APPLE) +if (APPLE OR WIN32) set(WITH_MKL OFF CACHE STRING - "Disable MKL for building on mac" FORCE) + "Disable MKL for building on mac and windows" FORCE) endif() set(THIRD_PARTY_PATH "${CMAKE_BINARY_DIR}/third_party" CACHE STRING