Added pull based prototype

pull/12809/head
Mahdi 4 years ago
parent 7104e42304
commit 9c3c36217e

@ -129,6 +129,23 @@ std::shared_ptr<Iterator> Dataset::CreateIteratorCharIF(std::vector<std::vector<
return iter;
}
// Function to create the iterator, which will build and launch the execution tree.
std::shared_ptr<PullIterator> Dataset::CreatePullBasedIterator(std::vector<std::vector<char>> columns) {
// The specified columns will be selected from the dataset and passed down the pipeline
// in the order specified, other columns will be discarded.
// This code is not in a try/catch block because there is no execution tree class that will be created.
auto ds = shared_from_this();
if (!VectorCharToString(columns).empty()) {
ds = ds->Project(VectorCharToString(columns));
}
std::shared_ptr<PullIterator> iter = std::make_shared<PullIterator>();
Status rc = iter->BuildAndLaunchTree(ds);
if (rc.IsError()) MS_LOG(ERROR) << "CreateIterator: Iterator exception caught: " << rc;
RETURN_SECOND_IF_ERROR(rc, nullptr);
return iter;
}
#ifndef ENABLE_ANDROID
// Function to return a transferred Node that transfers data through a device.
bool Dataset::DeviceQueueCharIF(const std::vector<char> &queue_name, const std::vector<char> &device_type,

@ -1,5 +1,5 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
* Copyright 2020-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,6 +15,7 @@
*/
#include "minddata/dataset/include/iterator.h"
#include "minddata/dataset/core/client.h"
#include "minddata/dataset/engine/consumers/pull_based_tree_consumer.h"
#include "minddata/dataset/engine/consumers/tree_consumer.h"
#include "minddata/dataset/engine/runtime_context.h"
#include "minddata/dataset/include/datasets.h"
@ -65,9 +66,11 @@ Status Iterator::GetNextRow(MSTensorVec *row) {
// Shut down the data pipeline.
void Iterator::Stop() {
Status rc = runtime_context_->Terminate();
if (rc.IsError()) {
MS_LOG(ERROR) << rc.ToString();
if (runtime_context_ != nullptr) {
Status rc = runtime_context_->Terminate();
if (rc.IsError()) {
MS_LOG(ERROR) << rc.ToString();
}
}
}
@ -82,5 +85,54 @@ Status Iterator::BuildAndLaunchTree(std::shared_ptr<Dataset> ds, int32_t num_epo
return Status::OK();
}
PullIterator::PullIterator() : pull_consumer_(nullptr) {}
// Get the next row from the data pipeline.
Status PullIterator::GetRows(int32_t num_rows, std::vector<MSTensorVec> *row) {
for (int i = 0; i < num_rows; i++) {
std::vector<std::shared_ptr<dataset::Tensor>> md_row;
Status rc = pull_consumer_->GetNextAsVector(&md_row);
if (rc.IsError()) {
row->clear();
MS_LOG(ERROR) << "GetNextRow: Failed to get next row. Error status: " << rc;
return rc;
}
MSTensorVec ms_row = {};
for (auto de_tensor : md_row) {
CHECK_FAIL_RETURN_UNEXPECTED(de_tensor->HasData(), "Apply transform failed, output tensor has no data");
ms_row.push_back(mindspore::MSTensor(std::make_shared<DETensor>(de_tensor)));
}
row->push_back(ms_row);
}
return Status::OK();
}
Status PullIterator::GetNextRow(MSTensorVec *row) {
CHECK_FAIL_RETURN_UNEXPECTED(pull_consumer_ != nullptr, "Consumer is nullptr.");
std::vector<std::shared_ptr<dataset::Tensor>> md_row;
Status rc = pull_consumer_->GetNextAsVector(&md_row);
if (rc.IsError()) {
row->clear();
MS_LOG(ERROR) << "GetNextRow: Failed to get next row. Error status: " << rc;
return rc;
}
for (auto de_tensor : md_row) {
CHECK_FAIL_RETURN_UNEXPECTED(de_tensor->HasData(), "Apply transform failed, output tensor has no data");
row->push_back(mindspore::MSTensor(std::make_shared<DETensor>(de_tensor)));
}
return Status::OK();
}
// Function to build and launch the execution tree. This function kicks off a different type of consumer
// for the tree, the reason why this is the case is due to the fact that PullBasedIterator does not need
// to instantiate threads for each op. As such, the call to the consumer will by pass the execution tree.
Status PullIterator::BuildAndLaunchTree(std::shared_ptr<Dataset> ds) {
if (pull_consumer_ == nullptr) pull_consumer_ = std::make_unique<PullBasedIteratorConsumer>();
RETURN_IF_NOT_OK(pull_consumer_->Init(std::move(ds->IRNode())));
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -6,7 +6,7 @@ add_subdirectory(perf)
add_subdirectory(cache)
if(ENABLE_TDTQUE)
add_subdirectory(tdt)
add_subdirectory(tdt)
endif()
file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")
@ -17,27 +17,36 @@ set(SRC_FILES_LIST
data_schema.cc
dataset_iterator.cc
tree_adapter.cc
tree_adapter_lite.cc
runtime_context.cc
python_runtime_context.cc
consumers/pull_based_tree_consumer.cc
consumers/tree_consumer.cc
serdes.cc
)
if(ENABLE_PYTHON)
set(SRC_FILES_LIST
${SRC_FILES_LIST}
python_runtime_context.cc
consumers/python_tree_consumer.cc
)
set(SRC_FILES_LIST
${SRC_FILES_LIST}
python_runtime_context.cc
consumers/python_tree_consumer.cc
)
endif()
add_library(engine OBJECT ${SRC_FILES_LIST})
if(ENABLE_PYTHON)
target_include_directories(engine PRIVATE ${pybind11_INCLUDE_DIRS})
target_include_directories(engine PRIVATE ${pybind11_INCLUDE_DIRS})
endif()
add_dependencies(engine engine-datasetops engine-datasetops-source engine-opt engine-gnn engine-perf engine-cache-client engine-datasetops-mapop)
add_dependencies(engine engine-datasetops
engine-datasetops-source
engine-opt
engine-gnn
engine-perf
engine-cache-client
engine-datasetops-mapop
)
if(ENABLE_TDTQUE)
add_dependencies(engine engine-tdt)
add_dependencies(engine engine-tdt)
endif()

@ -0,0 +1,106 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string>
#include <algorithm>
#include <unordered_map>
#include <vector>
#include "minddata/dataset/engine/consumers/pull_based_tree_consumer.h"
namespace mindspore::dataset {
PullBasedIteratorConsumer::PullBasedIteratorConsumer() { tree_adapter_lite_ = std::make_unique<TreeAdapterLite>(); }
Status PullBasedIteratorConsumer::Init(std::shared_ptr<DatasetNode> root) {
return tree_adapter_lite_->BuildTree(std::move(root));
}
std::vector<TensorRow> PullBasedIteratorConsumer::GetRows(int64_t num_rows) {
std::vector<TensorRow> rows;
for (int i = 0; i < num_rows; i++) {
TensorRow row;
RETURN_SECOND_IF_ERROR(tree_adapter_lite_->GetNextRow(&row), {});
if (row.empty()) break;
rows.push_back(row);
}
return rows;
}
Status PullBasedIteratorConsumer::GetNextAsVector(std::vector<TensorPtr> *out) {
RETURN_UNEXPECTED_IF_NULL(out);
out->clear();
TensorRow res;
RETURN_IF_NOT_OK(tree_adapter_lite_->GetNextRow(&res));
// Return empty vector if there's no data
RETURN_OK_IF_TRUE(res.empty());
std::copy(res.begin(), res.end(), std::back_inserter(*out));
return Status::OK();
}
Status PullBasedIteratorConsumer::GetNextAsMap(std::unordered_map<std::string, TensorPtr> *const out_map) {
RETURN_UNEXPECTED_IF_NULL(out_map);
out_map->clear();
TensorRow res;
RETURN_IF_NOT_OK(tree_adapter_lite_->GetNextRow(&res));
// Return empty map if there's no data
RETURN_OK_IF_TRUE(res.empty());
// Populate the out map from the row and return it
for (const auto &colMap : tree_adapter_lite_->GetColumnNameMap()) {
(*out_map)[colMap.first] = std::move(res[colMap.second]);
}
return Status::OK();
}
Status PullBasedIteratorConsumer::GetNextAsOrderedPair(
std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> *const vec) {
std::unique_ptr<TreeAdapter> tree_adapter = std::make_unique<TreeAdapter>();
CHECK_FAIL_RETURN_UNEXPECTED(vec != nullptr && vec->empty(), "vec is null or non-empty.");
TensorRow curr_row;
RETURN_IF_NOT_OK(tree_adapter_lite_->GetNextRow(&curr_row));
RETURN_OK_IF_TRUE(curr_row.empty());
size_t num_cols = curr_row.size(); // num_cols is non-empty.
// order the column names according to their ids
if (column_order_.empty()) {
const int32_t invalid_col_id = -1;
column_order_.resize(num_cols, {std::string(), invalid_col_id});
for (const auto &itr : tree_adapter->GetColumnNameMap()) {
int32_t ind = itr.second;
CHECK_FAIL_RETURN_UNEXPECTED(ind < num_cols && ind >= 0, "column id out of bounds.");
column_order_[ind] = std::make_pair(itr.first, ind);
}
// error check, make sure the ids in col_name_id_map are continuous and starts from 0
for (const auto &col : column_order_) {
CHECK_FAIL_RETURN_UNEXPECTED(col.second != invalid_col_id, "column ids are not continuous.");
}
}
vec->reserve(num_cols);
std::transform(column_order_.begin(), column_order_.end(), std::back_inserter(*vec),
[curr_row](const auto &col) { return std::make_pair(col.first, curr_row[col.second]); });
return Status::OK();
}
} // namespace mindspore::dataset

@ -0,0 +1,69 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONSUMERS_PULL_BASED_TREE_CONSUMER_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONSUMERS_PULL_BASED_TREE_CONSUMER_H_
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <cstddef>
#include "minddata/dataset/engine/tree_adapter_lite.h"
namespace mindspore::dataset {
class TreeAdapterLite;
class TensorRow;
/// Consumer that iterates over the dataset and returns the rows one by one as a in a pull based fashion
class PullBasedIteratorConsumer {
public:
/// Constructor
PullBasedIteratorConsumer();
~PullBasedIteratorConsumer() = default;
Status Init(std::shared_ptr<DatasetNode> root);
/// \brief Returns the next row in a vector format
/// \note This is currently a placeholder function
/// \param[in] num_rows the number of rows that we want to get
/// \param[out] out std::vector of TensorRows
/// \return Status error code
std::vector<TensorRow> GetRows(int64_t num_rows);
/// Returns the next row in a vector format
/// \param[out] out std::vector of Tensors
/// \return Status error code
Status GetNextAsVector(std::vector<TensorPtr> *out);
/// Returns the next row in as a map
/// \param[out] out std::map of string to Tensor
/// \return Status error code
Status GetNextAsMap(std::unordered_map<std::string, TensorPtr> *out);
/// Returns the next row in as a vector
/// \param[out] out std::vector of pairs of string to Tensor
/// \return Status error code
Status GetNextAsOrderedPair(std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> *vec);
private:
std::unique_ptr<TreeAdapterLite> tree_adapter_lite_;
std::vector<std::pair<std::string, int32_t>> column_order_; // key: column name, val: column id
};
} // namespace mindspore::dataset
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONSUMERS_PULL_BASED_TREE_CONSUMER_H_

@ -74,7 +74,9 @@ BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size,
out_col_names_(out_col),
batch_size_func_(batch_size_func),
batch_map_func_(batch_map_func),
pad_info_(pad_map) {
pad_info_(pad_map),
batch_num_(0),
batch_cnt_(0) {
worker_queues_.Init(num_workers, op_queue_size);
}
// if PYTHON is disabled. per_batch_map can't be used
@ -560,5 +562,35 @@ int64_t BatchOp::GetTreeBatchSize() {
return start_batch_size_;
}
Status BatchOp::GetNextRow(TensorRow *row) {
std::unique_ptr<TensorQTable> table = std::make_unique<TensorQTable>();
child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
int32_t cur_batch_size = 0;
RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, batch_num_, batch_cnt_)));
for (int i = 0; i < cur_batch_size; i++) {
TensorRow new_row;
RETURN_IF_NOT_OK(child_[0]->GetNextRow(&new_row));
if (!new_row.empty()) {
table->emplace_back(new_row);
if (table->size() == static_cast<size_t>(cur_batch_size)) break;
} else {
if (drop_ || table->empty()) {
table = std::make_unique<TensorQTable>(); // this drops when drop == true
}
}
}
std::unique_ptr<TensorQTable> out = std::make_unique<TensorQTable>();
RETURN_UNEXPECTED_IF_NULL(table);
if (pad_) RETURN_IF_NOT_OK(PadColumns(&table, pad_info_, column_name_id_map_)); // do padding if needed
if (!table->empty()) {
RETURN_IF_NOT_OK(BatchRows(&table, &out, table->size()));
CHECK_FAIL_RETURN_UNEXPECTED(out->size() == 1, "Batch returned 2 rows while 1 row was expected.");
*row = out->back();
batch_cnt_++;
batch_num_++;
}
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -259,6 +259,8 @@ class BatchOp : public ParallelOp {
// @return Status The status code returned
Status LaunchThreadsAndInitOp();
Status GetNextRow(TensorRow *row) override;
#ifdef ENABLE_PYTHON
// Invoke batch size function with current BatchInfo to generate batch size.
// @return Status The status code returned
@ -278,6 +280,8 @@ class BatchOp : public ParallelOp {
std::unique_ptr<ChildIterator> child_iterator_; // child iterator for fetching TensorRows 1 by 1
std::unordered_map<std::string, int32_t> child_map_; // col_name_id_map of the child node
QueueList<std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>> worker_queues_; // internal queue for syncing worker
int64_t batch_num_;
int64_t batch_cnt_;
#ifdef ENABLE_PYTHON
py::function batch_size_func_; // Function pointer of batch size function
py::function batch_map_func_; // Function pointer of per batch map function

@ -252,6 +252,11 @@ void DatasetOp::Print(std::ostream &out, bool show_all) const {
}
}
Status DatasetOp::GetNextRow(TensorRow *row) {
RETURN_UNEXPECTED_IF_NULL(child_[0]);
return child_[0]->GetNextRow(row);
}
// Gets the next buffer from the given child
Status DatasetOp::GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, bool retry_if_eoe) {
// pop is a blocked call and will throw an interruption if the whole group shuts down.

@ -129,6 +129,8 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
/// \param show_all - A bool to control if you want to show all info or just a summary
virtual void Print(std::ostream &out, bool show_all) const;
virtual Status GetNextRow(TensorRow *row);
/// \brief << Stream output operator overload
/// \notes This allows you to write the debug print info using stream operators
/// \param out - reference to the output stream being overloaded

@ -151,5 +151,17 @@ Status ProjectOp::ComputeColMap() {
}
return Status::OK();
}
Status ProjectOp::GetNextRow(TensorRow *row) {
ComputeColMap();
TensorRow new_row;
RETURN_IF_NOT_OK(child_[0]->GetNextRow(&new_row));
(void)std::transform(projected_column_indices_.begin(), projected_column_indices_.end(), std::back_inserter(*row),
[&new_row](uint32_t x) { return new_row[x]; });
// Now if columns changed after map, we don't know which column we should keep,
// so temporarily we don't support print file_path after ProjectOp.
new_row.setPath({});
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -101,6 +101,8 @@ class ProjectOp : public PipelineOp {
// @return Status The status code returned
Status EofReceived(int32_t worker_id) override;
Status GetNextRow(TensorRow *row) override;
// Op name getter
// @return Name of the current Op
std::string Name() const override { return kProjectOp; }

@ -81,7 +81,8 @@ AlbumOp::AlbumOp(int32_t num_wkrs, int32_t rows_per_buffer, std::string file_dir
row_cnt_(0),
buf_cnt_(0),
sampler_ind_(0),
dirname_offset_(0) {
dirname_offset_(0),
sample_ids_(nullptr) {
// Set the column name map (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
@ -600,5 +601,25 @@ Status AlbumOp::ComputeColMap() {
}
return Status::OK();
}
Status AlbumOp::GetNextRow(TensorRow *row) {
if (image_rows_.empty()) PrescanEntry();
if (sample_ids_ == nullptr) {
RETURN_IF_NOT_OK(this->InitSampler());
std::unique_ptr<DataBuffer> sample_buffer;
TensorRow sample_row;
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_buffer));
RETURN_IF_NOT_OK(sample_buffer->PopRow(&sample_row));
sample_ids_ = sample_row[0];
}
if (row_cnt_ + 1 > sample_ids_->Size()) {
return Status::OK();
}
int64_t key;
sample_ids_->GetItemAt(&key, {row_cnt_});
RETURN_IF_NOT_OK(LoadTensorRow(key, image_rows_[key], row));
row_cnt_++;
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -291,6 +291,8 @@ class AlbumOp : public ParallelOp, public RandomAccessOp {
/// \return Status The status code returned
Status Reset() override;
Status GetNextRow(TensorRow *row) override;
// Private function for computing the assignment of the column name map.
// @return Status The status code returned
Status ComputeColMap() override;
@ -306,6 +308,7 @@ class AlbumOp : public ParallelOp, public RandomAccessOp {
int64_t sampler_ind_;
int64_t dirname_offset_;
std::vector<std::string> image_rows_;
TensorPtr sample_ids_;
};
} // namespace dataset
} // namespace mindspore

@ -0,0 +1,64 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "minddata/dataset/engine/tree_adapter_lite.h"
namespace mindspore {
namespace dataset {
TreeAdapterLite::TreeAdapterLite() : root_(nullptr) { tree_ = std::make_unique<ExecutionTree>(); }
Status TreeAdapterLite::BuildExecutionTreeRecur(std::shared_ptr<DatasetNode> ir, std::shared_ptr<DatasetOp> *const op) {
// Build the DatasetOp ExecutionTree from the optimized IR tree
std::vector<std::shared_ptr<DatasetOp>> ops;
RETURN_IF_NOT_OK(ir->Build(&ops));
CHECK_FAIL_RETURN_UNEXPECTED(!ops.empty(), "Unable to build node.");
(*op) = ops.front(); // return the first op to be added as child by the caller of this function
RETURN_IF_NOT_OK(tree_->AssociateNode(*op));
for (size_t i = 1; i < ops.size(); i++) {
RETURN_IF_NOT_OK(tree_->AssociateNode(ops[i]));
RETURN_IF_NOT_OK(ops[i - 1]->AddChild(ops[i]));
}
// Build the children of IR, once they return, add the return value to *op
for (std::shared_ptr<DatasetNode> child_ir : ir->Children()) {
std::shared_ptr<DatasetOp> child_op;
RETURN_IF_NOT_OK(BuildExecutionTreeRecur(child_ir, &child_op));
RETURN_IF_NOT_OK(ops.back()->AddChild(child_op)); // append children to the last of ops
}
return Status::OK();
}
Status TreeAdapterLite::BuildTree(std::shared_ptr<DatasetNode> root_ir) {
RETURN_UNEXPECTED_IF_NULL(root_ir);
RETURN_IF_NOT_OK(BuildExecutionTreeRecur(root_ir, &root_));
RETURN_IF_NOT_OK(tree_->AssignRoot(root_));
return Status::OK();
}
Status TreeAdapterLite::GetNextRow(TensorRow *row) {
RETURN_UNEXPECTED_IF_NULL(root_);
RETURN_IF_NOT_OK(root_->GetNextRow(row));
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -0,0 +1,58 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_LITE_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_LITE_H_
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "minddata/dataset/engine/ir/datasetops/dataset_node.h"
namespace mindspore {
namespace dataset {
class TensorRow;
class DatasetNode;
class TreeAdapterLite {
public:
TreeAdapterLite();
~TreeAdapterLite() = default;
Status BuildTree(std::shared_ptr<DatasetNode> root_ir);
// Get rows equal to num_rows
Status GetNextRow(TensorRow *row);
std::unordered_map<std::string, int32_t> GetColumnNameMap() const { return tree_->root()->column_name_id_map(); }
private:
// This RECURSIVE function walks the (optimized) IR tree in DFS to build its corresponding Execution tree.
Status BuildExecutionTreeRecur(std::shared_ptr<DatasetNode> ir, std::shared_ptr<DatasetOp> *op);
std::shared_ptr<DatasetOp> root_; // current connector capacity of root op, used for profiling
std::unique_ptr<ExecutionTree> tree_;
};
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_LITE_H_

@ -36,6 +36,7 @@
#include "minddata/dataset/include/tensor.h"
#include "minddata/dataset/include/text.h"
#include "minddata/dataset/include/type_id.h"
#include "minddata/dataset/engine/consumers/pull_based_tree_consumer.h"
namespace mindspore {
namespace dataset {
@ -44,6 +45,7 @@ class Tensor;
class TensorRow;
class TensorShape;
class TreeAdapter;
class TreeAdapterLite;
class TreeGetters;
class Vocab;
@ -51,6 +53,7 @@ class DatasetCache;
class DatasetNode;
class Iterator;
class PullBasedIteratorConsumer;
class TensorOperation;
class SchemaObj;
@ -134,6 +137,11 @@ class Dataset : public std::enable_shared_from_this<Dataset> {
/// \return Shared pointer to the original object
std::shared_ptr<Dataset> SetNumWorkers(int32_t num_workers);
/// \brief Function to create an PullBasedIterator over the Dataset
/// \param[in] columns List of columns to be used to specify the order of columns
/// \return Shared pointer to the Iterator
std::shared_ptr<PullIterator> CreatePullBasedIterator(std::vector<std::vector<char>> columns = {});
/// \brief Function to create an Iterator over the Dataset pipeline
/// \param[in] columns List of columns to be used to specify the order of columns
/// \param[in] num_epochs Number of epochs to run through the pipeline, default -1 which means infinite epochs.

@ -37,6 +37,7 @@ class Tensor;
class NativeRuntimeContext;
class IteratorConsumer;
class PullBasedIteratorConsumer;
class Dataset;
@ -80,7 +81,7 @@ class Iterator {
/// \note Type of return data is a vector(without column name).
/// \param[out] row - the output tensor row.
/// \return - a Status error code, returns OK if no error encountered.
Status GetNextRow(MSTensorVec *row);
virtual Status GetNextRow(MSTensorVec *row);
/// \brief Function to shut down the data pipeline.
void Stop();
@ -131,6 +132,35 @@ class Iterator {
std::unique_ptr<NativeRuntimeContext> runtime_context_;
IteratorConsumer *consumer_;
};
class PullIterator : public Iterator {
public:
/// \brief Constructor
PullIterator();
/// \brief Function to get next row from the data pipeline.
/// \note Type of return data is a vector(without column name).
/// \param[out] row - the output tensor row.
/// \return Returns true if no error encountered else false.
Status GetNextRow(MSTensorVec *row) override;
/// \brief Function to get specified rows from the data pipeline.
/// \note Type of return data is a vector(without column name).
/// \note This behavior is subject to change
/// \param[in] num_rows - the number of rows to fetch.
/// \param[out] row - the output tensor row.
/// \return Returns true if no error encountered else false.
Status GetRows(int32_t num_rows, std::vector<MSTensorVec> *row);
/// \brief Method for building and launching the pipeline.
/// \note Consider making this function protected.
/// \param[in] ds - The root node that calls the function
/// \return - a Status error code, returns OK if no error encountered.
Status BuildAndLaunchTree(std::shared_ptr<Dataset> ds);
private:
std::unique_ptr<PullBasedIteratorConsumer> pull_consumer_;
};
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_INCLUDE_ITERATOR_H_

@ -125,6 +125,8 @@ if(BUILD_MINDDATA STREQUAL "full")
${MINDDATA_DIR}/core/tensor.cc
${MINDDATA_DIR}/core/global_context.cc
${MINDDATA_DIR}/core/client.cc
${MINDDATA_DIR}/engine/tree_adapter_lite.cc
${MINDDATA_DIR}/engine/consumers/pull_based_tree_consumer.cc
${MINDDATA_DIR}/engine/consumers/tree_consumer.cc
${MINDDATA_DIR}/engine/ir/datasetops/dataset_node.cc
${MINDDATA_DIR}/engine/ir/datasetops/epoch_ctrl_node.cc

@ -31,6 +31,7 @@ SET(DE_UT_SRCS
c_api_dataset_voc_test.cc
c_api_datasets_test.cc
c_api_epoch_ctrl_test.cc
c_api_pull_based_test.cc
c_api_repeat_test.cc
c_api_samplers_test.cc
c_api_text_sentence_piece_vocab_test.cc

@ -0,0 +1,83 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "common/common.h"
#include "minddata/dataset/include/datasets.h"
namespace common = mindspore::common;
using namespace mindspore::dataset;
class MindDataTestPipeline : public UT::DatasetOpTesting {
protected:
};
TEST_F(MindDataTestPipeline, TestPullBasedBatch) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestAlbumBasic.";
std::string folder_path = datasets_root_path_ + "/testAlbum/images";
std::string schema_file = datasets_root_path_ + "/testAlbum/datasetSchema.json";
std::vector<std::string> column_names = {"label"};
// Create a Album Dataset
std::shared_ptr<Dataset> ds = Album(folder_path, schema_file, column_names);
EXPECT_NE(ds, nullptr);
int32_t batch_size = 4;
ds = ds->Batch(batch_size, true);
EXPECT_NE(ds, nullptr);
auto iter = ds->CreatePullBasedIterator();
EXPECT_NE(iter, nullptr);
std::vector<mindspore::MSTensor> row;
Status rc = iter->GetNextRow(&row);
EXPECT_EQ(rc, Status::OK());
EXPECT_EQ(row.size(), 1);
auto temp = row[0].Shape();
std::vector<int64_t> result = {batch_size, 2};
EXPECT_EQ(row[0].Shape(), result);
}
TEST_F(MindDataTestPipeline, TestPullBasedProject) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestAlbumBasic.";
std::string folder_path = datasets_root_path_ + "/testAlbum/images";
std::string schema_file = datasets_root_path_ + "/testAlbum/datasetSchema.json";
std::vector<std::string> column_names = {"label", "image"};
// Create a Album Dataset
std::shared_ptr<Dataset> ds = Album(folder_path, schema_file, column_names);
EXPECT_NE(ds, nullptr);
std::vector<mindspore::MSTensor> row;
auto iter = ds->CreatePullBasedIterator();
EXPECT_NE(iter, nullptr);
Status rc = iter->GetNextRow(&row);
EXPECT_EQ(rc, Status::OK());
EXPECT_EQ(row.size(), 2);
std::shared_ptr<Dataset> ds2 = Album(folder_path, schema_file, column_names);
EXPECT_NE(ds2, nullptr);
std::vector<std::string> columns_to_project = {"image"};
ds2 = ds2->Project(columns_to_project);
EXPECT_NE(ds2, nullptr);
auto iter2 = ds2->CreatePullBasedIterator();
EXPECT_NE(iter2, nullptr);
std::vector<mindspore::MSTensor> new_row;
rc = iter2->GetNextRow(&new_row);
EXPECT_EQ(rc, Status::OK());
EXPECT_EQ(new_row.size(), 1);
}
Loading…
Cancel
Save