!10697 [MD] Fix core dump problem of get_data_info

From: @xiefangqi
Reviewed-by: @heleiwang,@pandoublefeng
Signed-off-by: @pandoublefeng
pull/10697/MERGE
mindspore-ci-bot 5 years ago committed by Gitee
commit 94a2f7aa1d

@ -43,7 +43,8 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i
send_epoch_end_(send_epoch_end),
stop_send_(false),
total_batch_(total_batch),
create_data_info_queue_(create_data_info_queue) {
create_data_info_queue_(create_data_info_queue),
data_info_queue_ptr_(nullptr) {
#ifdef ENABLE_GPUQUE
// Get the total device num of current machine
int32_t device_count = 0;
@ -106,8 +107,15 @@ Status DeviceQueueOp::operator()() {
if (device_type_ == DeviceType::Ascend) {
#ifdef ENABLE_TDTQUE
if (create_data_info_queue_) {
data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
// This place has a race condition with GetDataInfo, so the first one
// arrive here will do the initialize work.
{
std::unique_lock<std::mutex> lock(data_info_mutex_);
if (data_info_queue_ptr_ == nullptr) {
data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
}
}
}
RETURN_IF_NOT_OK(SendDataToAscend());
#endif
@ -232,6 +240,15 @@ Status DeviceQueueOp::GetDataInfo(DATA_INFO *data_info) {
if (!create_data_info_queue_) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "DataInfo queue is not created.");
}
// This place has a race condition with operator(), so the first one
// arrive here will do the initialize work.
{
std::unique_lock<std::mutex> lock(data_info_mutex_);
if (data_info_queue_ptr_ == nullptr) {
data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
}
}
RETURN_IF_NOT_OK(data_info_queue_ptr_->PopFront(data_info));
return Status::OK();
}

@ -217,6 +217,7 @@ class DeviceQueueOp : public PipelineOp {
int32_t total_batch_;
bool create_data_info_queue_;
std::unique_ptr<DATA_INFO_QUEUE> data_info_queue_ptr_;
std::mutex data_info_mutex_;
#ifdef ENABLE_TDTQUE
std::shared_ptr<TdtPlugin> tdtInstancePtr;

Loading…
Cancel
Save