@ -42,11 +42,17 @@
#include "minddata/dataset/util/status.h"
#include "minddata/mindrecord/include/shard_category.h"
#include "minddata/mindrecord/include/shard_distributed_sample.h"
#include "minddata/mindrecord/include/shard_header.h"
#include "minddata/mindrecord/include/shard_index_generator.h"
#include "minddata/mindrecord/include/shard_sample.h"
#include "minddata/mindrecord/include/shard_shuffle.h"
#include "minddata/mindrecord/include/shard_writer.h"
#include "pybind11/stl.h"
#include "utils/log_adapter.h"
namespace mindspore {
namespace dataset {
using json = nlohmann::json;
using pFunction = Status (DEPipeline::*)(const py::dict &, std::shared_ptr<DatasetOp> *, std::shared_ptr<DatasetOp> *);
static std::unordered_map<uint32_t, pFunction> g_parse_op_func_ = {
@ -355,6 +361,226 @@ Status DEPipeline::ParseShuffleOp(const py::dict &args, std::shared_ptr<DatasetO
return Status::OK();
Status DEPipeline::SaveDataset(const std::vector<std::string> &file_names, const std::string &file_type) {
Status s;
auto mr_header = std::make_shared<mindrecord::ShardHeader>();
auto mr_writer = std::make_unique<mindrecord::ShardWriter>();
std::vector<std::string> blob_fields;
uint64_t mr_schema_id = 0;
if (mindrecord::SUCCESS != mindrecord::ShardWriter::initialize(&mr_writer, file_names)) {
RETURN_STATUS_UNEXPECTED("Error: failed to initialize ShardWriter.");
TensorRow row;
std::unordered_map<std::string, int32_t> column_name_id_map =
iterator_->GetColumnNameMap(); // map of column name, id
bool first_loop = true; // build schema in first loop
do {
json row_raw_data;
std::map<std::string, std::unique_ptr<std::vector<uint8_t>>> row_bin_data;
py::gil_scoped_release gil_release;
s = iterator_->FetchNextTensorRow(&row);
if (row.empty()) break;
if (first_loop) {
json mr_json;
std::vector<std::string> index_fields;
s = FetchMetaFromTensorRow(column_name_id_map, row, &mr_json, &index_fields);
mindrecord::ShardHeader::initialize(&mr_header, mr_json, index_fields, blob_fields, mr_schema_id);
first_loop = false;
// construct data
if (!row.empty()) { // write data
s = FetchDataFromTensorRow(row, column_name_id_map, &row_raw_data, &row_bin_data);
std::shared_ptr<std::vector<uint8_t>> output_bin_data;
mr_writer->MergeBlobData(blob_fields, row_bin_data, &output_bin_data);
std::map<std::uint64_t, std::vector<json>> raw_data;
raw_data.insert(std::pair<uint64_t, std::vector<json>>(mr_schema_id, std::vector<json>{row_raw_data}));
std::vector<std::vector<uint8_t>> bin_data;
if (nullptr != output_bin_data) {
mr_writer->WriteRawData(raw_data, bin_data);
} while (!row.empty());
return Status::OK();
Status DEPipeline::FetchDataFromTensorRow(const TensorRow &row,
const std::unordered_map<std::string, int32_t> &column_name_id_map,
json *row_raw_data,
std::map<std::string, std::unique_ptr<std::vector<uint8_t>>> *row_bin_data) {
if (row_raw_data == nullptr) {
RETURN_STATUS_UNEXPECTED("error: row raw data is NULL.");
if (row_bin_data == nullptr) {
RETURN_STATUS_UNEXPECTED("error: row bin data is NULL.");
if (column_name_id_map.empty()) {
RETURN_STATUS_UNEXPECTED("Error: column not found");
Status s;
for (auto &col : column_name_id_map) {
auto idx = col.second;
auto column_name = col.first;
auto &tensor = row[idx];
auto column_type = tensor->type();
std::unique_ptr<std::vector<uint8_t>> data_ptr;
if (column_type == DataType::DE_INT8) {
std::unique_ptr<int32_t> data;
std::unique_ptr<int8_t> dummy;
s = TransfromTensor(tensor->GetBuffer(), tensor->shape(), tensor->Size(), &data, &data_ptr, &dummy, true);
if (data != nullptr) (*row_raw_data)[column_name] = std::move(*data);
} else if (column_type == DataType::DE_INT16) {
std::unique_ptr<int32_t> data;
std::unique_ptr<int16_t> dummy;
s = TransfromTensor(tensor->GetBuffer(), tensor->shape(), tensor->Size(), &data, &data_ptr, &dummy, true);
if (data != nullptr) (*row_raw_data)[column_name] = std::move(*data);
} else if (column_type == DataType::DE_UINT16) {
std::unique_ptr<int32_t> data;
std::unique_ptr<uint16_t> dummy;
s = TransfromTensor(tensor->GetBuffer(), tensor->shape(), tensor->Size(), &data, &data_ptr, &dummy, true);
if (data != nullptr) (*row_raw_data)[column_name] = std::move(*data);
} else if (column_type == DataType::DE_UINT8) {
std::unique_ptr<uint8_t> data, dummy;
s = TransfromTensor(tensor->GetBuffer(), tensor->shape(), tensor->Size(), &data, &data_ptr, &dummy);
if (data != nullptr) (*row_raw_data)[column_name] = std::move(*data);
} else if (column_type == DataType::DE_INT32) {
std::unique_ptr<int32_t> data, dummy;
s = TransfromTensor(tensor->GetBuffer(), tensor->shape(), tensor->Size(), &data, &data_ptr, &dummy);
if (data != nullptr) (*row_raw_data)[column_name] = std::move(*data);
} else if (column_type == DataType::DE_UINT32) {
std::unique_ptr<int64_t> data;
std::unique_ptr<uint32_t> dummy;
s = TransfromTensor(tensor->GetBuffer(), tensor->shape(), tensor->Size(), &data, &data_ptr, &dummy, true);
if (data != nullptr) (*row_raw_data)[column_name] = std::move(*data);
} else if (column_type == DataType::DE_INT64) {
std::unique_ptr<int64_t> data, dummy;
s = TransfromTensor(tensor->GetBuffer(), tensor->shape(), tensor->Size(), &data, &data_ptr, &dummy);
if (data != nullptr) (*row_raw_data)[column_name] = std::move(*data);
} else if (column_type == DataType::DE_FLOAT32) {
std::unique_ptr<float> data, dummy;
s = TransfromTensor(tensor->GetBuffer(), tensor->shape(), tensor->Size(), &data, &data_ptr, &dummy);
if (data != nullptr) (*row_raw_data)[column_name] = std::move(*data);
} else if (column_type == DataType::DE_FLOAT64) {
std::unique_ptr<double> data, dummy;
s = TransfromTensor(tensor->GetBuffer(), tensor->shape(), tensor->Size(), &data, &data_ptr, &dummy);
if (data != nullptr) (*row_raw_data)[column_name] = std::move(*data);
} else if (column_type == DataType::DE_STRING) {
auto buffer = tensor->GetStringsBuffer();
std::string ss(reinterpret_cast<const char *>(buffer)); // assume scalar string tensor
(*row_raw_data)[column_name] = std::move(ss);
} else {
RETURN_STATUS_UNEXPECTED("Got unexpected type when casting data.");
if (data_ptr != nullptr) {
(*row_bin_data)[column_name] = std::move(data_ptr);
return Status::OK();
template <typename T, typename S>
Status DEPipeline::TransfromTensor(const unsigned char *src, const TensorShape &shape, const int64_t num_of_elements,
std::unique_ptr<T> *data, std::unique_ptr<std::vector<uint8_t>> *data_ptr,
std::unique_ptr<S> *s, bool need_convert) {
if (nullptr == src) {
RETURN_STATUS_UNEXPECTED("Error: buffer of Tensor is NULL.");
*data_ptr = std::make_unique<std::vector<uint8_t>>(num_of_elements * sizeof(T));
if (need_convert) {
auto tmp_ptr = std::make_unique<std::vector<uint8_t>>(num_of_elements * sizeof(S));
std::copy(src, src + sizeof(S) * num_of_elements, tmp_ptr->begin());
auto s_ptr = reinterpret_cast<S *>(&(*(tmp_ptr->begin())));
auto el = std::make_unique<T>();
for (uint32_t i = 0; i < num_of_elements; ++i) {
*el = *(s_ptr + i);
auto t_ptr = reinterpret_cast<uint8_t *>(el.get());
for (uint32_t j = 0; j < sizeof(T); ++j) {
*((*data_ptr)->begin() + i * sizeof(T) + j) = *(t_ptr + j);
} else {
std::copy(src, src + sizeof(T) * num_of_elements, (*data_ptr)->begin());
if (shape.empty()) {
*data = std::make_unique<T>();
auto t_ptr = reinterpret_cast<uint8_t *>((*data).get());
for (uint32_t i = 0; i < sizeof(T); ++i) {
*(t_ptr + i) = *((*data_ptr)->begin() + i);
return Status::OK();
Status DEPipeline::FetchMetaFromTensorRow(const std::unordered_map<std::string, int32_t> &column_name_id_map,
const TensorRow &row, json *schema, std::vector<std::string> *index_fields) {
if (schema == nullptr) {
if (index_fields == nullptr) {
RETURN_STATUS_UNEXPECTED("error: index fields is NULL.");
if (column_name_id_map.empty()) {
RETURN_STATUS_UNEXPECTED("Error: column not found.");
for (auto &col : column_name_id_map) {
auto idx = col.second;
auto column_name = col.first;
auto &tensor = row[idx];
auto column_type = tensor->type();
auto column_shape = tensor->shape();
std::string mr_type;
auto shapes = column_shape.AsVector();
std::vector<int> mr_shape(shapes.begin(), shapes.end());
std::string el = column_type.ToString();
if (mindrecord::kTypesMap.find(el) == mindrecord::kTypesMap.end()) {
std::string err_msg("Error: can not support data type: " + el);
} else {
mr_type = mindrecord::kTypesMap.at(el);
if (mr_shape.empty()) {
if (mr_type == "bytes") { // map to int32 when bytes without shape.
mr_type == "int32";
(*schema)[column_name] = {{"type", mr_type}};
} else {
if (mr_type == "string") { // mindrecord can not support string with shape.
std::string err_msg("Error: mindrecord can not support multi-dimensional string tensor.");
if (mr_type == "bytes") { // ignore shape of bytes in minrecord
(*schema)[column_name] = {{"type", mr_type}};
} else {
(*schema)[column_name] = {{"type", mr_type}, {"shape", mr_shape}};
if (mr_type == "bytes" || !mr_shape.empty()) continue;
index_fields->emplace_back(column_name); // candidate of index fields
return Status::OK();
Status DEPipeline::BuildMindrecordSamplerChain(const py::handle &handle,
std::vector<std::shared_ptr<mindrecord::ShardOperator>> *operators,
int num_padded) {