[MD] Log update for mindrecord shard reader

pull/6603/head
YangLuo 4 years ago
parent e25e818f45
commit 0770eef37b

@ -184,14 +184,14 @@ std::pair<MSRStatus, sqlite3 *> ShardIndexGenerator::CheckDatabase(const std::st
sqlite3 *db = nullptr; sqlite3 *db = nullptr;
std::ifstream fin(common::SafeCStr(shard_address)); std::ifstream fin(common::SafeCStr(shard_address));
if (!append_ && fin.good()) { if (!append_ && fin.good()) {
MS_LOG(ERROR) << "DB file already exist"; MS_LOG(ERROR) << "Invalid file, DB file already exist: " << shard_address;
fin.close(); fin.close();
return {FAILED, nullptr}; return {FAILED, nullptr};
} }
fin.close(); fin.close();
int rc = sqlite3_open_v2(common::SafeCStr(shard_address), &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, nullptr); int rc = sqlite3_open_v2(common::SafeCStr(shard_address), &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, nullptr);
if (rc) { if (rc) {
MS_LOG(ERROR) << "Can't open database, error: " << sqlite3_errmsg(db); MS_LOG(ERROR) << "Invalid file, failed to open database: " << shard_address << ", error" << sqlite3_errmsg(db);
return {FAILED, nullptr}; return {FAILED, nullptr};
} else { } else {
MS_LOG(DEBUG) << "Opened database successfully"; MS_LOG(DEBUG) << "Opened database successfully";
@ -522,14 +522,14 @@ MSRStatus ShardIndexGenerator::ExecuteTransaction(const int &shard_no, std::pair
// Add index data to database // Add index data to database
std::string shard_address = shard_header_.GetShardAddressByID(shard_no); std::string shard_address = shard_header_.GetShardAddressByID(shard_no);
if (shard_address.empty()) { if (shard_address.empty()) {
MS_LOG(ERROR) << "Shard address is null"; MS_LOG(ERROR) << "Invalid data, shard address is null";
return FAILED; return FAILED;
} }
std::fstream in; std::fstream in;
in.open(common::SafeCStr(shard_address), std::ios::in | std::ios::binary); in.open(common::SafeCStr(shard_address), std::ios::in | std::ios::binary);
if (!in.good()) { if (!in.good()) {
MS_LOG(ERROR) << "File could not opened"; MS_LOG(ERROR) << "Invalid file, failed to open file: " << shard_address;
return FAILED; return FAILED;
} }
(void)sqlite3_exec(db.second, "BEGIN TRANSACTION;", nullptr, nullptr, nullptr); (void)sqlite3_exec(db.second, "BEGIN TRANSACTION;", nullptr, nullptr, nullptr);

@ -101,7 +101,7 @@ MSRStatus ShardReader::Init(const std::vector<std::string> &file_paths, bool loa
// sqlite3_open create a database if not found, use sqlite3_open_v2 instead of it // sqlite3_open create a database if not found, use sqlite3_open_v2 instead of it
int rc = sqlite3_open_v2(common::SafeCStr(file + ".db"), &db, SQLITE_OPEN_READONLY, nullptr); int rc = sqlite3_open_v2(common::SafeCStr(file + ".db"), &db, SQLITE_OPEN_READONLY, nullptr);
if (rc != SQLITE_OK) { if (rc != SQLITE_OK) {
MS_LOG(ERROR) << "Can't open database, error: " << sqlite3_errmsg(db); MS_LOG(ERROR) << "Invalid file, failed to open database: " << file + ".db, error: " << sqlite3_errmsg(db);
return FAILED; return FAILED;
} }
MS_LOG(DEBUG) << "Opened database successfully"; MS_LOG(DEBUG) << "Opened database successfully";
@ -120,7 +120,7 @@ MSRStatus ShardReader::Init(const std::vector<std::string> &file_paths, bool loa
MS_LOG(DEBUG) << "Get " << static_cast<int>(name.size()) << " records from index."; MS_LOG(DEBUG) << "Get " << static_cast<int>(name.size()) << " records from index.";
string shardName = GetFileName(file).second; string shardName = GetFileName(file).second;
if (name.empty() || name[0][0] != shardName) { if (name.empty() || name[0][0] != shardName) {
MS_LOG(ERROR) << "DB file can not match file " << file; MS_LOG(ERROR) << "Invalid file, DB file can not match file: " << file;
sqlite3_free(errmsg); sqlite3_free(errmsg);
sqlite3_close(db); sqlite3_close(db);
db = nullptr; db = nullptr;
@ -182,7 +182,7 @@ MSRStatus ShardReader::Open() {
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>(); std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
fs->open(common::SafeCStr(file), std::ios::in | std::ios::binary); fs->open(common::SafeCStr(file), std::ios::in | std::ios::binary);
if (!fs->good()) { if (!fs->good()) {
MS_LOG(ERROR) << "File could not opened"; MS_LOG(ERROR) << "Invalid file, failed to open file: " << file;
return FAILED; return FAILED;
} }
MS_LOG(INFO) << "Open shard file successfully."; MS_LOG(INFO) << "Open shard file successfully.";
@ -200,7 +200,7 @@ MSRStatus ShardReader::Open(int n_consumer) {
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>(); std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
fs->open(common::SafeCStr(file), std::ios::in | std::ios::binary); fs->open(common::SafeCStr(file), std::ios::in | std::ios::binary);
if (!fs->good()) { if (!fs->good()) {
MS_LOG(ERROR) << "File could not opened"; MS_LOG(ERROR) << "Invalid file, failed to open file: " << file;
return FAILED; return FAILED;
} }
file_streams_random_[j].push_back(fs); file_streams_random_[j].push_back(fs);
@ -385,7 +385,7 @@ MSRStatus ShardReader::ReadAllRowsInShard(int shard_id, const std::string &sql,
if (!all_in_index_) { if (!all_in_index_) {
fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::binary); fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::binary);
if (!fs->good()) { if (!fs->good()) {
MS_LOG(ERROR) << "File could not opened"; MS_LOG(ERROR) << "Invalid file, failed to open file: " << file_name;
return FAILED; return FAILED;
} }
} }
@ -430,7 +430,7 @@ void ShardReader::GetClassesInShard(sqlite3 *db, int shard_id, const std::string
sqlite3_free(errmsg); sqlite3_free(errmsg);
sqlite3_close(db); sqlite3_close(db);
db = nullptr; db = nullptr;
MS_LOG(ERROR) << "Error in select sql statement, sql:" << common::SafeCStr(sql) << ", error: " << errmsg; MS_LOG(ERROR) << "Error in select sql statement, sql: " << common::SafeCStr(sql) << ", error: " << errmsg;
return; return;
} }
MS_LOG(INFO) << "Get " << static_cast<int>(columns.size()) << " records from shard " << shard_id << " index."; MS_LOG(INFO) << "Get " << static_cast<int>(columns.size()) << " records from shard " << shard_id << " index.";
@ -602,7 +602,7 @@ MSRStatus ShardReader::QueryWithCriteria(sqlite3 *db, string &sql, string criter
std::vector<std::vector<std::string>> &labels) { std::vector<std::vector<std::string>> &labels) {
sqlite3_stmt *stmt = nullptr; sqlite3_stmt *stmt = nullptr;
if (sqlite3_prepare_v2(db, common::SafeCStr(sql), -1, &stmt, 0) != SQLITE_OK) { if (sqlite3_prepare_v2(db, common::SafeCStr(sql), -1, &stmt, 0) != SQLITE_OK) {
MS_LOG(ERROR) << "SQL error: could not prepare statement"; MS_LOG(ERROR) << "SQL error: could not prepare statement, sql: " << sql;
return FAILED; return FAILED;
} }
int index = sqlite3_bind_parameter_index(stmt, ":criteria"); int index = sqlite3_bind_parameter_index(stmt, ":criteria");
@ -631,7 +631,7 @@ std::pair<MSRStatus, std::vector<json>> ShardReader::GetLabelsFromBinaryFile(
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>(); std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::binary); fs->open(common::SafeCStr(file_name), std::ios::in | std::ios::binary);
if (!fs->good()) { if (!fs->good()) {
MS_LOG(ERROR) << "File could not opened"; MS_LOG(ERROR) << "Invalid file, failed to open file: " << file_name;
return {FAILED, {}}; return {FAILED, {}};
} }
@ -795,7 +795,8 @@ int64_t ShardReader::GetNumClasses(const std::string &category_field) {
sqlite3 *db = nullptr; sqlite3 *db = nullptr;
int rc = sqlite3_open_v2(common::SafeCStr(file_paths_[x] + ".db"), &db, SQLITE_OPEN_READONLY, nullptr); int rc = sqlite3_open_v2(common::SafeCStr(file_paths_[x] + ".db"), &db, SQLITE_OPEN_READONLY, nullptr);
if (SQLITE_OK != rc) { if (SQLITE_OK != rc) {
MS_LOG(ERROR) << "Can't open database, error: " << sqlite3_errmsg(db); MS_LOG(ERROR) << "Invalid file, failed to open database: " << file_paths_[x] + ".db, error: "
<< sqlite3_errmsg(db);
return -1; return -1;
} }
threads[x] = std::thread(&ShardReader::GetClassesInShard, this, db, x, sql, std::ref(categories)); threads[x] = std::thread(&ShardReader::GetClassesInShard, this, db, x, sql, std::ref(categories));
@ -970,19 +971,19 @@ MSRStatus ShardReader::CreateTasksByCategory(const std::vector<std::tuple<int, i
if (std::dynamic_pointer_cast<ShardPkSample>(op)) { if (std::dynamic_pointer_cast<ShardPkSample>(op)) {
num_samples = std::dynamic_pointer_cast<ShardPkSample>(op)->GetNumSamples(); num_samples = std::dynamic_pointer_cast<ShardPkSample>(op)->GetNumSamples();
if (num_samples < 0) { if (num_samples < 0) {
MS_LOG(ERROR) << "Parameter num_samples is not positive or zero"; MS_LOG(ERROR) << "Invalid parameter, num_samples must be greater than or equal to 0, but got " << num_samples;
return FAILED; return FAILED;
} }
} }
if (num_elements <= 0) { if (num_elements <= 0) {
MS_LOG(ERROR) << "Parameter num_element is not positive"; MS_LOG(ERROR) << "Invalid parameter, num_elements must be greater than 0, but got " << num_elements;
return FAILED; return FAILED;
} }
if (categories.empty() == true) { if (categories.empty() == true) {
std::string category_field = category_op->GetCategoryField(); std::string category_field = category_op->GetCategoryField();
int64_t num_categories = category_op->GetNumCategories(); int64_t num_categories = category_op->GetNumCategories();
if (num_categories <= 0) { if (num_categories <= 0) {
MS_LOG(ERROR) << "Parameter num_categories is not positive"; MS_LOG(ERROR) << "Invalid parameter, num_categories must be greater than 0, but got " << num_elements;
return FAILED; return FAILED;
} }
std::set<std::string> categories_set; std::set<std::string> categories_set;

@ -90,14 +90,14 @@ MSRStatus ShardWriter::OpenDataFiles(bool append) {
// open the mindrecord file to write // open the mindrecord file to write
fs->open(common::SafeCStr(file), std::ios::out | std::ios::in | std::ios::binary | std::ios::trunc); fs->open(common::SafeCStr(file), std::ios::out | std::ios::in | std::ios::binary | std::ios::trunc);
if (!fs->good()) { if (!fs->good()) {
MS_LOG(ERROR) << "MindRecord file could not opened."; MS_LOG(ERROR) << "MindRecord file could not opened: " << file;
return FAILED; return FAILED;
} }
} else { } else {
// open the mindrecord file to append // open the mindrecord file to append
fs->open(common::SafeCStr(file), std::ios::out | std::ios::in | std::ios::binary); fs->open(common::SafeCStr(file), std::ios::out | std::ios::in | std::ios::binary);
if (!fs->good()) { if (!fs->good()) {
MS_LOG(ERROR) << "MindRecord file could not opened for append."; MS_LOG(ERROR) << "MindRecord file could not opened for append: " << file;
return FAILED; return FAILED;
} }
} }
@ -140,11 +140,11 @@ MSRStatus ShardWriter::InitLockFile() {
MSRStatus ShardWriter::Open(const std::vector<std::string> &paths, bool append) { MSRStatus ShardWriter::Open(const std::vector<std::string> &paths, bool append) {
shard_count_ = paths.size(); shard_count_ = paths.size();
if (shard_count_ > kMaxShardCount || shard_count_ == 0) { if (shard_count_ > kMaxShardCount || shard_count_ == 0) {
MS_LOG(ERROR) << "The Shard Count greater than max value or equal to 0."; MS_LOG(ERROR) << "The Shard Count greater than max value(1000) or equal to 0, but got " << shard_count_;
return FAILED; return FAILED;
} }
if (schema_count_ > kMaxSchemaCount) { if (schema_count_ > kMaxSchemaCount) {
MS_LOG(ERROR) << "The schema Count greater than max value."; MS_LOG(ERROR) << "The schema Count greater than max value(1), but got " << schema_count_;
return FAILED; return FAILED;
} }
@ -202,7 +202,7 @@ MSRStatus ShardWriter::OpenForAppend(const std::string &path) {
compression_size_ = shard_header_->GetCompressionSize(); compression_size_ = shard_header_->GetCompressionSize();
ret = Open(real_addresses, true); ret = Open(real_addresses, true);
if (ret == FAILED) { if (ret == FAILED) {
MS_LOG(ERROR) << "Open file failed"; MS_LOG(ERROR) << "Invalid file, failed to open file: " << real_addresses;
return FAILED; return FAILED;
} }
shard_column_ = std::make_shared<ShardColumn>(shard_header_); shard_column_ = std::make_shared<ShardColumn>(shard_header_);
@ -564,14 +564,14 @@ int ShardWriter::LockWriter(bool parallel_writer) {
std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>(); std::shared_ptr<std::fstream> fs = std::make_shared<std::fstream>();
fs->open(common::SafeCStr(file), std::ios::in | std::ios::out | std::ios::binary); fs->open(common::SafeCStr(file), std::ios::in | std::ios::out | std::ios::binary);
if (fs->fail()) { if (fs->fail()) {
MS_LOG(ERROR) << "File could not opened"; MS_LOG(ERROR) << "Invalid file, failed to open file: " << file;
return -1; return -1;
} }
file_streams_.push_back(fs); file_streams_.push_back(fs);
} }
if (shard_header_->FileToPages(pages_file_) == FAILED) { if (shard_header_->FileToPages(pages_file_) == FAILED) {
MS_LOG(ERROR) << "Read pages from file failed"; MS_LOG(ERROR) << "Invalid data, failed to read pages from file.";
return -1; return -1;
} }
return fd; return fd;

Loading…
Cancel
Save