[Part I] Push down json save logic to IR and add getter to each IR node

pull/11032/head
TinaMengtingZhang 4 years ago
parent 9646953465
commit 4812fece80

@ -22,6 +22,7 @@
#include "minddata/dataset/callback/py_ds_callback.h"
#include "minddata/dataset/core/constants.h"
#include "minddata/dataset/core/global_context.h"
#include "minddata/dataset/engine/serdes.h"
#include "minddata/dataset/include/datasets.h"
#include "minddata/dataset/text/sentence_piece_vocab.h"
@ -92,7 +93,13 @@ PYBIND_REGISTER(DatasetNode, 1, ([](const py::module *m) {
        THROW_IF_ERROR(zip->ValidateParams());
        return zip;
      },
      py::arg("datasets"));
      py::arg("datasets"))
    .def("to_json", [](std::shared_ptr<DatasetNode> self, const std::string &json_filepath) {
      nlohmann::json args;
      auto serdes = std::make_shared<Serdes>();
      THROW_IF_ERROR(serdes->SaveToJSON(self, json_filepath, &args));
      return args.dump();
    });
}));
// PYBIND FOR LEAF NODES
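The binding above simply hands the IR root to Serdes::SaveToJSON from the new serdes.cc (the file itself is not shown in this truncated diff). A minimal sketch of what that entry point plausibly does, with only the call signature taken from the binding and the body assumed:

// Sketch only: plausible shape of Serdes::SaveToJSON, inferred from the binding above.
// The real implementation lives in serdes.cc.
#include <fstream>
#include <memory>
#include <string>

Status Serdes::SaveToJSON(std::shared_ptr<DatasetNode> node, const std::string &filename, nlohmann::json *out_json) {
  // Each IR node now serializes its own attributes (see the to_json overrides below);
  // children of the node would be visited recursively in the same way.
  RETURN_IF_NOT_OK(node->to_json(out_json));
  if (!filename.empty()) {
    std::ofstream file(filename);
    file << out_json->dump(4);  // pretty-printed JSON written to the requested path
    file.close();
  }
  return Status::OK();
}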

@ -258,6 +258,11 @@ std::shared_ptr<SamplerObj> PreBuiltSamplerObj::Copy() {
return sampler;
}
Status PreBuiltSamplerObj::to_json(nlohmann::json *out_json) {
  RETURN_IF_NOT_OK(sp_->to_json(out_json));
  return Status::OK();
}
#ifndef ENABLE_ANDROID
std::shared_ptr<mindrecord::ShardOperator> PKSamplerObj::BuildForMindDataset() {
// runtime mindrecord sampler object

@ -229,6 +229,11 @@ std::shared_ptr<TensorOp> PreBuiltOperation::Build() { return op_; }
std::string PreBuiltOperation::Name() const { return op_ ? op_->Name() : kPreBuiltOperation; }
Status PreBuiltOperation::to_json(nlohmann::json *out_json) {
  RETURN_IF_NOT_OK(op_->to_json(out_json));
  return Status::OK();
}
// RandomApplyOperation
RandomApplyOperation::RandomApplyOperation(const std::vector<std::shared_ptr<TensorOperation>> &transforms, double prob)
: TensorOperation(true), transforms_(transforms), prob_(prob) {}

@ -20,6 +20,7 @@ set(SRC_FILES_LIST
runtime_context.cc
python_runtime_context.cc
consumers/tree_consumer.cc
serdes.cc
)
if (ENABLE_PYTHON)
set(SRC_FILES_LIST

@ -190,5 +190,26 @@ void DistributedSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const
}
}
Status DistributedSamplerRT::to_json(nlohmann::json *out_json) {
  nlohmann::json args;
  args["sampler_name"] = "DistributedSampler";
  args["num_shards"] = num_devices_;
  args["shard_id"] = device_id_;
  args["shuffle"] = shuffle_;
  args["num_samples"] = num_samples_;
  args["offset"] = offset_;
  if (this->HasChildSampler()) {
    std::vector<nlohmann::json> children_args;
    for (auto child : child_) {
      nlohmann::json child_arg;
      RETURN_IF_NOT_OK(child->to_json(&child_arg));
      children_args.push_back(child_arg);
    }
    args["child_sampler"] = children_args;
  }
  *out_json = args;
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore
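The HasChildSampler()/child_ loop above recurs verbatim in every sampler below (PK, Random, Sequential, SubsetRandom, WeightedRandom). A hypothetical helper on the SamplerRT base could factor it out; a minimal sketch, where the name SerializeChildSamplers is assumed and not part of this diff:

// Hypothetical helper, not in this diff: the child-sampler block shared by all
// the to_json overrides below, pulled into the SamplerRT base class.
Status SamplerRT::SerializeChildSamplers(nlohmann::json *args) {
  if (this->HasChildSampler()) {
    std::vector<nlohmann::json> children_args;
    for (auto child : child_) {
      nlohmann::json child_arg;
      RETURN_IF_NOT_OK(child->to_json(&child_arg));
      children_args.push_back(child_arg);
    }
    (*args)["child_sampler"] = children_args;
  }
  return Status::OK();
}

Each subclass would then call RETURN_IF_NOT_OK(SerializeChildSamplers(&args)); just before assigning *out_json.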

@ -72,6 +72,11 @@ class DistributedSamplerRT : public SamplerRT {
void SamplerPrint(std::ostream &out, bool show_all) const override;
/// \brief Get the arguments of node
/// \param[out] out_json JSON string of all attributes
/// \return Status of the function
Status to_json(nlohmann::json *out_json) override;
private:
int64_t cnt_; // number of samples that have already been filled in to buffer
uint32_t seed_;

@ -128,5 +128,24 @@ void PKSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const {
// Then add our own info if any
}
}
Status PKSamplerRT::to_json(nlohmann::json *out_json) {
  nlohmann::json args;
  args["sampler_name"] = "PKSampler";
  args["num_val"] = samples_per_class_;
  args["shuffle"] = shuffle_;
  args["num_samples"] = num_samples_;
  if (this->HasChildSampler()) {
    std::vector<nlohmann::json> children_args;
    for (auto child : child_) {
      nlohmann::json child_arg;
      RETURN_IF_NOT_OK(child->to_json(&child_arg));
      children_args.push_back(child_arg);
    }
    args["child_sampler"] = children_args;
  }
  *out_json = args;
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -61,6 +61,11 @@ class PKSamplerRT : public SamplerRT { // NOT YET FINISHED
// @param show_all - bool to show detailed vs summary
void SamplerPrint(std::ostream &out, bool show_all) const override;
/// \brief Get the arguments of node
/// \param[out] out_json JSON string of all attributes
/// \return Status of the function
Status to_json(nlohmann::json *out_json) override;
private:
bool shuffle_;
uint32_t seed_;

@ -127,5 +127,24 @@ void RandomSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const {
// Then add our own info if any
}
}
Status RandomSamplerRT::to_json(nlohmann::json *out_json) {
  nlohmann::json args;
  args["sampler_name"] = "RandomSampler";
  args["replacement"] = replacement_;
  args["num_samples"] = num_samples_;
  args["reshuffle_each_epoch"] = reshuffle_each_epoch_;
  if (this->HasChildSampler()) {
    std::vector<nlohmann::json> children_args;
    for (auto child : child_) {
      nlohmann::json child_arg;
      RETURN_IF_NOT_OK(child->to_json(&child_arg));
      children_args.push_back(child_arg);
    }
    args["child_sampler"] = children_args;
  }
  *out_json = args;
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -52,6 +52,11 @@ class RandomSamplerRT : public SamplerRT {
void SamplerPrint(std::ostream &out, bool show_all) const override;
/// \brief Get the arguments of node
/// \param[out] out_json JSON string of all attributes
/// \return Status of the function
Status to_json(nlohmann::json *out_json) override;
private:
uint32_t seed_;
bool replacement_;

@ -149,6 +149,11 @@ class SamplerRT {
// @return Status The status code returned
Status GetAssociatedChildId(int64_t *out_associated_id, int64_t id);
/// \brief Get the arguments of node
/// \param[out] out_json JSON string of all attributes
/// \return Status of the function
virtual Status to_json(nlohmann::json *out_json) { return Status::OK(); }
protected:
// Number of rows of data from the place this sampler is sampling from. If this sampler
// has a child sampler, num_rows_ is the number of ids the child sampler will
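Because to_json is virtual on SamplerRT (with a no-op default), a caller can serialize whatever sampler a node holds without knowing its concrete type. A hedged fragment, where sampler_rt and the surrounding args object are assumed to live inside some dataset op's own to_json:

// Assumed context: inside a node's to_json, with sampler_rt a std::shared_ptr<SamplerRT>.
nlohmann::json sampler_args;
RETURN_IF_NOT_OK(sampler_rt->to_json(&sampler_args));
args["sampler"] = sampler_args;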

@ -17,6 +17,7 @@
#include <algorithm>
#include <memory>
#include <vector>
namespace mindspore {
namespace dataset {
@ -131,5 +132,23 @@ void SequentialSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const {
out << "\nStart index: " << start_index_;
}
}
Status SequentialSamplerRT::to_json(nlohmann::json *out_json) {
  nlohmann::json args;
  args["sampler_name"] = "SequentialSampler";
  args["start_index"] = start_index_;
  args["num_samples"] = num_samples_;
  if (this->HasChildSampler()) {
    std::vector<nlohmann::json> children_args;
    for (auto child : child_) {
      nlohmann::json child_arg;
      RETURN_IF_NOT_OK(child->to_json(&child_arg));
      children_args.push_back(child_arg);
    }
    args["child_sampler"] = children_args;
  }
  *out_json = args;
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -61,6 +61,11 @@ class SequentialSamplerRT : public SamplerRT {
// @param show_all - bool to show detailed vs summary
void SamplerPrint(std::ostream &out, bool show_all) const override;
/// \brief Get the arguments of node
/// \param[out] out_json JSON string of all attributes
/// \return Status of the function
Status to_json(nlohmann::json *out_json) override;
private:
int64_t current_id_; // The id sequencer. Each new id increments from this
int64_t start_index_; // The starting id. current_id_ begins from here.

@ -131,5 +131,23 @@ void SubsetRandomSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const
// Then add our own info if any
}
}
Status SubsetRandomSamplerRT::to_json(nlohmann::json *out_json) {
  nlohmann::json args;
  args["sampler_name"] = "SubsetRandomSampler";
  args["indices"] = indices_;
  args["num_samples"] = num_samples_;
  if (this->HasChildSampler()) {
    std::vector<nlohmann::json> children_args;
    for (auto child : child_) {
      nlohmann::json child_arg;
      RETURN_IF_NOT_OK(child->to_json(&child_arg));
      children_args.push_back(child_arg);
    }
    args["child_sampler"] = children_args;
  }
  *out_json = args;
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -56,6 +56,11 @@ class SubsetRandomSamplerRT : public SamplerRT {
// @param show_all - bool to show detailed vs summary
void SamplerPrint(std::ostream &out, bool show_all) const override;
/// \brief Get the arguments of node
/// \param[out] out_json JSON string of all attributes
/// \return Status of the function
Status to_json(nlohmann::json *out_json) override;
private:
// A list of indices (already randomized in constructor).
std::vector<int64_t> indices_;

@ -193,5 +193,24 @@ void WeightedRandomSamplerRT::SamplerPrint(std::ostream &out, bool show_all) con
// Then add our own info if any
}
}
Status WeightedRandomSamplerRT::to_json(nlohmann::json *out_json) {
  nlohmann::json args;
  args["sampler_name"] = "WeightedRandomSampler";
  args["weights"] = weights_;
  args["num_samples"] = num_samples_;
  args["replacement"] = replacement_;
  if (this->HasChildSampler()) {
    std::vector<nlohmann::json> children_args;
    for (auto child : child_) {
      nlohmann::json child_arg;
      RETURN_IF_NOT_OK(child->to_json(&child_arg));
      children_args.push_back(child_arg);
    }
    args["child_sampler"] = children_args;
  }
  *out_json = args;
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -58,6 +58,11 @@ class WeightedRandomSamplerRT : public SamplerRT {
// @param show_all - bool to show detailed vs summary
void SamplerPrint(std::ostream &out, bool show_all) const override;
/// \brief Get the arguments of node
/// \param[out] out_json JSON string of all attributes
/// \return Status of the function
Status to_json(nlohmann::json *out_json) override;
private:
// A list of weights for each sample.
std::vector<double> weights_;

@ -28,6 +28,7 @@ class DatasetCache {
virtual Status Build() = 0;
virtual Status ValidateParams() = 0;
virtual Status CreateCacheOp(int num_workers, std::shared_ptr<DatasetOp> *ds_op) = 0;
virtual Status to_json(nlohmann::json *out_json) { return Status::OK(); }
};
} // namespace mindspore::dataset

@ -44,5 +44,18 @@ Status DatasetCacheImpl::CreateCacheOp(int32_t num_workers, std::shared_ptr<Data
return Status::OK();
}
Status DatasetCacheImpl::to_json(nlohmann::json *out_json) {
  nlohmann::json args;
  args["session_id"] = session_id_;
  args["cache_memory_size"] = cache_mem_sz_;
  args["spill"] = spill_;
  if (hostname_) args["hostname"] = hostname_.value();
  if (port_) args["port"] = port_.value();
  if (num_connections_) args["num_connections"] = num_connections_.value();
  if (prefetch_sz_) args["prefetch_size"] = prefetch_sz_.value();
  *out_json = args;
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -60,6 +60,8 @@ class DatasetCacheImpl : public DatasetCache {
~DatasetCacheImpl() = default;
Status to_json(nlohmann::json *out_json) override;
private:
std::shared_ptr<CacheClient> cache_client_;
session_id_type session_id_;

@ -152,5 +152,20 @@ Status BatchNode::AcceptAfter(IRNodePass *const p, bool *const modified) {
// Downcast shared pointer then call visitor
return p->VisitAfter(shared_from_base<BatchNode>(), modified);
}
Status BatchNode::to_json(nlohmann::json *out_json) {
  nlohmann::json args;
  args["num_parallel_workers"] = num_workers_;
  args["batch_size"] = batch_size_;
  args["drop_remainder"] = drop_remainder_;
#ifdef ENABLE_PYTHON
  args["input_columns"] = in_col_names_;
  args["output_columns"] = out_col_names_;
  args["column_order"] = col_order_;
  if (batch_map_func_ != nullptr) args["per_batch_map"] = "pyfunc";
#endif
  *out_json = args;
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore

@ -87,6 +87,24 @@ class BatchNode : public DatasetNode {
/// \return Status of the node visit
Status AcceptAfter(IRNodePass *const p, bool *const modified) override;
/// \brief Getter functions
int32_t BatchSize() const { return batch_size_; }
bool DropRemainder() const { return drop_remainder_; }
#ifdef ENABLE_PYTHON
bool Pad() const { return pad_; }
const std::vector<std::string> &InColNames() const { return in_col_names_; }
const std::vector<std::string> &OutColNames() const { return out_col_names_; }
const std::vector<std::string> &ColOrder() const { return col_order_; }
const py::function &BatchSizeFunc() const { return batch_size_func_; }
const py::function &BatchMapFunc() const { return batch_map_func_; }
const std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> &PadMap() const { return pad_map_; }
#endif
/// \brief Get the arguments of node
/// \param[out] out_json JSON string of all attributes
/// \return Status of the function
Status to_json(nlohmann::json *out_json) override;
private:
int32_t batch_size_;
bool drop_remainder_;
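These getters exist so that code outside the node (the serializer, IR passes, tests) can read BatchNode attributes without reaching into private members. A small sketch of such a consumer; the function name and the shared_ptr variable are hypothetical, only the getter names come from this diff:

// Hypothetical external consumer of the new BatchNode getters.
#include <iostream>
#include <memory>

void PrintBatchInfo(const std::shared_ptr<BatchNode> &node) {
  std::cout << "batch_size=" << node->BatchSize()
            << " drop_remainder=" << std::boolalpha << node->DropRemainder() << std::endl;
}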

@ -431,6 +431,13 @@ Status DatasetNode::ValidateParams() {
return Status::OK();
}
Status DatasetNode::to_json(nlohmann::json *out_json) {
  nlohmann::json args;
  args["num_parallel_workers"] = num_workers_;
  *out_json = args;
  return Status::OK();
}
Status MappableSourceNode::Accept(IRNodePass *const p, bool *const modified) {
return p->Visit(shared_from_base<MappableSourceNode>(), modified);
}

@ -271,6 +271,11 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> {
virtual bool IsSizeDefined() { return true; }
/// \brief Get the arguments of node
/// \param[out] out_json JSON string of all attributes
/// \return Status of the function
virtual Status to_json(nlohmann::json *out_json);
protected:
std::vector<std::shared_ptr<DatasetNode>> children_;
DatasetNode *parent_; // used to record the only one parent of an IR node after parsing phase

@ -16,6 +16,7 @@
#include "minddata/dataset/engine/ir/datasetops/map_node.h"
#include <algorithm>
#include <memory>
#include <string>
#include <utility>
@ -122,5 +123,33 @@ void MapNode::setOperations(const std::vector<std::shared_ptr<TensorOperation>>
operations_ = operations;
}
std::vector<std::shared_ptr<TensorOperation>> MapNode::operations() { return operations_; }
Status MapNode::to_json(nlohmann::json *out_json) {
  nlohmann::json args;
  args["num_parallel_workers"] = num_workers_;
  args["input_columns"] = input_columns_;
  args["output_columns"] = output_columns_;
  if (!project_columns_.empty()) args["column_order"] = project_columns_;
  if (cache_ != nullptr) {
    nlohmann::json cache_args;
    RETURN_IF_NOT_OK(cache_->to_json(&cache_args));
    args["cache"] = cache_args;
  }
  std::vector<nlohmann::json> ops;
  std::vector<int32_t> cbs;
  for (auto op : operations_) {
    nlohmann::json op_args;  // fresh object per op so fields from one op cannot leak into the next
    RETURN_IF_NOT_OK(op->to_json(&op_args));
    op_args["tensor_op_name"] = op->Name();
    ops.push_back(op_args);
  }
  args["operations"] = ops;
  std::transform(callbacks_.begin(), callbacks_.end(), std::back_inserter(cbs),
                 [](std::shared_ptr<DSCallback> cb) -> int32_t { return cb->step_size(); });
  args["callback"] = cbs;
  *out_json = args;
  return Status::OK();
}
} // namespace dataset
} // namespace mindspore
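For illustration, roughly the JSON a MapNode with a single Decode op would emit through the code above; the concrete values here are invented for the example, not taken from this diff:

// Illustrative only: expected shape of MapNode::to_json output for one tensor op.
nlohmann::json expected = nlohmann::json::parse(R"(
  {
    "num_parallel_workers": 8,
    "input_columns": ["image"],
    "output_columns": ["image"],
    "operations": [{"tensor_op_name": "Decode"}],
    "callback": []
  })");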

Some files were not shown because too many files have changed in this diff.
