Optimization for GetDatasetSize

pull/9249/head
Mahdi 4 years ago
parent af3c27d354
commit f80553bc6e

@@ -199,12 +199,13 @@ bool Dataset::Save(std::string dataset_path, int32_t num_files, std::string data
// Constructor
Dataset::Dataset() { tree_getters_ = std::make_shared<TreeGetters>(); }
int64_t Dataset::GetDatasetSize() {
int64_t Dataset::GetDatasetSize(bool estimate) {
int64_t dataset_size;
std::unique_ptr<NativeRuntimeContext> runtime_context = std::make_unique<NativeRuntimeContext>();
RETURN_SECOND_IF_ERROR(runtime_context->Init(), -1);
RETURN_SECOND_IF_ERROR(tree_getters_->Init(this->IRNode()), -1);
RETURN_SECOND_IF_ERROR(tree_getters_->GetDatasetSize(&dataset_size), -1);
std::shared_ptr<DatasetSizeGetter> size_getter = std::make_shared<DatasetSizeGetter>();
RETURN_SECOND_IF_ERROR(size_getter->Init(this->IRNode()), -1);
RETURN_SECOND_IF_ERROR(size_getter->GetDatasetSize(&dataset_size, estimate), -1);
return dataset_size;
}
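With this change the public API can trade accuracy for speed. A minimal caller-side sketch (illustrative only: it assumes the header declares estimate with a default of false, and the ImageFolder pipeline is hypothetical):

    // Sketch: estimate=true lets ops that support it return a cheaper,
    // possibly approximate size instead of triggering a full dry run.
    std::shared_ptr<Dataset> ds = ImageFolder("/path/to/images");  // hypothetical pipeline
    int64_t exact = ds->GetDatasetSize();       // assumed default: estimate = false
    int64_t approx = ds->GetDatasetSize(true);  // faster, may be approximate
    // Either call returns -1 on failure, per RETURN_SECOND_IF_ERROR above.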

@@ -106,19 +106,7 @@ PYBIND_REGISTER(ImageFolderOp, 1, ([](const py::module *m) {
}));
PYBIND_REGISTER(ManifestOp, 1, ([](const py::module *m) {
(void)py::class_<ManifestOp, DatasetOp, std::shared_ptr<ManifestOp>>(*m, "ManifestOp")
.def_static("get_num_rows_and_classes",
[](const std::string &file, const py::dict &dict, const std::string &usage) {
int64_t count = 0, num_classes = 0;
THROW_IF_ERROR(ManifestOp::CountTotalRows(file, dict, usage, &count, &num_classes));
return py::make_tuple(count, num_classes);
})
.def_static("get_class_indexing", [](const std::string &file, const py::dict &dict,
const std::string &usage) {
std::map<std::string, int32_t> output_class_indexing;
THROW_IF_ERROR(ManifestOp::GetClassIndexing(file, dict, usage, &output_class_indexing));
return output_class_indexing;
});
(void)py::class_<ManifestOp, DatasetOp, std::shared_ptr<ManifestOp>>(*m, "ManifestOp");
}));
PYBIND_REGISTER(MindRecordOp, 1, ([](const py::module *m) {
(void)py::class_<MindRecordOp, DatasetOp, std::shared_ptr<MindRecordOp>>(*m, "MindRecordOp")
@@ -173,13 +161,6 @@ PYBIND_REGISTER(TFReaderOp, 1, ([](const py::module *m) {
PYBIND_REGISTER(VOCOp, 1, ([](const py::module *m) {
(void)py::class_<VOCOp, DatasetOp, std::shared_ptr<VOCOp>>(*m, "VOCOp")
.def_static("get_num_rows",
[](const std::string &dir, const std::string &task_type, const std::string &task_mode,
const py::dict &dict, int64_t numSamples) {
int64_t count = 0;
THROW_IF_ERROR(VOCOp::CountTotalRows(dir, task_type, task_mode, dict, &count));
return count;
})
.def_static("get_class_indexing", [](const std::string &dir, const std::string &task_type,
const std::string &task_mode, const py::dict &dict) {
std::map<std::string, int32_t> output_class_indexing;

@@ -184,7 +184,11 @@ PYBIND_REGISTER(GeneratorNode, 2, ([](const py::module *m) {
auto gen = std::make_shared<GeneratorNode>(generator_function, schema);
THROW_IF_ERROR(gen->ValidateParams());
return gen;
}));
}))
.def("SetGeneratorDatasetSize", [](std::shared_ptr<GeneratorNode> self, int64_t sz) {
self->SetGeneratorDatasetSize(sz);
return self;
});
}));
PYBIND_REGISTER(ImageFolderNode, 2, ([](const py::module *m) {
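The new binding returns self so the Python layer can chain the size hint onto a freshly built node. A hedged sketch of the equivalent C++-side call, reusing generator_function and schema from the snippet above:

    // Sketch: record a caller-supplied row count so GetDatasetSize can be
    // answered without exhausting the (possibly expensive) Python generator.
    auto gen = std::make_shared<GeneratorNode>(generator_function, schema);
    gen->SetGeneratorDatasetSize(60000);  // hypothetical count supplied by the caller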

@@ -93,12 +93,6 @@ PYBIND_REGISTER(TreeGetters, 1, ([](const py::module *m) {
THROW_IF_ERROR(self.GetClassIndexing(&output_class_indexing));
return output_class_indexing;
})
.def("GetDatasetSize",
[](PythonTreeGetters &self) {
int64_t dataset_size;
THROW_IF_ERROR(self.GetDatasetSize(&dataset_size));
return dataset_size;
})
.def("__deepcopy__", [](py::object &tree_getter, py::dict memo) { return tree_getter; });
}));
@@ -164,5 +158,18 @@ PYBIND_REGISTER(PythonSaveToDisk, 1, ([](const py::module *m) {
.def("Save", [](PythonSaveToDisk &self) { THROW_IF_ERROR(self.Save()); });
}));
PYBIND_REGISTER(PythonDatasetSizeGetter, 1, ([](const py::module *m) {
(void)py::class_<PythonDatasetSizeGetter, TreeConsumer, std::shared_ptr<PythonDatasetSizeGetter>>(
*m, "DatasetSizeGetters")
.def(py::init<>())
.def("Init", [](PythonDatasetSizeGetter &self,
std::shared_ptr<DatasetNode> d) { THROW_IF_ERROR(self.Init(d)); })
.def("GetDatasetSize", [](PythonDatasetSizeGetter &self, bool estimate) {
int64_t size;
THROW_IF_ERROR(self.GetDatasetSize(&size, estimate));
return size;
});
}));
} // namespace dataset
} // namespace mindspore

@@ -65,4 +65,8 @@ Status PythonTreeGetters::GetRow(TensorRow *r) {
py::gil_scoped_release gil_release;
return TreeGetters::GetRow(r);
}
Status PythonDatasetSizeGetter::GetRow(const std::shared_ptr<TreeAdapter> &tree_adapter, TensorRow *r) {
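// Release the GIL for the duration of the dry run so that Python UDFs running on
// the pipeline's worker threads can acquire it while this thread blocks in GetNext.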
py::gil_scoped_release gil_release;
return DatasetSizeGetter::GetRow(tree_adapter, r);
}
} // namespace mindspore::dataset

@@ -60,5 +60,9 @@ class PythonTreeGetters : public TreeGetters {
public:
Status GetRow(TensorRow *r) override;
};
class PythonDatasetSizeGetter : public DatasetSizeGetter {
public:
Status GetRow(const std::shared_ptr<TreeAdapter> &tree_adapter, TensorRow *r) override;
};
} // namespace mindspore::dataset
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONSUMERS_PYTHON_TREE_CONSUMER_H_

@@ -451,29 +451,6 @@ Status TreeGetters::Init(std::shared_ptr<DatasetNode> d) {
Status TreeGetters::GetRow(TensorRow *row) { return tree_adapter_->GetNext(row); }
Status TreeGetters::GetDatasetSize(int64_t *dataset_size) {
if (dataset_size_ == -1) {
RETURN_IF_NOT_OK(InternalInit(static_cast<int8_t>(GetterPass::kDatasetSize)));
std::shared_ptr<DatasetOp> root = std::shared_ptr<DatasetOp>(tree_adapter_->GetRoot());
RETURN_UNEXPECTED_IF_NULL(root);
RETURN_IF_NOT_OK(root->GetDatasetSize(dataset_size));
if (*dataset_size == -1) { // run through the tree and get everything
TensorRow row;
RETURN_IF_NOT_OK(GetRow(&row));
int64_t row_cnt = 0;
while (!row.empty()) {
++row_cnt;
RETURN_IF_NOT_OK(GetRow(&row));
}
*dataset_size = row_cnt;
}
dataset_size_ = *dataset_size; // cache the result for subsequent calls
}
*dataset_size = dataset_size_;
return Status::OK();
}
Status TreeGetters::GetOutputTypes(std::vector<DataType> *types) {
RETURN_IF_NOT_OK(GetFirstRowShapeAndType());
*types = first_row_type_;
@@ -573,5 +550,46 @@ Status BuildVocabConsumer::Start() {
CHECK_FAIL_RETURN_UNEXPECTED(row.empty(), "The fetched row from BuildVocab should be an EOE.");
return Status::OK();
}
Status DatasetSizeGetter::GetDatasetSize(int64_t *size, bool estimate) {
if (dataset_size_ == -1) {
RETURN_IF_NOT_OK(root_->GetDatasetSize(shared_from_this(), estimate, size));
dataset_size_ = *size; // cache the result for subsequent calls
}
*size = dataset_size_;
return Status::OK();
}
Status DatasetSizeGetter::Init(std::shared_ptr<DatasetNode> d) {
root_ = std::move(d);
return Status::OK();
}
Status DatasetSizeGetter::DryRun(std::shared_ptr<DatasetNode> ir_node, int64_t *dataset_size) {
std::shared_ptr<TreeAdapter> tree_adapter = std::make_shared<TreeAdapter>();
tree_adapters_.push_back(tree_adapter);
tree_adapter->SetPrePassOverride([](OptPass pre) {
pre.push_back(
std::make_unique<GetterPass>(static_cast<GetterPass::GetterType>(GetterPass::GetterType::kDatasetSize)));
return pre;
});
RETURN_IF_NOT_OK(tree_adapter->Compile(std::move(ir_node), 1));
TensorRow row;
RETURN_IF_NOT_OK(GetRow(tree_adapter, &row));
int64_t row_cnt = 0;
while (!row.empty()) {
++row_cnt;
RETURN_IF_NOT_OK(GetRow(tree_adapter, &row));
}
*dataset_size = row_cnt;
return Status::OK();
}
Status DatasetSizeGetter::GetRow(const std::shared_ptr<TreeAdapter> &tree_adapter, TensorRow *row) {
return tree_adapter->GetNext(row);
}
Status DatasetSizeGetter::Terminate() {
for (const auto &tree : tree_adapters_) {
RETURN_IF_NOT_OK(tree->AllTasks()->ServiceStop());
}
return Status::OK();
}
} // namespace dataset
} // namespace mindspore
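Each DryRun compiles its own TreeAdapter over the given sub-tree and stores it in tree_adapters_, which is why Terminate walks the whole vector. A sketch of the intended callback flow, assuming IR nodes now receive the getter (matching the root_->GetDatasetSize(shared_from_this(), estimate, size) call above; FilterNode is only an illustrative choice of a node with no closed-form size):

    // Hypothetical IR-node override: a filter has no closed-form size, so it
    // falls back to counting rows via a dry run of its own sub-tree.
    Status FilterNode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &size_getter,
                                      bool estimate, int64_t *dataset_size) {
      return size_getter->DryRun(shared_from_this(), dataset_size);
    }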

@@ -177,7 +177,6 @@ class TreeGetters : public TreeConsumer {
~TreeGetters() = default;
Status Init(std::shared_ptr<DatasetNode> d) override;
Status GetDatasetSize(int64_t *size);
Status GetOutputTypes(std::vector<DataType> *types);
Status GetOutputShapes(std::vector<TensorShape> *shapes);
Status GetBatchSize(int64_t *batch_size);
@@ -186,7 +185,7 @@ class TreeGetters : public TreeConsumer {
Status GetColumnNames(std::vector<std::string> *output);
Status GetClassIndexing(std::vector<std::pair<std::string, std::vector<int32_t>>> *output_class_indexing);
std::string Name() override { return "TreeGetters"; }
virtual Status GetRow(TensorRow *r);
virtual Status GetRow(TensorRow *row);
private:
Status GetFirstRowShapeAndType();
@@ -202,6 +201,35 @@ class TreeGetters : public TreeConsumer {
Status InternalInit();
};
/// Consumer that is used to get the dataset size from the pipeline
class DatasetSizeGetter : public TreeConsumer, public std::enable_shared_from_this<DatasetSizeGetter> {
public:
DatasetSizeGetter() : dataset_size_(-1) {}
~DatasetSizeGetter() = default;
Status Init(std::shared_ptr<DatasetNode> d) override;
Status Terminate() override;
/// \brief Function to get the dataset size
/// \param[out] size The size of the dataset
/// \param[in] estimate Only supported by some ops; when true it speeds up getting the dataset size at the
/// expense of accuracy.
/// \return Status of the function
Status GetDatasetSize(int64_t *size, bool estimate = false);
virtual Status GetRow(const std::shared_ptr<TreeAdapter> &tree_adapter, TensorRow *row);
std::string Name() override { return "DatasetSizeGetter"; }
/// \brief Gets the dataset size by iterating over the entire dataset on a sub-tree starting from ir_node
/// \param[in] ir_node The node that marks the topmost node of the sub-tree over which we want to iterate
/// \param[out] dataset_size The size of the dataset
/// \return Status - The status code return
Status DryRun(std::shared_ptr<DatasetNode> ir_node, int64_t *dataset_size);
private:
std::shared_ptr<DatasetNode> root_;
std::vector<std::shared_ptr<TreeAdapter>> tree_adapters_;
int64_t dataset_size_;
};
class BuildVocabConsumer : public TreeConsumer {
public:
/// BuildVocabConsumer Constructor which will call the base class default constructor.
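Taken together, the new consumer declared above is driven in three steps; a minimal sketch, with ir_root standing for any std::shared_ptr<DatasetNode>:

    // Sketch of the DatasetSizeGetter lifecycle.
    auto getter = std::make_shared<DatasetSizeGetter>();
    RETURN_IF_NOT_OK(getter->Init(ir_root));
    int64_t size = -1;
    RETURN_IF_NOT_OK(getter->GetDatasetSize(&size));  // second call would hit the dataset_size_ cache
    RETURN_IF_NOT_OK(getter->Terminate());            // stops any trees spawned by DryRun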

@@ -531,30 +531,6 @@ Status BatchOp::ComputeColMap() {
return Status::OK();
}
Status BatchOp::GetDatasetSize(int64_t *dataset_size) {
if (dataset_size_ > 0) {
*dataset_size = dataset_size_;
return Status::OK();
}
#ifdef ENABLE_PYTHON
if (batch_size_func_) {
*dataset_size = -1;
return Status::OK();
}
#endif
int64_t num_rows;
RETURN_IF_NOT_OK(child_[0]->GetDatasetSize(&num_rows));
if (num_rows > 0 && start_batch_size_ > 0) {
if (drop_) {
num_rows = static_cast<int64_t>(floor(num_rows / (1.0 * start_batch_size_)));
} else {
num_rows = static_cast<int64_t>(ceil(num_rows / (1.0 * start_batch_size_)));
}
}
*dataset_size = num_rows;
dataset_size_ = num_rows;
return Status::OK();
}
int64_t BatchOp::GetTreeBatchSize() {
#ifdef ENABLE_PYTHON
if (batch_size_func_) {
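For reference, the arithmetic leaving BatchOp here: with 10 input rows and start_batch_size_ = 3, drop_ = true yields floor(10 / 3) = 3 batches while drop_ = false yields ceil(10 / 3) = 4. Presumably the same computation now lives on the IR/getter side together with the rest of the size logic removed in this commit.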

@@ -219,11 +219,6 @@ class BatchOp : public ParallelOp {
static Status PadColumns(std::unique_ptr<TensorQTable> *table, const PadInfo &pad_info,
const std::unordered_map<std::string, int32_t> &column_name_id_map);
/// \brief Base-class override for GetDatasetSize
/// \param[out] dataset_size the size of the dataset
/// \return Status of the function
Status GetDatasetSize(int64_t *dataset_size) override;
int64_t GetTreeBatchSize() override;
protected:

@@ -232,12 +232,5 @@ Status BucketBatchByLengthOp::ComputeColMap() {
return Status::OK();
}
// Get Dataset size
Status BucketBatchByLengthOp::GetDatasetSize(int64_t *dataset_size) {
// We are returning -1 because we can't easily calculate GetDatasetSize. Returning -1 will make TreeGetters
// iterate over the dataset and count the size.
*dataset_size = dataset_size_;
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@@ -112,11 +112,6 @@ class BucketBatchByLengthOp : public PipelineOp {
std::string Name() const override { return kBucketBatchByLengthOp; }
/// \brief Base-class override for GetDatasetSize
/// \param[out] dataset_size the size of the dataset
/// \return Status of the function
Status GetDatasetSize(int64_t *dataset_size) override;
// << Stream output operator overload
// @notes This allows you to write the debug print info using stream operators
// @param out - reference to the output stream being overloaded

@@ -196,12 +196,5 @@ Status ConcatOp::PreAccept(NodePass *p, bool *modified) {
return p->PreRunOnNode(shared_from_base<ConcatOp>(), modified);
}
// Get Dataset size
Status ConcatOp::GetDatasetSize(int64_t *dataset_size) {
// We are returning -1 because we can't easily calculate GetDatasetSize. Returning -1 will make TreeGetters
// iterate over the dataset and count the size.
*dataset_size = dataset_size_;
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@@ -111,11 +111,6 @@ class ConcatOp : public PipelineOp {
/// \return Status of the node visit
Status PreAccept(NodePass *p, bool *modified) override;
/// \brief Base-class override for GetDatasetSize
/// \param[out] dataset_size the size of the dataset
/// \return Status of the function
Status GetDatasetSize(int64_t *dataset_size) override;
private:
Status Verify(int32_t id, const std::unique_ptr<DataBuffer> &buf);

@@ -294,24 +294,6 @@ Status DatasetOp::GetNextInput(std::unique_ptr<DataBuffer> *p_buffer, int32_t wo
return Status::OK();
}
// Gets the dataset size
Status DatasetOp::GetDatasetSize(int64_t *dataset_size) {
if (dataset_size_ > 0) {
*dataset_size = dataset_size_;
return Status::OK();
}
if (child_.size() == 1) {
return child_[0]->GetDatasetSize(dataset_size);
} else if (child_.size() > 1) {
// It is okay for dataset to have more than 1 child, GetDatasetSize shouldn't fail in this case.
// This is done mostly for cache, which injects cache lookup/merge operators. Cache path will
// always be in front of the child_ structure, so we get the dataset size from the last child.
return child_[child_.size() - 1]->GetDatasetSize(dataset_size);
} else {
RETURN_STATUS_UNEXPECTED("Trying to get dataset size from leaf node, missing override");
}
}
// Gets the number of classes
Status DatasetOp::GetNumClasses(int64_t *num_classes) {
if (child_.size() == 1) {

@@ -180,10 +180,6 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
/// \return Status - The error code return
Status GetNextInput(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id = 0, int32_t child_index = 0);
/// \brief Gets the dataset size
/// \return Status - The status code return
virtual Status GetDatasetSize(int64_t *dataset_size);
/// \brief Gets the batch size
/// \return Status - The status code return
virtual int64_t GetTreeBatchSize();

@@ -258,13 +258,5 @@ Status FilterOp::PreAccept(NodePass *p, bool *modified) {
return p->PreRunOnNode(shared_from_base<FilterOp>(), modified);
}
// Get Dataset size
Status FilterOp::GetDatasetSize(int64_t *dataset_size) {
// We are returning -1 because we can't easily calculate GetDatasetSize. Returning -1 will make TreeGetters
// iterate over the dataset and count the size.
*dataset_size = dataset_size_;
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@@ -137,11 +137,6 @@ class FilterOp : public ParallelOp {
// @return Name of the current Op
std::string Name() const override { return kFilterOp; }
/// \brief Base-class override for GetDatasetSize
/// \param[out] dataset_size the size of the dataset
/// \return Status of the function
Status GetDatasetSize(int64_t *dataset_size) override;
private:
// predicate_func python callable which returns a boolean value.
std::shared_ptr<TensorOp> predicate_func_;

@@ -187,21 +187,6 @@ Status RepeatOp::Accept(NodePass *p, bool *modified) {
return p->RunOnNode(shared_from_base<RepeatOp>(), modified);
}
// Get Dataset size
Status RepeatOp::GetDatasetSize(int64_t *dataset_size) {
if (dataset_size_ > 0) {
*dataset_size = dataset_size_;
return Status::OK();
}
int64_t num_rows;
RETURN_IF_NOT_OK(child_[0]->GetDatasetSize(&num_rows));
if (num_rows > 0 && num_repeats_ > 0) {
num_rows = num_rows * num_repeats_;
}
*dataset_size = num_rows;
dataset_size_ = num_rows;
return Status::OK();
}
int64_t RepeatOp::GetTreeRepeatCount() { return num_repeats_; }
} // namespace dataset
} // namespace mindspore

@@ -133,11 +133,6 @@ class RepeatOp : public PipelineOp {
/// \return Status - The error code return
Status Reset() override;
/// \brief Base-class override for GetDatasetSize
/// \param[out] dataset_size the size of the dataset
/// \return Status of the function
Status GetDatasetSize(int64_t *dataset_size) override;
int64_t GetTreeRepeatCount() override;
// \brief Adds an operator to the repeat ops list of tracked leaf/eoe nodes

@@ -136,20 +136,5 @@ Status SkipOp::PreAccept(NodePass *p, bool *modified) {
return p->PreRunOnNode(shared_from_base<SkipOp>(), modified);
}
// Get Dataset size
Status SkipOp::GetDatasetSize(int64_t *dataset_size) {
if (dataset_size_ > 0) {
*dataset_size = dataset_size_;
return Status::OK();
}
int64_t num_rows;
RETURN_IF_NOT_OK(child_[0]->GetDatasetSize(&num_rows));
*dataset_size = 0;
if (max_skips_ >= 0 && max_skips_ < num_rows) {
*dataset_size = num_rows - max_skips_;
}
dataset_size_ = *dataset_size;
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@@ -86,11 +86,6 @@ class SkipOp : public PipelineOp {
/// \return Status of the node visit
Status PreAccept(NodePass *p, bool *modified) override;
/// \brief Base-class override for GetDatasetSize
/// \param[out] dataset_size the size of the dataset
/// \return Status of the function
Status GetDatasetSize(int64_t *dataset_size) override;
// Op name getter
// @return Name of the current Op
std::string Name() const override { return kSkipOp; }

@@ -452,63 +452,5 @@ Status CelebAOp::ComputeColMap() {
return Status::OK();
}
// Get Dataset size
Status CelebAOp::GetDatasetSize(int64_t *dataset_size) {
int64_t num_rows, sample_size;
std::string line;
Path folder_path(folder_path_);
std::ifstream attr_file((folder_path / "list_attr_celeba.txt").toString());
if (!attr_file.is_open()) {
std::string attr_file_name = (folder_path / "list_attr_celeba.txt").toString();
RETURN_STATUS_UNEXPECTED("Invalid file, failed to open Celeba attr file: " + attr_file_name);
}
std::string rows_num;
(void)getline(attr_file, rows_num);
try {
num_rows = static_cast<int64_t>(std::stoul(rows_num)); // the first line of the attr file is the row count
} catch (std::invalid_argument &e) {
RETURN_STATUS_UNEXPECTED(
"Invalid data, failed to convert rows_num from attr_file to unsigned long, invalid argument: " + rows_num);
} catch (std::out_of_range &e) {
RETURN_STATUS_UNEXPECTED(
"Invalid data, failed to convert rows_num from attr_file to unsigned long, out of range: " + rows_num);
}
if (usage_ != "all") {
int64_t partition_num = 0;
char usage_type;
if (usage_ == "train") {
usage_type = '0';
} else {
if (usage_ == "valid") {
usage_type = '1';
} else {
if (usage_ == "test")
usage_type = '2';
else
RETURN_STATUS_UNEXPECTED("Invalid usage.");
}
}
if (!partition_file_.is_open()) {
partition_file_.open((folder_path / "list_eval_partition.txt").toString());
}
if (partition_file_.is_open()) {
while (getline(partition_file_, line)) {
int start = line.find(' ');
if (line.at(start + 1) == usage_type) {
partition_num++;
}
}
} else {
std::string partition_file_name = "list_eval_partition.txt";
RETURN_STATUS_UNEXPECTED("Invalid file, failed to open Celeba partition file: " + partition_file_name);
}
num_rows = std::min(num_rows, partition_num);
}
sample_size = sampler_->CalculateNumSamples(num_rows);
*dataset_size = sample_size;
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@@ -179,11 +179,6 @@ class CelebAOp : public ParallelOp, RandomAccessOp {
// @return Name of the current Op
std::string Name() const override { return "CelebAOp"; }
/// \brief Base-class override for GetDatasetSize
/// \param[out] dataset_size the size of the dataset
/// \return Status of the function
Status GetDatasetSize(int64_t *dataset_size) override;
private:
// Called first when function is called
// @return

@@ -508,20 +508,5 @@ Status CifarOp::ComputeColMap() {
return Status::OK();
}
// Get Dataset size
Status CifarOp::GetDatasetSize(int64_t *dataset_size) {
if (dataset_size_ > 0) {
*dataset_size = dataset_size_;
return Status::OK();
}
int64_t num_rows, sample_size;
num_rows = num_rows_;
if (num_rows_ <= 0)
RETURN_IF_NOT_OK(CountTotalRows(folder_path_, usage_, cifar_type_ == CifarType::kCifar10, &num_rows));
sample_size = sampler_->CalculateNumSamples(num_rows);
*dataset_size = sample_size;
dataset_size_ = *dataset_size;
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

Some files were not shown because too many files have changed in this diff