!12809 Added pull-based prototype
From: @mahdirahmanihanzaki Reviewed-by: Signed-off-by:pull/12809/MERGE
commit
62a6f783b3
@ -0,0 +1,106 @@
|
||||
/**
|
||||
* Copyright 2021 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <string>
|
||||
#include <algorithm>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
#include "minddata/dataset/engine/consumers/pull_based_tree_consumer.h"
|
||||
|
||||
namespace mindspore::dataset {
|
||||
|
||||
PullBasedIteratorConsumer::PullBasedIteratorConsumer() { tree_adapter_lite_ = std::make_unique<TreeAdapterLite>(); }
|
||||
|
||||
Status PullBasedIteratorConsumer::Init(std::shared_ptr<DatasetNode> root) {
|
||||
return tree_adapter_lite_->BuildTree(std::move(root));
|
||||
}
|
||||
|
||||
std::vector<TensorRow> PullBasedIteratorConsumer::GetRows(int64_t num_rows) {
|
||||
std::vector<TensorRow> rows;
|
||||
for (int i = 0; i < num_rows; i++) {
|
||||
TensorRow row;
|
||||
RETURN_SECOND_IF_ERROR(tree_adapter_lite_->GetNextRow(&row), {});
|
||||
if (row.empty()) break;
|
||||
rows.push_back(row);
|
||||
}
|
||||
|
||||
return rows;
|
||||
}
|
||||
|
||||
Status PullBasedIteratorConsumer::GetNextAsVector(std::vector<TensorPtr> *out) {
|
||||
RETURN_UNEXPECTED_IF_NULL(out);
|
||||
out->clear();
|
||||
|
||||
TensorRow res;
|
||||
RETURN_IF_NOT_OK(tree_adapter_lite_->GetNextRow(&res));
|
||||
|
||||
// Return empty vector if there's no data
|
||||
RETURN_OK_IF_TRUE(res.empty());
|
||||
|
||||
std::copy(res.begin(), res.end(), std::back_inserter(*out));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status PullBasedIteratorConsumer::GetNextAsMap(std::unordered_map<std::string, TensorPtr> *const out_map) {
|
||||
RETURN_UNEXPECTED_IF_NULL(out_map);
|
||||
out_map->clear();
|
||||
|
||||
TensorRow res;
|
||||
RETURN_IF_NOT_OK(tree_adapter_lite_->GetNextRow(&res));
|
||||
|
||||
// Return empty map if there's no data
|
||||
RETURN_OK_IF_TRUE(res.empty());
|
||||
|
||||
// Populate the out map from the row and return it
|
||||
for (const auto &colMap : tree_adapter_lite_->GetColumnNameMap()) {
|
||||
(*out_map)[colMap.first] = std::move(res[colMap.second]);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status PullBasedIteratorConsumer::GetNextAsOrderedPair(
|
||||
std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> *const vec) {
|
||||
std::unique_ptr<TreeAdapter> tree_adapter = std::make_unique<TreeAdapter>();
|
||||
CHECK_FAIL_RETURN_UNEXPECTED(vec != nullptr && vec->empty(), "vec is null or non-empty.");
|
||||
|
||||
TensorRow curr_row;
|
||||
|
||||
RETURN_IF_NOT_OK(tree_adapter_lite_->GetNextRow(&curr_row));
|
||||
RETURN_OK_IF_TRUE(curr_row.empty());
|
||||
size_t num_cols = curr_row.size(); // num_cols is non-empty.
|
||||
// order the column names according to their ids
|
||||
if (column_order_.empty()) {
|
||||
const int32_t invalid_col_id = -1;
|
||||
column_order_.resize(num_cols, {std::string(), invalid_col_id});
|
||||
for (const auto &itr : tree_adapter->GetColumnNameMap()) {
|
||||
int32_t ind = itr.second;
|
||||
CHECK_FAIL_RETURN_UNEXPECTED(ind < num_cols && ind >= 0, "column id out of bounds.");
|
||||
column_order_[ind] = std::make_pair(itr.first, ind);
|
||||
}
|
||||
// error check, make sure the ids in col_name_id_map are continuous and starts from 0
|
||||
for (const auto &col : column_order_) {
|
||||
CHECK_FAIL_RETURN_UNEXPECTED(col.second != invalid_col_id, "column ids are not continuous.");
|
||||
}
|
||||
}
|
||||
|
||||
vec->reserve(num_cols);
|
||||
|
||||
std::transform(column_order_.begin(), column_order_.end(), std::back_inserter(*vec),
|
||||
[curr_row](const auto &col) { return std::make_pair(col.first, curr_row[col.second]); });
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
} // namespace mindspore::dataset
|
@ -0,0 +1,69 @@
|
||||
/**
|
||||
* Copyright 2021 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONSUMERS_PULL_BASED_TREE_CONSUMER_H_
|
||||
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONSUMERS_PULL_BASED_TREE_CONSUMER_H_
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include <cstddef>
|
||||
#include "minddata/dataset/engine/tree_adapter_lite.h"
|
||||
|
||||
namespace mindspore::dataset {
|
||||
|
||||
class TreeAdapterLite;
|
||||
class TensorRow;
|
||||
|
||||
/// Consumer that iterates over the dataset and returns the rows one by one as a in a pull based fashion
|
||||
class PullBasedIteratorConsumer {
|
||||
public:
|
||||
/// Constructor
|
||||
PullBasedIteratorConsumer();
|
||||
|
||||
~PullBasedIteratorConsumer() = default;
|
||||
|
||||
Status Init(std::shared_ptr<DatasetNode> root);
|
||||
|
||||
/// \brief Returns the next row in a vector format
|
||||
/// \note This is currently a placeholder function
|
||||
/// \param[in] num_rows the number of rows that we want to get
|
||||
/// \param[out] out std::vector of TensorRows
|
||||
/// \return Status error code
|
||||
std::vector<TensorRow> GetRows(int64_t num_rows);
|
||||
|
||||
/// Returns the next row in a vector format
|
||||
/// \param[out] out std::vector of Tensors
|
||||
/// \return Status error code
|
||||
Status GetNextAsVector(std::vector<TensorPtr> *out);
|
||||
|
||||
/// Returns the next row in as a map
|
||||
/// \param[out] out std::map of string to Tensor
|
||||
/// \return Status error code
|
||||
Status GetNextAsMap(std::unordered_map<std::string, TensorPtr> *out);
|
||||
|
||||
/// Returns the next row in as a vector
|
||||
/// \param[out] out std::vector of pairs of string to Tensor
|
||||
/// \return Status error code
|
||||
Status GetNextAsOrderedPair(std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> *vec);
|
||||
|
||||
private:
|
||||
std::unique_ptr<TreeAdapterLite> tree_adapter_lite_;
|
||||
std::vector<std::pair<std::string, int32_t>> column_order_; // key: column name, val: column id
|
||||
};
|
||||
} // namespace mindspore::dataset
|
||||
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONSUMERS_PULL_BASED_TREE_CONSUMER_H_
|
@ -0,0 +1,64 @@
|
||||
/**
|
||||
* Copyright 2021 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "minddata/dataset/engine/tree_adapter_lite.h"
|
||||
|
||||
namespace mindspore {
|
||||
namespace dataset {
|
||||
|
||||
TreeAdapterLite::TreeAdapterLite() : root_(nullptr) { tree_ = std::make_unique<ExecutionTree>(); }
|
||||
|
||||
Status TreeAdapterLite::BuildExecutionTreeRecur(std::shared_ptr<DatasetNode> ir, std::shared_ptr<DatasetOp> *const op) {
|
||||
// Build the DatasetOp ExecutionTree from the optimized IR tree
|
||||
std::vector<std::shared_ptr<DatasetOp>> ops;
|
||||
RETURN_IF_NOT_OK(ir->Build(&ops));
|
||||
|
||||
CHECK_FAIL_RETURN_UNEXPECTED(!ops.empty(), "Unable to build node.");
|
||||
|
||||
(*op) = ops.front(); // return the first op to be added as child by the caller of this function
|
||||
|
||||
RETURN_IF_NOT_OK(tree_->AssociateNode(*op));
|
||||
|
||||
for (size_t i = 1; i < ops.size(); i++) {
|
||||
RETURN_IF_NOT_OK(tree_->AssociateNode(ops[i]));
|
||||
RETURN_IF_NOT_OK(ops[i - 1]->AddChild(ops[i]));
|
||||
}
|
||||
|
||||
// Build the children of IR, once they return, add the return value to *op
|
||||
for (std::shared_ptr<DatasetNode> child_ir : ir->Children()) {
|
||||
std::shared_ptr<DatasetOp> child_op;
|
||||
RETURN_IF_NOT_OK(BuildExecutionTreeRecur(child_ir, &child_op));
|
||||
RETURN_IF_NOT_OK(ops.back()->AddChild(child_op)); // append children to the last of ops
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status TreeAdapterLite::BuildTree(std::shared_ptr<DatasetNode> root_ir) {
|
||||
RETURN_UNEXPECTED_IF_NULL(root_ir);
|
||||
RETURN_IF_NOT_OK(BuildExecutionTreeRecur(root_ir, &root_));
|
||||
RETURN_IF_NOT_OK(tree_->AssignRoot(root_));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status TreeAdapterLite::GetNextRow(TensorRow *row) {
|
||||
RETURN_UNEXPECTED_IF_NULL(root_);
|
||||
RETURN_IF_NOT_OK(root_->GetNextRow(row));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace dataset
|
||||
} // namespace mindspore
|
@ -0,0 +1,58 @@
|
||||
/**
|
||||
* Copyright 2021 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_LITE_H_
|
||||
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_LITE_H_
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "minddata/dataset/engine/ir/datasetops/dataset_node.h"
|
||||
|
||||
namespace mindspore {
|
||||
namespace dataset {
|
||||
|
||||
class TensorRow;
|
||||
class DatasetNode;
|
||||
|
||||
class TreeAdapterLite {
|
||||
public:
|
||||
TreeAdapterLite();
|
||||
|
||||
~TreeAdapterLite() = default;
|
||||
|
||||
Status BuildTree(std::shared_ptr<DatasetNode> root_ir);
|
||||
|
||||
// Get rows equal to num_rows
|
||||
Status GetNextRow(TensorRow *row);
|
||||
|
||||
std::unordered_map<std::string, int32_t> GetColumnNameMap() const { return tree_->root()->column_name_id_map(); }
|
||||
|
||||
private:
|
||||
// This RECURSIVE function walks the (optimized) IR tree in DFS to build its corresponding Execution tree.
|
||||
Status BuildExecutionTreeRecur(std::shared_ptr<DatasetNode> ir, std::shared_ptr<DatasetOp> *op);
|
||||
|
||||
std::shared_ptr<DatasetOp> root_; // current connector capacity of root op, used for profiling
|
||||
std::unique_ptr<ExecutionTree> tree_;
|
||||
};
|
||||
|
||||
} // namespace dataset
|
||||
} // namespace mindspore
|
||||
|
||||
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TREE_ADAPTER_LITE_H_
|
@ -0,0 +1,83 @@
|
||||
/**
|
||||
* Copyright 2021 Huawei Technologies Co., Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
#include "common/common.h"
|
||||
#include "minddata/dataset/include/datasets.h"
|
||||
|
||||
namespace common = mindspore::common;
|
||||
|
||||
using namespace mindspore::dataset;
|
||||
|
||||
class MindDataTestPipeline : public UT::DatasetOpTesting {
|
||||
protected:
|
||||
};
|
||||
|
||||
TEST_F(MindDataTestPipeline, TestPullBasedBatch) {
|
||||
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestAlbumBasic.";
|
||||
|
||||
std::string folder_path = datasets_root_path_ + "/testAlbum/images";
|
||||
std::string schema_file = datasets_root_path_ + "/testAlbum/datasetSchema.json";
|
||||
std::vector<std::string> column_names = {"label"};
|
||||
// Create a Album Dataset
|
||||
std::shared_ptr<Dataset> ds = Album(folder_path, schema_file, column_names);
|
||||
EXPECT_NE(ds, nullptr);
|
||||
|
||||
int32_t batch_size = 4;
|
||||
ds = ds->Batch(batch_size, true);
|
||||
EXPECT_NE(ds, nullptr);
|
||||
|
||||
auto iter = ds->CreatePullBasedIterator();
|
||||
EXPECT_NE(iter, nullptr);
|
||||
|
||||
std::vector<mindspore::MSTensor> row;
|
||||
Status rc = iter->GetNextRow(&row);
|
||||
EXPECT_EQ(rc, Status::OK());
|
||||
EXPECT_EQ(row.size(), 1);
|
||||
auto temp = row[0].Shape();
|
||||
std::vector<int64_t> result = {batch_size, 2};
|
||||
EXPECT_EQ(row[0].Shape(), result);
|
||||
}
|
||||
|
||||
TEST_F(MindDataTestPipeline, TestPullBasedProject) {
|
||||
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestAlbumBasic.";
|
||||
|
||||
std::string folder_path = datasets_root_path_ + "/testAlbum/images";
|
||||
std::string schema_file = datasets_root_path_ + "/testAlbum/datasetSchema.json";
|
||||
std::vector<std::string> column_names = {"label", "image"};
|
||||
// Create a Album Dataset
|
||||
std::shared_ptr<Dataset> ds = Album(folder_path, schema_file, column_names);
|
||||
EXPECT_NE(ds, nullptr);
|
||||
|
||||
std::vector<mindspore::MSTensor> row;
|
||||
auto iter = ds->CreatePullBasedIterator();
|
||||
EXPECT_NE(iter, nullptr);
|
||||
Status rc = iter->GetNextRow(&row);
|
||||
EXPECT_EQ(rc, Status::OK());
|
||||
EXPECT_EQ(row.size(), 2);
|
||||
|
||||
std::shared_ptr<Dataset> ds2 = Album(folder_path, schema_file, column_names);
|
||||
EXPECT_NE(ds2, nullptr);
|
||||
std::vector<std::string> columns_to_project = {"image"};
|
||||
ds2 = ds2->Project(columns_to_project);
|
||||
EXPECT_NE(ds2, nullptr);
|
||||
|
||||
auto iter2 = ds2->CreatePullBasedIterator();
|
||||
EXPECT_NE(iter2, nullptr);
|
||||
|
||||
std::vector<mindspore::MSTensor> new_row;
|
||||
rc = iter2->GetNextRow(&new_row);
|
||||
EXPECT_EQ(rc, Status::OK());
|
||||
EXPECT_EQ(new_row.size(), 1);
|
||||
}
|
Loading…
Reference in new issue