|
|
|
@ -44,9 +44,9 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i
|
|
|
|
|
DeviceQueueOp::~DeviceQueueOp() {}
|
|
|
|
|
|
|
|
|
|
#ifdef ENABLE_GPUQUE
|
|
|
|
|
void ReleaseData(void *addr) {
|
|
|
|
|
void DeviceQueueOp::ReleaseData(void *addr) {
|
|
|
|
|
if (addr != nullptr) {
|
|
|
|
|
free(addr);
|
|
|
|
|
pool_->Deallocate(addr);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
@ -87,6 +87,7 @@ Status DeviceQueueOp::operator()() {
|
|
|
|
|
#endif
|
|
|
|
|
} else if (device_type_ == DeviceType::GPU) {
|
|
|
|
|
#ifdef ENABLE_GPUQUE
|
|
|
|
|
RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(&pool_));
|
|
|
|
|
RETURN_IF_NOT_OK(SendDataToGPU());
|
|
|
|
|
#endif
|
|
|
|
|
} else if (device_type_ == DeviceType::CPU) {
|
|
|
|
@ -187,6 +188,7 @@ Status DeviceQueueOp::SendDataToGPU() {
|
|
|
|
|
bool is_break_loop = false;
|
|
|
|
|
bool is_open = false;
|
|
|
|
|
uint32_t handle = INVALID_HANDLE;
|
|
|
|
|
auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1);
|
|
|
|
|
|
|
|
|
|
std::unique_ptr<DataBuffer> current_buffer;
|
|
|
|
|
RETURN_IF_NOT_OK(GetNextInput(¤t_buffer));
|
|
|
|
@ -204,7 +206,7 @@ Status DeviceQueueOp::SendDataToGPU() {
|
|
|
|
|
data_size.push_back(static_cast<size_t>(curr_row[i]->SizeInBytes()));
|
|
|
|
|
}
|
|
|
|
|
if (!is_open) {
|
|
|
|
|
handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, ReleaseData);
|
|
|
|
|
handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, release_function);
|
|
|
|
|
if (handle == INVALID_HANDLE) {
|
|
|
|
|
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "open failed");
|
|
|
|
|
}
|
|
|
|
@ -246,7 +248,7 @@ Status DeviceQueueOp::RetryPushGPUData(const std::vector<size_t> &data_size, con
|
|
|
|
|
BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME);
|
|
|
|
|
if (ret) {
|
|
|
|
|
for (int i = 0; i < items.size(); i++) {
|
|
|
|
|
free(items[i].data_ptr_);
|
|
|
|
|
ReleaseData(items[i].data_ptr_);
|
|
|
|
|
}
|
|
|
|
|
if (ret == BlockQueueStatus_T::ERROR_INPUT) {
|
|
|
|
|
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "invalid input Data, please check it.");
|
|
|
|
@ -267,7 +269,7 @@ Status DeviceQueueOp::RetryPushGPUData(const std::vector<size_t> &data_size, con
|
|
|
|
|
Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row) {
|
|
|
|
|
int i = 0;
|
|
|
|
|
for (auto &sub_item : *items) {
|
|
|
|
|
sub_item.data_ptr_ = (unsigned char *)malloc(sub_item.data_len_);
|
|
|
|
|
RETURN_IF_NOT_OK(pool_->Allocate(sub_item.data_len_, &sub_item.data_ptr_));
|
|
|
|
|
if (sub_item.data_ptr_ == nullptr) {
|
|
|
|
|
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "memory malloc failed.");
|
|
|
|
|
}
|
|
|
|
|