/** * Copyright 2020 Huawei Technologies Co., Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include "minddata/dataset/core/client.h" #include "minddata/dataset/engine/cache/cache_client.h" #include "minddata/dataset/engine/execution_tree.h" #include "minddata/dataset/engine/datasetops/cache_op.h" #include "minddata/dataset/engine/datasetops/cache_lookup_op.h" #include "minddata/dataset/engine/datasetops/cache_merge_op.h" #include "minddata/dataset/engine/datasetops/source/image_folder_op.h" #include "common/common.h" #include "gtest/gtest.h" #include "utils/log_adapter.h" #include "minddata/dataset/util/storage_container.h" // lint !e322 #include "minddata/dataset/engine/datasetops/source/random_data_op.h" #include "minddata/dataset/engine/data_schema.h" using namespace mindspore::dataset; using mindspore::LogStream; using mindspore::dataset::CacheClient; using mindspore::dataset::TaskGroup; using mindspore::ExceptionType::NoExceptionType; using mindspore::MsLogLevel::INFO; class MindDataTestCacheOp : public UT::DatasetOpTesting { public: void SetUp() override { DatasetOpTesting::SetUp(); GlobalInit(); } }; TEST_F(MindDataTestCacheOp, TestCacheServer) { Status rc; CacheClient myClient(1, 0, true); // use arbitrary session of 1, size of 0, spilling is true // cksum value of 1 for CreateCache here...normally you do not directly create a cache and the cksum arg is generated. rc = myClient.CreateCache(1, true); EXPECT_TRUE(rc.IsOk()); std::cout << myClient << std::endl; // Create a schema using the C api's int32_t rank = 0; // not used std::unique_ptr testSchema = std::make_unique(); // 2 columns. First column is an "image" 640,480,3 TensorShape c1Shape({640, 480, 3}); ColDescriptor c1("image", DataType(DataType::DE_INT8), TensorImpl::kFlexible, rank, // not used &c1Shape); // Column 2 will just be a scalar label number TensorShape c2Shape({}); // empty shape is a 1-value scalar Tensor ColDescriptor c2("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, rank, &c2Shape); testSchema->AddColumn(c1); testSchema->AddColumn(c2); std::unordered_map map; rc = testSchema->GetColumnNameMap(&map); EXPECT_TRUE(rc.IsOk()); // Test the CacheSchema api rc = myClient.CacheSchema(map); EXPECT_TRUE(rc.IsOk()); // Create a tensor, take a snapshot and restore it back, and compare. std::shared_ptr t = std::make_shared(TensorShape({2, 3}), DataType(DataType::DE_UINT64)); t->SetItemAt({0, 0}, 1); t->SetItemAt({0, 1}, 2); t->SetItemAt({0, 2}, 3); t->SetItemAt({1, 0}, 4); t->SetItemAt({1, 1}, 5); t->SetItemAt({1, 2}, 6); std::cout << *t << std::endl; TensorTable tbl; TensorRow row; row.push_back(t); int64_t row_id; rc = myClient.WriteRow(row, &row_id); EXPECT_TRUE(rc.IsOk()); // Switch off build phase. rc = myClient.BuildPhaseDone(); EXPECT_TRUE(rc.IsOk()); // Now restore from cache. row.clear(); rc = myClient.GetRows({row_id}, &tbl); row = tbl.front(); EXPECT_TRUE(rc.IsOk()); auto r = row.front(); std::cout << *r << std::endl; // Compare bool cmp = (*t == *r); EXPECT_TRUE(cmp); // Get back the schema and verify std::unordered_map map_out; rc = myClient.FetchSchema(&map_out); EXPECT_TRUE(rc.IsOk()); cmp = (map_out == map); EXPECT_TRUE(cmp); // Test Purge and Destroy rc = myClient.PurgeCache(); EXPECT_TRUE(rc.IsOk()); rc = myClient.DestroyCache(); EXPECT_TRUE(rc.IsOk()); } TEST_F(MindDataTestCacheOp, TestConcurrencyRequest) { // Clear the rc of the master thread if any (void)TaskManager::GetMasterThreadRc(); TaskGroup vg; Status rc; CacheClient myClient(1, 1, true); // use arbitrary session of 1, size 1, spilling is true // cksum value of 1 for CreateCache here...normally you do not directly create a cache and the cksum arg is generated. rc = myClient.CreateCache(1, true); EXPECT_TRUE(rc.IsOk()); std::cout << myClient << std::endl; std::shared_ptr t = std::make_shared(TensorShape({2, 3}), DataType(DataType::DE_UINT64)); t->SetItemAt({0, 0}, 1); t->SetItemAt({0, 1}, 2); t->SetItemAt({0, 2}, 3); t->SetItemAt({1, 0}, 4); t->SetItemAt({1, 1}, 5); t->SetItemAt({1, 2}, 6); TensorTable tbl; TensorRow row; row.push_back(t); // Cache tensor row t 5000 times using 10 threads. for (auto k = 0; k < 10; ++k) { Status vg_rc = vg.CreateAsyncTask("Test agent", [&myClient, &row]() -> Status { TaskManager::FindMe()->Post(); for (auto i = 0; i < 500; i++) { RETURN_IF_NOT_OK(myClient.WriteRow(row)); } return Status::OK(); }); EXPECT_TRUE(vg_rc.IsOk()); } ASSERT_TRUE(vg.join_all().IsOk()); ASSERT_TRUE(vg.GetTaskErrorIfAny().IsOk()); rc = myClient.BuildPhaseDone(); ASSERT_TRUE(rc.IsOk()); // Get statistics from the server. CacheClient::ServiceStat stat{}; rc = myClient.GetStat(&stat); ASSERT_TRUE(rc.IsOk()); std::cout << stat.min_row_id << ":" << stat.max_row_id << ":" << stat.num_mem_cached << ":" << stat.num_disk_cached << "\n"; // Expect there are 5000 rows there. EXPECT_EQ(5000, stat.max_row_id - stat.min_row_id + 1); // Get them all back using row id and compare with tensor t. for (auto i = stat.min_row_id; i <= stat.max_row_id; ++i) { tbl.clear(); row.clear(); rc = myClient.GetRows({i}, &tbl); EXPECT_TRUE(rc.IsOk()); row = tbl.front(); auto r = row.front(); bool cmp = (*t == *r); EXPECT_TRUE(cmp); } rc = myClient.DestroyCache(); EXPECT_TRUE(rc.IsOk()); } // Simple test with a repeated cache op over random data producer // // RepeatOp // | // CacheOp // | // RandomDataOp // TEST_F(MindDataTestCacheOp, TestRandomDataCache1) { Status rc; int32_t rank = 0; // not used MS_LOG(INFO) << "UT test TestRandomDataCache1"; // Start with an empty execution tree auto myTree = std::make_shared(); // Create a schema using the C api's std::unique_ptr testSchema = std::make_unique(); // 2 columns. First column is an "image" 640,480,3 TensorShape c1Shape({640, 480, 3}); ColDescriptor c1("image", DataType(DataType::DE_INT8), TensorImpl::kFlexible, rank, // not used &c1Shape); // Column 2 will just be a scalar label number TensorShape c2Shape({}); // empty shape is a 1-value scalar Tensor ColDescriptor c2("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, rank, &c2Shape); testSchema->AddColumn(c1); testSchema->AddColumn(c2); // RandomDataOp std::shared_ptr myRandomDataOp; rc = RandomDataOp::Builder() .SetRowsPerBuffer(4) .SetNumWorkers(4) .SetDataSchema(std::move(testSchema)) .SetTotalRows(50) // 50 samples for now .Build(&myRandomDataOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssociateNode(myRandomDataOp); EXPECT_TRUE(rc.IsOk()); // CacheOp // size of 0, spilling is true std::shared_ptr myClient = std::make_shared(1, 0, true); std::shared_ptr myCacheOp; int64_t num_samples = 0; int64_t start_index = 0; auto seq_sampler = std::make_shared(num_samples, start_index); rc = CacheOp::Builder() .SetNumWorkers(5) .SetClient(myClient) .SetRowsPerBuffer(4) .SetSampler(std::move(seq_sampler)) .Build(&myCacheOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssociateNode(myCacheOp); EXPECT_TRUE(rc.IsOk()); // RepeatOp uint32_t numRepeats = 4; std::shared_ptr myRepeatOp; rc = RepeatOp::Builder(numRepeats).Build(&myRepeatOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssociateNode(myRepeatOp); EXPECT_TRUE(rc.IsOk()); // Assign tree relations and root rc = myRepeatOp->AddChild(myCacheOp); EXPECT_TRUE(rc.IsOk()); rc = myCacheOp->AddChild(myRandomDataOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssignRoot(myRepeatOp); EXPECT_TRUE(rc.IsOk()); MS_LOG(INFO) << "Launching tree and begin iteration"; rc = myTree->Prepare(); EXPECT_TRUE(rc.IsOk()); // quick check to see what tree looks like std::ostringstream ss; ss << *myTree; // some funny const error if I try to write directly to ms log stream MS_LOG(INFO) << "Here's the tree:\n" << ss.str(); std::cout << *myClient << std::endl; rc = myTree->Launch(); EXPECT_TRUE(rc.IsOk()); // Start the loop of reading tensors from our pipeline DatasetIterator dI(myTree); TensorRow tensorList; rc = dI.FetchNextTensorRow(&tensorList); EXPECT_TRUE(rc.IsOk()); int rowCount = 0; while (!tensorList.empty()) { // Don't display these rows, just count them MS_LOG(INFO) << "Row fetched #: " << rowCount; rc = dI.FetchNextTensorRow(&tensorList); EXPECT_TRUE(rc.IsOk()); rowCount++; } ASSERT_EQ(rowCount, 200); rc = myClient->DestroyCache(); EXPECT_TRUE(rc.IsOk()); } //// Simple test with a repeated cache op over random data producer. //// This one will exceed memory and require a spill. //// //// RepeatOp //// | //// CacheOp //// | //// RandomDataOp //// TEST_F(MindDataTestCacheOp, TestRandomDataCacheSpill) { Status rc; int32_t rank = 0; // not used MS_LOG(INFO) << "UT test TestRandomDataCacheSpill"; // Start with an empty execution tree auto myTree = std::make_shared(); // Create a schema using the C api's std::unique_ptr testSchema = std::make_unique(); // 2 columns. First column is an "image" 640,480,3 TensorShape c1Shape({640, 480, 3}); ColDescriptor c1("image", DataType(DataType::DE_INT8), TensorImpl::kFlexible, rank, // not used &c1Shape); // Column 2 will just be a scalar label number TensorShape c2Shape({}); // empty shape is a 1-value scalar Tensor ColDescriptor c2("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, rank, &c2Shape); testSchema->AddColumn(c1); testSchema->AddColumn(c2); // RandomDataOp std::shared_ptr myRandomDataOp; rc = RandomDataOp::Builder() .SetRowsPerBuffer(2) .SetNumWorkers(4) .SetDataSchema(std::move(testSchema)) .SetTotalRows(10) .Build(&myRandomDataOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssociateNode(myRandomDataOp); EXPECT_TRUE(rc.IsOk()); // CacheOp int64_t num_samples = 0; int64_t start_index = 0; auto seq_sampler = std::make_shared(num_samples, start_index); std::shared_ptr myClient = std::make_shared(1, 4, true); std::shared_ptr myCacheOp; rc = CacheOp::Builder() .SetNumWorkers(4) .SetClient(myClient) .SetRowsPerBuffer(3) .SetSampler(std::move(seq_sampler)) .Build(&myCacheOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssociateNode(myCacheOp); EXPECT_TRUE(rc.IsOk()); // RepeatOp uint32_t numRepeats = 4; std::shared_ptr myRepeatOp; rc = RepeatOp::Builder(numRepeats).Build(&myRepeatOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssociateNode(myRepeatOp); EXPECT_TRUE(rc.IsOk()); // Assign tree relations and root rc = myRepeatOp->AddChild(myCacheOp); EXPECT_TRUE(rc.IsOk()); rc = myCacheOp->AddChild(myRandomDataOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssignRoot(myRepeatOp); EXPECT_TRUE(rc.IsOk()); MS_LOG(INFO) << "Launching tree and begin iteration"; rc = myTree->Prepare(); EXPECT_TRUE(rc.IsOk()); std::cout << *myClient << std::endl; rc = myTree->Launch(); EXPECT_TRUE(rc.IsOk()); // Start the loop of reading tensors from our pipeline DatasetIterator dI(myTree); TensorRow tensorList; rc = dI.FetchNextTensorRow(&tensorList); EXPECT_TRUE(rc.IsOk()); int rowCount = 0; while (!tensorList.empty()) { // Don't display these rows, just count them MS_LOG(INFO) << "Row fetched #: " << rowCount; rc = dI.FetchNextTensorRow(&tensorList); EXPECT_TRUE(rc.IsOk()); rowCount++; } ASSERT_EQ(rowCount, 40); rc = myClient->DestroyCache(); EXPECT_TRUE(rc.IsOk()); } TEST_F(MindDataTestCacheOp, TestImageFolderCacheMerge) { Status rc; int64_t num_samples = 0; int64_t start_index = 0; auto seq_sampler = std::make_shared(num_samples, start_index); std::shared_ptr myClient = std::make_shared(1, 0, true); std::shared_ptr myMergeOp; rc = CacheMergeOp::Builder().SetNumWorkers(3).SetOpConnectorSize(3).SetNumCleaner(2).SetClient(myClient).Build( &myMergeOp); EXPECT_TRUE(rc.IsOk()); std::shared_ptr myLookupOp; rc = CacheLookupOp::Builder() .SetNumWorkers(3) .SetOpConnectorSize(3) .SetClient(myClient) .SetSampler(seq_sampler) .Build(&myLookupOp); EXPECT_TRUE(rc.IsOk()); std::shared_ptr so; ImageFolderOp::Builder builder; builder.SetSampler(myLookupOp) .SetOpConnectorSize(3) .SetNumWorkers(3) .SetRowsPerBuffer(2) .SetExtensions({".jpg", ".JPEG"}) .SetRecursive(true) .SetImageFolderDir(datasets_root_path_ + "/testPK/data"); rc = builder.Build(&so); EXPECT_TRUE(rc.IsOk()); // RepeatOp uint32_t numRepeats = 4; std::shared_ptr myRepeatOp; rc = RepeatOp::Builder(numRepeats).Build(&myRepeatOp); EXPECT_TRUE(rc.IsOk()); auto myTree = std::make_shared(); rc = myTree->AssociateNode(so); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssociateNode(myLookupOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssociateNode(myMergeOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssociateNode(myRepeatOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssignRoot(myRepeatOp); EXPECT_TRUE(rc.IsOk()); rc = myRepeatOp->AddChild(myMergeOp); EXPECT_TRUE(rc.IsOk()); rc = myMergeOp->AddChild(myLookupOp); EXPECT_TRUE(rc.IsOk()); rc = myMergeOp->AddChild(so); EXPECT_TRUE(rc.IsOk()); rc = myTree->Prepare(); EXPECT_TRUE(rc.IsOk()); rc = myTree->Launch(); EXPECT_TRUE(rc.IsOk()); // Start the loop of reading tensors from our pipeline DatasetIterator dI(myTree); TensorRow tensorList; rc = dI.FetchNextTensorRow(&tensorList); EXPECT_TRUE(rc.IsOk()); int rowCount = 0; while (!tensorList.empty()) { rc = dI.FetchNextTensorRow(&tensorList); EXPECT_TRUE(rc.IsOk()); if (rc.IsError()) { std::cout << rc << std::endl; break; } rowCount++; } ASSERT_EQ(rowCount, 176); std::cout << "Row count : " << rowCount << std::endl; rc = myClient->DestroyCache(); EXPECT_TRUE(rc.IsOk()); } //// Simple test with a repeated cache op over random data producer. //// The difference in this one is that you do not add the sampler to the cache op directly. //// Instead, the sampler is added as part of the leaf op construction. Then, the prepare //// phase will pull this up from the leaf and into the cache. //// It removes the sampler from the leaf op, which doesn't make sense there anyway for //// the RandomDataOp which doesn't support sampling without a cache. //// //// RepeatOp //// | //// CacheOp //// | //// RandomDataOp //// TEST_F(MindDataTestCacheOp, TestCacheInheritSampler) { Status rc; int32_t rank = 0; // not used MS_LOG(INFO) << "UT test TestCacheInheritSampler"; int64_t num_samples = 0; int64_t start_index = 0; auto seq_sampler = std::make_shared(num_samples, start_index); // Start with an empty execution tree auto myTree = std::make_shared(); // Create a schema using the C api's std::unique_ptr testSchema = std::make_unique(); // 2 columns. First column is an "image" 640,480,3 TensorShape c1Shape({640, 480, 3}); ColDescriptor c1("image", DataType(DataType::DE_INT8), TensorImpl::kFlexible, rank, // not used &c1Shape); // Column 2 will just be a scalar label number TensorShape c2Shape({}); // empty shape is a 1-value scalar Tensor ColDescriptor c2("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, rank, &c2Shape); testSchema->AddColumn(c1); testSchema->AddColumn(c2); // RandomDataOp std::shared_ptr myRandomDataOp; rc = RandomDataOp::Builder() .SetRowsPerBuffer(2) .SetNumWorkers(4) .SetDataSchema(std::move(testSchema)) .SetTotalRows(10) .SetSampler(std::move(seq_sampler)) .Build(&myRandomDataOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssociateNode(myRandomDataOp); EXPECT_TRUE(rc.IsOk()); // CacheOp std::shared_ptr myClient = std::make_shared(1, 4, true); std::shared_ptr myCacheOp; rc = CacheOp::Builder().SetNumWorkers(4).SetClient(myClient).SetRowsPerBuffer(3).Build(&myCacheOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssociateNode(myCacheOp); EXPECT_TRUE(rc.IsOk()); // RepeatOp uint32_t numRepeats = 4; std::shared_ptr myRepeatOp; rc = RepeatOp::Builder(numRepeats).Build(&myRepeatOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssociateNode(myRepeatOp); EXPECT_TRUE(rc.IsOk()); // Assign tree relations and root rc = myRepeatOp->AddChild(myCacheOp); EXPECT_TRUE(rc.IsOk()); rc = myCacheOp->AddChild(myRandomDataOp); EXPECT_TRUE(rc.IsOk()); rc = myTree->AssignRoot(myRepeatOp); EXPECT_TRUE(rc.IsOk()); MS_LOG(INFO) << "Launching tree and begin iteration"; rc = myTree->Prepare(); EXPECT_TRUE(rc.IsOk()); std::cout << *myClient << std::endl; rc = myTree->Launch(); EXPECT_TRUE(rc.IsOk()); // Start the loop of reading tensors from our pipeline DatasetIterator dI(myTree); TensorRow tensorList; rc = dI.FetchNextTensorRow(&tensorList); EXPECT_TRUE(rc.IsOk()); int rowCount = 0; while (!tensorList.empty()) { // Don't display these rows, just count them MS_LOG(INFO) << "Row fetched #: " << rowCount; rc = dI.FetchNextTensorRow(&tensorList); EXPECT_TRUE(rc.IsOk()); rowCount++; } ASSERT_EQ(rowCount, 40); rc = myClient->DestroyCache(); EXPECT_TRUE(rc.IsOk()); }