|
|
@ -47,20 +47,55 @@ ShardReader::ShardReader() {
|
|
|
|
block_reader_ = false;
|
|
|
|
block_reader_ = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
MSRStatus ShardReader::Init(const std::string &file_path) {
|
|
|
|
std::pair<MSRStatus, std::vector<std::string>> ShardReader::GetMeta(const std::string &file_path, json &meta_data) {
|
|
|
|
if (!IsLegalFile(file_path)) {
|
|
|
|
if (!IsLegalFile(file_path)) {
|
|
|
|
|
|
|
|
return {FAILED, {}};
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
auto ret = ShardHeader::BuildSingleHeader(file_path);
|
|
|
|
|
|
|
|
if (ret.first != SUCCESS) {
|
|
|
|
|
|
|
|
return {FAILED, {}};
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
auto header = ret.second;
|
|
|
|
|
|
|
|
meta_data = {{"header_size", header["header_size"]}, {"page_size", header["page_size"]},
|
|
|
|
|
|
|
|
{"version", header["version"]}, {"index_fields", header["index_fields"]},
|
|
|
|
|
|
|
|
{"schema", header["schema"]}, {"blob_fields", header["blob_fields"]}};
|
|
|
|
|
|
|
|
return {SUCCESS, header["shard_addresses"]};
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MSRStatus ShardReader::Init(const std::vector<std::string> &file_paths, bool load_dataset) {
|
|
|
|
|
|
|
|
std::string file_path = file_paths[0];
|
|
|
|
|
|
|
|
json first_meta_data = json();
|
|
|
|
|
|
|
|
auto ret = GetMeta(file_path, first_meta_data);
|
|
|
|
|
|
|
|
if (ret.first != SUCCESS) {
|
|
|
|
return FAILED;
|
|
|
|
return FAILED;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ShardHeader sh = ShardHeader();
|
|
|
|
if (file_paths.size() == 1 && load_dataset == true) {
|
|
|
|
if (sh.Build(file_path) == FAILED) {
|
|
|
|
auto ret2 = GetParentDir(file_path);
|
|
|
|
|
|
|
|
if (SUCCESS != ret2.first) {
|
|
|
|
|
|
|
|
return FAILED;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
std::vector<std::string> real_addresses;
|
|
|
|
|
|
|
|
for (const auto &path : ret.second) {
|
|
|
|
|
|
|
|
std::string abs_path = ret2.second + string(path);
|
|
|
|
|
|
|
|
real_addresses.emplace_back(abs_path);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
file_paths_ = real_addresses;
|
|
|
|
|
|
|
|
} else if (file_paths.size() >= 1 && load_dataset == false) {
|
|
|
|
|
|
|
|
file_paths_ = file_paths;
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
MS_LOG(ERROR) << "Error in parameter file_path or load_dataset.";
|
|
|
|
return FAILED;
|
|
|
|
return FAILED;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
shard_header_ = std::make_shared<ShardHeader>(sh);
|
|
|
|
|
|
|
|
header_size_ = shard_header_->GetHeaderSize();
|
|
|
|
|
|
|
|
page_size_ = shard_header_->GetPageSize();
|
|
|
|
|
|
|
|
file_paths_ = shard_header_->GetShardAddresses();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (const auto &file : file_paths_) {
|
|
|
|
for (const auto &file : file_paths_) {
|
|
|
|
|
|
|
|
json meta_data = json();
|
|
|
|
|
|
|
|
auto ret1 = GetMeta(file, meta_data);
|
|
|
|
|
|
|
|
if (ret1.first != SUCCESS) {
|
|
|
|
|
|
|
|
return FAILED;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if (meta_data != first_meta_data) {
|
|
|
|
|
|
|
|
MS_LOG(ERROR) << "Mindrecord files meta information is different.";
|
|
|
|
|
|
|
|
return FAILED;
|
|
|
|
|
|
|
|
}
|
|
|
|
sqlite3 *db = nullptr;
|
|
|
|
sqlite3 *db = nullptr;
|
|
|
|
// sqlite3_open create a database if not found, use sqlite3_open_v2 instead of it
|
|
|
|
// sqlite3_open create a database if not found, use sqlite3_open_v2 instead of it
|
|
|
|
int rc = sqlite3_open_v2(common::SafeCStr(file + ".db"), &db, SQLITE_OPEN_READONLY, nullptr);
|
|
|
|
int rc = sqlite3_open_v2(common::SafeCStr(file + ".db"), &db, SQLITE_OPEN_READONLY, nullptr);
|
|
|
@ -91,7 +126,13 @@ MSRStatus ShardReader::Init(const std::string &file_path) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
database_paths_.push_back(db);
|
|
|
|
database_paths_.push_back(db);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
ShardHeader sh = ShardHeader();
|
|
|
|
|
|
|
|
if (sh.BuildDataset(file_paths_, load_dataset) == FAILED) {
|
|
|
|
|
|
|
|
return FAILED;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
shard_header_ = std::make_shared<ShardHeader>(sh);
|
|
|
|
|
|
|
|
header_size_ = shard_header_->GetHeaderSize();
|
|
|
|
|
|
|
|
page_size_ = shard_header_->GetPageSize();
|
|
|
|
num_rows_ = 0;
|
|
|
|
num_rows_ = 0;
|
|
|
|
auto row_group_summary = ReadRowGroupSummary();
|
|
|
|
auto row_group_summary = ReadRowGroupSummary();
|
|
|
|
for (const auto &rg : row_group_summary) {
|
|
|
|
for (const auto &rg : row_group_summary) {
|
|
|
@ -248,7 +289,6 @@ MSRStatus ShardReader::ConvertLabelToJson(const std::vector<std::vector<std::str
|
|
|
|
fs->close();
|
|
|
|
fs->close();
|
|
|
|
return FAILED;
|
|
|
|
return FAILED;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
json label_json = json::from_msgpack(label_raw);
|
|
|
|
json label_json = json::from_msgpack(label_raw);
|
|
|
|
json tmp;
|
|
|
|
json tmp;
|
|
|
|
if (!columns.empty()) {
|
|
|
|
if (!columns.empty()) {
|
|
|
@ -713,15 +753,9 @@ MSRStatus ShardReader::Finish() {
|
|
|
|
return SUCCESS;
|
|
|
|
return SUCCESS;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int64_t ShardReader::GetNumClasses(const std::string &file_path, const std::string &category_field) {
|
|
|
|
int64_t ShardReader::GetNumClasses(const std::string &category_field) {
|
|
|
|
ShardHeader sh = ShardHeader();
|
|
|
|
auto shard_count = file_paths_.size();
|
|
|
|
if (sh.Build(file_path) == FAILED) {
|
|
|
|
auto index_fields = shard_header_->GetFields();
|
|
|
|
return -1;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
auto header = std::make_shared<ShardHeader>(sh);
|
|
|
|
|
|
|
|
auto file_paths = header->GetShardAddresses();
|
|
|
|
|
|
|
|
auto shard_count = file_paths.size();
|
|
|
|
|
|
|
|
auto index_fields = header->GetFields();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
std::map<std::string, int64_t> map_schema_id_fields;
|
|
|
|
std::map<std::string, int64_t> map_schema_id_fields;
|
|
|
|
for (auto &field : index_fields) {
|
|
|
|
for (auto &field : index_fields) {
|
|
|
@ -742,7 +776,7 @@ int64_t ShardReader::GetNumClasses(const std::string &file_path, const std::stri
|
|
|
|
std::set<std::string> categories;
|
|
|
|
std::set<std::string> categories;
|
|
|
|
for (int x = 0; x < shard_count; x++) {
|
|
|
|
for (int x = 0; x < shard_count; x++) {
|
|
|
|
sqlite3 *db = nullptr;
|
|
|
|
sqlite3 *db = nullptr;
|
|
|
|
int rc = sqlite3_open_v2(common::SafeCStr(file_paths[x] + ".db"), &db, SQLITE_OPEN_READONLY, nullptr);
|
|
|
|
int rc = sqlite3_open_v2(common::SafeCStr(file_paths_[x] + ".db"), &db, SQLITE_OPEN_READONLY, nullptr);
|
|
|
|
if (SQLITE_OK != rc) {
|
|
|
|
if (SQLITE_OK != rc) {
|
|
|
|
MS_LOG(ERROR) << "Can't open database, error: " << sqlite3_errmsg(db);
|
|
|
|
MS_LOG(ERROR) << "Can't open database, error: " << sqlite3_errmsg(db);
|
|
|
|
return -1;
|
|
|
|
return -1;
|
|
|
@ -756,16 +790,16 @@ int64_t ShardReader::GetNumClasses(const std::string &file_path, const std::stri
|
|
|
|
return categories.size();
|
|
|
|
return categories.size();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
MSRStatus ShardReader::CountTotalRows(const std::string &file_path, const std::shared_ptr<ShardOperator> &op,
|
|
|
|
MSRStatus ShardReader::CountTotalRows(const std::vector<std::string> &file_paths, bool load_dataset,
|
|
|
|
int64_t *count) {
|
|
|
|
const std::shared_ptr<ShardOperator> &op, int64_t *count) {
|
|
|
|
if (Init(file_path) == FAILED) {
|
|
|
|
if (SUCCESS != Init(file_paths, load_dataset)) {
|
|
|
|
return FAILED;
|
|
|
|
return FAILED;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
int64_t num_samples = num_rows_;
|
|
|
|
int64_t num_samples = num_rows_;
|
|
|
|
if (std::dynamic_pointer_cast<ShardCategory>(op)) {
|
|
|
|
if (std::dynamic_pointer_cast<ShardCategory>(op)) {
|
|
|
|
auto category_op = std::dynamic_pointer_cast<ShardCategory>(op);
|
|
|
|
auto category_op = std::dynamic_pointer_cast<ShardCategory>(op);
|
|
|
|
std::string category_field = category_op->GetCategoryField();
|
|
|
|
std::string category_field = category_op->GetCategoryField();
|
|
|
|
auto num_classes = GetNumClasses(file_path, category_field);
|
|
|
|
auto num_classes = GetNumClasses(category_field);
|
|
|
|
num_samples = category_op->GetNumSamples(num_rows_, num_classes);
|
|
|
|
num_samples = category_op->GetNumSamples(num_rows_, num_classes);
|
|
|
|
} else if (std::dynamic_pointer_cast<ShardSample>(op)) {
|
|
|
|
} else if (std::dynamic_pointer_cast<ShardSample>(op)) {
|
|
|
|
num_samples = op->GetNumSamples(num_rows_, 0);
|
|
|
|
num_samples = op->GetNumSamples(num_rows_, 0);
|
|
|
@ -779,12 +813,13 @@ MSRStatus ShardReader::CountTotalRows(const std::string &file_path, const std::s
|
|
|
|
return SUCCESS;
|
|
|
|
return SUCCESS;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
MSRStatus ShardReader::Open(const std::string &file_path, int n_consumer,
|
|
|
|
MSRStatus ShardReader::Open(const std::vector<std::string> &file_paths, bool load_dataset, int n_consumer,
|
|
|
|
const std::vector<std::string> &selected_columns,
|
|
|
|
const std::vector<std::string> &selected_columns,
|
|
|
|
const std::vector<std::shared_ptr<ShardOperator>> &operators, const bool &block_reader) {
|
|
|
|
const std::vector<std::shared_ptr<ShardOperator>> &operators, const bool &block_reader) {
|
|
|
|
// Open file and set header by ShardReader
|
|
|
|
// Open file and set header by ShardReader
|
|
|
|
if (Init(file_path) == FAILED) {
|
|
|
|
auto ret = Init(file_paths, load_dataset);
|
|
|
|
return FAILED;
|
|
|
|
if (SUCCESS != ret) {
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
auto thread_limit = GetMaxThreadNum();
|
|
|
|
auto thread_limit = GetMaxThreadNum();
|
|
|
|
if (n_consumer > thread_limit) {
|
|
|
|
if (n_consumer > thread_limit) {
|
|
|
@ -837,11 +872,11 @@ MSRStatus ShardReader::Open(const std::string &file_path, int n_consumer,
|
|
|
|
return SUCCESS;
|
|
|
|
return SUCCESS;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
MSRStatus ShardReader::OpenPy(const std::string &file_path, const int &n_consumer,
|
|
|
|
MSRStatus ShardReader::OpenPy(const std::vector<std::string> &file_paths, bool load_dataset, const int &n_consumer,
|
|
|
|
const std::vector<std::string> &selected_columns,
|
|
|
|
const std::vector<std::string> &selected_columns,
|
|
|
|
const std::vector<std::shared_ptr<ShardOperator>> &operators) {
|
|
|
|
const std::vector<std::shared_ptr<ShardOperator>> &operators) {
|
|
|
|
// Open file and set header by ShardReader
|
|
|
|
// Open file and set header by ShardReader
|
|
|
|
if (Init(file_path) == FAILED) {
|
|
|
|
if (SUCCESS != Init(file_paths, load_dataset)) {
|
|
|
|
return FAILED;
|
|
|
|
return FAILED;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// should remove blob field from selected_columns when call from python
|
|
|
|
// should remove blob field from selected_columns when call from python
|
|
|
|