!5842 [Dataset] dataset op log changes

Merge pull request !5842 from luoyang/pylint
pull/5842/MERGE
mindspore-ci-bot 5 years ago committed by Gitee
commit eb049b0e93

@ -181,7 +181,8 @@ Status BarrierOp::blockCond() {
py::object ret_py_obj = condition_function_(); py::object ret_py_obj = condition_function_();
// Process the return value // Process the return value
if (!py::isinstance<py::bool_>(ret_py_obj)) { if (!py::isinstance<py::bool_>(ret_py_obj)) {
return Status(StatusCode::kPyFuncException, "Condition wait function should return true/false"); return Status(StatusCode::kPyFuncException,
"Invalid parameter, condition wait function should return true/false.");
} }
} catch (const py::error_already_set &e) { } catch (const py::error_already_set &e) {
return Status(StatusCode::kPyFuncException, e.what()); return Status(StatusCode::kPyFuncException, e.what());

@ -51,9 +51,15 @@ Status BatchOp::Builder::Build(std::shared_ptr<BatchOp> *ptr) {
// Validates the BatchOp builder settings.
// Three values are tested against `<= 0`: builder_op_connector_size_,
// builder_batch_size_ and builder_num_workers_. Every failing check appends one
// newline-terminated message to `err`, so several problems can be reported at once.
// Returns Status::OK() when `err` is empty, otherwise a kUnexpectedError Status
// built from the accumulated text.
// NOTE(review): this listing appears to be a side-by-side diff; each line holds the
// pre-change statement followed by the post-change one. The post-change messages
// also append the offending value via std::to_string.
Status BatchOp::Builder::SanityCheck() { Status BatchOp::Builder::SanityCheck() {
std::string err; std::string err;
err += builder_op_connector_size_ <= 0 ? "connector size <= 0\n" : ""; err += builder_op_connector_size_ <= 0 ? "Invalid parameter, connector_size must be greater than 0, but got " +
err += builder_batch_size_ <= 0 ? "batch size <= 0\n" : ""; std::to_string(builder_op_connector_size_) + ".\n"
err += builder_num_workers_ <= 0 ? "batch num_parallel_workers <= 0\n" : ""; : "";
err += builder_batch_size_ <= 0 ? "Invalid parameter, batch_size must be greater than 0, but got " +
std::to_string(builder_batch_size_) + ".\n"
: "";
err += builder_num_workers_ <= 0 ? "Invalid parameter, num_parallel_workers must be greater than 0, but got " +
std::to_string(builder_num_workers_) + ".\n"
: "";
// An empty `err` means no check failed.
return err.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, common::SafeCStr(err)); return err.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, common::SafeCStr(err));
} }
@ -184,7 +190,9 @@ Status BatchOp::BatchRows(const std::unique_ptr<TensorQTable> *src, const std::u
} }
// Don't do anything if the tensor has no data // Don't do anything if the tensor has no data
} else { } else {
RETURN_STATUS_UNEXPECTED("[Batch ERROR] Inconsistent TensorShapes of Column " + std::to_string(i)); RETURN_STATUS_UNEXPECTED(
"Invalid data, expect same shape for each data row, but got inconsistent data shapes in column " +
std::to_string(i));
} }
} }
} else { // handle string column differently } else { // handle string column differently
@ -239,7 +247,9 @@ Status BatchOp::MakeBatchedBuffer(std::pair<std::unique_ptr<TensorQTable>, CBatc
} }
Status BatchOp::LaunchThreadsAndInitOp() { Status BatchOp::LaunchThreadsAndInitOp() {
RETURN_UNEXPECTED_IF_NULL(tree_); if (tree_ == nullptr) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Pipeline init failed, Execution tree not set.");
}
RETURN_IF_NOT_OK(worker_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(worker_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&BatchOp::WorkerEntry, this, std::placeholders::_1))); RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&BatchOp::WorkerEntry, this, std::placeholders::_1)));
return Status::OK(); return Status::OK();
@ -258,7 +268,7 @@ Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>
input_table.reserve(pyfunc_column_names_.size()); input_table.reserve(pyfunc_column_names_.size());
for (std::string col_name : pyfunc_column_names_) { for (std::string col_name : pyfunc_column_names_) {
if (column_name_id_map_.find(col_name) == column_name_id_map_.end()) { if (column_name_id_map_.find(col_name) == column_name_id_map_.end()) {
RETURN_STATUS_UNEXPECTED("column : '" + col_name + "' does not exist\n"); RETURN_STATUS_UNEXPECTED("Invalid parameter, column name: '" + col_name + "' does not exist.\n");
} }
TensorBatch tensor_batch; TensorBatch tensor_batch;
tensor_batch.reserve(table_pair->first->size()); tensor_batch.reserve(table_pair->first->size());
@ -310,12 +320,14 @@ Status BatchOp::InvokeBatchSizeFunc(int32_t *batch_size, CBatchInfo info) {
py::object size = batch_size_func_(info); py::object size = batch_size_func_(info);
*batch_size = size.cast<int32_t>(); *batch_size = size.cast<int32_t>();
if (*batch_size <= 0) { if (*batch_size <= 0) {
return Status(StatusCode::kPyFuncException, "Batch size function should return an integer > 0"); return Status(StatusCode::kPyFuncException,
"Invalid parameter, batch size function should return an integer greater than 0.");
} }
} catch (const py::error_already_set &e) { } catch (const py::error_already_set &e) {
return Status(StatusCode::kPyFuncException, e.what()); return Status(StatusCode::kPyFuncException, e.what());
} catch (const py::cast_error &e) { } catch (const py::cast_error &e) {
return Status(StatusCode::kPyFuncException, "Batch size function should return an integer > 0"); return Status(StatusCode::kPyFuncException,
"Invalid parameter, batch size function should return an integer greater than 0.");
} }
} }
return Status(StatusCode::kOK, "Batch size func call succeed"); return Status(StatusCode::kOK, "Batch size func call succeed");
@ -346,7 +358,7 @@ Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *ou
// Parse batch map return value // Parse batch map return value
py::tuple ret_tuple = py::cast<py::tuple>(ret_py_obj); py::tuple ret_tuple = py::cast<py::tuple>(ret_py_obj);
if (ret_tuple.size() != pyfunc_column_names_.size() || !py::isinstance<py::tuple>(ret_tuple)) { if (ret_tuple.size() != pyfunc_column_names_.size() || !py::isinstance<py::tuple>(ret_tuple)) {
return Status(StatusCode::kPyFuncException, "Batch map function should return a tuple"); return Status(StatusCode::kPyFuncException, "Invalid parameter, batch map function should return a tuple.");
} }
for (size_t i = 0; i < ret_tuple.size(); i++) { for (size_t i = 0; i < ret_tuple.size(); i++) {
TensorBatch output_batch; TensorBatch output_batch;
@ -361,7 +373,8 @@ Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *ou
} catch (const py::error_already_set &e) { } catch (const py::error_already_set &e) {
return Status(StatusCode::kPyFuncException, e.what()); return Status(StatusCode::kPyFuncException, e.what());
} catch (const py::cast_error &e) { } catch (const py::cast_error &e) {
return Status(StatusCode::kPyFuncException, "Batch map function should return an tuple of list of numpy array"); return Status(StatusCode::kPyFuncException,
"Invalid parameter, batch map function should return a tuple of list of numpy array.");
} }
} }
return Status(StatusCode::kOK); return Status(StatusCode::kOK);
@ -371,7 +384,10 @@ Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *ou
Status BatchOp::PadColumns(std::unique_ptr<TensorQTable> *table, const PadInfo &pad_info, Status BatchOp::PadColumns(std::unique_ptr<TensorQTable> *table, const PadInfo &pad_info,
const std::unordered_map<std::string, int32_t> &column_name_id_map) { const std::unordered_map<std::string, int32_t> &column_name_id_map) {
RETURN_UNEXPECTED_IF_NULL(table); // placeholder for now, might need this in the future RETURN_UNEXPECTED_IF_NULL(table); // placeholder for now, might need this in the future
CHECK_FAIL_RETURN_UNEXPECTED((*table)->front().size() == column_name_id_map.size(), "col_name_map mismatch"); CHECK_FAIL_RETURN_UNEXPECTED(
(*table)->front().size() == column_name_id_map.size(),
"Invaid parameter, size of column_name_id_map must be equal to num of data columns. map size: " +
std::to_string(column_name_id_map.size()) + ", column nums: " + std::to_string((*table)->front().size()));
std::vector<std::shared_ptr<Tensor>> pad_vals(column_name_id_map.size(), std::vector<std::shared_ptr<Tensor>> pad_vals(column_name_id_map.size(),
0); // value to pad each column's tensor with, default 0 0); // value to pad each column's tensor with, default 0
std::set<int32_t> pad_cols; std::set<int32_t> pad_cols;
@ -383,14 +399,19 @@ Status BatchOp::PadColumns(std::unique_ptr<TensorQTable> *table, const PadInfo &
for (size_t col_id : pad_cols) { for (size_t col_id : pad_cols) {
max_shapes[col_id] = std::vector<dsize_t>((*table)->front()[col_id]->Rank(), -1); max_shapes[col_id] = std::vector<dsize_t>((*table)->front()[col_id]->Rank(), -1);
if (pad_shapes[col_id].empty()) pad_shapes[col_id] = max_shapes[col_id]; // fill pad shape with -1 if (pad_shapes[col_id].empty()) pad_shapes[col_id] = max_shapes[col_id]; // fill pad shape with -1
CHECK_FAIL_RETURN_UNEXPECTED(pad_shapes[col_id].size() == max_shapes[col_id].size(), "wrong rank in pad_shape"); CHECK_FAIL_RETURN_UNEXPECTED(
pad_shapes[col_id].size() == max_shapes[col_id].size(),
"Invalid data, rank of pad_shape must be equal to rank of specified column. pad_shapes rank:" +
std::to_string(pad_shapes[col_id].size()) + ", column rank: " + std::to_string(max_shapes[col_id].size()));
} }
// calculate maximum shape for each column that needs to be padded // calculate maximum shape for each column that needs to be padded
for (const TensorRow &row : **table) { // iterator each row in a batch for (const TensorRow &row : **table) { // iterator each row in a batch
for (size_t col_id : pad_cols) { // iterator each tensor in a row for (size_t col_id : pad_cols) { // iterator each tensor in a row
CHECK_FAIL_RETURN_UNEXPECTED(row[col_id]->Rank() == max_shapes[col_id].size(), CHECK_FAIL_RETURN_UNEXPECTED(
"Tensor to be padded together need to have the same rank"); row[col_id]->Rank() == max_shapes[col_id].size(),
"Invalid data, data to be padded together need to have the same rank, got shape 1: " +
std::to_string(row[col_id]->Rank()) + ", shape 2: " + std::to_string(max_shapes[col_id].size()));
for (size_t dim = 0; dim < row[col_id]->Rank(); dim++) { // pick the largest number in each dimension for (size_t dim = 0; dim < row[col_id]->Rank(); dim++) { // pick the largest number in each dimension
max_shapes[col_id][dim] = std::max(max_shapes[col_id][dim], row[col_id]->shape()[dim]); max_shapes[col_id][dim] = std::max(max_shapes[col_id][dim], row[col_id]->shape()[dim]);
} }
@ -426,9 +447,13 @@ Status BatchOp::UnpackPadInfo(const PadInfo &pad_info,
} else { } else {
for (const auto &p : pad_info) { for (const auto &p : pad_info) {
auto location = column_name_id_map.find(p.first); auto location = column_name_id_map.find(p.first);
CHECK_FAIL_RETURN_UNEXPECTED(location != column_name_id_map.end(), "no column exists with name:" + p.first); CHECK_FAIL_RETURN_UNEXPECTED(location != column_name_id_map.end(),
"Invalid parameter, column name: " + p.first + " does not exist.");
auto col_id = static_cast<dsize_t>(location->second); auto col_id = static_cast<dsize_t>(location->second);
CHECK_FAIL_RETURN_UNEXPECTED(col_id < pad_vals->size() && col_id < pad_shapes->size(), "col_id out of bound"); CHECK_FAIL_RETURN_UNEXPECTED(
col_id < pad_vals->size() && col_id < pad_shapes->size(),
"Invalid parameter, column id must be less than the size of pad_val and pad_shape, but got: " +
std::to_string(col_id));
pad_cols->insert(col_id); pad_cols->insert(col_id);
(*pad_vals)[col_id] = p.second.second; // set pad values (*pad_vals)[col_id] = p.second.second; // set pad values
(*pad_shapes)[col_id] = p.second.first.AsVector(); // empty vector if shape is unknown (*pad_shapes)[col_id] = p.second.first.AsVector(); // empty vector if shape is unknown

@ -52,15 +52,16 @@ Status BucketBatchByLengthOp::Builder::SanityCheck() {
std::string error_message; std::string error_message;
if (builder_length_dependent_columns_.empty()) { if (builder_length_dependent_columns_.empty()) {
error_message += "At least 1 column must be specified for element length calculation.\n"; error_message += "Invalid parameter, at least 1 column must be specified for element length calculation.\n";
} }
if (builder_bucket_boundaries_.empty()) { if (builder_bucket_boundaries_.empty()) {
error_message += "At least 1 bucket boundary must be specified.\n"; error_message += "Invalid parameter, at least 1 bucket boundary must be specified.\n";
} }
if (builder_bucket_batch_sizes_.size() != builder_bucket_boundaries_.size() + 1) { if (builder_bucket_batch_sizes_.size() != builder_bucket_boundaries_.size() + 1) {
error_message += "There must be exactly one bucket batch size specified for each bucket boundary.\n"; error_message +=
"Invalid parameter, there must be exactly one bucket batch size specified for each bucket boundary.\n";
} }
CHECK_FAIL_RETURN_UNEXPECTED(error_message.empty(), error_message); CHECK_FAIL_RETURN_UNEXPECTED(error_message.empty(), error_message);
@ -168,7 +169,8 @@ Status BucketBatchByLengthOp::ObtainElementLength(int32_t *out_element_length, T
RETURN_IF_NOT_OK(element_length_function_->Compute(input, &output)); RETURN_IF_NOT_OK(element_length_function_->Compute(input, &output));
RETURN_IF_NOT_OK(output.at(0)->GetItemAt(out_element_length, {0})); RETURN_IF_NOT_OK(output.at(0)->GetItemAt(out_element_length, {0}));
if (*out_element_length < 0) { if (*out_element_length < 0) {
RETURN_STATUS_UNEXPECTED("BucketBatchByLength: element_length_function returned negative integer"); RETURN_STATUS_UNEXPECTED(
"Invalid parameter, element_length_function must return an integer greater than or equal to 0.");
} }
} else { } else {
*out_element_length = element[0]->shape()[0]; *out_element_length = element[0]->shape()[0];
@ -187,7 +189,8 @@ Status BucketBatchByLengthOp::PadAndBatchBucket(int32_t bucket_index, int32_t ba
for (size_t i = 0; i < pad_shape.size(); i++) { for (size_t i = 0; i < pad_shape.size(); i++) {
if (pad_shape[i] == TensorShape::kDimUnknown) { if (pad_shape[i] == TensorShape::kDimUnknown) {
if (bucket_index + 1 >= bucket_boundaries_.size()) { if (bucket_index + 1 >= bucket_boundaries_.size()) {
std::string error_message = "Requested to pad to bucket boundary, element falls in last bucket"; std::string error_message =
"Invalid data, requested to pad to bucket boundary, element falls in last bucket.";
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, error_message); return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, error_message);
} }

@ -41,7 +41,9 @@ BuildSentencePieceVocabOp::BuildSentencePieceVocabOp(std::shared_ptr<SentencePie
} }
Status BuildSentencePieceVocabOp::operator()() { Status BuildSentencePieceVocabOp::operator()() {
RETURN_UNEXPECTED_IF_NULL(tree_); if (tree_ == nullptr) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Pipeline init failed, Execution tree not set.");
}
RETURN_IF_NOT_OK(sentence_queue_->Register(tree_->AllTasks())); RETURN_IF_NOT_OK(sentence_queue_->Register(tree_->AllTasks()));
RETURN_IF_NOT_OK( RETURN_IF_NOT_OK(
tree_->AllTasks()->CreateAsyncTask("sentenceTask", std::bind(&BuildSentencePieceVocabOp::SentenceThread, this))); tree_->AllTasks()->CreateAsyncTask("sentenceTask", std::bind(&BuildSentencePieceVocabOp::SentenceThread, this)));
@ -69,12 +71,12 @@ Status BuildSentencePieceVocabOp::SentenceThread() {
TaskManager::FindMe()->Post(); TaskManager::FindMe()->Post();
if (col_names_.empty() == true) { if (col_names_.empty() == true) {
auto itr = column_name_id_map_.find("text"); auto itr = column_name_id_map_.find("text");
CHECK_FAIL_RETURN_UNEXPECTED(itr != column_name_id_map_.end(), CHECK_FAIL_RETURN_UNEXPECTED(itr != column_name_id_map_.end(), "Invalid data, 'text' column does not exist.");
"'text' column doesn't exist when column name is empty");
col_id_ = itr->second; col_id_ = itr->second;
} else { } else {
auto itr = column_name_id_map_.find(col_names_[0]); auto itr = column_name_id_map_.find(col_names_[0]);
CHECK_FAIL_RETURN_UNEXPECTED(itr != column_name_id_map_.end(), col_names_[0] + "column doesn't exist"); CHECK_FAIL_RETURN_UNEXPECTED(itr != column_name_id_map_.end(),
"Invalid parameter, column name: " + col_names_[0] + "does not exist.");
col_id_ = itr->second; col_id_ = itr->second;
} }
std::unique_ptr<DatasetSentenceIterator> sentence_iter = std::make_unique<DatasetSentenceIterator>(this); std::unique_ptr<DatasetSentenceIterator> sentence_iter = std::make_unique<DatasetSentenceIterator>(this);
@ -85,7 +87,8 @@ Status BuildSentencePieceVocabOp::SentenceThread() {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, s_status.message()); return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, s_status.message());
} else { } else {
if (vocab_ == nullptr) { if (vocab_ == nullptr) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "sentencepiece vocab ptr must not be nullptr"); return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"Invalid parameter, sentencepiece vocab not set.");
} }
vocab_->set_model_proto(model_proto); vocab_->set_model_proto(model_proto);
} }
@ -141,8 +144,10 @@ void BuildSentencePieceVocabOp::Next(std::string *sentence) {
} }
if (new_row[col_id_]->type().IsNumeric() || new_row[col_id_]->Rank() > 1) { if (new_row[col_id_]->type().IsNumeric() || new_row[col_id_]->Rank() > 1) {
ret_status_ = Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, ret_status_ =
"for dataset only words on string columns or must bu scalar"); Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"Invalid data, build_sentence_piece_vocab only works on string data with rank equal to 1, got type: " +
new_row[col_id_]->type().ToString() + "and rank: " + std::to_string(new_row[col_id_]->Rank()));
read_done_ = true; read_done_ = true;
return; return;
} }

@ -54,7 +54,9 @@ Status BuildVocabOp::WorkerEntry(int32_t worker_id) {
int32_t row_cnt = 0; int32_t row_cnt = 0;
while (!new_row.empty()) { while (!new_row.empty()) {
for (int32_t col : col_ids_) { for (int32_t col : col_ids_) {
CHECK_FAIL_RETURN_UNEXPECTED(!new_row[col]->type().IsNumeric(), "from_dataset only works on string columns"); CHECK_FAIL_RETURN_UNEXPECTED(!new_row[col]->type().IsNumeric(),
"Invalid data, build_vocab only works on string data, but got numeric data type: " +
new_row[col]->type().ToString());
for (auto itr = new_row[col]->begin<std::string_view>(); itr != new_row[col]->end<std::string_view>(); itr++) { for (auto itr = new_row[col]->begin<std::string_view>(); itr != new_row[col]->end<std::string_view>(); itr++) {
(*wrkr_map)[std::string(*itr)] += 1; (*wrkr_map)[std::string(*itr)] += 1;
} }
@ -77,7 +79,9 @@ Status BuildVocabOp::WorkerEntry(int32_t worker_id) {
Status BuildVocabOp::operator()() { Status BuildVocabOp::operator()() {
// launch the collector thread // launch the collector thread
RETURN_UNEXPECTED_IF_NULL(tree_); if (tree_ == nullptr) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Pipeline init failed, Execution tree not set.");
}
RETURN_IF_NOT_OK(distributor_queue_->Register(tree_->AllTasks())); RETURN_IF_NOT_OK(distributor_queue_->Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(collector_queue_->Register(tree_->AllTasks())); RETURN_IF_NOT_OK(collector_queue_->Register(tree_->AllTasks()));
// launch worker threads and collector thread // launch worker threads and collector thread
@ -92,7 +96,8 @@ Status BuildVocabOp::operator()() {
col_ids_.reserve(col_names_.size()); col_ids_.reserve(col_names_.size());
for (std::string col : col_names_) { for (std::string col : col_names_) {
auto itr = column_name_id_map_.find(col); auto itr = column_name_id_map_.find(col);
CHECK_FAIL_RETURN_UNEXPECTED(itr != column_name_id_map_.end(), col + " column doesn't exist"); CHECK_FAIL_RETURN_UNEXPECTED(itr != column_name_id_map_.end(),
"Invalid parameter, column name: " + col + " does not exist.");
col_ids_.push_back(itr->second); col_ids_.push_back(itr->second);
} }
} else { } else {
@ -131,7 +136,7 @@ Status BuildVocabOp::CollectorThread() {
++num_quited_worker; ++num_quited_worker;
} }
} // all frequencies are obtained } // all frequencies are obtained
CHECK_FAIL_RETURN_UNEXPECTED(!word_cnt_.empty(), "word_cnt is empty"); CHECK_FAIL_RETURN_UNEXPECTED(!word_cnt_.empty(), "Invalid data, no words in the dataset.");
std::vector<std::string> words; std::vector<std::string> words;
// make sure enough is reserved, this will become a partially sorted list eventually // make sure enough is reserved, this will become a partially sorted list eventually
words.reserve(wrkr_map->size()); words.reserve(wrkr_map->size());
@ -151,7 +156,8 @@ Status BuildVocabOp::CollectorThread() {
err_msg += (word_cnt_.find(sp_tk) != word_cnt_.end() ? sp_tk + "\t" : ""); err_msg += (word_cnt_.find(sp_tk) != word_cnt_.end() ? sp_tk + "\t" : "");
} }
CHECK_FAIL_RETURN_UNEXPECTED(err_msg.empty(), "These specials words are already in the dataset: " + err_msg + "."); CHECK_FAIL_RETURN_UNEXPECTED(err_msg.empty(),
"Invalid data, these special words are already in the dataset: " + err_msg + ".");
int64_t num_words = std::min(static_cast<int64_t>(words.size()), top_k_); int64_t num_words = std::min(static_cast<int64_t>(words.size()), top_k_);
if (num_words == 0) { if (num_words == 0) {
@ -185,10 +191,13 @@ Status BuildVocabOp::CollectorThread() {
} }
Status BuildVocabOp::Builder::Build(std::shared_ptr<BuildVocabOp> *op) { Status BuildVocabOp::Builder::Build(std::shared_ptr<BuildVocabOp> *op) {
CHECK_FAIL_RETURN_UNEXPECTED(builder_num_workers_ > 0, "builder num_workers need to be greater than 0"); CHECK_FAIL_RETURN_UNEXPECTED(
CHECK_FAIL_RETURN_UNEXPECTED(builder_top_k_ > 0, "top_k needs to be positive number"); builder_num_workers_ > 0,
"Invalid parameter, num_parallel_workers must be greater than 0, but got " + std::to_string(builder_num_workers_));
CHECK_FAIL_RETURN_UNEXPECTED(
builder_top_k_ > 0, "Invalid parameter, top_k must be greater than 0, but got " + std::to_string(builder_top_k_));
CHECK_FAIL_RETURN_UNEXPECTED(builder_max_freq_ >= builder_min_freq_ && builder_min_freq_ >= 0, CHECK_FAIL_RETURN_UNEXPECTED(builder_max_freq_ >= builder_min_freq_ && builder_min_freq_ >= 0,
"frequency range [a,b] should be 0 <= a <= b (a,b are inclusive)"); "Invalid parameter, frequency range [a,b] must be 0 <= a <= b (a,b are inclusive).");
(*op) = std::make_shared<BuildVocabOp>( (*op) = std::make_shared<BuildVocabOp>(
builder_vocab_, builder_col_names_, std::make_pair(builder_min_freq_, builder_max_freq_), builder_top_k_, builder_vocab_, builder_col_names_, std::make_pair(builder_min_freq_, builder_max_freq_), builder_top_k_,
builder_speical_tokens_, builder_special_first_, builder_num_workers_, builder_connector_size_); builder_speical_tokens_, builder_special_first_, builder_num_workers_, builder_connector_size_);

@ -35,12 +35,13 @@ CacheLookupOp::Builder::Builder() : build_cache_client_(nullptr), build_sampler_
// Check if the required parameters are set by the builder. // Check if the required parameters are set by the builder.
// Two checks, performed in order, each returning a kUnexpectedError Status on failure:
//   1. build_cache_client_ must not be nullptr;
//   2. build_cache_client_->session_id() must not evaluate to false under `!`
//      (i.e. must be non-zero).
// Returns Status::OK() when both pass.
// NOTE(review): this listing appears to be a side-by-side diff; each line holds the
// pre-change statement followed by the post-change one (only the messages differ).
Status CacheLookupOp::Builder::SanityCheck() const { Status CacheLookupOp::Builder::SanityCheck() const {
if (build_cache_client_ == nullptr) { if (build_cache_client_ == nullptr) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "CacheLookupOp requires a CacheClient"); return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"Invalid parameter, CacheLookupOp requires a CacheClient, but got nullptr.");
} }
// Make sure the cache client has a valid session // Make sure the cache client has a valid session
if (!build_cache_client_->session_id()) { if (!build_cache_client_->session_id()) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"Cache client for CacheLookupOp is missing session id"); "Invalid parameter, cache client for CacheLookupOp requires a session id which is not equal to 0.");
} }
return Status::OK(); return Status::OK();
} }
@ -55,7 +56,7 @@ Status CacheLookupOp::Builder::Build(std::shared_ptr<CacheLookupOp> *ptr) {
Status CacheLookupOp::operator()() { Status CacheLookupOp::operator()() {
if (!sampler_) { if (!sampler_) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"CacheLookupOp requires a sampler before it can be executed!"); "Invalid parameter, CacheLookupOp requires a sampler before it can be executed, but got nullptr.");
} }
RETURN_IF_NOT_OK(RegisterResources()); RETURN_IF_NOT_OK(RegisterResources());
// Kick off the workers // Kick off the workers

@ -200,11 +200,11 @@ Status CacheMergeOp::PrepareNodePostAction() { // Run any common code from supe
} }
// Populates this op's column name -> id map from the cache-miss child.
// Steps:
//   - require child_[kCacheMissChildIdx] to be non-null;
//   - if column_name_id_map() is currently empty, copy the child's map into
//     column_name_id_map_ (an already-populated map is left untouched);
//   - require the resulting map to be non-empty.
// Returns Status::OK() on success. CHECK_FAIL_RETURN_UNEXPECTED presumably returns
// an error Status with the given message when its condition is false — confirm
// against the macro definition.
// NOTE(review): this listing appears to be a side-by-side diff; each line holds the
// pre-change statement followed by the post-change one (only the messages differ).
Status CacheMergeOp::ComputeColMap() { Status CacheMergeOp::ComputeColMap() {
CHECK_FAIL_RETURN_UNEXPECTED(child_[kCacheMissChildIdx] != nullptr, "Cache miss stream empty"); CHECK_FAIL_RETURN_UNEXPECTED(child_[kCacheMissChildIdx] != nullptr, "Invalid data, cache miss stream empty.");
if (column_name_id_map().empty()) { if (column_name_id_map().empty()) {
column_name_id_map_ = child_[kCacheMissChildIdx]->column_name_id_map(); column_name_id_map_ = child_[kCacheMissChildIdx]->column_name_id_map();
} }
CHECK_FAIL_RETURN_UNEXPECTED(!column_name_id_map().empty(), "No column map detected"); CHECK_FAIL_RETURN_UNEXPECTED(!column_name_id_map().empty(), "Invalid data, column_name_id_map is empty.");
return Status::OK(); return Status::OK();
} }
@ -219,12 +219,13 @@ CacheMergeOp::Builder::Builder() : build_cache_client_(nullptr), build_sampler_(
// Check if the required parameters are set by the builder. // Check if the required parameters are set by the builder.
// Two checks, performed in order, each returning a kUnexpectedError Status on failure:
//   1. build_cache_client_ must not be nullptr;
//   2. build_cache_client_->session_id() must be non-zero (tested with `!`).
// Returns Status::OK() when both pass.
// NOTE(review): this listing appears to be a side-by-side diff; each line holds the
// pre-change statement followed by the post-change one (only the messages differ).
Status CacheMergeOp::Builder::SanityCheck() const { Status CacheMergeOp::Builder::SanityCheck() const {
if (build_cache_client_ == nullptr) { if (build_cache_client_ == nullptr) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "CacheMergeOp requires a CacheClient"); return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"Invalid parameter, CacheMergeOp requires a CacheClient, but got nullptr.");
} }
// Make sure the cache client has a valid session // Make sure the cache client has a valid session
if (!build_cache_client_->session_id()) { if (!build_cache_client_->session_id()) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"Cache client for CacheMergeOp is missing session id"); "Invalid parameter, cache client for CacheMergeOp requires a session id which is not equal to 0.");
} }
return Status::OK(); return Status::OK();
} }
@ -287,7 +288,7 @@ Status CacheMergeOp::GetRq(row_id_type row_id, CacheMergeOp::TensorRowCacheReque
RETURN_IF_NOT_OK(mem.allocate(1)); RETURN_IF_NOT_OK(mem.allocate(1));
*out = mem.GetMutablePointer(); *out = mem.GetMutablePointer();
} else { } else {
RETURN_STATUS_UNEXPECTED("Map insert fail."); RETURN_STATUS_UNEXPECTED("Invalid data, map insert fail.");
} }
} }
return Status::OK(); return Status::OK();

