From 9c3c36217e5b632698e1df702a995a93257515f8 Mon Sep 17 00:00:00 2001 From: Mahdi Date: Wed, 13 Jan 2021 18:02:39 -0500 Subject: [PATCH] Added pull based prototype --- .../ccsrc/minddata/dataset/api/datasets.cc | 17 +++ .../ccsrc/minddata/dataset/api/iterator.cc | 60 +++++++++- .../minddata/dataset/engine/CMakeLists.txt | 27 +++-- .../consumers/pull_based_tree_consumer.cc | 106 ++++++++++++++++++ .../consumers/pull_based_tree_consumer.h | 69 ++++++++++++ .../dataset/engine/datasetops/batch_op.cc | 34 +++++- .../dataset/engine/datasetops/batch_op.h | 4 + .../dataset/engine/datasetops/dataset_op.cc | 5 + .../dataset/engine/datasetops/dataset_op.h | 2 + .../dataset/engine/datasetops/project_op.cc | 12 ++ .../dataset/engine/datasetops/project_op.h | 2 + .../engine/datasetops/source/album_op.cc | 23 +++- .../engine/datasetops/source/album_op.h | 3 + .../dataset/engine/tree_adapter_lite.cc | 64 +++++++++++ .../dataset/engine/tree_adapter_lite.h | 58 ++++++++++ .../ccsrc/minddata/dataset/include/datasets.h | 8 ++ .../ccsrc/minddata/dataset/include/iterator.h | 32 +++++- mindspore/lite/minddata/CMakeLists.txt | 2 + tests/ut/cpp/dataset/CMakeLists.txt | 1 + tests/ut/cpp/dataset/c_api_pull_based_test.cc | 83 ++++++++++++++ 20 files changed, 596 insertions(+), 16 deletions(-) create mode 100644 mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.cc create mode 100644 mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.h create mode 100644 mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.cc create mode 100644 mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.h create mode 100644 tests/ut/cpp/dataset/c_api_pull_based_test.cc diff --git a/mindspore/ccsrc/minddata/dataset/api/datasets.cc b/mindspore/ccsrc/minddata/dataset/api/datasets.cc index fb6a03bdda..215884d071 100644 --- a/mindspore/ccsrc/minddata/dataset/api/datasets.cc +++ b/mindspore/ccsrc/minddata/dataset/api/datasets.cc @@ -129,6 +129,23 @@ std::shared_ptr Dataset::CreateIteratorCharIF(std::vector Dataset::CreatePullBasedIterator(std::vector> columns) { + // The specified columns will be selected from the dataset and passed down the pipeline + // in the order specified, other columns will be discarded. + // This code is not in a try/catch block because there is no execution tree class that will be created. + auto ds = shared_from_this(); + if (!VectorCharToString(columns).empty()) { + ds = ds->Project(VectorCharToString(columns)); + } + + std::shared_ptr iter = std::make_shared(); + Status rc = iter->BuildAndLaunchTree(ds); + if (rc.IsError()) MS_LOG(ERROR) << "CreateIterator: Iterator exception caught: " << rc; + RETURN_SECOND_IF_ERROR(rc, nullptr); + return iter; +} + #ifndef ENABLE_ANDROID // Function to return a transferred Node that transfers data through a device. bool Dataset::DeviceQueueCharIF(const std::vector &queue_name, const std::vector &device_type, diff --git a/mindspore/ccsrc/minddata/dataset/api/iterator.cc b/mindspore/ccsrc/minddata/dataset/api/iterator.cc index 5ba8e83f44..096c91ec3b 100644 --- a/mindspore/ccsrc/minddata/dataset/api/iterator.cc +++ b/mindspore/ccsrc/minddata/dataset/api/iterator.cc @@ -1,5 +1,5 @@ /** - * Copyright 2020 Huawei Technologies Co., Ltd + * Copyright 2020-2021 Huawei Technologies Co., Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ */ #include "minddata/dataset/include/iterator.h" #include "minddata/dataset/core/client.h" +#include "minddata/dataset/engine/consumers/pull_based_tree_consumer.h" #include "minddata/dataset/engine/consumers/tree_consumer.h" #include "minddata/dataset/engine/runtime_context.h" #include "minddata/dataset/include/datasets.h" @@ -65,9 +66,11 @@ Status Iterator::GetNextRow(MSTensorVec *row) { // Shut down the data pipeline. void Iterator::Stop() { - Status rc = runtime_context_->Terminate(); - if (rc.IsError()) { - MS_LOG(ERROR) << rc.ToString(); + if (runtime_context_ != nullptr) { + Status rc = runtime_context_->Terminate(); + if (rc.IsError()) { + MS_LOG(ERROR) << rc.ToString(); + } } } @@ -82,5 +85,54 @@ Status Iterator::BuildAndLaunchTree(std::shared_ptr ds, int32_t num_epo return Status::OK(); } +PullIterator::PullIterator() : pull_consumer_(nullptr) {} +// Get the next row from the data pipeline. +Status PullIterator::GetRows(int32_t num_rows, std::vector *row) { + for (int i = 0; i < num_rows; i++) { + std::vector> md_row; + Status rc = pull_consumer_->GetNextAsVector(&md_row); + + if (rc.IsError()) { + row->clear(); + MS_LOG(ERROR) << "GetNextRow: Failed to get next row. Error status: " << rc; + return rc; + } + + MSTensorVec ms_row = {}; + for (auto de_tensor : md_row) { + CHECK_FAIL_RETURN_UNEXPECTED(de_tensor->HasData(), "Apply transform failed, output tensor has no data"); + ms_row.push_back(mindspore::MSTensor(std::make_shared(de_tensor))); + } + row->push_back(ms_row); + } + return Status::OK(); +} + +Status PullIterator::GetNextRow(MSTensorVec *row) { + CHECK_FAIL_RETURN_UNEXPECTED(pull_consumer_ != nullptr, "Consumer is nullptr."); + std::vector> md_row; + Status rc = pull_consumer_->GetNextAsVector(&md_row); + if (rc.IsError()) { + row->clear(); + MS_LOG(ERROR) << "GetNextRow: Failed to get next row. Error status: " << rc; + return rc; + } + + for (auto de_tensor : md_row) { + CHECK_FAIL_RETURN_UNEXPECTED(de_tensor->HasData(), "Apply transform failed, output tensor has no data"); + row->push_back(mindspore::MSTensor(std::make_shared(de_tensor))); + } + return Status::OK(); +} + +// Function to build and launch the execution tree. This function kicks off a different type of consumer +// for the tree, the reason why this is the case is due to the fact that PullBasedIterator does not need +// to instantiate threads for each op. As such, the call to the consumer will by pass the execution tree. +Status PullIterator::BuildAndLaunchTree(std::shared_ptr ds) { + if (pull_consumer_ == nullptr) pull_consumer_ = std::make_unique(); + RETURN_IF_NOT_OK(pull_consumer_->Init(std::move(ds->IRNode()))); + return Status::OK(); +} + } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt index 9aa001cab8..f329f59813 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt @@ -6,7 +6,7 @@ add_subdirectory(perf) add_subdirectory(cache) if(ENABLE_TDTQUE) - add_subdirectory(tdt) + add_subdirectory(tdt) endif() file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc") @@ -17,27 +17,36 @@ set(SRC_FILES_LIST data_schema.cc dataset_iterator.cc tree_adapter.cc + tree_adapter_lite.cc runtime_context.cc python_runtime_context.cc + consumers/pull_based_tree_consumer.cc consumers/tree_consumer.cc serdes.cc ) if(ENABLE_PYTHON) - set(SRC_FILES_LIST - ${SRC_FILES_LIST} - python_runtime_context.cc - consumers/python_tree_consumer.cc - ) + set(SRC_FILES_LIST + ${SRC_FILES_LIST} + python_runtime_context.cc + consumers/python_tree_consumer.cc + ) endif() add_library(engine OBJECT ${SRC_FILES_LIST}) if(ENABLE_PYTHON) - target_include_directories(engine PRIVATE ${pybind11_INCLUDE_DIRS}) + target_include_directories(engine PRIVATE ${pybind11_INCLUDE_DIRS}) endif() -add_dependencies(engine engine-datasetops engine-datasetops-source engine-opt engine-gnn engine-perf engine-cache-client engine-datasetops-mapop) +add_dependencies(engine engine-datasetops + engine-datasetops-source + engine-opt + engine-gnn + engine-perf + engine-cache-client + engine-datasetops-mapop + ) if(ENABLE_TDTQUE) - add_dependencies(engine engine-tdt) + add_dependencies(engine engine-tdt) endif() diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.cc b/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.cc new file mode 100644 index 0000000000..8f23c80eb8 --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.cc @@ -0,0 +1,106 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include "minddata/dataset/engine/consumers/pull_based_tree_consumer.h" + +namespace mindspore::dataset { + +PullBasedIteratorConsumer::PullBasedIteratorConsumer() { tree_adapter_lite_ = std::make_unique(); } + +Status PullBasedIteratorConsumer::Init(std::shared_ptr root) { + return tree_adapter_lite_->BuildTree(std::move(root)); +} + +std::vector PullBasedIteratorConsumer::GetRows(int64_t num_rows) { + std::vector rows; + for (int i = 0; i < num_rows; i++) { + TensorRow row; + RETURN_SECOND_IF_ERROR(tree_adapter_lite_->GetNextRow(&row), {}); + if (row.empty()) break; + rows.push_back(row); + } + + return rows; +} + +Status PullBasedIteratorConsumer::GetNextAsVector(std::vector *out) { + RETURN_UNEXPECTED_IF_NULL(out); + out->clear(); + + TensorRow res; + RETURN_IF_NOT_OK(tree_adapter_lite_->GetNextRow(&res)); + + // Return empty vector if there's no data + RETURN_OK_IF_TRUE(res.empty()); + + std::copy(res.begin(), res.end(), std::back_inserter(*out)); + return Status::OK(); +} + +Status PullBasedIteratorConsumer::GetNextAsMap(std::unordered_map *const out_map) { + RETURN_UNEXPECTED_IF_NULL(out_map); + out_map->clear(); + + TensorRow res; + RETURN_IF_NOT_OK(tree_adapter_lite_->GetNextRow(&res)); + + // Return empty map if there's no data + RETURN_OK_IF_TRUE(res.empty()); + + // Populate the out map from the row and return it + for (const auto &colMap : tree_adapter_lite_->GetColumnNameMap()) { + (*out_map)[colMap.first] = std::move(res[colMap.second]); + } + return Status::OK(); +} + +Status PullBasedIteratorConsumer::GetNextAsOrderedPair( + std::vector>> *const vec) { + std::unique_ptr tree_adapter = std::make_unique(); + CHECK_FAIL_RETURN_UNEXPECTED(vec != nullptr && vec->empty(), "vec is null or non-empty."); + + TensorRow curr_row; + + RETURN_IF_NOT_OK(tree_adapter_lite_->GetNextRow(&curr_row)); + RETURN_OK_IF_TRUE(curr_row.empty()); + size_t num_cols = curr_row.size(); // num_cols is non-empty. + // order the column names according to their ids + if (column_order_.empty()) { + const int32_t invalid_col_id = -1; + column_order_.resize(num_cols, {std::string(), invalid_col_id}); + for (const auto &itr : tree_adapter->GetColumnNameMap()) { + int32_t ind = itr.second; + CHECK_FAIL_RETURN_UNEXPECTED(ind < num_cols && ind >= 0, "column id out of bounds."); + column_order_[ind] = std::make_pair(itr.first, ind); + } + // error check, make sure the ids in col_name_id_map are continuous and starts from 0 + for (const auto &col : column_order_) { + CHECK_FAIL_RETURN_UNEXPECTED(col.second != invalid_col_id, "column ids are not continuous."); + } + } + + vec->reserve(num_cols); + + std::transform(column_order_.begin(), column_order_.end(), std::back_inserter(*vec), + [curr_row](const auto &col) { return std::make_pair(col.first, curr_row[col.second]); }); + + return Status::OK(); +} +} // namespace mindspore::dataset diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.h b/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.h new file mode 100644 index 0000000000..ac7ef6a1be --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/consumers/pull_based_tree_consumer.h @@ -0,0 +1,69 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONSUMERS_PULL_BASED_TREE_CONSUMER_H_ +#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONSUMERS_PULL_BASED_TREE_CONSUMER_H_ + +#include +#include +#include +#include +#include +#include +#include "minddata/dataset/engine/tree_adapter_lite.h" + +namespace mindspore::dataset { + +class TreeAdapterLite; +class TensorRow; + +/// Consumer that iterates over the dataset and returns the rows one by one as a in a pull based fashion +class PullBasedIteratorConsumer { + public: + /// Constructor + PullBasedIteratorConsumer(); + + ~PullBasedIteratorConsumer() = default; + + Status Init(std::shared_ptr root); + + /// \brief Returns the next row in a vector format + /// \note This is currently a placeholder function + /// \param[in] num_rows the number of rows that we want to get + /// \param[out] out std::vector of TensorRows + /// \return Status error code + std::vector GetRows(int64_t num_rows); + + /// Returns the next row in a vector format + /// \param[out] out std::vector of Tensors + /// \return Status error code + Status GetNextAsVector(std::vector *out); + + /// Returns the next row in as a map + /// \param[out] out std::map of string to Tensor + /// \return Status error code + Status GetNextAsMap(std::unordered_map *out); + + /// Returns the next row in as a vector + /// \param[out] out std::vector of pairs of string to Tensor + /// \return Status error code + Status GetNextAsOrderedPair(std::vector>> *vec); + + private: + std::unique_ptr tree_adapter_lite_; + std::vector> column_order_; // key: column name, val: column id +}; +} // namespace mindspore::dataset +#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONSUMERS_PULL_BASED_TREE_CONSUMER_H_ diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc index eb711b3344..6920cbd1d2 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc @@ -74,7 +74,9 @@ BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, out_col_names_(out_col), batch_size_func_(batch_size_func), batch_map_func_(batch_map_func), - pad_info_(pad_map) { + pad_info_(pad_map), + batch_num_(0), + batch_cnt_(0) { worker_queues_.Init(num_workers, op_queue_size); } // if PYTHON is disabled. per_batch_map can't be used @@ -560,5 +562,35 @@ int64_t BatchOp::GetTreeBatchSize() { return start_batch_size_; } +Status BatchOp::GetNextRow(TensorRow *row) { + std::unique_ptr table = std::make_unique(); + child_iterator_ = std::make_unique(this, 0, 0); + int32_t cur_batch_size = 0; + RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, batch_num_, batch_cnt_))); + for (int i = 0; i < cur_batch_size; i++) { + TensorRow new_row; + RETURN_IF_NOT_OK(child_[0]->GetNextRow(&new_row)); + if (!new_row.empty()) { + table->emplace_back(new_row); + if (table->size() == static_cast(cur_batch_size)) break; + } else { + if (drop_ || table->empty()) { + table = std::make_unique(); // this drops when drop == true + } + } + } + std::unique_ptr out = std::make_unique(); + RETURN_UNEXPECTED_IF_NULL(table); + if (pad_) RETURN_IF_NOT_OK(PadColumns(&table, pad_info_, column_name_id_map_)); // do padding if needed + if (!table->empty()) { + RETURN_IF_NOT_OK(BatchRows(&table, &out, table->size())); + CHECK_FAIL_RETURN_UNEXPECTED(out->size() == 1, "Batch returned 2 rows while 1 row was expected."); + *row = out->back(); + batch_cnt_++; + batch_num_++; + } + return Status::OK(); +} + } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h index db82de9cbd..196e125a30 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h @@ -259,6 +259,8 @@ class BatchOp : public ParallelOp { // @return Status The status code returned Status LaunchThreadsAndInitOp(); + Status GetNextRow(TensorRow *row) override; + #ifdef ENABLE_PYTHON // Invoke batch size function with current BatchInfo to generate batch size. // @return Status The status code returned @@ -278,6 +280,8 @@ class BatchOp : public ParallelOp { std::unique_ptr child_iterator_; // child iterator for fetching TensorRows 1 by 1 std::unordered_map child_map_; // col_name_id_map of the child node QueueList, CBatchInfo>> worker_queues_; // internal queue for syncing worker + int64_t batch_num_; + int64_t batch_cnt_; #ifdef ENABLE_PYTHON py::function batch_size_func_; // Function pointer of batch size function py::function batch_map_func_; // Function pointer of per batch map function diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc index 5367955f78..1e0d591d83 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc @@ -252,6 +252,11 @@ void DatasetOp::Print(std::ostream &out, bool show_all) const { } } +Status DatasetOp::GetNextRow(TensorRow *row) { + RETURN_UNEXPECTED_IF_NULL(child_[0]); + return child_[0]->GetNextRow(row); +} + // Gets the next buffer from the given child Status DatasetOp::GetNextBuffer(std::unique_ptr *p_buffer, int32_t worker_id, bool retry_if_eoe) { // pop is a blocked call and will throw an interruption if the whole group shuts down. diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h index 026ab51a08..135e2da056 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h @@ -129,6 +129,8 @@ class DatasetOp : public std::enable_shared_from_this { /// \param show_all - A bool to control if you want to show all info or just a summary virtual void Print(std::ostream &out, bool show_all) const; + virtual Status GetNextRow(TensorRow *row); + /// \brief << Stream output operator overload /// \notes This allows you to write the debug print info using stream operators /// \param out - reference to the output stream being overloaded diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.cc index 98159b9122..413175d87c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.cc @@ -151,5 +151,17 @@ Status ProjectOp::ComputeColMap() { } return Status::OK(); } + +Status ProjectOp::GetNextRow(TensorRow *row) { + ComputeColMap(); + TensorRow new_row; + RETURN_IF_NOT_OK(child_[0]->GetNextRow(&new_row)); + (void)std::transform(projected_column_indices_.begin(), projected_column_indices_.end(), std::back_inserter(*row), + [&new_row](uint32_t x) { return new_row[x]; }); + // Now if columns changed after map, we don't know which column we should keep, + // so temporarily we don't support print file_path after ProjectOp. + new_row.setPath({}); + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.h index 3a33430af5..d741530f6d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.h @@ -101,6 +101,8 @@ class ProjectOp : public PipelineOp { // @return Status The status code returned Status EofReceived(int32_t worker_id) override; + Status GetNextRow(TensorRow *row) override; + // Op name getter // @return Name of the current Op std::string Name() const override { return kProjectOp; } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc index fbe7136390..ac2cabd8c7 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc @@ -81,7 +81,8 @@ AlbumOp::AlbumOp(int32_t num_wkrs, int32_t rows_per_buffer, std::string file_dir row_cnt_(0), buf_cnt_(0), sampler_ind_(0), - dirname_offset_(0) { + dirname_offset_(0), + sample_ids_(nullptr) { // Set the column name map (base class field) for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { column_name_id_map_[data_schema_->column(i).name()] = i; @@ -600,5 +601,25 @@ Status AlbumOp::ComputeColMap() { } return Status::OK(); } + +Status AlbumOp::GetNextRow(TensorRow *row) { + if (image_rows_.empty()) PrescanEntry(); + if (sample_ids_ == nullptr) { + RETURN_IF_NOT_OK(this->InitSampler()); + std::unique_ptr sample_buffer; + TensorRow sample_row; + RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_buffer)); + RETURN_IF_NOT_OK(sample_buffer->PopRow(&sample_row)); + sample_ids_ = sample_row[0]; + } + if (row_cnt_ + 1 > sample_ids_->Size()) { + return Status::OK(); + } + int64_t key; + sample_ids_->GetItemAt(&key, {row_cnt_}); + RETURN_IF_NOT_OK(LoadTensorRow(key, image_rows_[key], row)); + row_cnt_++; + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h index b12946731d..63491ad9f1 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h @@ -291,6 +291,8 @@ class AlbumOp : public ParallelOp, public RandomAccessOp { /// \return Status The status code returned Status Reset() override; + Status GetNextRow(TensorRow *row) override; + // Private function for computing the assignment of the column name map. // @return Status The status code returned Status ComputeColMap() override; @@ -306,6 +308,7 @@ class AlbumOp : public ParallelOp, public RandomAccessOp { int64_t sampler_ind_; int64_t dirname_offset_; std::vector image_rows_; + TensorPtr sample_ids_; }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.cc b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.cc new file mode 100644 index 0000000000..1865afbd92 --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.cc @@ -0,0 +1,64 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "minddata/dataset/engine/tree_adapter_lite.h" + +namespace mindspore { +namespace dataset { + +TreeAdapterLite::TreeAdapterLite() : root_(nullptr) { tree_ = std::make_unique(); } + +Status TreeAdapterLite::BuildExecutionTreeRecur(std::shared_ptr ir, std::shared_ptr *const op) { + // Build the DatasetOp ExecutionTree from the optimized IR tree + std::vector> ops; + RETURN_IF_NOT_OK(ir->Build(&ops)); + + CHECK_FAIL_RETURN_UNEXPECTED(!ops.empty(), "Unable to build node."); + + (*op) = ops.front(); // return the first op to be added as child by the caller of this function + + RETURN_IF_NOT_OK(tree_->AssociateNode(*op)); + + for (size_t i = 1; i < ops.size(); i++) { + RETURN_IF_NOT_OK(tree_->AssociateNode(ops[i])); + RETURN_IF_NOT_OK(ops[i - 1]->AddChild(ops[i])); + } + + // Build the children of IR, once they return, add the return value to *op + for (std::shared_ptr child_ir : ir->Children()) { + std::shared_ptr child_op; + RETURN_IF_NOT_OK(BuildExecutionTreeRecur(child_ir, &child_op)); + RETURN_IF_NOT_OK(ops.back()->AddChild(child_op)); // append children to the last of ops + } + + return Status::OK(); +} + +Status TreeAdapterLite::BuildTree(std::shared_ptr root_ir) { + RETURN_UNEXPECTED_IF_NULL(root_ir); + RETURN_IF_NOT_OK(BuildExecutionTreeRecur(root_ir, &root_)); + RETURN_IF_NOT_OK(tree_->AssignRoot(root_)); + return Status::OK(); +} + +Status TreeAdapterLite::GetNextRow(TensorRow *row) { + RETURN_UNEXPECTED_IF_NULL(root_); + RETURN_IF_NOT_OK(root_->GetNextRow(row)); + return Status::OK(); +} + +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.h b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.h new file mode 100644 index 0000000000..c42da20718 --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter_lite.h @@ -0,0 +1,58 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_LITE_H_ +#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_LITE_H_ + +#include +#include +#include +#include +#include + +#include "minddata/dataset/engine/ir/datasetops/dataset_node.h" + +namespace mindspore { +namespace dataset { + +class TensorRow; +class DatasetNode; + +class TreeAdapterLite { + public: + TreeAdapterLite(); + + ~TreeAdapterLite() = default; + + Status BuildTree(std::shared_ptr root_ir); + + // Get rows equal to num_rows + Status GetNextRow(TensorRow *row); + + std::unordered_map GetColumnNameMap() const { return tree_->root()->column_name_id_map(); } + + private: + // This RECURSIVE function walks the (optimized) IR tree in DFS to build its corresponding Execution tree. + Status BuildExecutionTreeRecur(std::shared_ptr ir, std::shared_ptr *op); + + std::shared_ptr root_; // current connector capacity of root op, used for profiling + std::unique_ptr tree_; +}; + +} // namespace dataset +} // namespace mindspore + +#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_LITE_H_ diff --git a/mindspore/ccsrc/minddata/dataset/include/datasets.h b/mindspore/ccsrc/minddata/dataset/include/datasets.h index 58720314c9..927e0c7d75 100644 --- a/mindspore/ccsrc/minddata/dataset/include/datasets.h +++ b/mindspore/ccsrc/minddata/dataset/include/datasets.h @@ -36,6 +36,7 @@ #include "minddata/dataset/include/tensor.h" #include "minddata/dataset/include/text.h" #include "minddata/dataset/include/type_id.h" +#include "minddata/dataset/engine/consumers/pull_based_tree_consumer.h" namespace mindspore { namespace dataset { @@ -44,6 +45,7 @@ class Tensor; class TensorRow; class TensorShape; class TreeAdapter; +class TreeAdapterLite; class TreeGetters; class Vocab; @@ -51,6 +53,7 @@ class DatasetCache; class DatasetNode; class Iterator; +class PullBasedIteratorConsumer; class TensorOperation; class SchemaObj; @@ -134,6 +137,11 @@ class Dataset : public std::enable_shared_from_this { /// \return Shared pointer to the original object std::shared_ptr SetNumWorkers(int32_t num_workers); + /// \brief Function to create an PullBasedIterator over the Dataset + /// \param[in] columns List of columns to be used to specify the order of columns + /// \return Shared pointer to the Iterator + std::shared_ptr CreatePullBasedIterator(std::vector> columns = {}); + /// \brief Function to create an Iterator over the Dataset pipeline /// \param[in] columns List of columns to be used to specify the order of columns /// \param[in] num_epochs Number of epochs to run through the pipeline, default -1 which means infinite epochs. diff --git a/mindspore/ccsrc/minddata/dataset/include/iterator.h b/mindspore/ccsrc/minddata/dataset/include/iterator.h index b69cb33a1c..799dc1d877 100644 --- a/mindspore/ccsrc/minddata/dataset/include/iterator.h +++ b/mindspore/ccsrc/minddata/dataset/include/iterator.h @@ -37,6 +37,7 @@ class Tensor; class NativeRuntimeContext; class IteratorConsumer; +class PullBasedIteratorConsumer; class Dataset; @@ -80,7 +81,7 @@ class Iterator { /// \note Type of return data is a vector(without column name). /// \param[out] row - the output tensor row. /// \return - a Status error code, returns OK if no error encountered. - Status GetNextRow(MSTensorVec *row); + virtual Status GetNextRow(MSTensorVec *row); /// \brief Function to shut down the data pipeline. void Stop(); @@ -131,6 +132,35 @@ class Iterator { std::unique_ptr runtime_context_; IteratorConsumer *consumer_; }; + +class PullIterator : public Iterator { + public: + /// \brief Constructor + PullIterator(); + + /// \brief Function to get next row from the data pipeline. + /// \note Type of return data is a vector(without column name). + /// \param[out] row - the output tensor row. + /// \return Returns true if no error encountered else false. + Status GetNextRow(MSTensorVec *row) override; + + /// \brief Function to get specified rows from the data pipeline. + /// \note Type of return data is a vector(without column name). + /// \note This behavior is subject to change + /// \param[in] num_rows - the number of rows to fetch. + /// \param[out] row - the output tensor row. + /// \return Returns true if no error encountered else false. + Status GetRows(int32_t num_rows, std::vector *row); + + /// \brief Method for building and launching the pipeline. + /// \note Consider making this function protected. + /// \param[in] ds - The root node that calls the function + /// \return - a Status error code, returns OK if no error encountered. + Status BuildAndLaunchTree(std::shared_ptr ds); + + private: + std::unique_ptr pull_consumer_; +}; } // namespace dataset } // namespace mindspore #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_INCLUDE_ITERATOR_H_ diff --git a/mindspore/lite/minddata/CMakeLists.txt b/mindspore/lite/minddata/CMakeLists.txt index 4b32e8f1cc..38bc9e3134 100644 --- a/mindspore/lite/minddata/CMakeLists.txt +++ b/mindspore/lite/minddata/CMakeLists.txt @@ -125,6 +125,8 @@ if(BUILD_MINDDATA STREQUAL "full") ${MINDDATA_DIR}/core/tensor.cc ${MINDDATA_DIR}/core/global_context.cc ${MINDDATA_DIR}/core/client.cc + ${MINDDATA_DIR}/engine/tree_adapter_lite.cc + ${MINDDATA_DIR}/engine/consumers/pull_based_tree_consumer.cc ${MINDDATA_DIR}/engine/consumers/tree_consumer.cc ${MINDDATA_DIR}/engine/ir/datasetops/dataset_node.cc ${MINDDATA_DIR}/engine/ir/datasetops/epoch_ctrl_node.cc diff --git a/tests/ut/cpp/dataset/CMakeLists.txt b/tests/ut/cpp/dataset/CMakeLists.txt index f046bbe0fa..18e6598e5a 100644 --- a/tests/ut/cpp/dataset/CMakeLists.txt +++ b/tests/ut/cpp/dataset/CMakeLists.txt @@ -31,6 +31,7 @@ SET(DE_UT_SRCS c_api_dataset_voc_test.cc c_api_datasets_test.cc c_api_epoch_ctrl_test.cc + c_api_pull_based_test.cc c_api_repeat_test.cc c_api_samplers_test.cc c_api_text_sentence_piece_vocab_test.cc diff --git a/tests/ut/cpp/dataset/c_api_pull_based_test.cc b/tests/ut/cpp/dataset/c_api_pull_based_test.cc new file mode 100644 index 0000000000..5f7994f140 --- /dev/null +++ b/tests/ut/cpp/dataset/c_api_pull_based_test.cc @@ -0,0 +1,83 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "common/common.h" +#include "minddata/dataset/include/datasets.h" + +namespace common = mindspore::common; + +using namespace mindspore::dataset; + +class MindDataTestPipeline : public UT::DatasetOpTesting { + protected: +}; + +TEST_F(MindDataTestPipeline, TestPullBasedBatch) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestAlbumBasic."; + + std::string folder_path = datasets_root_path_ + "/testAlbum/images"; + std::string schema_file = datasets_root_path_ + "/testAlbum/datasetSchema.json"; + std::vector column_names = {"label"}; + // Create a Album Dataset + std::shared_ptr ds = Album(folder_path, schema_file, column_names); + EXPECT_NE(ds, nullptr); + + int32_t batch_size = 4; + ds = ds->Batch(batch_size, true); + EXPECT_NE(ds, nullptr); + + auto iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + + std::vector row; + Status rc = iter->GetNextRow(&row); + EXPECT_EQ(rc, Status::OK()); + EXPECT_EQ(row.size(), 1); + auto temp = row[0].Shape(); + std::vector result = {batch_size, 2}; + EXPECT_EQ(row[0].Shape(), result); +} + +TEST_F(MindDataTestPipeline, TestPullBasedProject) { + MS_LOG(INFO) << "Doing MindDataTestPipeline-TestAlbumBasic."; + + std::string folder_path = datasets_root_path_ + "/testAlbum/images"; + std::string schema_file = datasets_root_path_ + "/testAlbum/datasetSchema.json"; + std::vector column_names = {"label", "image"}; + // Create a Album Dataset + std::shared_ptr ds = Album(folder_path, schema_file, column_names); + EXPECT_NE(ds, nullptr); + + std::vector row; + auto iter = ds->CreatePullBasedIterator(); + EXPECT_NE(iter, nullptr); + Status rc = iter->GetNextRow(&row); + EXPECT_EQ(rc, Status::OK()); + EXPECT_EQ(row.size(), 2); + + std::shared_ptr ds2 = Album(folder_path, schema_file, column_names); + EXPECT_NE(ds2, nullptr); + std::vector columns_to_project = {"image"}; + ds2 = ds2->Project(columns_to_project); + EXPECT_NE(ds2, nullptr); + + auto iter2 = ds2->CreatePullBasedIterator(); + EXPECT_NE(iter2, nullptr); + + std::vector new_row; + rc = iter2->GetNextRow(&new_row); + EXPECT_EQ(rc, Status::OK()); + EXPECT_EQ(new_row.size(), 1); +} \ No newline at end of file