From 40effc61afa72cbda5c3e7b8f846b6a97d584d52 Mon Sep 17 00:00:00 2001 From: Zeng Jinle <32832641+sneaxiy@users.noreply.github.com> Date: Mon, 14 Oct 2019 10:34:29 +0800 Subject: [PATCH] Refine py_reader exit (#20331) * refine py_reader exit, test=develop * fix multiprocess_reader exception unittest, test=develop * increase code coverage for legacy fluid.layers.py_reader, test=develop --- .../fluid/operators/reader/blocking_queue.h | 34 ++++++-- .../fluid/operators/reader/buffered_reader.cc | 5 +- .../reader/lod_tensor_blocking_queue.h | 2 + paddle/fluid/pybind/pybind.cc | 1 + paddle/fluid/pybind/reader_py.cc | 84 +++++++++++++------ python/paddle/fluid/layers/io.py | 2 +- python/paddle/fluid/reader.py | 2 +- .../test_multiprocess_reader_exception.py | 84 ++++++++++++------- 8 files changed, 151 insertions(+), 63 deletions(-) diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h index b23105916b..b8e2fca9ee 100644 --- a/paddle/fluid/operators/reader/blocking_queue.h +++ b/paddle/fluid/operators/reader/blocking_queue.h @@ -33,7 +33,7 @@ class BlockingQueue { // doesn't support GPU and it implements on buffered blocking queue. public: explicit BlockingQueue(size_t capacity, bool speed_test_mode = false) - : capacity_(capacity), speed_test_mode_(speed_test_mode), closed_(false) { + : capacity_(capacity), speed_test_mode_(speed_test_mode) { PADDLE_ENFORCE_GT( capacity_, static_cast(0), "The capacity of a reader::BlockingQueue must be greater than 0."); @@ -41,7 +41,9 @@ class BlockingQueue { bool Send(const T& elem) { std::unique_lock lock(mutex_); - send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; }); + send_cv_.wait( + lock, [&] { return queue_.size() < capacity_ || closed_ || killed_; }); + EnforceNotKilled(); if (closed_) { VLOG(5) << "WARNING: Sending an element to a closed reader::BlokcingQueue."; @@ -55,7 +57,9 @@ class BlockingQueue { bool Send(T&& elem) { std::unique_lock lock(mutex_); - send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; }); + send_cv_.wait( + lock, [&] { return queue_.size() < capacity_ || closed_ || killed_; }); + EnforceNotKilled(); if (closed_) { VLOG(5) << "WARNING: Sending an element to a closed reader::BlokcingQueue."; @@ -69,7 +73,9 @@ class BlockingQueue { bool Receive(T* elem) { std::unique_lock lock(mutex_); - receive_cv_.wait(lock, [&] { return !queue_.empty() || closed_; }); + receive_cv_.wait(lock, + [&] { return !queue_.empty() || closed_ || killed_; }); + EnforceNotKilled(); if (!queue_.empty()) { PADDLE_ENFORCE_NOT_NULL(elem); *elem = queue_.front(); @@ -87,6 +93,7 @@ class BlockingQueue { void ReOpen() { std::lock_guard lock(mutex_); + EnforceNotKilled(); VLOG(1) << "reopen queue"; closed_ = false; std::deque new_deque; @@ -118,10 +125,27 @@ class BlockingQueue { return queue_.size(); } + void Kill() { + std::lock_guard lock(mutex_); + VLOG(1) << "kill queue"; + closed_ = true; + killed_ = true; + send_cv_.notify_all(); + receive_cv_.notify_all(); + } + + private: + inline void EnforceNotKilled() { + PADDLE_ENFORCE_NE( + killed_, true, + "Blocking queue is killed because the data reader raises an exception"); + } + private: size_t capacity_; bool speed_test_mode_; - bool closed_; + bool closed_{false}; + bool killed_{false}; // the queue is broken since exception raises std::deque queue_; mutable std::mutex mutex_; diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc index b332450c25..894d98ca99 100644 --- a/paddle/fluid/operators/reader/buffered_reader.cc +++ b/paddle/fluid/operators/reader/buffered_reader.cc @@ -26,7 +26,10 @@ BufferedReader::~BufferedReader() { VLOG(1) << "~BufferedReader"; reader_->Shutdown(); while (!position_.empty()) { - position_.front().wait(); + auto &front = position_.front(); + if (front.valid()) { + front.wait(); + } position_.pop(); } #ifdef PADDLE_WITH_CUDA diff --git a/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h index be044085f1..cd295552cb 100644 --- a/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h +++ b/paddle/fluid/operators/reader/lod_tensor_blocking_queue.h @@ -65,6 +65,8 @@ class LoDTensorBlockingQueue { inline bool IsClosed() const { return queue_.IsClosed(); } + inline void Kill() { queue_.Kill(); } + private: BlockingQueue> queue_; }; diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 9858b6c8a1..a408c847ee 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -860,6 +860,7 @@ All parameter, weight, gradient are variables in Paddle. .def("size", &LoDTensorBlockingQueue::Size) .def("capacity", &LoDTensorBlockingQueue::Cap) .def("close", &LoDTensorBlockingQueue::Close) + .def("kill", &LoDTensorBlockingQueue::Kill) .def("is_closed", &LoDTensorBlockingQueue::IsClosed); m.def("init_lod_tensor_blocking_queue", diff --git a/paddle/fluid/pybind/reader_py.cc b/paddle/fluid/pybind/reader_py.cc index 4009bcf2a8..d33c3ec3c1 100644 --- a/paddle/fluid/pybind/reader_py.cc +++ b/paddle/fluid/pybind/reader_py.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "paddle/fluid/pybind/reader_py.h" +#include #include #include #include @@ -30,12 +31,6 @@ namespace pybind { namespace py = pybind11; -static void RaiseStopIterationException() { - VLOG(2) << "Raise StopIteration Exception in Python"; - py::gil_scoped_acquire guard; - throw py::stop_iteration(); -} - class MultiDeviceFeedReader { public: using ResultDictList = @@ -71,17 +66,12 @@ class MultiDeviceFeedReader { futures_.resize(dst_places.size()); ret_.resize(dst_places.size()); + exceptions_.assign(dst_places.size(), nullptr); ReadAsync(); } ResultDictList ReadNext() { - bool success = WaitFutures(); - - if (!success) { - RaiseStopIterationException(); - return {}; - } - + CheckNextStatus(); ResultDictList result(ret_.size()); for (size_t i = 0; i < ret_.size(); ++i) { for (size_t j = 0; j < names_.size(); ++j) { @@ -93,12 +83,7 @@ class MultiDeviceFeedReader { } ResultList ReadNextList() { - bool success = WaitFutures(); - if (!success) { - RaiseStopIterationException(); - return {}; - } - + CheckNextStatus(); ResultList result; result.reserve(ret_.size()); for (size_t i = 0; i < ret_.size(); ++i) { @@ -120,12 +105,32 @@ class MultiDeviceFeedReader { } private: - bool WaitFutures() { - bool success = true; - for (auto &f : futures_) { - success &= f.get(); + enum Status { + kSuccess = 0, // Read next data successfully + kEOF = 1, // Reach EOF + kException = 2 // Exception raises when reading + }; + + Status WaitFutures(std::exception_ptr *excep) { + bool is_success = true; + *excep = nullptr; + for (size_t i = 0; i < futures_.size(); ++i) { + auto each_status = futures_[i].get(); + if (UNLIKELY(each_status != Status::kSuccess)) { + is_success = false; + if (UNLIKELY(each_status == Status::kException)) { + PADDLE_ENFORCE_NOT_NULL(exceptions_[i]); + *excep = exceptions_[i]; + exceptions_[i] = nullptr; + } + } + } + + if (UNLIKELY(*excep)) { + return Status::kException; + } else { + return is_success ? Status::kSuccess : Status::kEOF; } - return success; } void Shutdown() { @@ -139,19 +144,44 @@ class MultiDeviceFeedReader { void ReadAsync() { for (size_t i = 0; i < readers_.size(); ++i) { futures_[i] = pool_->enqueue([this, i] { - readers_[i]->ReadNext(&ret_[i]); - return !ret_[i].empty(); + try { + readers_[i]->ReadNext(&ret_[i]); + return ret_[i].empty() ? Status::kEOF : Status::kSuccess; + } catch (...) { + exceptions_[i] = std::current_exception(); + return Status::kException; + } }); } } + void CheckNextStatus() { + std::exception_ptr excep; + Status status = WaitFutures(&excep); + + if (UNLIKELY(excep)) { + PADDLE_ENFORCE_EQ(status, Status::kException); + std::rethrow_exception(excep); + } + + if (UNLIKELY(status == Status::kEOF)) { + VLOG(2) << "Raise StopIteration Exception in Python"; + py::gil_scoped_acquire guard; + throw py::stop_iteration(); + } + + PADDLE_ENFORCE_EQ(status, Status::kSuccess); + } + std::shared_ptr queue_; std::vector names_; std::unique_ptr<::ThreadPool> pool_; std::vector> readers_; - std::vector> futures_; + std::vector> futures_; + std::vector exceptions_; + std::vector> ret_; }; diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index c006a46d04..35a33d8a8f 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -469,7 +469,7 @@ def _py_reader(capacity, break feed_queue.close() except Exception as ex: - feed_queue.close() + feed_queue.kill() logging.warn('Your decorated reader has raised an exception!') six.reraise(*sys.exc_info()) diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py index 0f61d20196..508076b161 100644 --- a/python/paddle/fluid/reader.py +++ b/python/paddle/fluid/reader.py @@ -482,7 +482,7 @@ class GeneratorLoader(DataLoaderBase): self._queue.close() self._thread = None except Exception as ex: - self._queue.close() + self._queue.kill() self._thread = None logging.warn('Your reader has raised an exception!') six.reraise(*sys.exc_info()) diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_reader_exception.py b/python/paddle/fluid/tests/unittests/test_multiprocess_reader_exception.py index 0507b05e62..16006b7e3d 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_reader_exception.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_reader_exception.py @@ -20,6 +20,10 @@ import six import sys +class ReaderException(Exception): + pass + + class TestMultiprocessReaderException(unittest.TestCase): def setUp(self): self.use_pipe = False @@ -31,10 +35,13 @@ class TestMultiprocessReaderException(unittest.TestCase): else: return [fluid.CPUPlace()] - def main_impl(self, place, iterable): + def main_impl(self, place, iterable, use_legacy_py_reader): + sample_num = 40 + batch_size = 4 + def fake_reader(): def __impl__(): - for _ in range(40): + for _ in range(sample_num): if not self.raise_exception: yield list( np.random.uniform( @@ -45,37 +52,54 @@ class TestMultiprocessReaderException(unittest.TestCase): return __impl__ with fluid.program_guard(fluid.Program(), fluid.Program()): - image = fluid.layers.data(name='image', dtype='float32', shape=[10]) + if not use_legacy_py_reader: + image = fluid.data( + name='image', dtype='float32', shape=[None, 10]) - reader = fluid.io.PyReader( - feed_list=[image], capacity=2, iterable=iterable) + reader = fluid.io.PyReader( + feed_list=[image], capacity=2, iterable=iterable) + else: + reader = fluid.layers.py_reader( + capacity=2, shapes=[[-1, 10], ], dtypes=['float32', ]) + image = fluid.layers.read_file(reader) image_p_1 = image + 1 decorated_reader = multiprocess_reader( [fake_reader(), fake_reader()], use_pipe=self.use_pipe) - if isinstance(place, fluid.CUDAPlace): - reader.decorate_sample_generator( - decorated_reader, batch_size=4, places=fluid.cuda_places()) + if use_legacy_py_reader: + reader.decorate_paddle_reader( + fluid.io.batch( + decorated_reader, batch_size=batch_size)) else: - reader.decorate_sample_generator( - decorated_reader, batch_size=4, places=fluid.cpu_places()) + if isinstance(place, fluid.CUDAPlace): + reader.decorate_sample_generator( + decorated_reader, + batch_size=batch_size, + places=fluid.cuda_places()) + else: + reader.decorate_sample_generator( + decorated_reader, + batch_size=batch_size, + places=fluid.cpu_places()) exe = fluid.Executor(place) exe.run(fluid.default_startup_program()) + batch_num = int(sample_num * 2 / batch_size) + if iterable: for _ in range(3): num = 0 - for data in reader(): - exe.run(feed=data, fetch_list=[image_p_1]) - num += 1 - if not self.raise_exception: - self.assertEquals(num, 20) - else: + try: + for data in reader(): + exe.run(feed=data, fetch_list=[image_p_1]) + num += 1 + self.assertEquals(num, batch_num) + except fluid.core.EnforceNotMet as ex: self.assertEquals(num, 0) - raise ValueError('Reader raises exception') + raise ReaderException() else: for _ in range(3): num = 0 @@ -86,22 +110,26 @@ class TestMultiprocessReaderException(unittest.TestCase): num += 1 except fluid.core.EOFException: reader.reset() - if not self.raise_exception: - self.assertEquals(num, 20) - else: - self.assertEquals(num, 0) - raise ValueError('Reader raises exception') + self.assertFalse(self.raise_exception) + self.assertEquals(num, batch_num) + except fluid.core.EnforceNotMet as ex: + self.assertTrue(self.raise_exception) + self.assertEquals(num, 0) + raise ReaderException() def test_main(self): for p in self.places(): for iterable in [False, True]: - try: - with fluid.scope_guard(fluid.Scope()): - self.main_impl(p, iterable) + use_legacy_py_reader_range = [False + ] if iterable else [False, True] + for use_legacy_py_reader in use_legacy_py_reader_range: + try: + with fluid.scope_guard(fluid.Scope()): + self.main_impl(p, iterable, use_legacy_py_reader) - self.assertTrue(not self.raise_exception) - except ValueError: - self.assertTrue(self.raise_exception) + self.assertTrue(not self.raise_exception) + except ReaderException: + self.assertTrue(self.raise_exception) class TestCase1(TestMultiprocessReaderException):