dataset Python Pushdown - minor review rework

pull/9404/head
Cathy Wong 4 years ago
parent b15504f6a5
commit d15072ca27

@ -260,7 +260,7 @@ std::vector<std::pair<std::string, std::vector<int32_t>>> Dataset::GetClassIndex
std::shared_ptr<SchemaObj> Schema(const std::string &schema_file) {
auto schema = std::make_shared<SchemaObj>(schema_file);
return schema->init() ? schema : nullptr;
return schema->Init() ? schema : nullptr;
}
// FUNCTIONS TO CREATE DATASETS FOR LEAF CLASSES
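
For reference, a minimal usage sketch of the renamed Init() as it surfaces through the Schema() factory; the header path, namespace, and schema file path below are assumptions for illustration, not taken from this commit:

#include <memory>
// Assumed public header; the exact path and namespace (mindspore::dataset vs. mindspore::dataset::api)
// can differ between MindSpore versions.
#include "minddata/dataset/include/datasets.h"

void LoadSchemaSketch() {
  // Schema() constructs a SchemaObj and calls Init(); a nullptr return means Init() failed
  // (missing file or malformed JSON).
  std::shared_ptr<mindspore::dataset::SchemaObj> schema =
      mindspore::dataset::Schema("/path/to/schema.json");  // hypothetical path
  if (schema == nullptr) {
    return;  // handle the failed initialization here
  }
}
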
@ -456,7 +456,7 @@ ConcatDataset::ConcatDataset(const std::vector<std::shared_ptr<Dataset>> &datase
}
FilterDataset::FilterDataset(std::shared_ptr<Dataset> input, std::function<TensorRow(TensorRow)> predicate,
std::vector<std::string> input_columns) {
const std::vector<std::string> &input_columns) {
std::shared_ptr<TensorOp> c_func = nullptr;
if (predicate) c_func = std::make_shared<CFuncOp>(predicate);
auto ds = std::make_shared<FilterNode>(input->IRNode(), c_func, input_columns);
@ -466,7 +466,7 @@ FilterDataset::FilterDataset(std::shared_ptr<Dataset> input, std::function<Tenso
#endif
MapDataset::MapDataset(std::shared_ptr<Dataset> input, std::vector<std::shared_ptr<TensorOperation>> operations,
std::vector<std::string> input_columns, std::vector<std::string> output_columns,
const std::vector<std::string> &input_columns, const std::vector<std::string> &output_columns,
const std::vector<std::string> &project_columns, const std::shared_ptr<DatasetCache> &cache,
std::vector<std::shared_ptr<DSCallback>> callbacks) {
auto ds = std::make_shared<MapNode>(input->IRNode(), operations, input_columns, output_columns, project_columns,
@ -636,8 +636,8 @@ std::shared_ptr<BatchDataset> Dataset::Batch(int32_t batch_size, bool drop_remai
SchemaObj::SchemaObj(const std::string &schema_file) : schema_file_(schema_file), num_rows_(0), dataset_type_("") {}
// SchemaObj init function
Status SchemaObj::init() {
// SchemaObj Init function
Status SchemaObj::Init() {
if (!schema_file_.empty()) {
Path schema_file(schema_file_);
CHECK_FAIL_RETURN_UNEXPECTED(schema_file.Exists(),
@ -650,7 +650,9 @@ Status SchemaObj::init() {
CHECK_FAIL_RETURN_UNEXPECTED(js.find("columns") != js.end(),
"\"columns\" node is required in the schema json file.");
} catch (const std::exception &err) {
RETURN_STATUS_SYNTAX_ERROR("Schema file failed to load");
std::string err_msg = "Schema file failed to load: ";
err_msg += err.what();
RETURN_STATUS_SYNTAX_ERROR(err_msg);
}
return from_json(js);
}
@ -658,13 +660,13 @@ Status SchemaObj::init() {
}
// Function to add a column to schema with a mstype de_type and known shape
Status SchemaObj::add_column(std::string name, TypeId de_type, std::vector<int32_t> shape) {
Status SchemaObj::add_column(const std::string &name, TypeId de_type, const std::vector<int32_t> &shape) {
DataType data_type = dataset::MSTypeToDEType(de_type);
return add_column(name, data_type.ToString(), shape);
}
// Function to add a column to schema with a string de_type and known shape
Status SchemaObj::add_column(std::string name, std::string de_type, std::vector<int32_t> shape) {
Status SchemaObj::add_column(const std::string &name, const std::string &de_type, const std::vector<int32_t> &shape) {
DataType data_type(de_type);
CHECK_FAIL_RETURN_UNEXPECTED(data_type != DataType::DE_UNKNOWN, "Type is unknown.");
@ -679,13 +681,13 @@ Status SchemaObj::add_column(std::string name, std::string de_type, std::vector<
}
// Function to add a column to schema with a mstype de_type and without shape
Status SchemaObj::add_column(std::string name, TypeId de_type) {
Status SchemaObj::add_column(const std::string &name, TypeId de_type) {
DataType data_type = dataset::MSTypeToDEType(de_type);
return add_column(name, data_type.ToString());
}
// Function to add a column to schema with a string de_type and without shape
Status SchemaObj::add_column(std::string name, std::string de_type) {
Status SchemaObj::add_column(const std::string &name, const std::string &de_type) {
DataType data_type(de_type);
CHECK_FAIL_RETURN_UNEXPECTED(data_type != DataType::DE_UNKNOWN, "Type is unknown.");
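
A short sketch of the const-reference add_column overloads in use; the column names, element types, and shapes are illustrative, and Schema() is assumed to accept an empty path for a blank schema:

// Illustrative only: column names, element types, and shapes are made up.
std::shared_ptr<SchemaObj> schema = Schema();  // assumes a default empty schema_file argument
// String de_type with an explicit shape.
Status rc = schema->add_column("image", "uint8", {2, 28, 28});
if (rc.IsOk()) {
  // TypeId de_type without a shape (treated as unknown shape of rank 1).
  rc = schema->add_column("label", mindspore::TypeId::kNumberTypeInt32);
}
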
@ -791,7 +793,9 @@ Status SchemaObj::FromJSONString(const std::string &json_string) {
"\"columns\" node is required in the schema json JSON.");
RETURN_IF_NOT_OK(from_json(js));
} catch (const std::exception &err) {
RETURN_STATUS_SYNTAX_ERROR("JSON string is failed to parse");
std::string err_msg = "FromJSONString: JSON string failed to parse: ";
err_msg += err.what();
RETURN_STATUS_SYNTAX_ERROR(err_msg);
}
return Status::OK();
}
@ -801,7 +805,9 @@ Status SchemaObj::ParseColumnString(const std::string &json_string) {
nlohmann::json js = nlohmann::json::parse(json_string);
RETURN_IF_NOT_OK(parse_column(js));
} catch (const std::exception &err) {
RETURN_STATUS_SYNTAX_ERROR("JSON string is failed to parse");
std::string err_msg = "ParseColumnString: JSON string failed to parse: ";
err_msg += err.what();
RETURN_STATUS_SYNTAX_ERROR(err_msg);
}
return Status::OK();
}
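
The effect of the richer error text can be seen by handing FromJSONString a malformed string; a sketch, assuming a SchemaObj built as in the snippet above:

// Sketch: a deliberately broken JSON string to exercise the new error path.
Status rc = schema->FromJSONString("{ \"columns\": ");  // malformed on purpose
if (!rc.IsOk()) {
  // The Status text now includes "FromJSONString: JSON string failed to parse: " plus err.what().
}
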

@ -31,7 +31,7 @@ PYBIND_REGISTER(
(void)py::class_<SchemaObj, std::shared_ptr<SchemaObj>>(*m, "SchemaObj", "to create a SchemaObj")
.def(py::init([](std::string schema_file) {
auto schema = std::make_shared<SchemaObj>(schema_file);
THROW_IF_ERROR(schema->init());
THROW_IF_ERROR(schema->Init());
return schema;
}))
.def("add_column", [](SchemaObj &self, std::string name, TypeId de_type,

@ -519,15 +519,17 @@ Status TreeGetters::InternalInit(int8_t type) {
return pre;
});
Status s = tree_adapter_->Compile(std::move(root_), 1);
if (!s.IsError()) init_flag_ = true;
if (s.IsOk()) init_flag_ = true;
return s;
}
Status TreeGetters::InternalInit() {
if (init_flag_) return Status::OK();
Status s = tree_adapter_->Compile(std::move(root_), 1);
if (!s.IsError()) init_flag_ = true;
if (s.IsOk()) init_flag_ = true;
return s;
}
Status TreeGetters::GetFirstRowShapeAndType() {
RETURN_OK_IF_TRUE(first_row_obtained_);
RETURN_IF_NOT_OK(InternalInit(static_cast<int8_t>(GetterPass::kOutputShapeAndType)));
@ -540,6 +542,7 @@ Status TreeGetters::GetFirstRowShapeAndType() {
first_row_obtained_ = true;
return Status::OK();
}
Status BuildVocabConsumer::Init(std::shared_ptr<DatasetNode> d) { return tree_adapter_->Compile(std::move(d), 1); }
Status BuildVocabConsumer::Start() {

@ -41,6 +41,8 @@ class TreeConsumer {
/// \return Status error code.
virtual Status Init(std::shared_ptr<DatasetNode> d);
/// Internal function to perform the termination
/// \return Status error code
virtual Status Terminate();
protected:
@ -72,8 +74,8 @@ class IteratorConsumer : public TreeConsumer {
/// \return Status error code
Status GetNextAsMap(std::unordered_map<std::string, TensorPtr> *out);
/// Returns the next row in as a map
/// \param[out] out std::map of string to Tensor
/// Returns the next row in as a vector
/// \param[out] out std::vector of pairs of string to Tensor
/// \return Status error code
Status GetNextAsOrderedPair(std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> *vec);
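
For context on the corrected comments, a sketch of consuming GetNextAsOrderedPair; consumer is an IteratorConsumer assumed to be already built and initialized:

std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> row;
// Columns arrive ordered by column id rather than in unordered_map iteration order.
Status rc = consumer.GetNextAsOrderedPair(&row);
if (rc.IsOk()) {
  for (const auto &col : row) {
    // col.first is the column name, col.second the corresponding Tensor.
  }
}
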

@ -63,7 +63,7 @@ class IteratorBase {
// @return A unordered map from column name to shared pointer to Tensor.
Status GetNextAsMap(TensorMap *out_map);
/// \breif return column_name, tensor pair in the order of its column id.
/// \brief return column_name, tensor pair in the order of its column id.
/// \param[out] vec
/// \return Error code
Status GetNextAsOrderedPair(std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> *vec);

@ -59,6 +59,7 @@ Status ConcatNode::ValidateParams() {
RETURN_STATUS_SYNTAX_ERROR(err_msg);
}
// Either one of children_flag_and_nums_ or children_start_end_index_ should be non-empty.
if ((children_flag_and_nums_.empty() && !children_start_end_index_.empty()) ||
(!children_flag_and_nums_.empty() && children_start_end_index_.empty())) {
std::string err_msg = "ConcatNode: children_flag_and_nums and children_start_end_index should be used together";

@ -269,7 +269,7 @@ class Dataset : public std::enable_shared_from_this<Dataset> {
/// \param[in] input_columns List of names of the input columns to filter
/// \return Shared pointer to the current FilterNode
std::shared_ptr<FilterDataset> Filter(std::function<TensorRow(TensorRow)> predicate,
std::vector<std::string> input_columns = {}) {
const std::vector<std::string> &input_columns = {}) {
return std::make_shared<FilterDataset>(shared_from_this(), predicate, input_columns);
}
#endif
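
A hedged sketch of Filter with the const-reference column list; ds is any already-built Dataset, the column name is illustrative, and the keep/drop convention of the returned TensorRow should be checked against CFuncOp rather than taken from this sketch:

// Placeholder predicate: real code returns a TensorRow encoding the keep/drop decision.
std::function<TensorRow(TensorRow)> predicate = [](TensorRow row) -> TensorRow { return row; };
// Only the "label" column (illustrative) is fed to the predicate; {} would pass every column.
std::shared_ptr<FilterDataset> filtered = ds->Filter(predicate, {"label"});
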
@ -291,8 +291,8 @@ class Dataset : public std::enable_shared_from_this<Dataset> {
/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used).
/// \return Shared pointer to the current MapDataset
std::shared_ptr<MapDataset> Map(std::vector<std::shared_ptr<TensorOperation>> operations,
std::vector<std::string> input_columns = {},
std::vector<std::string> output_columns = {},
const std::vector<std::string> &input_columns = {},
const std::vector<std::string> &output_columns = {},
const std::vector<std::string> &project_columns = {},
const std::shared_ptr<DatasetCache> &cache = nullptr,
std::vector<std::shared_ptr<DSCallback>> callbacks = {}) {
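
Similarly, a sketch of Map with the const-reference column parameters; the Decode operation and column names are assumptions about the surrounding vision API, not part of this commit:

// vision::Decode() is assumed to yield a std::shared_ptr<TensorOperation>; the exact factory
// may differ between MindSpore versions.
std::shared_ptr<TensorOperation> decode_op = vision::Decode();
std::shared_ptr<MapDataset> mapped = ds->Map({decode_op},   // operations
                                             {"image"},     // input_columns (illustrative)
                                             {"image"});    // output_columns (illustrative)
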
@ -377,36 +377,36 @@ class SchemaObj {
/// \brief Destructor
~SchemaObj() = default;
/// \brief SchemaObj init function
/// \return bool true if schema init success
Status init();
/// \brief SchemaObj Init function
/// \return bool true if schema initialization is successful
Status Init();
/// \brief Add new column to the schema with unknown shape of rank 1
/// \param[in] name name of the column.
/// \param[in] de_type data type of the column(TypeId).
/// \return bool true if schema init success
Status add_column(std::string name, TypeId de_type);
Status add_column(const std::string &name, TypeId de_type);
/// \brief Add new column to the schema with unknown shape of rank 1
/// \param[in] name name of the column.
/// \param[in] de_type data type of the column(std::string).
/// \param[in] shape shape of the column.
/// \return bool true if schema init success
Status add_column(std::string name, std::string de_type);
Status add_column(const std::string &name, const std::string &de_type);
/// \brief Add new column to the schema
/// \param[in] name name of the column.
/// \param[in] de_type data type of the column(TypeId).
/// \param[in] shape shape of the column.
/// \return bool true if schema init success
Status add_column(std::string name, TypeId de_type, std::vector<int32_t> shape);
Status add_column(const std::string &name, TypeId de_type, const std::vector<int32_t> &shape);
/// \brief Add new column to the schema
/// \param[in] name name of the column.
/// \param[in] de_type data type of the column(std::string).
/// \param[in] shape shape of the column.
/// \return bool true if schema init success
Status add_column(std::string name, std::string de_type, std::vector<int32_t> shape);
Status add_column(const std::string &name, const std::string &de_type, const std::vector<int32_t> &shape);
/// \brief Get a JSON string of the schema
/// \return JSON string of the schema
@ -473,7 +473,7 @@ class ConcatDataset : public Dataset {
class FilterDataset : public Dataset {
public:
FilterDataset(std::shared_ptr<Dataset> input, std::function<TensorRow(TensorRow)> predicate,
std::vector<std::string> input_columns);
const std::vector<std::string> &input_columns);
~FilterDataset() = default;
};
#endif
@ -481,7 +481,7 @@ class FilterDataset : public Dataset {
class MapDataset : public Dataset {
public:
MapDataset(std::shared_ptr<Dataset> input, std::vector<std::shared_ptr<TensorOperation>> operations,
std::vector<std::string> input_columns, std::vector<std::string> output_columns,
const std::vector<std::string> &input_columns, const std::vector<std::string> &output_columns,
const std::vector<std::string> &project_columns, const std::shared_ptr<DatasetCache> &cache,
std::vector<std::shared_ptr<DSCallback>> callbacks);
~MapDataset() = default;

@ -1306,7 +1306,7 @@ def check_cache_option(cache):
def check_to_device_send(method):
"""A wrapper that wraps a parameter checker around the check_to_device_send."""
"""Check the input arguments of send function for TransferDataset."""
@wraps(method)
def new_method(self, *args, **kwargs):
