diff --git a/paddle/fluid/framework/details/data_balance_op_handle.cc b/paddle/fluid/framework/details/data_balance_op_handle.cc index b914851fe0..d07235df58 100644 --- a/paddle/fluid/framework/details/data_balance_op_handle.cc +++ b/paddle/fluid/framework/details/data_balance_op_handle.cc @@ -62,7 +62,7 @@ std::vector> DataBalanceOpHandle::GetBalancePlan( } if (total_size < device_num) { // No enough data. - PADDLE_THROW("There is no next data."); + PADDLE_THROW_EOF(); } std::sort(size_device_vec.begin(), size_device_vec.end(), [](const std::array &a, const std::array &b) { diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index b1706eb12d..99b10254a7 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -98,9 +98,18 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( if (timeout) { std::lock_guard l(exception_mu_); if (exception_) { - auto exp = *exception_; - exception_.reset(); - throw exp; + std::exception *exp = exception_.get(); + if (dynamic_cast(exp)) { + auto e = *static_cast(exp); + exception_.reset(); + throw e; + } else if (dynamic_cast(exp)) { + auto e = *static_cast(exp); + exception_.reset(); + throw e; + } else { + LOG(FATAL) << "Unknown exception."; + } } else { continue; } @@ -199,6 +208,12 @@ void ThreadedSSAGraphExecutor::RunOp( running_ops_--; ready_var_q->Extend(op->Outputs()); VLOG(10) << op << " " << op->Name() << "Signal posted"; + } catch (platform::EOFException ex) { + std::lock_guard l(exception_mu_); + // EOFException will not cover up existing EnforceNotMet. + if (exception_.get() == nullptr) { + exception_.reset(new platform::EOFException(ex)); + } } catch (platform::EnforceNotMet ex) { std::lock_guard l(exception_mu_); exception_.reset(new platform::EnforceNotMet(ex)); diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index 90430be996..c69e0487e2 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -57,7 +57,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { std::vector places_; platform::DeviceContextPool fetch_ctxs_; std::mutex exception_mu_; - std::unique_ptr exception_; + std::unique_ptr exception_; std::atomic running_ops_; void InsertPendingOp(std::unordered_map *pending_ops, diff --git a/paddle/fluid/operators/read_op.cc b/paddle/fluid/operators/read_op.cc index 60e4eb7576..695d7ea83d 100644 --- a/paddle/fluid/operators/read_op.cc +++ b/paddle/fluid/operators/read_op.cc @@ -68,7 +68,7 @@ class ReadOp : public framework::OperatorBase { reader->ReadNext(&ins); if (ins.empty()) { if (Attr("throw_eof_exp")) { - PADDLE_THROW("There is no next data."); + PADDLE_THROW_EOF(); } else { ins.resize(out_arg_names.size()); for (auto& tensor : ins) { diff --git a/paddle/fluid/platform/enforce.h b/paddle/fluid/platform/enforce.h index 70bc9c4e83..566485cd3c 100644 --- a/paddle/fluid/platform/enforce.h +++ b/paddle/fluid/platform/enforce.h @@ -102,6 +102,15 @@ struct EnforceNotMet : public std::exception { const char* what() const noexcept { return err_str_.c_str(); } }; +struct EOFException : public std::exception { + std::string err_str_; + EOFException(const char* err_msg, const char* f, int l) { + err_str_ = string::Sprintf("%s at [%s:%d]", err_msg, f, l); + } + + const char* what() const noexcept { return err_str_.c_str(); } +}; + // Because most enforce conditions would evaluate to true, we can use // __builtin_expect to instruct the C++ compiler to generate code that // always forces branch prediction of true. @@ -242,6 +251,11 @@ inline void throw_on_error(T e) { #define PADDLE_ENFORCE(...) ::paddle::platform::throw_on_error(__VA_ARGS__); #endif +#define PADDLE_THROW_EOF() \ + do { \ + throw ::paddle::platform::EOFException("There is no next data.", __FILE__, \ + __LINE__); \ + } while (false) /* * Some enforce helpers here, usage: * int a = 1; diff --git a/paddle/fluid/platform/enforce_test.cc b/paddle/fluid/platform/enforce_test.cc index 57d751cc00..0e8684581a 100644 --- a/paddle/fluid/platform/enforce_test.cc +++ b/paddle/fluid/platform/enforce_test.cc @@ -210,3 +210,14 @@ TEST(ENFORCE_USER_DEFINED_CLASS, NE) { Dims a{{1, 2, 3, 4}}, b{{5, 6, 7, 8}}; ASSERT_THROW(PADDLE_ENFORCE_EQ(a, b), paddle::platform::EnforceNotMet); } + +TEST(EOF_EXCEPTION, THROW_EOF) { + bool caught_eof = false; + try { + PADDLE_THROW_EOF(); + } catch (paddle::platform::EOFException error) { + caught_eof = true; + EXPECT_TRUE(HasPrefix(StringPiece(error.what()), "There is no next data.")); + } + EXPECT_TRUE(caught_eof); +} diff --git a/paddle/fluid/pybind/exception.cc b/paddle/fluid/pybind/exception.cc index 08a2f185e1..831f30e35f 100644 --- a/paddle/fluid/pybind/exception.cc +++ b/paddle/fluid/pybind/exception.cc @@ -18,10 +18,13 @@ namespace paddle { namespace pybind { void BindException(pybind11::module* m) { + static pybind11::exception eof(*m, "EOFException"); static pybind11::exception exc(*m, "EnforceNotMet"); pybind11::register_exception_translator([](std::exception_ptr p) { try { if (p) std::rethrow_exception(p); + } catch (const platform::EOFException& e) { + eof(e.what()); } catch (const platform::EnforceNotMet& e) { exc(e.what()); } diff --git a/python/paddle/fluid/tests/unittests/test_data_balance.py b/python/paddle/fluid/tests/unittests/test_data_balance.py index b558d7c2ea..cffa3329ac 100644 --- a/python/paddle/fluid/tests/unittests/test_data_balance.py +++ b/python/paddle/fluid/tests/unittests/test_data_balance.py @@ -118,8 +118,7 @@ class TestDataBalance(unittest.TestCase): try: image_val, label_val = parallel_exe.run(fetch_list, return_numpy=True) - except fluid.core.EnforceNotMet as ex: - self.assertIn("There is no next data.", ex.message) + except fluid.core.EOFException: break ins_num = image_val.shape[0] broadcasted_label = np.ones( @@ -162,8 +161,7 @@ class TestDataBalance(unittest.TestCase): try: ins_tensor, label_tensor = parallel_exe.run( fetch_list, return_numpy=False) - except fluid.core.EnforceNotMet as ex: - self.assertIn("There is no next data.", ex.message) + except fluid.core.EOFException: break ins_val = np.array(ins_tensor) diff --git a/python/paddle/fluid/tests/unittests/test_multi_file_reader.py b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py index 3f940203b9..dbd510e64f 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_file_reader.py +++ b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py @@ -64,8 +64,7 @@ class TestMultipleReader(unittest.TestCase): while True: try: img_val, = exe.run(fetch_list=[img]) - except fluid.core.EnforceNotMet as ex: - self.assertIn("There is no next data.", ex.message) + except fluid.core.EOFException: break batch_count += 1 self.assertLessEqual(img_val.shape[0], self.batch_size) diff --git a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py b/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py index 52e7cc1ffb..7fc9f55044 100644 --- a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py +++ b/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py @@ -59,8 +59,7 @@ class TestMultipleReader(unittest.TestCase): while True: try: img_val, = exe.run(fetch_list=[img]) - except fluid.core.EnforceNotMet as ex: - self.assertIn("There is no next data.", ex.message) + except fluid.core.EOFException: break batch_count += 1 self.assertLessEqual(img_val.shape[0], self.batch_size) diff --git a/python/paddle/fluid/tests/unittests/test_recordio_reader.py b/python/paddle/fluid/tests/unittests/test_recordio_reader.py index f32050014d..69a522e273 100644 --- a/python/paddle/fluid/tests/unittests/test_recordio_reader.py +++ b/python/paddle/fluid/tests/unittests/test_recordio_reader.py @@ -68,8 +68,7 @@ class TestRecordIO(unittest.TestCase): while True: try: tmp, = exe.run(fetch_list=[avg_loss]) - except fluid.core.EnforceNotMet as ex: - self.assertIn("There is no next data.", ex.message) + except fluid.core.EOFException: break avg_loss_np.append(tmp)