From af242901232464d8a59d26cba9084ffe22562fdf Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Wed, 4 Apr 2018 15:05:46 +0800 Subject: [PATCH 01/33] Add 'buffer_size' api for open_files op --- paddle/fluid/operators/reader/open_files_op.cc | 15 ++++++++++----- python/paddle/fluid/layers/io.py | 12 ++++++++++-- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index eacedeea88..db4e619e7b 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -38,8 +38,9 @@ class MultipleReader : public framework::ReaderBase { }; MultipleReader(const std::vector& file_names, - const std::vector& dims, size_t thread_num) - : file_names_(file_names), dims_(dims) { + const std::vector& dims, size_t thread_num, + size_t buffer_size) + : file_names_(file_names), dims_(dims), buffer_size_(buffer_size) { prefetchers_.resize(thread_num); StartNewScheduler(); } @@ -60,6 +61,7 @@ class MultipleReader : public framework::ReaderBase { std::vector dims_; std::thread scheduler_; std::vector prefetchers_; + size_t buffer_size_; framework::Channel* waiting_file_idx_; framework::Channel* available_thread_idx_; framework::Channel>* buffer_; @@ -92,7 +94,7 @@ void MultipleReader::StartNewScheduler() { waiting_file_idx_ = framework::MakeChannel(file_names_.size()); available_thread_idx_ = framework::MakeChannel(thread_num); buffer_ = - framework::MakeChannel>(thread_num); + framework::MakeChannel>(buffer_size_); for (size_t i = 0; i < file_names_.size(); ++i) { waiting_file_idx_->Send(&i); @@ -197,11 +199,13 @@ class OpenFilesOp : public framework::OperatorBase { const auto& file_names = Attr>("file_names"); PADDLE_ENFORCE(!file_names.empty(), "No file to be read!"); const size_t thread_num = Attr("thread_num"); + const size_t buffer_size = Attr("buffer_size"); auto* out = scope.FindVar(Output("Out")) ->template GetMutable(); - out->Reset(new MultipleReader( - file_names, RestoreShapes(shape_concat, ranks), thread_num)); + out->Reset(new MultipleReader(file_names, + RestoreShapes(shape_concat, ranks), + thread_num, buffer_size)); } }; @@ -212,6 +216,7 @@ class OpenFilesOpMaker : public FileReaderMakerBase { AddAttr>("file_names", "Files to be read."); AddAttr("thread_num", "The maximal concurrent prefetch thread number.") .GreaterThan(0); + AddAttr("buffer_size", "The size of prefetch buffer.").GreaterThan(0); AddComment(R"DOC( OpenFiles Operator diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index bd7e9c30fe..da5b4853d3 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -287,7 +287,14 @@ def open_recordio_file(filename, shapes, lod_levels, dtypes): startup_var) -def open_files(filenames, thread_num, shapes, lod_levels, dtypes): +def open_files(filenames, + shapes, + lod_levels, + dtypes, + thread_num, + buffer_size=None): + if buffer_size is None: + buffer_size = thread_num dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] shape_concat = [] ranks = [] @@ -308,7 +315,8 @@ def open_files(filenames, thread_num, shapes, lod_levels, dtypes): 'lod_levels': lod_levels, 'ranks': ranks, 'file_names': filenames, - 'thread_num': thread_num + 'thread_num': thread_num, + 'buffer_size': buffer_size }) startup_var.desc.set_dtypes(dtypes) From 6dcfd97a9285161efa767516d466a084b6a45bed Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Wed, 4 Apr 2018 15:35:28 +0800 Subject: [PATCH 02/33] add docstring --- python/paddle/fluid/layers/io.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index da5b4853d3..97ac01b775 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -293,8 +293,40 @@ def open_files(filenames, dtypes, thread_num, buffer_size=None): + """ + Open files + + This layer takes a list of files to read from and returns a Reader Variable. Via the Reader Variable, we can get data from given files. + + Args: + filenames(list): The list of file names. + shapes(list): List of tuples which declaring data shapes. + lod_levels(list): List of ints which declaring data lod_level. + dtypes(list): List of strs which declaring data type. + thread_num(int): The maximal concurrent prefetch thread number. + buffer_size(int): The size of prefetch buffer. + + Returns: + Variable: A Reader Variable via which we can get file data. + + Examples: + .. code-block:: python + + reader = fluid.layers.open_files(filenames=['./data1.recordio', + './data2.recordio'], + shapes=[(3,224,224), (1)], + lod_levels=[0, 0], + dtypes=['float32', 'int64'], + thread_num=2, + buffer_size=2) + + # Via the reader, we can use 'read_file' layer to get data: + image, label = fluid.layers.read_file(reader) + """ if buffer_size is None: buffer_size = thread_num + if isinstance(filenames, basestring): + filenames = [filenames] dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] shape_concat = [] ranks = [] From 442c150333ce169b9e1221c0f2e61af8cfdc1e2b Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Wed, 4 Apr 2018 20:35:59 +0800 Subject: [PATCH 03/33] a draft of ThreadedReader --- .../reader/create_threaded_reader_op.cc | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 paddle/fluid/operators/reader/create_threaded_reader_op.cc diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc new file mode 100644 index 0000000000..a4aebafa8b --- /dev/null +++ b/paddle/fluid/operators/reader/create_threaded_reader_op.cc @@ -0,0 +1,125 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/operators/detail/safe_ref.h" +#include "paddle/fluid/operators/reader/reader_op_registry.h" + +namespace paddle { +namespace operators { +namespace reader { + +class ThreadedReader : public framework::DecoratedReader { + public: + ThreadedReader(ReaderBase* reader, bool unsafe_mode) + : DecoratedReader(reader), unsafe_mode_(unsafe_mode) {} + + void ReadNext(std::vector* out) override { + std::lock_guard lock(mutex_); + if (!unsafe_mode) { + if (!reader_->HasNext()) { + PADDLE_THROW("There is no next data!"); + } + reader_->ReadNext(out); + } else { + auto& thread_buffer = thread_buffers_[std::this_thread::get_id()]; + if (thread_buffer.empty()) { + PADDLE_THROW( + "thread_buffer is empty! HasNext() must be invoked before " + "ReadNext() in the same thread."); + } + *out = thread_buffer; + thread_buffer.clear(); + } + } + + bool HasNext() const override { + if (!unsafe_mode_) { + PADDLE_THROW( + "ThreadedReader::HasNext() is disabled when 'unsafe_mode' is false."); + } + std::thread::id thread_id = std::this_thread::get_id(); + std::lock_guard lock(mutex_); + auto& thread_buffer = thread_buffers_[thread_id]; + if (thread_buffer.empty() && reader_->HasNext()) { + reader_->ReadNext(&thread_buffer); + } + return !threda_buffer.empty(); + } + + void ReInit() override; + + ~ThreadedReader() { + for (auto& p : thread_buffers_) { + if (!p.second.empty()) { + PADDLE_THROW( + "Find an unused data batch in ThreadedReader! Maybe one thread " + "invokes 'HasNext()' without subsequent 'ReadNext()'."); + } + } + } + + private: + mutable std::mutex mutex_; + mutable std::unordered_map> + thread_buffers_; +}; + +class CreateThreadedReaderOp : public framework::OperatorBase { + public: + using framework::OperatorBase::OperatorBase; + + private: + void RunImpl(const framework::Scope& scope, + const platform::Place& dev_place) const override { + auto* out = detail::Ref(scope.FindVar(Output("Out"))) + .GetMutable(); + if (out->Get() != nullptr) { + return; + } + const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) + ->Get(); + bool unsafe_mode = Attr("unsafe_mode"); + out->Reset(new ThreadedReader(underlying_reader.Get(), unsafe_mode)); + } +}; + +class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase { + public: + CreateThreadedReaderOpMaker(OpProto* op_proto, OpAttrChecker* op_checker) + : DecoratedReaderMakerBase(op_proto, op_checker) { + AddAttr("unsafe_mode", + "When 'unsafe_mode' is false, invoking 'HasNext()' or " + "'ReInit()' is not allowed to avoid unexpected bugs in " + "multi-thread environment.") + .SetDefault(false); + AddComment(R"DOC( + CreateThreadedReader Operator + + This operator creates a threaded reader. A threaded reader's + 'ReadNext()' can be invoked by several threads at the same + time. + When the attribute 'unsafe_mode' is false, the threaded reader's + 'HasNext()' and 'ReInit()' will be disabled to avoid unexpected + bugs in multi-thread environment. If you really need them, you + can enable them by setting 'unsafe_mode' true. In this case, + 'HasNext()' returning true only guarantees the safety of + invoking 'ReadNext()' in the same thread. Each thread must + invoke 'HasNext()' and 'ReadNext()' in pair. + )DOC") + } +}; + +} // namespace reader +} // namespace operators +} // namespace paddle From 8fed780f14bf24954300ba37cebd2338ee7d199c Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Sun, 8 Apr 2018 11:26:06 +0800 Subject: [PATCH 04/33] Complete threaded reader --- paddle/fluid/operators/reader/CMakeLists.txt | 1 + .../operators/reader/create_threaded_reader_op.cc | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt index 6fa0195b9a..845528860f 100644 --- a/paddle/fluid/operators/reader/CMakeLists.txt +++ b/paddle/fluid/operators/reader/CMakeLists.txt @@ -22,5 +22,6 @@ reader_library(create_batch_reader_op SRCS create_batch_reader_op.cc) reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_op.cc) reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc) reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc) +reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc) # Export local libraries to parent set(READER_LIBRARY ${LOCAL_READER_LIBS} PARENT_SCOPE) diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc index a4aebafa8b..489866ca80 100644 --- a/paddle/fluid/operators/reader/create_threaded_reader_op.cc +++ b/paddle/fluid/operators/reader/create_threaded_reader_op.cc @@ -57,7 +57,15 @@ class ThreadedReader : public framework::DecoratedReader { return !threda_buffer.empty(); } - void ReInit() override; + void ReInit() override { + if (!unsafe_mode_) { + PADDLE_THROW( + "ThreadedReader::ReInit() is disabled when 'unsafe_mode' is false."); + } + VLOG(5) << "ThreadedReader::ReInit() is invoked! It might be buggy in " + "multi-thread environment."; + reader_->ReInit(); + } ~ThreadedReader() { for (auto& p : thread_buffers_) { @@ -123,3 +131,8 @@ class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase { } // namespace reader } // namespace operators } // namespace paddle + +namespace reader = paddle::operators::reader; +REGISTER_FILE_READER_OPERATOR(create_threaded_reader, + reader::CreateThreadedReaderOp, + reader::CreateThreadedReaderOpMaker); From 03ff0e58fe433496330801627e0ae2f15e21df20 Mon Sep 17 00:00:00 2001 From: JiayiFeng Date: Sun, 8 Apr 2018 04:25:56 +0000 Subject: [PATCH 05/33] fix compile errors --- paddle/fluid/operators/reader/create_threaded_reader_op.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc index 489866ca80..565cbe4d9f 100644 --- a/paddle/fluid/operators/reader/create_threaded_reader_op.cc +++ b/paddle/fluid/operators/reader/create_threaded_reader_op.cc @@ -26,7 +26,7 @@ class ThreadedReader : public framework::DecoratedReader { void ReadNext(std::vector* out) override { std::lock_guard lock(mutex_); - if (!unsafe_mode) { + if (!unsafe_mode_) { if (!reader_->HasNext()) { PADDLE_THROW("There is no next data!"); } @@ -54,7 +54,7 @@ class ThreadedReader : public framework::DecoratedReader { if (thread_buffer.empty() && reader_->HasNext()) { reader_->ReadNext(&thread_buffer); } - return !threda_buffer.empty(); + return !thread_buffer.empty(); } void ReInit() override { @@ -78,6 +78,7 @@ class ThreadedReader : public framework::DecoratedReader { } private: + bool unsafe_mode_; mutable std::mutex mutex_; mutable std::unordered_map> thread_buffers_; @@ -124,7 +125,7 @@ class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase { 'HasNext()' returning true only guarantees the safety of invoking 'ReadNext()' in the same thread. Each thread must invoke 'HasNext()' and 'ReadNext()' in pair. - )DOC") + )DOC"); } }; From 49ab52d64d8aced5da6d4eedd34773baebae5546 Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Sun, 8 Apr 2018 13:01:26 +0800 Subject: [PATCH 06/33] Modify MultipleReader 1. Removes MultipleReader's multi-thread support, for we have got ThreadedReader. 2. Rename MultipleReader to MultiFileReader --- .../reader/create_threaded_reader_op.cc | 2 +- .../fluid/operators/reader/open_files_op.cc | 66 +++++++------------ 2 files changed, 24 insertions(+), 44 deletions(-) diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc index 565cbe4d9f..854381e0ee 100644 --- a/paddle/fluid/operators/reader/create_threaded_reader_op.cc +++ b/paddle/fluid/operators/reader/create_threaded_reader_op.cc @@ -124,7 +124,7 @@ class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase { can enable them by setting 'unsafe_mode' true. In this case, 'HasNext()' returning true only guarantees the safety of invoking 'ReadNext()' in the same thread. Each thread must - invoke 'HasNext()' and 'ReadNext()' in pair. + invoke 'HasNext()' and 'ReadNext()' in pairs. )DOC"); } }; diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index db4e619e7b..45db94e780 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -19,27 +19,11 @@ namespace paddle { namespace operators { namespace reader { -class MultipleReader : public framework::ReaderBase { +class MultiFileReader : public framework::ReaderBase { public: - class ThreadBufferMap { - public: - std::vector& operator[]( - const std::thread::id& thread_id) { - std::lock_guard lock(mutex_); - return buffer_[thread_id]; - } - - void Clear() { buffer_.clear(); } - - private: - std::mutex mutex_; - std::unordered_map> - buffer_; - }; - - MultipleReader(const std::vector& file_names, - const std::vector& dims, size_t thread_num, - size_t buffer_size) + MultiFileReader(const std::vector& file_names, + const std::vector& dims, size_t thread_num, + size_t buffer_size) : file_names_(file_names), dims_(dims), buffer_size_(buffer_size) { prefetchers_.resize(thread_num); StartNewScheduler(); @@ -49,7 +33,7 @@ class MultipleReader : public framework::ReaderBase { bool HasNext() const override; void ReInit() override; - ~MultipleReader() { EndScheduler(); } + ~MultiFileReader() { EndScheduler(); } private: void StartNewScheduler(); @@ -65,31 +49,27 @@ class MultipleReader : public framework::ReaderBase { framework::Channel* waiting_file_idx_; framework::Channel* available_thread_idx_; framework::Channel>* buffer_; - mutable ThreadBufferMap thread_buffer_map_; }; -void MultipleReader::ReadNext(std::vector* out) { +void MultiFileReader::ReadNext(std::vector* out) { if (!HasNext()) { PADDLE_THROW("There is no next data!"); } - auto& thread_local_buffer = thread_buffer_map_[std::this_thread::get_id()]; - *out = thread_local_buffer; - thread_local_buffer.clear(); + buffer_->Receive(out); } -bool MultipleReader::HasNext() const { - auto& thread_local_buffer = thread_buffer_map_[std::this_thread::get_id()]; - return thread_local_buffer.empty() ? buffer_->Receive(&thread_local_buffer) - : true; +bool MultiFileReader::HasNext() const { + while (!buffer_->IsClosed() && !buffer_->CanReceive()) { + } + return buffer_->CanReceive(); } -void MultipleReader::ReInit() { +void MultiFileReader::ReInit() { EndScheduler(); - thread_buffer_map_.Clear(); StartNewScheduler(); } -void MultipleReader::StartNewScheduler() { +void MultiFileReader::StartNewScheduler() { size_t thread_num = prefetchers_.size(); waiting_file_idx_ = framework::MakeChannel(file_names_.size()); available_thread_idx_ = framework::MakeChannel(thread_num); @@ -107,7 +87,7 @@ void MultipleReader::StartNewScheduler() { scheduler_ = std::thread([this] { ScheduleThreadFunc(); }); } -void MultipleReader::EndScheduler() { +void MultiFileReader::EndScheduler() { available_thread_idx_->Close(); buffer_->Close(); waiting_file_idx_->Close(); @@ -119,8 +99,8 @@ void MultipleReader::EndScheduler() { delete waiting_file_idx_; } -void MultipleReader::ScheduleThreadFunc() { - VLOG(5) << "MultipleReader schedule thread starts."; +void MultiFileReader::ScheduleThreadFunc() { + VLOG(5) << "MultiFileReader schedule thread starts."; size_t completed_thread_num = 0; size_t thread_idx; while (available_thread_idx_->Receive(&thread_idx)) { @@ -152,11 +132,11 @@ void MultipleReader::ScheduleThreadFunc() { p.join(); } } - VLOG(5) << "MultipleReader schedule thread terminates."; + VLOG(5) << "MultiFileReader schedule thread terminates."; } -void MultipleReader::PrefetchThreadFunc(std::string file_name, - size_t thread_idx) { +void MultiFileReader::PrefetchThreadFunc(std::string file_name, + size_t thread_idx) { VLOG(5) << "The prefetch thread of file '" << file_name << "' starts."; std::unique_ptr reader = CreateReaderByFileName(file_name, dims_); @@ -203,9 +183,9 @@ class OpenFilesOp : public framework::OperatorBase { auto* out = scope.FindVar(Output("Out")) ->template GetMutable(); - out->Reset(new MultipleReader(file_names, - RestoreShapes(shape_concat, ranks), - thread_num, buffer_size)); + out->Reset(new MultiFileReader(file_names, + RestoreShapes(shape_concat, ranks), + thread_num, buffer_size)); } }; @@ -221,7 +201,7 @@ class OpenFilesOpMaker : public FileReaderMakerBase { AddComment(R"DOC( OpenFiles Operator - An OpenFilesOp creates a MultipleReader, which is able to + An OpenFilesOp creates a MultiFileReader, which is able to read data multi-threaded from multiple files. )DOC"); } From fca9e8847d5017601251ee8813e7af513b2603ed Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Sun, 8 Apr 2018 16:10:14 +0800 Subject: [PATCH 07/33] Update Readers Python API 1. Combine 'open_files', 'multi_pass_reader' and 'threaded_reader' together to make the new 'open_files' interface. 2. Add some docstring. 3. Simplify interface names of 'create_XXX_reader', e.g, rename 'create_double_buffer_reader' to 'double_buffer'. --- python/paddle/fluid/layers/io.py | 109 ++++++++++++++++++++++++------- 1 file changed, 85 insertions(+), 24 deletions(-) diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 7413e69234..fc8809ce15 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -22,7 +22,7 @@ from ..executor import global_scope __all__ = [ 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file', 'open_files', 'read_file', 'create_shuffle_reader', - 'create_double_buffer_reader', 'create_multi_pass_reader' + 'create_double_buffer_reader' ] @@ -283,7 +283,43 @@ def _copy_reader_create_op_(block, op): return new_op -def open_recordio_file(filename, shapes, lod_levels, dtypes): +def open_recordio_file(filename, + shapes, + lod_levels, + dtypes, + pass_num=1, + for_parallel=False): + """ + Open a RecordIO file + + This layer takes a RecordIO file to read from and returns a Reader Variable. + Via the Reader Variable, we can get data from the given RecordIO file. + + Args: + filename(str): The RecordIO file's name. + shapes(list): List of tuples which declaring data shapes. + lod_levels(list): List of ints which declaring data lod_level. + dtypes(list): List of strs which declaring data type. + pass_num(int): Number of passes to run. After completing the + given number of passes, 'has_next()' will return False. + for_parallel(Bool): Set it as True if you are going to run + subsequent operators in parallel. + + Returns: + Variable: A Reader Variable via which we can get RecordIO file data. + + Examples: + .. code-block:: python + + reader = fluid.layers.io.open_recordio_file( + filename='./data.recordio', + shapes=[(3,224,224), (1)], + lod_levels=[0, 0], + dtypes=['float32', 'int64']) + + # Via the reader, we can use 'read_file' layer to get data: + image, label = fluid.layers.read_file(reader) + """ dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] shape_concat = [] ranks = [] @@ -310,6 +346,13 @@ def open_recordio_file(filename, shapes, lod_levels, dtypes): startup_var.persistable = True main_prog_var = _copy_reader_var_(default_main_program().current_block(), startup_var) + + if pass_num > 1: + main_prog_var = multi_pass(reader=main_prog_var, pass_num=pass_num) + + if for_parallel: + main_prog_var = for_parallel(reader=main_prog_var) + return monkey_patch_reader_methods(main_prog_var) @@ -318,11 +361,15 @@ def open_files(filenames, lod_levels, dtypes, thread_num, - buffer_size=None): + buffer_size=None, + pass_num=1, + for_parallel=False): """ Open files - This layer takes a list of files to read from and returns a Reader Variable. Via the Reader Variable, we can get data from given files. + This layer takes a list of files to read from and returns a Reader Variable. + Via the Reader Variable, we can get data from given files. All files must + have name suffixs to indicate their formats, e.g., '*.recordio'. Args: filenames(list): The list of file names. @@ -331,6 +378,10 @@ def open_files(filenames, dtypes(list): List of strs which declaring data type. thread_num(int): The maximal concurrent prefetch thread number. buffer_size(int): The size of prefetch buffer. + pass_num(int): Number of passes to run. After completing the + given number of passes, 'has_next()' will return False. + for_parallel(Bool): Set it as True if you are going to run + subsequent operators in parallel. Returns: Variable: A Reader Variable via which we can get file data. @@ -338,16 +389,16 @@ def open_files(filenames, Examples: .. code-block:: python - reader = fluid.layers.open_files(filenames=['./data1.recordio', + reader = fluid.layers.io.open_files(filenames=['./data1.recordio', './data2.recordio'], - shapes=[(3,224,224), (1)], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - thread_num=2, - buffer_size=2) + shapes=[(3,224,224), (1)], + lod_levels=[0, 0], + dtypes=['float32', 'int64'], + thread_num=2, + buffer_size=2) # Via the reader, we can use 'read_file' layer to get data: - image, label = fluid.layers.read_file(reader) + image, label = fluid.layers.io.read_file(reader) """ if buffer_size is None: buffer_size = thread_num @@ -361,13 +412,12 @@ def open_files(filenames, shape_concat.extend(shape) ranks.append(len(shape)) - var_name = unique_name('multiple_reader') - + multi_file_reader_name = unique_name('multi_file_reader') startup_blk = default_startup_program().current_block() - startup_var = startup_blk.create_var(name=var_name) + startup_reader = startup_blk.create_var(name=multi_file_reader_name) startup_blk.append_op( type='open_files', - outputs={'Out': [startup_var]}, + outputs={'Out': [startup_reader]}, attrs={ 'shape_concat': shape_concat, 'lod_levels': lod_levels, @@ -377,14 +427,21 @@ def open_files(filenames, 'buffer_size': buffer_size }) - startup_var.desc.set_dtypes(dtypes) - startup_var.persistable = True - main_prog_var = _copy_reader_var_(default_main_program().current_block(), - startup_var) - return monkey_patch_reader_methods(main_prog_var) + startup_reader.desc.set_dtypes(dtypes) + startup_reader.persistable = True + main_prog_reader = _copy_reader_var_(default_main_program().current_block(), + startup_reader) + if pass_num > 1: + main_prog_reader = multi_pass( + reader=main_prog_reader, pass_num=pass_num) + if for_parallel: + main_prog_reader = for_parallel(reader=main_prog_reader) -def __create_decorated_reader__(op_type, reader, attrs): + return monkey_patch_reader_methods(main_prog_reader) + + +def __create_decorated_reader__(op_type, reader, attrs={}): var_name = unique_name(op_type) startup_blk = default_startup_program().current_block() startup_var = startup_blk.create_var(name=var_name) @@ -400,12 +457,12 @@ def __create_decorated_reader__(op_type, reader, attrs): return monkey_patch_reader_methods(main_prog_var) -def create_shuffle_reader(reader, buffer_size): +def shuffle(reader, buffer_size): return __create_decorated_reader__('create_shuffle_reader', reader, {'buffer_size': int(buffer_size)}) -def create_double_buffer_reader(reader, place=None): +def double_buffer(reader, place=None): attrs = dict() if place is not None: attrs['place'] = str(place).upper() @@ -413,11 +470,15 @@ def create_double_buffer_reader(reader, place=None): attrs) -def create_multi_pass_reader(reader, pass_num): +def multi_pass(reader, pass_num): return __create_decorated_reader__('create_multi_pass_reader', reader, {'pass_num': int(pass_num)}) +def for_parallel(reader): + return __create_decorated_reader__('create_threaded_reader', reader) + + def read_file(file_obj): helper = LayerHelper('read_file') out = [ From 5ad2486905214e658a0ef8f54e9b447c1fec03b2 Mon Sep 17 00:00:00 2001 From: JiayiFeng Date: Sun, 8 Apr 2018 09:15:58 +0000 Subject: [PATCH 08/33] fix errors --- python/paddle/fluid/layers/io.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index fc8809ce15..dbba1a46eb 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -21,8 +21,7 @@ from ..executor import global_scope __all__ = [ 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file', - 'open_files', 'read_file', 'create_shuffle_reader', - 'create_double_buffer_reader' + 'open_files', 'read_file', 'shuffle', 'double_buffer' ] From 3f90a583b4e5f8a3534b03fb9ed83280ac2d69e4 Mon Sep 17 00:00:00 2001 From: JiayiFeng Date: Mon, 9 Apr 2018 04:35:25 +0000 Subject: [PATCH 09/33] update unittest --- python/paddle/fluid/tests/unittests/test_multi_pass_reader.py | 2 +- python/paddle/fluid/tests/unittests/test_recordio_reader.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py b/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py index 0b7a290759..c8a8afbea6 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py +++ b/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py @@ -44,7 +44,7 @@ class TestMultipleReader(unittest.TestCase): shapes=[(-1, 784), (-1, 1)], lod_levels=[0, 0], dtypes=['float32', 'int64']) - data_file = fluid.layers.create_multi_pass_reader( + data_file = fluid.layers.io.multi_pass( reader=data_file, pass_num=self.pass_num) img, label = fluid.layers.read_file(data_file) diff --git a/python/paddle/fluid/tests/unittests/test_recordio_reader.py b/python/paddle/fluid/tests/unittests/test_recordio_reader.py index 24a0074d9b..096d99a3f3 100644 --- a/python/paddle/fluid/tests/unittests/test_recordio_reader.py +++ b/python/paddle/fluid/tests/unittests/test_recordio_reader.py @@ -74,8 +74,8 @@ class TestRecordIO(unittest.TestCase): self.assertLess(avg_loss_np[-1], avg_loss_np[0]) def test_shuffle_reader(self): - self.test_main(decorator_callback=lambda reader: fluid.layers.create_shuffle_reader(reader, buffer_size=200)) + self.test_main(decorator_callback=lambda reader: fluid.layers.io.shuffle(reader, buffer_size=200)) def test_double_buffer_reader(self): - self.test_main(decorator_callback=lambda reader: fluid.layers.create_double_buffer_reader(reader, + self.test_main(decorator_callback=lambda reader: fluid.layers.io.double_buffer(reader, place='cuda:0' if fluid.core.is_compiled_with_cuda() else 'cpu')) From 7e7611d06753a6eafafda8042ad473535895e07f Mon Sep 17 00:00:00 2001 From: chengduoZH Date: Mon, 9 Apr 2018 12:53:28 +0800 Subject: [PATCH 10/33] when the number of samples of current batch is less than the count of devices, let it crash. --- paddle/fluid/framework/parallel_executor.cc | 5 +++++ python/paddle/fluid/parallel_executor.py | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 74945fb4f2..99b3065d8d 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -174,6 +174,11 @@ void ParallelExecutor::SplitTensorToPlaces( const std::unordered_map &feed_tensors) { for (auto it : feed_tensors) { auto lod_tensors = it.second.SplitLoDTensor(member_->places_); + PADDLE_ENFORCE_EQ( + member_->places_.size(), lod_tensors.size(), + "The number of samples of current batch is less than the count of " + "devices, currently, it is not allowed. (%d vs %d)", + member_->places_.size(), lod_tensors.size()); for (size_t j = 0; j < member_->places_.size(); ++j) { // TODO(panxy0718): Do I need to delete this var? member_->local_scopes_[j] diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index b93f2f974c..24dfa6144a 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -87,7 +87,8 @@ class ParallelExecutor(object): # performance. Worth tunning for other models in the future. num_threads = len(self._places) else: - min(len(self._places) * 2, multiprocessing.cpu_count()) + num_threads = min( + len(self._places) * 2, multiprocessing.cpu_count()) main = main_program main = main if main else framework.default_main_program() From 5416bac5d84b1d846744481505749df0a87db133 Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Mon, 9 Apr 2018 15:47:46 +0800 Subject: [PATCH 11/33] Make shared decorated readers' creater be only in main_program --- .../reader/create_double_buffer_reader_op.cc | 9 ++++-- python/paddle/fluid/layers/io.py | 30 ++++++++++++++----- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index ed868786ab..d9f799f14d 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -109,7 +109,9 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase { auto place_str = Attr("place"); platform::Place place; - if (place_str == "CPU") { + if (place_str == "AUTO") { + place = dev_place; + } else if (place_str == "CPU") { place = platform::CPUPlace(); } else { std::istringstream sin(place_str); @@ -140,8 +142,9 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase { enum_range.insert(string::Sprintf("CUDA:%d", i)); } enum_range.insert("CPU"); - AddAttr("place", "The double buffer place, default is CPU") - .SetDefault("CPU") + enum_range.insert("AUTO"); + AddAttr("place", "The double buffer place") + .SetDefault("AUTO") .InEnum({enum_range}); } }; diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index dbba1a46eb..4901521db5 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -440,7 +440,7 @@ def open_files(filenames, return monkey_patch_reader_methods(main_prog_reader) -def __create_decorated_reader__(op_type, reader, attrs={}): +def __create_unshared_decorated_reader__(op_type, reader, attrs={}): var_name = unique_name(op_type) startup_blk = default_startup_program().current_block() startup_var = startup_blk.create_var(name=var_name) @@ -456,26 +456,40 @@ def __create_decorated_reader__(op_type, reader, attrs={}): return monkey_patch_reader_methods(main_prog_var) +def __create_shared_decorated_reader__(op_type, reader, attrs={}): + new_reader_name = unique_name(op_type) + main_blk = default_main_program().current_block() + new_reader = main_blk.create_var(name=new_reader_name) + main_blk.append_op( + type=op_type, + inputs={'UnderlyingReader': reader}, + outputs={'Out': [new_reader]}, + attrs=attrs) + new_reader.persistable = True + new_reader.stop_gradient = True + return monkey_patch_reader_methods(new_reader) + + def shuffle(reader, buffer_size): - return __create_decorated_reader__('create_shuffle_reader', reader, - {'buffer_size': int(buffer_size)}) + return __create_unshared_decorated_reader__( + 'create_shuffle_reader', reader, {'buffer_size': int(buffer_size)}) def double_buffer(reader, place=None): attrs = dict() if place is not None: attrs['place'] = str(place).upper() - return __create_decorated_reader__('create_double_buffer_reader', reader, - attrs) + return __create_unshared_decorated_reader__('create_double_buffer_reader', + reader, attrs) def multi_pass(reader, pass_num): - return __create_decorated_reader__('create_multi_pass_reader', reader, - {'pass_num': int(pass_num)}) + return __create_shared_decorated_reader__( + 'create_multi_pass_reader', reader, {'pass_num': int(pass_num)}) def for_parallel(reader): - return __create_decorated_reader__('create_threaded_reader', reader) + return __create_shared_decorated_reader__('create_threaded_reader', reader) def read_file(file_obj): From d05071f0b91f7bf5f5ee978dcdecd828cdb9df2a Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Mon, 9 Apr 2018 15:57:25 +0800 Subject: [PATCH 12/33] k8s dist train for en --- .../multi_cluster/k8s_distributed_en.md | 366 +++++++++++++++++- 1 file changed, 364 insertions(+), 2 deletions(-) diff --git a/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md b/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md index bc3d50b3ff..dfc0f0d3e6 100644 --- a/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md +++ b/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md @@ -1,3 +1,365 @@ -# Kubernetes Distributed +# Kubernetes Distributed Training -TBD +We introduced how to create a PaddlePaddle Job with a single node on Kuberentes in the +previous document. +In this article, we will introduce how to craete a PaddlePaddle job with multiple nodes +on Kubernetes cluster. + +## Overall Architecture + +Before creating a training job, the users need to deploy the Python scripts and +training data which have already been sliced on the precast path in the distributed file +system(We can use the different type of Kuberentes Volumes to mount different distributed +file system). Before start training, The program would copy the training data into the +Container and also save the models at the same path during training. The global architecture +is as follows: + +![PaddlePaddle on Kubernetes Architecture](src/k8s-paddle-arch.png) + +The above figure describes a distributed training architecture which contains 3 nodes, each +Pod would mount a folder of the distributed file system to save training data and models +by Kubernetes Volume. Kubernetes created 3 Pod for this training phase and scheduled these on +3 nodes, each Pod has a PaddlePaddle container. After the containers have been created, +PaddlePaddle would start up the communication between PServer and Trainer and read training +data for this training job. + +As the description above, we can start up a PaddlePaddle distributed training job on a ready +Kubernetes cluster as the following steps: + +1. [Build PaddlePaddle Docker Image](#Build a Docker Image) +1. [Split training data and upload to the distributed file system](#Upload Training Data) +1. [Edit a YAML file and create a Kubernetes Job](#Create a Job) +1. [Check the output](#Check The Output) + +We will introduce these steps as follows: + +### Build a Docker Image + +PaddlePaddle Docker Image needs to support the runtime environment of `Paddle PServer` and +`Paddle Trainer` process and this Docker Image has the two import features: + +- Copy the training data into the container. +- Generate the start arguments of `Paddle PServer` and `Paddle Training` process. + +Because of the official Docker Image `paddlepaddle/paddle:latest` has already included the +PaddlePaddle executable file, but above features so that we can use the official Docker Image as +a base Image and add some additional scripts to finish the work of building a new image. +You can reference [Dockerfile](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/howto/usage/cluster/src/k8s_train/Dockerfile). + + +```bash +$ cd doc/howto/usage/k8s/src/k8s_train +$ docker build -t [YOUR_REPO]/paddle:mypaddle . +``` + +And then upload the new Docker Image to a Docker hub: + +```bash +docker push [YOUR_REPO]/paddle:mypaddle +``` + +**[NOTE]**, in the above command arguments, `[YOUR_REPO]` representative your Docker repository, +you need to use your repository instead of it. We will use `[YOUR_REPO]/paddle:mypaddle` to +represent the Docker Image which built in this step. + +### Prepare Training Data + +We can download and split the training job by creating a Kubernetes Job, or custom your image +by editing [k8s_train](./src/k8s_train/README.md). + +Before creating a Job, we need to bind a [persistenVolumeClaim](https://kubernetes.io/docs/user-guide/persistent-volumes) by the different type of +the different distributed file system, the generated dataset would be saved on this volume. + +```yaml +apiVersion: batch/v1 +kind: Job +metadata: + name: paddle-data +spec: + template: + metadata: + name: pi + spec: + hostNetwork: true + containers: + - name: paddle-data + image: paddlepaddle/paddle-tutorial:k8s_data + imagePullPolicy: Always + volumeMounts: + - mountPath: "/mnt" + name: nfs + env: + - name: OUT_DIR + value: /home/work/mfs/paddle-cluster-job + - name: SPLIT_COUNT + value: "3" + volumes: + - name: nfs + persistentVolumeClaim: + claimName: mfs + restartPolicy: Never +``` + +If success, you can see some information like this: + +```base +[root@paddle-kubernetes-node0 nfsdir]$ tree -d +. +`-- paddle-cluster-job + |-- 0 + | `-- data + |-- 1 + | `-- data + |-- 2 + | `-- data + |-- output + |-- quick_start +``` + +The `paddle-cluster-job` above is the job name for this training job; we need 3 +PaddlePaddle training node and save the split training data on `paddle-cluster-job` path, +the folder `0`, `1` and `2` representative the `training_id` on each node, `quick_start` folder is used to store training data, `output` folder is used to store the models and logs. + + +### Create a Job + +Kubernetes allow users to create an object with YAML files, and we can use a command-line tool +to create it. + +The Job YAML file describes that which Docker Image would be used in this training job, how much nodes would be created, what's the startup arguments of `Paddle PServer/Trainer` process and what's the type of Volumes. You can find the details of the YAML filed in +[Kubernetes Job API](http://kubernetes.io/docs/api-reference/batch/v1/definitions/#_v1_job). +The following is an example for this training job: + +```yaml +apiVersion: batch/v1 +kind: Job +metadata: + name: paddle-cluster-job +spec: + parallelism: 3 + completions: 3 + template: + metadata: + name: paddle-cluster-job + spec: + volumes: + - name: jobpath + hostPath: + path: /home/work/mfs + containers: + - name: trainer + image: [YOUR_REPO]/paddle:mypaddle + command: ["bin/bash", "-c", "/root/start.sh"] + env: + - name: JOB_NAME + value: paddle-cluster-job + - name: JOB_PATH + value: /home/jobpath + - name: JOB_NAMESPACE + value: default + - name: TRAIN_CONFIG_DIR + value: recommendation + - name: CONF_PADDLE_NIC + value: eth0 + - name: CONF_PADDLE_PORT + value: "7164" + - name: CONF_PADDLE_PORTS_NUM + value: "2" + - name: CONF_PADDLE_PORTS_NUM_SPARSE + value: "2" + - name: CONF_PADDLE_GRADIENT_NUM + value: "3" + volumeMounts: + - name: jobpath + mountPath: /home/jobpath + restartPolicy: Never +``` + +In the above YAML file: +- `metadata.name`, The job name. +- `parallelism`, The Kubernetes Job would create `parallelism` Pods at the same time. +- `completions`, The Job would become the success status only the number of successful Pod(the exit code is 0) + is equal to `completions`. +- `volumeMounts`, the name field `jobpath` is a key, the `mountPath` field represents + the path in the container, and we can define the `jobpath` in `volumes` filed, use `hostPath` + to configure the host path we want to mount. +- `env`, the environment variables in the Container, we pass some startup arguments by + this approach, some details are as following: + - JOB_PATH:the mount path in the container + - JOB_NAME:the job name + - TRAIN_CONFIG_DIR:the job path in the container, we can find the training data path by + combine with JOB_NAME. + - CONF_PADDLE_NIC: the argument `--nics` of `Paddle PServer` process, the network + device name. + - CONF_PADDLE_PORT: the argument `--port` of `Paddle PServer` process. + - CONF_PADDLE_PORTS_NUM: the argument `--ports_num` of `Paddle PServer`, the port number + for dense prameter update. + - CONF_PADDLE_PORTS_NUM_SPARSE:the argument `--ports_num_for_sparse` of `Paddle PServer`, + the port number for sparse parameter update. + - CONF_PADDLE_GRADIENT_NUM:the number of training node, the argument + `--num_gradient_servers` of `Paddle PServer` and `Paddle Trainer`. + +You can find some details information at [here] +(http://www.paddlepaddle.org/docs/develop/documentation/zh/howto/usage/cmd_parameter/detail_introduction_cn.html)。 + +We can use the command-line tool of Kubernetes to create a Job when we finish the YAML file: + +```bash +kubectl create -f job.yaml +``` + +Upon successful creation, Kubernetes would create 3 Pods as PaddlePaddle training node, +, pull the Docker image and begin to train. + + +### Checkout the Output + +At the process of training, we can check the logs and the output models, such as we store +the output on `output` folder. **NOTE**, `node_0`, `node_1` and `node_2` represent the +`trainer_id` of the PaddlePaddle training job rather than the node id of Kubernetes. + +```bash +[root@paddle-kubernetes-node0 output]# tree -d +. +├── node_0 +│   ├── server.log +│   └── train.log +├── node_1 +│   ├── server.log +│   └── train.log +├── node_2 +...... +├── pass-00002 +│   ├── done +│   ├── ___embedding_0__.w0 +│   ├── ___embedding_1__.w0 +...... +``` + +We can checkout the status of each training Pod by viewing the logs: + +```bash +[root@paddle-kubernetes-node0 node_0]# cat train.log +I1116 09:10:17.123121 50 Util.cpp:155] commandline: + /usr/local/bin/../opt/paddle/bin/paddle_trainer + --nics=eth0 --port=7164 + --ports_num=2 --comment=paddle_process_by_paddle + --pservers=192.168.129.66,192.168.223.143,192.168.129.71 + --ports_num_for_sparse=2 --config=./trainer_config.py + --trainer_count=4 --num_passes=10 --use_gpu=0 + --log_period=50 --dot_period=10 --saving_period=1 + --local=0 --trainer_id=0 + --save_dir=/home/jobpath/paddle-cluster-job/output +I1116 09:10:17.123440 50 Util.cpp:130] Calling runInitFunctions +I1116 09:10:17.123764 50 Util.cpp:143] Call runInitFunctions done. +[WARNING 2016-11-16 09:10:17,227 default_decorators.py:40] please use keyword arguments in paddle config. +[INFO 2016-11-16 09:10:17,239 networks.py:1282] The input order is [movie_id, title, genres, user_id, gender, age, occupation, rating] +[INFO 2016-11-16 09:10:17,239 networks.py:1289] The output order is [__square_error_cost_0__] +I1116 09:10:17.392917 50 Trainer.cpp:170] trainer mode: Normal +I1116 09:10:17.613910 50 PyDataProvider2.cpp:257] loading dataprovider dataprovider::process +I1116 09:10:17.680917 50 PyDataProvider2.cpp:257] loading dataprovider dataprovider::process +I1116 09:10:17.681543 50 GradientMachine.cpp:134] Initing parameters.. +I1116 09:10:18.012390 50 GradientMachine.cpp:141] Init parameters done. +I1116 09:10:18.018641 50 ParameterClient2.cpp:122] pserver 0 192.168.129.66:7164 +I1116 09:10:18.018950 50 ParameterClient2.cpp:122] pserver 1 192.168.129.66:7165 +I1116 09:10:18.019069 50 ParameterClient2.cpp:122] pserver 2 192.168.223.143:7164 +I1116 09:10:18.019492 50 ParameterClient2.cpp:122] pserver 3 192.168.223.143:7165 +I1116 09:10:18.019716 50 ParameterClient2.cpp:122] pserver 4 192.168.129.71:7164 +I1116 09:10:18.019836 50 ParameterClient2.cpp:122] pserver 5 192.168.129.71:7165 +``` + +## Some Additional Details + +### Using Environment Variables + +Usually we use the environment varialbes to configurate the PaddlePaddle Job which running on +Kubernetes, `start_paddle.py` provides a start up script to convert the environment variable +to the start up argument of PaddlePaddle process: + +```bash +API = "/api/v1/namespaces/" +JOBSELECTOR = "labelSelector=job-name=" +JOB_PATH = os.getenv("JOB_PATH") + "/" + os.getenv("JOB_NAME") +JOB_PATH_OUTPUT = JOB_PATH + "/output" +JOBNAME = os.getenv("JOB_NAME") +NAMESPACE = os.getenv("JOB_NAMESPACE") +PADDLE_NIC = os.getenv("CONF_PADDLE_NIC") +PADDLE_PORT = os.getenv("CONF_PADDLE_PORT") +PADDLE_PORTS_NUM = os.getenv("CONF_PADDLE_PORTS_NUM") +PADDLE_PORTS_NUM_SPARSE = os.getenv("CONF_PADDLE_PORTS_NUM_SPARSE") +PADDLE_SERVER_NUM = os.getenv("CONF_PADDLE_GRADIENT_NUM") +``` + +### Communication between Pods + +At the begin of `start_paddle.py`, it would initialize and parse the arguments. + +```python +parser = argparse.ArgumentParser(prog="start_paddle.py", + description='simple tool for k8s') + args, train_args_list = parser.parse_known_args() + train_args = refine_unknown_args(train_args_list) + train_args_dict = dict(zip(train_args[:-1:2], train_args[1::2])) + podlist = getPodList() +``` + +And then query the status of all the other Pods of this Job by the function `getPodList()`, and fetch `triner_id` by the function `getIdMap(podlist)` if all the Pods status is `RUNNING`. + +```python + podlist = getPodList() + # need to wait until all pods are running + while not isPodAllRunning(podlist): + time.sleep(10) + podlist = getPodList() + idMap = getIdMap(podlist) +``` + +**NOTE**: `getPodList()` would fetch all the pod in the current namespace, if some Pods are running, may cause some error. We will use [statfulesets](https://kubernetes.io/docs/concepts/abstractions/controllers/statefulsets) instead of +Kubernetes Pod or Replicaset in the future. + +For the implement of `getIdMap(podlist)`, this function would fetch each IP address of +`podlist` and then sort them to generate `trainer_id`. + +```python +def getIdMap(podlist): + ''' + generate tainer_id by ip + ''' + ips = [] + for pod in podlist["items"]: + ips.append(pod["status"]["podIP"]) + ips.sort() + idMap = {} + for i in range(len(ips)): + idMap[ips[i]] = i + return idMap +``` + +After getting the `idMap`, we can generate the arguments of `Paddle PServer` and `Paddle Trainer` +so that we can start up them by `startPaddle(idMap, train_args_dict)`. + +### Create Job + +The main goal of `startPaddle` is generating the arguments of `Paddle PServer` and `Paddle Trainer` processes. Such as `Paddle Trainer`, we parse the environment variable and then get +`PADDLE_NIC`, `PADDLE_PORT`, `PADDLE_PORTS_NUM` and etc..., finally find `trainerId` from +`idMap` according to its IP address. + +```python + program = 'paddle train' + args = " --nics=" + PADDLE_NIC + args += " --port=" + str(PADDLE_PORT) + args += " --ports_num=" + str(PADDLE_PORTS_NUM) + args += " --comment=" + "paddle_process_by_paddle" + ip_string = "" + for ip in idMap.keys(): + ip_string += (ip + ",") + ip_string = ip_string.rstrip(",") + args += " --pservers=" + ip_string + args_ext = "" + for key, value in train_args_dict.items(): + args_ext += (' --' + key + '=' + value) + localIP = socket.gethostbyname(socket.gethostname()) + trainerId = idMap[localIP] + args += " " + args_ext + " --trainer_id=" + \ + str(trainerId) + " --save_dir=" + JOB_PATH_OUTPUT +``` From ee178d5aebaa48f3434ec577ab1b450f4e3d7eab Mon Sep 17 00:00:00 2001 From: JiayiFeng Date: Tue, 10 Apr 2018 04:41:39 +0000 Subject: [PATCH 13/33] fix bugs --- paddle/fluid/framework/parallel_executor.cc | 4 +--- paddle/fluid/operators/read_op.cc | 7 ------- .../operators/reader/create_threaded_reader_op.cc | 8 ++++---- python/paddle/fluid/layers/io.py | 13 +++++++------ 4 files changed, 12 insertions(+), 20 deletions(-) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 74945fb4f2..1bb089c344 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -115,14 +115,12 @@ void ParallelExecutor::BCastParamsToGPUs( for (auto &var : vars) { auto *main_var = main_scope->FindVar(var); - if (!main_var->IsType()) { + if (main_var == nullptr || !main_var->IsType()) { continue; } auto &main_tensor = main_var->Get(); - auto &dims = main_tensor.dims(); - if (paddle::platform::is_gpu_place(main_tensor.place())) { size_t numel = main_tensor.numel(); ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type()); diff --git a/paddle/fluid/operators/read_op.cc b/paddle/fluid/operators/read_op.cc index 2925b8a85d..4496110cf8 100644 --- a/paddle/fluid/operators/read_op.cc +++ b/paddle/fluid/operators/read_op.cc @@ -66,13 +66,6 @@ class ReadOp : public framework::OperatorBase { std::vector out_arg_names = Outputs("Out"); std::vector ins; reader->ReadNext(&ins); - if (ins.empty()) { - reader->ReInit(); - reader->ReadNext(&ins); - PADDLE_ENFORCE( - !ins.empty(), - "Reader can not read the next data even it has been re-initialized."); - } PADDLE_ENFORCE_EQ(ins.size(), out_arg_names.size()); for (size_t i = 0; i < ins.size(); ++i) { auto* out = diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc index 854381e0ee..7b10135afc 100644 --- a/paddle/fluid/operators/reader/create_threaded_reader_op.cc +++ b/paddle/fluid/operators/reader/create_threaded_reader_op.cc @@ -111,7 +111,7 @@ class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase { "When 'unsafe_mode' is false, invoking 'HasNext()' or " "'ReInit()' is not allowed to avoid unexpected bugs in " "multi-thread environment.") - .SetDefault(false); + .SetDefault(true); AddComment(R"DOC( CreateThreadedReader Operator @@ -134,6 +134,6 @@ class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase { } // namespace paddle namespace reader = paddle::operators::reader; -REGISTER_FILE_READER_OPERATOR(create_threaded_reader, - reader::CreateThreadedReaderOp, - reader::CreateThreadedReaderOpMaker); +REGISTER_DECORATED_READER_OPERATOR(create_threaded_reader, + reader::CreateThreadedReaderOp, + reader::CreateThreadedReaderOpMaker); diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 4901521db5..d016ab9008 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -350,7 +350,7 @@ def open_recordio_file(filename, main_prog_var = multi_pass(reader=main_prog_var, pass_num=pass_num) if for_parallel: - main_prog_var = for_parallel(reader=main_prog_var) + main_prog_var = parallelize(reader=main_prog_var) return monkey_patch_reader_methods(main_prog_var) @@ -435,12 +435,12 @@ def open_files(filenames, reader=main_prog_reader, pass_num=pass_num) if for_parallel: - main_prog_reader = for_parallel(reader=main_prog_reader) + main_prog_reader = parallelize(reader=main_prog_reader) return monkey_patch_reader_methods(main_prog_reader) -def __create_unshared_decorated_reader__(op_type, reader, attrs={}): +def __create_shared_decorated_reader__(op_type, reader, attrs): var_name = unique_name(op_type) startup_blk = default_startup_program().current_block() startup_var = startup_blk.create_var(name=var_name) @@ -456,7 +456,7 @@ def __create_unshared_decorated_reader__(op_type, reader, attrs={}): return monkey_patch_reader_methods(main_prog_var) -def __create_shared_decorated_reader__(op_type, reader, attrs={}): +def __create_unshared_decorated_reader__(op_type, reader, attrs): new_reader_name = unique_name(op_type) main_blk = default_main_program().current_block() new_reader = main_blk.create_var(name=new_reader_name) @@ -488,8 +488,9 @@ def multi_pass(reader, pass_num): 'create_multi_pass_reader', reader, {'pass_num': int(pass_num)}) -def for_parallel(reader): - return __create_shared_decorated_reader__('create_threaded_reader', reader) +def parallelize(reader): + return __create_shared_decorated_reader__('create_threaded_reader', reader, + {}) def read_file(file_obj): From e7467d94a0418fe88fbd9eba23b4322914ebc10d Mon Sep 17 00:00:00 2001 From: Luo Tao Date: Tue, 10 Apr 2018 14:42:13 +0800 Subject: [PATCH 14/33] add remove_op, remove_var in Python end --- paddle/fluid/framework/block_desc.cc | 50 +------------------ python/paddle/fluid/framework.py | 11 ++++ .../tests/unittests/test_protobuf_descs.py | 20 -------- 3 files changed, 13 insertions(+), 68 deletions(-) diff --git a/paddle/fluid/framework/block_desc.cc b/paddle/fluid/framework/block_desc.cc index fbe08349c3..b8847e4b90 100644 --- a/paddle/fluid/framework/block_desc.cc +++ b/paddle/fluid/framework/block_desc.cc @@ -13,11 +13,10 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/framework/block_desc.h" +#include #include "paddle/fluid/framework/operator.h" #include "paddle/fluid/framework/program_desc.h" -#include - namespace paddle { namespace framework { @@ -147,52 +146,7 @@ void BlockDesc::RemoveOp(size_t s, size_t e) { if (ops_.begin() + s == ops_.end() || ops_.begin() + e == ops_.end()) { return; } - auto get_vars = [](std::deque>::iterator &op, - std::vector &v) { - auto in_names = (*op)->InputArgumentNames(); - v.insert(v.end(), in_names.begin(), in_names.end()); - auto out_names = (*op)->OutputArgumentNames(); - v.insert(v.end(), out_names.begin(), out_names.end()); - std::sort(v.begin(), v.end()); - auto last = std::unique(v.begin(), v.end()); - v.erase(last, v.end()); - }; - need_update_ = true; - - for (size_t i = s; i < e; i++) { - // since remove op one by one, every time remove the first op. - auto op = ops_.begin() + s; - - // collect input and output variables from current delete op - std::vector cur_vars; - get_vars(op, cur_vars); - - // remove current op - ops_.erase(ops_.begin() + s); - - // collect input and output variables from other ops - std::vector other_vars; - for (auto it = ops_.begin(); it != ops_.end(); it++) { - get_vars(it, other_vars); - } - - // variables should be deleted - std::vector delete_vars; - // delete_vars = cur_vars - cur_vars ^ other_input_vars - std::set_difference(cur_vars.begin(), cur_vars.end(), other_vars.begin(), - other_vars.end(), - std::inserter(delete_vars, delete_vars.end())); - // remove variables - for (size_t i = 0; i < delete_vars.size(); i++) { - auto name = delete_vars[i]; - auto it = vars_.find(name); - PADDLE_ENFORCE(it != vars_.end(), - "%s is not in variable list, it should not be deleted", - name); - vars_.erase(it); - VLOG(3) << "deleting variable " << name; - } - } + ops_.erase(ops_.begin() + s, ops_.begin() + e); } std::vector BlockDesc::AllOps() const { diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 33cf691817..793421a22f 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -818,6 +818,11 @@ class Block(object): del self.vars[name] self.sync_with_cpp() + def remove_var(self, name): + self.sync_with_cpp() + self.desc.remove_var(name) + del self.vars[name] + def create_parameter(self, *args, **kwargs): global_block = self.program.global_block() param = Parameter(global_block, *args, **kwargs) @@ -838,6 +843,11 @@ class Block(object): self.ops.insert(index, op) return op + def remove_op(self, index): + self.sync_with_cpp() + self.desc.remove_op(index, index + 1) + del self.ops[index] + def delete_ops(self, ops): # remove from cpp # FIXME(typhoonzero): remove only the first occurrence. @@ -846,6 +856,7 @@ class Block(object): end = list(self.ops).index(ops[-1]) except Exception, e: raise e + self.desc.remove_op(start, end + 1) def slice_ops(self, start, end): diff --git a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py index f98a8bbc68..3f9059fb5b 100644 --- a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py +++ b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py @@ -201,24 +201,6 @@ class TestBlockDesc(unittest.TestCase): op1.set_type("test") op2.set_type("test") - var0 = block.var("var0") - var1 = block.var("var1") - var2 = block.var("var2") - var3 = block.var("var3") - var4 = block.var("var4") - var5 = block.var("var5") - - op0.set_input("X", ["var0"]) - op0.set_output("Y", ["var0"]) - op1.set_input("X", ["var1", "var2"]) - op1.set_output("Y", ["var3", "var4"]) - op2.set_input("X", ["var1"]) - op2.set_output("Y", ["var4", "var5"]) - - program.sync_with_cpp() - - # remove op1, its input var2 and output var3 will be removed at the same time, - # but its input var1 and output var4 will not be removed since they are used for op2. block.remove_op(1, 2) program.sync_with_cpp() @@ -226,8 +208,6 @@ class TestBlockDesc(unittest.TestCase): for idx in xrange(0, block.op_size()): all_ops.append(block.op(idx)) self.assertEqual(all_ops, [op0, op2]) - all_vars = block.all_vars() - self.assertEqual(set(all_vars), {var0, var1, var4, var5}) if __name__ == '__main__': From 284a2137742f63e8a70f4d98805328edc067f054 Mon Sep 17 00:00:00 2001 From: JiayiFeng Date: Tue, 10 Apr 2018 07:46:03 +0000 Subject: [PATCH 15/33] fix a name conflict --- python/paddle/fluid/layers/io.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index dbba1a46eb..7b590fb510 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -350,7 +350,7 @@ def open_recordio_file(filename, main_prog_var = multi_pass(reader=main_prog_var, pass_num=pass_num) if for_parallel: - main_prog_var = for_parallel(reader=main_prog_var) + main_prog_var = parallel(reader=main_prog_var) return monkey_patch_reader_methods(main_prog_var) @@ -435,7 +435,7 @@ def open_files(filenames, reader=main_prog_reader, pass_num=pass_num) if for_parallel: - main_prog_reader = for_parallel(reader=main_prog_reader) + main_prog_reader = parallel(reader=main_prog_reader) return monkey_patch_reader_methods(main_prog_reader) @@ -474,7 +474,7 @@ def multi_pass(reader, pass_num): {'pass_num': int(pass_num)}) -def for_parallel(reader): +def parallel(reader): return __create_decorated_reader__('create_threaded_reader', reader) From 40e3fe173ca06a8196c3a24906923833cfb0f372 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Tue, 10 Apr 2018 17:46:53 +0800 Subject: [PATCH 16/33] Make cuda_helper.h Pass cpplint --- paddle/fluid/platform/cuda_helper.h | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/platform/cuda_helper.h b/paddle/fluid/platform/cuda_helper.h index 881d611d4a..8758af0804 100644 --- a/paddle/fluid/platform/cuda_helper.h +++ b/paddle/fluid/platform/cuda_helper.h @@ -33,22 +33,26 @@ constexpr int PADDLE_CUDA_NUM_THREADS = 512; USE_CUDA_ATOMIC(Add, float); USE_CUDA_ATOMIC(Add, int); USE_CUDA_ATOMIC(Add, unsigned int); -USE_CUDA_ATOMIC(Add, unsigned long long int); +// CUDA API uses unsigned long long int, we cannot use uint64_t here. +// It because unsigned long long int is not necessarily uint64_t +USE_CUDA_ATOMIC(Add, unsigned long long int); // NOLINT CUDA_ATOMIC_WRAPPER(Add, int64_t) { - static_assert(sizeof(int64_t) == sizeof(long long int), + // Here, we check long long int must be int64_t. + static_assert(sizeof(int64_t) == sizeof(long long int), // NOLINT "long long should be int64"); - return CudaAtomicAdd(reinterpret_cast(address), - static_cast(val)); + return CudaAtomicAdd( + reinterpret_cast(address), // NOLINT + static_cast(val)); // NOLINT } #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 600 USE_CUDA_ATOMIC(Add, double); #else CUDA_ATOMIC_WRAPPER(Add, double) { - unsigned long long int* address_as_ull = - reinterpret_cast(address); - unsigned long long int old = *address_as_ull, assumed; + unsigned long long int* address_as_ull = // NOLINT + reinterpret_cast(address); // NOLINT + unsigned long long int old = *address_as_ull, assumed; // NOLINT do { assumed = old; From adaa9c5bfeabc917b6640b8f1379b6d8c2c8115c Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Tue, 10 Apr 2018 19:49:29 +0800 Subject: [PATCH 17/33] update by comments --- .../multi_cluster/k8s_distributed_en.md | 85 ++++++++++--------- 1 file changed, 46 insertions(+), 39 deletions(-) diff --git a/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md b/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md index dfc0f0d3e6..2ed75b4dcc 100644 --- a/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md +++ b/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md @@ -2,29 +2,29 @@ We introduced how to create a PaddlePaddle Job with a single node on Kuberentes in the previous document. -In this article, we will introduce how to craete a PaddlePaddle job with multiple nodes +In this article, we will introduce how to create a PaddlePaddle job with multiple nodes on Kubernetes cluster. ## Overall Architecture -Before creating a training job, the users need to deploy the Python scripts and -training data which have already been sliced on the precast path in the distributed file -system(We can use the different type of Kuberentes Volumes to mount different distributed -file system). Before start training, The program would copy the training data into the +Before creating a training job, the users need to slice the training data and deploy +the Python scripts along with it into the distributed file system +(We can use the different type of Kuberentes Volumes to mount different distributed +file systems). Before training starts, The program will copy the training data into the Container and also save the models at the same path during training. The global architecture is as follows: ![PaddlePaddle on Kubernetes Architecture](src/k8s-paddle-arch.png) The above figure describes a distributed training architecture which contains 3 nodes, each -Pod would mount a folder of the distributed file system to save training data and models -by Kubernetes Volume. Kubernetes created 3 Pod for this training phase and scheduled these on -3 nodes, each Pod has a PaddlePaddle container. After the containers have been created, -PaddlePaddle would start up the communication between PServer and Trainer and read training +Pod mounts a folder of the distributed file system to save training data and models +by Kubernetes Volume. Kubernetes created 3 Pods for this training phase and scheduled these on +3 nodes, each Pod has a PaddlePaddle container. After the containers car created, +PaddlePaddle starts up the communication between PServer and Trainer and read training data for this training job. -As the description above, we can start up a PaddlePaddle distributed training job on a ready -Kubernetes cluster as the following steps: +As the description above, we can start up a PaddlePaddle distributed training job on a +Kubernetes ready cluster with the following steps: 1. [Build PaddlePaddle Docker Image](#Build a Docker Image) 1. [Split training data and upload to the distributed file system](#Upload Training Data) @@ -35,16 +35,13 @@ We will introduce these steps as follows: ### Build a Docker Image -PaddlePaddle Docker Image needs to support the runtime environment of `Paddle PServer` and -`Paddle Trainer` process and this Docker Image has the two import features: +Training docker image needs to package the paddle pserver and paddle trainer runtimes, as well as two more processes before we can kick off the training: -- Copy the training data into the container. -- Generate the start arguments of `Paddle PServer` and `Paddle Training` process. +- Copying the training data into container. +- Generating the initialization arguments for `Paddle PServer` and `Paddle Training` processes. -Because of the official Docker Image `paddlepaddle/paddle:latest` has already included the -PaddlePaddle executable file, but above features so that we can use the official Docker Image as -a base Image and add some additional scripts to finish the work of building a new image. -You can reference [Dockerfile](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/howto/usage/cluster/src/k8s_train/Dockerfile). +Since the paddlepaddle official docker image already has the runtimes we need, we'll take it as the base image and pack some additional scripts for the processes mentioned above to build our training image. for more detail, please find from the following link: +- https://github.com/PaddlePaddle/Paddle/blob/develop/doc/howto/usage/cluster/src/k8s_train/Dockerfile ```bash @@ -58,17 +55,17 @@ And then upload the new Docker Image to a Docker hub: docker push [YOUR_REPO]/paddle:mypaddle ``` -**[NOTE]**, in the above command arguments, `[YOUR_REPO]` representative your Docker repository, -you need to use your repository instead of it. We will use `[YOUR_REPO]/paddle:mypaddle` to +**[NOTE]**, in the above command arguments, `[YOUR_REPO]` represents your Docker repository, +you need to use your repository instead of it. We will replace it with your respository name to represent the Docker Image which built in this step. ### Prepare Training Data We can download and split the training job by creating a Kubernetes Job, or custom your image -by editing [k8s_train](./src/k8s_train/README.md). +by editing [k8s_train](./src/k8s_train/). Before creating a Job, we need to bind a [persistenVolumeClaim](https://kubernetes.io/docs/user-guide/persistent-volumes) by the different type of -the different distributed file system, the generated dataset would be saved on this volume. +the different file system, the generated dataset would be saved on this volume. ```yaml apiVersion: batch/v1 @@ -100,7 +97,13 @@ spec: restartPolicy: Never ``` -If success, you can see some information like this: +Create the Job with the following command: + +```bash +> kubectl create -f xxx.yaml +``` + +If created successfully, you can see some information like this: ```base [root@paddle-kubernetes-node0 nfsdir]$ tree -d @@ -117,13 +120,13 @@ If success, you can see some information like this: ``` The `paddle-cluster-job` above is the job name for this training job; we need 3 -PaddlePaddle training node and save the split training data on `paddle-cluster-job` path, -the folder `0`, `1` and `2` representative the `training_id` on each node, `quick_start` folder is used to store training data, `output` folder is used to store the models and logs. +PaddlePaddle training nodes and save the split training data in `paddle-cluster-job` path, +the folder `0`, `1` and `2` represents the `training_id` on each node, `quick_start` folder is used to store training data, `output` folder is used to store the models and logs. ### Create a Job -Kubernetes allow users to create an object with YAML files, and we can use a command-line tool +Kubernetes allow users to create objects with YAML files, and we can use a command-line tool to create it. The Job YAML file describes that which Docker Image would be used in this training job, how much nodes would be created, what's the startup arguments of `Paddle PServer/Trainer` process and what's the type of Volumes. You can find the details of the YAML filed in @@ -177,8 +180,8 @@ spec: In the above YAML file: - `metadata.name`, The job name. -- `parallelism`, The Kubernetes Job would create `parallelism` Pods at the same time. -- `completions`, The Job would become the success status only the number of successful Pod(the exit code is 0) +- `parallelism`, Whether the Kubernetes Job would create `parallelism` Pods at the same time. +- `completions`, The Job would become the success status only when the number of successful Pod(the exit code is 0) is equal to `completions`. - `volumeMounts`, the name field `jobpath` is a key, the `mountPath` field represents the path in the container, and we can define the `jobpath` in `volumes` filed, use `hostPath` @@ -209,13 +212,15 @@ kubectl create -f job.yaml ``` Upon successful creation, Kubernetes would create 3 Pods as PaddlePaddle training node, -, pull the Docker image and begin to train. +pull the Docker image and begin to train. ### Checkout the Output -At the process of training, we can check the logs and the output models, such as we store -the output on `output` folder. **NOTE**, `node_0`, `node_1` and `node_2` represent the +At the process of training, we can check the logs and the output models which is stored in +the `output` folder. + +**NOTE**, `node_0`, `node_1` and `node_2` represent the `trainer_id` of the PaddlePaddle training job rather than the node id of Kubernetes. ```bash @@ -292,7 +297,7 @@ PADDLE_SERVER_NUM = os.getenv("CONF_PADDLE_GRADIENT_NUM") ### Communication between Pods -At the begin of `start_paddle.py`, it would initialize and parse the arguments. +At the begin of `start_paddle.py`, it would initializes and parses the arguments. ```python parser = argparse.ArgumentParser(prog="start_paddle.py", @@ -314,11 +319,12 @@ And then query the status of all the other Pods of this Job by the function `get idMap = getIdMap(podlist) ``` -**NOTE**: `getPodList()` would fetch all the pod in the current namespace, if some Pods are running, may cause some error. We will use [statfulesets](https://kubernetes.io/docs/concepts/abstractions/controllers/statefulsets) instead of +**NOTE**: `getPodList()` would prefetch all the Pods in the current namespace, if some +Pods are alreay running, it may cause some error. We will use [statfulesets](https://kubernetes.io/docs/concepts/abstractions/controllers/statefulsets) instead of Kubernetes Pod or Replicaset in the future. -For the implement of `getIdMap(podlist)`, this function would fetch each IP address of -`podlist` and then sort them to generate `trainer_id`. +The function `getIdMap(podlist)` fetches IPs addresses of `podlist` and then sort them +to generate `trainer_id`. ```python def getIdMap(podlist): @@ -340,9 +346,10 @@ so that we can start up them by `startPaddle(idMap, train_args_dict)`. ### Create Job -The main goal of `startPaddle` is generating the arguments of `Paddle PServer` and `Paddle Trainer` processes. Such as `Paddle Trainer`, we parse the environment variable and then get -`PADDLE_NIC`, `PADDLE_PORT`, `PADDLE_PORTS_NUM` and etc..., finally find `trainerId` from -`idMap` according to its IP address. +The main goal of `startPaddle` is generating the arguments of `Paddle PServer` and +`Paddle Trainer` processes. Take `Paddle Trainer` as an example, we parse the +environment variable and then get `PADDLE_NIC`, `PADDLE_PORT`, `PADDLE_PORTS_NUM` and etc..., +finally find `trainerId` from `idMap` according to its IP address. ```python program = 'paddle train' From 4a2234987621d4ca1e1a749d056e2402f5e39b5a Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Tue, 10 Apr 2018 19:52:20 +0800 Subject: [PATCH 18/33] update by comments --- doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md b/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md index 2ed75b4dcc..08e546c4f7 100644 --- a/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md +++ b/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md @@ -277,9 +277,9 @@ I1116 09:10:18.019836 50 ParameterClient2.cpp:122] pserver 5 192.168.129.71:7 ### Using Environment Variables -Usually we use the environment varialbes to configurate the PaddlePaddle Job which running on +Usually we use the environment varialbes to configurate the PaddlePaddle Job which runs in Kubernetes, `start_paddle.py` provides a start up script to convert the environment variable -to the start up argument of PaddlePaddle process: +to the start up arguments of PaddlePaddle process: ```bash API = "/api/v1/namespaces/" From 875d48d106c0ec8943d0d3e9560bd7effe86c1b0 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Tue, 10 Apr 2018 19:53:40 +0800 Subject: [PATCH 19/33] edit the title --- doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md b/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md index 08e546c4f7..dee1b7554f 100644 --- a/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md +++ b/doc/v2/howto/cluster/multi_cluster/k8s_distributed_en.md @@ -1,4 +1,4 @@ -# Kubernetes Distributed Training +# Distributed Training on Kubernetes We introduced how to create a PaddlePaddle Job with a single node on Kuberentes in the previous document. From 19c1a68ee9d73e24205ce05265ac4c24e60552cc Mon Sep 17 00:00:00 2001 From: wanghaoshuang Date: Tue, 10 Apr 2018 22:20:25 +0800 Subject: [PATCH 20/33] Fix lost of LoD while splitting tensor in parallel executor. --- paddle/fluid/framework/parallel_executor.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 99b3065d8d..f393105fe8 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -181,10 +181,10 @@ void ParallelExecutor::SplitTensorToPlaces( member_->places_.size(), lod_tensors.size()); for (size_t j = 0; j < member_->places_.size(); ++j) { // TODO(panxy0718): Do I need to delete this var? - member_->local_scopes_[j] - ->Var(it.first) - ->GetMutable() - ->ShareDataWith(lod_tensors[j]); + auto t = + member_->local_scopes_[j]->Var(it.first)->GetMutable(); + t->ShareDataWith(lod_tensors[j]); + t->set_lod(lod_tensors[j].lod()); } } } From a84b81502cd26c02fbc3b4c46d751b0363a5ff46 Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Wed, 11 Apr 2018 01:30:56 +0800 Subject: [PATCH 21/33] Remove Readers' HasNext() --- paddle/fluid/framework/lod_tensor.cc | 32 ++++---- paddle/fluid/framework/lod_tensor.h | 7 +- paddle/fluid/framework/lod_tensor_test.cc | 18 ++--- paddle/fluid/framework/reader.h | 13 +--- .../reader/create_double_buffer_reader_op.cc | 38 +++++----- .../reader/create_multi_pass_reader_op.cc | 16 +--- .../reader/create_random_data_generator_op.cc | 4 +- .../reader/create_recordio_file_reader_op.cc | 10 +-- .../reader/create_shuffle_reader_op.cc | 24 +++--- .../reader/create_threaded_reader_op.cc | 75 ++++--------------- .../fluid/operators/reader/open_files_op.cc | 25 ++++--- paddle/fluid/pybind/pybind.cc | 1 - paddle/fluid/pybind/recordio.cc | 2 +- python/paddle/fluid/layers/io.py | 10 +-- 14 files changed, 105 insertions(+), 170 deletions(-) diff --git a/paddle/fluid/framework/lod_tensor.cc b/paddle/fluid/framework/lod_tensor.cc index 8155cb55a4..a56674cbe2 100644 --- a/paddle/fluid/framework/lod_tensor.cc +++ b/paddle/fluid/framework/lod_tensor.cc @@ -12,9 +12,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include "paddle/fluid/framework/lod_tensor.h" +#include +#include +#include +#include + #include "paddle/fluid/framework/data_type.h" #include "paddle/fluid/framework/framework.pb.h" +#include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/memory/memcpy.h" #include "paddle/fluid/memory/memory.h" @@ -22,11 +27,6 @@ limitations under the License. */ #include "paddle/fluid/recordio/scanner.h" #include "paddle/fluid/recordio/writer.h" -#include -#include -#include -#include - namespace paddle { namespace framework { @@ -294,7 +294,7 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor, TensorFromStream(is, static_cast(tensor), dev_ctx); } -void WriteToRecordIO(recordio::Writer &writer, +void WriteToRecordIO(recordio::Writer *writer, const std::vector &tensor, const platform::DeviceContext &dev_ctx) { std::stringstream buffer; @@ -303,18 +303,20 @@ void WriteToRecordIO(recordio::Writer &writer, for (auto &each : tensor) { SerializeToStream(buffer, each, dev_ctx); } - writer.Write(buffer.str()); + writer->Write(buffer.str()); } std::vector ReadFromRecordIO( - recordio::Scanner &scanner, const platform::DeviceContext &dev_ctx) { - std::istringstream sin(scanner.Next()); - uint32_t sz; - sin.read(reinterpret_cast(&sz), sizeof(uint32_t)); + recordio::Scanner *scanner, const platform::DeviceContext &dev_ctx) { std::vector result; - result.resize(sz); - for (uint32_t i = 0; i < sz; ++i) { - DeserializeFromStream(sin, &result[i], dev_ctx); + if (scanner->HasNext()) { + std::istringstream sin(scanner->Next()); + uint32_t sz; + sin.read(reinterpret_cast(&sz), sizeof(uint32_t)); + result.resize(sz); + for (uint32_t i = 0; i < sz; ++i) { + DeserializeFromStream(sin, &result[i], dev_ctx); + } } return result; } diff --git a/paddle/fluid/framework/lod_tensor.h b/paddle/fluid/framework/lod_tensor.h index 4f130d2659..1159fee39b 100644 --- a/paddle/fluid/framework/lod_tensor.h +++ b/paddle/fluid/framework/lod_tensor.h @@ -15,6 +15,9 @@ limitations under the License. */ #pragma once #include +#include +#include +#include #ifdef PADDLE_WITH_CUDA #include #include @@ -216,12 +219,12 @@ void SerializeToStream(std::ostream& os, const LoDTensor& tensor, void DeserializeFromStream(std::istream& is, LoDTensor* tensor, const platform::DeviceContext& dev_ctx); -extern void WriteToRecordIO(recordio::Writer& writer, +extern void WriteToRecordIO(recordio::Writer* writer, const std::vector& tensor, const platform::DeviceContext& dev_ctx); extern std::vector ReadFromRecordIO( - recordio::Scanner& scanner, const platform::DeviceContext& dev_ctx); + recordio::Scanner* scanner, const platform::DeviceContext& dev_ctx); } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/lod_tensor_test.cc b/paddle/fluid/framework/lod_tensor_test.cc index e691e29383..97ab98f09b 100644 --- a/paddle/fluid/framework/lod_tensor_test.cc +++ b/paddle/fluid/framework/lod_tensor_test.cc @@ -12,17 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "paddle/fluid/framework/lod_tensor.h" - -#include "paddle/fluid/recordio/scanner.h" -#include "paddle/fluid/recordio/writer.h" - #include #include #include #include #include +#include "paddle/fluid/framework/lod_tensor.h" + +#include "paddle/fluid/recordio/scanner.h" +#include "paddle/fluid/recordio/writer.h" + namespace paddle { namespace framework { @@ -240,8 +240,8 @@ TEST(LoDTensor, RecordIO) { *platform::DeviceContextPool::Instance().Get(platform::CPUPlace()); { recordio::Writer writer(stream, recordio::Compressor::kSnappy); - WriteToRecordIO(writer, {tensor, tensor}, ctx); - WriteToRecordIO(writer, {tensor, tensor}, ctx); + WriteToRecordIO(&writer, {tensor, tensor}, ctx); + WriteToRecordIO(&writer, {tensor, tensor}, ctx); writer.Flush(); } @@ -254,11 +254,11 @@ TEST(LoDTensor, RecordIO) { { std::unique_ptr stream_ptr(stream); recordio::Scanner scanner(std::move(stream_ptr)); - auto tensors = ReadFromRecordIO(scanner, ctx); + auto tensors = ReadFromRecordIO(&scanner, ctx); ASSERT_EQ(tensors.size(), 2); assert_tensor_ok(tensors[0]); assert_tensor_ok(tensors[1]); - tensors = ReadFromRecordIO(scanner, ctx); + tensors = ReadFromRecordIO(&scanner, ctx); ASSERT_EQ(tensors.size(), 2); assert_tensor_ok(tensors[0]); assert_tensor_ok(tensors[1]); diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 3573b99bec..3a413941df 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -14,14 +14,13 @@ #pragma once +#include +#include + #include "paddle/fluid/framework/ddim.h" #include "paddle/fluid/framework/lod_tensor_array.h" #include "paddle/fluid/platform/place.h" -#include -#include -#include - namespace paddle { namespace framework { @@ -31,8 +30,6 @@ class ReaderBase { virtual void ReInit() = 0; - virtual bool HasNext() const = 0; - virtual ~ReaderBase(); }; @@ -44,8 +41,6 @@ class DecoratedReader : public ReaderBase { void ReInit() override { reader_->ReInit(); } - bool HasNext() const override { return reader_->HasNext(); } - protected: ReaderBase* reader_; }; @@ -80,8 +75,6 @@ class ReaderHolder { reader_->ReInit(); } - bool HasNext() const { return reader_->HasNext(); } - private: std::unique_ptr reader_; }; diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index d9f799f14d..33a50b5ceb 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -63,13 +63,14 @@ class DoubleBufferReader : public framework::DecoratedReader { StartPrefetcher(); } - bool HasNext() const override; void ReadNext(std::vector* out) override; void ReInit() override; ~DoubleBufferReader() { EndPrefetcher(); } private: + bool HasNext() const; + void StartPrefetcher() { channel_ = framework::MakeChannel(kChannelSize); prefetcher_ = std::thread([this] { PrefetchThreadFunc(); }); @@ -149,22 +150,15 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase { } }; -bool DoubleBufferReader::HasNext() const { - while (!channel_->IsClosed() && !channel_->CanReceive()) { - } - return channel_->CanReceive(); -} - void DoubleBufferReader::ReadNext(std::vector* out) { - if (!HasNext()) { - PADDLE_THROW("There is no next data!"); - } - - Item batch; - channel_->Receive(&batch); - *out = batch.payloads_; - if (batch.ctx_) { - batch.ctx_->Wait(); + out->clear(); + if (HasNext()) { + Item batch; + channel_->Receive(&batch); + *out = batch.payloads_; + if (batch.ctx_) { + batch.ctx_->Wait(); + } } } @@ -174,16 +168,26 @@ void DoubleBufferReader::ReInit() { StartPrefetcher(); } +bool DoubleBufferReader::HasNext() const { + while (!channel_->IsClosed() && !channel_->CanReceive()) { + } + return channel_->CanReceive(); +} + void DoubleBufferReader::PrefetchThreadFunc() { VLOG(5) << "A new prefetch thread starts."; std::vector> cpu_tensor_cache(kCacheSize); std::vector> gpu_tensor_cache(kCacheSize); size_t cached_tensor_id = 0; - while (reader_->HasNext()) { + while (true) { Item batch; auto& cpu_batch = cpu_tensor_cache[cached_tensor_id]; reader_->ReadNext(&cpu_batch); + if (cpu_batch.empty()) { + // The underlying reader have no next data. + break; + } if (platform::is_gpu_place(place_)) { auto& gpu_batch = gpu_tensor_cache[cached_tensor_id]; auto* gpu_ctx = ctxs_[cached_tensor_id].get(); diff --git a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc index b72ccc77a3..0573345ba5 100644 --- a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc +++ b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc @@ -25,22 +25,12 @@ class MultiPassReader : public framework::DecoratedReader { : DecoratedReader(reader), pass_num_(pass_num), pass_count_(0) {} void ReadNext(std::vector* out) override { - if (!HasNext()) { - PADDLE_THROW("There is no next data!"); - } reader_->ReadNext(out); - } - - bool HasNext() const override { - if (reader_->HasNext()) { - return true; - } else { + if (out->empty()) { ++pass_count_; - if (pass_count_ >= pass_num_) { - return false; - } else { + if (pass_count_ < pass_num_) { reader_->ReInit(); - return true; + reader_->ReadNext(out); } } } diff --git a/paddle/fluid/operators/reader/create_random_data_generator_op.cc b/paddle/fluid/operators/reader/create_random_data_generator_op.cc index 95d8674c08..d1cb8e47da 100644 --- a/paddle/fluid/operators/reader/create_random_data_generator_op.cc +++ b/paddle/fluid/operators/reader/create_random_data_generator_op.cc @@ -52,8 +52,6 @@ class RandomDataGenerator : public framework::ReaderBase { void ReInit() override { return; } - bool HasNext() const override { return true; } - private: float min_; float max_; @@ -74,7 +72,7 @@ class CreateRandomDataGeneratorOp : public framework::OperatorBase { const auto& ranks = Attr>("ranks"); PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty()); PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0), - int(shape_concat.size()), + static_cast(shape_concat.size()), "The accumulate of all ranks should be equal to the " "shape concat's length."); std::vector shapes = RestoreShapes(shape_concat, ranks); diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc index adaa0b9e5f..2ae2972556 100644 --- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc +++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include #include "paddle/fluid/operators/reader/reader_op_registry.h" #include "paddle/fluid/recordio/scanner.h" @@ -35,17 +33,15 @@ class RecordIOFileReader : public framework::FileReader { LOG(INFO) << "Creating file reader" << filename; } - bool HasNext() const override { return scanner_.HasNext(); } - void ReInit() override { scanner_.Reset(); } protected: void ReadNextImpl(std::vector* out) override { if (ThreadSafe) { std::lock_guard guard(*mutex_); - *out = framework::ReadFromRecordIO(scanner_, dev_ctx_); + *out = framework::ReadFromRecordIO(&scanner_, dev_ctx_); } else { - *out = framework::ReadFromRecordIO(scanner_, dev_ctx_); + *out = framework::ReadFromRecordIO(&scanner_, dev_ctx_); } } @@ -66,7 +62,7 @@ class CreateRecordIOReaderOp : public framework::OperatorBase { const auto& ranks = Attr>("ranks"); PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty()); PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0), - int(shape_concat.size()), + static_cast(shape_concat.size()), "The accumulate of all ranks should be equal to the " "shape concat's length."); std::string filename = Attr("filename"); diff --git a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc index b164ce232d..13825d6591 100644 --- a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc +++ b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc @@ -30,35 +30,33 @@ class ShuffleReader : public framework::DecoratedReader { std::random_device device; seed_ = device(); } - ReadIntoBuffers(); + ReloadBuffer(); } void ReadNext(std::vector* out) override { - if (!HasNext()) { - PADDLE_THROW("There is no next data!"); - } + out->clear(); if (iteration_pos_ >= buffer_.size()) { VLOG(10) << "Resetting shuffle buffer"; - ReadIntoBuffers(); + ReloadBuffer(); + if (buffer_.empty()) { + return; + } } *out = buffer_[iteration_pos_++]; } - bool HasNext() const override { - return iteration_pos_ < buffer_.size() || reader_->HasNext(); - } - private: - void ReadIntoBuffers() { + void ReloadBuffer() { buffer_.clear(); buffer_.reserve(buffer_size_); iteration_pos_ = 0; for (size_t i = 0; i < buffer_size_; ++i) { - if (!reader_->HasNext()) { + std::vector ins; + reader_->ReadNext(&ins); + if (ins.empty()) { break; } - buffer_.emplace_back(); - reader_->ReadNext(&buffer_.back()); + buffer_.emplace_back(ins); } std::mt19937 g(seed_); std::shuffle(buffer_.begin(), buffer_.end(), g); diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc index 7b10135afc..cbf709d5e7 100644 --- a/paddle/fluid/operators/reader/create_threaded_reader_op.cc +++ b/paddle/fluid/operators/reader/create_threaded_reader_op.cc @@ -21,67 +21,27 @@ namespace reader { class ThreadedReader : public framework::DecoratedReader { public: - ThreadedReader(ReaderBase* reader, bool unsafe_mode) - : DecoratedReader(reader), unsafe_mode_(unsafe_mode) {} + ThreadedReader(ReaderBase* reader, bool safe_mode) + : DecoratedReader(reader), safe_mode_(safe_mode) {} void ReadNext(std::vector* out) override { std::lock_guard lock(mutex_); - if (!unsafe_mode_) { - if (!reader_->HasNext()) { - PADDLE_THROW("There is no next data!"); - } - reader_->ReadNext(out); - } else { - auto& thread_buffer = thread_buffers_[std::this_thread::get_id()]; - if (thread_buffer.empty()) { - PADDLE_THROW( - "thread_buffer is empty! HasNext() must be invoked before " - "ReadNext() in the same thread."); - } - *out = thread_buffer; - thread_buffer.clear(); - } - } - - bool HasNext() const override { - if (!unsafe_mode_) { - PADDLE_THROW( - "ThreadedReader::HasNext() is disabled when 'unsafe_mode' is false."); - } - std::thread::id thread_id = std::this_thread::get_id(); - std::lock_guard lock(mutex_); - auto& thread_buffer = thread_buffers_[thread_id]; - if (thread_buffer.empty() && reader_->HasNext()) { - reader_->ReadNext(&thread_buffer); - } - return !thread_buffer.empty(); + reader_->ReadNext(out); } void ReInit() override { - if (!unsafe_mode_) { + if (safe_mode_) { PADDLE_THROW( - "ThreadedReader::ReInit() is disabled when 'unsafe_mode' is false."); + "ThreadedReader::ReInit() is disabled when 'safe_mode' is true."); } VLOG(5) << "ThreadedReader::ReInit() is invoked! It might be buggy in " "multi-thread environment."; reader_->ReInit(); } - ~ThreadedReader() { - for (auto& p : thread_buffers_) { - if (!p.second.empty()) { - PADDLE_THROW( - "Find an unused data batch in ThreadedReader! Maybe one thread " - "invokes 'HasNext()' without subsequent 'ReadNext()'."); - } - } - } - private: - bool unsafe_mode_; - mutable std::mutex mutex_; - mutable std::unordered_map> - thread_buffers_; + bool safe_mode_; + std::mutex mutex_; }; class CreateThreadedReaderOp : public framework::OperatorBase { @@ -98,8 +58,8 @@ class CreateThreadedReaderOp : public framework::OperatorBase { } const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) ->Get(); - bool unsafe_mode = Attr("unsafe_mode"); - out->Reset(new ThreadedReader(underlying_reader.Get(), unsafe_mode)); + bool safe_mode = Attr("safe_mode"); + out->Reset(new ThreadedReader(underlying_reader.Get(), safe_mode)); } }; @@ -107,10 +67,9 @@ class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase { public: CreateThreadedReaderOpMaker(OpProto* op_proto, OpAttrChecker* op_checker) : DecoratedReaderMakerBase(op_proto, op_checker) { - AddAttr("unsafe_mode", - "When 'unsafe_mode' is false, invoking 'HasNext()' or " - "'ReInit()' is not allowed to avoid unexpected bugs in " - "multi-thread environment.") + AddAttr("safe_mode", + "When 'safe_mode' is true, 'ReInit()' is disabled to avoid " + "unexpected bugs in multi-thread environment.") .SetDefault(true); AddComment(R"DOC( CreateThreadedReader Operator @@ -118,13 +77,9 @@ class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase { This operator creates a threaded reader. A threaded reader's 'ReadNext()' can be invoked by several threads at the same time. - When the attribute 'unsafe_mode' is false, the threaded reader's - 'HasNext()' and 'ReInit()' will be disabled to avoid unexpected - bugs in multi-thread environment. If you really need them, you - can enable them by setting 'unsafe_mode' true. In this case, - 'HasNext()' returning true only guarantees the safety of - invoking 'ReadNext()' in the same thread. Each thread must - invoke 'HasNext()' and 'ReadNext()' in pairs. + When the attribute 'safe_mode' is true, the threaded reader's + 'ReInit()' is disabled to avoid unexpected bugs in multi-thread + environment. )DOC"); } }; diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index 45db94e780..9ce2e5dc2c 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -30,12 +30,12 @@ class MultiFileReader : public framework::ReaderBase { } void ReadNext(std::vector* out) override; - bool HasNext() const override; void ReInit() override; ~MultiFileReader() { EndScheduler(); } private: + bool HasNext(); void StartNewScheduler(); void EndScheduler(); void ScheduleThreadFunc(); @@ -52,16 +52,10 @@ class MultiFileReader : public framework::ReaderBase { }; void MultiFileReader::ReadNext(std::vector* out) { - if (!HasNext()) { - PADDLE_THROW("There is no next data!"); + out->clear(); + if (HasNext()) { + buffer_->Receive(out); } - buffer_->Receive(out); -} - -bool MultiFileReader::HasNext() const { - while (!buffer_->IsClosed() && !buffer_->CanReceive()) { - } - return buffer_->CanReceive(); } void MultiFileReader::ReInit() { @@ -69,6 +63,12 @@ void MultiFileReader::ReInit() { StartNewScheduler(); } +bool MultiFileReader::HasNext() { + while (!buffer_->IsClosed() && !buffer_->CanReceive()) { + } + return buffer_->CanReceive(); +} + void MultiFileReader::StartNewScheduler() { size_t thread_num = prefetchers_.size(); waiting_file_idx_ = framework::MakeChannel(file_names_.size()); @@ -140,9 +140,12 @@ void MultiFileReader::PrefetchThreadFunc(std::string file_name, VLOG(5) << "The prefetch thread of file '" << file_name << "' starts."; std::unique_ptr reader = CreateReaderByFileName(file_name, dims_); - while (reader->HasNext()) { + while (true) { std::vector ins; reader->ReadNext(&ins); + if (ins.empty()) { + break; + } try { buffer_->Send(&ins); } catch (paddle::platform::EnforceNotMet e) { diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index bd8446df66..c7a5d1c714 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -252,7 +252,6 @@ All parameter, weight, gradient are variables in Paddle. py::return_value_policy::reference); py::class_(m, "Reader", "") - .def("has_next", &framework::ReaderHolder::HasNext) .def("reset", &framework::ReaderHolder::ReInit); py::class_(m, "Scope", "") diff --git a/paddle/fluid/pybind/recordio.cc b/paddle/fluid/pybind/recordio.cc index 0644d91425..330d104e0a 100644 --- a/paddle/fluid/pybind/recordio.cc +++ b/paddle/fluid/pybind/recordio.cc @@ -39,7 +39,7 @@ class RecordIOWriter { void CompleteAppendTensor() { auto& ctx = *platform::DeviceContextPool::Instance().Get(platform::CPUPlace()); - framework::WriteToRecordIO(writer_, tensors_, ctx); + framework::WriteToRecordIO(&writer_, tensors_, ctx); tensors_.clear(); } diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index d016ab9008..8ba6bd18e9 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -236,13 +236,9 @@ def monkey_patch_reader_methods(reader): var = scope.find_var(reader.name) return var.get_reader() - def eof(): - return not __get_reader__().has_next() - def reset(): return __get_reader__().reset() - reader.eof = eof reader.reset = reset reader.stop_gradient = True reader.persistable = True @@ -299,8 +295,7 @@ def open_recordio_file(filename, shapes(list): List of tuples which declaring data shapes. lod_levels(list): List of ints which declaring data lod_level. dtypes(list): List of strs which declaring data type. - pass_num(int): Number of passes to run. After completing the - given number of passes, 'has_next()' will return False. + pass_num(int): Number of passes to run. for_parallel(Bool): Set it as True if you are going to run subsequent operators in parallel. @@ -377,8 +372,7 @@ def open_files(filenames, dtypes(list): List of strs which declaring data type. thread_num(int): The maximal concurrent prefetch thread number. buffer_size(int): The size of prefetch buffer. - pass_num(int): Number of passes to run. After completing the - given number of passes, 'has_next()' will return False. + pass_num(int): Number of passes to run. for_parallel(Bool): Set it as True if you are going to run subsequent operators in parallel. From 7a7829466664aff8a41364d566b852ca5859bed2 Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Wed, 11 Apr 2018 01:37:24 +0800 Subject: [PATCH 22/33] Remove Readers' HasNext() --- paddle/fluid/operators/reader/open_files_op.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc index 9ce2e5dc2c..779dc8a6a0 100644 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ b/paddle/fluid/operators/reader/open_files_op.cc @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include // NOLINT + #include "paddle/fluid/framework/channel.h" #include "paddle/fluid/operators/reader/reader_op_registry.h" From 3ebc5e1b7a3545b31df85d1b65afec0c8b7aefc1 Mon Sep 17 00:00:00 2001 From: cwgis Date: Wed, 11 Apr 2018 02:07:16 +0800 Subject: [PATCH 23/33] Update compile_paddle_lib_en.md (#9795) * Update compile_paddle_lib_en.md Fix https://github.com/PaddlePaddle/Paddle/issues/8916 * Update compile_paddle_lib_en.md --- doc/v2/howto/capi/compile_paddle_lib_en.md | 174 ++++++++++++++++++++- 1 file changed, 173 insertions(+), 1 deletion(-) diff --git a/doc/v2/howto/capi/compile_paddle_lib_en.md b/doc/v2/howto/capi/compile_paddle_lib_en.md index 11d69b9b79..6212a30811 100644 --- a/doc/v2/howto/capi/compile_paddle_lib_en.md +++ b/doc/v2/howto/capi/compile_paddle_lib_en.md @@ -1,3 +1,175 @@ ## Install and Build -TBD +### Download & Install + + Download the latest C-API development package from CI system and install. You can find the required version in the table below: + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Version TipsC-API
cpu_avx_mklpaddle.tgz
cpu_avx_openblas-
cpu_noavx_openblaspaddle.tgz
cuda7.5_cudnn5_avx_mklpaddle.tgz
cuda8.0_cudnn5_avx_mklpaddle.tgz
cuda8.0_cudnn7_avx_mklpaddle.tgz
+ +### From source + + Users can also compile the C-API library from PaddlePaddle source code by compiling with the following compilation options: + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
OptionsValue
WITH_C_APION
WITH_PYTHONOFF(recommended)
WITH_SWIG_PYOFF(recommended)
WITH_GOLANGOFF(recommended)
WITH_GPUON/OFF
WITH_MKLON/OFF
+ +It is best to set up with recommended values to avoid linking with unnecessary libraries. Set other compilation options as you need. + +Pull the latest following code snippet from github, and configure compilation options(replace PADDLE_ROOT with the installation path of the PaddlePaddle C-API inference library): + +```shell +PADDLE_ROOT=/path/of/capi +git clone https://github.com/PaddlePaddle/Paddle.git +cd Paddle +mkdir build +cd build +cmake -DCMAKE_INSTALL_PREFIX=$PADDLE_ROOT \ + -DCMAKE_BUILD_TYPE=Release \ + -DWITH_C_API=ON \ + -DWITH_SWIG_PY=OFF \ + -DWITH_GOLANG=OFF \ + -DWITH_PYTHON=OFF \ + -DWITH_MKL=OFF \ + -DWITH_GPU=OFF \ + .. +``` + +After running the above code to generate Makefile , run: `make && make install`. After successful compilation, the dependencies required by C-API(includes: (1)PaddlePaddle inference library and header files; (2) Third-party libraries and header files) will be stored in the `PADDLE_ROOT` directory. + +If the compilation is successful, see the following directory structure under `PADDLE_ROOT`(includes PaddlePaddle header files and libraries, and third-party libraries and header files(determined by the link methods if necessary)): + +```text +├── include +│   └── paddle +│   ├── arguments.h +│   ├── capi.h +│   ├── capi_private.h +│   ├── config.h +│   ├── error.h +│   ├── gradient_machine.h +│   ├── main.h +│   ├── matrix.h +│   ├── paddle_capi.map +│   └── vector.h +├── lib +│   ├── libpaddle_capi_engine.a +│   ├── libpaddle_capi_layers.a +│   ├── libpaddle_capi_shared.so +│   └── libpaddle_capi_whole.a +└── third_party + ├── gflags + │   ├── include + │   │   └── gflags + │   │   ├── gflags_completions.h + │   │   ├── gflags_declare.h + │   │   ... + │   └── lib + │   └── libgflags.a + ├── glog + │   ├── include + │   │   └── glog + │   │   ├── config.h + │   │   ... + │   └── lib + │   └── libglog.a + ├── openblas + │   ├── include + │   │   ├── cblas.h + │   │   ... + │   └── lib + │   ... + ├── protobuf + │   ├── include + │   │   └── google + │   │   └── protobuf + │   │   ... + │   └── lib + │   └── libprotobuf-lite.a + └── zlib + ├── include + │   ... + └── lib + ... + +``` + +### Linking Description: + +There are three kinds of linking methods: + +1. Linking with dynamic library `libpaddle_capi_shared.so`(This way is much more convenient and easier, **Without special requirements, it is recommended**), refer to the following: + 1. Compiling with CPU version and using `OpenBLAS`; only need to link one library named `libpaddle_capi_shared.so` to develop prediction program through C-API. + 1. Compiling with CPU version and using `MKL` lib, you need to link MKL library directly to develop prediction program through PaddlePaddle C-API, due to `MKL` has its own dynamic library. + 1. Compiling with GPU version, CUDA library will be loaded dynamically on prediction program run-time, and also set CUDA library to  `LD_LIBRARY_PATH` environment variable. + +2. Linking with static library `libpaddle_capi_whole.a`,refer to the following: + 1. Specify `-Wl,--whole-archive` linking options. + 1. Explicitly link third-party libraries such as `gflags`、`glog`、`libz`、`protobuf` .etc, you can find them under `PADDLE_ROOT/third_party` directory. + 1. Use OpenBLAS library if compiling C-API,must explicitly link `libopenblas.a`. + 1. Use MKL when compiling C-API, must explicitly link MKL dynamic library. + +3. Linking with static library `libpaddle_capi_layers.a` and `libpaddle_capi_engine.a`,refer to the following: + 1. This linking methods is mainly used for mobile prediction. + 1. Split `libpaddle_capi_whole.a` into two static linking library at least to reduce the size of linking libraries. + 1. Specify `-Wl,--whole-archive -lpaddle_capi_layers`  and `-Wl,--no-whole-archive -lpaddle_capi_engine` for linking. + 1. The third-party dependencies need explicitly link same as method 2 above. From cea391217ac34b63ac589a2bc7ee296f0321f298 Mon Sep 17 00:00:00 2001 From: Siddharth Goyal Date: Tue, 10 Apr 2018 13:04:16 -0700 Subject: [PATCH 24/33] Fix cpplint errors (#9800) --- paddle/fluid/operators/batch_norm_op.cc | 1 + paddle/fluid/operators/batch_norm_op.cu.cc | 3 +-- paddle/fluid/operators/batch_size_like.h | 3 ++- paddle/fluid/operators/box_coder_op.h | 1 + paddle/fluid/operators/compare_op.cc | 1 + paddle/fluid/operators/concat_op.cc | 1 + paddle/fluid/operators/cond_op.h | 1 + paddle/fluid/operators/conv_transpose_op.cc | 2 ++ paddle/fluid/operators/conv_transpose_op.h | 2 +- paddle/fluid/operators/crf_decoding_op.h | 1 + paddle/fluid/operators/crop_op.h | 3 ++- 11 files changed, 14 insertions(+), 5 deletions(-) diff --git a/paddle/fluid/operators/batch_norm_op.cc b/paddle/fluid/operators/batch_norm_op.cc index 36049ee6a4..c9939e8602 100644 --- a/paddle/fluid/operators/batch_norm_op.cc +++ b/paddle/fluid/operators/batch_norm_op.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/operators/batch_norm_op.h" +#include #include "paddle/fluid/framework/data_layout.h" namespace paddle { diff --git a/paddle/fluid/operators/batch_norm_op.cu.cc b/paddle/fluid/operators/batch_norm_op.cu.cc index 6ceacc3992..eecb58e11e 100644 --- a/paddle/fluid/operators/batch_norm_op.cu.cc +++ b/paddle/fluid/operators/batch_norm_op.cu.cc @@ -13,9 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/operators/batch_norm_op.h" -#include "paddle/fluid/framework/data_layout.h" - #include +#include "paddle/fluid/framework/data_layout.h" #include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/platform/cudnn_helper.h" #include "paddle/fluid/platform/float16.h" diff --git a/paddle/fluid/operators/batch_size_like.h b/paddle/fluid/operators/batch_size_like.h index 0bdf27e620..dd51a11fbe 100644 --- a/paddle/fluid/operators/batch_size_like.h +++ b/paddle/fluid/operators/batch_size_like.h @@ -13,7 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once - +#include +#include #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/operators/math/math_function.h" diff --git a/paddle/fluid/operators/box_coder_op.h b/paddle/fluid/operators/box_coder_op.h index 3c7cac1cd1..77fc6c2b62 100644 --- a/paddle/fluid/operators/box_coder_op.h +++ b/paddle/fluid/operators/box_coder_op.h @@ -10,6 +10,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once +#include #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/operators/math/math_function.h" diff --git a/paddle/fluid/operators/compare_op.cc b/paddle/fluid/operators/compare_op.cc index 9a139ab27e..3a6a357e81 100644 --- a/paddle/fluid/operators/compare_op.cc +++ b/paddle/fluid/operators/compare_op.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/operators/compare_op.h" +#include #include "paddle/fluid/framework/op_registry.h" namespace paddle { diff --git a/paddle/fluid/operators/concat_op.cc b/paddle/fluid/operators/concat_op.cc index 0eedd8ee51..d65a7b3467 100644 --- a/paddle/fluid/operators/concat_op.cc +++ b/paddle/fluid/operators/concat_op.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/operators/concat_op.h" +#include #include namespace paddle { diff --git a/paddle/fluid/operators/cond_op.h b/paddle/fluid/operators/cond_op.h index a04fae2182..d3888923db 100644 --- a/paddle/fluid/operators/cond_op.h +++ b/paddle/fluid/operators/cond_op.h @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once +#include #include #include "glog/logging.h" #include "paddle/fluid/framework/ddim.h" diff --git a/paddle/fluid/operators/conv_transpose_op.cc b/paddle/fluid/operators/conv_transpose_op.cc index b2a3cfc89f..08f5939d42 100644 --- a/paddle/fluid/operators/conv_transpose_op.cc +++ b/paddle/fluid/operators/conv_transpose_op.cc @@ -13,6 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/operators/conv_transpose_op.h" +#include +#include namespace paddle { namespace operators { diff --git a/paddle/fluid/operators/conv_transpose_op.h b/paddle/fluid/operators/conv_transpose_op.h index d4e4b641ec..bfc0177c2a 100644 --- a/paddle/fluid/operators/conv_transpose_op.h +++ b/paddle/fluid/operators/conv_transpose_op.h @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once - +#include #include "paddle/fluid/framework/eigen.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/operators/math/im2col.h" diff --git a/paddle/fluid/operators/crf_decoding_op.h b/paddle/fluid/operators/crf_decoding_op.h index 2b2a733fb9..3f5fab3b38 100644 --- a/paddle/fluid/operators/crf_decoding_op.h +++ b/paddle/fluid/operators/crf_decoding_op.h @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once +#include #include "paddle/fluid/framework/eigen.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/operators/math/math_function.h" diff --git a/paddle/fluid/operators/crop_op.h b/paddle/fluid/operators/crop_op.h index c5ac684978..f05c2e2328 100644 --- a/paddle/fluid/operators/crop_op.h +++ b/paddle/fluid/operators/crop_op.h @@ -13,7 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once - +#include +#include #include "paddle/fluid/framework/eigen.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/operators/strided_memcpy.h" From 7ed457e77a44581503f929cef64675d458f10642 Mon Sep 17 00:00:00 2001 From: Kexin Zhao Date: Tue, 10 Apr 2018 18:30:08 -0700 Subject: [PATCH 25/33] Fix cuda 7.5 error with cublas GEMM (#9811) * fix gemm error for cuda 7.5 * fix version number --- paddle/fluid/operators/math/math_function.cu | 21 +++++++++++++---- paddle/fluid/platform/dynload/cublas.cc | 4 ++++ paddle/fluid/platform/dynload/cublas.h | 24 +++++++++++++------- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/paddle/fluid/operators/math/math_function.cu b/paddle/fluid/operators/math/math_function.cu index 82e1294314..e53183603f 100644 --- a/paddle/fluid/operators/math/math_function.cu +++ b/paddle/fluid/operators/math/math_function.cu @@ -39,13 +39,14 @@ void gemm( cublasOperation_t cuTransB = (transB == CblasNoTrans) ? CUBLAS_OP_N : CUBLAS_OP_T; - float h_alpha = static_cast(alpha); - float h_beta = static_cast(beta); - // TODO(kexinzhao): add processing code for compute capability < 53 case PADDLE_ENFORCE_GE(context.GetComputeCapability(), 53, "cublas fp16 gemm requires GPU compute capability >= 53"); +#if CUDA_VERSION >= 8000 + float h_alpha = static_cast(alpha); + float h_beta = static_cast(beta); + cublasGemmAlgo_t algo = CUBLAS_GEMM_DFALT; #if CUDA_VERSION >= 9000 if (context.GetComputeCapability() >= 70) { @@ -56,7 +57,7 @@ void gemm( PADDLE_ENFORCE(platform::dynload::cublasSetMathMode(context.cublas_handle(), CUBLAS_DEFAULT_MATH)); } -#endif +#endif // CUDA_VERSION >= 9000 // cublasHgemm does true FP16 computation which is slow for non-Volta // GPUs. So use cublasGemmEx instead which does pesudo FP16 computation: @@ -66,6 +67,18 @@ void gemm( context.cublas_handle(), cuTransB, cuTransA, N, M, K, &h_alpha, B, CUDA_R_16F, ldb, A, CUDA_R_16F, lda, &h_beta, C, CUDA_R_16F, N, CUDA_R_32F, algo)); +#else + // CUDA 7.5 does not support cublasGemmEx, hence we fall back to use hgemm + const half h_alpha = static_cast(alpha); + const half h_beta = static_cast(beta); + const half* h_A = reinterpret_cast(A); + const half* h_B = reinterpret_cast(B); + half* h_C = reinterpret_cast(C); + + PADDLE_ENFORCE(platform::dynload::cublasHgemm( + context.cublas_handle(), cuTransB, cuTransA, N, M, K, &h_alpha, h_B, ldb, + h_A, lda, &h_beta, h_C, N)); +#endif // CUDA_VERSION >= 8000 } template <> diff --git a/paddle/fluid/platform/dynload/cublas.cc b/paddle/fluid/platform/dynload/cublas.cc index eb541579a1..361d3439b8 100644 --- a/paddle/fluid/platform/dynload/cublas.cc +++ b/paddle/fluid/platform/dynload/cublas.cc @@ -28,6 +28,10 @@ CUBLAS_BLAS_ROUTINE_EACH(DEFINE_WRAP); CUBLAS_BLAS_ROUTINE_EACH_R2(DEFINE_WRAP); #endif +#ifdef CUBLAS_BLAS_ROUTINE_EACH_R3 +CUBLAS_BLAS_ROUTINE_EACH_R3(DEFINE_WRAP); +#endif + } // namespace dynload } // namespace platform } // namespace paddle diff --git a/paddle/fluid/platform/dynload/cublas.h b/paddle/fluid/platform/dynload/cublas.h index a41018d350..1ab55d6b9b 100644 --- a/paddle/fluid/platform/dynload/cublas.h +++ b/paddle/fluid/platform/dynload/cublas.h @@ -71,7 +71,6 @@ extern void *cublas_dso_handle; __macro(cublasDgemm_v2); \ __macro(cublasHgemm); \ __macro(cublasSgemmEx); \ - __macro(cublasGemmEx); \ __macro(cublasSgeam_v2); \ __macro(cublasDgeam_v2); \ __macro(cublasCreate_v2); \ @@ -83,11 +82,6 @@ extern void *cublas_dso_handle; __macro(cublasDgemmBatched); \ __macro(cublasCgemmBatched); \ __macro(cublasZgemmBatched); \ - __macro(cublasSgemmStridedBatched); \ - __macro(cublasDgemmStridedBatched); \ - __macro(cublasCgemmStridedBatched); \ - __macro(cublasZgemmStridedBatched); \ - __macro(cublasHgemmStridedBatched); \ __macro(cublasSgetrfBatched); \ __macro(cublasSgetriBatched); \ __macro(cublasDgetrfBatched); \ @@ -95,10 +89,24 @@ extern void *cublas_dso_handle; CUBLAS_BLAS_ROUTINE_EACH(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP) +// APIs available after CUDA 8.0 +#if CUDA_VERSION >= 8000 +#define CUBLAS_BLAS_ROUTINE_EACH_R2(__macro) \ + __macro(cublasGemmEx); \ + __macro(cublasSgemmStridedBatched); \ + __macro(cublasDgemmStridedBatched); \ + __macro(cublasCgemmStridedBatched); \ + __macro(cublasZgemmStridedBatched); \ + __macro(cublasHgemmStridedBatched); + +CUBLAS_BLAS_ROUTINE_EACH_R2(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP) +#endif + // APIs available after CUDA 9.0 #if CUDA_VERSION >= 9000 -#define CUBLAS_BLAS_ROUTINE_EACH_R2(__macro) __macro(cublasSetMathMode); -CUBLAS_BLAS_ROUTINE_EACH_R2(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP) +#define CUBLAS_BLAS_ROUTINE_EACH_R3(__macro) __macro(cublasSetMathMode); + +CUBLAS_BLAS_ROUTINE_EACH_R3(DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP) #endif #undef DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP From 72b5de05fee1c94c7bd40c8b69ff8c4fe2aff7d9 Mon Sep 17 00:00:00 2001 From: JiayiFeng Date: Wed, 11 Apr 2018 02:54:56 +0000 Subject: [PATCH 26/33] update unittest --- paddle/fluid/operators/read_op.cc | 1 + .../tests/unittests/test_parallel_executor.py | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/paddle/fluid/operators/read_op.cc b/paddle/fluid/operators/read_op.cc index 4496110cf8..bf02b99589 100644 --- a/paddle/fluid/operators/read_op.cc +++ b/paddle/fluid/operators/read_op.cc @@ -66,6 +66,7 @@ class ReadOp : public framework::OperatorBase { std::vector out_arg_names = Outputs("Out"); std::vector ins; reader->ReadNext(&ins); + PADDLE_ENFORCE(!ins.empty(), "There is no next data."); PADDLE_ENFORCE_EQ(ins.size(), out_arg_names.size()); for (size_t i = 0; i < ins.size(); ++i) { auto* out = diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py index 8401716db8..3c00f708f0 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py @@ -26,11 +26,14 @@ def simple_fc_net(use_feed): img = fluid.layers.data(name='image', shape=[784], dtype='float32') label = fluid.layers.data(name='label', shape=[1], dtype='int64') else: - reader = fluid.layers.open_recordio_file( - filename='./mnist.recordio', + reader = fluid.layers.open_files( + filenames=['./mnist.recordio'], shapes=[[-1, 784], [-1, 1]], lod_levels=[0, 0], - dtypes=['float32', 'int64']) + dtypes=['float32', 'int64'], + thread_num=1, + for_parallel=True) + reader = fluid.layers.io.double_buffer(reader) img, label = fluid.layers.read_file(reader) hidden = img for _ in xrange(4): @@ -51,11 +54,14 @@ def fc_with_batchnorm(use_feed): img = fluid.layers.data(name='image', shape=[784], dtype='float32') label = fluid.layers.data(name='label', shape=[1], dtype='int64') else: - reader = fluid.layers.open_recordio_file( - filename='./mnist.recordio', + reader = fluid.layers.open_files( + filenames=['mnist.recordio'], shapes=[[-1, 784], [-1, 1]], lod_levels=[0, 0], - dtypes=['float32', 'int64']) + dtypes=['float32', 'int64'], + thread_num=1, + for_parallel=True) + reader = fluid.layers.io.double_buffer(reader) img, label = fluid.layers.read_file(reader) hidden = img From 129859e732fa7ac056c4c453619b2c84c98bc0ac Mon Sep 17 00:00:00 2001 From: qingqing01 Date: Wed, 11 Apr 2018 12:34:46 +0800 Subject: [PATCH 27/33] Support data type int64 in NCCL. (#9818) --- paddle/fluid/platform/nccl_helper.h | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index 2999004320..3a2a423486 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -14,8 +14,9 @@ #pragma once -#include +#include // NOLINT #include +#include #include "paddle/fluid/platform/dynload/nccl.h" #include "paddle/fluid/platform/enforce.h" @@ -29,6 +30,8 @@ inline ncclDataType_t ToNCCLDataType(std::type_index type) { return ncclDouble; } else if (type == typeid(int)) { // NOLINT return ncclInt; + } else if (type == typeid(int64_t)) { // NOLINT + return ncclInt64; } else { PADDLE_THROW("Not supported"); } @@ -66,23 +69,23 @@ struct NCCLContext { return boost::get(ctx_->GetPlace()).device; } - static void InitNCCLContext(std::unordered_map &contexts, + static void InitNCCLContext(std::unordered_map *contexts, const std::vector &places) { std::vector comms; std::vector devs; - comms.resize(contexts.size()); - devs.reserve(contexts.size()); + comms.resize(contexts->size()); + devs.reserve(contexts->size()); for (auto &p : places) { devs.push_back(boost::get(p).device); } PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( - &comms[0], static_cast(contexts.size()), &devs[0])); + &comms[0], static_cast(contexts->size()), &devs[0])); int i = 0; for (auto &dev_id : devs) { - contexts.at(dev_id).comm_ = comms[i++]; + contexts->at(dev_id).comm_ = comms[i++]; } } }; @@ -91,7 +94,7 @@ struct NCCLContextMap { std::unordered_map contexts_; std::vector order_; - NCCLContextMap(const std::vector &places) { + explicit NCCLContextMap(const std::vector &places) { order_.reserve(places.size()); for (auto &p : places) { int dev_id = boost::get(p).device; From 273f4892b21ff8e17fba300071943846e06b75cf Mon Sep 17 00:00:00 2001 From: JiayiFeng Date: Wed, 11 Apr 2018 04:52:08 +0000 Subject: [PATCH 28/33] update recordio unittest --- paddle/fluid/framework/reader.cc | 4 +++- .../fluid/tests/unittests/test_recordio_reader.py | 10 ++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index 56bf00e5f9..76126f3dc6 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -22,7 +22,9 @@ FileReader::FileReader(const std::vector &dims) : dims_(dims) {} void FileReader::ReadNext(std::vector *out) { ReadNextImpl(out); - PADDLE_ENFORCE_EQ(out->size(), dims_.size()); + if (out->empty()) { + return; + } for (size_t i = 0; i < dims_.size(); ++i) { auto &actual = out->at(i).dims(); auto &expect = dims_[i]; diff --git a/python/paddle/fluid/tests/unittests/test_recordio_reader.py b/python/paddle/fluid/tests/unittests/test_recordio_reader.py index 096d99a3f3..2982cb8ceb 100644 --- a/python/paddle/fluid/tests/unittests/test_recordio_reader.py +++ b/python/paddle/fluid/tests/unittests/test_recordio_reader.py @@ -65,8 +65,14 @@ class TestRecordIO(unittest.TestCase): # train a pass batch_id = 0 - while not data_file.eof(): - tmp, = exe.run(fetch_list=[avg_loss]) + while True: + ex = None + try: + tmp, = exe.run(fetch_list=[avg_loss]) + except fluid.core.EnforceNotMet as ex: + self.assertIn("There is no next data.", ex.message) + break + avg_loss_np.append(tmp) batch_id += 1 data_file.reset() From b1cc28dab34c2fb5b4eeb9446ee3b066e86fac71 Mon Sep 17 00:00:00 2001 From: JiayiFeng Date: Wed, 11 Apr 2018 04:54:14 +0000 Subject: [PATCH 29/33] update --- python/paddle/fluid/tests/unittests/test_recordio_reader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/paddle/fluid/tests/unittests/test_recordio_reader.py b/python/paddle/fluid/tests/unittests/test_recordio_reader.py index 2982cb8ceb..7c8e7f634f 100644 --- a/python/paddle/fluid/tests/unittests/test_recordio_reader.py +++ b/python/paddle/fluid/tests/unittests/test_recordio_reader.py @@ -66,7 +66,6 @@ class TestRecordIO(unittest.TestCase): # train a pass batch_id = 0 while True: - ex = None try: tmp, = exe.run(fetch_list=[avg_loss]) except fluid.core.EnforceNotMet as ex: From 0dacbbe1fe79d7643ce56c6ee630b8fe1270990b Mon Sep 17 00:00:00 2001 From: fengjiayi Date: Wed, 11 Apr 2018 14:07:58 +0800 Subject: [PATCH 30/33] update multi_pass_reader unittest --- .../fluid/tests/unittests/test_multi_pass_reader.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py b/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py index c8a8afbea6..1471843ded 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py +++ b/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py @@ -57,8 +57,12 @@ class TestMultipleReader(unittest.TestCase): exe.run(fluid.default_startup_program()) batch_count = 0 - while not data_file.eof(): - img_val, = exe.run(fetch_list=[img]) + while True: + try: + img_val, = exe.run(fetch_list=[img]) + except fluid.core.EnforceNotMet as ex: + self.assertIn("There is no next data.", ex.message) + break batch_count += 1 self.assertLessEqual(img_val.shape[0], self.batch_size) data_file.reset() From 8c1eb8693e58b2d516eb5bba1ed966ee81bf6cbf Mon Sep 17 00:00:00 2001 From: JiayiFeng Date: Wed, 11 Apr 2018 06:12:35 +0000 Subject: [PATCH 31/33] update unittest --- ...{test_multiple_reader.py => test_multi_file_reader.py} | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) rename python/paddle/fluid/tests/unittests/{test_multiple_reader.py => test_multi_file_reader.py} (91%) diff --git a/python/paddle/fluid/tests/unittests/test_multiple_reader.py b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py similarity index 91% rename from python/paddle/fluid/tests/unittests/test_multiple_reader.py rename to python/paddle/fluid/tests/unittests/test_multi_file_reader.py index a60a5d6c4a..5dc41e54d6 100644 --- a/python/paddle/fluid/tests/unittests/test_multiple_reader.py +++ b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py @@ -61,8 +61,12 @@ class TestMultipleReader(unittest.TestCase): exe.run(fluid.default_startup_program()) batch_count = 0 - while not data_files.eof(): - img_val, = exe.run(fetch_list=[img]) + while True: + try: + img_val, = exe.run(fetch_list=[img]) + except fluid.core.EnforceNotMet as ex: + self.assertIn("There is no next data.", ex.message) + break batch_count += 1 self.assertLessEqual(img_val.shape[0], self.batch_size) data_files.reset() From c64190ecbb211c09054b0ffea25179fdcad50207 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Wed, 11 Apr 2018 14:44:22 +0800 Subject: [PATCH 32/33] Polish NCCLHelper --- paddle/fluid/platform/nccl_helper.h | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index 3a2a423486..ca9ab2c7ae 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -61,7 +61,7 @@ struct NCCLContext { ncclComm_t comm_; explicit NCCLContext(int dev_id) - : ctx_(new CUDADeviceContext(CUDAPlace(dev_id))) {} + : ctx_(new CUDADeviceContext(CUDAPlace(dev_id))), comm_{nullptr} {} cudaStream_t stream() const { return ctx_->stream(); } @@ -95,6 +95,7 @@ struct NCCLContextMap { std::vector order_; explicit NCCLContextMap(const std::vector &places) { + PADDLE_ENFORCE(!places.empty()); order_.reserve(places.size()); for (auto &p : places) { int dev_id = boost::get(p).device; @@ -105,15 +106,17 @@ struct NCCLContextMap { order_.size(), contexts_.size(), "NCCL Context Map does not support contain two or more same device"); - std::vector comms; - comms.resize(order_.size()); + if (places.size() > 1) { + std::vector comms; + comms.resize(order_.size()); - PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( - &comms[0], static_cast(order_.size()), &order_[0])); + PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( + &comms[0], static_cast(order_.size()), &order_[0])); - int i = 0; - for (auto &dev_id : order_) { - contexts_.at(dev_id).comm_ = comms[i++]; + int i = 0; + for (auto &dev_id : order_) { + contexts_.at(dev_id).comm_ = comms[i++]; + } } } From d52fa26fdab7a0497a3e7f49833d1b3827955c44 Mon Sep 17 00:00:00 2001 From: dzhwinter Date: Wed, 11 Apr 2018 17:03:39 +0800 Subject: [PATCH 33/33] Feature/metrics (#9791) * "add metrics" * "add fluid metrics" * "add import guards" * "show warnings" * "add demo" * "fix ci" * "add some details" * "fix cci" * "add demo Python" * "add metrics" --- benchmark/fluid/mnist.py | 7 +- python/paddle/fluid/__init__.py | 1 + python/paddle/fluid/average.py | 6 + python/paddle/fluid/evaluator.py | 4 + python/paddle/fluid/layers/metric.py | 37 ++- python/paddle/fluid/metrics.py | 378 +++++++++++++++++++++++++++ 6 files changed, 427 insertions(+), 6 deletions(-) create mode 100644 python/paddle/fluid/metrics.py diff --git a/benchmark/fluid/mnist.py b/benchmark/fluid/mnist.py index 43866da9cb..dc10ac2ec1 100644 --- a/benchmark/fluid/mnist.py +++ b/benchmark/fluid/mnist.py @@ -139,9 +139,6 @@ def run_benchmark(model, args): # inference program inference_program = fluid.default_main_program().clone() - with fluid.program_guard(inference_program): - inference_program = fluid.io.get_inference_program( - target_vars=[batch_acc, batch_size_tensor]) # Optimization opt = fluid.optimizer.AdamOptimizer( @@ -161,7 +158,7 @@ def run_benchmark(model, args): train_reader = paddle.batch( paddle.dataset.mnist.train(), batch_size=args.batch_size) - accuracy = fluid.average.WeightedAverage() + accuracy = fluid.metrics.Accuracy() iters, num_samples, start_time = 0, 0, time.time() for pass_id in range(args.pass_num): accuracy.reset() @@ -184,7 +181,7 @@ def run_benchmark(model, args): "label": y_data}, fetch_list=[avg_cost, batch_acc, batch_size_tensor] ) # The accuracy is the accumulation of batches, but not the current batch. - accuracy.add(value=outs[1], weight=outs[2]) + accuracy.update(value=outs[1], weight=outs[2]) iters += 1 num_samples += len(y_data) loss = np.array(outs[0]) diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index a5a3884750..f757411b85 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -29,6 +29,7 @@ import optimizer import backward import regularizer import average +import metrics from param_attr import ParamAttr, WeightNormParamAttr from data_feeder import DataFeeder from core import LoDTensor, CPUPlace, CUDAPlace, CUDAPinnedPlace diff --git a/python/paddle/fluid/average.py b/python/paddle/fluid/average.py index ded6eb0859..6abe8233b0 100644 --- a/python/paddle/fluid/average.py +++ b/python/paddle/fluid/average.py @@ -13,6 +13,7 @@ # limitations under the License. import numpy as np +import warnings """ Class of all kinds of Average. @@ -22,6 +23,8 @@ import numpy as np wrappers of Python functions. """ +__all__ = ["WeightedAverage"] + def _is_number_(var): return isinstance(var, int) or isinstance(var, float) or (isinstance( @@ -34,6 +37,9 @@ def _is_number_or_matrix_(var): class WeightedAverage(object): def __init__(self): + warnings.warn( + "The %s is deprecated, please use fluid.metrics.Accuracy instead." % + (self.__class__.__name__), Warning) self.reset() def reset(self): diff --git a/python/paddle/fluid/evaluator.py b/python/paddle/fluid/evaluator.py index 19e5b61b0b..13475025b5 100644 --- a/python/paddle/fluid/evaluator.py +++ b/python/paddle/fluid/evaluator.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import warnings import numpy as np import layers @@ -59,6 +60,9 @@ class Evaluator(object): """ def __init__(self, name, **kwargs): + warnings.warn( + "The %s is deprecated, because maintain a modified program inside evaluator cause bug easily, please use fluid.metrics.%s instead." + % (self.__class__.__name__, self.__class__.__name__), Warning) self.states = [] self.metrics = [] self.helper = LayerHelper(name, **kwargs) diff --git a/python/paddle/fluid/layers/metric.py b/python/paddle/fluid/layers/metric.py index 3d9157ad4e..f66dccfa2d 100644 --- a/python/paddle/fluid/layers/metric.py +++ b/python/paddle/fluid/layers/metric.py @@ -15,12 +15,13 @@ All layers just related to metric. """ +import warnings from ..layer_helper import LayerHelper from ..initializer import Normal, Constant from ..framework import Variable from ..param_attr import ParamAttr -__all__ = ['accuracy'] +__all__ = ['accuracy', 'auc'] def accuracy(input, label, k=1, correct=None, total=None): @@ -55,3 +56,37 @@ def accuracy(input, label, k=1, correct=None, total=None): "Total": [total], }) return acc_out + + +def auc(input, label, curve='ROC', num_thresholds=200): + warnings.warn( + "This interface not recommended, fluid.layers.auc compute the auc at every minibatch, \ + but can not aggregate them and get the pass AUC, because pass \ + auc can not be averaged with weighted from the minibatch auc value. \ + Please use fluid.metrics.Auc, it can compute the auc value via Python natively, \ + which can get every minibatch and every pass auc value.", Warning) + helper = LayerHelper("auc", **locals()) + topk_out = helper.create_tmp_variable(dtype=input.dtype) + topk_indices = helper.create_tmp_variable(dtype="int64") + helper.append_op( + type="top_k", + inputs={"X": [input]}, + outputs={"Out": [topk_out], + "Indices": [topk_indices]}, + attrs={"k": k}) + auc_out = helper.create_tmp_variable(dtype="float32") + if correct is None: + correct = helper.create_tmp_variable(dtype="int64") + if total is None: + total = helper.create_tmp_variable(dtype="int64") + helper.append_op( + type="accuracy", + inputs={ + "Out": [topk_out], + "Indices": [topk_indices], + "Label": [label] + }, + attrs={"curve": curve, + "num_thresholds": num_thresholds}, + outputs={"AUC": [auc_out], }) + return auc_out diff --git a/python/paddle/fluid/metrics.py b/python/paddle/fluid/metrics.py new file mode 100644 index 0000000000..99a81c1d42 --- /dev/null +++ b/python/paddle/fluid/metrics.py @@ -0,0 +1,378 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Fluid Metrics + +The metrics are accomplished via Python natively. +""" +import numpy as np +import copy +import warnings + +__all__ = [ + 'MetricBase', + 'CompositeMetric', + 'Accuracy', + 'ChunkEvaluator', + 'EditDistance', + 'DetectionMAP', + 'Auc', +] + + +def _is_numpy_(var): + return isinstance(var, (np.ndarray, np.generic)) + + +def _is_number_(var): + return isinstance(var, int) or isinstance(var, float) or (isinstance( + var, np.ndarray) and var.shape == (1, )) + + +def _is_number_or_matrix_(var): + return _is_number_(var) or isinstance(var, np.ndarray) + + +class MetricBase(object): + """ + Base Class for all evaluators + + Args: + name(str): The name of evaluator. such as, "accuracy". Used for generate + temporary variable name. + Interface: + Note(*) : the states is the attributes who not has _ prefix. + + get_config(): print current states and configuration + reset(): clear the states. If the Metrics states type is not (int, float, np.ndarray), + Please override this method. + update(): update states at every minibatch + eval(): get metric evaluation in numpy type. + """ + + def __init__(self, name, **kwargs): + self._name = str(name) if name != None else self.__class__.__name__ + self._kwargs = kwargs if kwargs != None else dict() + self.reset() + + def __str__(self): + return self._name + + def reset(self): + """ + states is the attributes who not has _ prefix. + reset the states of metrics. + """ + states = { + attr: value + for attr, value in self.__dict__.iteritems() + if not attr.startswith("_") + } + for attr, value in states.iteritems(): + if isinstance(value, int): + setattr(self, attr, 0) + elif isinstance(value, float): + setattr(self, attr, .0) + elif isinstance(value, (np.ndarray, np.generic)): + setattr(self, attr, np.zeros_like(value)) + else: + setattr(self, attr, None) + + def get_config(self): + states = { + attr: value + for attr, value in self.__dict__.iteritems() + if not attr.startswith("_") + } + config = copy.deepcopy(self._kwargs) + config.update({"name": self._name, "states": copy.deepcopy(states)}) + return config + + def update(self): + raise NotImplementedError() + + def eval(self): + raise NotImplementedError() + + +class CompositeMetric(MetricBase): + """ + Compute multiple metrics in each minibatch. + for example, merge F1, accuracy, recall into one Metric. + """ + + def __init__(self, name=None, **kwargs): + super(CompositeMetric, self).__init__(name, kwargs) + self._metrics = [] + + def add_metric(self, metric): + if not isinstance(metric, MetricBase): + raise ValueError("SubMetric should be inherit from MetricBase.") + self._metrics.append(metric) + + def eval(self): + ans = [] + for m in self._metrics: + ans.append(m.eval()) + return ans + + +class Accuracy(MetricBase): + """ + Accumulate the accuracy from minibatches and compute the average accuracy + for every pass. + + Args: + name: the metrics name + + Example: + minibatch_accuracy = fluid.layers.accuracy(pred, label) + accuracy_evaluator = fluid.metrics.Accuracy() + for epoch in PASS_NUM: + accuracy_evaluator.reset() + for data in batches: + loss = exe.run(fetch_list=[cost, minibatch_accuracy]) + accuracy_evaluator.update(value=minibatch_accuracy, weight=batches) + accuracy = accuracy_evaluator.eval() + """ + + def __init__(self, name=None): + super(Accuracy, self).__init__(name) + self.value = .0 + self.weight = .0 + + def update(self, value, weight): + if not _is_number_or_matrix_(value): + raise ValueError( + "The 'value' must be a number(int, float) or a numpy ndarray.") + if not _is_number_(weight): + raise ValueError("The 'weight' must be a number(int, float).") + self.value += value * weight + self.weight += weight + + def eval(self): + if self.weight == 0: + raise ValueError( + "There is no data in Accuracy Metrics. Please check layers.accuracy output has added to Accuracy." + ) + return self.value / self.weight + + +class ChunkEvalutor(MetricBase): + """ + Accumulate counter numbers output by chunk_eval from mini-batches and + compute the precision recall and F1-score using the accumulated counter + numbers. + """ + + def __init__(self, name=None): + super(ChunkEvalutor, self).__init__(name) + self.num_infer_chunks = 0 + self.num_label_chunks = 0 + self.num_correct_chunks = 0 + + def update(self, num_infer_chunks, num_label_chunks, num_correct_chunks): + if not _is_number_or_matrix_(num_infer_chunks): + raise ValueError( + "The 'num_infer_chunks' must be a number(int, float) or a numpy ndarray." + ) + if not _is_number_or_matrix_(num_label_chunks): + raise ValueError( + "The 'num_label_chunks' must be a number(int, float) or a numpy ndarray." + ) + if not _is_number_or_matrix_(num_correct_chunks): + raise ValueError( + "The 'num_correct_chunks' must be a number(int, float) or a numpy ndarray." + ) + self.num_infer_chunks += num_infer_chunks + self.num_label_chunks += num_label_chunks + self.num_correct_chunks += num_correct_chunks + + def eval(self): + precision = float( + self.num_correct_chunks + ) / self.num_infer_chunks if self.num_infer_chunks else 0 + recall = float(self.num_correct_chunks + ) / self.num_label_chunks if self.num_label_chunks else 0 + f1_score = float(2 * precision * recall) / ( + precision + recall) if self.num_correct_chunks else 0 + return precision, recall, f1_score + + +class EditDistance(MetricBase): + """ + Accumulate edit distance sum and sequence number from mini-batches and + compute the average edit_distance and instance error of all batches. + + Args: + name: the metrics name + + Example: + edit_distance_metrics = fluid.layers.edit_distance(input, label) + distance_evaluator = fluid.metrics.EditDistance() + for epoch in PASS_NUM: + distance_evaluator.reset() + for data in batches: + loss = exe.run(fetch_list=[cost] + list(edit_distance_metrics)) + distance_evaluator.update(*edit_distance_metrics) + distance, instance_error = distance_evaluator.eval() + + In the above example: + 'distance' is the average of the edit distance in a pass. + 'instance_error' is the instance error rate in a pass. + + """ + + def __init__(self, name): + super(EditDistance, self).__init__(name) + self.total_distance = .0 + self.seq_num = 0 + self.instance_error = 0 + + def update(self, distances, seq_num): + if not _is_numpy_(distances): + raise ValueError("The 'distances' must be a numpy ndarray.") + if not _is_number_(seq_num): + raise ValueError("The 'seq_num' must be a number(int, float).") + seq_right_count = np.sum(distances == 0) + total_distance = np.sum(distances) + self.seq_num += seq_num + self.instance_error += seq_num - seq_right_count + self.total_distance += total_distance + + def eval(): + if self.seq_num == 0: + raise ValueError( + "There is no data in EditDistance Metric. Please check layers.edit_distance output has been added to EditDistance." + ) + avg_distance = self.total_distance / self.seq_num + avg_instance_error = self.instance_error / self.seq_num + return avg_distance, avg_instance_error + + +class DetectionMAP(MetricBase): + """ + Calculate the detection mean average precision (mAP). + + TODO (Dang Qingqing): update the following doc. + The general steps are as follows: + 1. calculate the true positive and false positive according to the input + of detection and labels. + 2. calculate mAP value, support two versions: '11 point' and 'integral'. + + Please get more information from the following articles: + https://sanchom.wordpress.com/tag/average-precision/ + https://arxiv.org/abs/1512.02325 + """ + + def __init__(self, name=None): + super(DetectionMAP, self).__init__(name) + # the current map value + self.value = .0 + + def update(self, value, weight): + if not _is_number_or_matrix_(value): + raise ValueError( + "The 'value' must be a number(int, float) or a numpy ndarray.") + if not _is_number_(weight): + raise ValueError("The 'weight' must be a number(int, float).") + self.value += value + self.weight += weight + + def eval(self): + if self.weight == 0: + raise ValueError( + "There is no data in DetectionMAP Metrics. " + "Please check layers.detection_map output has added to DetectionMAP." + ) + return self.value / self.weight + + +class Auc(MetricBase): + """ + Auc Metrics which adapts to binary classification. + Need to note that auc metrics compute the value via Python natively. + If you concern the speed, please use the fluid.layers.auc instead. + + The `auc` function creates four local variables, `true_positives`, + `true_negatives`, `false_positives` and `false_negatives` that are used to + compute the AUC. To discretize the AUC curve, a linearly spaced set of + thresholds is used to compute pairs of recall and precision values. The area + under the ROC-curve is therefore computed using the height of the recall + values by the false positive rate, while the area under the PR-curve is the + computed using the height of the precision values by the recall. + + Args: + name: metric name + curve: Specifies the name of the curve to be computed, 'ROC' [default] or + 'PR' for the Precision-Recall-curve. + num_thresholds: The number of thresholds to use when discretizing the roc + curve. + + "NOTE: only implement the ROC curve type via Python now." + """ + + def __init__(self, name, curve='ROC', num_thresholds=200): + super(MetricBase, self).__init__(name, curve, num_thresholds) + self._curve = curve + self._num_thresholds = num_thresholds + self._epsilon = 1e-6 + self.tp_list = np.ndarray((num_thresholds, )) + self.fn_list = np.ndarray((num_thresholds, )) + self.tn_list = np.ndarray((num_thresholds, )) + self.fp_list = np.ndarray((num_thresholds, )) + + def update(self, labels, predictions, axis=1): + if not _is_numpy_(labels): + raise ValueError("The 'labels' must be a numpy ndarray.") + if not _is_numpy_(predictions): + raise ValueError("The 'predictions' must be a numpy ndarray.") + + kepsilon = 1e-7 # to account for floating point imprecisions + thresholds = [(i + 1) * 1.0 / (num_thresholds - 1) + for i in range(num_thresholds - 2)] + thresholds = [0.0 - kepsilon] + thresholds + [1.0 + kepsilon] + + # caculate TP, FN, TN, FP count + for idx_thresh, thresh in enumerate(thresholds): + tp, fn, tn, fp = 0, 0, 0, 0 + for i, lbl in enumerate(labels): + if lbl: + if predictions[i, 0] >= thresh: + tp += 1 + else: + fn += 1 + else: + if predictions[i, 0] >= thresh: + fp += 1 + else: + tn += 1 + tp_list[idx_thresh] += tp + fn_list[idx_thresh] += fn + tn_list[idx_thresh] += tn + fp_list[idx_thresh] += fp + + def eval(self): + epsilon = self._epsilon + num_thresholds = self._num_thresholds + tpr = (tp_list.astype("float32") + epsilon) / ( + tp_list + fn_list + epsilon) + fpr = fp_list.astype("float32") / (fp_list + tn_list + epsilon) + rec = (tp_list.astype("float32") + epsilon) / ( + tp_list + fp_list + epsilon) + + x = fpr[:num_thresholds - 1] - fpr[1:] + y = (tpr[:num_thresholds - 1] + tpr[1:]) / 2.0 + auc_value = np.sum(x * y) + return auc_value