From 02ddddfe7e1d59973ca7b90efc9bebb1e60bf1aa Mon Sep 17 00:00:00 2001
From: Lixia Chen
Date: Mon, 26 Oct 2020 14:22:03 -0400
Subject: [PATCH] Enable caching over non-mappable dataset C++ API and over
 Album dataset

---
 .../ccsrc/minddata/dataset/api/datasets.cc    |  23 +-
 .../engine/datasetops/source/album_op.cc      |   8 +-
 .../engine/datasetops/source/album_op.h       |   3 +-
 .../engine/datasetops/source/clue_op.h        |   4 +
 .../engine/ir/datasetops/source/album_node.cc |   7 +-
 .../engine/ir/datasetops/source/album_node.h  |   3 +-
 .../engine/ir/datasetops/source/clue_node.cc  |  14 +-
 .../engine/ir/datasetops/source/csv_node.cc   |  15 +-
 .../ir/datasetops/source/text_file_node.cc    |  10 +-
 .../ir/datasetops/source/tf_record_node.cc    |  15 +-
 .../engine/opt/pre/cache_transform_pass.cc    |   5 +-
 .../ccsrc/minddata/dataset/include/datasets.h |  16 +-
 tests/ut/cpp/dataset/c_api_cache_test.cc      | 363 +++++++++++++++++-
 13 files changed, 455 insertions(+), 31 deletions(-)

diff --git a/mindspore/ccsrc/minddata/dataset/api/datasets.cc b/mindspore/ccsrc/minddata/dataset/api/datasets.cc
index 8e245662ec..6914d2e920 100644
--- a/mindspore/ccsrc/minddata/dataset/api/datasets.cc
+++ b/mindspore/ccsrc/minddata/dataset/api/datasets.cc
@@ -301,8 +301,9 @@ std::shared_ptr<SchemaObj> Schema(const std::string &schema_file) {
 // Function to create an AlbumNode.
 std::shared_ptr<AlbumNode> Album(const std::string &dataset_dir, const std::string &data_schema,
                                  const std::vector<std::string> &column_names, bool decode,
-                                 const std::shared_ptr<SamplerObj> &sampler) {
-  auto ds = std::make_shared<AlbumNode>(dataset_dir, data_schema, column_names, decode, sampler);
+                                 const std::shared_ptr<SamplerObj> &sampler,
+                                 const std::shared_ptr<DatasetCache> &cache) {
+  auto ds = std::make_shared<AlbumNode>(dataset_dir, data_schema, column_names, decode, sampler, cache);
   return ds->ValidateParams() ? ds : nullptr;
 }
 
@@ -1021,9 +1022,25 @@ std::shared_ptr<DatasetCache> CreateDatasetCache(session_id_type id, uint64_t me
   auto cache = std::make_shared<DatasetCacheImpl>(id, mem_sz, spill, hostname, port, num_connections, prefetch_sz);
   return cache->ValidateParams() ? cache : nullptr;
 }
-
 #endif
 
+std::shared_ptr<SamplerObj> SelectSampler(int64_t num_samples, bool shuffle, int32_t num_shards, int32_t shard_id) {
+  if (shuffle) {
+    if (num_shards > 1) {
+      // If shuffle is enabled and sharding is enabled, use a distributed random sampler
+      return DistributedSampler(num_shards, shard_id, shuffle, num_samples);
+    }
+    // If shuffle is enabled and sharding is disabled, use a random sampler
+    return RandomSampler(num_samples >= 0, num_samples);
+  }
+  if (num_shards > 1) {
+    // If shuffle is disabled and sharding is enabled, use a distributed sequential sampler
+    return DistributedSampler(num_shards, shard_id, shuffle, num_samples);
+  }
+  // If shuffle is disabled and sharding is disabled, use a sequential sampler
+  return SequentialSampler(0, num_samples);
+}
+
 }  // namespace api
 }  // namespace dataset
 }  // namespace mindspore
diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc
index 0aa9c3c932..8458a5db30 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc
@@ -425,9 +425,9 @@ Status AlbumOp::LoadIntTensor(const nlohmann::json &json_obj, uint32_t col_num,
 // to take a reference to a column descriptor?
 // the design of this class is to make the code more readable, forgoing minor performance gains like
 // getting rid of duplicated checks
-Status AlbumOp::LoadTensorRow(const std::string &file, TensorRow *row) {
+Status AlbumOp::LoadTensorRow(row_id_type row_id, const std::string &file, TensorRow *row) {
   // testing here is to just print out file path
-  (*row) = {};
+  (*row) = TensorRow(row_id, {});
   MS_LOG(INFO) << "Image row file: " << file << ".";
 
   std::ifstream file_handle(folder_path_ + file);
@@ -444,7 +444,7 @@ Status AlbumOp::LoadTensorRow(const std::string &file, TensorRow *row) {
   // get columns in schema:
   int32_t columns = data_schema_->NumColumns();
 
-  // loop over each column descriptor, this can optimized by swtich cases
+  // loop over each column descriptor, this can be optimized by switch cases
   for (int32_t i = 0; i < columns; i++) {
     // special case to handle
     if (data_schema_->column(i).name() == "id") {
@@ -521,7 +521,7 @@ Status AlbumOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<Da
   TensorRow trow;
 
   for (const int64_t &key : keys) {
-    RETURN_IF_NOT_OK(this->LoadTensorRow(image_rows_[key], &trow));
+    RETURN_IF_NOT_OK(this->LoadTensorRow(key, image_rows_[key], &trow));
     deq->push_back(std::move(trow));
   }
   (*db)->set_tensor_table(std::move(deq));
diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h
index 401e543d2b..2d87160fb7 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h
@@ -269,10 +269,11 @@ class AlbumOp : public ParallelOp, public RandomAccessOp {
   Status LoadIDTensor(const std::string &file, uint32_t col_num, TensorRow *row);
 
   /// \brief Load a tensor row according to a json file
+  /// \param[in] row_id_type row_id - id for this tensor row
   /// \param[in] ImageColumns file Json file location
   /// \param[inout] TensorRow row Json content stored into a tensor row
   /// \return Status The error code returned
-  Status LoadTensorRow(const std::string &file, TensorRow *row);
+  Status LoadTensorRow(row_id_type row_id, const std::string &file, TensorRow *row);
 
   /// \param[in] const std::vector<int64_t> &keys Keys in ioblock
   /// \param[inout] std::unique_ptr<DataBuffer> db Databuffer to push to
diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h
index d2825dc1b6..9c2ae5ac03 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h
@@ -187,6 +187,10 @@ class ClueOp : public ParallelOp {
   /// that this clue op will produce the full set of data into the cache.
   void MakeSimpleProducer();
 
+  // Op name getter
+  // @return Name of the current Op
+  std::string Name() const override { return "ClueOp"; }
+
   // Base-class override for NodePass visitor acceptor.
   // @param p - Pointer to the NodePass to be accepted.
   // @param modified - Whether this node visit modified the pipeline.
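The IR-node hunks that follow all apply the same pattern: derive a sampler from the leaf's (num_samples, shuffle, num_shards, shard_id) arguments via the SelectSampler() helper added in datasets.cc above, hand that sampler to the leaf op in place of the previous nullptr, and skip ShuffleOp injection whenever a cache is present. As a compact illustration (an illustrative sketch, not part of the diff; the argument values are arbitrary):

    // SelectSampler() decision table, as implemented in datasets.cc above:
    //   shuffle=true,  num_shards>1  -> DistributedSampler(num_shards, shard_id, true, num_samples)
    //   shuffle=true,  num_shards<=1 -> RandomSampler(num_samples >= 0, num_samples)
    //   shuffle=false, num_shards>1  -> DistributedSampler(num_shards, shard_id, false, num_samples)
    //   shuffle=false, num_shards<=1 -> SequentialSampler(0, num_samples)
    std::shared_ptr<SamplerObj> sampler =
      SelectSampler(/*num_samples=*/2, /*shuffle=*/false, /*num_shards=*/1, /*shard_id=*/0);
    // -> SequentialSampler(0, 2), the case the num_samples=2 cache tests below rely on.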
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/album_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/album_node.cc
index 1baa78c2dc..5bcab68789 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/album_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/album_node.cc
@@ -31,8 +31,9 @@ namespace api {
 // Constructor for AlbumNode
 AlbumNode::AlbumNode(const std::string &dataset_dir, const std::string &data_schema,
                      const std::vector<std::string> &column_names, bool decode,
-                     const std::shared_ptr<SamplerObj> &sampler)
-    : dataset_dir_(dataset_dir),
+                     const std::shared_ptr<SamplerObj> &sampler, const std::shared_ptr<DatasetCache> &cache)
+    : Dataset(std::move(cache)),
+      dataset_dir_(dataset_dir),
       schema_path_(data_schema),
       column_names_(column_names),
       decode_(decode),
@@ -63,6 +64,8 @@ std::vector<std::shared_ptr<DatasetOp>> AlbumNode::Build() {
   // Argument that is not exposed to user in the API.
   std::set<std::string> extensions = {};
 
+  RETURN_EMPTY_IF_ERROR(AddCacheOp(&node_ops));
+
   node_ops.push_back(std::make_shared<AlbumOp>(num_workers_, rows_per_buffer_, dataset_dir_, connector_que_size_,
                                                decode_, extensions, std::move(schema), std::move(sampler_->Build())));
   return node_ops;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/album_node.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/album_node.h
index 498535296e..6ef0d159fb 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/album_node.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/album_node.h
@@ -31,7 +31,8 @@ class AlbumNode : public Dataset {
  public:
   /// \brief Constructor
   AlbumNode(const std::string &dataset_dir, const std::string &data_schema,
-            const std::vector<std::string> &column_names, bool decode, const std::shared_ptr<SamplerObj> &sampler);
+            const std::vector<std::string> &column_names, bool decode, const std::shared_ptr<SamplerObj> &sampler,
+            const std::shared_ptr<DatasetCache> &cache);
 
   /// \brief Destructor
   ~AlbumNode() = default;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/clue_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/clue_node.cc
index af6a5d75b2..538fa63817 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/clue_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/clue_node.cc
@@ -185,15 +185,21 @@ std::vector<std::shared_ptr<DatasetOp>> CLUENode::Build() {
 
   bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);
 
+  // ClueOp by itself is a non-mappable dataset that does not support sampling.
+  // However, if a cache operator is injected at some other place higher in the tree, that cache can
+  // inherit this sampler from the leaf, providing sampling support from the caching layer.
+  // That is why we save the sampler here in a leaf node that does not use sampling.
+  std::shared_ptr<SamplerObj> sampler_ = SelectSampler(num_samples_, shuffle_files, num_shards_, shard_id_);
+
   // Sort the dataset files in a lexicographical order
   std::vector<std::string> sorted_dataset_files = dataset_files_;
   std::sort(sorted_dataset_files.begin(), sorted_dataset_files.end());
 
-  std::shared_ptr<ClueOp> clue_op =
-    std::make_shared<ClueOp>(num_workers_, rows_per_buffer_, num_samples_, worker_connector_size_, ck_map,
-                             sorted_dataset_files, connector_que_size_, shuffle_files, num_shards_, shard_id_, nullptr);
+  std::shared_ptr<ClueOp> clue_op = std::make_shared<ClueOp>(
+    num_workers_, rows_per_buffer_, num_samples_, worker_connector_size_, ck_map, sorted_dataset_files,
+    connector_que_size_, shuffle_files, num_shards_, shard_id_, std::move(sampler_->Build()));
   RETURN_EMPTY_IF_ERROR(clue_op->Init());
-  if (shuffle_ == ShuffleMode::kGlobal) {
+  if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal) {
     // Inject ShuffleOp
     std::shared_ptr<ShuffleOp> shuffle_op = nullptr;
     int64_t num_rows = 0;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/csv_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/csv_node.cc
index 222b9d6d74..20f3e3701a 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/csv_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/csv_node.cc
@@ -80,6 +80,12 @@ std::vector<std::shared_ptr<DatasetOp>> CSVNode::Build() {
 
   bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);
 
+  // CSVOp by itself is a non-mappable dataset that does not support sampling.
+  // However, if a cache operator is injected at some other place higher in the tree, that cache can
+  // inherit this sampler from the leaf, providing sampling support from the caching layer.
+  // That is why we save the sampler here in a leaf node that does not use sampling.
+  std::shared_ptr<SamplerObj> sampler_ = SelectSampler(num_samples_, shuffle_files, num_shards_, shard_id_);
+
   // Sort the dataset files in a lexicographical order
   std::vector<std::string> sorted_dataset_files = dataset_files_;
   std::sort(sorted_dataset_files.begin(), sorted_dataset_files.end());
@@ -98,11 +104,12 @@ std::vector<std::shared_ptr<DatasetOp>> CSVNode::Build() {
     }
   }
 
-  std::shared_ptr<CsvOp> csv_op = std::make_shared<CsvOp>(
-    sorted_dataset_files, field_delim_, column_default_list, column_names_, num_workers_, rows_per_buffer_,
-    num_samples_, worker_connector_size_, connector_que_size_, shuffle_files, num_shards_, shard_id_, nullptr);
+  std::shared_ptr<CsvOp> csv_op =
+    std::make_shared<CsvOp>(sorted_dataset_files, field_delim_, column_default_list, column_names_, num_workers_,
+                            rows_per_buffer_, num_samples_, worker_connector_size_, connector_que_size_, shuffle_files,
+                            num_shards_, shard_id_, std::move(sampler_->Build()));
   RETURN_EMPTY_IF_ERROR(csv_op->Init());
-  if (shuffle_ == ShuffleMode::kGlobal) {
+  if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal) {
     // Inject ShuffleOp
     std::shared_ptr<ShuffleOp> shuffle_op = nullptr;
     int64_t num_rows = 0;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/text_file_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/text_file_node.cc
index 519d2b6c30..8af0388d4a 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/text_file_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/text_file_node.cc
@@ -59,6 +59,12 @@ std::vector<std::shared_ptr<DatasetOp>> TextFileNode::Build() {
 
   bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);
 
+  // TextFileOp by itself is a non-mappable dataset that does not support sampling.
+  // However, if a cache operator is injected at some other place higher in the tree, that cache can
+  // inherit this sampler from the leaf, providing sampling support from the caching layer.
+  // That is why we save the sampler here in a leaf node that does not use sampling.
+  std::shared_ptr<SamplerObj> sampler_ = SelectSampler(num_samples_, shuffle_files, num_shards_, shard_id_);
+
   // Sort the dataset files in a lexicographical order
   std::vector<std::string> sorted_dataset_files = dataset_files_;
   std::sort(sorted_dataset_files.begin(), sorted_dataset_files.end());
@@ -71,10 +77,10 @@ std::vector<std::shared_ptr<DatasetOp>> TextFileNode::Build() {
   // Create and initialize TextFileOp
   std::shared_ptr<TextFileOp> text_file_op = std::make_shared<TextFileOp>(
     num_workers_, rows_per_buffer_, num_samples_, worker_connector_size_, std::move(schema), sorted_dataset_files,
-    connector_que_size_, shuffle_files, num_shards_, shard_id_, nullptr);
+    connector_que_size_, shuffle_files, num_shards_, shard_id_, std::move(sampler_->Build()));
   RETURN_EMPTY_IF_ERROR(text_file_op->Init());
 
-  if (shuffle_ == ShuffleMode::kGlobal) {
+  if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal) {
     // Inject ShuffleOp
     std::shared_ptr<ShuffleOp> shuffle_op = nullptr;
     int64_t num_rows = 0;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tf_record_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tf_record_node.cc
index 1dac924d60..2ea431ada6 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tf_record_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/source/tf_record_node.cc
@@ -52,14 +52,21 @@ std::vector<std::shared_ptr<DatasetOp>> TFRecordNode::Build() {
 
   bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);
 
+  // TFReaderOp by itself is a non-mappable dataset that does not support sampling.
+  // However, if a cache operator is injected at some other place higher in the tree, that cache can
+  // inherit this sampler from the leaf, providing sampling support from the caching layer.
+  // That is why we save the sampler here in a leaf node that does not use sampling.
+  std::shared_ptr<SamplerObj> sampler_ = SelectSampler(num_samples_, shuffle_files, num_shards_, shard_id_);
+
   // Create and initialize TFReaderOp
-  std::shared_ptr<TFReaderOp> tf_reader_op = std::make_shared<TFReaderOp>(
-    num_workers_, worker_connector_size_, rows_per_buffer_, num_samples_, sorted_dir_files, std::move(data_schema),
-    connector_que_size_, columns_list_, shuffle_files, num_shards_, shard_id_, shard_equal_rows_, nullptr);
+  std::shared_ptr<TFReaderOp> tf_reader_op =
+    std::make_shared<TFReaderOp>(num_workers_, worker_connector_size_, rows_per_buffer_, num_samples_,
+                                 sorted_dir_files, std::move(data_schema), connector_que_size_, columns_list_,
+                                 shuffle_files, num_shards_, shard_id_, shard_equal_rows_, std::move(sampler_->Build()));
 
   RETURN_EMPTY_IF_ERROR(tf_reader_op->Init());
 
-  if (shuffle_ == ShuffleMode::kGlobal) {
+  if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal) {
     // Inject ShuffleOp
 
     std::shared_ptr<ShuffleOp> shuffle_op = nullptr;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.cc b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.cc
index a2728e4a8c..30a2e33cd1 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/opt/pre/cache_transform_pass.cc
@@ -188,10 +188,7 @@ Status CacheTransformPass::CachePass::RunOnNode(std::shared_ptr n
 
 // Perform leaf node cache transform identification
 Status CacheTransformPass::CachePass::RunOnNode(std::shared_ptr<AlbumOp> node, bool *modified) {
-  if (is_caching_) {
-    RETURN_STATUS_UNEXPECTED("There is currently no support for AlbumOp under cache.");
-  }
-  return Status::OK();
+  return MappableCacheLeafSetup(std::static_pointer_cast<DatasetOp>(node));
 }
 
 // Perform leaf node cache transform identification
diff --git a/mindspore/ccsrc/minddata/dataset/include/datasets.h b/mindspore/ccsrc/minddata/dataset/include/datasets.h
index 8a39ac7b4c..75c46905e5 100644
--- a/mindspore/ccsrc/minddata/dataset/include/datasets.h
+++ b/mindspore/ccsrc/minddata/dataset/include/datasets.h
@@ -144,10 +144,13 @@ std::shared_ptr<SchemaObj> Schema(const std::string &schema_file = "");
 /// \param[in] decode the option to decode the images in dataset (default = false)
 /// \param[in] sampler Object used to choose samples from the dataset. If sampler is not given,
 ///     a `RandomSampler` will be used to randomly iterate the entire dataset (default = RandomSampler())
+/// \param[in] cache Tensor cache to use (default = nullptr, which means no cache is used).
+///     The cache feature is under development and is not recommended.
 /// \return Shared pointer to the current Dataset
 std::shared_ptr<AlbumNode> Album(const std::string &dataset_dir, const std::string &data_schema,
                                  const std::vector<std::string> &column_names = {}, bool decode = false,
-                                 const std::shared_ptr<SamplerObj> &sampler = RandomSampler());
+                                 const std::shared_ptr<SamplerObj> &sampler = RandomSampler(),
+                                 const std::shared_ptr<DatasetCache> &cache = nullptr);
 
 /// \brief Function to create a CelebANode
 /// \notes The generated dataset has two columns ['image', 'attr'].
@@ -549,6 +552,17 @@ std::shared_ptr<DatasetCache> CreateDatasetCache(session_id_type id, uint64_t me
                                                  std::optional<int32_t> prefetch_sz = std::nullopt);
 #endif
 
+/// \brief Function to create a sampler for a non-mappable dataset (to be used by a cache op later).
+/// \notes A non-mappable dataset does not directly support a sampler. It instead provides sampling arguments
+/// (shuffle, num_samples, num_shards, shard_id), and it does support sampling if there is a cache somewhere
+/// above it in the pipeline. If there is no cache above it, the sampler is not used.
+/// \param[in] num_samples The number of samples to be included in the dataset.
+/// \param[in] shuffle If true, the indices are shuffled.
+/// \param[in] num_shards Number of shards to divide the dataset into.
+/// \param[in] shard_id Shard ID of the current shard within num_shards.
+/// \return Shared pointer to the current Sampler.
+std::shared_ptr<SamplerObj> SelectSampler(int64_t num_samples, bool shuffle, int32_t num_shards, int32_t shard_id);
+
 /// \brief Function to create a ZipNode
 /// \notes Applies zip to the dataset
 /// \param[in] datasets List of shared pointers to the datasets that we want to zip
diff --git a/tests/ut/cpp/dataset/c_api_cache_test.cc b/tests/ut/cpp/dataset/c_api_cache_test.cc
index cb2ed8b2a7..6c34250d99 100644
--- a/tests/ut/cpp/dataset/c_api_cache_test.cc
+++ b/tests/ut/cpp/dataset/c_api_cache_test.cc
@@ -17,14 +17,19 @@
 #include "minddata/dataset/include/datasets.h"
 
 // IR leaf nodes
-
+#include "minddata/dataset/engine/ir/datasetops/source/album_node.h"
 #include "minddata/dataset/engine/ir/datasetops/source/celeba_node.h"
 #include "minddata/dataset/engine/ir/datasetops/source/cifar10_node.h"
 #include "minddata/dataset/engine/ir/datasetops/source/cifar100_node.h"
+#include "minddata/dataset/engine/ir/datasetops/source/clue_node.h"
 #include "minddata/dataset/engine/ir/datasetops/source/coco_node.h"
+#include "minddata/dataset/engine/ir/datasetops/source/csv_node.h"
 #include "minddata/dataset/engine/ir/datasetops/source/image_folder_node.h"
 #include "minddata/dataset/engine/ir/datasetops/source/manifest_node.h"
 #include "minddata/dataset/engine/ir/datasetops/source/mnist_node.h"
+#include "minddata/dataset/engine/ir/datasetops/source/random_node.h"
+#include "minddata/dataset/engine/ir/datasetops/source/text_file_node.h"
+#include "minddata/dataset/engine/ir/datasetops/source/tf_record_node.h"
 #include "minddata/dataset/engine/ir/datasetops/source/voc_node.h"
 
 using namespace mindspore::dataset;
@@ -384,3 +389,359 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheVocCApi) {
   // Manually terminate the pipeline
   iter->Stop();
 }
+
+TEST_F(MindDataTestCacheOp, DISABLED_TestCacheAlbumCApi) {
+  session_id_type env_session;
+  Status s = GetSessionFromEnv(&env_session);
+  EXPECT_EQ(s, Status::OK());
+
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  EXPECT_NE(some_cache, nullptr);
+
+  std::string folder_path = datasets_root_path_ + "/testAlbum/images";
+  std::string schema_file = datasets_root_path_ + "/testAlbum/datasetSchema.json";
+  std::vector<std::string> column_names = {"image", "label", "id"};
+  // Create an Album Dataset, 7 records in it
+  std::shared_ptr<Dataset> ds = Album(folder_path, schema_file, column_names, false, RandomSampler(), some_cache);
+  EXPECT_NE(ds, nullptr);
+
+  // Create a Repeat operation on ds
+  int32_t repeat_num = 2;
+  ds = ds->Repeat(repeat_num);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    i++;
+    iter->GetNextRow(&row);
+  }
+
+  EXPECT_EQ(i, 14);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestCacheOp, DISABLED_TestCacheRandomDataCApi) {
+  session_id_type env_session;
+  Status s = GetSessionFromEnv(&env_session);
+  EXPECT_EQ(s, Status::OK());
+
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  EXPECT_NE(some_cache, nullptr);
+
+  // Create a RandomDataset
+  std::shared_ptr<SchemaObj> schema = Schema();
+  schema->add_column("image", mindspore::TypeId::kNumberTypeUInt8, {2});
+  schema->add_column("label", mindspore::TypeId::kNumberTypeUInt8, {1});
+  std::shared_ptr<Dataset> ds = RandomData(4, schema, {}, RandomSampler(), some_cache);
+  EXPECT_NE(ds, nullptr);
+
+  // Create a Repeat operation on ds
+  int32_t repeat_num = 2;
+  ds = ds->Repeat(repeat_num);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    i++;
+    iter->GetNextRow(&row);
+  }
+
+  EXPECT_EQ(i, 8);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTFRecordCApi1) {
+  session_id_type env_session;
+  Status s = GetSessionFromEnv(&env_session);
+  EXPECT_EQ(s, Status::OK());
+
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  EXPECT_NE(some_cache, nullptr);
+
+  // Create a TFRecord Dataset, this file_path has 3 records in it
+  std::string file_path = datasets_root_path_ + "/test_tf_file_3_images2/train-0000-of-0001.data";
+  std::string schema_path = datasets_root_path_ + "/test_tf_file_3_images2/datasetSchema.json";
+  std::shared_ptr<Dataset> ds =
+    TFRecord({file_path}, schema_path, {"image"}, 0, ShuffleMode::kFalse, 1, 0, false, some_cache);
+  EXPECT_NE(ds, nullptr);
+
+  // Create a Repeat operation on ds
+  int32_t repeat_num = 2;
+  ds = ds->Repeat(repeat_num);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    i++;
+    auto image = row["image"];
+    MS_LOG(INFO) << "Tensor image shape: " << image->shape();
+    iter->GetNextRow(&row);
+  }
+
+  EXPECT_EQ(i, 6);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTFRecordCApi2) {
+  session_id_type env_session;
+  Status s = GetSessionFromEnv(&env_session);
+  EXPECT_EQ(s, Status::OK());
+
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  EXPECT_NE(some_cache, nullptr);
+
+  // Create a TFRecord Dataset, this file_path has 3 records in it
+  std::string file_path = datasets_root_path_ + "/test_tf_file_3_images2/train-0000-of-0001.data";
+  std::string schema_path = datasets_root_path_ + "/test_tf_file_3_images2/datasetSchema.json";
+
+  // In this one, the TFRecord dataset will be given a sharding configuration. However, since a cache is
+  // used, the tree prepare phase should undo the leaf's sharding configuration and instead choose a
+  // distributed sampler with the same shard config.
+  // With only 3 records sharded into 3, we expect only 1 record returned for this shard.
+  // However, the sharding will be done by the sampler, not by the TFRecord leaf node.
+  // In this case, it is a row-based sharding, not the file-based sharding that would happen if
+  // there were no cache.
+  std::shared_ptr<Dataset> ds =
+    TFRecord({file_path}, schema_path, {"image"}, 0, ShuffleMode::kFalse, 3, 0, false, some_cache);
+  EXPECT_NE(ds, nullptr);
+
+  // Create a Repeat operation on ds
+  int32_t repeat_num = 2;
+  ds = ds->Repeat(repeat_num);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    i++;
+    auto image = row["image"];
+    MS_LOG(INFO) << "Tensor image shape: " << image->shape();
+    iter->GetNextRow(&row);
+  }
+
+  EXPECT_EQ(i, 2);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTFRecordCApi3) {
+  session_id_type env_session;
+  Status s = GetSessionFromEnv(&env_session);
+  EXPECT_EQ(s, Status::OK());
+
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  EXPECT_NE(some_cache, nullptr);
+
+  // Create a TFRecord Dataset, this file_path has 3 records in it
+  std::string file_path = datasets_root_path_ + "/test_tf_file_3_images2/train-0000-of-0001.data";
+  std::string schema_path = datasets_root_path_ + "/test_tf_file_3_images2/datasetSchema.json";
+
+  // In this one, a num_samples argument is given.
+  // In this case, a sequential sampler would be chosen with the same num_samples argument.
+  // The samples will be selected by the sequential sampler, not by the TFRecord leaf node.
+  std::shared_ptr<Dataset> ds =
+    TFRecord({file_path}, schema_path, {"image"}, 2, ShuffleMode::kFalse, 1, 0, false, some_cache);
+  EXPECT_NE(ds, nullptr);
+
+  // Create a Repeat operation on ds
+  int32_t repeat_num = 2;
+  ds = ds->Repeat(repeat_num);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    i++;
+    auto image = row["image"];
+    MS_LOG(INFO) << "Tensor image shape: " << image->shape();
+    iter->GetNextRow(&row);
+  }
+
+  EXPECT_EQ(i, 4);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTextfileCApi) {
+  session_id_type env_session;
+  Status s = GetSessionFromEnv(&env_session);
+  EXPECT_EQ(s, Status::OK());
+
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  EXPECT_NE(some_cache, nullptr);
+
+  // Create a TextFile Dataset, this file_path has 3 records in it
+  std::string file_path = datasets_root_path_ + "/testTextFileDataset/1.txt";
+
+  // In this one, a num_samples=2 argument is given.
+  // Since global shuffle is requested, a random sampler would be chosen with the same num_samples argument.
+  // The samples will be selected by the random sampler, not by the TextFile leaf node.
+  std::shared_ptr<Dataset> ds = TextFile({file_path}, 2, ShuffleMode::kGlobal, 1, 0, some_cache);
+  EXPECT_NE(ds, nullptr);
+
+  // Create a Repeat operation on ds
+  int32_t repeat_num = 2;
+  ds = ds->Repeat(repeat_num);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    i++;
+    iter->GetNextRow(&row);
+  }
+
+  EXPECT_EQ(i, 4);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCsvCApi) {
+  session_id_type env_session;
+  Status s = GetSessionFromEnv(&env_session);
+  EXPECT_EQ(s, Status::OK());
+
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  EXPECT_NE(some_cache, nullptr);
+
+  // Create a CSV Dataset, this file_path has 3 records in it
+  std::string file_path = datasets_root_path_ + "/testCSV/1.csv";
+  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
+
+  // In this one, a num_samples=2 argument is given.
+  // In this case, a sequential sampler would be chosen with the same num_samples argument.
+  // The samples will be selected by the sequential sampler, not by the CSV leaf node.
+  std::shared_ptr<Dataset> ds = CSV({file_path}, ',', {}, column_names, 2, ShuffleMode::kFalse, 1, 0, some_cache);
+  EXPECT_NE(ds, nullptr);
+
+  // Create a Repeat operation on ds
+  int32_t repeat_num = 2;
+  ds = ds->Repeat(repeat_num);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    i++;
+    iter->GetNextRow(&row);
+  }
+
+  EXPECT_EQ(i, 4);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestCacheOp, DISABLED_TestCacheClueCApi) {
+  session_id_type env_session;
+  Status s = GetSessionFromEnv(&env_session);
+  EXPECT_EQ(s, Status::OK());
+
+  std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true);
+  EXPECT_NE(some_cache, nullptr);
+
+  // Create a CLUE Dataset, this file_path has 3 records in it
+  std::string file_path = datasets_root_path_ + "/testCLUE/afqmc/train.json";
+  std::string task = "AFQMC";
+  std::string usage = "train";
+
+  // In this one, a num_samples=2 argument is given.
+  // In this case, a sequential sampler would be chosen with the same num_samples argument.
+  // The samples will be selected by the sequential sampler, not by the CLUE leaf node.
+  std::shared_ptr<Dataset> ds = CLUE({file_path}, task, usage, 2, ShuffleMode::kFalse, 1, 0, some_cache);
+  EXPECT_NE(ds, nullptr);
+
+  // Create a Repeat operation on ds
+  int32_t repeat_num = 2;
+  ds = ds->Repeat(repeat_num);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    i++;
+    iter->GetNextRow(&row);
+  }
+
+  EXPECT_EQ(i, 4);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
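Taken together, the changes above allow a user pipeline like the following sketch (illustrative only, mirroring the tests above; the file paths are hypothetical and the session id would normally come from the cache_admin tool rather than being hard-coded):

    #include <memory>
    #include <string>
    #include <unordered_map>
    #include "minddata/dataset/include/datasets.h"

    using namespace mindspore::dataset;
    using namespace mindspore::dataset::api;

    void CachedTFRecordPipeline(session_id_type session_id) {
      // mem_sz = 0 (no memory cap) and spill = true, as in the tests above
      std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(session_id, 0, true);
      if (some_cache == nullptr) return;

      // A cache over a non-mappable TFRecord leaf: the leaf's shuffle / num_samples /
      // num_shards / shard_id arguments become a sampler (via SelectSampler) that is
      // executed at the cache layer instead of file-wise inside the reader.
      std::shared_ptr<Dataset> ds =
        TFRecord({"/path/to/data.tfrecord"}, "/path/to/datasetSchema.json", {"image"}, 0,
                 ShuffleMode::kFalse, 1, 0, false, some_cache);
      if (ds == nullptr) return;

      std::shared_ptr<Iterator> iter = ds->CreateIterator();
      std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
      iter->GetNextRow(&row);
      while (row.size() != 0) {
        iter->GetNextRow(&row);  // after the first pass, rows are served from the cache
      }
      iter->Stop();
    }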