|
|
@ -53,6 +53,8 @@ MindRecordOp::Builder::Builder() : build_dataset_file_({}) {
|
|
|
|
build_op_connector_queue_size_ = cfg->op_connector_size();
|
|
|
|
build_op_connector_queue_size_ = cfg->op_connector_size();
|
|
|
|
build_block_reader_ = false;
|
|
|
|
build_block_reader_ = false;
|
|
|
|
builder_num_workers_ = 0;
|
|
|
|
builder_num_workers_ = 0;
|
|
|
|
|
|
|
|
build_num_padded_ = 0;
|
|
|
|
|
|
|
|
build_sample_ = nullptr;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// The builder "build" method creates the final object.
|
|
|
|
// The builder "build" method creates the final object.
|
|
|
@ -63,24 +65,57 @@ Status MindRecordOp::Builder::Build(std::shared_ptr<MindRecordOp> *ptr) {
|
|
|
|
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
|
|
|
|
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
|
|
|
|
"Building a MindRecordOp that has not provided a file.");
|
|
|
|
"Building a MindRecordOp that has not provided a file.");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
mindrecord::json sample_json;
|
|
|
|
|
|
|
|
if (build_num_padded_ > 0) {
|
|
|
|
|
|
|
|
sample_json = ToJson(build_sample_);
|
|
|
|
|
|
|
|
}
|
|
|
|
new_mind_record_op = std::make_shared<MindRecordOp>(
|
|
|
|
new_mind_record_op = std::make_shared<MindRecordOp>(
|
|
|
|
build_num_mind_record_workers_, build_rows_per_buffer_, build_dataset_file_, build_load_dataset_,
|
|
|
|
build_num_mind_record_workers_, build_rows_per_buffer_, build_dataset_file_, build_load_dataset_,
|
|
|
|
build_op_connector_queue_size_, build_columns_to_load_, build_operators_, build_block_reader_);
|
|
|
|
build_op_connector_queue_size_, build_columns_to_load_, build_operators_, build_block_reader_, build_num_padded_,
|
|
|
|
|
|
|
|
sample_json, build_sample_bytes_);
|
|
|
|
|
|
|
|
|
|
|
|
RETURN_IF_NOT_OK(new_mind_record_op->Init());
|
|
|
|
RETURN_IF_NOT_OK(new_mind_record_op->Init());
|
|
|
|
|
|
|
|
|
|
|
|
*ptr = std::move(new_mind_record_op);
|
|
|
|
*ptr = std::move(new_mind_record_op);
|
|
|
|
return Status::OK();
|
|
|
|
return Status::OK();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Builder sanity check. As written it performs no validation and unconditionally returns Status::OK().
Status MindRecordOp::Builder::SanityCheck() const { return Status::OK(); }
|
|
|
|
// Builder sanity check. As written it performs no validation and unconditionally returns Status::OK().
Status MindRecordOp::Builder::SanityCheck() const { return Status::OK(); }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Recursively converts a Python object into a mindrecord::json value.
//
// Conversions performed:
//   - None  -> JSON null
//   - int   -> int64_t
//   - float -> double
//   - str   -> std::string
//   - dict  -> JSON object; every key is stringified with Python str() and every value is converted
//              recursively. Values of type bytes are NOT written to the JSON object: they are stored in
//              build_sample_bytes_ under the stringified key instead.
//
// Any other object is reported through MS_LOG(ERROR) and a default-constructed json is returned.
//
// @param obj Python object to convert.
// @return The converted json value, or a default-constructed json if the object type is not supported.
mindrecord::json MindRecordOp::Builder::ToJson(const py::handle &obj) {
  if (obj.is_none()) {
    return nullptr;
  }
  if (py::isinstance<py::int_>(obj)) {
    return obj.cast<int64_t>();
  }
  if (py::isinstance<py::float_>(obj)) {
    return obj.cast<double>();
  }
  // NOTE(review): depending on the pybind11 version, the py::str check may also accept py::bytes - confirm.
  if (py::isinstance<py::str>(obj)) {
    return obj.cast<std::string>();
  }
  if (py::isinstance<py::dict>(obj)) {
    auto out = mindrecord::json::object();
    for (const py::handle &key : obj) {
      // Stringify the key and fetch the value once; both are needed by whichever branch is taken.
      const std::string name = py::str(key).cast<std::string>();
      const py::object value = obj[key];
      // bytes must be tested before recursing so that binary payloads are kept out of the JSON object.
      if (py::isinstance<py::bytes>(value)) {
        build_sample_bytes_[name] = value.cast<std::string>();
      } else {
        out[name] = ToJson(value);
      }
    }
    return out;
  }
  // The object is not a string here, so casting it directly to std::string would throw
  // pybind11::cast_error; go through Python str() to obtain a printable representation instead.
  MS_LOG(ERROR) << "Python object convert to json failed, object is: " << py::str(obj).cast<std::string>();
  return mindrecord::json();
}
|
|
|
|
|
|
|
|
|
|
|
|
// Constructor of the MindRecordOp.
|
|
|
|
// Constructor of the MindRecordOp.
|
|
|
|
MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, int32_t rows_per_buffer,
|
|
|
|
MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, int32_t rows_per_buffer,
|
|
|
|
std::vector<std::string> dataset_file, bool load_dataset, int32_t op_connector_queue_size,
|
|
|
|
std::vector<std::string> dataset_file, bool load_dataset, int32_t op_connector_queue_size,
|
|
|
|
const std::vector<std::string> &columns_to_load,
|
|
|
|
const std::vector<std::string> &columns_to_load,
|
|
|
|
const std::vector<std::shared_ptr<ShardOperator>> &operators, const bool &block_reader)
|
|
|
|
const std::vector<std::shared_ptr<ShardOperator>> &operators, const bool &block_reader,
|
|
|
|
|
|
|
|
int64_t num_padded, const mindrecord::json &sample_json,
|
|
|
|
|
|
|
|
const std::map<std::string, std::string> &sample_bytes)
|
|
|
|
: ParallelOp(num_mind_record_workers, op_connector_queue_size),
|
|
|
|
: ParallelOp(num_mind_record_workers, op_connector_queue_size),
|
|
|
|
rows_per_buffer_(rows_per_buffer),
|
|
|
|
rows_per_buffer_(rows_per_buffer),
|
|
|
|
dataset_file_(dataset_file),
|
|
|
|
dataset_file_(dataset_file),
|
|
|
@ -92,7 +127,10 @@ MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, int32_t rows_per_buf
|
|
|
|
buffers_needed_(0),
|
|
|
|
buffers_needed_(0),
|
|
|
|
buf_cnt_(0),
|
|
|
|
buf_cnt_(0),
|
|
|
|
ended_worker_(0),
|
|
|
|
ended_worker_(0),
|
|
|
|
buffer_water_mark_(0) {
|
|
|
|
buffer_water_mark_(0),
|
|
|
|
|
|
|
|
num_padded_(num_padded),
|
|
|
|
|
|
|
|
sample_json_(sample_json),
|
|
|
|
|
|
|
|
sample_bytes_(sample_bytes) {
|
|
|
|
io_blk_queues_.Init(num_workers_, op_connector_queue_size);
|
|
|
|
io_blk_queues_.Init(num_workers_, op_connector_queue_size);
|
|
|
|
if (!block_reader_) return;
|
|
|
|
if (!block_reader_) return;
|
|
|
|
for (int32_t i = 0; i < num_workers_; ++i) {
|
|
|
|
for (int32_t i = 0; i < num_workers_; ++i) {
|
|
|
@ -104,7 +142,7 @@ MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, int32_t rows_per_buf
|
|
|
|
Status MindRecordOp::Init() {
|
|
|
|
Status MindRecordOp::Init() {
|
|
|
|
shard_reader_ = std::make_unique<ShardReader>();
|
|
|
|
shard_reader_ = std::make_unique<ShardReader>();
|
|
|
|
auto rc = shard_reader_->Open(dataset_file_, load_dataset_, num_mind_record_workers_, columns_to_load_, operators_,
|
|
|
|
auto rc = shard_reader_->Open(dataset_file_, load_dataset_, num_mind_record_workers_, columns_to_load_, operators_,
|
|
|
|
block_reader_);
|
|
|
|
block_reader_, num_padded_);
|
|
|
|
|
|
|
|
|
|
|
|
CHECK_FAIL_RETURN_UNEXPECTED(rc == MSRStatus::SUCCESS,
|
|
|
|
CHECK_FAIL_RETURN_UNEXPECTED(rc == MSRStatus::SUCCESS,
|
|
|
|
"MindRecordOp init failed. Error message: " + ErrnoToMessage(rc));
|
|
|
|
"MindRecordOp init failed. Error message: " + ErrnoToMessage(rc));
|
|
|
@ -161,10 +199,6 @@ Status MindRecordOp::Init() {
|
|
|
|
column_name_id_map_[columns_to_load_[i]] = i;
|
|
|
|
column_name_id_map_[columns_to_load_[i]] = i;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
num_rows_ = shard_reader_->GetNumRows();
|
|
|
|
|
|
|
|
// Compute how many buffers we would need to accomplish rowsPerBuffer
|
|
|
|
|
|
|
|
buffers_needed_ = (num_rows_ + rows_per_buffer_ - 1) / rows_per_buffer_;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Status::OK();
|
|
|
|
return Status::OK();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -261,22 +295,32 @@ Status MindRecordOp::GetBufferFromReader(std::unique_ptr<DataBuffer> *fetched_bu
|
|
|
|
std::unique_ptr<TensorQTable> tensor_table = std::make_unique<TensorQTable>();
|
|
|
|
std::unique_ptr<TensorQTable> tensor_table = std::make_unique<TensorQTable>();
|
|
|
|
for (int32_t i = 0; i < rows_per_buffer_; ++i) {
|
|
|
|
for (int32_t i = 0; i < rows_per_buffer_; ++i) {
|
|
|
|
ShardTuple tupled_buffer;
|
|
|
|
ShardTuple tupled_buffer;
|
|
|
|
|
|
|
|
mindrecord::TaskType task_type = mindrecord::TaskType::kCommonTask;
|
|
|
|
if (block_reader_) {
|
|
|
|
if (block_reader_) {
|
|
|
|
if (i >= block_buffer_[buffer_id % num_workers_]->size()) break;
|
|
|
|
if (i >= block_buffer_[buffer_id % num_workers_]->size()) break;
|
|
|
|
tupled_buffer = block_buffer_[buffer_id % num_workers_]->at(i);
|
|
|
|
tupled_buffer = block_buffer_[buffer_id % num_workers_]->at(i);
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
int32_t row_id = buffer_id * rows_per_buffer_ + i;
|
|
|
|
int32_t row_id = buffer_id * rows_per_buffer_ + i;
|
|
|
|
tupled_buffer = shard_reader_->GetNextById(row_id, worker_id);
|
|
|
|
auto rc = shard_reader_->GetNextById(row_id, worker_id);
|
|
|
|
|
|
|
|
task_type = rc.first;
|
|
|
|
|
|
|
|
tupled_buffer = rc.second;
|
|
|
|
|
|
|
|
if (task_type == mindrecord::TaskType::kPaddedTask) {
|
|
|
|
|
|
|
|
TensorRow tensor_row;
|
|
|
|
|
|
|
|
RETURN_IF_NOT_OK(LoadTensorRow(&tensor_row, {}, mindrecord::json(), task_type));
|
|
|
|
|
|
|
|
tensor_table->push_back(std::move(tensor_row));
|
|
|
|
|
|
|
|
}
|
|
|
|
if (tupled_buffer.empty()) break;
|
|
|
|
if (tupled_buffer.empty()) break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (task_type == mindrecord::TaskType::kCommonTask) {
|
|
|
|
for (const auto &tupled_row : tupled_buffer) {
|
|
|
|
for (const auto &tupled_row : tupled_buffer) {
|
|
|
|
std::vector<uint8_t> columns_blob = std::get<0>(tupled_row);
|
|
|
|
std::vector<uint8_t> columns_blob = std::get<0>(tupled_row);
|
|
|
|
mindrecord::json columns_json = std::get<1>(tupled_row);
|
|
|
|
mindrecord::json columns_json = std::get<1>(tupled_row);
|
|
|
|
TensorRow tensor_row;
|
|
|
|
TensorRow tensor_row;
|
|
|
|
RETURN_IF_NOT_OK(LoadTensorRow(&tensor_row, columns_blob, columns_json));
|
|
|
|
RETURN_IF_NOT_OK(LoadTensorRow(&tensor_row, columns_blob, columns_json, task_type));
|
|
|
|
tensor_table->push_back(std::move(tensor_row));
|
|
|
|
tensor_table->push_back(std::move(tensor_row));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Replace the TensorTable in DataBuffer with the new one.
|
|
|
|
// Replace the TensorTable in DataBuffer with the new one.
|
|
|
|
(*fetched_buffer)->set_tensor_table(std::move(tensor_table));
|
|
|
|
(*fetched_buffer)->set_tensor_table(std::move(tensor_table));
|
|
|
@ -284,7 +328,7 @@ Status MindRecordOp::GetBufferFromReader(std::unique_ptr<DataBuffer> *fetched_bu
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
Status MindRecordOp::LoadTensorRow(TensorRow *tensor_row, const std::vector<uint8_t> &columns_blob,
|
|
|
|
Status MindRecordOp::LoadTensorRow(TensorRow *tensor_row, const std::vector<uint8_t> &columns_blob,
|
|
|
|
const mindrecord::json &columns_json) {
|
|
|
|
const mindrecord::json &columns_json, const mindrecord::TaskType task_type) {
|
|
|
|
for (uint32_t i_col = 0; i_col < columns_to_load_.size(); i_col++) {
|
|
|
|
for (uint32_t i_col = 0; i_col < columns_to_load_.size(); i_col++) {
|
|
|
|
auto column_name = columns_to_load_[i_col];
|
|
|
|
auto column_name = columns_to_load_[i_col];
|
|
|
|
|
|
|
|
|
|
|
@ -297,12 +341,40 @@ Status MindRecordOp::LoadTensorRow(TensorRow *tensor_row, const std::vector<uint
|
|
|
|
std::vector<int64_t> column_shape;
|
|
|
|
std::vector<int64_t> column_shape;
|
|
|
|
|
|
|
|
|
|
|
|
// Get column data
|
|
|
|
// Get column data
|
|
|
|
auto has_column = shard_reader_->GetShardColumn()->GetColumnValueByName(
|
|
|
|
auto shard_column = shard_reader_->GetShardColumn();
|
|
|
|
column_name, columns_blob, columns_json, &data, &data_ptr, &n_bytes, &column_data_type, &column_data_type_size,
|
|
|
|
if (num_padded_ > 0 && task_type == mindrecord::TaskType::kPaddedTask) {
|
|
|
|
&column_shape);
|
|
|
|
auto rc =
|
|
|
|
|
|
|
|
shard_column->GetColumnTypeByName(column_name, &column_data_type, &column_data_type_size, &column_shape);
|
|
|
|
|
|
|
|
if (rc.first != MSRStatus::SUCCESS) {
|
|
|
|
|
|
|
|
RETURN_STATUS_UNEXPECTED("Failed to retrieve data type.");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if (rc.second == mindrecord::ColumnInRaw) {
|
|
|
|
|
|
|
|
auto has_column = shard_column->GetColumnFromJson(column_name, sample_json_, &data_ptr, &n_bytes);
|
|
|
|
|
|
|
|
if (has_column == MSRStatus::FAILED) {
|
|
|
|
|
|
|
|
RETURN_STATUS_UNEXPECTED("Failed to retrieve raw data from padding sample.");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
} else if (rc.second == mindrecord::ColumnInBlob) {
|
|
|
|
|
|
|
|
if (sample_bytes_.find(column_name) == sample_bytes_.end()) {
|
|
|
|
|
|
|
|
RETURN_STATUS_UNEXPECTED("Failed to retrieve blob data from padding sample.");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
std::string ss(sample_bytes_[column_name]);
|
|
|
|
|
|
|
|
n_bytes = ss.size();
|
|
|
|
|
|
|
|
data_ptr = std::make_unique<unsigned char[]>(n_bytes);
|
|
|
|
|
|
|
|
std::copy(ss.begin(), ss.end(), data_ptr.get());
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
RETURN_STATUS_UNEXPECTED("Retrieved data type is unknown.");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if (data == nullptr) {
|
|
|
|
|
|
|
|
data = reinterpret_cast<const unsigned char *>(data_ptr.get());
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
auto has_column =
|
|
|
|
|
|
|
|
shard_column->GetColumnValueByName(column_name, columns_blob, columns_json, &data, &data_ptr, &n_bytes,
|
|
|
|
|
|
|
|
&column_data_type, &column_data_type_size, &column_shape);
|
|
|
|
if (has_column == MSRStatus::FAILED) {
|
|
|
|
if (has_column == MSRStatus::FAILED) {
|
|
|
|
RETURN_STATUS_UNEXPECTED("Failed to retrieve data from mindrecord reader.");
|
|
|
|
RETURN_STATUS_UNEXPECTED("Failed to retrieve data from mindrecord reader.");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
std::shared_ptr<Tensor> tensor;
|
|
|
|
std::shared_ptr<Tensor> tensor;
|
|
|
|
const ColDescriptor &column = data_schema_->column(i_col);
|
|
|
|
const ColDescriptor &column = data_schema_->column(i_col);
|
|
|
@ -334,7 +406,8 @@ Status MindRecordOp::FetchBlockBuffer(const int32_t &buffer_id) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for (int32_t i = 0; i < rows_per_buffer_; i++) {
|
|
|
|
for (int32_t i = 0; i < rows_per_buffer_; i++) {
|
|
|
|
// Block reader does NOT care about argument
|
|
|
|
// Block reader does NOT care about argument
|
|
|
|
ShardTuple tuple_buffer = shard_reader_->GetNextById(i, i);
|
|
|
|
auto rc = shard_reader_->GetNextById(i, i);
|
|
|
|
|
|
|
|
ShardTuple tuple_buffer = rc.second;
|
|
|
|
if (tuple_buffer.empty()) break;
|
|
|
|
if (tuple_buffer.empty()) break;
|
|
|
|
block_buffer_[buffer_id % num_workers_]->push_back(std::move(tuple_buffer));
|
|
|
|
block_buffer_[buffer_id % num_workers_]->push_back(std::move(tuple_buffer));
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -348,11 +421,8 @@ Status MindRecordOp::FetchBlockBuffer(const int32_t &buffer_id) {
|
|
|
|
Status MindRecordOp::operator()() {
|
|
|
|
Status MindRecordOp::operator()() {
|
|
|
|
RETURN_IF_NOT_OK(LaunchThreadAndInitOp());
|
|
|
|
RETURN_IF_NOT_OK(LaunchThreadAndInitOp());
|
|
|
|
num_rows_ = shard_reader_->GetNumRows();
|
|
|
|
num_rows_ = shard_reader_->GetNumRows();
|
|
|
|
|
|
|
|
// Compute how many buffers we would need to accomplish rowsPerBuffer
|
|
|
|
buffers_needed_ = num_rows_ / rows_per_buffer_;
|
|
|
|
buffers_needed_ = (num_rows_ + rows_per_buffer_ - 1) / rows_per_buffer_;
|
|
|
|
if (num_rows_ % rows_per_buffer_ != 0) {
|
|
|
|
|
|
|
|
buffers_needed_++;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
while (true) { // each iterator is 1 epoch
|
|
|
|
while (true) { // each iterator is 1 epoch
|
|
|
|
for (int32_t i = 0; i < buffers_needed_; ++i) {
|
|
|
|
for (int32_t i = 0; i < buffers_needed_; ++i) {
|
|
|
@ -417,9 +487,9 @@ Status MindRecordOp::LaunchThreadAndInitOp() {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
Status MindRecordOp::CountTotalRows(const std::vector<std::string> dataset_path, bool load_dataset,
|
|
|
|
Status MindRecordOp::CountTotalRows(const std::vector<std::string> dataset_path, bool load_dataset,
|
|
|
|
const std::shared_ptr<ShardOperator> &op, int64_t *count) {
|
|
|
|
const std::shared_ptr<ShardOperator> &op, int64_t *count, int64_t num_padded) {
|
|
|
|
std::unique_ptr<ShardReader> shard_reader = std::make_unique<ShardReader>();
|
|
|
|
std::unique_ptr<ShardReader> shard_reader = std::make_unique<ShardReader>();
|
|
|
|
MSRStatus rc = shard_reader->CountTotalRows(dataset_path, load_dataset, op, count);
|
|
|
|
MSRStatus rc = shard_reader->CountTotalRows(dataset_path, load_dataset, op, count, num_padded);
|
|
|
|
if (rc == MSRStatus::FAILED) {
|
|
|
|
if (rc == MSRStatus::FAILED) {
|
|
|
|
RETURN_STATUS_UNEXPECTED("MindRecordOp count total rows failed.");
|
|
|
|
RETURN_STATUS_UNEXPECTED("MindRecordOp count total rows failed.");
|
|
|
|
}
|
|
|
|
}
|
|
|
|