diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc index 2394380ea4..b2e4170e84 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.cc @@ -94,7 +94,7 @@ Status CelebAOp::LaunchThreadsAndInitOp() { RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(attr_info_queue_->Register(tree_->AllTasks())); - wp_.Register(tree_->AllTasks()); + RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Walking attr file", std::bind(&CelebAOp::ParseAttrFile, this))); RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&CelebAOp::WorkerEntry, this, std::placeholders::_1))); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc index 0c2d57ff42..4a4367460f 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.cc @@ -149,7 +149,7 @@ Status CifarOp::LaunchThreadsAndInitOp() { RETURN_STATUS_UNEXPECTED("tree_ not set"); } RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); - wp_.Register(tree_->AllTasks()); + RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK( tree_->AllTasks()->CreateAsyncTask("Get cifar data block", std::bind(&CifarOp::ReadCifarBlockDataAsync, this))); RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&CifarOp::WorkerEntry, this, std::placeholders::_1))); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc index 37a74f019a..a86fd677cf 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.cc @@ -168,7 +168,7 @@ Status GeneratorOp::FillBuffer(TensorQTable *tt) { Status GeneratorOp::operator()() { // Handshake with TaskManager to synchronize thread creation TaskManager::FindMe()->Post(); - wp_.Register(tree_->AllTasks()); + RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks())); std::unique_ptr fetched_buffer; bool eof = false; while (!eof) { diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc index 32d7171c8f..f57a2f8b64 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.cc @@ -386,7 +386,7 @@ Status ImageFolderOp::LaunchThreadsAndInitOp() { RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(folder_name_queue_->Register(tree_->AllTasks())); RETURN_IF_NOT_OK(image_name_queue_->Register(tree_->AllTasks())); - wp_.Register(tree_->AllTasks()); + RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks())); // The following code launch 3 threads group // 1) A thread that walks all folders and push the folder names to a util:Queue mFoldernameQueue. // 2) Workers that pull foldername from mFoldernameQueue, walk it and return the sorted images to mImagenameQueue diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc index ab0c012416..d31b67fd65 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.cc @@ -140,7 +140,7 @@ Status ManifestOp::LaunchThreadsAndInitOp() { RETURN_STATUS_UNEXPECTED("tree_ not set"); } RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); - wp_.Register(tree_->AllTasks()); + RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK( tree_->LaunchWorkers(num_workers_, std::bind(&ManifestOp::WorkerEntry, this, std::placeholders::_1))); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc index 72dee6f2e6..96675e6f6e 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc @@ -644,7 +644,7 @@ Status MindRecordOp::LaunchThreadAndInitOp() { } RETURN_IF_NOT_OK(io_blk_queues_.Register(tree_->AllTasks())); - shard_reader_wait_post_.Register(tree_->AllTasks()); + RETURN_IF_NOT_OK(shard_reader_wait_post_.Register(tree_->AllTasks())); if (shard_reader_->Launch(!block_reader_) == MSRStatus::FAILED) { RETURN_STATUS_UNEXPECTED("MindRecordOp launch failed."); } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc index fbf041e985..da25c45027 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.cc @@ -395,7 +395,7 @@ Status MnistOp::LaunchThreadsAndInitOp() { RETURN_STATUS_UNEXPECTED("tree_ not set"); } RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); - wp_.Register(tree_->AllTasks()); + RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&MnistOp::WorkerEntry, this, std::placeholders::_1))); TaskManager::FindMe()->Post(); RETURN_IF_NOT_OK(this->WalkAllFiles()); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc index 8ab186761e..c339e81ed1 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.cc @@ -370,7 +370,7 @@ Status TextFileOp::operator()() { // must be called after launching workers. TaskManager::FindMe()->Post(); - io_block_queue_wait_post_.Register(tree_->AllTasks()); + RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); NotifyToFillIOBlockQueue(); while (!finished_reading_dataset_) { int64_t buffer_id = 0; diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc index 50c60caa86..b059d54b9a 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.cc @@ -222,7 +222,7 @@ Status TFReaderOp::operator()() { // so workers have to be kept alive until the end of the program TaskManager::FindMe()->Post(); - io_block_queue_wait_post_.Register(tree_->AllTasks()); + RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); NotifyToFillIOBlockQueue(); while (!finished_reading_dataset_) { diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc index e523aa84d6..834d4c512b 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.cc @@ -231,7 +231,7 @@ Status VOCOp::LaunchThreadsAndInitOp() { RETURN_STATUS_UNEXPECTED("tree_ not set"); } RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); - wp_.Register(tree_->AllTasks()); + RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&VOCOp::WorkerEntry, this, std::placeholders::_1))); TaskManager::FindMe()->Post(); RETURN_IF_NOT_OK(this->ParseImageIds()); diff --git a/mindspore/ccsrc/dataset/util/CMakeLists.txt b/mindspore/ccsrc/dataset/util/CMakeLists.txt index ff14d772ca..9ae93618ab 100644 --- a/mindspore/ccsrc/dataset/util/CMakeLists.txt +++ b/mindspore/ccsrc/dataset/util/CMakeLists.txt @@ -3,7 +3,6 @@ add_library(utils OBJECT circular_pool.cc memory_pool.cc cond_var.cc - semaphore.cc intrp_service.cc task.cc task_manager.cc diff --git a/mindspore/ccsrc/dataset/util/semaphore.cc b/mindspore/ccsrc/dataset/util/semaphore.cc deleted file mode 100644 index 983c387df5..0000000000 --- a/mindspore/ccsrc/dataset/util/semaphore.cc +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright 2019 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "dataset/util/semaphore.h" -#include "dataset/util/task_manager.h" - -namespace mindspore { -namespace dataset { -Status Semaphore::P() { - std::unique_lock lck(mutex_); - return (wait_cond_.Wait(&lck, [this]() { return value_ != 0; })); -} - -void Semaphore::V() { - std::unique_lock lck(mutex_); - ++value_; - wait_cond_.NotifyOne(); -} - -void Semaphore::Register(TaskGroup *vg) { (void)wait_cond_.Register(vg->GetIntrpService()); } - -Status Semaphore::Deregister() { return (wait_cond_.Deregister()); } - -void Semaphore::ResetIntrpState() { wait_cond_.ResetIntrpState(); } -} // namespace dataset -} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/util/semaphore.h b/mindspore/ccsrc/dataset/util/semaphore.h deleted file mode 100644 index 74c344f7d3..0000000000 --- a/mindspore/ccsrc/dataset/util/semaphore.h +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright 2019 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef DATASET_UTIL_SEMAPHORE_H_ -#define DATASET_UTIL_SEMAPHORE_H_ - -#include "dataset/util/cond_var.h" - -namespace mindspore { -namespace dataset { -class TaskGroup; - -class Semaphore { - public: - explicit Semaphore(int init) : value_(init) {} - - virtual ~Semaphore() {} - - Status P(); - - void V(); - - void Register(TaskGroup *vg); - - Status Deregister(); - - void ResetIntrpState(); - - private: - int value_; - - std::mutex mutex_; - CondVar wait_cond_; -}; -} // namespace dataset -} // namespace mindspore -#endif // DATASET_UTIL_SEMAPHORE_H_ diff --git a/mindspore/ccsrc/dataset/util/task_manager.cc b/mindspore/ccsrc/dataset/util/task_manager.cc index a9f509385e..06340e90ea 100644 --- a/mindspore/ccsrc/dataset/util/task_manager.cc +++ b/mindspore/ccsrc/dataset/util/task_manager.cc @@ -53,7 +53,7 @@ Status TaskManager::CreateAsyncTask(const std::string &my_name, const std::funct LockGuard lck(&tg_lock_); this->grp_list_.insert(vg); } - (*task)->wp_.Register(vg); + RETURN_IF_NOT_OK((*task)->wp_.Register(vg)); RETURN_IF_NOT_OK((*task)->Run()); // Wait for the thread to initialize successfully. RETURN_IF_NOT_OK((*task)->Wait()); diff --git a/mindspore/ccsrc/dataset/util/wait_post.cc b/mindspore/ccsrc/dataset/util/wait_post.cc index 99ee0cb77f..204f203d9a 100644 --- a/mindspore/ccsrc/dataset/util/wait_post.cc +++ b/mindspore/ccsrc/dataset/util/wait_post.cc @@ -36,7 +36,7 @@ void WaitPost::Clear() { value_ = 0; } -void WaitPost::Register(TaskGroup *vg) { (void)wait_cond_.Register(vg->GetIntrpService()); } +Status WaitPost::Register(TaskGroup *vg) { return wait_cond_.Register(vg->GetIntrpService()); } void WaitPost::ResetIntrpState() { wait_cond_.ResetIntrpState(); } diff --git a/mindspore/ccsrc/dataset/util/wait_post.h b/mindspore/ccsrc/dataset/util/wait_post.h index bac43f7a4e..4e60995bd9 100644 --- a/mindspore/ccsrc/dataset/util/wait_post.h +++ b/mindspore/ccsrc/dataset/util/wait_post.h @@ -36,7 +36,7 @@ class WaitPost { void Clear(); - void Register(TaskGroup *vg); + Status Register(TaskGroup *vg); Status Deregister(); diff --git a/tests/ut/cpp/dataset/interrupt_test.cc b/tests/ut/cpp/dataset/interrupt_test.cc index 7816346c15..ee2018a050 100644 --- a/tests/ut/cpp/dataset/interrupt_test.cc +++ b/tests/ut/cpp/dataset/interrupt_test.cc @@ -20,7 +20,6 @@ #include "dataset/util/intrp_service.h" #include "dataset/util/task_manager.h" #include "dataset/util/queue.h" -#include "dataset/util/semaphore.h" using namespace mindspore::dataset; using mindspore::MsLogLevel::INFO; @@ -55,11 +54,12 @@ TEST_F(MindDataTestIntrpService, Test1) { TEST_F(MindDataTestIntrpService, Test2) { MS_LOG(INFO) << "Test Semaphore"; Status rc; - Semaphore sem(0); - sem.Register(&vg_); + WaitPost wp; + rc = wp.Register(&vg_); + EXPECT_TRUE(rc.IsOk()); vg_.CreateAsyncTask("Test1", [&]() -> Status { TaskManager::FindMe()->Post(); - Status rc = sem.P(); + Status rc = wp.Wait(); EXPECT_TRUE(rc.IsInterrupted()); return rc; });