|
|
|
@ -28,7 +28,6 @@
|
|
|
|
|
#include "dataset/engine/datasetops/dataset_op.h"
|
|
|
|
|
#include "dataset/engine/db_connector.h"
|
|
|
|
|
#include "dataset/engine/execution_tree.h"
|
|
|
|
|
#include "dataset/util/make_unique.h"
|
|
|
|
|
#include "utils/log_adapter.h"
|
|
|
|
|
|
|
|
|
|
namespace mindspore {
|
|
|
|
@ -94,19 +93,19 @@ MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, int32_t rows_per_buf
|
|
|
|
|
io_blk_queues_.Init(num_workers_, op_connector_queue_size);
|
|
|
|
|
if (!block_reader_) return;
|
|
|
|
|
for (int32_t i = 0; i < num_workers_; ++i) {
|
|
|
|
|
block_buffer_.emplace_back(make_unique<std::vector<ShardTuple>>(std::vector<ShardTuple>{}));
|
|
|
|
|
block_buffer_.emplace_back(std::make_unique<std::vector<ShardTuple>>(std::vector<ShardTuple>{}));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Private helper method to encapsulate some common construction/reset tasks
|
|
|
|
|
Status MindRecordOp::Init() {
|
|
|
|
|
shard_reader_ = mindspore::make_unique<ShardReader>();
|
|
|
|
|
shard_reader_ = std::make_unique<ShardReader>();
|
|
|
|
|
auto rc = shard_reader_->Open(dataset_file_, num_mind_record_workers_, columns_to_load_, operators_, block_reader_);
|
|
|
|
|
|
|
|
|
|
CHECK_FAIL_RETURN_UNEXPECTED(rc != MSRStatus::FAILED,
|
|
|
|
|
"MindRecordOp init failed. Error message: " + ErrnoToMessage(rc));
|
|
|
|
|
|
|
|
|
|
data_schema_ = mindspore::make_unique<DataSchema>();
|
|
|
|
|
data_schema_ = std::make_unique<DataSchema>();
|
|
|
|
|
|
|
|
|
|
std::vector<std::shared_ptr<Schema>> schema_vec = shard_reader_->get_shard_header()->get_schemas();
|
|
|
|
|
// check whether schema exists, if so use the first one
|
|
|
|
@ -143,7 +142,7 @@ Status MindRecordOp::Init() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!load_all_cols) {
|
|
|
|
|
std::unique_ptr<DataSchema> tmp_schema = make_unique<DataSchema>();
|
|
|
|
|
std::unique_ptr<DataSchema> tmp_schema = std::make_unique<DataSchema>();
|
|
|
|
|
for (std::string colname : columns_to_load_) {
|
|
|
|
|
CHECK_FAIL_RETURN_UNEXPECTED(colname_to_ind.find(colname) != colname_to_ind.end(), colname + ": doesn't exist");
|
|
|
|
|
RETURN_IF_NOT_OK(tmp_schema->AddColumn(data_schema_->column(colname_to_ind[colname])));
|
|
|
|
@ -297,7 +296,7 @@ Status MindRecordOp::LoadFloat(TensorShape *new_shape, std::unique_ptr<T[]> *arr
|
|
|
|
|
RETURN_IF_NOT_OK(GetFloat(&value, columns_json[column_name], use_double));
|
|
|
|
|
|
|
|
|
|
*new_shape = TensorShape::CreateScalar();
|
|
|
|
|
*array_data = mindspore::make_unique<T[]>(1);
|
|
|
|
|
*array_data = std::make_unique<T[]>(1);
|
|
|
|
|
(*array_data)[0] = value;
|
|
|
|
|
} else {
|
|
|
|
|
if (column.hasShape()) {
|
|
|
|
@ -308,7 +307,7 @@ Status MindRecordOp::LoadFloat(TensorShape *new_shape, std::unique_ptr<T[]> *arr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int idx = 0;
|
|
|
|
|
*array_data = mindspore::make_unique<T[]>(new_shape->NumOfElements());
|
|
|
|
|
*array_data = std::make_unique<T[]>(new_shape->NumOfElements());
|
|
|
|
|
for (auto &element : columns_json[column_name]) {
|
|
|
|
|
T value = 0;
|
|
|
|
|
RETURN_IF_NOT_OK(GetFloat(&value, element, use_double));
|
|
|
|
@ -349,7 +348,7 @@ Status MindRecordOp::LoadInt(TensorShape *new_shape, std::unique_ptr<T[]> *array
|
|
|
|
|
RETURN_IF_NOT_OK(GetInt(&value, columns_json[column_name]));
|
|
|
|
|
|
|
|
|
|
*new_shape = TensorShape::CreateScalar();
|
|
|
|
|
*array_data = mindspore::make_unique<T[]>(1);
|
|
|
|
|
*array_data = std::make_unique<T[]>(1);
|
|
|
|
|
(*array_data)[0] = value;
|
|
|
|
|
} else {
|
|
|
|
|
if (column.hasShape()) {
|
|
|
|
@ -360,7 +359,7 @@ Status MindRecordOp::LoadInt(TensorShape *new_shape, std::unique_ptr<T[]> *array
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int idx = 0;
|
|
|
|
|
*array_data = mindspore::make_unique<T[]>(new_shape->NumOfElements());
|
|
|
|
|
*array_data = std::make_unique<T[]>(new_shape->NumOfElements());
|
|
|
|
|
for (auto &element : columns_json[column_name]) {
|
|
|
|
|
T value = 0;
|
|
|
|
|
RETURN_IF_NOT_OK(GetInt(&value, element));
|
|
|
|
@ -430,12 +429,14 @@ Status MindRecordOp::WorkerEntry(int32_t worker_id) {
|
|
|
|
|
RETURN_IF_NOT_OK(io_blk_queues_[worker_id]->PopFront(&io_block));
|
|
|
|
|
while (io_block != nullptr) {
|
|
|
|
|
if (io_block->eoe() == true) {
|
|
|
|
|
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE))));
|
|
|
|
|
RETURN_IF_NOT_OK(
|
|
|
|
|
out_connector_->Add(worker_id, std::move(std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE))));
|
|
|
|
|
RETURN_IF_NOT_OK(io_blk_queues_[worker_id]->PopFront(&io_block));
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
if (io_block->eof() == true) {
|
|
|
|
|
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF))));
|
|
|
|
|
RETURN_IF_NOT_OK(
|
|
|
|
|
out_connector_->Add(worker_id, std::move(std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF))));
|
|
|
|
|
RETURN_IF_NOT_OK(io_blk_queues_[worker_id]->PopFront(&io_block));
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
@ -485,9 +486,9 @@ Status MindRecordOp::WorkerEntry(int32_t worker_id) {
|
|
|
|
|
|
|
|
|
|
Status MindRecordOp::GetBufferFromReader(std::unique_ptr<DataBuffer> *fetched_buffer, int64_t buffer_id,
|
|
|
|
|
int32_t worker_id) {
|
|
|
|
|
*fetched_buffer = mindspore::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone);
|
|
|
|
|
*fetched_buffer = std::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone);
|
|
|
|
|
(*fetched_buffer)->set_column_name_map(column_name_mapping_);
|
|
|
|
|
std::unique_ptr<TensorQTable> tensor_table = mindspore::make_unique<TensorQTable>();
|
|
|
|
|
std::unique_ptr<TensorQTable> tensor_table = std::make_unique<TensorQTable>();
|
|
|
|
|
for (int32_t i = 0; i < rows_per_buffer_; ++i) {
|
|
|
|
|
ShardTuple tupled_buffer;
|
|
|
|
|
if (block_reader_) {
|
|
|
|
@ -596,22 +597,22 @@ Status MindRecordOp::operator()() {
|
|
|
|
|
for (int32_t i = 0; i < buffers_needed_; ++i) {
|
|
|
|
|
if (block_reader_) RETURN_IF_NOT_OK(FetchBlockBuffer(i));
|
|
|
|
|
std::vector<int64_t> keys(1, i);
|
|
|
|
|
RETURN_IF_NOT_OK(
|
|
|
|
|
io_blk_queues_[buf_cnt_++ % num_workers_]->Add(make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone))));
|
|
|
|
|
RETURN_IF_NOT_OK(io_blk_queues_[buf_cnt_++ % num_workers_]->Add(
|
|
|
|
|
std::make_unique<IOBlock>(IOBlock(keys, IOBlock::kDeIoBlockNone))));
|
|
|
|
|
}
|
|
|
|
|
if (!BitTest(op_ctrl_flags_, kDeOpRepeated) || BitTest(op_ctrl_flags_, kDeOpLastRepeat)) {
|
|
|
|
|
RETURN_IF_NOT_OK(
|
|
|
|
|
io_blk_queues_[(buf_cnt_++) % num_workers_]->Add(make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
|
|
|
|
|
io_blk_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
|
|
|
|
|
RETURN_IF_NOT_OK(
|
|
|
|
|
io_blk_queues_[(buf_cnt_++) % num_workers_]->Add(make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEof)));
|
|
|
|
|
io_blk_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEof)));
|
|
|
|
|
for (int32_t i = 0; i < num_workers_; i++) {
|
|
|
|
|
RETURN_IF_NOT_OK(
|
|
|
|
|
io_blk_queues_[i]->Add(std::move(make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone))));
|
|
|
|
|
RETURN_IF_NOT_OK(io_blk_queues_[i]->Add(
|
|
|
|
|
std::move(std::make_unique<IOBlock>(std::vector<int64_t>(), IOBlock::kDeIoBlockNone))));
|
|
|
|
|
}
|
|
|
|
|
return Status::OK();
|
|
|
|
|
} else { // not the last repeat. Acquire lock, sleeps master thread, wait for the wake-up from reset
|
|
|
|
|
RETURN_IF_NOT_OK(
|
|
|
|
|
io_blk_queues_[(buf_cnt_++) % num_workers_]->Add(make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
|
|
|
|
|
io_blk_queues_[(buf_cnt_++) % num_workers_]->Add(std::make_unique<IOBlock>(IOBlock::kDeIoBlockFlagEoe)));
|
|
|
|
|
|
|
|
|
|
// reset our buffer count and go to loop again.
|
|
|
|
|
RETURN_IF_NOT_OK(shard_reader_wait_post_.Wait());
|
|
|
|
@ -655,7 +656,7 @@ Status MindRecordOp::LaunchThreadAndInitOp() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Status MindRecordOp::CountTotalRows(const std::string dataset_path, int64_t *count) {
|
|
|
|
|
std::unique_ptr<ShardReader> shard_reader = mindspore::make_unique<ShardReader>();
|
|
|
|
|
std::unique_ptr<ShardReader> shard_reader = std::make_unique<ShardReader>();
|
|
|
|
|
MSRStatus rc = shard_reader->CountTotalRows(dataset_path, count);
|
|
|
|
|
if (rc == MSRStatus::FAILED) {
|
|
|
|
|
RETURN_STATUS_UNEXPECTED("MindRecordOp count total rows failed.");
|
|
|
|
|