From 0827ab329d37b1f3e0d02364a1599532aa26fc56 Mon Sep 17 00:00:00 2001
From: anzhengqi
Date: Thu, 17 Sep 2020 22:08:11 +0800
Subject: [PATCH] fix stack problem when raise exception in sink mode

---
Purpose: DeviceQueueOp::SendDataToAscend() spins while stop_send_ is set.
This patch adds a second flag, ascend_keep_waiting_, which
ExecutionTree::~ExecutionTree() clears through DeviceQueueOp::StopWaiting()
(only when the tree root is a DeviceQueueOp and ENABLE_TDTQUE is defined)
before it calls tg_->ServiceStop(), so the wait loop can exit.

NOTE(review): this copy was rebuilt from a whitespace-collapsed extract.
  * Leading indentation of context lines is reconstructed, not recovered;
    if a hunk is rejected, retry with `git apply --ignore-whitespace`.
  * The template argument of dynamic_cast was missing in the extract and is
    restored from the declared type of `op` (DeviceQueueOp *).
  * The first context line of the execution_tree.cc include hunk had lost
    its header name; <string> is assumed -- TODO confirm against the tree.
  * ascend_keep_waiting_ is a plain bool that is presumably written by the
    thread destroying the tree and read by the op's worker thread; consider
    std::atomic<bool> in a follow-up -- verify the threading model first.

 .../dataset/engine/datasetops/device_queue_op.cc      |  8 ++++++--
 .../dataset/engine/datasetops/device_queue_op.h       |  5 +++++
 .../ccsrc/minddata/dataset/engine/execution_tree.cc   | 11 ++++++++++-
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc
index a18c9d3e7e..38efaaf5d9 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc
@@ -39,7 +39,11 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i
       device_id_(device_id),
       prefetch_size_(prefetch_size),
       send_epoch_end_(send_epoch_end),
-      stop_send_(false) {}
+      stop_send_(false) {
+#ifdef ENABLE_TDTQUE
+  ascend_keep_waiting_ = true;
+#endif
+}
 
 DeviceQueueOp::~DeviceQueueOp() {}
 
@@ -120,7 +124,7 @@ Status DeviceQueueOp::SendDataToAscend() {
       TensorRow currRow;
       for (int row_id = 0; row_id < current_buffer->NumRows(); row_id++) {
         RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow));
-        while (stop_send_) {
+        while (stop_send_ && ascend_keep_waiting_) {
           MS_LOG(DEBUG) << "stop_send flag is set, waiting for continue signal...";
           std::this_thread::sleep_for(std::chrono::microseconds(100));
         }
diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h
index 6b84d60b16..dc24380f0d 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h
@@ -128,6 +128,10 @@ class DeviceQueueOp : public PipelineOp {
     stop_send_ = false;
   }
 
+#ifdef ENABLE_TDTQUE
+  void StopWaiting() { ascend_keep_waiting_ = false; }
+#endif
+
   // Name: Print()
   // Description: A function that prints info about the node
   void Print(std::ostream &out, // In: The output stream to print to
@@ -159,6 +163,7 @@ class DeviceQueueOp : public PipelineOp {
  private:
 #ifdef ENABLE_TDTQUE
   Status SendDataToAscend();
+  bool ascend_keep_waiting_;
 #endif
 
 #ifdef ENABLE_GPUQUE
diff --git a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc
index 6aefe23986..d2eba5ff64 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc
@@ -18,6 +18,7 @@
 #include <string>
 #include "minddata/dataset/engine/datasetops/dataset_op.h"
 #include "minddata/dataset/engine/datasetops/shuffle_op.h"
+#include "minddata/dataset/engine/datasetops/device_queue_op.h"
 #include "minddata/dataset/util/task_manager.h"
 #include "minddata/dataset/engine/opt/pass.h"
 #include "minddata/dataset/engine/opt/pre/removal_pass.h"
@@ -42,7 +43,15 @@ ExecutionTree::ExecutionTree() : id_count_(0) {
 }
 
 // Destructor
-ExecutionTree::~ExecutionTree() { (void)tg_->ServiceStop(); }
+ExecutionTree::~ExecutionTree() {
+#ifdef ENABLE_TDTQUE
+  DeviceQueueOp *op = dynamic_cast<DeviceQueueOp *>(root_.get());
+  if (op != nullptr) {
+    op->StopWaiting();
+  }
+#endif
+  (void)tg_->ServiceStop();
+}
 
 // Associates a DatasetOp with this tree. This assigns a valid node id to the operator and
 // provides it with a link to the tree. A node cannot form any relationships (parent/child) with