@@ -93,21 +93,18 @@ Status DeviceQueueOp::EoeReceived(int32_t worker_id) {
   return Status::OK();
 }
 
-Status DeviceQueueOp::CheckExceptions(const std::unique_ptr<DataBuffer> &buffer) const {
-  // this method checks if the buffer meets the conditions to be sent to TDT
-  if (buffer->NumRows() != 0) {
-    TensorRow row;
-    buffer->GetRow(0, &row);
-    for (const auto &item : row) {
-      CHECK_FAIL_RETURN_UNEXPECTED(item->type().IsNumeric(), "Invalid data, cannot send string tensor to device.");
-      CHECK_FAIL_RETURN_UNEXPECTED(item->HasData(), "Invalid data, cannot send tensor with no data to device.");
-    }
-  }
+Status DeviceQueueOp::CheckExceptions(const TensorRow &row) const {
+  // this method checks if the row meets the conditions to be sent to TDT
+  for (const auto &item : row) {
+    CHECK_FAIL_RETURN_UNEXPECTED(item->type().IsNumeric(), "Invalid data, cannot send string tensor to device.");
+    CHECK_FAIL_RETURN_UNEXPECTED(item->HasData(), "Invalid data, cannot send tensor with no data to device.");
+  }
   return Status::OK();
 }
 
 Status DeviceQueueOp::operator()() {
   TaskManager::FindMe()->Post();
+  child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
 
 #ifdef ENABLE_DUMP_IR
   if (md_channel_info_ == nullptr) {
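The overload change above narrows validation to a single TensorRow, so callers no longer unpack a DataBuffer before checking it. A minimal usage sketch, illustrative only: it assumes the dataset Tensor::CreateFromVector factory and runs inside a Status-returning member of DeviceQueueOp.

  // Build a one-column numeric row and validate it before sending to device.
  // A string tensor, or a tensor with no data, would fail CheckExceptions.
  std::shared_ptr<Tensor> tensor;
  RETURN_IF_NOT_OK(Tensor::CreateFromVector(std::vector<float>{1.0f, 2.0f}, &tensor));
  TensorRow row;
  row.push_back(tensor);
  RETURN_IF_NOT_OK(CheckExceptions(row));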
@@ -163,43 +160,39 @@ Status DeviceQueueOp::SendDataToAscend() {
   md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
   md_channel_info_->RecordPreprocessBatch(0);
 #endif
-  std::unique_ptr<DataBuffer> current_buffer;
-  RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
 
-  while (!current_buffer->eof() && !is_break_loop) {
-    while (!current_buffer->eoe() && !is_break_loop) {
-      RETURN_IF_NOT_OK(CheckExceptions(current_buffer));
-      TensorRow currRow;
-      for (int row_id = 0; row_id < current_buffer->NumRows(); row_id++) {
-        RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow));
-        WaitContinueSignal();
+  TensorRow curr_row;
+  RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
+  while (!curr_row.eof() && !is_break_loop) {
+    while (!curr_row.eoe() && !is_break_loop) {
+      RETURN_IF_NOT_OK(CheckExceptions(curr_row));
+      WaitContinueSignal();
 #ifdef ENABLE_DUMP_IR
-        md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
-        md_channel_info_->RecordPreprocessBatch(send_batch);
-        md_channel_info_->RecordPushStartTime();
+      md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
+      md_channel_info_->RecordPreprocessBatch(send_batch);
+      md_channel_info_->RecordPushStartTime();
 #endif
-        RETURN_IF_NOT_OK(SendRowToTdt(currRow, isProfilingEnable, &tdt_cost));
-        ProfilingRecorder(isProfilingEnable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time,
-                          connector_capacity, connector_size);
-        send_batch++;
+      RETURN_IF_NOT_OK(SendRowToTdt(curr_row, isProfilingEnable, &tdt_cost));
+      ProfilingRecorder(isProfilingEnable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time,
+                        connector_capacity, connector_size);
+      send_batch++;
 #ifdef ENABLE_DUMP_IR
-        md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
-        md_channel_info_->RecordPreprocessBatch(send_batch);
-        md_channel_info_->RecordPushEndTime();
+      md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
+      md_channel_info_->RecordPreprocessBatch(send_batch);
+      md_channel_info_->RecordPushEndTime();
 #endif
-        if (total_batch_ > 0 && send_batch >= total_batch_) {
-          is_break_loop = true;
-          break;
-        }
-      }
+      if (total_batch_ > 0 && send_batch >= total_batch_) {
+        is_break_loop = true;
+        break;
+      }
 
       if (isProfilingEnable) {
         connector_size = ChildOpConnectorSize();
         connector_capacity = ChildOpConnectorCapacity();
       }
-      RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
+      RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
     }
-    if (current_buffer->eoe() && send_epoch_end_) {
+    if (curr_row.eoe() && send_epoch_end_) {
       TensorRow currRow;
       auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost,
                                              ACL_TENSOR_DATA_END_OF_SEQUENCE);
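The restructured loop above is the core of the change: the old code pulled a whole DataBuffer via GetNextInput and walked its rows with GetRow, while the new code asks child_iterator_ for one TensorRow at a time and reads epoch/stream boundaries off the row itself. A condensed sketch of that consumption pattern; Process() is a hypothetical stand-in for the validate/send/profiling work done in the hunk above.

  TensorRow row;
  RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&row));
  while (!row.eof()) {    // eof(): the pipeline has shut down for good
    while (!row.eoe()) {  // eoe(): the current epoch has no more rows
      Process(row);       // hypothetical per-row work (check, push to TDT)
      RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&row));
    }
    // end-of-epoch handling goes here, then fetch the next epoch's first row
    RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&row));
  }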
@@ -219,7 +212,7 @@ Status DeviceQueueOp::SendDataToAscend() {
       connector_capacity = ChildOpConnectorCapacity();
       tree_->SetEpochEnd();
     }
-    RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
+    RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
   }
 
   // now we use this flag to judge whether exception raised.
@@ -444,27 +437,23 @@ Status DeviceQueueOp::WorkerEntry(int32_t worker_id) {
   // Every thread use cuda api should SetThreadDevice
   RETURN_IF_NOT_OK(SetThreadDevice());
   TaskManager::FindMe()->Post();
-  std::unique_ptr<DataBuffer> current_buffer;
+  TensorRow current_row;
   uint32_t batch_num = 0;
-  RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_buffer));
-  while (!current_buffer->quit() && !GpuBufferMgr::GetInstance().IsClosed()) {
-    TensorRow curr_row;
-    for (int row_id = 0; row_id < current_buffer->NumRows() && !GpuBufferMgr::GetInstance().IsClosed(); row_id++) {
-      RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &curr_row));
-      std::vector<device::DataItemGpu> items;
-      for (int i = 0; i < curr_row.size(); i++) {
-        device::DataItemGpu data_item;
-        data_item.data_len_ = static_cast<size_t>(curr_row[i]->SizeInBytes());
-        data_item.data_ptr_ = nullptr;
-        data_item.worker_id_ = worker_id;
-        items.push_back(data_item);
-      }
-      RETURN_IF_NOT_OK(MallocForGPUData(&items, curr_row, worker_id));
-      RETURN_IF_NOT_OK(gpu_item_connector_->Add(worker_id, std::move(items)));
-      batch_num++;
-    }
+  RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_row));
+  while (!current_row.quit() && !GpuBufferMgr::GetInstance().IsClosed()) {
+    std::vector<device::DataItemGpu> items;
+    for (int i = 0; i < current_row.size(); i++) {
+      device::DataItemGpu data_item;
+      data_item.data_len_ = static_cast<size_t>(current_row[i]->SizeInBytes());
+      data_item.data_ptr_ = nullptr;
+      data_item.worker_id_ = worker_id;
+      items.push_back(data_item);
+    }
+    RETURN_IF_NOT_OK(MallocForGPUData(&items, current_row, worker_id));
+    RETURN_IF_NOT_OK(gpu_item_connector_->Add(worker_id, std::move(items)));
+    batch_num++;
 
-    RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_buffer));
+    RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_row));
   }
 
   MS_LOG(INFO) << "Device queue worker id " << worker_id << "proc " << batch_num << "batch.";
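Note the two-phase hand-off the worker keeps after the rewrite: it first fills each DataItemGpu with only the byte length, leaving data_ptr_ null, and only then lets MallocForGPUData allocate device memory and copy the tensors over. A sketch of that step in isolation, with field and function names as in the hunk above:

  // Phase 1: describe each tensor in the row; no device memory is touched yet.
  std::vector<device::DataItemGpu> items;
  for (size_t i = 0; i < current_row.size(); i++) {
    device::DataItemGpu data_item;
    data_item.data_len_ = static_cast<size_t>(current_row[i]->SizeInBytes());
    data_item.data_ptr_ = nullptr;  // Phase 2 (MallocForGPUData) fills this in
    data_item.worker_id_ = worker_id;
    items.push_back(data_item);
  }
  // Phase 2: allocate and copy, then publish the batch to the connector.
  RETURN_IF_NOT_OK(MallocForGPUData(&items, current_row, worker_id));
  RETURN_IF_NOT_OK(gpu_item_connector_->Add(worker_id, std::move(items)));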
@@ -477,31 +466,31 @@ Status DeviceQueueOp::WorkerEntry(int32_t worker_id) {
 Status DeviceQueueOp::SendDataToGPU() {
   RETURN_IF_NOT_OK(LaunchParallelCopyThread());
   MS_LOG(INFO) << "Device queue, sending data to GPU.";
-  std::unique_ptr<DataBuffer> current_buffer;
-  RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
+  TensorRow current_row;
+  RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
   int64_t num_buf = 0;
   bool is_break_loop = false;
-  while (!current_buffer->eof() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
-    while (!current_buffer->eoe() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
-      RETURN_IF_NOT_OK(CheckExceptions(current_buffer));
-      RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(current_buffer)));
+  while (!current_row.eof() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
+    while (!current_row.eoe() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
+      RETURN_IF_NOT_OK(CheckExceptions(current_row));
+      RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(current_row)));
       if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
-        RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
+        RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
       } else {
        is_break_loop = true;
       }
     }
 
     if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
-      RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
+      RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
     } else {
       is_break_loop = true;
     }
   }
 
   for (uint32_t index = 0; index < num_workers_; index++) {
-    auto quit = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagQuit);
-    RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(quit)));
+    TensorRow quit_flag(TensorRow::kFlagQuit);
+    RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(quit_flag)));
   }
 
   MS_LOG(INFO) << "Device queue receive " << num_buf - num_workers_ << " batch.";
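The shutdown handshake also moves off DataBuffer: instead of a kDeBFlagQuit buffer, the sender enqueues one quit-flagged TensorRow per worker, and each worker's while (!current_row.quit() && ...) loop in WorkerEntry exits when it pops that row. Sketched end to end, assuming the flag constructor shown in the hunk above:

  // Sender side: one quit flag per worker so every WorkerEntry loop terminates.
  for (uint32_t index = 0; index < num_workers_; index++) {
    TensorRow quit_flag(TensorRow::kFlagQuit);
    RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(quit_flag)));
  }
  // Worker side: quit() is true only for rows built with TensorRow::kFlagQuit,
  // so ordinary data rows keep the loop running.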
@@ -537,10 +526,9 @@ Status DeviceQueueOp::SendDataToCPU() {
   MS_LOG(INFO) << "Device queue, sending data to CPU.";
   int64_t total_batch = 0;
 
-  std::unique_ptr<ChildIterator> child_iterator = std::make_unique<ChildIterator>(this, 0, 0);
-  while (!(child_iterator->eof_handled())) {
+  while (!(child_iterator_->eof_handled())) {
     TensorRow curr_row;
-    RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&curr_row));
+    RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
 
     if (!curr_row.empty()) {
       for (auto &tensor : curr_row) {