diff --git a/mindspore/ccsrc/minddata/dataset/api/datasets.cc b/mindspore/ccsrc/minddata/dataset/api/datasets.cc
index f0d042d9ce..4f8de0ab8c 100644
--- a/mindspore/ccsrc/minddata/dataset/api/datasets.cc
+++ b/mindspore/ccsrc/minddata/dataset/api/datasets.cc
@@ -23,6 +23,7 @@
 // Source dataset headers (in alphabetical order)
 #include "minddata/dataset/engine/datasetops/source/celeba_op.h"
 #include "minddata/dataset/engine/datasetops/source/cifar_op.h"
+#include "minddata/dataset/engine/datasetops/source/clue_op.h"
 #include "minddata/dataset/engine/datasetops/source/coco_op.h"
 #include "minddata/dataset/engine/datasetops/source/image_folder_op.h"
 #include "minddata/dataset/engine/datasetops/source/mnist_op.h"
@@ -95,6 +96,7 @@ Dataset::Dataset() {
   num_workers_ = cfg->num_parallel_workers();
   rows_per_buffer_ = cfg->rows_per_buffer();
   connector_que_size_ = cfg->op_connector_size();
+  worker_connector_size_ = cfg->worker_connector_size();
 }
 
 // FUNCTIONS TO CREATE DATASETS FOR LEAF-NODE DATASETS
@@ -126,6 +128,16 @@ std::shared_ptr<Cifar100Dataset> Cifar100(const std::string &dataset_dir, std::s
   return ds->ValidateParams() ? ds : nullptr;
 }
 
+// Function to create a CLUEDataset.
+std::shared_ptr<CLUEDataset> CLUE(const std::vector<std::string> &clue_files, const std::string &task,
+                                  const std::string &usage, int64_t num_samples, ShuffleMode shuffle, int num_shards,
+                                  int shard_id) {
+  auto ds = std::make_shared<CLUEDataset>(clue_files, task, usage, num_samples, shuffle, num_shards, shard_id);
+
+  // Call derived class validation method.
+  return ds->ValidateParams() ? ds : nullptr;
+}
+
 // Function to create a CocoDataset.
 std::shared_ptr<CocoDataset> Coco(const std::string &dataset_dir, const std::string &annotation_file,
                                   const std::string &task, const bool &decode,
@@ -340,6 +352,34 @@ std::shared_ptr<SamplerObj> CreateDefaultSampler() {
   return std::make_shared<RandomSamplerObj>(replacement, num_samples);
 }
 
+// Helper function to compute a default shuffle size
+int64_t ComputeShuffleSize(int64_t num_files, int64_t num_devices, int64_t num_rows, int64_t total_rows) {
+  const int64_t average_files_multiplier = 4;
+  const int64_t shuffle_max = 10000;
+  int64_t avg_rows_per_file = 0;
+  int64_t shuffle_size = 0;
+
+  // Adjust the number of rows per shard if sharding was given
+  if (num_devices > 0) {
+    if (num_rows % num_devices == 0) {
+      num_rows = num_rows / num_devices;
+    } else {
+      num_rows = (num_rows / num_devices) + 1;
+    }
+  }
+
+  // Cap based on the total rows directive. Some ops do not have this and give a value of 0.
+  if (total_rows > 0) {
+    num_rows = std::min(num_rows, total_rows);
+  }
+
+  // Get the average number of rows per file
+  avg_rows_per_file = num_rows / num_files;
+
+  shuffle_size = std::max(avg_rows_per_file * average_files_multiplier, shuffle_max);
+  return shuffle_size;
+}
+
 // Helper function to validate dataset params
 bool ValidateCommonDatasetParams(std::string dataset_dir) {
   if (dataset_dir.empty()) {
@@ -461,6 +501,206 @@ std::vector<std::shared_ptr<DatasetOp>> Cifar100Dataset::Build() {
   return node_ops;
 }
 
+// Constructor for CLUEDataset
+CLUEDataset::CLUEDataset(const std::vector<std::string> clue_files, std::string task, std::string usage,
+                         int64_t num_samples, ShuffleMode shuffle, int num_shards, int shard_id)
+    : dataset_files_(clue_files),
+      task_(task),
+      usage_(usage),
+      num_samples_(num_samples),
+      shuffle_(shuffle),
+      num_shards_(num_shards),
+      shard_id_(shard_id) {}
+
+bool CLUEDataset::ValidateParams() {
+  if (dataset_files_.empty()) {
+    MS_LOG(ERROR) << "CLUEDataset: dataset_files is not specified.";
+    return false;
+  }
+
+  for (auto f : dataset_files_) {
+    Path clue_file(f);
+    if (!clue_file.Exists()) {
+      MS_LOG(ERROR) << "dataset file: [" << f << "] is invalid or does not exist";
+      return false;
+    }
+  }
+
+  std::vector<std::string> task_list = {"AFQMC", "TNEWS", "IFLYTEK", "CMNLI", "WSC", "CSL"};
+  std::vector<std::string> usage_list = {"train", "test", "eval"};
+
+  if (find(task_list.begin(), task_list.end(), task_) == task_list.end()) {
+    MS_LOG(ERROR) << "task should be AFQMC, TNEWS, IFLYTEK, CMNLI, WSC or CSL.";
+    return false;
+  }
+
+  if (find(usage_list.begin(), usage_list.end(), usage_) == usage_list.end()) {
+    MS_LOG(ERROR) << "usage should be train, test or eval.";
+    return false;
+  }
+
+  if (num_samples_ < 0) {
+    MS_LOG(ERROR) << "CLUEDataset: Invalid number of samples: " << num_samples_;
+    return false;
+  }
+
+  if (num_shards_ <= 0) {
+    MS_LOG(ERROR) << "CLUEDataset: Invalid num_shards: " << num_shards_;
+    return false;
+  }
+
+  if (shard_id_ < 0 || shard_id_ >= num_shards_) {
+    MS_LOG(ERROR) << "CLUEDataset: Invalid input, shard_id: " << shard_id_ << ", num_shards: " << num_shards_;
+    return false;
+  }
+
+  return true;
+}
+
+// Function to split string based on a character delimiter
+std::vector<std::string> CLUEDataset::split(const std::string &s, char delim) {
+  std::vector<std::string> res;
+  std::stringstream ss(s);
+  std::string item;
+
+  while (getline(ss, item, delim)) {
+    res.push_back(item);
+  }
+  return res;
+}
+
+// Function to build CLUEDataset
+std::vector<std::shared_ptr<DatasetOp>> CLUEDataset::Build() {
+  // A vector containing shared pointer to the Dataset Ops that this object will create
+  std::vector<std::shared_ptr<DatasetOp>> node_ops;
+  std::map<std::string, std::string> key_map;
+  if (task_ == "AFQMC") {
+    if (usage_ == "train") {
+      key_map["sentence1"] = "sentence1";
+      key_map["sentence2"] = "sentence2";
+      key_map["label"] = "label";
+    } else if (usage_ == "test") {
+      key_map["id"] = "id";
+      key_map["sentence1"] = "sentence1";
+      key_map["sentence2"] = "sentence2";
+    } else if (usage_ == "eval") {
+      key_map["sentence1"] = "sentence1";
+      key_map["sentence2"] = "sentence2";
+      key_map["label"] = "label";
+    }
+  } else if (task_ == "CMNLI") {
+    if (usage_ == "train") {
+      key_map["sentence1"] = "sentence1";
+      key_map["sentence2"] = "sentence2";
+      key_map["label"] = "label";
+    } else if (usage_ == "test") {
+      key_map["id"] = "id";
+      key_map["sentence1"] = "sentence1";
+      key_map["sentence2"] = "sentence2";
+    } else if (usage_ == "eval") {
+      key_map["sentence1"] = "sentence1";
+      key_map["sentence2"] = "sentence2";
+      key_map["label"] = "label";
+    }
+  } else if (task_ == "CSL") {
+    if (usage_ == "train") {
+      key_map["id"] = "id";
+      key_map["abst"] = "abst";
+      key_map["keyword"] = "keyword";
+      key_map["label"] = "label";
+    } else if (usage_ == "test") {
+      key_map["id"] = "id";
+      key_map["abst"] = "abst";
+      key_map["keyword"] = "keyword";
+    } else if (usage_ == "eval") {
+      key_map["id"] = "id";
+      key_map["abst"] = "abst";
+      key_map["keyword"] = "keyword";
+      key_map["label"] = "label";
+    }
+  } else if (task_ == "IFLYTEK") {
+    if (usage_ == "train") {
+      key_map["label"] = "label";
+      key_map["label_des"] = "label_des";
+      key_map["sentence"] = "sentence";
+    } else if (usage_ == "test") {
+      key_map["id"] = "id";
+      key_map["sentence"] = "sentence";
+    } else if (usage_ == "eval") {
+      key_map["label"] = "label";
+      key_map["label_des"] = "label_des";
+      key_map["sentence"] = "sentence";
+    }
+  } else if (task_ == "TNEWS") {
+    if (usage_ == "train") {
+      key_map["label"] = "label";
+      key_map["label_desc"] = "label_desc";
+      key_map["sentence"] = "sentence";
+      key_map["keywords"] = "keywords";
+    } else if (usage_ == "test") {
+      key_map["id"] = "id";
+      key_map["sentence"] = "sentence";
+      key_map["keywords"] = "keywords";
+    } else if (usage_ == "eval") {
+      key_map["label"] = "label";
+      key_map["label_desc"] = "label_desc";
+      key_map["sentence"] = "sentence";
+      key_map["keywords"] = "keywords";
+    }
+  } else if (task_ == "WSC") {
+    if (usage_ == "train") {
+      key_map["span1_index"] = "target/span1_index";
+      key_map["span2_index"] = "target/span2_index";
+      key_map["span1_text"] = "target/span1_text";
+      key_map["span2_text"] = "target/span2_text";
+      key_map["idx"] = "idx";
+      key_map["label"] = "label";
+      key_map["text"] = "text";
+    } else if (usage_ == "test") {
+      key_map["span1_index"] = "target/span1_index";
+      key_map["span2_index"] = "target/span2_index";
+      key_map["span1_text"] = "target/span1_text";
+      key_map["span2_text"] = "target/span2_text";
+      key_map["idx"] = "idx";
+      key_map["text"] = "text";
+    } else if (usage_ == "eval") {
+      key_map["span1_index"] = "target/span1_index";
+      key_map["span2_index"] = "target/span2_index";
+      key_map["span1_text"] = "target/span1_text";
+      key_map["span2_text"] = "target/span2_text";
+      key_map["idx"] = "idx";
+      key_map["label"] = "label";
+      key_map["text"] = "text";
+    }
+  }
+
+  ColKeyMap ck_map;
+  for (auto &p : key_map) {
+    ck_map.insert({p.first, split(p.second, '/')});
+  }
+
+  bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);
+  std::shared_ptr<ClueOp> clue_op =
+    std::make_shared<ClueOp>(num_workers_, rows_per_buffer_, num_samples_, worker_connector_size_, ck_map,
+                             dataset_files_, connector_que_size_, shuffle_files, num_shards_, shard_id_);
+  clue_op->Init();
+  if (shuffle_ == ShuffleMode::kGlobal) {
+    // Inject ShuffleOp
+    int64_t shuffle_size = 0;
+    int64_t num_rows = 0;
+
+    // First, get the number of rows in the dataset and then compute the shuffle size
+    RETURN_EMPTY_IF_ERROR(ClueOp::CountAllFileRows(dataset_files_, &num_rows));
+    shuffle_size = ComputeShuffleSize(dataset_files_.size(), num_shards_, num_rows, 0);
+    MS_LOG(INFO) << "CLUEDataset::Build - num_rows: " << num_rows << ", shuffle_size: " << shuffle_size;
+    std::shared_ptr<ShuffleOp> op =
+      std::make_shared<ShuffleOp>(shuffle_size, GetSeed(), worker_connector_size_, true, rows_per_buffer_);
+    node_ops.push_back(op);
+  }
+  node_ops.push_back(clue_op);
+  return node_ops;
+}
+
 // Constructor for CocoDataset
 CocoDataset::CocoDataset(const std::string &dataset_dir, const std::string &annotation_file, const std::string &task,
                          const bool &decode, const std::shared_ptr<SamplerObj> &sampler)
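[Reviewer note] The shuffle-size heuristic above is worth a worked example. The sketch below duplicates the helper's arithmetic in a standalone program; only the function body is taken from the patch, while main() and the sample numbers are illustrative:

#include <algorithm>
#include <cstdint>
#include <iostream>

// Copy of ComputeShuffleSize from the patch, condensed for illustration.
int64_t ComputeShuffleSize(int64_t num_files, int64_t num_devices, int64_t num_rows, int64_t total_rows) {
  const int64_t average_files_multiplier = 4;
  const int64_t shuffle_max = 10000;
  if (num_devices > 0) {
    // Rows per shard, rounded up when the split is uneven.
    num_rows = (num_rows % num_devices == 0) ? num_rows / num_devices : num_rows / num_devices + 1;
  }
  if (total_rows > 0) {
    num_rows = std::min(num_rows, total_rows);
  }
  return std::max((num_rows / num_files) * average_files_multiplier, shuffle_max);
}

int main() {
  // Small dataset: 1 file, 3 rows, no sharding -> 4 * 3 = 12 < 10000,
  // so the shuffle_max floor wins and the buffer holds 10000 rows.
  std::cout << ComputeShuffleSize(1, 0, 3, 0) << "\n";      // 10000
  // Larger dataset: 2 files, 33000 rows over 3 shards -> 11000 rows per
  // shard, 5500 per file on average, 4 * 5500 = 22000 > 10000.
  std::cout << ComputeShuffleSize(2, 3, 33000, 0) << "\n";  // 22000
  return 0;
}

In other words, the injected ShuffleOp buffers at least 10000 rows, and more only when four times the per-file average exceeds that floor.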
diff --git a/mindspore/ccsrc/minddata/dataset/core/constants.h b/mindspore/ccsrc/minddata/dataset/core/constants.h
index 8c8c0044a6..61c9711d4b 100644
--- a/mindspore/ccsrc/minddata/dataset/core/constants.h
+++ b/mindspore/ccsrc/minddata/dataset/core/constants.h
@@ -35,6 +35,9 @@ enum class DatasetType { kUnknown, kArrow, kTf };
 // Possible flavours of Tensor implementations
 enum class TensorImpl { kNone, kFlexible, kCv, kNP };
 
+// Possible values for shuffle
+enum class ShuffleMode { kFalse = 0, kFiles = 1, kGlobal = 2 };
+
 // Possible values for Border types
 enum class BorderType { kConstant = 0, kEdge = 1, kReflect = 2, kSymmetric = 3 };
 
diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h
index d4873ec697..da7f86b34e 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.h
@@ -267,7 +267,7 @@ class ClueOp : public ParallelOp {
   std::unique_ptr<StringIndex> filename_index_;
   std::vector<std::string> clue_files_list_;
   WaitPost io_block_queue_wait_post_;
-  std::unique_ptr<JaggedConnector> jagged_buffer_connector_;
+  std::shared_ptr<JaggedConnector> jagged_buffer_connector_;
   QueueList<std::unique_ptr<FilenameBlock>> io_block_queues_;
   bool load_jagged_connector_;
   ColKeyMap cols_to_keyword_;
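[Reviewer note] The one-line change above relaxes ownership of the jagged buffer connector from std::unique_ptr to std::shared_ptr, so the connector can be co-owned instead of exclusively held by ClueOp. A minimal sketch of that pattern, using a stand-in Connector type rather than the real JaggedConnector API:

#include <memory>
#include <thread>
#include <vector>

// Stand-in for JaggedConnector: any queue-like object handed to several owners.
struct Connector { /* ... */ };

struct Op {
  std::shared_ptr<Connector> connector_ = std::make_shared<Connector>();

  void SpawnWorkers() {
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; ++i) {
      // Each worker copies the shared_ptr, so the connector stays alive as
      // long as any owner does. A unique_ptr could only be lent out as a raw
      // pointer, with no lifetime guarantee for the workers.
      workers.emplace_back([c = connector_] { /* produce into *c */ });
    }
    for (auto &w : workers) {
      w.join();
    }
  }
};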
diff --git a/mindspore/ccsrc/minddata/dataset/include/datasets.h b/mindspore/ccsrc/minddata/dataset/include/datasets.h
index 20193e39b2..108b4a7baa 100644
--- a/mindspore/ccsrc/minddata/dataset/include/datasets.h
+++ b/mindspore/ccsrc/minddata/dataset/include/datasets.h
@@ -44,6 +44,7 @@ class SamplerObj;
 class CelebADataset;
 class Cifar10Dataset;
 class Cifar100Dataset;
+class CLUEDataset;
 class CocoDataset;
 class ImageFolderDataset;
 class MnistDataset;
@@ -91,6 +92,27 @@ std::shared_ptr<Cifar10Dataset> Cifar10(const std::string &dataset_dir, std::sha
 std::shared_ptr<Cifar100Dataset> Cifar100(const std::string &dataset_dir,
                                           std::shared_ptr<SamplerObj> sampler = nullptr);
 
+/// \brief Function to create a CLUEDataset
+/// \notes The generated dataset has a variable number of columns depending on the task and usage
+/// \param[in] dataset_files List of dataset files to be read. The list
+///     will be sorted in a lexicographical order.
+/// \param[in] task The kind of task, one of "AFQMC", "TNEWS", "IFLYTEK", "CMNLI", "WSC" and "CSL" (default="AFQMC").
+/// \param[in] usage Which part of the dataset to read: "train", "test" or "eval" (default="train").
+/// \param[in] num_samples The number of samples to be included in the dataset.
+///     (Default = 0 means all samples.)
+/// \param[in] shuffle The mode for shuffling data every epoch. (Default=ShuffleMode.kGlobal)
+///     Can be any of:
+///     ShuffleMode.kFalse - No shuffling is performed.
+///     ShuffleMode.kFiles - Shuffle files only.
+///     ShuffleMode.kGlobal - Shuffle both the files and samples.
+/// \param[in] num_shards Number of shards that the dataset should be divided into. (Default = 1)
+/// \param[in] shard_id The shard ID within num_shards. This argument should be
+///     specified only when num_shards is also specified. (Default = 0)
+/// \return Shared pointer to the current CLUEDataset
+std::shared_ptr<CLUEDataset> CLUE(const std::vector<std::string> &dataset_files, const std::string &task = "AFQMC",
+                                  const std::string &usage = "train", int64_t num_samples = 0,
+                                  ShuffleMode shuffle = ShuffleMode::kGlobal, int num_shards = 1, int shard_id = 0);
+
 /// \brief Function to create a CocoDataset
 /// \notes The generated dataset has multi-columns :
 ///     - task='Detection', column: [['image', dtype=uint8], ['bbox', dtype=float32], ['category_id', dtype=uint32],
@@ -289,6 +311,7 @@ class Dataset : public std::enable_shared_from_this<Dataset> {
   int32_t num_workers_;
   int32_t rows_per_buffer_;
   int32_t connector_que_size_;
+  int32_t worker_connector_size_;
 };
 
 /* ####################################### Derived Dataset classes ################################# */
@@ -361,6 +384,39 @@ class Cifar100Dataset : public Dataset {
   std::shared_ptr<SamplerObj> sampler_;
 };
 
+/// \class CLUEDataset
+/// \brief A Dataset derived class to represent CLUE dataset
+class CLUEDataset : public Dataset {
+ public:
+  /// \brief Constructor
+  CLUEDataset(const std::vector<std::string> dataset_files, std::string task, std::string usage, int64_t num_samples,
+              ShuffleMode shuffle, int num_shards, int shard_id);
+
+  /// \brief Destructor
+  ~CLUEDataset() = default;
+
+  /// \brief A base class override function to create the required runtime dataset op objects for this class
+  /// \return The list of shared pointers to the newly created DatasetOps
+  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+
+  /// \brief Parameters validation
+  /// \return bool true if all the params are valid
+  bool ValidateParams() override;
+
+ private:
+  /// \brief Split string based on a character delimiter
+  /// \return A string vector
+  std::vector<std::string> split(const std::string &s, char delim);
+
+  std::vector<std::string> dataset_files_;
+  std::string task_;
+  std::string usage_;
+  int64_t num_samples_;
+  ShuffleMode shuffle_;
+  int num_shards_;
+  int shard_id_;
+};
+
 class CocoDataset : public Dataset {
  public:
   /// \brief Constructor
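[Reviewer note] For orientation, here is a minimal sketch of the new C API as declared above, mirroring the unit tests added later in this patch. The file path is a placeholder and error handling is reduced to null checks:

#include <memory>
#include <string>
#include <unordered_map>

#include "minddata/dataset/include/datasets.h"

using namespace mindspore::dataset::api;
using mindspore::dataset::ShuffleMode;
using mindspore::dataset::Tensor;

void ReadAfqmcTrain() {
  // Read the AFQMC training split without shuffling; task and usage select
  // which JSON keys become dataset columns (see CLUEDataset::Build()).
  // num_samples = 0 means all samples.
  std::shared_ptr<Dataset> ds =
    CLUE({"/path/to/afqmc/train.json"}, "AFQMC", "train", 0, ShuffleMode::kFalse);
  if (ds == nullptr) return;  // ValidateParams() rejected the arguments

  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
  iter->GetNextRow(&row);
  while (!row.empty()) {
    // AFQMC "train" rows expose "sentence1", "sentence2" and "label" columns.
    iter->GetNextRow(&row);
  }
  iter->Stop();
}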
diff --git a/tests/ut/cpp/dataset/CMakeLists.txt b/tests/ut/cpp/dataset/CMakeLists.txt
index f33a6ca49d..83ba7f7f6a 100644
--- a/tests/ut/cpp/dataset/CMakeLists.txt
+++ b/tests/ut/cpp/dataset/CMakeLists.txt
@@ -96,6 +96,7 @@ SET(DE_UT_SRCS
     c_api_transforms_test.cc
     c_api_dataset_ops_test.cc
     c_api_dataset_cifar_test.cc
+    c_api_dataset_clue_test.cc
     c_api_dataset_coco_test.cc
     c_api_dataset_voc_test.cc
     c_api_datasets_test.cc
diff --git a/tests/ut/cpp/dataset/c_api_dataset_clue_test.cc b/tests/ut/cpp/dataset/c_api_dataset_clue_test.cc
new file mode 100644
index 0000000000..d57fae3fe2
--- /dev/null
+++ b/tests/ut/cpp/dataset/c_api_dataset_clue_test.cc
@@ -0,0 +1,576 @@
+/**
+ * Copyright 2020 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "common/common.h"
+#include "minddata/dataset/include/datasets.h"
+#include "minddata/dataset/core/global_context.h"
+#include "minddata/dataset/core/config_manager.h"
+
+using namespace mindspore::dataset::api;
+using mindspore::dataset::ShuffleMode;
+using mindspore::dataset::Tensor;
+using mindspore::dataset::GlobalContext;
+
+class MindDataTestPipeline : public UT::DatasetOpTesting {
+ protected:
+};
+
+TEST_F(MindDataTestPipeline, TestCLUEDatasetBasic) {
+  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCLUEDatasetBasic.";
+
+  // Create a CLUEDataset, with a single CLUE file
+  std::string clue_file = datasets_root_path_ + "/testCLUE/afqmc/train.json";
+  std::string task = "AFQMC";
+  std::string usage = "train";
+  std::shared_ptr<Dataset> ds = CLUE({clue_file}, task, usage, 2);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  EXPECT_NE(row.find("sentence1"), row.end());
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    auto text = row["sentence1"];
+    MS_LOG(INFO) << "Tensor text shape: " << text->shape();
+    i++;
+    iter->GetNextRow(&row);
+  }
+
+  // Expect 2 samples
+  EXPECT_EQ(i, 2);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestPipeline, TestCLUEDatasetDistribution) {
+  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCLUEDatasetDistribution.";
+
+  // Create a CLUEDataset, with a single CLUE file
+  std::string clue_file = datasets_root_path_ + "/testCLUE/afqmc/train.json";
+  std::string task = "AFQMC";
+  std::string usage = "train";
+  std::shared_ptr<Dataset> ds = CLUE({clue_file}, task, usage, 0, ShuffleMode::kGlobal, 3, 0);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  EXPECT_NE(row.find("sentence1"), row.end());
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    auto text = row["sentence1"];
+    MS_LOG(INFO) << "Tensor text shape: " << text->shape();
+    i++;
+    iter->GetNextRow(&row);
+  }
+
+  // Expect 1 sample
+  EXPECT_EQ(i, 1);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestPipeline, TestCLUEDatasetAFQMC) {
+  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCLUEDatasetAFQMC.";
+
+  // Create a CLUEDataset, with a single CLUE file
+  std::string train_file = datasets_root_path_ + "/testCLUE/afqmc/train.json";
+  std::string test_file = datasets_root_path_ + "/testCLUE/afqmc/test.json";
+  std::string eval_file = datasets_root_path_ + "/testCLUE/afqmc/dev.json";
+  std::string task = "AFQMC";
+  std::string usage = "train";
+  std::shared_ptr<Dataset> ds = CLUE({train_file}, task, usage, 0, ShuffleMode::kFalse);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  EXPECT_NE(row.find("sentence1"), row.end());
+  std::vector<std::string> expected_result = {
+    "蚂蚁借呗等额还款能否换成先息后本",
+    "蚂蚁花呗说我违约了",
+    "帮我看看本月花呗账单结清了没"
+  };
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    auto text = row["sentence1"];
+    std::string_view sv;
+    text->GetItemAt(&sv, {0});
+    std::string ss(sv);
+    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
+    MS_LOG(INFO) << "Tensor text shape: " << text->shape();
+    iter->GetNextRow(&row);
+    i++;
+  }
+
+  // Expect 3 samples
+  EXPECT_EQ(i, 3);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+
+  // test
+  usage = "test";
+  expected_result = {
+    "借呗取消的时间",
+    "网商贷用什么方法转变成借呗",
+    "我的借呗为什么开通不了"
+  };
+  ds = CLUE({test_file}, task, usage, 0, ShuffleMode::kFalse);
+  EXPECT_NE(ds, nullptr);
+  iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+  iter->GetNextRow(&row);
+  EXPECT_NE(row.find("sentence1"), row.end());
+  i = 0;
+  while (row.size() != 0) {
+    auto text = row["sentence1"];
+    std::string_view sv;
+    text->GetItemAt(&sv, {0});
+    std::string ss(sv);
+    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
+    iter->GetNextRow(&row);
+    i++;
+  }
+  iter->Stop();
+
+  // eval
+  usage = "eval";
+  expected_result = {
+    "你有花呗吗",
+    "吃饭能用花呗吗",
+    "蚂蚁花呗支付金额有什么限制"
+  };
+  ds = CLUE({eval_file}, task, usage, 0, ShuffleMode::kFalse);
+  EXPECT_NE(ds, nullptr);
+  iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+  iter->GetNextRow(&row);
+  EXPECT_NE(row.find("sentence1"), row.end());
+  i = 0;
+  while (row.size() != 0) {
+    auto text = row["sentence1"];
+    std::string_view sv;
+    text->GetItemAt(&sv, {0});
+    std::string ss(sv);
+    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
+    iter->GetNextRow(&row);
+    i++;
+  }
+  iter->Stop();
+}
+
+TEST_F(MindDataTestPipeline, TestCLUEDatasetCMNLI) {
+  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCLUEDatasetCMNLI.";
+
+  // Create a CLUEDataset, with a single CLUE file
+  std::string clue_file = datasets_root_path_ + "/testCLUE/cmnli/train.json";
+  std::string task = "CMNLI";
+  std::string usage = "train";
+  std::shared_ptr<Dataset> ds = CLUE({clue_file}, task, usage, 0, ShuffleMode::kFalse);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  EXPECT_NE(row.find("sentence1"), row.end());
+  std::vector<std::string> expected_result = {
+    "你应该给这件衣服定一个价格。",
+    "我怎么知道他要说什么",
+    "向左。"
+  };
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    auto text = row["sentence1"];
+    std::string_view sv;
+    text->GetItemAt(&sv, {0});
+    std::string ss(sv);
+    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
+    MS_LOG(INFO) << "Tensor text shape: " << text->shape();
+    iter->GetNextRow(&row);
+    i++;
+  }
+
+  // Expect 3 samples
+  EXPECT_EQ(i, 3);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestPipeline, TestCLUEDatasetCSL) {
+  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCLUEDatasetCSL.";
+
+  // Create a CLUEDataset, with a single CLUE file
+  std::string clue_file = datasets_root_path_ + "/testCLUE/csl/train.json";
+  std::string task = "CSL";
+  std::string usage = "train";
+  std::shared_ptr<Dataset> ds = CLUE({clue_file}, task, usage, 0, ShuffleMode::kFalse);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  EXPECT_NE(row.find("abst"), row.end());
+  std::vector<std::string> expected_result = {
+    "这是一段长文本",
+    "这是一段长文本",
+    "这是一段长文本"
+  };
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    auto text = row["abst"];
+    std::string_view sv;
+    text->GetItemAt(&sv, {0});
+    std::string ss(sv);
+    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
+    MS_LOG(INFO) << "Tensor text shape: " << text->shape();
+    iter->GetNextRow(&row);
+    i++;
+  }
+
+  // Expect 3 samples
+  EXPECT_EQ(i, 3);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestPipeline, TestCLUEDatasetIFLYTEK) {
+  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCLUEDatasetIFLYTEK.";
+
+  // Create a CLUEDataset, with a single CLUE file
+  std::string clue_file = datasets_root_path_ + "/testCLUE/iflytek/train.json";
+  std::string task = "IFLYTEK";
+  std::string usage = "train";
+  std::shared_ptr<Dataset> ds = CLUE({clue_file}, task, usage, 0, ShuffleMode::kFalse);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  EXPECT_NE(row.find("sentence"), row.end());
+  std::vector<std::string> expected_result = {
+    "第一个文本",
+    "第二个文本",
+    "第三个文本"
+  };
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    auto text = row["sentence"];
+    std::string_view sv;
+    text->GetItemAt(&sv, {0});
+    std::string ss(sv);
+    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
+    MS_LOG(INFO) << "Tensor text shape: " << text->shape();
+    iter->GetNextRow(&row);
+    i++;
+  }
+
+  // Expect 3 samples
+  EXPECT_EQ(i, 3);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestPipeline, TestCLUEDatasetTNEWS) {
+  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCLUEDatasetTNEWS.";
+
+  // Create a CLUEDataset, with a single CLUE file
+  std::string clue_file = datasets_root_path_ + "/testCLUE/tnews/train.json";
+  std::string task = "TNEWS";
+  std::string usage = "train";
+  std::shared_ptr<Dataset> ds = CLUE({clue_file}, task, usage, 0, ShuffleMode::kFalse);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  EXPECT_NE(row.find("sentence"), row.end());
+  std::vector<std::string> expected_result = {
+    "新闻1",
+    "新闻2",
+    "新闻3"
+  };
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    auto text = row["sentence"];
+    std::string_view sv;
+    text->GetItemAt(&sv, {0});
+    std::string ss(sv);
+    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
+    MS_LOG(INFO) << "Tensor text shape: " << text->shape();
+    iter->GetNextRow(&row);
+    i++;
+  }
+
+  // Expect 3 samples
+  EXPECT_EQ(i, 3);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestPipeline, TestCLUEDatasetWSC) {
+  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCLUEDatasetWSC.";
+
+  // Create a CLUEDataset, with a single CLUE file
+  std::string clue_file = datasets_root_path_ + "/testCLUE/wsc/train.json";
+  std::string task = "WSC";
+  std::string usage = "train";
+  std::shared_ptr<Dataset> ds = CLUE({clue_file}, task, usage, 0, ShuffleMode::kFalse);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  EXPECT_NE(row.find("text"), row.end());
+  std::vector<std::string> expected_result = {
+    "小明呢,他在哪?",
+    "小红刚刚看到小明,他在操场",
+    "等小明回来,小张你叫他交作业"
+  };
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    auto text = row["text"];
+    std::string_view sv;
+    text->GetItemAt(&sv, {0});
+    std::string ss(sv);
+    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
+    MS_LOG(INFO) << "Tensor text shape: " << text->shape();
+    iter->GetNextRow(&row);
+    i++;
+  }
+
+  // Expect 3 samples
+  EXPECT_EQ(i, 3);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+}
+
+TEST_F(MindDataTestPipeline, TestCLUEDatasetShuffleGlobal) {
+  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCLUEDatasetShuffleGlobal.";
+  // Test CLUEDataset with global shuffle
+
+  // Set configuration
+  uint32_t original_seed = GlobalContext::config_manager()->seed();
+  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
+  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
+  GlobalContext::config_manager()->set_seed(135);
+  GlobalContext::config_manager()->set_num_parallel_workers(4);
+
+  // Create a CLUEDataset, with a single CLUE file
+  std::string clue_file = datasets_root_path_ + "/testCLUE/afqmc/train.json";
+  std::string task = "AFQMC";
+  std::string usage = "train";
+  std::shared_ptr<Dataset> ds = CLUE({clue_file}, task, usage, 0, ShuffleMode::kGlobal);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  EXPECT_NE(row.find("sentence1"), row.end());
+  std::vector<std::string> expected_result = {
+    "蚂蚁花呗说我违约了",
+    "帮我看看本月花呗账单结清了没",
+    "蚂蚁借呗等额还款能否换成先息后本"
+  };
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    auto text = row["sentence1"];
+    MS_LOG(INFO) << "Tensor text shape: " << text->shape();
+    std::string_view sv;
+    text->GetItemAt(&sv, {0});
+    std::string ss(sv);
+    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
+    MS_LOG(INFO) << "Tensor text shape: " << text->shape();
+    i++;
+    iter->GetNextRow(&row);
+  }
+
+  // Expect 3 samples
+  EXPECT_EQ(i, 3);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+
+  // Restore configuration
+  GlobalContext::config_manager()->set_seed(original_seed);
+  GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
+}
+
+TEST_F(MindDataTestPipeline, TestCLUEDatasetShuffleFiles) {
+  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCLUEDatasetShuffleFiles.";
+  // Test CLUEDataset with file shuffle, num_parallel_workers=1
+
+  // Set configuration
+  uint32_t original_seed = GlobalContext::config_manager()->seed();
+  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
+  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
+  GlobalContext::config_manager()->set_seed(135);
+  GlobalContext::config_manager()->set_num_parallel_workers(1);
+
+  // Create a CLUEDataset, with two CLUE files
+  // Note: train.json has 3 rows
+  // Note: dev.json has 3 rows
+  // Use default of all samples
+  // They have the same keywords
+  // Set shuffle mode to file shuffle
+  std::string clue_file1 = datasets_root_path_ + "/testCLUE/afqmc/train.json";
+  std::string clue_file2 = datasets_root_path_ + "/testCLUE/afqmc/dev.json";
+  std::string task = "AFQMC";
+  std::string usage = "train";
+  std::shared_ptr<Dataset> ds = CLUE({clue_file1, clue_file2}, task, usage, 0, ShuffleMode::kFiles);
+  EXPECT_NE(ds, nullptr);
+
+  // Create an iterator over the result of the above dataset.
+  // This will trigger the creation of the Execution Tree and launch it.
+  std::shared_ptr<Iterator> iter = ds->CreateIterator();
+  EXPECT_NE(iter, nullptr);
+
+  // Iterate the dataset and get each row
+  std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
+  iter->GetNextRow(&row);
+
+  EXPECT_NE(row.find("sentence1"), row.end());
+  std::vector<std::string> expected_result = {
+    "蚂蚁借呗等额还款能否换成先息后本",
+    "蚂蚁花呗说我违约了",
+    "帮我看看本月花呗账单结清了没",
+    "你有花呗吗",
+    "吃饭能用花呗吗",
+    "蚂蚁花呗支付金额有什么限制"
+  };
+
+  uint64_t i = 0;
+  while (row.size() != 0) {
+    auto text = row["sentence1"];
+    std::string_view sv;
+    text->GetItemAt(&sv, {0});
+    std::string ss(sv);
+    // Compare against expected result
+    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
+    i++;
+    iter->GetNextRow(&row);
+  }
+
+  // Expect 3 + 3 = 6 samples
+  EXPECT_EQ(i, 6);
+
+  // Manually terminate the pipeline
+  iter->Stop();
+
+  // Restore configuration
+  GlobalContext::config_manager()->set_seed(original_seed);
+  GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
+}
+
+TEST_F(MindDataTestPipeline, TestCLUEDatasetException) {
+  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCLUEDatasetException.";
+  // Create a CLUEDataset
+  std::string clue_file = datasets_root_path_ + "/testCLUE/wsc/train.json";
+  std::string task = "WSC";
+  std::string usage = "train";
+  std::string invalid_clue_file = "./NotExistFile";
+
+  std::shared_ptr<Dataset> ds0 = CLUE({}, task, usage);
+  EXPECT_EQ(ds0, nullptr);
+
+  std::shared_ptr<Dataset> ds1 = CLUE({invalid_clue_file}, task, usage);
+  EXPECT_EQ(ds1, nullptr);
+
+  std::shared_ptr<Dataset> ds2 = CLUE({clue_file}, "invalid_task", usage);
+  EXPECT_EQ(ds2, nullptr);
+
+  std::shared_ptr<Dataset> ds3 = CLUE({clue_file}, task, "invalid_usage");
+  EXPECT_EQ(ds3, nullptr);
+
+  std::shared_ptr<Dataset> ds4 = CLUE({clue_file}, task, usage, 0, ShuffleMode::kGlobal, 2, 2);
+  EXPECT_EQ(ds4, nullptr);
+
+  std::shared_ptr<Dataset> ds5 = CLUE({clue_file}, task, usage, -1, ShuffleMode::kGlobal);
+  EXPECT_EQ(ds5, nullptr);
+
+  std::shared_ptr<Dataset> ds6 = CLUE({clue_file}, task, usage, 0, ShuffleMode::kGlobal, -1);
+  EXPECT_EQ(ds6, nullptr);
+
+  std::shared_ptr<Dataset> ds7 = CLUE({clue_file}, task, usage, 0, ShuffleMode::kGlobal, 0, -1);
+  EXPECT_EQ(ds7, nullptr);
+}
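[Reviewer note] One subtlety in CLUEDataset::Build() is that WSC column names map to nested JSON paths such as "target/span1_index", and split(s, '/') turns each path into the key chain handed to ClueOp. A standalone illustration, assuming ColKeyMap is a map from column name to key chain, as its use in the patch suggests:

#include <iostream>
#include <map>
#include <sstream>
#include <string>
#include <vector>

// Same splitting logic as CLUEDataset::split in the patch.
std::vector<std::string> split(const std::string &s, char delim) {
  std::vector<std::string> res;
  std::stringstream ss(s);
  std::string item;
  while (std::getline(ss, item, delim)) {
    res.push_back(item);
  }
  return res;
}

int main() {
  // WSC maps a flat column name to a nested JSON path.
  std::map<std::string, std::string> key_map = {{"span1_index", "target/span1_index"}, {"text", "text"}};
  std::map<std::string, std::vector<std::string>> ck_map;  // stand-in for ColKeyMap
  for (auto &p : key_map) {
    ck_map.insert({p.first, split(p.second, '/')});
  }
  // "span1_index" -> {"target", "span1_index"}: the reader can walk this
  // chain of keys down into the parsed JSON object.
  for (auto &k : ck_map["span1_index"]) {
    std::cout << k << " ";  // prints: target span1_index
  }
  return 0;
}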