diff --git a/mindspore/ccsrc/minddata/dataset/api/datasets.cc b/mindspore/ccsrc/minddata/dataset/api/datasets.cc index cb7b5f287a..ee27a7f2f1 100644 --- a/mindspore/ccsrc/minddata/dataset/api/datasets.cc +++ b/mindspore/ccsrc/minddata/dataset/api/datasets.cc @@ -1548,7 +1548,7 @@ std::vector> TFRecordDataset::Build() { bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles); - // Create and initalize TFReaderOp + // Create and initialize TFReaderOp std::shared_ptr tf_reader_op = std::make_shared( num_workers_, worker_connector_size_, rows_per_buffer_, num_samples_, sorted_dir_files, std::move(data_schema), connector_que_size_, columns_list_, shuffle_files, num_shards_, shard_id_, shard_equal_rows_, nullptr); @@ -1672,11 +1672,14 @@ std::vector> BatchDataset::Build() { #ifdef ENABLE_PYTHON py::function noop; node_ops.push_back(std::make_shared(batch_size_, drop_remainder_, pad_, connector_que_size_, num_workers_, - cols_to_map_, noop, noop, pad_map_)); + cols_to_map_, cols_to_map_, noop, noop, pad_map_)); #else node_ops.push_back(std::make_shared(batch_size_, drop_remainder_, pad_, connector_que_size_, num_workers_, cols_to_map_, pad_map_)); #endif + + // Until py::function is implemented for C++ API, there is no need for a project op to be inserted after batch + // because project is only needed when batch op performs per_batch_map. This per_batch_map is a pyfunc return node_ops; } @@ -1685,7 +1688,10 @@ bool BatchDataset::ValidateParams() { MS_LOG(ERROR) << "Batch: batch_size should be positive integer, but got: " << batch_size_; return false; } - + if (!cols_to_map_.empty()) { + MS_LOG(ERROR) << "cols_to_map functionality is not implemented in C++; this should be left empty."; + return false; + } return true; } diff --git a/mindspore/ccsrc/minddata/dataset/api/python/de_pipeline.cc b/mindspore/ccsrc/minddata/dataset/api/python/de_pipeline.cc index 6bfdf0e4ff..5ae2148263 100644 --- a/mindspore/ccsrc/minddata/dataset/api/python/de_pipeline.cc +++ b/mindspore/ccsrc/minddata/dataset/api/python/de_pipeline.cc @@ -767,7 +767,7 @@ Status DEPipeline::ParseMapOp(const py::dict &args, std::shared_ptr * tensor_op_list.push_back(tensor_op); } } - if (tensor_op_list.empty()) RETURN_STATUS_UNEXPECTED("Error: tensor_op is invalid or not set."); + CHECK_FAIL_RETURN_UNEXPECTED(!tensor_op_list.empty(), "Error: tensor_op is invalid or not set."); (void)map_builder.SetTensorFuncs(std::move(tensor_op_list)); } else if (key == "cache") { cache_client = value.cast>(); @@ -913,6 +913,7 @@ Status DEPipeline::ParseGeneratorOp(const py::dict &args, std::shared_ptr *top, std::shared_ptr *bottom) { std::shared_ptr builder; + std::vector project_columns; if (py::isinstance(args["batch_size"])) { batch_size_ = ToInt(args["batch_size"]); CHECK_FAIL_RETURN_UNEXPECTED(batch_size_ > 0, "Error: batch_size is invalid."); @@ -921,10 +922,8 @@ Status DEPipeline::ParseBatchOp(const py::dict &args, std::shared_ptr builder = std::make_shared(1); (void)builder->SetBatchSizeFunc(args["batch_size"].cast()); } else { - std::string err_msg = "Error: batch_size is neither an Integer nor a python function"; - RETURN_STATUS_UNEXPECTED(err_msg); + RETURN_STATUS_UNEXPECTED("Error: batch_size is neither an Integer nor a python function."); } - for (auto arg : args) { std::string key = py::str(arg.first); py::handle value = arg.second; @@ -936,7 +935,11 @@ Status DEPipeline::ParseBatchOp(const py::dict &args, std::shared_ptr } else if (key == "per_batch_map") { (void)builder->SetBatchMapFunc(value.cast()); } else if (key == "input_columns") { - (void)builder->SetColumnsToMap(ToStringVector(value)); + (void)builder->SetInColNames(ToStringVector(value)); + } else if (key == "output_columns") { + (void)builder->SetOutColNames(ToStringVector(value)); + } else if (key == "column_order") { + project_columns = ToStringVector(value); } else if (key == "pad_info") { PadInfo pad_info; RETURN_IF_NOT_OK(ParsePadInfo(value, &pad_info)); @@ -945,9 +948,21 @@ Status DEPipeline::ParseBatchOp(const py::dict &args, std::shared_ptr } } - std::shared_ptr op; - RETURN_IF_NOT_OK(builder->Build(&op)); - *top = op; + std::shared_ptr batch_op; + RETURN_IF_NOT_OK(builder->Build(&batch_op)); + *top = batch_op; + + // Add a project op over top of the batch if the user wanted to reposition the columns after per_batch_map + if (!project_columns.empty()) { + ProjectOp::Builder proj_builder(project_columns); + std::shared_ptr proj_op; + RETURN_IF_NOT_OK(proj_builder.Build(&proj_op)); + RETURN_IF_NOT_OK(tree_->AssociateNode(batch_op)); + RETURN_IF_NOT_OK(tree_->AssociateNode(proj_op)); + RETURN_IF_NOT_OK(proj_op->AddChild(batch_op)); + *top = proj_op; + *bottom = batch_op; + } return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc b/mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc index 75aa5e8844..f8841eef03 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc @@ -95,7 +95,7 @@ Status IteratorBase::GetNextAsOrderedPair(std::vector= 0, "column id out of bounds."); column_order_[ind] = std::make_pair(itr.first, ind); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc index 26a077fdd3..d7049ba018 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc @@ -40,8 +40,8 @@ Status BatchOp::Builder::Build(std::shared_ptr *ptr) { RETURN_IF_NOT_OK(SanityCheck()); #ifdef ENABLE_PYTHON *ptr = std::make_shared(builder_batch_size_, builder_drop_, builder_pad_, builder_op_connector_size_, - builder_num_workers_, builder_cols_to_map_, builder_batch_size_func_, - builder_batch_map_func_, builder_pad_map_); + builder_num_workers_, builder_in_names_, builder_out_names_, + builder_batch_size_func_, builder_batch_map_func_, builder_pad_map_); #else *ptr = std::make_shared(builder_batch_size_, builder_drop_, builder_pad_, builder_op_connector_size_, builder_num_workers_, builder_cols_to_map_, builder_pad_map_); @@ -65,18 +65,20 @@ Status BatchOp::Builder::SanityCheck() { #ifdef ENABLE_PYTHON BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers, - const std::vector &cols_to_map, py::function batch_size_func, py::function batch_map_func, - PadInfo pad_map) + const std::vector &in_col, const std::vector &out_col, + py::function batch_size_func, py::function batch_map_func, PadInfo pad_map) : ParallelOp(num_workers, op_queue_size), start_batch_size_(batch_size), drop_(drop), pad_(pad), - pyfunc_column_names_(cols_to_map), + in_col_names_(in_col), + out_col_names_(out_col), batch_size_func_(batch_size_func), batch_map_func_(batch_map_func), pad_info_(pad_map) { worker_queues_.Init(num_workers, op_queue_size); } +// if PYTHON is disabled. per_batch_map can't be used #else BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers, const std::vector &cols_to_map, PadInfo pad_map) @@ -236,7 +238,7 @@ Status BatchOp::MakeBatchedBuffer(std::pair, CBatc std::unique_ptr *db) { RETURN_UNEXPECTED_IF_NULL(table_pair.first); #ifdef ENABLE_PYTHON - if (!pyfunc_column_names_.empty()) RETURN_IF_NOT_OK(MapColumns(&table_pair)); // pass it through pyfunc + if (!in_col_names_.empty()) RETURN_IF_NOT_OK(MapColumns(&table_pair)); // pass it through pyfunc #endif if (pad_) RETURN_IF_NOT_OK(PadColumns(&table_pair.first, pad_info_, column_name_id_map_)); // do padding if needed (*db) = std::make_unique(table_pair.second.batch_num_, DataBuffer::kDeBFlagNone); @@ -264,33 +266,40 @@ Status BatchOp::EoeReceived(int32_t) { #ifdef ENABLE_PYTHON Status BatchOp::MapColumns(std::pair, CBatchInfo> *table_pair) { - TensorBatchTable input_table; - input_table.reserve(pyfunc_column_names_.size()); - for (std::string col_name : pyfunc_column_names_) { - if (column_name_id_map_.find(col_name) == column_name_id_map_.end()) { - RETURN_STATUS_UNEXPECTED("Invalid parameter, column name: '" + col_name + "' does not exist.\n"); - } - TensorBatch tensor_batch; - tensor_batch.reserve(table_pair->first->size()); - size_t col_idx = static_cast(column_name_id_map_[col_name]); - for (size_t row_idx = 0; row_idx < table_pair->first->size(); row_idx++) { - tensor_batch.push_back(std::move(table_pair->first->at(row_idx)[col_idx])); + std::unique_ptr in_q_table = std::move(table_pair->first); + size_t num_rows = in_q_table->size(); + auto out_q_table = std::make_unique(num_rows, TensorRow(column_name_id_map_.size(), nullptr)); + TensorTable in_cols(in_col_names_.size(), TensorRow(num_rows, nullptr)), out_cols; + + std::unordered_map in_col_name_id; // name of columns that need to be fed to per-batch_map + for (size_t i = 0; i < in_col_names_.size(); i++) in_col_name_id.insert({in_col_names_[i], i}); + + for (const auto &itr : child_map_) { + auto col_itr = in_col_name_id.find(itr.first); + if (col_itr != in_col_name_id.end()) { // col needs to be prepared for per_batch_map + for (size_t i = 0; i < num_rows; i++) { + in_cols[col_itr->second][i] = std::move((*in_q_table)[i][itr.second]); + } + } else { // col needs to be placed into the out table + size_t col_id = column_name_id_map_[itr.first]; + for (size_t i = 0; i < num_rows; i++) { + (*out_q_table)[i][col_id] = std::move((*in_q_table)[i][itr.second]); + } } - input_table.push_back(std::move(tensor_batch)); } - // Perform batch map - TensorBatchTable output_table; - RETURN_IF_NOT_OK(InvokeBatchMapFunc(&input_table, &output_table, table_pair->second)); + in_q_table.reset(); // release the input table + RETURN_IF_NOT_OK(InvokeBatchMapFunc(&in_cols, &out_cols, table_pair->second)); - // Write back to TensorQTable - for (size_t input_idx = 0; input_idx < pyfunc_column_names_.size(); input_idx++) { - size_t col_idx = static_cast(column_name_id_map_[pyfunc_column_names_[input_idx]]); + for (size_t i = 0; i < out_cols.size(); i++) { + size_t col_id = column_name_id_map_[out_col_names_[i]]; size_t row_id = 0; - for (TensorRow &row : *(table_pair->first)) { - row[col_idx] = std::move(output_table[input_idx][row_id++]); + for (auto &t_row : *out_q_table) { + t_row[col_id] = out_cols[i][row_id++]; } } + + table_pair->first = std::move(out_q_table); return Status::OK(); } #endif @@ -333,7 +342,7 @@ Status BatchOp::InvokeBatchSizeFunc(int32_t *batch_size, CBatchInfo info) { return Status(StatusCode::kOK, "Batch size func call succeed"); } -Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *output, CBatchInfo info) { +Status BatchOp::InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info) { { // Acquire Python GIL py::gil_scoped_acquire gil_acquire; @@ -357,11 +366,10 @@ Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *ou py::object ret_py_obj = batch_map_func_(*input_args); // Parse batch map return value py::tuple ret_tuple = py::cast(ret_py_obj); - if (ret_tuple.size() != pyfunc_column_names_.size() || !py::isinstance(ret_tuple)) { - return Status(StatusCode::kPyFuncException, "Invalid parameter, batch map function should return a tuple."); - } + CHECK_FAIL_RETURN_UNEXPECTED(py::isinstance(ret_tuple), "Batch map function should return a tuple"); + CHECK_FAIL_RETURN_UNEXPECTED(ret_tuple.size() == out_col_names_.size(), "Incorrect number of columns returned."); for (size_t i = 0; i < ret_tuple.size(); i++) { - TensorBatch output_batch; + TensorRow output_batch; py::list output_list = py::cast(ret_tuple[i]); for (size_t j = 0; j < output_list.size(); j++) { std::shared_ptr out; @@ -377,7 +385,7 @@ Status BatchOp::InvokeBatchMapFunc(TensorBatchTable *input, TensorBatchTable *ou "Invalid parameter, batch map function should return a tuple of list of numpy array."); } } - return Status(StatusCode::kOK); + return Status::OK(); } #endif @@ -386,7 +394,7 @@ Status BatchOp::PadColumns(std::unique_ptr *table, const PadInfo & RETURN_UNEXPECTED_IF_NULL(table); // placeholder for now, might need this in the future CHECK_FAIL_RETURN_UNEXPECTED( (*table)->front().size() == column_name_id_map.size(), - "Invaid parameter, size of column_name_id_map must be equal to num of data columns. map size: " + + "Invalid parameter, size of column_name_id_map must be equal to num of data columns. map size: " + std::to_string(column_name_id_map.size()) + ", column nums: " + std::to_string((*table)->front().size())); std::vector> pad_vals(column_name_id_map.size(), 0); // value to pad each column's tensor with, default 0 @@ -468,5 +476,57 @@ Status BatchOp::Accept(NodePass *p, bool *modified) { return p->RunOnNode(shared_from_base(), modified); } +Status BatchOp::ComputeColMap() { + CHECK_FAIL_RETURN_UNEXPECTED(child_.size() == 1, + "Batch has " + std::to_string(child_.size()) + " child/children, expects only 1 child."); + CHECK_FAIL_RETURN_UNEXPECTED(!(child_[0]->column_name_id_map().empty()), "BatchOp child map is empty."); + + if (in_col_names_.empty()) { // if per_batch_map is not set, do not need to deal with out_col_names + column_name_id_map_ = child_[0]->column_name_id_map(); + return Status::OK(); + } + + // from this point onward, per_batch_map is needed, therefore, child_map_ must be set + child_map_ = child_[0]->column_name_id_map(); + + // following logic deals with per_batch_map + bool col_name_flag = (out_col_names_.empty() || out_col_names_ == in_col_names_); // true if col name is unchanged + + // column names are unchanged + if (col_name_flag) { + if (out_col_names_.empty()) out_col_names_ = in_col_names_; + column_name_id_map_ = child_map_; + return Status::OK(); + } + + // column names are changed from this point onward, this map is the child_map without input cols for per_batch_map + auto child_map_no_in_col = child_map_; + for (const auto &col : in_col_names_) { + const auto itr = child_map_no_in_col.find(col); + CHECK_FAIL_RETURN_UNEXPECTED(itr != child_map_no_in_col.end(), "col:" + col + " doesn't exist."); + child_map_no_in_col.erase(itr); + } + + // col names are changed + if (out_col_names_.size() == in_col_names_.size()) { // column names changed, but same number of columns + // the following code rename the input keys to output keys. ["a","b"] -> ["b", "a"] is allowed + column_name_id_map_ = child_map_no_in_col; + for (auto i = 0; i < in_col_names_.size(); i++) { + column_name_id_map_[out_col_names_[i]] = child_map_[in_col_names_[i]]; + } + } else { // number of columns are different, put the output column names first, then the original ones + for (const std::string &col : out_col_names_) { + column_name_id_map_.insert({col, column_name_id_map_.size()}); + } + for (const auto &itr : child_map_no_in_col) { + column_name_id_map_.insert({itr.first, column_name_id_map_.size()}); + } + } + + CHECK_FAIL_RETURN_UNEXPECTED(column_name_id_map_.size() == (child_map_no_in_col.size() + out_col_names_.size()), + "Key error in column_name_id_map_. output_columns is NOT set correctly!"); + return Status::OK(); +} + } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h index 503415704f..1b77856250 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h @@ -36,8 +36,6 @@ namespace mindspore { namespace dataset { class DataBuffer; -using TensorBatch = TensorRow; -using TensorBatchTable = std::vector; using PadInfo = std::map>>; class BatchOp : public ParallelOp { @@ -81,11 +79,17 @@ class BatchOp : public ParallelOp { return *this; } - // set columns to perform map on - // @param const std::vector & cols_to_map - name of columns to perform map on - // @return Builder & reference to builder class object - Builder &SetColumnsToMap(const std::vector &cols_to_map) { - builder_cols_to_map_ = cols_to_map; + /// \param in_col_name + /// \return Builder & reference to builder class object + Builder &SetInColNames(const std::vector &in_col_name) { + builder_in_names_ = in_col_name; + return *this; + } + + /// \param out_col_name + /// \return Builder & reference to builder class object + Builder &SetOutColNames(const std::vector &out_col_name) { + builder_out_names_ = out_col_name; return *this; } @@ -121,7 +125,8 @@ class BatchOp : public ParallelOp { int32_t builder_batch_size_; int32_t builder_num_workers_; int32_t builder_op_connector_size_; - std::vector builder_cols_to_map_; + std::vector builder_in_names_; + std::vector builder_out_names_; PadInfo builder_pad_map_; #ifdef ENABLE_PYTHON py::function builder_batch_size_func_; @@ -149,14 +154,10 @@ class BatchOp : public ParallelOp { }; #ifdef ENABLE_PYTHON - // BatchOp constructor - // @param int32_t batch_size - // @param bool drop - // @param int32_t op_queue_size - // @param int32_t rows_per_buf - // @param int32_t num_workers + BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers, - const std::vector &, py::function batch_size_func, py::function batch_map_func, PadInfo pad_map); + const std::vector &in_col_names, const std::vector &out_col_names, + py::function batch_size_func, py::function batch_map_func, PadInfo pad_map); #else BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers, const std::vector &, PadInfo pad_map); @@ -218,6 +219,9 @@ class BatchOp : public ParallelOp { static Status PadColumns(std::unique_ptr *table, const PadInfo &pad_info, const std::unordered_map &column_name_id_map); + protected: + Status ComputeColMap() override; + private: // Worker thread for doing the memcpy of batch // @param int32_t param workerId @@ -270,11 +274,13 @@ class BatchOp : public ParallelOp { #endif int32_t start_batch_size_; - bool drop_; // bool for whether to drop remainder or not - bool pad_; // bool for whether to perform padding on tensor - std::vector pyfunc_column_names_; // Name of the columns to perform map op on - PadInfo pad_info_; // column names to perform padding on - std::unique_ptr child_iterator_; // child iterator for fetching TensorRows 1 by 1 + const bool drop_; // bool for whether to drop remainder or not + const bool pad_; // bool for whether to perform padding on tensor + const std::vector in_col_names_; // input column name for per_batch_map + std::vector out_col_names_; // output column name for per_batch_map + PadInfo pad_info_; // column names to perform padding on + std::unique_ptr child_iterator_; // child iterator for fetching TensorRows 1 by 1 + std::unordered_map child_map_; // col_name_id_map of the child node QueueList, CBatchInfo>> worker_queues_; // internal queue for syncing worker #ifdef ENABLE_PYTHON py::function batch_size_func_; // Function pointer of batch size function diff --git a/mindspore/dataset/engine/datasets.py b/mindspore/dataset/engine/datasets.py index 2f0a292c64..59d69edbe8 100644 --- a/mindspore/dataset/engine/datasets.py +++ b/mindspore/dataset/engine/datasets.py @@ -55,6 +55,7 @@ try: except ModuleNotFoundError: context = None + class Shuffle(str, Enum): GLOBAL: str = "global" FILES: str = "file" @@ -271,14 +272,14 @@ class Dataset: of Tensors on a given column. The number of lists should match with number of entries in input_columns. The last parameter of the callable should always be a BatchInfo object. input_columns (list[str], optional): List of names of the input columns. The size of the list should - match with signature of the per_batch_map callable. - output_columns (list[str], optional): [Not currently implemented] List of names assigned to the columns + match with signature of per_batch_map callable. + output_columns (list[str], optional): List of names assigned to the columns outputted by the last operation. This parameter is mandatory if len(input_columns) != len(output_columns). The size of this list must match the number of output columns of the last operation. (default=None, output columns will have the same name as the input columns, i.e., the columns will be replaced). - column_order (list[str], optional): [Not currently implemented] List of all the desired columns to - propagate to the child node. This list must be a subset of all the columns in the dataset after + column_order (list[str], optional): List of all the desired columns to propagate to + the child node. This list must be a subset of all the columns in the dataset after all operations are applied. The order of the columns in each row propagated to the child node follow the order they appear in this list. The parameter is mandatory if the len(input_columns) != len(output_columns). (default=None, all columns @@ -1700,9 +1701,9 @@ class BatchDataset(DatasetOp): self.batch_size = batch_size self.drop_remainder = drop_remainder self.per_batch_map = per_batch_map - self.input_columns = input_columns - self.output_columns = output_columns - self.column_order = column_order + self.input_columns = input_columns if not isinstance(input_columns, str) else [input_columns] + self.output_columns = output_columns if not isinstance(output_columns, str) else [output_columns] + self.column_order = column_order if not isinstance(column_order, str) else [column_order] self.pad_info = pad_info self.children.append(input_dataset) input_dataset.parent.append(self) @@ -1714,6 +1715,8 @@ class BatchDataset(DatasetOp): args["drop_remainder"] = self.drop_remainder args["per_batch_map"] = self.per_batch_map args["input_columns"] = self.input_columns + args["output_columns"] = self.output_columns + args["column_order"] = self.column_order args["pad_info"] = self.pad_info return args diff --git a/mindspore/dataset/engine/validators.py b/mindspore/dataset/engine/validators.py index 2082f5be2c..2f24f6c732 100644 --- a/mindspore/dataset/engine/validators.py +++ b/mindspore/dataset/engine/validators.py @@ -276,6 +276,7 @@ def check_save(method): return new_method + def check_iterator(method): """A wrapper that wraps a parameter checker around the original create_tuple_iterator and create_dict_iterator.""" @@ -529,10 +530,10 @@ def check_batch(method): raise ValueError("the signature of per_batch_map should match with input columns") if output_columns is not None: - raise ValueError("output_columns is currently not implemented.") + check_columns(output_columns, "output_columns") if column_order is not None: - raise ValueError("column_order is currently not implemented.") + check_columns(column_order, "column_order") return method(self, *args, **kwargs) diff --git a/tests/ut/python/dataset/test_batch.py b/tests/ut/python/dataset/test_batch.py index 4130564521..00a918439d 100644 --- a/tests/ut/python/dataset/test_batch.py +++ b/tests/ut/python/dataset/test_batch.py @@ -449,22 +449,6 @@ def test_batch_exception_13(): logger.info("Got an exception in DE: {}".format(str(e))) assert "shard_id" in str(e) - # test non-functional parameters - try: - data1 = data1.batch(batch_size, output_columns="3") - sum([1 for _ in data1]) - - except ValueError as e: - logger.info("Got an exception in DE: {}".format(str(e))) - assert "output_columns is currently not implemented." in str(e) - - try: - data1 = data1.batch(batch_size, column_order="3") - sum([1 for _ in data1]) - - except ValueError as e: - logger.info("Got an exception in DE: {}".format(str(e))) - assert "column_order is currently not implemented." in str(e) def test_batch_exception_14(): batch_size = 2 diff --git a/tests/ut/python/dataset/test_var_batch_map.py b/tests/ut/python/dataset/test_var_batch_map.py index c055d1526e..f4c60b08cd 100644 --- a/tests/ut/python/dataset/test_var_batch_map.py +++ b/tests/ut/python/dataset/test_var_batch_map.py @@ -289,11 +289,11 @@ def test_exception(): def bad_batch_size(batchInfo): raise StopIteration - #return batchInfo.get_batch_num() + # return batchInfo.get_batch_num() def bad_map_func(col, batchInfo): raise StopIteration - #return (col,) + # return (col,) data1 = ds.GeneratorDataset((lambda: gen(100)), ["num"]).batch(bad_batch_size) try: @@ -312,6 +312,68 @@ def test_exception(): pass +def test_multi_col_map(): + def gen_2_cols(num): + for i in range(1, 1 + num): + yield (np.array([i]), np.array([i ** 2])) + + def split_col(col, batchInfo): + return ([np.copy(arr) for arr in col], [np.copy(-arr) for arr in col]) + + def merge_col(col1, col2, batchInfo): + merged = [] + for k, v in enumerate(col1): + merged.append(np.array(v + col2[k])) + return (merged,) + + def swap_col(col1, col2, batchInfo): + return ([np.copy(a) for a in col2], [np.copy(b) for b in col1]) + + def batch_map_config(num, s, f, in_nms, out_nms, col_order=None): + try: + dst = ds.GeneratorDataset((lambda: gen_2_cols(num)), ["col1", "col2"]) + dst = dst.batch(batch_size=s, input_columns=in_nms, output_columns=out_nms, per_batch_map=f, + column_order=col_order) + res = [] + for row in dst.create_dict_iterator(num_epochs=1, output_numpy=True): + res.append(row) + return res + except (ValueError, RuntimeError, TypeError) as e: + return str(e) + + # split 1 col into 2 cols + res = batch_map_config(2, 2, split_col, ["col2"], ["col_x", "col_y"])[0] + assert np.array_equal(res["col1"], [[1], [2]]) + assert np.array_equal(res["col_x"], [[1], [4]]) and np.array_equal(res["col_y"], [[-1], [-4]]) + + # merge 2 cols into 1 col + res = batch_map_config(4, 4, merge_col, ["col1", "col2"], ["merged"])[0] + assert np.array_equal(res["merged"], [[2], [6], [12], [20]]) + + # swap once + res = batch_map_config(3, 3, swap_col, ["col1", "col2"], ["col1", "col2"])[0] + assert np.array_equal(res["col1"], [[1], [4], [9]]) and np.array_equal(res["col2"], [[1], [2], [3]]) + + # swap twice + res = batch_map_config(3, 3, swap_col, ["col1", "col2"], ["col2", "col1"])[0] + assert np.array_equal(res["col2"], [[1], [4], [9]]) and np.array_equal(res["col1"], [[1], [2], [3]]) + + # test project after map + res = batch_map_config(2, 2, split_col, ["col2"], ["col_x", "col_y"], ["col_x", "col_y", "col1"])[0] + assert list(res.keys()) == ["col_x", "col_y", "col1"] + + # test the insertion order is maintained + res = batch_map_config(2, 2, split_col, ["col2"], ["col_x", "col_y"], ["col1", "col_x", "col_y"])[0] + assert list(res.keys()) == ["col1", "col_x", "col_y"] + + # test exceptions + assert "output_columns with value 233 is not of type" in batch_map_config(2, 2, split_col, ["col2"], 233) + assert "column_order with value 233 is not of type" in batch_map_config(2, 2, split_col, ["col2"], ["col1"], 233) + assert "output_columns is NOT set correctly" in batch_map_config(2, 2, split_col, ["col2"], ["col1"]) + assert "Incorrect number of columns" in batch_map_config(2, 2, split_col, ["col2"], ["col3", "col4", "col5"]) + assert "col-1 doesn't exist" in batch_map_config(2, 2, split_col, ["col-1"], ["col_x", "col_y"]) + + if __name__ == '__main__': logger.info("Running test_var_batch_map.py test_batch_corner_cases() function") test_batch_corner_cases() @@ -333,3 +395,6 @@ if __name__ == '__main__': logger.info("Running test_var_batch_map.py test_exception() function") test_exception() + + logger.info("Running test_var_batch_map.py test_multi_col_map() function") + test_multi_col_map()