diff --git a/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc index b97d35345e..8fe005383f 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc @@ -63,9 +63,10 @@ Status FilterOp::operator()() { RETURN_UNEXPECTED_IF_NULL(tree_); filter_queues_.Init(num_workers_, oc_queue_size_); RETURN_IF_NOT_OK(filter_queues_.Register(tree_->AllTasks())); - RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1))); + Status rc = tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1)); // Synchronize with TaskManager. TaskManager::FindMe()->Post(); + RETURN_IF_NOT_OK(rc); RETURN_IF_NOT_OK(Collector()); return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/db_connector.h b/mindspore/ccsrc/dataset/engine/db_connector.h index 7ea9837c44..b1fdd14ab6 100644 --- a/mindspore/ccsrc/dataset/engine/db_connector.h +++ b/mindspore/ccsrc/dataset/engine/db_connector.h @@ -62,7 +62,7 @@ class DbConnector : public Connector> { "[ERROR] nullptr detected when getting data from db connector"); } else { std::unique_lock lk(m_); - RETURN_IF_NOT_OK(cv_.Wait(&lk, [this, worker_id]() { return expect_consumer_ == worker_id; })); + RETURN_IF_NOT_OK(cv_.Wait(&lk, [this, worker_id]() { return (expect_consumer_ == worker_id) || end_of_file_; })); // Once an EOF message is encountered this flag will be set and we can return early. if (end_of_file_) { *result = std::make_unique(0, DataBuffer::kDeBFlagEOF);