@ -40,11 +40,13 @@ CacheOp::Builder::Builder() : build_cache_client_(nullptr), build_sampler_(nullp
// Check if the required parameters are set by the builder. // Check if the required parameters are set by the builder.
// Two checks, performed in order, each returning a kUnexpectedError Status on failure:
//   1. build_cache_client_ must not be nullptr;
//   2. build_cache_client_->session_id() must be non-zero (tested with `!`).
// Returns Status::OK() when both pass.
// NOTE(review): this listing appears to be a side-by-side diff; each line holds the
// pre-change statement followed by the post-change one (only the messages differ).
Status CacheOp::Builder::SanityCheck() const { Status CacheOp::Builder::SanityCheck() const {
if (build_cache_client_ == nullptr) { if (build_cache_client_ == nullptr) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "CacheOp requires a CacheClient"); return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"Invalid parameter, CacheOp requires a CacheClient, but got nullptr.");
} }
// Make sure the cache client has a valid session // Make sure the cache client has a valid session
if (!build_cache_client_->session_id()) { if (!build_cache_client_->session_id()) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Cache client for CacheOp is missing session id"); return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"Invalid parameter, cache client for CacheOp requires a session id which is not equal to 0.");
} }
return Status::OK(); return Status::OK();
} }
@ -76,7 +78,7 @@ Status CacheOp::InitCache() { return Status::OK(); }
Status CacheOp::operator()() { Status CacheOp::operator()() {
if (!sampler_) { if (!sampler_) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"CacheOp requires a sampler before it can be executed!"); "Invalid parameter, CacheOp requires a sampler before it can be executed, but got nullptr.");
} }
RETURN_IF_NOT_OK(RegisterResources()); RETURN_IF_NOT_OK(RegisterResources());
// Kick off the workers // Kick off the workers

