diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc
index 86a8f5eb90..de2505ab0c 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc
@@ -44,9 +44,9 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i
 DeviceQueueOp::~DeviceQueueOp() {}
 
 #ifdef ENABLE_GPUQUE
-void ReleaseData(void *addr) {
+void DeviceQueueOp::ReleaseData(void *addr) {
   if (addr != nullptr) {
-    free(addr);
+    pool_->Deallocate(addr);
   }
 }
 #endif
@@ -87,6 +87,7 @@ Status DeviceQueueOp::operator()() {
 #endif
   } else if (device_type_ == DeviceType::GPU) {
 #ifdef ENABLE_GPUQUE
+    RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(&pool_));
     RETURN_IF_NOT_OK(SendDataToGPU());
 #endif
   } else if (device_type_ == DeviceType::CPU) {
@@ -187,6 +188,7 @@ Status DeviceQueueOp::SendDataToGPU() {
   bool is_break_loop = false;
   bool is_open = false;
   uint32_t handle = INVALID_HANDLE;
+  auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1);
 
   std::unique_ptr<DataBuffer> current_buffer;
   RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
@@ -204,7 +206,7 @@ Status DeviceQueueOp::SendDataToGPU() {
         data_size.push_back(static_cast<size_t>(curr_row[i]->SizeInBytes()));
       }
       if (!is_open) {
-        handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, ReleaseData);
+        handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, release_function);
         if (handle == INVALID_HANDLE) {
           return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "open failed");
         }
@@ -246,7 +248,7 @@ Status DeviceQueueOp::RetryPushGPUData(const std::vector<size_t> &data_size, con
   BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME);
   if (ret) {
     for (int i = 0; i < items.size(); i++) {
-      free(items[i].data_ptr_);
+      ReleaseData(items[i].data_ptr_);
     }
     if (ret == BlockQueueStatus_T::ERROR_INPUT) {
       return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "invalid input Data, please check it.");
@@ -267,7 +269,7 @@ Status DeviceQueueOp::RetryPushGPUData(const std::vector<size_t> &data_size, con
 Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row) {
   int i = 0;
   for (auto &sub_item : *items) {
-    sub_item.data_ptr_ = (unsigned char *)malloc(sub_item.data_len_);
+    RETURN_IF_NOT_OK(pool_->Allocate(sub_item.data_len_, &sub_item.data_ptr_));
     if (sub_item.data_ptr_ == nullptr) {
       return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "memory malloc failed.");
     }
diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h
index 224d36b85f..b697bae425 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h
@@ -29,6 +29,7 @@
 #endif
 
 #ifdef ENABLE_GPUQUE
+#include "minddata/dataset/util/circular_pool.h"
 #include "runtime/device/gpu/gpu_buffer_mgr.h"
 using mindspore::device::BlockQueueStatus_T;
 using mindspore::device::GpuBufferMgr;
@@ -162,6 +163,9 @@ class DeviceQueueOp : public PipelineOp {
   Status SendDataToGPU();
   Status RetryPushGPUData(const std::vector<size_t> &data_size, const TensorRow &curr_row, uint32_t handle);
   Status MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row);
+  void ReleaseData(void *addr);
+
+  std::shared_ptr<MemoryPool> pool_;
 #endif
 
   Status SendDataToCPU();
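
The diff above replaces raw malloc/free in the GPU send path with a memory pool owned by the op, which means the release callback handed to GpuBufferMgr must now be a member function (it needs access to pool_) bound with std::bind. Below is a minimal, self-contained sketch of that pattern, not MindSpore code: SimplePool, FakeQueue, and DeviceQueueLikeOp are hypothetical stand-ins, and the pool is just a malloc/free wrapper instead of the CircularPool used in the patch.

// Sketch only: illustrates binding a member release callback to a pool-owning op.
#include <cstddef>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>

// Hypothetical pool; the real change uses CircularPool, this just wraps malloc/free.
class SimplePool {
 public:
  void *Allocate(std::size_t len) { return std::malloc(len); }
  void Deallocate(void *addr) { std::free(addr); }
};

// Hypothetical consumer that frees buffers through the callback it was given,
// the way GpuBufferMgr uses the release function passed to Open().
class FakeQueue {
 public:
  explicit FakeQueue(std::function<void(void *)> release) : release_(std::move(release)) {}
  void Push(void *data) { release_(data); }  // pretend the data was consumed, then release it
 private:
  std::function<void(void *)> release_;
};

class DeviceQueueLikeOp {
 public:
  void Run() {
    pool_ = std::make_shared<SimplePool>();
    // Bind the member function so the queue can call back into this object's pool.
    auto release_function = std::bind(&DeviceQueueLikeOp::ReleaseData, this, std::placeholders::_1);
    FakeQueue queue(release_function);
    void *buf = pool_->Allocate(64);  // pool-backed allocation instead of raw malloc
    queue.Push(buf);                  // queue returns it via ReleaseData -> pool_->Deallocate
    std::cout << "done\n";
  }

 private:
  void ReleaseData(void *addr) {
    if (addr != nullptr) {
      pool_->Deallocate(addr);
    }
  }
  std::shared_ptr<SimplePool> pool_;
};

int main() {
  DeviceQueueLikeOp op;
  op.Run();
  return 0;
}

The key point the sketch mirrors: once allocation goes through the pool, every path that used to call free() (including the error path in RetryPushGPUData) has to go through the same ReleaseData member so buffers are returned to the pool that produced them.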