Added BucketBatchByLength for C API

pull/4877/head
mahdi 5 years ago
parent 39e2791149
commit 84efa0b96b

@ -37,6 +37,7 @@
#endif
// Dataset operator headers (in alphabetical order)
#include "minddata/dataset/engine/datasetops/batch_op.h"
#include "minddata/dataset/engine/datasetops/bucket_batch_by_length_op.h"
#include "minddata/dataset/engine/datasetops/build_vocab_op.h"
#include "minddata/dataset/engine/datasetops/concat_op.h"
#include "minddata/dataset/engine/datasetops/map_op/map_op.h"
@ -280,6 +281,25 @@ std::shared_ptr<BatchDataset> Dataset::Batch(int32_t batch_size, bool drop_remai
return ds;
}
// Function to create a BucketBatchByLength dataset
std::shared_ptr<BucketBatchByLengthDataset> Dataset::BucketBatchByLength(
  const std::vector<std::string> &column_names, const std::vector<int32_t> &bucket_boundaries,
  const std::vector<int32_t> &bucket_batch_sizes, TensorRow (*element_length_function)(TensorRow),
  const std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> &pad_info, bool pad_to_bucket_boundary,
  bool drop_remainder) {
  // Build the IR node first; parameter checking happens inside ValidateParams().
  auto node = std::make_shared<BucketBatchByLengthDataset>(column_names, bucket_boundaries, bucket_batch_sizes,
                                                           element_length_function, pad_info, pad_to_bucket_boundary,
                                                           drop_remainder);
  if (!node->ValidateParams()) {
    return nullptr;
  }
  // Wire this dataset in as the input of the new node.
  node->children.push_back(shared_from_this());
  return node;
}
#ifndef ENABLE_ANDROID
// Function to create a Vocab from dataset
std::shared_ptr<Vocab> Dataset::BuildVocab(const std::vector<std::string> &columns,
@ -1590,6 +1610,79 @@ bool BatchDataset::ValidateParams() {
return true;
}
// Constructor: copies every BucketBatchByLength parameter into the node.
// No validation happens here — callers are expected to invoke ValidateParams() afterwards
// (as Dataset::BucketBatchByLength does).
BucketBatchByLengthDataset::BucketBatchByLengthDataset(
const std::vector<std::string> &column_names, const std::vector<int32_t> &bucket_boundaries,
const std::vector<int32_t> &bucket_batch_sizes, TensorRow (*element_length_function)(TensorRow),
const std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> &pad_info, bool pad_to_bucket_boundary,
bool drop_remainder)
: column_names_(column_names),
bucket_boundaries_(bucket_boundaries),
bucket_batch_sizes_(bucket_batch_sizes),
element_length_function_(element_length_function),  // may be nullptr; Build() handles that case
pad_info_(pad_info),
pad_to_bucket_boundary_(pad_to_bucket_boundary),
drop_remainder_(drop_remainder) {}
// Creates the runtime DatasetOp(s) for this IR node.
// \return A vector containing the single BucketBatchByLengthOp this node maps to.
std::vector<std::shared_ptr<DatasetOp>> Build() {
  // A vector containing shared pointer to the Dataset Ops that this object will create
  std::vector<std::shared_ptr<DatasetOp>> node_ops;

  // Wrap the C function pointer in a TensorOp only when one was supplied.
  // A default-constructed shared_ptr is already nullptr, so no explicit else branch is needed.
  std::shared_ptr<TensorOp> c_func;
  if (element_length_function_ != nullptr) {
    c_func = std::make_shared<CFuncOp>(element_length_function_);
  }

  node_ops.push_back(std::make_shared<BucketBatchByLengthOp>(column_names_, bucket_boundaries_, bucket_batch_sizes_,
                                                             c_func, pad_info_, pad_to_bucket_boundary_,
                                                             drop_remainder_, connector_que_size_));
  return node_ops;
}
// Validates the BucketBatchByLength parameters.
// \return true when column_names, bucket_boundaries and bucket_batch_sizes are mutually consistent.
bool ValidateParams() {
  // Without a length function, the length is derived from a single column, so exactly one name is required.
  if (element_length_function_ == nullptr && column_names_.size() != 1) {
    MS_LOG(ERROR) << "BucketBatchByLength: If element_length_function is not specified, exactly one column name "
                     "should be passed.";
    return false;
  }
  // Check bucket_boundaries: must be positive and strictly increasing
  if (bucket_boundaries_.empty()) {
    MS_LOG(ERROR) << "BucketBatchByLength: bucket_boundaries cannot be empty.";
    return false;
  }
  // size_t index avoids the signed/unsigned comparison against vector::size().
  for (size_t i = 0; i < bucket_boundaries_.size(); i++) {
    if (bucket_boundaries_[i] <= 0) {
      MS_LOG(ERROR)
        << "BucketBatchByLength: bucket_boundaries must only contain positive numbers. However, the element at index: "
        << i << " was: " << bucket_boundaries_[i];
      return false;
    }
    if (i > 0 && bucket_boundaries_[i - 1] >= bucket_boundaries_[i]) {
      MS_LOG(ERROR)
        << "BucketBatchByLength: bucket_boundaries must be strictly increasing. However, the elements at index: "
        << i - 1 << " and " << i << " were: " << bucket_boundaries_[i - 1] << " and " << bucket_boundaries_[i]
        << " respectively.";
      return false;
    }
  }
  // Check bucket_batch_sizes: must be positive
  if (bucket_batch_sizes_.empty()) {
    MS_LOG(ERROR) << "BucketBatchByLength: bucket_batch_sizes must be non-empty";
    return false;
  }
  // One batch size per bucket; n boundaries create n + 1 buckets.
  if (bucket_batch_sizes_.size() != bucket_boundaries_.size() + 1) {
    MS_LOG(ERROR) << "BucketBatchByLength: bucket_batch_sizes's size must equal the size of bucket_boundaries + 1";
    return false;
  }
  // Elements are int32_t; take the element by its declared type rather than plain int.
  if (std::any_of(bucket_batch_sizes_.begin(), bucket_batch_sizes_.end(), [](int32_t sz) { return sz <= 0; })) {
    MS_LOG(ERROR) << "BucketBatchByLength: bucket_batch_sizes must only contain positive numbers.";
    return false;
  }
  return true;
}
#ifndef ENABLE_ANDROID
BuildVocabDataset::BuildVocabDataset(std::shared_ptr<Vocab> vocab, const std::vector<std::string> &columns,
const std::pair<int64_t, int64_t> &freq_range, int64_t top_k,

@ -952,7 +952,9 @@ Status DEPipeline::ParseBucketBatchByLengthOp(const py::dict &args, std::shared_
(void)builder->SetBucketBatchSizes(ToIntVector(value));
}
if (key == "element_length_function") {
(void)builder->SetElementLengthFunction(value.cast<py::function>());
std::shared_ptr<TensorOp> py_func;
py_func = std::make_shared<PyFuncOp>(value.cast<py::function>(), DataType::DE_INT32);
(void)builder->SetElementLengthFunction(py_func);
}
if (key == "pad_info") {
PadInfo pad_info;

@ -85,7 +85,7 @@ Status BucketBatchByLengthOp::Builder::Build(std::shared_ptr<BucketBatchByLength
BucketBatchByLengthOp::BucketBatchByLengthOp(std::vector<std::string> length_dependent_columns,
std::vector<int32_t> bucket_boundaries,
std::vector<int32_t> bucket_batch_sizes,
py::function element_length_function, PadInfo pad_info,
std::shared_ptr<TensorOp> element_length_function, PadInfo pad_info,
bool pad_to_bucket_boundary, bool drop_remainder,
int32_t op_connector_size)
: PipelineOp(op_connector_size),
@ -155,34 +155,15 @@ Status BucketBatchByLengthOp::ObtainElementLength(int32_t *out_element_length, T
// call pyfunc here if given pyfunc, otherwise return 0th dimension of shape of
// the single column specified in length_dependent_columns_
if (element_length_function_) {
py::gil_scoped_acquire gil_acquire;
if (Py_IsInitialized() == 0) {
return Status(StatusCode::kPythonInterpreterFailure, "Python Interpreter is finalized");
}
try {
size_t number_of_arguments = length_dependent_columns_.size();
py::tuple input_arguments(number_of_arguments);
for (size_t i = 0; i < number_of_arguments; i++) {
py::array argument_value;
int32_t column_index = column_name_id_map_[length_dependent_columns_[i]];
RETURN_IF_NOT_OK(element[column_index]->GetDataAsNumpy(&argument_value));
input_arguments[i] = argument_value;
}
py::object length = element_length_function_(*input_arguments);
*out_element_length = length.cast<int32_t>();
if (*out_element_length < 0) {
return Status(StatusCode::kPyFuncException, "Element length function should return a non negative integer.");
}
} catch (const py::error_already_set &e) {
return Status(StatusCode::kPyFuncException, e.what());
} catch (const py::cast_error &e) {
return Status(StatusCode::kPyFuncException, "Count not cast output of element length function to int32_t.");
TensorRow output;
RETURN_IF_NOT_OK(element_length_function_->Compute(element, &output));
RETURN_IF_NOT_OK(output.at(0)->GetItemAt(out_element_length, {0}));
if (*out_element_length < 0) {
RETURN_STATUS_UNEXPECTED("BucketBatchByLength: element_length_function returned negative integer");
}
} else {
*out_element_length = element[0]->shape()[0];
}
return Status::OK();
}

@ -27,6 +27,7 @@
#include "minddata/dataset/engine/dataset_iterator.h"
#include "minddata/dataset/engine/datasetops/batch_op.h"
#include "minddata/dataset/engine/datasetops/pipeline_op.h"
#include "minddata/dataset/kernels/tensor_op.h"
#include "minddata/dataset/util/status.h"
namespace mindspore {
@ -57,7 +58,7 @@ class BucketBatchByLengthOp : public PipelineOp {
return *this;
}
Builder &SetElementLengthFunction(py::function element_length_function) {
Builder &SetElementLengthFunction(std::shared_ptr<TensorOp> element_length_function) {
builder_element_length_function_ = element_length_function;
return *this;
}
@ -90,7 +91,7 @@ class BucketBatchByLengthOp : public PipelineOp {
std::vector<std::string> builder_length_dependent_columns_;
std::vector<int32_t> builder_bucket_boundaries_;
std::vector<int32_t> builder_bucket_batch_sizes_;
py::function builder_element_length_function_;
std::shared_ptr<TensorOp> builder_element_length_function_;
PadInfo builder_pad_info_;
bool builder_pad_to_bucket_boundary_;
bool builder_drop_remainder_;
@ -98,8 +99,8 @@ class BucketBatchByLengthOp : public PipelineOp {
};
BucketBatchByLengthOp(std::vector<std::string> length_dependent_columns, std::vector<int32_t> bucket_boundaries,
std::vector<int32_t> bucket_batch_sizes, py::function element_length_function, PadInfo pad_info,
bool pad_to_bucket_boundary, bool drop_remainder, int32_t op_connector_size);
std::vector<int32_t> bucket_batch_sizes, std::shared_ptr<TensorOp> element_length_function,
PadInfo pad_info, bool pad_to_bucket_boundary, bool drop_remainder, int32_t op_connector_size);
// Destructor
~BucketBatchByLengthOp() = default;
@ -137,7 +138,7 @@ class BucketBatchByLengthOp : public PipelineOp {
std::vector<std::string> length_dependent_columns_;
std::vector<int32_t> bucket_boundaries_;
std::vector<int32_t> bucket_batch_sizes_;
py::function element_length_function_;
std::shared_ptr<TensorOp> element_length_function_;
PadInfo pad_info_;
bool pad_to_bucket_boundary_;
bool drop_remainder_;

@ -30,6 +30,8 @@
#include "minddata/dataset/include/iterator.h"
#include "minddata/dataset/include/samplers.h"
#include "minddata/dataset/include/type_id.h"
#include "minddata/dataset/kernels/c_func_op.h"
#include "minddata/dataset/kernels/tensor_op.h"
#ifndef ENABLE_ANDROID
#include "minddata/dataset/text/vocab.h"
#endif
@ -72,6 +74,7 @@ class VOCDataset;
#endif
// Dataset Op classes (in alphabetical order)
class BatchDataset;
class BucketBatchByLengthDataset;
#ifndef ENABLE_ANDROID
class BuildVocabDataset;
#endif
@ -370,6 +373,35 @@ class Dataset : public std::enable_shared_from_this<Dataset> {
/// \return Shared pointer to the current BatchDataset
std::shared_ptr<BatchDataset> Batch(int32_t batch_size, bool drop_remainder = false);
/// \brief Function to create a BucketBatchByLengthDataset
/// \notes Sorts rows into buckets based on their computed length, then pads and batches each bucket
/// independently using the batch size given for that bucket
/// \param[in] column_names Columns passed to element_length_function
/// \param[in] bucket_boundaries A list consisting of the upper boundaries of the buckets.
/// Must be strictly increasing. If there are n boundaries, n+1 buckets are created: One bucket for
/// [0, bucket_boundaries[0]), one bucket for [bucket_boundaries[i], bucket_boundaries[i+1]) for each
/// 0<i<n, and one bucket for [bucket_boundaries[n-1], inf).
/// \param[in] bucket_batch_sizes A list consisting of the batch sizes for each bucket.
/// Must contain elements equal to the size of bucket_boundaries + 1.
/// \param[in] element_length_function A function pointer that takes in TensorRow and outputs a TensorRow. The output
/// must contain a single tensor containing a single int32_t. If no value is provided, then size of column_names
/// must be 1, and the size of the first dimension of that column will be taken as the length (default=nullptr)
/// \param[in] pad_info Represents how to batch each column. The key corresponds to the column name, the value must
/// be a tuple of 2 elements. The first element corresponds to the shape to pad to, and the second element
/// corresponds to the value to pad with. If a column is not specified, then that column will be padded to the
/// longest in the current batch, and 0 will be used as the padding value. Any unspecified dimensions will be
/// padded to the longest in the current batch, unless if pad_to_bucket_boundary is true. If no padding is wanted,
/// set pad_info to None (default=empty dictionary).
/// \param[in] pad_to_bucket_boundary If true, will pad each unspecified dimension in pad_info to the bucket_boundary
/// minus 1. If there are any elements that fall into the last bucket, an error will occur (default=false).
/// \param[in] drop_remainder If true, will drop the last batch for each bucket if it is not a full batch
/// (default=false).
/// \return Shared pointer to the current BucketBatchByLengthDataset
std::shared_ptr<BucketBatchByLengthDataset> BucketBatchByLength(
const std::vector<std::string> &column_names, const std::vector<int32_t> &bucket_boundaries,
const std::vector<int32_t> &bucket_batch_sizes, TensorRow (*element_length_function)(TensorRow) = nullptr,
const std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> &pad_info = {},
bool pad_to_bucket_boundary = false, bool drop_remainder = false);
#ifndef ENABLE_ANDROID
/// \brief Function to create a Vocab from source dataset
/// \notes Build a vocab from a dataset. This would collect all the unique words in a dataset and return a vocab
@ -953,6 +985,36 @@ class BatchDataset : public Dataset {
std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> pad_map_;
};
// IR node for the BucketBatchByLength operation: buckets rows by a computed length,
// then pads and batches each bucket independently. Created via Dataset::BucketBatchByLength.
class BucketBatchByLengthDataset : public Dataset {
public:
/// \brief Constructor
BucketBatchByLengthDataset(
const std::vector<std::string> &column_names, const std::vector<int32_t> &bucket_boundaries,
const std::vector<int32_t> &bucket_batch_sizes, TensorRow (*element_length_function)(TensorRow) = nullptr,
const std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> &pad_info = {},
bool pad_to_bucket_boundary = false, bool drop_remainder = false);
/// \brief Destructor
~BucketBatchByLengthDataset() = default;
/// \brief a base class override function to create the required runtime dataset op objects for this class
/// \return The list of shared pointers to the newly created DatasetOps
std::vector<std::shared_ptr<DatasetOp>> Build() override;
/// \brief Parameters validation
/// \return bool true if all the params are valid
bool ValidateParams() override;
private:
// Columns fed to element_length_function_ (or the single length column when it is nullptr).
std::vector<std::string> column_names_;
// Upper bucket boundaries; must be positive and strictly increasing (checked in ValidateParams).
std::vector<int32_t> bucket_boundaries_;
// One batch size per bucket: bucket_boundaries_.size() + 1 entries.
std::vector<int32_t> bucket_batch_sizes_;
// Optional C length function; nullptr means "use dim 0 of the single named column".
TensorRow (*element_length_function_)(TensorRow);
// Per-column pad shape/value; unspecified columns are padded to the batch maximum.
std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> pad_info_;
bool pad_to_bucket_boundary_;
bool drop_remainder_;
};
#ifndef ENABLE_ANDROID
class BuildVocabDataset : public Dataset {
public:

@ -7,6 +7,7 @@ if (ENABLE_PYTHON)
data/compose_op.cc
data/random_apply_op.cc
data/random_choice_op.cc
c_func_op.cc
py_func_op.cc
tensor_op.cc)
target_include_directories(kernels PRIVATE ${pybind11_INCLUDE_DIRS})

@ -0,0 +1,37 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <vector>
#include "minddata/dataset/kernels/c_func_op.h"
#include "minddata/dataset/kernels/tensor_op.h"
#include "minddata/dataset/util/status.h"
namespace mindspore {
namespace dataset {
// Invokes the stored C function pointer on the input row.
// \param[in] input Row passed straight through to the user function.
// \param[out] output Receives whatever the user function returns.
// \return Status::OK() on success; an unexpected-error status when the function throws.
Status CFuncOp::Compute(const TensorRow &input, TensorRow *output) {
  IO_CHECK_VECTOR(input, output);
  try {
    *output = (*c_func_ptr_)(input);
  } catch (const std::exception &e) {
    // Include the exception text so the failure is diagnosable from the returned status
    // (the previous message dropped e.what(); an unused Status local was also removed).
    RETURN_STATUS_UNEXPECTED(std::string("Unexpected error in CFuncOp: ") + e.what());
  }
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -0,0 +1,50 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_KERNELS_C_FUNC_OP_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_KERNELS_C_FUNC_OP_H_
#include <memory>
#include <vector>
#include <utility>
#include <string>
#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/kernels/tensor_op.h"
namespace mindspore {
namespace dataset {
// TensorOp adapter that lets a plain C function pointer (TensorRow -> TensorRow)
// run inside the dataset pipeline, e.g. as the element_length_function of
// BucketBatchByLength.
class CFuncOp : public TensorOp {
public:
explicit CFuncOp(TensorRow (*func)(TensorRow)) : c_func_ptr_(func) {}
~CFuncOp() override = default;
// NOTE(review): 0 appears to mean "column count not fixed" — confirm against the
// TensorOp base-class contract for NumInput/NumOutput.
uint32_t NumInput() override { return 0; }
uint32_t NumOutput() override { return 0; }
// Calls c_func_ptr and returns the result
Status Compute(const TensorRow &input, TensorRow *output) override;
std::string Name() const override { return kCFuncOp; }
private:
// Non-owning function pointer supplied by the user; invoked once per Compute call.
TensorRow (*c_func_ptr_)(TensorRow);
};
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_KERNELS_C_FUNC_OP_H_

@ -37,35 +37,44 @@ Status PyFuncOp::Compute(const TensorRow &input, TensorRow *output) {
try {
// Transform input tensor vector into numpy array vector
py::tuple input_args(input.size());
for (size_t i = 0; i < input.size(); i++) {
py::array new_data;
RETURN_IF_NOT_OK(input.at(i)->GetDataAsNumpy(&new_data));
// possible memcpy here
input_args[i] = new_data;
py::object ret_py_obj;
if (input.size() > 0) {
for (size_t i = 0; i < input.size(); i++) {
py::array new_data;
RETURN_IF_NOT_OK(input.at(i)->GetDataAsNumpy(&new_data));
// possible memcpy here
input_args[i] = new_data;
}
// Invoke python function
ret_py_obj = this->py_func_ptr_(*input_args);
} else {
ret_py_obj = this->py_func_ptr_();
}
// Invoke python function
py::object ret_py_obj = this->py_func_ptr_(*input_args);
// Process the return value
if (py::isinstance<py::array>(ret_py_obj)) {
// In case of a n-1 mapping, the return value will be a numpy array
std::shared_ptr<Tensor> out;
RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(ret_py_obj.cast<py::array>(), &out));
output->push_back(out);
} else if (py::isinstance<py::tuple>(ret_py_obj)) {
// In case of a n-m mapping, the return value will be a tuple of numpy arrays
py::tuple ret_py_tuple = ret_py_obj.cast<py::tuple>();
// Iterate over two containers simultaneously for memory copy
for (size_t i = 0; i < ret_py_tuple.size(); i++) {
py::object ret_py_ele = ret_py_tuple[i];
if (!py::isinstance<py::array>(ret_py_ele)) {
goto ShapeMisMatch;
if (output_type_ != DataType::DE_UNKNOWN) {
RETURN_IF_NOT_OK(CastOutput(ret_py_obj, output));
} else {
if (py::isinstance<py::tuple>(ret_py_obj)) {
// In case of a n-m mapping, the return value will be a tuple of numpy arrays
py::tuple ret_py_tuple = ret_py_obj.cast<py::tuple>();
// Iterate over two containers simultaneously for memory copy
for (size_t i = 0; i < ret_py_tuple.size(); i++) {
py::object ret_py_ele = ret_py_tuple[i];
if (!py::isinstance<py::array>(ret_py_ele)) {
goto ShapeMisMatch;
}
std::shared_ptr<Tensor> out;
RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(ret_py_ele.cast<py::array>(), &out));
output->push_back(out);
}
} else if (py::isinstance<py::array>(ret_py_obj)) {
// In case of a n-1 mapping, the return value will be a numpy array
std::shared_ptr<Tensor> out;
RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(ret_py_ele.cast<py::array>(), &out));
RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(ret_py_obj.cast<py::array>(), &out));
output->push_back(out);
} else {
goto ShapeMisMatch;
}
} else {
goto ShapeMisMatch;
}
} catch (const py::error_already_set &e) {
ret = Status(StatusCode::kPyFuncException, e.what());
@ -79,5 +88,24 @@ ShapeMisMatch:
ret = Status(StatusCode::kShapeMisMatch, "PyFunc should return a numpy array or a numpy array tuple");
goto ComputeReturn;
}
// Converts a primitive python return value into a one-element tensor of type output_type_
// and appends it to *output. Used by Compute when output_type_ != DE_UNKNOWN.
// \param[in] ret_py_obj The python object returned by the user function.
// \param[out] output Row the converted tensor is appended to.
// \return Status::OK() on success; unexpected-error status on cast failure or unsupported type.
Status PyFuncOp::CastOutput(const py::object &ret_py_obj, TensorRow *output) {
try {
std::shared_ptr<Tensor> out;
switch (output_type_) {
case DataType::DE_INT32:
// Build a shape-{1} int32 tensor and store the cast python value at index 0.
RETURN_IF_NOT_OK(Tensor::CreateEmpty(TensorShape({1}), DataType(DataType::DE_INT32), &out));
RETURN_IF_NOT_OK(out->SetItemAt({0}, ret_py_obj.cast<int32_t>()));
break;
default:
// Only DE_INT32 is supported so far; extend this switch for other output types.
RETURN_STATUS_UNEXPECTED("No cast for the specified DataType was found.");
}
output->push_back(out);
} catch (const std::exception &e) {
// A failed .cast<int32_t>() raises py::cast_error, which derives from std::exception
// and is reported through the returned status rather than propagating.
return Status(StatusCode::kUnexpectedError, e.what());
}
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -27,9 +27,11 @@
namespace mindspore {
namespace dataset {
class __attribute__((visibility("hidden"))) PyFuncOp : public TensorOp {
class PyFuncOp : public TensorOp {
public:
explicit PyFuncOp(py::function func) : py_func_ptr_(std::move(func)) {}
explicit PyFuncOp(py::function func) : py_func_ptr_(std::move(func)) { output_type_ = DataType::DE_UNKNOWN; }
explicit PyFuncOp(py::function func, DataType::Type output_type)
: py_func_ptr_(std::move(func)), output_type_(output_type) {}
~PyFuncOp() override = default;
@ -39,10 +41,18 @@ class __attribute__((visibility("hidden"))) PyFuncOp : public TensorOp {
// Compute function for n-n mapping.
Status Compute(const TensorRow &input, TensorRow *output) override;
/// \brief Function to convert a primitive type py::object to a TensorRow
/// \notes Changes the py::object to a tensor with corresponding C++ DataType based on output_type_ and adds it to a
/// TensorRow. This function is used inside Compute.
/// \param[in] ret_py_obj The python object we want to cast
/// \param[output] The TensorRow output
/// \return Status
Status CastOutput(const py::object &ret_py_obj, TensorRow *output);
std::string Name() const override { return kPyFuncOp; }
private:
py::function py_func_ptr_;
DataType::Type output_type_;
};
} // namespace dataset
} // namespace mindspore

@ -127,6 +127,7 @@ constexpr char kToFloat16Op[] = "ToFloat16Op";
constexpr char kTypeCastOp[] = "TypeCastOp";
// other
constexpr char kCFuncOp[] = "CFuncOp";
constexpr char kPyFuncOp[] = "PyFuncOp";
constexpr char kNoOp[] = "NoOp";

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save