|
|
|
@ -125,14 +125,11 @@ MSRStatus ShardReader::Open() {
|
|
|
|
|
|
|
|
|
|
for (const auto &file : file_paths_) {
|
|
|
|
|
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
|
|
|
|
|
fs->open(common::SafeCStr(file), std::ios::in | std::ios::out | std::ios::binary);
|
|
|
|
|
if (fs->fail()) {
|
|
|
|
|
fs->open(common::SafeCStr(file), std::ios::in | std::ios::out | std::ios::trunc | std::ios::binary);
|
|
|
|
|
if (fs->fail()) {
|
|
|
|
|
fs->open(common::SafeCStr(file), std::ios::in | std::ios::binary);
|
|
|
|
|
if (!fs->good()) {
|
|
|
|
|
MS_LOG(ERROR) << "File could not opened";
|
|
|
|
|
return FAILED;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
MS_LOG(INFO) << "Open shard file successfully.";
|
|
|
|
|
file_streams_.push_back(fs);
|
|
|
|
|
}
|
|
|
|
@ -146,14 +143,11 @@ MSRStatus ShardReader::Open(int n_consumer) {
|
|
|
|
|
for (const auto &file : file_paths_) {
|
|
|
|
|
for (int j = 0; j < n_consumer; ++j) {
|
|
|
|
|
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
|
|
|
|
|
fs->open(common::SafeCStr(file), std::ios::in | std::ios::out | std::ios::binary);
|
|
|
|
|
if (fs->fail()) {
|
|
|
|
|
fs->open(common::SafeCStr(file), std::ios::in | std::ios::out | std::ios::trunc | std::ios::binary);
|
|
|
|
|
if (fs->fail()) {
|
|
|
|
|
fs->open(common::SafeCStr(file), std::ios::in | std::ios::binary);
|
|
|
|
|
if (!fs->good()) {
|
|
|
|
|
MS_LOG(ERROR) << "File could not opened";
|
|
|
|
|
return FAILED;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
file_streams_random_[j].push_back(fs);
|
|
|
|
|
}
|
|
|
|
|
MS_LOG(INFO) << "Open shard file successfully.";
|
|
|
|
@ -311,12 +305,10 @@ MSRStatus ShardReader::ReadAllRowsInShard(int shard_id, const std::string &sql,
|
|
|
|
|
std::string file_name = file_paths_[shard_id];
|
|
|
|
|
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
|
|
|
|
|
if (!all_in_index_) {
|
|
|
|
|
fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::out | std::ios::binary);
|
|
|
|
|
if (fs->fail()) {
|
|
|
|
|
fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::out | std::ios::trunc | std::ios::binary);
|
|
|
|
|
if (fs->fail()) {
|
|
|
|
|
fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::binary);
|
|
|
|
|
if (!fs->good()) {
|
|
|
|
|
MS_LOG(ERROR) << "File could not opened";
|
|
|
|
|
}
|
|
|
|
|
return FAILED;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
sqlite3_free(errmsg);
|
|
|
|
@ -520,8 +512,8 @@ std::pair<MSRStatus, std::vector<json>> ShardReader::GetLabelsFromBinaryFile(
|
|
|
|
|
std::string file_name = file_paths_[shard_id];
|
|
|
|
|
std::vector<json> res;
|
|
|
|
|
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
|
|
|
|
|
fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::out | std::ios::binary);
|
|
|
|
|
if (fs->fail()) {
|
|
|
|
|
fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::binary);
|
|
|
|
|
if (!fs->good()) {
|
|
|
|
|
MS_LOG(ERROR) << "File could not opened";
|
|
|
|
|
return {FAILED, {}};
|
|
|
|
|
}
|
|
|
|
|