|
|
|
@ -794,6 +794,8 @@ MSRStatus ShardReader::Open(const std::string &file_path, int n_consumer,
|
|
|
|
|
n_consumer = kMinConsumerCount;
|
|
|
|
|
}
|
|
|
|
|
CheckNlp();
|
|
|
|
|
|
|
|
|
|
// dead code
|
|
|
|
|
if (nlp_) {
|
|
|
|
|
selected_columns_ = selected_columns;
|
|
|
|
|
} else {
|
|
|
|
@ -805,6 +807,7 @@ MSRStatus ShardReader::Open(const std::string &file_path, int n_consumer,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
selected_columns_ = selected_columns;
|
|
|
|
|
|
|
|
|
|
if (CheckColumnList(selected_columns_) == FAILED) {
|
|
|
|
|
MS_LOG(ERROR) << "Illegal column list";
|
|
|
|
@ -1064,6 +1067,36 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u
|
|
|
|
|
return SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::vector<uint8_t> ShardReader::ExtractBlobFieldBySelectColumns(
|
|
|
|
|
std::vector<uint8_t> &blob_fields_bytes, std::vector<uint32_t> &ordered_selected_columns_index) {
|
|
|
|
|
std::vector<uint8_t> exactly_blob_fields_bytes;
|
|
|
|
|
|
|
|
|
|
auto uint64_from_bytes = [&](int64_t pos) {
|
|
|
|
|
uint64_t result = 0;
|
|
|
|
|
for (uint64_t n = 0; n < kInt64Len; n++) {
|
|
|
|
|
result = (result << 8) + blob_fields_bytes[pos + n];
|
|
|
|
|
}
|
|
|
|
|
return result;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// get the exactly blob fields
|
|
|
|
|
uint32_t current_index = 0;
|
|
|
|
|
uint64_t current_offset = 0;
|
|
|
|
|
uint64_t data_len = uint64_from_bytes(current_offset);
|
|
|
|
|
while (current_offset < blob_fields_bytes.size()) {
|
|
|
|
|
if (std::any_of(ordered_selected_columns_index.begin(), ordered_selected_columns_index.end(),
|
|
|
|
|
[¤t_index](uint32_t &index) { return index == current_index; })) {
|
|
|
|
|
exactly_blob_fields_bytes.insert(exactly_blob_fields_bytes.end(), blob_fields_bytes.begin() + current_offset,
|
|
|
|
|
blob_fields_bytes.begin() + current_offset + kInt64Len + data_len);
|
|
|
|
|
}
|
|
|
|
|
current_index++;
|
|
|
|
|
current_offset += kInt64Len + data_len;
|
|
|
|
|
data_len = uint64_from_bytes(current_offset);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return exactly_blob_fields_bytes;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_id) {
|
|
|
|
|
// All tasks are done
|
|
|
|
|
if (task_id >= static_cast<int>(tasks_.Size())) {
|
|
|
|
@ -1081,6 +1114,7 @@ TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_
|
|
|
|
|
return std::make_pair(FAILED, std::vector<std::tuple<std::vector<uint8_t>, json>>());
|
|
|
|
|
}
|
|
|
|
|
const std::shared_ptr<Page> &page = ret.second;
|
|
|
|
|
|
|
|
|
|
// Pack image list
|
|
|
|
|
std::vector<uint8_t> images(addr[1] - addr[0]);
|
|
|
|
|
auto file_offset = header_size_ + page_size_ * (page->get_page_id()) + addr[0];
|
|
|
|
@ -1100,10 +1134,42 @@ TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_
|
|
|
|
|
return std::make_pair(FAILED, std::vector<std::tuple<std::vector<uint8_t>, json>>());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// extract the exactly blob bytes by selected columns
|
|
|
|
|
std::vector<uint8_t> images_with_exact_columns;
|
|
|
|
|
if (selected_columns_.size() == 0) {
|
|
|
|
|
images_with_exact_columns = images;
|
|
|
|
|
} else {
|
|
|
|
|
auto blob_fields = get_blob_fields();
|
|
|
|
|
|
|
|
|
|
std::vector<uint32_t> ordered_selected_columns_index;
|
|
|
|
|
uint32_t index = 0;
|
|
|
|
|
for (auto &blob_field : blob_fields.second) {
|
|
|
|
|
for (auto &field : selected_columns_) {
|
|
|
|
|
if (field.compare(blob_field) == 0) {
|
|
|
|
|
ordered_selected_columns_index.push_back(index);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
index++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (ordered_selected_columns_index.size() != 0) {
|
|
|
|
|
// extract the images
|
|
|
|
|
if (blob_fields.second.size() == 1) {
|
|
|
|
|
if (ordered_selected_columns_index.size() == 1) {
|
|
|
|
|
images_with_exact_columns = images;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
images_with_exact_columns = ExtractBlobFieldBySelectColumns(images, ordered_selected_columns_index);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Deliver batch data to output map
|
|
|
|
|
std::vector<std::tuple<std::vector<uint8_t>, json>> batch;
|
|
|
|
|
if (nlp_) {
|
|
|
|
|
json blob_fields = json::from_msgpack(images);
|
|
|
|
|
// dead code
|
|
|
|
|
json blob_fields = json::from_msgpack(images_with_exact_columns);
|
|
|
|
|
|
|
|
|
|
json merge;
|
|
|
|
|
if (selected_columns_.size() > 0) {
|
|
|
|
@ -1121,7 +1187,7 @@ TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_
|
|
|
|
|
}
|
|
|
|
|
batch.emplace_back(std::vector<uint8_t>{}, std::move(merge));
|
|
|
|
|
} else {
|
|
|
|
|
batch.emplace_back(std::move(images), std::move(std::get<2>(task)));
|
|
|
|
|
batch.emplace_back(std::move(images_with_exact_columns), std::move(std::get<2>(task)));
|
|
|
|
|
}
|
|
|
|
|
return std::make_pair(SUCCESS, std::move(batch));
|
|
|
|
|
}
|
|
|
|
|