@ -162,7 +162,7 @@ Status ConcatOp::Verify(int32_t id, const std::unique_ptr<DataBuffer> &buf) {
int32_t index = 0; int32_t index = 0;
for (auto item : new_row) { for (auto item : new_row) {
if ((item->type() != data_type_[index]) || item->Rank() != data_rank_[index++]) { if ((item->type() != data_type_[index]) || item->Rank() != data_rank_[index++]) {
RETURN_STATUS_UNEXPECTED("The data type or data rank is not the same with previous dataset."); RETURN_STATUS_UNEXPECTED("Invalid data, data type or data rank is not the same with previous dataset.");
} }
} }
} }
@ -180,7 +180,7 @@ Status ConcatOp::ComputeColMap() {
// Verify all children have the same column name map // Verify all children have the same column name map
for (int32_t i = 0; i < child_.size(); ++i) { for (int32_t i = 0; i < child_.size(); ++i) {
if (child_[i]->column_name_id_map() != column_name_id_map_) { if (child_[i]->column_name_id_map() != column_name_id_map_) {
RETURN_STATUS_UNEXPECTED("The column name or column order is not the same with previous dataset."); RETURN_STATUS_UNEXPECTED("Invalid data, column name or column order is not the same with previous dataset.");
} }
} }
} else { } else {

@ -262,7 +262,8 @@ Status DatasetOp::GetNextInput(std::unique_ptr<DataBuffer> *p_buffer, int32_t wo
if (child_.size() == 0) { if (child_.size() == 0) {
return this->GetNextBuffer(p_buffer, worker_id); return this->GetNextBuffer(p_buffer, worker_id);
} }
CHECK_FAIL_RETURN_UNEXPECTED(child_index < child_.size(), "Child index too big : " + std::to_string(child_index)); CHECK_FAIL_RETURN_UNEXPECTED(child_index < child_.size(),
"Invalid data, child index too big : " + std::to_string(child_index));
std::shared_ptr<DatasetOp> child = child_[child_index]; std::shared_ptr<DatasetOp> child = child_[child_index];
std::unique_ptr<DataBuffer> buf; std::unique_ptr<DataBuffer> buf;
RETURN_IF_NOT_OK(child->GetNextBuffer(&buf, worker_id)); RETURN_IF_NOT_OK(child->GetNextBuffer(&buf, worker_id));

@ -68,8 +68,8 @@ Status DeviceQueueOp::CheckExceptions(const std::unique_ptr<DataBuffer> &buffer)
TensorRow row; TensorRow row;
buffer->GetRow(0, &row); buffer->GetRow(0, &row);
for (const auto &item : row) { for (const auto &item : row) {
CHECK_FAIL_RETURN_UNEXPECTED(item->type().IsNumeric(), "Cannot send tensor of string type to device."); CHECK_FAIL_RETURN_UNEXPECTED(item->type().IsNumeric(), "Invalid data, cannot send string tensor to device.");
CHECK_FAIL_RETURN_UNEXPECTED(item->HasData(), "Cannot send tensor with no data."); CHECK_FAIL_RETURN_UNEXPECTED(item->HasData(), "Invalid data, cannot send tensor with no data to device.");
} }
} }
return Status::OK(); return Status::OK();
@ -206,7 +206,7 @@ Status DeviceQueueOp::SendDataToGPU() {
if (!is_open) { if (!is_open) {
handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, release_function); handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, release_function);
if (handle == INVALID_HANDLE) { if (handle == INVALID_HANDLE) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "open failed"); return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Failed to open channel for sending data.");
} }
is_open = true; is_open = true;
} }
@ -249,7 +249,7 @@ Status DeviceQueueOp::RetryPushGPUData(const std::vector<size_t> &data_size, con
ReleaseData(items[i].data_ptr_); ReleaseData(items[i].data_ptr_);
} }
if (ret == BlockQueueStatus_T::ERROR_INPUT) { if (ret == BlockQueueStatus_T::ERROR_INPUT) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "invalid input Data, please check it."); return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Invalid input data, please check it.");
} else { } else {
if (!stop_send_) { if (!stop_send_) {
MS_LOG(DEBUG) << "Retry pushing data..."; MS_LOG(DEBUG) << "Retry pushing data...";
@ -269,7 +269,7 @@ Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items,
for (auto &sub_item : *items) { for (auto &sub_item : *items) {
RETURN_IF_NOT_OK(pool_->Allocate(sub_item.data_len_, &sub_item.data_ptr_)); RETURN_IF_NOT_OK(pool_->Allocate(sub_item.data_len_, &sub_item.data_ptr_));
if (sub_item.data_ptr_ == nullptr) { if (sub_item.data_ptr_ == nullptr) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "memory malloc failed."); return Status(StatusCode::kOutOfMemory, __LINE__, __FILE__, "Memory malloc failed.");
} }
(void)memset_s(sub_item.data_ptr_, sub_item.data_len_, 0, sub_item.data_len_); (void)memset_s(sub_item.data_ptr_, sub_item.data_len_, 0, sub_item.data_len_);
const unsigned char *column_data = curr_row[i]->GetBuffer(); const unsigned char *column_data = curr_row[i]->GetBuffer();

@ -37,8 +37,12 @@ namespace dataset {
Status FilterOp::Builder::SanityCheck() { Status FilterOp::Builder::SanityCheck() {
std::string err; std::string err;
err += builder_op_connector_size_ <= 0 ? "connector size <= 0\n" : ""; err += builder_op_connector_size_ <= 0 ? "Invalid parameter, connector_size must be greater than 0, but got " +
err += builder_num_workers_ <= 0 ? "filter num_parallel_workers <= 0\n" : ""; std::to_string(builder_op_connector_size_) + ".\n"
: "";
err += builder_num_workers_ <= 0 ? "Invalid parameter, num_parallel_workers must be greater than 0, but got " +
std::to_string(builder_num_workers_) + ".\n"
: "";
return err.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, common::SafeCStr(err)); return err.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, common::SafeCStr(err));
} }
@ -61,7 +65,9 @@ FilterOp::FilterOp(const std::vector<std::string> &in_col_names, int32_t num_wor
Status FilterOp::operator()() { Status FilterOp::operator()() {
// The operator class just starts off threads by calling the tree_ function. // The operator class just starts off threads by calling the tree_ function.
RETURN_UNEXPECTED_IF_NULL(tree_); if (tree_ == nullptr) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Pipeline init failed, Execution tree not set.");
}
filter_queues_.Init(num_workers_, oc_queue_size_); filter_queues_.Init(num_workers_, oc_queue_size_);
RETURN_IF_NOT_OK(filter_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(filter_queues_.Register(tree_->AllTasks()));
Status rc = tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1)); Status rc = tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1));
@ -81,7 +87,7 @@ Status FilterOp::ValidateInColumns(const std::vector<std::string> *input_columns
for (const auto &inCol : *input_columns) { for (const auto &inCol : *input_columns) {
bool found = column_name_id_map_.find(inCol) != column_name_id_map_.end() ? true : false; bool found = column_name_id_map_.find(inCol) != column_name_id_map_.end() ? true : false;
if (!found) { if (!found) {
std::string err_msg = "input column name: " + inCol + " doesn't exist in the dataset columns."; std::string err_msg = "Invalid parameter, column name: " + inCol + " does not exist in the dataset columns.";
RETURN_STATUS_UNEXPECTED(err_msg); RETURN_STATUS_UNEXPECTED(err_msg);
} }
} }
@ -224,7 +230,7 @@ Status FilterOp::CheckColumns(const DataBuffer *in_buf, const std::vector<std::s
Status FilterOp::CheckInput(const TensorRow &input) const { Status FilterOp::CheckInput(const TensorRow &input) const {
for (auto &item : input) { for (auto &item : input) {
if (item == nullptr) { if (item == nullptr) {
RETURN_STATUS_UNEXPECTED("input is null."); RETURN_STATUS_UNEXPECTED("Invalid data, input tensor is null.");
} }
} }
return Status::OK(); return Status::OK();
@ -251,7 +257,7 @@ Status FilterOp::InvokePredicateFunc(const TensorRow &input, bool *out_predicate
} catch (const py::error_already_set &e) { } catch (const py::error_already_set &e) {
std::stringstream ss; std::stringstream ss;
ss << e.what() << std::endl; ss << e.what() << std::endl;
ss << "The type of the return value of python predicate function is not bool, or can not be convert to bool."; ss << "Invalid parameter, predicate function function should return true/false.";
return Status(StatusCode::kPyFuncException, ss.str()); return Status(StatusCode::kPyFuncException, ss.str());
} }
return Status(StatusCode::kOK, "FilterOp predicate func call succeed"); return Status(StatusCode::kOK, "FilterOp predicate func call succeed");

@ -35,7 +35,7 @@ ProjectOp::Builder::Builder(const std::vector<std::string> &columns_to_project)
Status ProjectOp::Builder::SanityCheck() const { Status ProjectOp::Builder::SanityCheck() const {
if (builder_columns_to_project_.empty()) { if (builder_columns_to_project_.empty()) {
std::string err_msg("Columns to project is empty."); std::string err_msg("Invalid parameter, no column is specified for project.");
RETURN_STATUS_UNEXPECTED(err_msg); RETURN_STATUS_UNEXPECTED(err_msg);
} }
return Status::OK(); return Status::OK();
@ -144,7 +144,7 @@ Status ProjectOp::ComputeColMap() {
for (size_t i = 0; i < columns_to_project_.size(); i++) { for (size_t i = 0; i < columns_to_project_.size(); i++) {
std::string &current_column = columns_to_project_[i]; std::string &current_column = columns_to_project_[i];
if (child_column_name_mapping.find(current_column) == child_column_name_mapping.end()) { if (child_column_name_mapping.find(current_column) == child_column_name_mapping.end()) {
std::string err_msg = "ProjectOp: column " + current_column + " does not exist in child operator."; std::string err_msg = "Invalid parameter, column name: " + current_column + " does not exist.";
RETURN_STATUS_UNEXPECTED(err_msg); RETURN_STATUS_UNEXPECTED(err_msg);
} }
// Setup the new column name mapping for ourself (base class field) // Setup the new column name mapping for ourself (base class field)

@ -126,7 +126,7 @@ Status RenameOp::ComputeColMap() {
// only checks number of renamed columns have been found, this input check doesn't check everything // only checks number of renamed columns have been found, this input check doesn't check everything
if (found != in_columns_.size()) { if (found != in_columns_.size()) {
MS_LOG(DEBUG) << "Rename operator column names found: " << found << " out of " << in_columns_.size() << "."; MS_LOG(DEBUG) << "Rename operator column names found: " << found << " out of " << in_columns_.size() << ".";
std::string err_msg = "Renamed column doesn't exist in dataset"; std::string err_msg = "Invalid parameter, column to be renamed does not exist in dataset.";
RETURN_STATUS_UNEXPECTED(err_msg); RETURN_STATUS_UNEXPECTED(err_msg);
} }

@ -32,7 +32,7 @@ RepeatOp::Builder::Builder(int32_t count) : build_num_repeats_(count) {}
Status RepeatOp::Builder::SanityCheck() const { Status RepeatOp::Builder::SanityCheck() const {
if (build_num_repeats_ < kInfiniteRepeat || build_num_repeats_ == 0) { if (build_num_repeats_ < kInfiniteRepeat || build_num_repeats_ == 0) {
std::string err_msg("Repeat count must be > 0 or -1."); std::string err_msg("Invalid parameter, repeat count must be greater than 0 or equal to -1.");
RETURN_STATUS_UNEXPECTED(err_msg); RETURN_STATUS_UNEXPECTED(err_msg);
} }
return Status::OK(); return Status::OK();
@ -83,7 +83,7 @@ void RepeatOp::Print(std::ostream &out, bool show_all) const {
// this function will retry to pop the connector again and will get the non-EOE buffer if any. // this function will retry to pop the connector again and will get the non-EOE buffer if any.
Status RepeatOp::GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, bool retry_if_eoe) { Status RepeatOp::GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, bool retry_if_eoe) {
if (child_.empty()) { if (child_.empty()) {
RETURN_STATUS_UNEXPECTED("RepeatOp can't be the leaf node."); RETURN_STATUS_UNEXPECTED("Pipeline init failed, RepeatOp can't be the first op in pipeline.");
} }
std::unique_ptr<DataBuffer> buf; std::unique_ptr<DataBuffer> buf;

@ -52,7 +52,7 @@ ShuffleOp::Builder::Builder() : build_shuffle_size_(0), build_reshuffle_each_epo
Status ShuffleOp::Builder::SanityCheck() const { Status ShuffleOp::Builder::SanityCheck() const {
if (build_shuffle_size_ < 2) { if (build_shuffle_size_ < 2) {
RETURN_STATUS_UNEXPECTED("Shuffle buffer size must be greater than 1."); RETURN_STATUS_UNEXPECTED("Invalid parameter, shuffle buffer size must be greater than 1.");
} }
return Status::OK(); return Status::OK();
} }

@ -36,7 +36,7 @@ SkipOp::Builder::Builder(int32_t count) : build_max_skips_(count) {
Status SkipOp::Builder::SanityCheck() const { Status SkipOp::Builder::SanityCheck() const {
if (build_max_skips_ < 0) { if (build_max_skips_ < 0) {
std::string err_msg("Skip count must be positive integer or 0."); std::string err_msg("Invalid parameter, skip count should be greater than or equal to 0.");
RETURN_STATUS_UNEXPECTED(err_msg); RETURN_STATUS_UNEXPECTED(err_msg);
} }
return Status::OK(); return Status::OK();

@ -44,7 +44,7 @@ Status AlbumOp::Builder::Build(std::shared_ptr<AlbumOp> *ptr) {
builder_schema_ = std::make_unique<DataSchema>(); builder_schema_ = std::make_unique<DataSchema>();
Path schema_file(builder_schema_file_); Path schema_file(builder_schema_file_);
if (builder_schema_file_ == "" || !schema_file.Exists()) { if (builder_schema_file_ == "" || !schema_file.Exists()) {
RETURN_STATUS_UNEXPECTED("Schema not provided"); RETURN_STATUS_UNEXPECTED("Invalid file, schema_file is invalid or not set: " + builder_schema_file_);
} else { } else {
MS_LOG(INFO) << "Schema file provided: " << builder_schema_file_ << "."; MS_LOG(INFO) << "Schema file provided: " << builder_schema_file_ << ".";
builder_schema_->LoadSchemaFile(builder_schema_file_, builder_columns_to_load_); builder_schema_->LoadSchemaFile(builder_schema_file_, builder_columns_to_load_);
@ -58,8 +58,12 @@ Status AlbumOp::Builder::Build(std::shared_ptr<AlbumOp> *ptr) {
Status AlbumOp::Builder::SanityCheck() { Status AlbumOp::Builder::SanityCheck() {
Path dir(builder_dir_); Path dir(builder_dir_);
std::string err_msg; std::string err_msg;
err_msg += dir.IsDirectory() == false ? "Album path is invalid or not set\n" : ""; err_msg += dir.IsDirectory() == false
err_msg += builder_num_workers_ <= 0 ? "Num of parallel workers is set to 0\n" : ""; ? "Invalid parameter, Album path is invalid or not set, path: " + builder_dir_ + ".\n"
: "";
err_msg += builder_num_workers_ <= 0 ? "Invalid parameter, num_parallel_workers must be greater than 0, but got " +
std::to_string(builder_num_workers_) + ".\n"
: "";
return err_msg.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, err_msg); return err_msg.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, err_msg);
} }
@ -99,7 +103,7 @@ Status AlbumOp::PrescanEntry() {
dirname_offset_ = folder_path_.length(); dirname_offset_ = folder_path_.length();
std::shared_ptr<Path::DirIterator> dirItr = Path::DirIterator::OpenDirectory(&folder); std::shared_ptr<Path::DirIterator> dirItr = Path::DirIterator::OpenDirectory(&folder);
if (folder.Exists() == false || dirItr == nullptr) { if (folder.Exists() == false || dirItr == nullptr) {
RETURN_STATUS_UNEXPECTED("Error unable to open: " + folder_path_); RETURN_STATUS_UNEXPECTED("Invalid file, failed to open folder: " + folder_path_);
} }
MS_LOG(INFO) << "Album folder Path found: " << folder_path_ << "."; MS_LOG(INFO) << "Album folder Path found: " << folder_path_ << ".";
@ -192,7 +196,7 @@ Status AlbumOp::WorkerEntry(int32_t worker_id) {
} }
RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
} }
RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker"); RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker.");
} }
// Only support JPEG/PNG/GIF/BMP // Only support JPEG/PNG/GIF/BMP
@ -203,14 +207,14 @@ Status AlbumOp::CheckImageType(const std::string &file_name, bool *valid) {
*valid = false; *valid = false;
file_handle.open(file_name, std::ios::binary | std::ios::in); file_handle.open(file_name, std::ios::binary | std::ios::in);
if (!file_handle.is_open()) { if (!file_handle.is_open()) {
RETURN_STATUS_UNEXPECTED("Can not open image file " + file_name); RETURN_STATUS_UNEXPECTED("Invalid file, can not open image file: " + file_name);
} }
unsigned char file_type[read_num]; unsigned char file_type[read_num];
(void)file_handle.read(reinterpret_cast<char *>(file_type), read_num); (void)file_handle.read(reinterpret_cast<char *>(file_type), read_num);
if (file_handle.fail()) { if (file_handle.fail()) {
file_handle.close(); file_handle.close();
RETURN_STATUS_UNEXPECTED("Read image file failed " + file_name); RETURN_STATUS_UNEXPECTED("Invalid data, failed to read image file: " + file_name);
} }
file_handle.close(); file_handle.close();
if (file_type[0] == 0xff && file_type[1] == 0xd8 && file_type[2] == 0xff) { if (file_type[0] == 0xff && file_type[1] == 0xd8 && file_type[2] == 0xff) {
@ -250,7 +254,7 @@ Status AlbumOp::LoadImageTensor(const std::string &image_file_path, uint32_t col
if (decode_ && valid) { if (decode_ && valid) {
Status rc = Decode(image, &image); Status rc = Decode(image, &image);
if (rc.IsError()) { if (rc.IsError()) {
std::string err = "Fail to decode image:" + image_file_path; std::string err = "Invalid data, failed to decode image: " + image_file_path;
RETURN_STATUS_UNEXPECTED(err); RETURN_STATUS_UNEXPECTED(err);
} }
} }
@ -302,7 +306,8 @@ Status AlbumOp::LoadIntArrayTensor(const nlohmann::json &json_obj, uint32_t col_
MS_LOG(INFO) << "Int array found: " << data << "."; MS_LOG(INFO) << "Int array found: " << data << ".";
RETURN_IF_NOT_OK(Tensor::CreateFromVector(data, &label)); RETURN_IF_NOT_OK(Tensor::CreateFromVector(data, &label));
} else { } else {
RETURN_STATUS_UNEXPECTED("Error in Load Int Tensor"); RETURN_STATUS_UNEXPECTED("Invalid data, column type is neither int32 nor int64, it is " +
data_schema_->column(col_num).type().ToString());
} }
row->push_back(std::move(label)); row->push_back(std::move(label));
return Status::OK(); return Status::OK();
@ -361,7 +366,7 @@ Status AlbumOp::LoadTensorRow(const std::string &file, TensorRow *row) {
std::ifstream file_handle(folder_path_ + file); std::ifstream file_handle(folder_path_ + file);
if (!file_handle.is_open()) { if (!file_handle.is_open()) {
RETURN_STATUS_UNEXPECTED("Json file " + folder_path_ + file + " can not open."); RETURN_STATUS_UNEXPECTED("Invalid file, failed to open json file: " + folder_path_ + file);
} }
std::string line; std::string line;
while (getline(file_handle, line)) { while (getline(file_handle, line)) {
@ -425,7 +430,7 @@ Status AlbumOp::LoadTensorRow(const std::string &file, TensorRow *row) {
} }
} catch (const std::exception &err) { } catch (const std::exception &err) {
file_handle.close(); file_handle.close();
RETURN_STATUS_UNEXPECTED("Parse Json file failed"); RETURN_STATUS_UNEXPECTED("Invalid file, failed to parse json file: " + folder_path_ + file);
} }
} }
file_handle.close(); file_handle.close();
@ -476,7 +481,9 @@ Status AlbumOp::InitSampler() {
} }
Status AlbumOp::LaunchThreadsAndInitOp() { Status AlbumOp::LaunchThreadsAndInitOp() {
RETURN_UNEXPECTED_IF_NULL(tree_); if (tree_ == nullptr) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Pipeline init failed, Execution tree not set.");
}
// registers QueueList and individual Queues for interrupt services // registers QueueList and individual Queues for interrupt services
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks()));

@ -54,7 +54,7 @@ Status CelebAOp::Builder::Build(std::shared_ptr<CelebAOp> *op) {
builder_op_connector_size_, builder_decode_, builder_usage_, builder_extensions_, builder_op_connector_size_, builder_decode_, builder_usage_, builder_extensions_,
std::move(builder_schema_), std::move(builder_sampler_)); std::move(builder_schema_), std::move(builder_sampler_));
if (*op == nullptr) { if (*op == nullptr) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "CelebAOp is null"); return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "CelebAOp init failed.");
} }
return Status::OK(); return Status::OK();
@ -63,8 +63,12 @@ Status CelebAOp::Builder::Build(std::shared_ptr<CelebAOp> *op) {
Status CelebAOp::Builder::SanityCheck() { Status CelebAOp::Builder::SanityCheck() {
Path dir(builder_dir_); Path dir(builder_dir_);
std::string err_msg; std::string err_msg;
err_msg += dir.IsDirectory() ? "" : "CelebA path is invalid or not set\n"; err_msg += dir.IsDirectory() == false
err_msg += builder_num_workers_ <= 0 ? "Num of parallel workers is smaller than 1\n" : ""; ? "Invalid parameter, CelebA path is invalid or not set, path: " + builder_dir_ + ".\n"
: "";
err_msg += builder_num_workers_ <= 0 ? "Invalid parameter, num_parallel_workers must be greater than 0, but got " +
std::to_string(builder_num_workers_) + ".\n"
: "";
return err_msg.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, err_msg); return err_msg.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, err_msg);
} }
@ -85,7 +89,7 @@ CelebAOp::CelebAOp(int32_t num_workers, int32_t rows_per_buffer, const std::stri
Status CelebAOp::LaunchThreadsAndInitOp() { Status CelebAOp::LaunchThreadsAndInitOp() {
if (tree_ == nullptr) { if (tree_ == nullptr) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "tree_ not set"); return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Pipeline init failed, Execution tree not set.");
} }
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
@ -106,7 +110,9 @@ Status CelebAOp::ParseAttrFile() {
Path folder_path(folder_path_); Path folder_path(folder_path_);
std::ifstream attr_file((folder_path / "list_attr_celeba.txt").toString()); std::ifstream attr_file((folder_path / "list_attr_celeba.txt").toString());
if (!attr_file.is_open()) { if (!attr_file.is_open()) {
return Status(StatusCode::kFileNotExist, __LINE__, __FILE__, "Celeba attr file does not exist"); std::string attr_file_name = (folder_path / "list_attr_celeba.txt").toString();
return Status(StatusCode::kFileNotExist, __LINE__, __FILE__,
"Invalid file, failed to open Celeba attr file: " + attr_file_name);
} }
const auto PushBackToQueue = [this](std::vector<std::string> &vec, std::ifstream &attr_file, const auto PushBackToQueue = [this](std::vector<std::string> &vec, std::ifstream &attr_file,
@ -125,9 +131,11 @@ Status CelebAOp::ParseAttrFile() {
try { try {
num_rows_in_attr_file_ = static_cast<int64_t>(std::stoul(rows_num)); // First line is rows number in attr file num_rows_in_attr_file_ = static_cast<int64_t>(std::stoul(rows_num)); // First line is rows number in attr file
} catch (std::invalid_argument &e) { } catch (std::invalid_argument &e) {
RETURN_STATUS_UNEXPECTED("Conversion to ulong failed, invalid argument."); RETURN_STATUS_UNEXPECTED(
"Invalid data, failed to convert rows_num from attr_file to unsigned long, invalid argument: " + rows_num);
} catch (std::out_of_range &e) { } catch (std::out_of_range &e) {
RETURN_STATUS_UNEXPECTED("Conversion to ulong failed, out of range."); RETURN_STATUS_UNEXPECTED(
"Invalid data, failed to convert rows_num from attr_file to unsigned long, out of range: " + rows_num);
} }
(void)getline(attr_file, attr_name); // Second line is attribute name,ignore it (void)getline(attr_file, attr_name); // Second line is attribute name,ignore it
@ -172,10 +180,10 @@ bool CelebAOp::CheckDatasetTypeValid() {
try { try {
type = std::stoi(vec[1]); type = std::stoi(vec[1]);
} catch (std::invalid_argument &e) { } catch (std::invalid_argument &e) {
MS_LOG(WARNING) << "Conversion to unsigned long failed, invalid argument, " << vec[0] << "."; MS_LOG(WARNING) << "Invalid data, failed to convert to unsigned long, invalid argument: " << vec[1] << ".";
return false; return false;
} catch (std::out_of_range &e) { } catch (std::out_of_range &e) {
MS_LOG(WARNING) << "Conversion to unsigned long failed, out of range, " << vec[0] << "."; MS_LOG(WARNING) << "Invalid data, failed to convert to unsigned long, out of range: " << vec[1] << ".";
return false; return false;
} }
// train:0, valid=1, test=2 // train:0, valid=1, test=2
@ -213,9 +221,9 @@ Status CelebAOp::ParseImageAttrInfo() {
try { try {
value = std::stoi(split[label_index]); value = std::stoi(split[label_index]);
} catch (std::invalid_argument &e) { } catch (std::invalid_argument &e) {
RETURN_STATUS_UNEXPECTED("Conversion to int failed, invalid argument."); RETURN_STATUS_UNEXPECTED("Invalid data, failed to convert to ulong, invalid argument: " + split[label_index]);
} catch (std::out_of_range &e) { } catch (std::out_of_range &e) {
RETURN_STATUS_UNEXPECTED("Conversion to int failed, out of range."); RETURN_STATUS_UNEXPECTED("Conversion to int failed, out of range: " + split[label_index]);
} }
image_labels.second.push_back(value); image_labels.second.push_back(value);
} }
@ -229,8 +237,8 @@ Status CelebAOp::ParseImageAttrInfo() {
num_rows_ = image_labels_vec_.size(); num_rows_ = image_labels_vec_.size();
if (num_rows_ == 0) { if (num_rows_ == 0) {
RETURN_STATUS_UNEXPECTED( RETURN_STATUS_UNEXPECTED(
"There is no valid data matching the dataset API CelebADataset.Please check file path or dataset API " "Invalid data, no valid data matching the dataset API CelebADataset. "
"validation first."); "Please check file path or dataset API validation first");
} }
MS_LOG(DEBUG) << "Celeba dataset rows number is " << num_rows_ << "."; MS_LOG(DEBUG) << "Celeba dataset rows number is " << num_rows_ << ".";
return Status::OK(); return Status::OK();
@ -338,7 +346,7 @@ Status CelebAOp::WorkerEntry(int32_t worker_id) {
} }
RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
} }
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Unexpected nullptr received in worker"); return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Unexpected nullptr received in worker.");
} }
Status CelebAOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db) { Status CelebAOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataBuffer> *db) {
@ -365,7 +373,7 @@ Status CelebAOp::LoadTensorRow(row_id_type row_id, const std::pair<std::string,
Status rc = Decode(image, &image); Status rc = Decode(image, &image);
if (rc.IsError()) { if (rc.IsError()) {
image = nullptr; image = nullptr;
std::string err_msg = "Fail to decode image: " + image_path.toString(); std::string err_msg = "Invalid data, failed to decode image: " + image_path.toString();
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, err_msg); return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, err_msg);
} }
} }

@ -75,9 +75,14 @@ Status CifarOp::Builder::SanityCheck() {
const std::set<std::string> valid = {"test", "train", "all", ""}; const std::set<std::string> valid = {"test", "train", "all", ""};
Path dir(dir_); Path dir(dir_);
std::string err_msg; std::string err_msg;
err_msg += dir.IsDirectory() == false ? "Cifar path is invalid or not set\n" : ""; err_msg +=
err_msg += num_workers_ <= 0 ? "Num of parallel workers is negative or 0\n" : ""; dir.IsDirectory() == false ? "Invalid parameter, Cifar path is invalid or not set, path: " + dir_ + ".\n" : "";
err_msg += valid.find(usage_) == valid.end() ? "usage needs to be 'train','test' or 'all'\n" : ""; err_msg += num_workers_ <= 0 ? "Invalid parameter, num_parallel_workers must be greater than 0, but got " +
std::to_string(num_workers_) + ".\n"
: "";
err_msg += valid.find(usage_) == valid.end()
? "Invalid parameter, usage must be 'train','test' or 'all', but got " + usage_ + ".\n"
: "";
return err_msg.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, err_msg); return err_msg.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, err_msg);
} }
@ -148,7 +153,7 @@ Status CifarOp::operator()() {
Status CifarOp::LaunchThreadsAndInitOp() { Status CifarOp::LaunchThreadsAndInitOp() {
if (tree_ == nullptr) { if (tree_ == nullptr) {
RETURN_STATUS_UNEXPECTED("tree_ not set"); RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set.");
} }
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks()));
@ -188,7 +193,7 @@ Status CifarOp::WorkerEntry(int32_t worker_id) {
} }
RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block)); RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
} }
RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker"); RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker.");
} }
// Load 1 TensorRow (image,label). 1 function call produces 1 TensorTow in a DataBuffer // Load 1 TensorRow (image,label). 1 function call produces 1 TensorTow in a DataBuffer
@ -272,7 +277,8 @@ Status CifarOp::ReadCifar10BlockData() {
for (auto &file : cifar_files_) { for (auto &file : cifar_files_) {
// check the validity of the file path // check the validity of the file path
Path file_path(file); Path file_path(file);
CHECK_FAIL_RETURN_UNEXPECTED(file_path.Exists() && !file_path.IsDirectory(), "invalid file:" + file); CHECK_FAIL_RETURN_UNEXPECTED(file_path.Exists() && !file_path.IsDirectory(),
"Invalid file, failed to find cifar10 file: " + file);
std::string file_name = file_path.Basename(); std::string file_name = file_path.Basename();
if (usage_ == "train") { if (usage_ == "train") {
@ -284,11 +290,11 @@ Status CifarOp::ReadCifar10BlockData() {
} }
std::ifstream in(file, std::ios::binary); std::ifstream in(file, std::ios::binary);
CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), file + " can not be opened."); CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), "Invalid file, failed to open cifar10 file: " + file);
for (uint32_t index = 0; index < num_cifar10_records / kCifarBlockImageNum; ++index) { for (uint32_t index = 0; index < num_cifar10_records / kCifarBlockImageNum; ++index) {
(void)in.read(reinterpret_cast<char *>(&(image_data[0])), block_size * sizeof(unsigned char)); (void)in.read(reinterpret_cast<char *>(&(image_data[0])), block_size * sizeof(unsigned char));
CHECK_FAIL_RETURN_UNEXPECTED(!in.fail(), "Fail to read cifar file" + file); CHECK_FAIL_RETURN_UNEXPECTED(!in.fail(), "Invalid data, failed to read data from cifar10 file: " + file);
(void)cifar_raw_data_block_->EmplaceBack(image_data); (void)cifar_raw_data_block_->EmplaceBack(image_data);
} }
in.close(); in.close();
@ -307,7 +313,8 @@ Status CifarOp::ReadCifar100BlockData() {
for (auto &file : cifar_files_) { for (auto &file : cifar_files_) {
// check the validity of the file path // check the validity of the file path
Path file_path(file); Path file_path(file);
CHECK_FAIL_RETURN_UNEXPECTED(file_path.Exists() && !file_path.IsDirectory(), "invalid file:" + file); CHECK_FAIL_RETURN_UNEXPECTED(file_path.Exists() && !file_path.IsDirectory(),
"Invalid file, failed to find cifar100 file: " + file);
std::string file_name = file_path.Basename(); std::string file_name = file_path.Basename();
// if usage is train/test, get only these 2 files // if usage is train/test, get only these 2 files
@ -319,15 +326,15 @@ Status CifarOp::ReadCifar100BlockData() {
} else if (file_name.find("train") != std::string::npos) { } else if (file_name.find("train") != std::string::npos) {
num_cifar100_records = 50000; num_cifar100_records = 50000;
} else { } else {
RETURN_STATUS_UNEXPECTED("Cifar 100 file not found!"); RETURN_STATUS_UNEXPECTED("Invalid file, Cifar100 train/test file not found in: " + file_name);
} }
std::ifstream in(file, std::ios::binary); std::ifstream in(file, std::ios::binary);
CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), file + " can not be opened."); CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), "Invalid file, failed to open cifar100 file: " + file);
for (uint32_t index = 0; index < num_cifar100_records / kCifarBlockImageNum; index++) { for (uint32_t index = 0; index < num_cifar100_records / kCifarBlockImageNum; index++) {
(void)in.read(reinterpret_cast<char *>(&(image_data[0])), block_size * sizeof(unsigned char)); (void)in.read(reinterpret_cast<char *>(&(image_data[0])), block_size * sizeof(unsigned char));
CHECK_FAIL_RETURN_UNEXPECTED(!in.fail(), "Fail to read cifar file" + file); CHECK_FAIL_RETURN_UNEXPECTED(!in.fail(), "Invalid data, failed to read data from cifar100 file: " + file);
(void)cifar_raw_data_block_->EmplaceBack(image_data); (void)cifar_raw_data_block_->EmplaceBack(image_data);
} }
in.close(); in.close();
@ -348,9 +355,9 @@ Status CifarOp::GetCifarFiles() {
} }
} }
} else { } else {
RETURN_STATUS_UNEXPECTED("Unable to open directory " + dir_path.toString()); RETURN_STATUS_UNEXPECTED("Invalid file, failed to open directory: " + dir_path.toString());
} }
CHECK_FAIL_RETURN_UNEXPECTED(!cifar_files_.empty(), "No .bin files found under " + folder_path_); CHECK_FAIL_RETURN_UNEXPECTED(!cifar_files_.empty(), "Invalid file, no .bin files found under " + folder_path_);
std::sort(cifar_files_.begin(), cifar_files_.end()); std::sort(cifar_files_.begin(), cifar_files_.end());
return Status::OK(); return Status::OK();
} }
@ -390,8 +397,8 @@ Status CifarOp::ParseCifarData() {
num_rows_ = cifar_image_label_pairs_.size(); num_rows_ = cifar_image_label_pairs_.size();
if (num_rows_ == 0) { if (num_rows_ == 0) {
std::string api = cifar_type_ == kCifar10 ? "Cifar10Dataset" : "Cifar100Dataset"; std::string api = cifar_type_ == kCifar10 ? "Cifar10Dataset" : "Cifar100Dataset";
RETURN_STATUS_UNEXPECTED("There is no valid data matching the dataset API " + api + RETURN_STATUS_UNEXPECTED("Invalid data, no valid data matching the dataset API " + api +
".Please check file path or dataset API validation first."); ". Please check file path or dataset API validation first.");
} }
cifar_raw_data_block_->Reset(); cifar_raw_data_block_->Reset();
return Status::OK(); return Status::OK();
@ -400,7 +407,9 @@ Status CifarOp::ParseCifarData() {
// Derived from RandomAccessOp // Derived from RandomAccessOp
Status CifarOp::GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const { Status CifarOp::GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const {
if (cls_ids == nullptr || !cls_ids->empty()) { if (cls_ids == nullptr || !cls_ids->empty()) {
RETURN_STATUS_UNEXPECTED("ImageLabelPair not set"); RETURN_STATUS_UNEXPECTED(
"Map for storaging image-index pair is nullptr or has been set in other place,"
"it must be empty before using GetClassIds.");
} }
for (uint64_t index = 0; index < cifar_image_label_pairs_.size(); ++index) { for (uint64_t index = 0; index < cifar_image_label_pairs_.size(); ++index) {
@ -424,7 +433,8 @@ Status CifarOp::CountTotalRows(const std::string &dir, const std::string &usage,
constexpr int64_t num_cifar10_records = 10000; constexpr int64_t num_cifar10_records = 10000;
for (auto &file : op->cifar_files_) { for (auto &file : op->cifar_files_) {
Path file_path(file); Path file_path(file);
CHECK_FAIL_RETURN_UNEXPECTED(file_path.Exists() && !file_path.IsDirectory(), "invalid file:" + file); CHECK_FAIL_RETURN_UNEXPECTED(file_path.Exists() && !file_path.IsDirectory(),
"Invalid file, failed to open cifar file: " + file);
std::string file_name = file_path.Basename(); std::string file_name = file_path.Basename();
if (op->usage_ == "train") { if (op->usage_ == "train") {
@ -447,7 +457,8 @@ Status CifarOp::CountTotalRows(const std::string &dir, const std::string &usage,
Path file_path(file); Path file_path(file);
std::string file_name = file_path.Basename(); std::string file_name = file_path.Basename();
CHECK_FAIL_RETURN_UNEXPECTED(file_path.Exists() && !file_path.IsDirectory(), "invalid file:" + file); CHECK_FAIL_RETURN_UNEXPECTED(file_path.Exists() && !file_path.IsDirectory(),
"Invalid file, failed to find cifar file: " + file);
if (op->usage_ == "train" && file_path.Basename().find("train") == std::string::npos) continue; if (op->usage_ == "train" && file_path.Basename().find("train") == std::string::npos) continue;
if (op->usage_ == "test" && file_path.Basename().find("test") == std::string::npos) continue; if (op->usage_ == "test" && file_path.Basename().find("test") == std::string::npos) continue;
@ -458,7 +469,7 @@ Status CifarOp::CountTotalRows(const std::string &dir, const std::string &usage,
num_cifar100_records += 50000; num_cifar100_records += 50000;
} }
std::ifstream in(file, std::ios::binary); std::ifstream in(file, std::ios::binary);
CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), file + " can not be opened."); CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), "Invalid file, failed to open file: " + file);
} }
*count = num_cifar100_records; *count = num_cifar100_records;
return Status::OK(); return Status::OK();

@ -41,8 +41,13 @@ ClueOp::Builder::Builder()
// Validates the builder settings before a ClueOp is created.
// Every violated constraint appends its own line to the message, so all problems are reported together.
// Returns Status::OK() when no constraint is violated, otherwise a kUnexpectedError status with the messages.
Status ClueOp::Builder::ValidateInputs() const {
  std::string err;
  if (builder_num_workers_ <= 0) {
    err += "Invalid parameter, num_parallel_workers must be greater than 0, but got " +
           std::to_string(builder_num_workers_) + ".\n";
  }
  // The shard id must be below the shard count, and the shard count must be at least 1.
  if (builder_device_id_ >= builder_num_devices_ || builder_num_devices_ < 1) {
    err += "Invalid parameter, num_shard must be greater than shard_id and greater than 0, got num_shard: " +
           std::to_string(builder_num_devices_) + ", shard_id: " + std::to_string(builder_device_id_) + ".\n";
  }
  return err.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, err);
}
@ -128,7 +133,7 @@ Status ClueOp::GetValue(const nlohmann::json &js, std::vector<std::string> key_c
if (cursor.find(key_chain[i]) != cursor.end()) { if (cursor.find(key_chain[i]) != cursor.end()) {
cursor = cursor[key_chain[i]]; cursor = cursor[key_chain[i]];
} else { } else {
RETURN_STATUS_UNEXPECTED("Failed to find key: " + key_chain[i]); RETURN_STATUS_UNEXPECTED("Invalid data, failed to find key: " + key_chain[i]);
} }
} }
std::string final_str = key_chain.back(); std::string final_str = key_chain.back();
@ -158,7 +163,7 @@ Status ClueOp::LoadFile(const std::string &file, const int64_t start_offset, con
const int32_t worker_id) { const int32_t worker_id) {
std::ifstream handle(file); std::ifstream handle(file);
if (!handle.is_open()) { if (!handle.is_open()) {
RETURN_STATUS_UNEXPECTED("Failed to open file " + file); RETURN_STATUS_UNEXPECTED("Invalid file, failed to open file: " + file);
} }
int64_t rows_each_buffer = 0; int64_t rows_each_buffer = 0;
@ -186,7 +191,7 @@ Status ClueOp::LoadFile(const std::string &file, const int64_t start_offset, con
js = nlohmann::json::parse(line); js = nlohmann::json::parse(line);
} catch (const std::exception &err) { } catch (const std::exception &err) {
// Catch any exception and convert to Status return code // Catch any exception and convert to Status return code
RETURN_STATUS_UNEXPECTED("Failed to load json file"); RETURN_STATUS_UNEXPECTED("Invalid file, failed to parse json file: " + line);
} }
int cols_count = cols_to_keyword_.size(); int cols_count = cols_to_keyword_.size();
TensorRow tRow(cols_count, nullptr); TensorRow tRow(cols_count, nullptr);
@ -474,7 +479,7 @@ Status ClueOp::CalculateNumRowsPerShard() {
} }
if (all_num_rows_ == 0) { if (all_num_rows_ == 0) {
RETURN_STATUS_UNEXPECTED( RETURN_STATUS_UNEXPECTED(
"There is no valid data matching the dataset API CLUEDataset. Please check file path or dataset API " "Invalid data, no valid data matching the dataset API CLUEDataset. Please check file path or dataset API "
"validation first."); "validation first.");
} }
@ -486,7 +491,7 @@ Status ClueOp::CalculateNumRowsPerShard() {
int64_t ClueOp::CountTotalRows(const std::string &file) { int64_t ClueOp::CountTotalRows(const std::string &file) {
std::ifstream handle(file); std::ifstream handle(file);
if (!handle.is_open()) { if (!handle.is_open()) {
MS_LOG(ERROR) << "Failed to open file: " << file; MS_LOG(ERROR) << "Invalid file, failed to open file: " << file;
return 0; return 0;
} }

@ -97,7 +97,7 @@ Status CocoOp::Builder::Build(std::shared_ptr<CocoOp> *ptr) {
ColDescriptor(std::string(kJsonAnnoArea), DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1))); ColDescriptor(std::string(kJsonAnnoArea), DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1)));
break; break;
default: default:
RETURN_STATUS_UNEXPECTED("Invalid task type"); RETURN_STATUS_UNEXPECTED("Invalid parameter, task type shoule be Detection, Stuff, Keypoint or Panoptic.");
} }
*ptr = std::make_shared<CocoOp>(builder_task_type_, builder_dir_, builder_file_, builder_num_workers_, *ptr = std::make_shared<CocoOp>(builder_task_type_, builder_dir_, builder_file_, builder_num_workers_,
builder_rows_per_buffer_, builder_op_connector_size_, builder_decode_, builder_rows_per_buffer_, builder_op_connector_size_, builder_decode_,
@ -109,9 +109,15 @@ Status CocoOp::Builder::SanityCheck() {
Path dir(builder_dir_); Path dir(builder_dir_);
Path file(builder_file_); Path file(builder_file_);
std::string err_msg; std::string err_msg;
err_msg += dir.IsDirectory() == false ? "Coco image folder path is invalid or not set\n" : ""; err_msg += dir.IsDirectory() == false
err_msg += file.Exists() == false ? "Coco annotation json path is invalid or not set\n" : ""; ? "Invalid parameter, Coco image folder path is invalid or not set, path: " + builder_dir_ + ".\n"
err_msg += builder_num_workers_ <= 0 ? "Num of parallel workers is set to 0 or negative\n" : ""; : "";
err_msg += file.Exists() == false
? "Invalid parameter, Coco annotation json path is invalid or not set, path: " + builder_dir_ + ".\n"
: "";
err_msg += builder_num_workers_ <= 0 ? "Invalid parameter, num_parallel_workers must be greater than 0, but got " +
std::to_string(builder_num_workers_) + ".\n"
: "";
return err_msg.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, err_msg); return err_msg.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, err_msg);
} }
@ -156,7 +162,8 @@ Status CocoOp::operator()() {
std::shared_ptr<Tensor> sample_ids; std::shared_ptr<Tensor> sample_ids;
RETURN_IF_NOT_OK(sampler_buffer->GetTensor(&sample_ids, 0, 0)); RETURN_IF_NOT_OK(sampler_buffer->GetTensor(&sample_ids, 0, 0));
if (sample_ids->type() != DataType(DataType::DE_INT64)) { if (sample_ids->type() != DataType(DataType::DE_INT64)) {
RETURN_STATUS_UNEXPECTED("Sampler Tensor isn't int64"); RETURN_STATUS_UNEXPECTED("Invalid parameter, data type of Sampler Tensor isn't int64, got " +
sample_ids->type().ToString());
} }
RETURN_IF_NOT_OK(TraverseSampleIds(sample_ids, &keys)); RETURN_IF_NOT_OK(TraverseSampleIds(sample_ids, &keys));
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer));
@ -210,7 +217,10 @@ Status CocoOp::Reset() {
Status CocoOp::LoadTensorRow(row_id_type row_id, const std::string &image_id, TensorRow *trow) { Status CocoOp::LoadTensorRow(row_id_type row_id, const std::string &image_id, TensorRow *trow) {
std::shared_ptr<Tensor> image, coordinate; std::shared_ptr<Tensor> image, coordinate;
auto itr = coordinate_map_.find(image_id); auto itr = coordinate_map_.find(image_id);
if (itr == coordinate_map_.end()) RETURN_STATUS_UNEXPECTED("Invalid image_id found :" + image_id); if (itr == coordinate_map_.end()) {
RETURN_STATUS_UNEXPECTED("Invalid data, image_id: " + image_id +
" in annotation node is not found in image node in json file.");
}
std::string kImageFile = image_folder_path_ + std::string("/") + image_id; std::string kImageFile = image_folder_path_ + std::string("/") + image_id;
RETURN_IF_NOT_OK(ReadImageToTensor(kImageFile, data_schema_->column(0), &image)); RETURN_IF_NOT_OK(ReadImageToTensor(kImageFile, data_schema_->column(0), &image));
@ -245,7 +255,7 @@ Status CocoOp::LoadTensorRow(row_id_type row_id, const std::string &image_id, Te
} else if (task_type_ == TaskType::Panoptic) { } else if (task_type_ == TaskType::Panoptic) {
RETURN_IF_NOT_OK(LoadMixTensorRow(row_id, image_id, image, coordinate, trow)); RETURN_IF_NOT_OK(LoadMixTensorRow(row_id, image_id, image, coordinate, trow));
} else { } else {
RETURN_STATUS_UNEXPECTED("Invalid task type."); RETURN_STATUS_UNEXPECTED("Invalid parameter, task type shoule be Detection, Stuff or Panoptic.");
} }
return Status::OK(); return Status::OK();
@ -264,7 +274,10 @@ Status CocoOp::LoadDetectionTensorRow(row_id_type row_id, const std::string &ima
std::vector<uint32_t> category_id_row; std::vector<uint32_t> category_id_row;
std::vector<uint32_t> iscrowd_row; std::vector<uint32_t> iscrowd_row;
auto itr_item = simple_item_map_.find(image_id); auto itr_item = simple_item_map_.find(image_id);
if (itr_item == simple_item_map_.end()) RETURN_STATUS_UNEXPECTED("Invalid image_id found :" + image_id); if (itr_item == simple_item_map_.end()) {
RETURN_STATUS_UNEXPECTED("Invalid data, image_id: " + image_id +
" in annotation node is not found in image node in json file.");
}
std::vector<uint32_t> annotation = itr_item->second; std::vector<uint32_t> annotation = itr_item->second;
for (int64_t i = 0; i < annotation.size(); i++) { for (int64_t i = 0; i < annotation.size(); i++) {
@ -293,7 +306,10 @@ Status CocoOp::LoadSimpleTensorRow(row_id_type row_id, const std::string &image_
std::shared_ptr<Tensor> item; std::shared_ptr<Tensor> item;
std::vector<uint32_t> item_queue; std::vector<uint32_t> item_queue;
auto itr_item = simple_item_map_.find(image_id); auto itr_item = simple_item_map_.find(image_id);
if (itr_item == simple_item_map_.end()) RETURN_STATUS_UNEXPECTED("Invalid image_id found :" + image_id); if (itr_item == simple_item_map_.end()) {
RETURN_STATUS_UNEXPECTED("Invalid data, image_id: " + image_id +
" in annotation node is not found in image node in json file.");
}
item_queue = itr_item->second; item_queue = itr_item->second;
std::vector<dsize_t> bbox_dim = {static_cast<dsize_t>(item_queue.size()), 1}; std::vector<dsize_t> bbox_dim = {static_cast<dsize_t>(item_queue.size()), 1};
@ -316,7 +332,10 @@ Status CocoOp::LoadMixTensorRow(row_id_type row_id, const std::string &image_id,
std::vector<uint32_t> iscrowd_row; std::vector<uint32_t> iscrowd_row;
std::vector<uint32_t> area_row; std::vector<uint32_t> area_row;
auto itr_item = simple_item_map_.find(image_id); auto itr_item = simple_item_map_.find(image_id);
if (itr_item == simple_item_map_.end()) RETURN_STATUS_UNEXPECTED("Invalid image_id found :" + image_id); if (itr_item == simple_item_map_.end()) {
RETURN_STATUS_UNEXPECTED("Invalid data, image_id: " + image_id +
" in annotation node is not found in image node in json file.");
}
std::vector<uint32_t> annotation = itr_item->second; std::vector<uint32_t> annotation = itr_item->second;
for (int64_t i = 0; i < annotation.size(); i++) { for (int64_t i = 0; i < annotation.size(); i++) {
@ -380,7 +399,7 @@ Status CocoOp::WorkerEntry(int32_t worker_id) {
// Looks up the key `node_name` in `input_tree` and assigns the value stored under it to `*output_node`.
// A missing key is reported through CHECK_FAIL_RETURN_UNEXPECTED with a message naming the key;
// otherwise Status::OK() is returned.
// NOTE(review): output_node is dereferenced without a null check - callers must pass a valid pointer.
template <typename T>
Status CocoOp::SearchNodeInJson(const nlohmann::json &input_tree, std::string node_name, T *output_node) {
  auto node = input_tree.find(node_name);
  // The check fails when the key is absent, so the message states that the node was not found.
  CHECK_FAIL_RETURN_UNEXPECTED(node != input_tree.end(),
                               "Invalid data, required node not found in json: " + node_name);
  (*output_node) = *node;
  return Status::OK();
}
@ -406,8 +425,10 @@ Status CocoOp::ParseAnnotationIds() {
std::string file_name; std::string file_name;
RETURN_IF_NOT_OK(SearchNodeInJson(annotation, std::string(kJsonAnnoImageId), &image_id)); RETURN_IF_NOT_OK(SearchNodeInJson(annotation, std::string(kJsonAnnoImageId), &image_id));
auto itr_file = image_index_.find(image_id); auto itr_file = image_index_.find(image_id);
if (itr_file == image_index_.end()) if (itr_file == image_index_.end()) {
RETURN_STATUS_UNEXPECTED("Invalid image id of annotations : " + std::to_string(image_id)); RETURN_STATUS_UNEXPECTED("Invalid data, image_id: " + std::to_string(image_id) +
" in annotation node is not found in image node in json file.");
}
file_name = itr_file->second; file_name = itr_file->second;
switch (task_type_) { switch (task_type_) {
case TaskType::Detection: case TaskType::Detection:
@ -426,7 +447,7 @@ Status CocoOp::ParseAnnotationIds() {
RETURN_IF_NOT_OK(PanopticColumnLoad(annotation, file_name, image_id)); RETURN_IF_NOT_OK(PanopticColumnLoad(annotation, file_name, image_id));
break; break;
default: default:
RETURN_STATUS_UNEXPECTED("Invalid task type"); RETURN_STATUS_UNEXPECTED("Invalid parameter, task type shoule be Detection, Stuff, Keypoint or Panoptic.");
} }
} }
for (auto img : image_que) { for (auto img : image_que) {
@ -438,7 +459,7 @@ Status CocoOp::ParseAnnotationIds() {
Status CocoOp::ImageColumnLoad(const nlohmann::json &image_tree, std::vector<std::string> *image_vec) { Status CocoOp::ImageColumnLoad(const nlohmann::json &image_tree, std::vector<std::string> *image_vec) {
if (image_tree.size() == 0) { if (image_tree.size() == 0) {
RETURN_STATUS_UNEXPECTED("No images found in " + annotation_path_); RETURN_STATUS_UNEXPECTED("Invalid data, no \"image\" node found in json file: " + annotation_path_);
} }
for (auto img : image_tree) { for (auto img : image_tree) {
std::string file_name; std::string file_name;
@ -461,7 +482,8 @@ Status CocoOp::DetectionColumnLoad(const nlohmann::json &annotation_tree, const
RETURN_IF_NOT_OK(SearchNodeInJson(annotation_tree, std::string(kJsonAnnoCategoryId), &category_id)); RETURN_IF_NOT_OK(SearchNodeInJson(annotation_tree, std::string(kJsonAnnoCategoryId), &category_id));
auto search_category = category_set_.find(category_id); auto search_category = category_set_.find(category_id);
if (search_category == category_set_.end()) if (search_category == category_set_.end())
RETURN_STATUS_UNEXPECTED("category_id can't find in categories where category_id: " + std::to_string(category_id)); RETURN_STATUS_UNEXPECTED("Invalid data, category_id can't find in categories where category_id: " +
std::to_string(category_id));
auto node_iscrowd = annotation_tree.find(kJsonAnnoIscrowd); auto node_iscrowd = annotation_tree.find(kJsonAnnoIscrowd);
if (node_iscrowd != annotation_tree.end()) iscrowd = *node_iscrowd; if (node_iscrowd != annotation_tree.end()) iscrowd = *node_iscrowd;
bbox.insert(bbox.end(), node_bbox.begin(), node_bbox.end()); bbox.insert(bbox.end(), node_bbox.begin(), node_bbox.end());
@ -498,11 +520,12 @@ Status CocoOp::KeypointColumnLoad(const nlohmann::json &annotation_tree, const s
const int32_t &unique_id) { const int32_t &unique_id) {
auto itr_num_keypoint = annotation_tree.find(kJsonAnnoNumKeypoints); auto itr_num_keypoint = annotation_tree.find(kJsonAnnoNumKeypoints);
if (itr_num_keypoint == annotation_tree.end()) if (itr_num_keypoint == annotation_tree.end())
RETURN_STATUS_UNEXPECTED("No num_keypoint found in annotations where id: " + std::to_string(unique_id)); RETURN_STATUS_UNEXPECTED("Invalid data, no num_keypoint found in annotations where id: " +
std::to_string(unique_id));
simple_item_map_[image_file].push_back(*itr_num_keypoint); simple_item_map_[image_file].push_back(*itr_num_keypoint);
auto itr_keypoint = annotation_tree.find(kJsonAnnoKeypoints); auto itr_keypoint = annotation_tree.find(kJsonAnnoKeypoints);
if (itr_keypoint == annotation_tree.end()) if (itr_keypoint == annotation_tree.end())
RETURN_STATUS_UNEXPECTED("No keypoint found in annotations where id: " + std::to_string(unique_id)); RETURN_STATUS_UNEXPECTED("Invalid data, no keypoint found in annotations where id: " + std::to_string(unique_id));
coordinate_map_[image_file].push_back(*itr_keypoint); coordinate_map_[image_file].push_back(*itr_keypoint);
return Status::OK(); return Status::OK();
} }
@ -511,27 +534,31 @@ Status CocoOp::PanopticColumnLoad(const nlohmann::json &annotation_tree, const s
const int32_t &image_id) { const int32_t &image_id) {
auto itr_segments = annotation_tree.find(kJsonAnnoSegmentsInfo); auto itr_segments = annotation_tree.find(kJsonAnnoSegmentsInfo);
if (itr_segments == annotation_tree.end()) if (itr_segments == annotation_tree.end())
RETURN_STATUS_UNEXPECTED("No segments_info found in annotations where image_id: " + std::to_string(image_id)); RETURN_STATUS_UNEXPECTED("Invalid data, no segments_info found in annotations where image_id: " +
std::to_string(image_id));
for (auto info : *itr_segments) { for (auto info : *itr_segments) {
std::vector<float> bbox; std::vector<float> bbox;
uint32_t category_id = 0; uint32_t category_id = 0;
auto itr_bbox = info.find(kJsonAnnoBbox); auto itr_bbox = info.find(kJsonAnnoBbox);
if (itr_bbox == info.end()) if (itr_bbox == info.end())
RETURN_STATUS_UNEXPECTED("No bbox found in segments_info where image_id: " + std::to_string(image_id)); RETURN_STATUS_UNEXPECTED("Invalid data, no bbox found in segments_info where image_id: " +
std::to_string(image_id));
bbox.insert(bbox.end(), itr_bbox->begin(), itr_bbox->end()); bbox.insert(bbox.end(), itr_bbox->begin(), itr_bbox->end());
coordinate_map_[image_file].push_back(bbox); coordinate_map_[image_file].push_back(bbox);
RETURN_IF_NOT_OK(SearchNodeInJson(info, std::string(kJsonAnnoCategoryId), &category_id)); RETURN_IF_NOT_OK(SearchNodeInJson(info, std::string(kJsonAnnoCategoryId), &category_id));
auto search_category = category_set_.find(category_id); auto search_category = category_set_.find(category_id);
if (search_category == category_set_.end()) if (search_category == category_set_.end())
RETURN_STATUS_UNEXPECTED("category_id can't find in categories where category_id: " + RETURN_STATUS_UNEXPECTED("Invalid data, category_id can't find in categories where category_id: " +
std::to_string(category_id)); std::to_string(category_id));
auto itr_iscrowd = info.find(kJsonAnnoIscrowd); auto itr_iscrowd = info.find(kJsonAnnoIscrowd);
if (itr_iscrowd == info.end()) if (itr_iscrowd == info.end())
RETURN_STATUS_UNEXPECTED("No iscrowd found in segments_info where image_id: " + std::to_string(image_id)); RETURN_STATUS_UNEXPECTED("Invalid data, no iscrowd found in segments_info where image_id: " +
std::to_string(image_id));
auto itr_area = info.find(kJsonAnnoArea); auto itr_area = info.find(kJsonAnnoArea);
if (itr_area == info.end()) if (itr_area == info.end())
RETURN_STATUS_UNEXPECTED("No area found in segments_info where image_id: " + std::to_string(image_id)); RETURN_STATUS_UNEXPECTED("Invalid data, no area found in segments_info where image_id: " +
std::to_string(image_id));
simple_item_map_[image_file].push_back(category_id); simple_item_map_[image_file].push_back(category_id);
simple_item_map_[image_file].push_back(*itr_iscrowd); simple_item_map_[image_file].push_back(*itr_iscrowd);
simple_item_map_[image_file].push_back(*itr_area); simple_item_map_[image_file].push_back(*itr_area);
@ -540,26 +567,31 @@ Status CocoOp::PanopticColumnLoad(const nlohmann::json &annotation_tree, const s
} }
Status CocoOp::CategoriesColumnLoad(const nlohmann::json &categories_tree) { Status CocoOp::CategoriesColumnLoad(const nlohmann::json &categories_tree) {
if (categories_tree.size() == 0) RETURN_STATUS_UNEXPECTED("No categories found in " + annotation_path_); if (categories_tree.size() == 0) {
RETURN_STATUS_UNEXPECTED("Invalid file, no categories found in annotation_path: " + annotation_path_);
}
for (auto category : categories_tree) { for (auto category : categories_tree) {
int32_t id = 0; int32_t id = 0;
std::string name; std::string name;
std::vector<int32_t> label_info; std::vector<int32_t> label_info;
auto itr_id = category.find(kJsonId); auto itr_id = category.find(kJsonId);
if (itr_id == category.end()) RETURN_STATUS_UNEXPECTED("No id found in categories of " + annotation_path_); if (itr_id == category.end()) {
RETURN_STATUS_UNEXPECTED("Invalid data, no json id found in categories of " + annotation_path_);
}
id = *itr_id; id = *itr_id;
label_info.push_back(id); label_info.push_back(id);
category_set_.insert(id); category_set_.insert(id);
auto itr_name = category.find(kJsonCategoriesName); auto itr_name = category.find(kJsonCategoriesName);
CHECK_FAIL_RETURN_UNEXPECTED(itr_name != category.end(), CHECK_FAIL_RETURN_UNEXPECTED(
"No name found in categories where id: " + std::to_string(id)); itr_name != category.end(),
"Invalid data, no categories name found in categories where id: " + std::to_string(id));
name = *itr_name; name = *itr_name;
if (task_type_ == TaskType::Panoptic) { if (task_type_ == TaskType::Panoptic) {
auto itr_isthing = category.find(kJsonCategoriesIsthing); auto itr_isthing = category.find(kJsonCategoriesIsthing);
CHECK_FAIL_RETURN_UNEXPECTED(itr_isthing != category.end(), CHECK_FAIL_RETURN_UNEXPECTED(itr_isthing != category.end(),
"No isthing found in categories of " + annotation_path_); "Invalid data, no isthing found in categories of " + annotation_path_);
label_info.push_back(*itr_isthing); label_info.push_back(*itr_isthing);
} }
label_index_.emplace_back(std::make_pair(name, label_info)); label_index_.emplace_back(std::make_pair(name, label_info));
@ -574,7 +606,7 @@ Status CocoOp::InitSampler() {
Status CocoOp::LaunchThreadsAndInitOp() { Status CocoOp::LaunchThreadsAndInitOp() {
if (tree_ == nullptr) { if (tree_ == nullptr) {
RETURN_STATUS_UNEXPECTED("tree_ not set"); RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set.");
} }
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(wp_.Register(tree_->AllTasks()));
@ -590,7 +622,7 @@ Status CocoOp::ReadImageToTensor(const std::string &path, const ColDescriptor &c
if (decode_ == true) { if (decode_ == true) {
Status rc = Decode(*tensor, tensor); Status rc = Decode(*tensor, tensor);
CHECK_FAIL_RETURN_UNEXPECTED(rc.IsOk(), "fail to decode file: " + path); CHECK_FAIL_RETURN_UNEXPECTED(rc.IsOk(), "Invalid data, failed to decode image: " + path);
} }
return Status::OK(); return Status::OK();
} }

@ -37,8 +37,13 @@ CsvOp::Builder::Builder()
// Validates the builder settings before a CsvOp is created.
// Every violated constraint appends its own line to the message, so all problems are reported together.
// Returns Status::OK() when no constraint is violated, otherwise a kUnexpectedError status with the messages.
Status CsvOp::Builder::ValidateInputs() const {
  std::string err;
  if (builder_num_workers_ <= 0) {
    err += "Invalid parameter, num_parallel_workers must be greater than 0, but got " +
           std::to_string(builder_num_workers_) + ".\n";
  }
  // The shard id must be below the shard count, and the shard count must be at least 1.
  if (builder_device_id_ >= builder_num_devices_ || builder_num_devices_ < 1) {
    err += "Invalid parameter, num_shard must be greater than shard_id and greater than 0, got num_shard: " +
           std::to_string(builder_num_devices_) + ", shard_id: " + std::to_string(builder_device_id_) + ".\n";
  }
  return err.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, err);
}
@ -501,16 +506,17 @@ Status CsvOp::LoadFile(const std::string &file, const int64_t start_offset, cons
// int to receive its return value. // int to receive its return value.
int chr = ifs.get(); int chr = ifs.get();
if (csv_parser.ProcessMessage(chr) != 0) { if (csv_parser.ProcessMessage(chr) != 0) {
RETURN_STATUS_UNEXPECTED("Failed to parse file " + file + ":" + std::to_string(csv_parser.GetTotalRows() + 1) + RETURN_STATUS_UNEXPECTED("Invalid file, failed to parse file: " + file + ":" +
". error message: " + csv_parser.GetErrorMessage()); std::to_string(csv_parser.GetTotalRows() + 1) +
". Error message: " + csv_parser.GetErrorMessage());
} }
} }
} catch (std::invalid_argument &ia) { } catch (std::invalid_argument &ia) {
std::string err_row = std::to_string(csv_parser.GetTotalRows() + 1); std::string err_row = std::to_string(csv_parser.GetTotalRows() + 1);
RETURN_STATUS_UNEXPECTED(file + ":" + err_row + ", type does not match"); RETURN_STATUS_UNEXPECTED("Invalid data, " + file + ":" + err_row + ", type does not match.");
} catch (std::out_of_range &oor) { } catch (std::out_of_range &oor) {
std::string err_row = std::to_string(csv_parser.GetTotalRows() + 1); std::string err_row = std::to_string(csv_parser.GetTotalRows() + 1);
RETURN_STATUS_UNEXPECTED(file + ":" + err_row + ", out of range"); RETURN_STATUS_UNEXPECTED("Invalid data, " + file + ":" + err_row + ", out of range.");
} }
return Status::OK(); return Status::OK();
} }
@ -771,7 +777,7 @@ Status CsvOp::CalculateNumRowsPerShard() {
} }
if (all_num_rows_ == 0) { if (all_num_rows_ == 0) {
RETURN_STATUS_UNEXPECTED( RETURN_STATUS_UNEXPECTED(
"There is no valid data matching the dataset API CsvDataset. Please check file path or CSV format " "Invalid data, no valid data matching the dataset API CsvDataset. Please check file path or CSV format "
"validation first."); "validation first.");
} }
@ -849,7 +855,7 @@ Status CsvOp::ComputeColMap() {
if (column_name_id_map_.find(col_names[i]) == column_name_id_map_.end()) { if (column_name_id_map_.find(col_names[i]) == column_name_id_map_.end()) {
column_name_id_map_[col_names[i]] = i; column_name_id_map_[col_names[i]] = i;
} else { } else {
RETURN_STATUS_UNEXPECTED("Duplicate column names are not allowed"); RETURN_STATUS_UNEXPECTED("Invalid parameter, duplicate column names are not allowed: " + col_names[i]);
} }
} }
} else { } else {
@ -857,7 +863,8 @@ Status CsvOp::ComputeColMap() {
if (column_name_id_map_.find(column_name_list_[i]) == column_name_id_map_.end()) { if (column_name_id_map_.find(column_name_list_[i]) == column_name_id_map_.end()) {
column_name_id_map_[column_name_list_[i]] = i; column_name_id_map_[column_name_list_[i]] = i;
} else { } else {
RETURN_STATUS_UNEXPECTED("Duplicate column names are not allowed"); RETURN_STATUS_UNEXPECTED("Invalid parameter, duplicate column names are not allowed: " +
column_name_list_[i]);
} }
} }
} }
@ -870,7 +877,10 @@ Status CsvOp::ComputeColMap() {
} }
} }
if (column_default_list_.size() != column_name_id_map_.size()) { if (column_default_list_.size() != column_name_id_map_.size()) {
RETURN_STATUS_UNEXPECTED("The number of column names does not match the column defaults"); RETURN_STATUS_UNEXPECTED(
"Invalid parameter, the number of column names does not match the column defaults, column_default_list: " +
std::to_string(column_default_list_.size()) +
", column_name_id_map: " + std::to_string(column_name_id_map_.size()));
} }
return Status::OK(); return Status::OK();
} }

@ -112,26 +112,29 @@ Status GeneratorOp::Init() {
Status GeneratorOp::PyRowToTensorRow(py::object py_data, TensorRow *tensor_row) { Status GeneratorOp::PyRowToTensorRow(py::object py_data, TensorRow *tensor_row) {
if (!py::isinstance<py::tuple>(py_data)) { if (!py::isinstance<py::tuple>(py_data)) {
return Status(StatusCode::kPyFuncException, __LINE__, __FILE__, "Generator should return a tuple of numpy arrays."); return Status(StatusCode::kPyFuncException, __LINE__, __FILE__,
"Invalid parameter, Generator should return a tuple of numpy arrays.");
} }
py::tuple py_row = py_data.cast<py::tuple>(); py::tuple py_row = py_data.cast<py::tuple>();
// Check if returned number of columns matches with column names // Check if returned number of columns matches with column names
if (py_row.size() != column_names_.size()) { if (py_row.size() != column_names_.size()) {
return Status(StatusCode::kPyFuncException, __LINE__, __FILE__, return Status(
"Generator should return same number of numpy arrays as specified in column names."); StatusCode::kPyFuncException, __LINE__, __FILE__,
"Invalid parameter, Generator should return same number of numpy arrays as specified in column names.");
} }
// Iterate over two containers simultaneously for memory copy // Iterate over two containers simultaneously for memory copy
for (int i = 0; i < py_row.size(); ++i) { for (int i = 0; i < py_row.size(); ++i) {
py::object ret_py_ele = py_row[i]; py::object ret_py_ele = py_row[i];
if (!py::isinstance<py::array>(ret_py_ele)) { if (!py::isinstance<py::array>(ret_py_ele)) {
return Status(StatusCode::kPyFuncException, __LINE__, __FILE__, return Status(StatusCode::kPyFuncException, __LINE__, __FILE__,
"Generator should return a tuple of numpy arrays."); "Invalid parameter, Generator should return a tuple of numpy arrays.");
} }
std::shared_ptr<Tensor> tensor; std::shared_ptr<Tensor> tensor;
RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(ret_py_ele.cast<py::array>(), &tensor)); RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(ret_py_ele.cast<py::array>(), &tensor));
if ((!column_types_.empty()) && (column_types_[i] != DataType::DE_UNKNOWN) && if ((!column_types_.empty()) && (column_types_[i] != DataType::DE_UNKNOWN) &&
(column_types_[i] != tensor->type())) { (column_types_[i] != tensor->type())) {
return Status(StatusCode::kPyFuncException, __LINE__, __FILE__, "Generator type check failed."); return Status(StatusCode::kPyFuncException, __LINE__, __FILE__,
"Invalid parameter, input column type is not same with output tensor type.");
} }
tensor_row->push_back(tensor); tensor_row->push_back(tensor);
} }

@ -56,8 +56,12 @@ Status ImageFolderOp::Builder::Build(std::shared_ptr<ImageFolderOp> *ptr) {
// Validates the builder settings before an ImageFolderOp is created.
// Checks that the dataset path is a directory and that the worker count is positive.
// Every violated constraint appends its own line to the message.
// Returns Status::OK() when no constraint is violated, otherwise a kUnexpectedError status with the messages.
Status ImageFolderOp::Builder::SanityCheck() {
  Path dir(builder_dir_);
  std::string err_msg;
  if (!dir.IsDirectory()) {
    err_msg += "Invalid parameter, ImageFolder path is invalid or not set, path: " + builder_dir_ + ".\n";
  }
  if (builder_num_workers_ <= 0) {
    err_msg += "Invalid parameter, num_parallel_workers must be greater than 0, but got " +
               std::to_string(builder_num_workers_) + ".\n";
  }
  return err_msg.empty() ? Status::OK() : Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, err_msg);
}
@ -113,7 +117,7 @@ Status ImageFolderOp::PrescanMasterEntry(const std::string &filedir) {
num_rows_ = image_label_pairs_.size(); num_rows_ = image_label_pairs_.size();
if (num_rows_ == 0) { if (num_rows_ == 0) {
RETURN_STATUS_UNEXPECTED( RETURN_STATUS_UNEXPECTED(
"There is no valid data matching the dataset API ImageFolderDataset. Please check file path or dataset " "Invalid data, no valid data matching the dataset API ImageFolderDataset. Please check file path or dataset "
"API validation first."); "API validation first.");
} }
// free memory of two queues used for pre-scan // free memory of two queues used for pre-scan
@ -207,7 +211,7 @@ Status ImageFolderOp::LoadTensorRow(row_id_type row_id, ImageLabelPair pairPtr,
if (decode_ == true) { if (decode_ == true) {
Status rc = Decode(image, &image); Status rc = Decode(image, &image);
if (rc.IsError()) { if (rc.IsError()) {
std::string err = "Fail to decode image:" + folder_path_ + (pairPtr->first); std::string err = "Invalid data, failed to decode image: " + folder_path_ + (pairPtr->first);
RETURN_STATUS_UNEXPECTED(err); RETURN_STATUS_UNEXPECTED(err);
} }
} }
@ -258,7 +262,13 @@ Status ImageFolderOp::InitSampler() {
// Derived from RandomAccessOp // Derived from RandomAccessOp
Status ImageFolderOp::GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const { Status ImageFolderOp::GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const {
if (cls_ids == nullptr || !cls_ids->empty() || image_label_pairs_.empty()) { if (cls_ids == nullptr || !cls_ids->empty() || image_label_pairs_.empty()) {
RETURN_STATUS_UNEXPECTED("ImageLabelPair not set"); if (image_label_pairs_.empty()) {
RETURN_STATUS_UNEXPECTED("No images found in dataset, please check if Op read images successfully or not.");
} else {
RETURN_STATUS_UNEXPECTED(
"Map for storaging image-index pair is nullptr or has been set in other place,"
"it must be empty before using GetClassIds.");
}
} }
for (size_t i = 0; i < image_label_pairs_.size(); ++i) { for (size_t i = 0; i < image_label_pairs_.size(); ++i) {
(*cls_ids)[image_label_pairs_[i]->second].push_back(i); (*cls_ids)[image_label_pairs_[i]->second].push_back(i);
@ -286,7 +296,7 @@ Status ImageFolderOp::PrescanWorkerEntry(int32_t worker_id) {
Path folder(folder_path_ + folder_name); Path folder(folder_path_ + folder_name);
std::shared_ptr<Path::DirIterator> dirItr = Path::DirIterator::OpenDirectory(&folder); std::shared_ptr<Path::DirIterator> dirItr = Path::DirIterator::OpenDirectory(&folder);
if (folder.Exists() == false || dirItr == nullptr) { if (folder.Exists() == false || dirItr == nullptr) {
RETURN_STATUS_UNEXPECTED("Error unable to open: " + folder_name); RETURN_STATUS_UNEXPECTED("Invalid file, failed to open folder: " + folder_name);
} }
std::set<std::string> imgs; // use this for ordering std::set<std::string> imgs; // use this for ordering
while (dirItr->hasNext()) { while (dirItr->hasNext()) {
@ -335,7 +345,7 @@ Status ImageFolderOp::startAsyncWalk() {
TaskManager::FindMe()->Post(); TaskManager::FindMe()->Post();
Path dir(folder_path_); Path dir(folder_path_);
if (dir.Exists() == false || dir.IsDirectory() == false) { if (dir.Exists() == false || dir.IsDirectory() == false) {
RETURN_STATUS_UNEXPECTED("Error unable to open: " + folder_path_); RETURN_STATUS_UNEXPECTED("Invalid parameter, failed to open image folder: " + folder_path_);
} }
dirname_offset_ = folder_path_.length(); dirname_offset_ = folder_path_.length();
RETURN_IF_NOT_OK(RecursiveWalkFolder(&dir)); RETURN_IF_NOT_OK(RecursiveWalkFolder(&dir));
@ -348,7 +358,9 @@ Status ImageFolderOp::startAsyncWalk() {
} }
Status ImageFolderOp::LaunchThreadsAndInitOp() { Status ImageFolderOp::LaunchThreadsAndInitOp() {
RETURN_UNEXPECTED_IF_NULL(tree_); if (tree_ == nullptr) {
RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set.");
}
// Registers QueueList and individual Queues for interrupt services // Registers QueueList and individual Queues for interrupt services
RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
RETURN_IF_NOT_OK(folder_name_queue_->Register(tree_->AllTasks())); RETURN_IF_NOT_OK(folder_name_queue_->Register(tree_->AllTasks()));
@ -375,9 +387,15 @@ Status ImageFolderOp::CountRowsAndClasses(const std::string &path, const std::se
Path dir(path); Path dir(path);
std::string err_msg = ""; std::string err_msg = "";
int64_t row_cnt = 0; int64_t row_cnt = 0;
err_msg += (dir.Exists() == false || dir.IsDirectory() == false) ? "unable to open dir " + path : ""; err_msg += (dir.Exists() == false || dir.IsDirectory() == false)
err_msg += (num_classes == nullptr || num_rows == nullptr) ? "num_class/num_rows is null\n" : ""; ? "Invalid parameter, image folde path is invalid or not set, path: " + path
err_msg += (dev_id >= num_dev || num_dev <= 0) ? "invalid sharding config\n" : ""; : "";
err_msg +=
(num_classes == nullptr || num_rows == nullptr) ? "Invalid parameter, num_class or num_rows cannot be null.\n" : "";
err_msg += (dev_id >= num_dev || num_dev <= 0)
? "Invalid parameter, num_shard must be greater than shard_id and greater than 0, got num_shard: " +
std::to_string(num_dev) + ", shard_id: " + std::to_string(dev_id) + ".\n"
: "";
if (err_msg.empty() == false) { if (err_msg.empty() == false) {
RETURN_STATUS_UNEXPECTED(err_msg); RETURN_STATUS_UNEXPECTED(err_msg);
} }

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save