add filter c api and ut

pull/8232/head
Xiao Tianci 4 years ago
parent 491efd8aa4
commit 4a5e287b92
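In brief: this commit adds a Filter operation to the MindData C++ dataset API (a FilterDataset wrapper plus a FilterNode IR node), migrates FilterOp from a raw py::function to the TensorOp abstraction (CFuncOp for C++ predicates, PyFuncOp with a DE_BOOL return type for Python predicates), and adds C++ unit tests. A minimal usage sketch of the new API, assuming the MindData headers touched in this diff are available; the dataset construction is illustrative:

#include "minddata/dataset/include/datasets.h"

using namespace mindspore::dataset;

// Predicate contract (as exercised by the unit tests below): take a TensorRow
// and return a one-element TensorRow holding a boolean tensor.
TensorRow IsLabelThree(TensorRow input) {
  uint64_t label;
  input.at(0)->GetItemAt(&label, {0});
  std::shared_ptr<Tensor> out;
  Tensor::CreateScalar(label == 3, &out);  // scalar DE_BOOL result
  TensorRow output;
  output.push_back(out);
  return output;
}

// Illustrative pipeline: keep only the rows whose "label" column equals 3.
// std::shared_ptr<Dataset> ds = TFRecord({data_file}, schema_file, {"image", "label"});
// ds = ds->Filter(IsLabelThree, {"label"});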

@@ -33,6 +33,7 @@
// IR non-leaf nodes
#include "minddata/dataset/engine/ir/datasetops/batch_node.h"
#include "minddata/dataset/engine/ir/datasetops/concat_node.h"
#include "minddata/dataset/engine/ir/datasetops/filter_node.h"
#include "minddata/dataset/engine/ir/datasetops/map_node.h"
#include "minddata/dataset/engine/ir/datasetops/project_node.h"
#include "minddata/dataset/engine/ir/datasetops/rename_node.h"
@@ -487,6 +488,15 @@ ConcatDataset::ConcatDataset(const std::vector<std::shared_ptr<Dataset>> &datase
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
#ifndef ENABLE_ANDROID
FilterDataset::FilterDataset(std::shared_ptr<Dataset> input, std::function<TensorRow(TensorRow)> predicate,
std::vector<std::string> input_columns) {
auto ds = std::make_shared<FilterNode>(input->IRNode(), predicate, input_columns);
ir_node_ = std::static_pointer_cast<DatasetNode>(ds);
}
#endif
MapDataset::MapDataset(std::shared_ptr<Dataset> input, std::vector<std::shared_ptr<TensorOperation>> operations,
std::vector<std::string> input_columns, std::vector<std::string> output_columns,
const std::vector<std::string> &project_columns, const std::shared_ptr<DatasetCache> &cache) {

@@ -829,8 +829,9 @@ Status DEPipeline::ParseFilterOp(const py::dict &args, std::shared_ptr<DatasetOp
if (!py::isinstance<py::function>(op)) {
RETURN_STATUS_UNEXPECTED("Error: predicate is not recognised (not pyfunc).");
}
py::function predicate_func = op.cast<py::function>();
(void)builder->SetPredicateFunc(std::move(predicate_func));
std::shared_ptr<TensorOp> py_func;
py_func = std::make_shared<PyFuncOp>(value.cast<py::function>(), DataType::DE_BOOL);
(void)builder->SetPredicateFunc(py_func);
} else if (key == "input_columns") {
std::vector<std::string> in_col_names = ToStringVector(args["input_columns"]);
(void)builder->SetInColNames(in_col_names);

@@ -64,7 +64,7 @@ Status FilterOp::Builder::Build(std::shared_ptr<FilterOp> *ptr) {
}
FilterOp::FilterOp(const std::vector<std::string> &in_col_names, int32_t num_workers, int32_t op_queue_size,
py::function predicate_func)
std::shared_ptr<TensorOp> predicate_func)
: ParallelOp(num_workers, op_queue_size), predicate_func_(std::move(predicate_func)), in_columns_(in_col_names) {}
Status FilterOp::operator()() {
@@ -242,28 +242,11 @@ Status FilterOp::CheckInput(const TensorRow &input) const {
Status FilterOp::InvokePredicateFunc(const TensorRow &input, bool *out_predicate) {
RETURN_IF_NOT_OK(CheckInput(input));
// Acquire Python GIL.
py::gil_scoped_acquire gil_acquire;
if (Py_IsInitialized() == 0) {
return Status(StatusCode::kPythonInterpreterFailure, "Python Interpreter is finalized");
}
try {
// Transform input tensor vector into numpy array vector.
py::tuple input_args(input.size());
for (size_t i = 0; i < input.size(); i++) {
py::array new_data;
RETURN_IF_NOT_OK(input.at(i)->GetDataAsNumpy(&new_data));
input_args[i] = new_data;
}
// Invoke python function.
py::object ret_py_obj = predicate_func_(*input_args);
*out_predicate = ret_py_obj.cast<py::bool_>();
} catch (const py::error_already_set &e) {
std::stringstream ss;
ss << e.what() << std::endl;
ss << "Invalid parameter, predicate function function should return true/false.";
return Status(StatusCode::kPyFuncException, ss.str());
}
TensorRow output;
RETURN_IF_NOT_OK(predicate_func_->Compute(input, &output));
RETURN_IF_NOT_OK(output.at(0)->GetItemAt(out_predicate, {}));
return Status(StatusCode::kOK, "FilterOp predicate func call succeeded");
}
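Design note on the rewrite above: the Python GIL acquisition and the tensor-to-numpy round trip now live inside PyFuncOp::Compute, so FilterOp no longer depends on pybind11 and treats C++ and Python predicates uniformly through the TensorOp interface. A sketch of the resulting contract, mirroring the new lines above; any predicate TensorOp must write a single boolean tensor into its output row:

// Uniform predicate contract (sketch, names from the code above):
TensorRow output;
RETURN_IF_NOT_OK(predicate_func_->Compute(input, &output));  // CFuncOp (C++) or PyFuncOp (Python)
bool keep = false;
RETURN_IF_NOT_OK(output.at(0)->GetItemAt(&keep, {}));        // read back the DE_BOOL result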

@@ -46,7 +46,7 @@ class FilterOp : public ParallelOp {
// Setter method.
// @return Reference to the builder.
Builder &SetPredicateFunc(py::function func) {
Builder &SetPredicateFunc(std::shared_ptr<TensorOp> func) {
builder_predicate_func_ = std::move(func);
return *this;
}
@@ -82,7 +82,7 @@ class FilterOp : public ParallelOp {
// @return Status - The error code returned.
Status SanityCheck();
std::vector<std::string> build_in_col_names_;
py::function builder_predicate_func_;
std::shared_ptr<TensorOp> builder_predicate_func_;
int32_t builder_num_workers_;
int32_t builder_op_connector_size_;
};
@@ -97,7 +97,7 @@ class FilterOp : public ParallelOp {
// @param op_connector_size The size of each queue in the connector.
// @param predicate_func A TensorOp predicate which returns a boolean value.
FilterOp(const std::vector<std::string> &in_col_names, int32_t num_workers, int32_t op_queue_size,
py::function predicate_func);
std::shared_ptr<TensorOp> predicate_func);
// Destructor
~FilterOp() = default;
@@ -144,7 +144,7 @@ class FilterOp : public ParallelOp {
private:
// predicate_func A TensorOp predicate which returns a boolean value.
py::function predicate_func_;
std::shared_ptr<TensorOp> predicate_func_;
// Variable to store the column name that will feed to predicate function.
std::vector<std::string> in_columns_;

@@ -9,6 +9,7 @@ set(DATASET_ENGINE_IR_DATASETOPS_SRC_FILES
build_sentence_piece_vocab_node.cc
build_vocab_node.cc
concat_node.cc
filter_node.cc
map_node.cc
project_node.cc
rename_node.cc

@@ -0,0 +1,61 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "minddata/dataset/engine/ir/datasetops/filter_node.h"
#include <memory>
#include <string>
#include <vector>
#include "minddata/dataset/engine/datasetops/filter_op.h"
#include "minddata/dataset/util/status.h"
namespace mindspore {
namespace dataset {
// Constructor for FilterNode
FilterNode::FilterNode(std::shared_ptr<DatasetNode> child, std::function<TensorRow(TensorRow)> predicate,
std::vector<std::string> input_columns)
: predicate_(predicate), input_columns_(input_columns) {
this->children.push_back(child);
}
std::vector<std::shared_ptr<DatasetOp>> FilterNode::Build() {
// A vector containing shared pointers to the DatasetOps that this object will create
std::vector<std::shared_ptr<DatasetOp>> node_ops;
std::shared_ptr<TensorOp> c_func;
c_func = std::make_shared<CFuncOp>(predicate_);
node_ops.push_back(std::make_shared<FilterOp>(input_columns_, num_workers_, connector_que_size_, c_func));
return node_ops;
}
Status FilterNode::ValidateParams() {
if (predicate_ == nullptr) {
std::string err_msg = "FilterNode: predicate is not specified.";
MS_LOG(ERROR) << err_msg;
RETURN_STATUS_SYNTAX_ERROR(err_msg);
}
if (!input_columns_.empty()) {
RETURN_IF_NOT_OK(ValidateDatasetColumnParam("FilterNode", "input_columns", input_columns_));
}
return Status::OK();
}
} // namespace dataset
} // namespace mindspore
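Note how the two front ends converge on the same runtime op: FilterNode::Build wraps the C++ std::function predicate in a CFuncOp, while the Python path (DEPipeline::ParseFilterOp above) wraps the py::function in a PyFuncOp typed as DE_BOOL, and both are handed to FilterOp as a plain TensorOp. A hedged sketch of that symmetry, using only constructors that appear in this diff (the CFuncOp header include is assumed to come via filter_op.h):

// C++ API path (this file): wrap the std::function predicate.
std::shared_ptr<TensorOp> c_pred = std::make_shared<CFuncOp>(predicate_);
// Python path (DEPipeline::ParseFilterOp): wrap the py::function with a DE_BOOL return type.
// std::shared_ptr<TensorOp> py_pred = std::make_shared<PyFuncOp>(predicate_func, DataType::DE_BOOL);
// Either one feeds the same FilterOp constructor:
auto op = std::make_shared<FilterOp>(input_columns_, num_workers_, connector_que_size_, c_pred);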

@@ -0,0 +1,53 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_IR_DATASETOPS_FILTER_NODE_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_IR_DATASETOPS_FILTER_NODE_H_
#include <memory>
#include <string>
#include <vector>
#include "minddata/dataset/engine/ir/datasetops/dataset_node.h"
namespace mindspore {
namespace dataset {
class FilterNode : public DatasetNode {
public:
/// \brief Constructor
FilterNode(std::shared_ptr<DatasetNode> child, std::function<TensorRow(TensorRow)> predicate,
std::vector<std::string> input_columns = {});
/// \brief Destructor
~FilterNode() = default;
/// \brief A base class override function to create the required runtime dataset op objects for this class
/// \return The list of shared pointers to the newly created DatasetOps
std::vector<std::shared_ptr<DatasetOp>> Build() override;
/// \brief Parameter validation
/// \return Status Status::OK() if all the parameters are valid
Status ValidateParams() override;
private:
std::function<TensorRow(TensorRow)> predicate_;
std::vector<std::string> input_columns_;
};
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_IR_DATASETOPS_FILTER_NODE_H_

@@ -67,6 +67,7 @@ class CsvBase;
class BatchDataset;
#ifndef ENABLE_ANDROID
class BucketBatchByLengthDataset;
class FilterDataset;
#endif
class CSVDataset;
class ConcatDataset;
@@ -242,6 +243,18 @@ class Dataset : public std::enable_shared_from_this<Dataset> {
return std::make_shared<ConcatDataset>(all_datasets);
}
#ifndef ENABLE_ANDROID
/// \brief Function to filter a dataset by a predicate
/// \notes If input_columns is not provided or is empty, all columns will be used
/// \param[in] predicate Callable which returns a boolean value; rows for which it returns false are filtered out
/// \param[in] input_columns List of names of the input columns to pass to the predicate
/// \return Shared pointer to the current FilterDataset
std::shared_ptr<FilterDataset> Filter(std::function<TensorRow(TensorRow)> predicate,
std::vector<std::string> input_columns = {}) {
return std::make_shared<FilterDataset>(shared_from_this(), predicate, input_columns);
}
#endif
/// \brief Function to create a MapDataset
/// \notes Applies each operation in operations to this dataset
/// \param[in] operations Vector of operations to be applied on the dataset. Operations are
@@ -418,6 +431,14 @@ class ConcatDataset : public Dataset {
explicit ConcatDataset(const std::vector<std::shared_ptr<Dataset>> &input);
};
#ifndef ENABLE_ANDROID
class FilterDataset : public Dataset {
public:
FilterDataset(std::shared_ptr<Dataset> input, std::function<TensorRow(TensorRow)> predicate,
std::vector<std::string> input_columns);
};
#endif
class MapDataset : public Dataset {
public:
MapDataset(std::shared_ptr<Dataset> input, std::vector<std::shared_ptr<TensorOperation>> operations,

@@ -105,6 +105,9 @@ Status PyFuncOp::CastOutput(const py::object &ret_py_obj, TensorRow *output) {
RETURN_IF_NOT_OK(Tensor::CreateEmpty(TensorShape({1}), DataType(DataType::DE_INT32), &out));
RETURN_IF_NOT_OK(out->SetItemAt({0}, ret_py_obj.cast<int32_t>()));
break;
case DataType::DE_BOOL:
RETURN_IF_NOT_OK(Tensor::CreateScalar(ret_py_obj.cast<bool>(), &out));
break;
default:
RETURN_STATUS_UNEXPECTED("No cast for the specified DataType was found.");
}
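With the DE_BOOL case added, a Python predicate's True/False return is materialized as a boolean tensor. Two constructions of such a tensor appear in this diff and are worth comparing; a short sketch (CreateScalar yields a rank-0 scalar, while the test predicates below build a shape-{1} tensor):

// As in the new DE_BOOL case above: a scalar boolean tensor.
std::shared_ptr<Tensor> out;
Tensor::CreateScalar(true, &out);
// As in Predicate1/Predicate2 in the tests: a one-element DE_BOOL tensor.
std::shared_ptr<Tensor> out2;
Tensor::CreateEmpty(TensorShape({1}), DataType(DataType::DE_BOOL), &out2);
out2->SetItemAt({0}, true);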

@@ -189,6 +189,7 @@ if (BUILD_MINDDATA STREQUAL "full")
"${MINDDATA_DIR}/engine/ir/datasetops/bucket_batch_by_length_node.cc"
"${MINDDATA_DIR}/engine/ir/datasetops/build_sentence_piece_vocab_node.cc"
"${MINDDATA_DIR}/engine/ir/datasetops/build_vocab_node.cc"
"${MINDDATA_DIR}/engine/ir/datasetops/filter_node.cc"
"${MINDDATA_DIR}/engine/ir/datasetops/sync_wait_node.cc"
)
list(REMOVE_ITEM MINDDATA_ENGINE_CONSUMERS_SRC_FILES

@@ -35,6 +35,41 @@ mindspore::dataset::TensorRow BucketBatchTestFunction(mindspore::dataset::Tensor
return output;
}
TensorRow Predicate1(TensorRow input) {
// Return true if input is equal to 3
uint64_t input_value;
input.at(0)->GetItemAt(&input_value, {0});
bool result = (input_value == 3);
// Convert from boolean to TensorRow
TensorRow output;
std::shared_ptr<Tensor> out;
Tensor::CreateEmpty(mindspore::dataset::TensorShape({1}),
mindspore::dataset::DataType(mindspore::dataset::DataType::Type::DE_BOOL), &out);
out->SetItemAt({0}, result);
output.push_back(out);
return output;
}
TensorRow Predicate2(TensorRow input) {
// Return true if the label is greater than 1
// The label is at index 1 of the input row
uint64_t input_value;
input.at(1)->GetItemAt(&input_value, {0});
bool result = (input_value > 1);
// Convert from boolean to TensorRow
TensorRow output;
std::shared_ptr<Tensor> out;
Tensor::CreateEmpty(mindspore::dataset::TensorShape({1}),
mindspore::dataset::DataType(mindspore::dataset::DataType::Type::DE_BOOL), &out);
out->SetItemAt({0}, result);
output.push_back(out);
return output;
}
TEST_F(MindDataTestPipeline, TestBatchAndRepeat) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestBatchAndRepeat.";
@@ -471,6 +506,180 @@ TEST_F(MindDataTestPipeline, TestConcatSuccess2) {
iter->Stop();
}
TEST_F(MindDataTestPipeline, TestFilterSuccess1) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestFilterSuccess1.";
// Test the basic Filter API with a predicate that checks whether the label equals 3
// Create a TFRecord Dataset
std::string data_file = datasets_root_path_ + "/test_tf_file_3_images/train-0000-of-0001.data";
std::string schema_file = datasets_root_path_ + "/test_tf_file_3_images/datasetSchema.json";
std::shared_ptr<Dataset> ds = TFRecord({data_file}, schema_file, {"image", "label"}, 0, ShuffleMode::kFalse);
EXPECT_NE(ds, nullptr);
// Create objects for the tensor ops
std::shared_ptr<TensorOperation> decode_op = vision::Decode(true);
EXPECT_NE(decode_op, nullptr);
std::shared_ptr<TensorOperation> resize_op = vision::Resize({64, 64});
EXPECT_NE(resize_op, nullptr);
// Create a Map operation on ds
ds = ds->Map({decode_op, resize_op});
EXPECT_NE(ds, nullptr);
// Create a Filter operation on ds
ds = ds->Filter(Predicate1, {"label"});
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// iterate over the dataset and get each row
std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
iter->GetNextRow(&row);
std::vector<uint64_t> label_list;
uint64_t i = 0;
while (row.size() != 0) {
i++;
auto label = row["label"];
uint64_t label_value;
label->GetItemAt(&label_value, {0});
label_list.push_back(label_value);
iter->GetNextRow(&row);
}
// Only 1 row has a label equal to 3
EXPECT_EQ(i, 1);
EXPECT_EQ(label_list.at(0), 3);
// Manually terminate the pipeline
iter->Stop();
}
TEST_F(MindDataTestPipeline, TestFilterSuccess2) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestFilterSuccess2.";
// Test the Filter API without input_columns
// Create a TFRecord Dataset
std::string data_file = datasets_root_path_ + "/test_tf_file_3_images/train-0000-of-0001.data";
std::string schema_file = datasets_root_path_ + "/test_tf_file_3_images/datasetSchema.json";
std::shared_ptr<Dataset> ds = TFRecord({data_file}, schema_file, {"image", "label"}, 0, ShuffleMode::kFalse);
EXPECT_NE(ds, nullptr);
// Create a Filter operation on ds
ds = ds->Filter(Predicate2);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// iterate over the dataset and get each row
std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
iter->GetNextRow(&row);
std::vector<uint64_t> label_list;
uint64_t i = 0;
while (row.size() != 0) {
i++;
auto label = row["label"];
uint64_t label_value;
label->GetItemAt(&label_value, {0});
label_list.push_back(label_value);
iter->GetNextRow(&row);
}
// There are 2 rows whose label is greater than 1
EXPECT_EQ(i, 2);
EXPECT_EQ(label_list.at(0), 2);
EXPECT_EQ(label_list.at(1), 3);
// Manually terminate the pipeline
iter->Stop();
}
TEST_F(MindDataTestPipeline, TestFilterFail1) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestFilterFail1.";
// Test the Filter API with a nullptr predicate
// Create a TFRecord Dataset
std::string data_file = datasets_root_path_ + "/test_tf_file_3_images/train-0000-of-0001.data";
std::string schema_file = datasets_root_path_ + "/test_tf_file_3_images/datasetSchema.json";
std::shared_ptr<Dataset> ds = TFRecord({data_file}, schema_file, {"image", "label"}, 0, ShuffleMode::kFalse);
EXPECT_NE(ds, nullptr);
std::function<TensorRow(TensorRow)> predicate_null = nullptr;
// Create a Filter operation on ds
ds = ds->Filter(predicate_null);
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
std::shared_ptr<Iterator> iter = ds->CreateIterator();
// Expect failure: invalid Filter input with nullptr predicate
EXPECT_EQ(iter, nullptr);
}
TEST_F(MindDataTestPipeline, TestFilterFail2) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestFilterFail2.";
// Test the Filter API with a nonexistent input column
// Create a TFRecord Dataset
std::string data_file = datasets_root_path_ + "/test_tf_file_3_images/train-0000-of-0001.data";
std::string schema_file = datasets_root_path_ + "/test_tf_file_3_images/datasetSchema.json";
std::shared_ptr<Dataset> ds = TFRecord({data_file}, schema_file, {"image", "label"}, 0, ShuffleMode::kFalse);
EXPECT_NE(ds, nullptr);
// Create a Filter operation on ds
ds = ds->Filter(Predicate1, {"not_exist"});
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
// This will trigger the creation of the Execution Tree and launch it.
std::shared_ptr<Iterator> iter = ds->CreateIterator();
EXPECT_NE(iter, nullptr);
// iterate over the dataset and get each row
std::unordered_map<std::string, std::shared_ptr<Tensor>> row;
iter->GetNextRow(&row);
uint64_t i = 0;
while (row.size() != 0) {
i++;
iter->GetNextRow(&row);
}
// Expect failure: the column check fails and no rows are returned
EXPECT_EQ(i, 0);
// Manually terminate the pipeline
iter->Stop();
}
TEST_F(MindDataTestPipeline, TestFilterFail3) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestFilterFail3.";
// Test the Filter API with an empty string as the column name
// Create a TFRecord Dataset
std::string data_file = datasets_root_path_ + "/test_tf_file_3_images/train-0000-of-0001.data";
std::string schema_file = datasets_root_path_ + "/test_tf_file_3_images/datasetSchema.json";
std::shared_ptr<Dataset> ds = TFRecord({data_file}, schema_file, {"image", "label"}, 0, ShuffleMode::kFalse);
EXPECT_NE(ds, nullptr);
// Create a Filter operation on ds
ds = ds->Filter(Predicate1, {""});
EXPECT_NE(ds, nullptr);
// Create an iterator over the result of the above dataset
std::shared_ptr<Iterator> iter = ds->CreateIterator();
// Expect failure: invalid Filter input with an empty column name
EXPECT_EQ(iter, nullptr);
}
TEST_F(MindDataTestPipeline, TestImageFolderBatchAndRepeat) {
MS_LOG(INFO) << "Doing MindDataTestPipeline-TestImageFolderBatchAndRepeat.";

@@ -284,7 +284,6 @@ def test_filter_by_generator_with_map_part_col():
ret_data = []
for item in dataset_f.create_dict_iterator(num_epochs=1, output_numpy=True):
num_iter += 1
print(item)
ret_data.append(item["out1"])
assert num_iter == 3
assert ret_data[0] == 9
