|
|
|
@ -56,9 +56,9 @@ MSRStatus ShardReader::Init(const std::string &file_path) {
|
|
|
|
|
return FAILED;
|
|
|
|
|
}
|
|
|
|
|
shard_header_ = std::make_shared<ShardHeader>(sh);
|
|
|
|
|
header_size_ = shard_header_->get_header_size();
|
|
|
|
|
page_size_ = shard_header_->get_page_size();
|
|
|
|
|
file_paths_ = shard_header_->get_shard_addresses();
|
|
|
|
|
header_size_ = shard_header_->GetHeaderSize();
|
|
|
|
|
page_size_ = shard_header_->GetPageSize();
|
|
|
|
|
file_paths_ = shard_header_->GetShardAddresses();
|
|
|
|
|
|
|
|
|
|
for (const auto &file : file_paths_) {
|
|
|
|
|
sqlite3 *db = nullptr;
|
|
|
|
@ -105,7 +105,7 @@ MSRStatus ShardReader::Init(const std::string &file_path) {
|
|
|
|
|
|
|
|
|
|
MSRStatus ShardReader::CheckColumnList(const std::vector<std::string> &selected_columns) {
|
|
|
|
|
vector<int> inSchema(selected_columns.size(), 0);
|
|
|
|
|
for (auto &p : get_shard_header()->get_schemas()) {
|
|
|
|
|
for (auto &p : GetShardHeader()->GetSchemas()) {
|
|
|
|
|
auto schema = p->GetSchema()["schema"];
|
|
|
|
|
for (unsigned int i = 0; i < selected_columns.size(); ++i) {
|
|
|
|
|
if (schema.find(selected_columns[i]) != schema.end()) {
|
|
|
|
@ -183,15 +183,15 @@ void ShardReader::Close() {
|
|
|
|
|
FileStreamsOperator();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Returns a copy of the shared pointer held in shard_header_.
std::shared_ptr<ShardHeader> ShardReader::get_shard_header() const {
  return shard_header_;
}
|
|
|
|
|
// Returns a copy of the shared pointer held in shard_header_.
std::shared_ptr<ShardHeader> ShardReader::GetShardHeader() const {
  return shard_header_;
}
|
|
|
|
|
|
|
|
|
|
// Forwards to the shard header and returns the shard count it reports.
int ShardReader::get_shard_count() const {
  const int shard_total = shard_header_->get_shard_count();
  return shard_total;
}
|
|
|
|
|
// Forwards to the shard header and returns the shard count it reports.
int ShardReader::GetShardCount() const {
  const int shard_total = shard_header_->GetShardCount();
  return shard_total;
}
|
|
|
|
|
|
|
|
|
|
// Returns the current value of the num_rows_ member.
int ShardReader::get_num_rows() const {
  return num_rows_;
}
|
|
|
|
|
// Returns the current value of the num_rows_ member.
int ShardReader::GetNumRows() const {
  return num_rows_;
}
|
|
|
|
|
|
|
|
|
|
std::vector<std::tuple<int, int, int, uint64_t>> ShardReader::ReadRowGroupSummary() {
|
|
|
|
|
std::vector<std::tuple<int, int, int, uint64_t>> row_group_summary;
|
|
|
|
|
int shard_count = shard_header_->get_shard_count();
|
|
|
|
|
int shard_count = shard_header_->GetShardCount();
|
|
|
|
|
if (shard_count <= 0) {
|
|
|
|
|
return row_group_summary;
|
|
|
|
|
}
|
|
|
|
@ -205,13 +205,13 @@ std::vector<std::tuple<int, int, int, uint64_t>> ShardReader::ReadRowGroupSummar
|
|
|
|
|
for (uint64_t page_id = 0; page_id <= last_page_id; ++page_id) {
|
|
|
|
|
const auto &page_t = shard_header_->GetPage(shard_id, page_id);
|
|
|
|
|
const auto &page = page_t.first;
|
|
|
|
|
if (page->get_page_type() != kPageTypeBlob) continue;
|
|
|
|
|
uint64_t start_row_id = page->get_start_row_id();
|
|
|
|
|
if (start_row_id > page->get_end_row_id()) {
|
|
|
|
|
if (page->GetPageType() != kPageTypeBlob) continue;
|
|
|
|
|
uint64_t start_row_id = page->GetStartRowID();
|
|
|
|
|
if (start_row_id > page->GetEndRowID()) {
|
|
|
|
|
return std::vector<std::tuple<int, int, int, uint64_t>>();
|
|
|
|
|
}
|
|
|
|
|
uint64_t number_of_rows = page->get_end_row_id() - start_row_id;
|
|
|
|
|
row_group_summary.emplace_back(shard_id, page->get_page_type_id(), start_row_id, number_of_rows);
|
|
|
|
|
uint64_t number_of_rows = page->GetEndRowID() - start_row_id;
|
|
|
|
|
row_group_summary.emplace_back(shard_id, page->GetPageTypeID(), start_row_id, number_of_rows);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -265,7 +265,7 @@ MSRStatus ShardReader::ConvertLabelToJson(const std::vector<std::vector<std::str
|
|
|
|
|
json construct_json;
|
|
|
|
|
for (unsigned int j = 0; j < columns.size(); ++j) {
|
|
|
|
|
// construct json "f1": value
|
|
|
|
|
auto schema = shard_header_->get_schemas()[0]->GetSchema()["schema"];
|
|
|
|
|
auto schema = shard_header_->GetSchemas()[0]->GetSchema()["schema"];
|
|
|
|
|
|
|
|
|
|
// convert the string to base type by schema
|
|
|
|
|
if (schema[columns[j]]["type"] == "int32") {
|
|
|
|
@ -317,7 +317,7 @@ MSRStatus ShardReader::ReadAllRowsInShard(int shard_id, const std::string &sql,
|
|
|
|
|
|
|
|
|
|
MSRStatus ShardReader::GetAllClasses(const std::string &category_field, std::set<std::string> &categories) {
|
|
|
|
|
std::map<std::string, uint64_t> index_columns;
|
|
|
|
|
for (auto &field : get_shard_header()->get_fields()) {
|
|
|
|
|
for (auto &field : GetShardHeader()->GetFields()) {
|
|
|
|
|
index_columns[field.second] = field.first;
|
|
|
|
|
}
|
|
|
|
|
if (index_columns.find(category_field) == index_columns.end()) {
|
|
|
|
@ -400,11 +400,11 @@ ROW_GROUP_BRIEF ShardReader::ReadRowGroupBrief(int group_id, int shard_id, const
|
|
|
|
|
}
|
|
|
|
|
const std::shared_ptr<Page> &page = ret.second;
|
|
|
|
|
std::string file_name = file_paths_[shard_id];
|
|
|
|
|
uint64_t page_length = page->get_page_size();
|
|
|
|
|
uint64_t page_offset = page_size_ * page->get_page_id() + header_size_;
|
|
|
|
|
std::vector<std::vector<uint64_t>> image_offset = GetImageOffset(page->get_page_id(), shard_id);
|
|
|
|
|
uint64_t page_length = page->GetPageSize();
|
|
|
|
|
uint64_t page_offset = page_size_ * page->GetPageID() + header_size_;
|
|
|
|
|
std::vector<std::vector<uint64_t>> image_offset = GetImageOffset(page->GetPageID(), shard_id);
|
|
|
|
|
|
|
|
|
|
auto status_labels = GetLabels(page->get_page_id(), shard_id, columns);
|
|
|
|
|
auto status_labels = GetLabels(page->GetPageID(), shard_id, columns);
|
|
|
|
|
if (status_labels.first != SUCCESS) {
|
|
|
|
|
return std::make_tuple(FAILED, "", 0, 0, std::vector<std::vector<uint64_t>>(), std::vector<json>());
|
|
|
|
|
}
|
|
|
|
@ -426,11 +426,11 @@ ROW_GROUP_BRIEF ShardReader::ReadRowGroupCriteria(int group_id, int shard_id,
|
|
|
|
|
}
|
|
|
|
|
const std::shared_ptr<Page> &page = ret.second;
|
|
|
|
|
std::string file_name = file_paths_[shard_id];
|
|
|
|
|
uint64_t page_length = page->get_page_size();
|
|
|
|
|
uint64_t page_offset = page_size_ * page->get_page_id() + header_size_;
|
|
|
|
|
std::vector<std::vector<uint64_t>> image_offset = GetImageOffset(page->get_page_id(), shard_id, criteria);
|
|
|
|
|
uint64_t page_length = page->GetPageSize();
|
|
|
|
|
uint64_t page_offset = page_size_ * page->GetPageID() + header_size_;
|
|
|
|
|
std::vector<std::vector<uint64_t>> image_offset = GetImageOffset(page->GetPageID(), shard_id, criteria);
|
|
|
|
|
|
|
|
|
|
auto status_labels = GetLabels(page->get_page_id(), shard_id, columns, criteria);
|
|
|
|
|
auto status_labels = GetLabels(page->GetPageID(), shard_id, columns, criteria);
|
|
|
|
|
if (status_labels.first != SUCCESS) {
|
|
|
|
|
return std::make_tuple(FAILED, "", 0, 0, std::vector<std::vector<uint64_t>>(), std::vector<json>());
|
|
|
|
|
}
|
|
|
|
@ -458,7 +458,7 @@ std::vector<std::vector<uint64_t>> ShardReader::GetImageOffset(int page_id, int
|
|
|
|
|
|
|
|
|
|
// whether to use index search
|
|
|
|
|
if (!criteria.first.empty()) {
|
|
|
|
|
auto schema = shard_header_->get_schemas()[0]->GetSchema();
|
|
|
|
|
auto schema = shard_header_->GetSchemas()[0]->GetSchema();
|
|
|
|
|
|
|
|
|
|
// a non-number field should be wrapped in '' in the sql
|
|
|
|
|
if (kNumberFieldTypeSet.find(schema["schema"][criteria.first]["type"]) != kNumberFieldTypeSet.end()) {
|
|
|
|
@ -497,13 +497,13 @@ void ShardReader::CheckNlp() {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Returns the current value of the nlp_ member.
bool ShardReader::get_nlp_flag() {
  return nlp_;
}
|
|
|
|
|
// Returns the current value of the nlp_ member.
bool ShardReader::GetNlpFlag() {
  return nlp_;
}
|
|
|
|
|
|
|
|
|
|
std::pair<ShardType, std::vector<std::string>> ShardReader::get_blob_fields() {
|
|
|
|
|
std::pair<ShardType, std::vector<std::string>> ShardReader::GetBlobFields() {
|
|
|
|
|
std::vector<std::string> blob_fields;
|
|
|
|
|
for (auto &p : get_shard_header()->get_schemas()) {
|
|
|
|
|
for (auto &p : GetShardHeader()->GetSchemas()) {
|
|
|
|
|
// assume one schema
|
|
|
|
|
const auto &fields = p->get_blob_fields();
|
|
|
|
|
const auto &fields = p->GetBlobFields();
|
|
|
|
|
blob_fields.assign(fields.begin(), fields.end());
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
@ -516,7 +516,7 @@ void ShardReader::CheckIfColumnInIndex(const std::vector<std::string> &columns)
|
|
|
|
|
all_in_index_ = false;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
for (auto &field : get_shard_header()->get_fields()) {
|
|
|
|
|
for (auto &field : GetShardHeader()->GetFields()) {
|
|
|
|
|
column_schema_id_[field.second] = field.first;
|
|
|
|
|
}
|
|
|
|
|
for (auto &col : columns) {
|
|
|
|
@ -671,7 +671,7 @@ std::pair<MSRStatus, std::vector<json>> ShardReader::GetLabels(int page_id, int
|
|
|
|
|
json construct_json;
|
|
|
|
|
for (unsigned int j = 0; j < columns.size(); ++j) {
|
|
|
|
|
// construct json "f1": value
|
|
|
|
|
auto schema = shard_header_->get_schemas()[0]->GetSchema()["schema"];
|
|
|
|
|
auto schema = shard_header_->GetSchemas()[0]->GetSchema()["schema"];
|
|
|
|
|
|
|
|
|
|
// convert the string to base type by schema
|
|
|
|
|
if (schema[columns[j]]["type"] == "int32") {
|
|
|
|
@ -719,9 +719,9 @@ int64_t ShardReader::GetNumClasses(const std::string &file_path, const std::stri
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
auto header = std::make_shared<ShardHeader>(sh);
|
|
|
|
|
auto file_paths = header->get_shard_addresses();
|
|
|
|
|
auto file_paths = header->GetShardAddresses();
|
|
|
|
|
auto shard_count = file_paths.size();
|
|
|
|
|
auto index_fields = header->get_fields();
|
|
|
|
|
auto index_fields = header->GetFields();
|
|
|
|
|
|
|
|
|
|
std::map<std::string, int64_t> map_schema_id_fields;
|
|
|
|
|
for (auto &field : index_fields) {
|
|
|
|
@ -799,7 +799,7 @@ MSRStatus ShardReader::Open(const std::string &file_path, int n_consumer,
|
|
|
|
|
if (nlp_) {
|
|
|
|
|
selected_columns_ = selected_columns;
|
|
|
|
|
} else {
|
|
|
|
|
vector<std::string> blob_fields = get_blob_fields().second;
|
|
|
|
|
vector<std::string> blob_fields = GetBlobFields().second;
|
|
|
|
|
for (unsigned int i = 0; i < selected_columns.size(); ++i) {
|
|
|
|
|
if (!std::any_of(blob_fields.begin(), blob_fields.end(),
|
|
|
|
|
[&selected_columns, i](std::string item) { return selected_columns[i] == item; })) {
|
|
|
|
@ -846,7 +846,7 @@ MSRStatus ShardReader::OpenPy(const std::string &file_path, const int &n_consume
|
|
|
|
|
}
|
|
|
|
|
// blob fields should be removed from selected_columns when called from Python
|
|
|
|
|
std::vector<std::string> columns(selected_columns);
|
|
|
|
|
auto blob_fields = get_blob_fields().second;
|
|
|
|
|
auto blob_fields = GetBlobFields().second;
|
|
|
|
|
for (auto &blob_field : blob_fields) {
|
|
|
|
|
auto it = std::find(selected_columns.begin(), selected_columns.end(), blob_field);
|
|
|
|
|
if (it != selected_columns.end()) {
|
|
|
|
@ -909,7 +909,7 @@ vector<std::string> ShardReader::GetAllColumns() {
|
|
|
|
|
vector<std::string> columns;
|
|
|
|
|
if (nlp_) {
|
|
|
|
|
for (auto &c : selected_columns_) {
|
|
|
|
|
for (auto &p : get_shard_header()->get_schemas()) {
|
|
|
|
|
for (auto &p : GetShardHeader()->GetSchemas()) {
|
|
|
|
|
auto schema = p->GetSchema()["schema"]; // make sure schema is not reference since error occurred in arm.
|
|
|
|
|
for (auto it = schema.begin(); it != schema.end(); ++it) {
|
|
|
|
|
if (it.key() == c) {
|
|
|
|
@ -943,7 +943,7 @@ MSRStatus ShardReader::CreateTasksByCategory(const std::vector<std::tuple<int, i
|
|
|
|
|
CheckIfColumnInIndex(columns);
|
|
|
|
|
|
|
|
|
|
auto category_op = std::dynamic_pointer_cast<ShardCategory>(op);
|
|
|
|
|
auto categories = category_op->get_categories();
|
|
|
|
|
auto categories = category_op->GetCategories();
|
|
|
|
|
int64_t num_elements = category_op->GetNumElements();
|
|
|
|
|
if (num_elements <= 0) {
|
|
|
|
|
MS_LOG(ERROR) << "Parameter num_element is not positive";
|
|
|
|
@ -1104,7 +1104,7 @@ TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Pick up task from task list
|
|
|
|
|
auto task = tasks_.get_task_by_id(tasks_.permutation_[task_id]);
|
|
|
|
|
auto task = tasks_.GetTaskByID(tasks_.permutation_[task_id]);
|
|
|
|
|
|
|
|
|
|
auto shard_id = std::get<0>(std::get<0>(task));
|
|
|
|
|
auto group_id = std::get<1>(std::get<0>(task));
|
|
|
|
@ -1117,7 +1117,7 @@ TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_
|
|
|
|
|
|
|
|
|
|
// Pack image list
|
|
|
|
|
std::vector<uint8_t> images(addr[1] - addr[0]);
|
|
|
|
|
auto file_offset = header_size_ + page_size_ * (page->get_page_id()) + addr[0];
|
|
|
|
|
auto file_offset = header_size_ + page_size_ * (page->GetPageID()) + addr[0];
|
|
|
|
|
|
|
|
|
|
auto &io_seekg = file_streams_random_[consumer_id][shard_id]->seekg(file_offset, std::ios::beg);
|
|
|
|
|
if (!io_seekg.good() || io_seekg.fail() || io_seekg.bad()) {
|
|
|
|
@ -1139,7 +1139,7 @@ TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_
|
|
|
|
|
if (selected_columns_.size() == 0) {
|
|
|
|
|
images_with_exact_columns = images;
|
|
|
|
|
} else {
|
|
|
|
|
auto blob_fields = get_blob_fields();
|
|
|
|
|
auto blob_fields = GetBlobFields();
|
|
|
|
|
|
|
|
|
|
std::vector<uint32_t> ordered_selected_columns_index;
|
|
|
|
|
uint32_t index = 0;
|
|
|
|
@ -1272,7 +1272,7 @@ MSRStatus ShardReader::ConsumerByBlock(int consumer_id) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Pick up task from task list
|
|
|
|
|
auto task = tasks_.get_task_by_id(tasks_.permutation_[task_id]);
|
|
|
|
|
auto task = tasks_.GetTaskByID(tasks_.permutation_[task_id]);
|
|
|
|
|
|
|
|
|
|
auto shard_id = std::get<0>(std::get<0>(task));
|
|
|
|
|
auto group_id = std::get<1>(std::get<0>(task));
|
|
|
|
|