Enable caching over non-mappable dataset c++ api

and caching over Album dataset
pull/7796/head
Lixia Chen 4 years ago
parent fd7732cbcb
commit 02ddddfe7e

@ -301,8 +301,9 @@ std::shared_ptr<SchemaObj> Schema(const std::string &schema_file) {
// Function to create a AlbumNode.
std::shared_ptr<AlbumNode> Album(const std::string &dataset_dir, const std::string &data_schema,
const std::vector<std::string> &column_names, bool decode,
const std::shared_ptr<SamplerObj> &sampler) {
auto ds = std::make_shared<AlbumNode>(dataset_dir, data_schema, column_names, decode, sampler);
const std::shared_ptr<SamplerObj> &sampler,
const std::shared_ptr<DatasetCache> &cache) {
auto ds = std::make_shared<AlbumNode>(dataset_dir, data_schema, column_names, decode, sampler, cache);
return ds->ValidateParams() ? ds : nullptr;
}
@ -1021,9 +1022,25 @@ std::shared_ptr<DatasetCache> CreateDatasetCache(session_id_type id, uint64_t me
auto cache = std::make_shared<DatasetCacheImpl>(id, mem_sz, spill, hostname, port, num_connections, prefetch_sz);
return cache->ValidateParams() ? cache : nullptr;
}
#endif
std::shared_ptr<SamplerObj> SelectSampler(int64_t num_samples, bool shuffle, int32_t num_shards, int32_t shard_id) {
if (shuffle) {
if (num_shards > 1) {
// If shuffle enabled, sharding enabled, use distributed random sampler
return DistributedSampler(num_shards, shard_id, shuffle, num_samples);
}
// If shuffle enabled, sharding disabled, use random sampler
return RandomSampler(num_samples >= 0, num_samples);
}
if (num_shards > 1) {
// If shuffle disabled, sharding enabled, use distributed sequential sampler
return DistributedSampler(num_shards, shard_id, shuffle, num_samples);
}
// If shuffle disabled, sharding disabled, use sequential sampler
return SequentialSampler(0, num_samples);
}
} // namespace api
} // namespace dataset
} // namespace mindspore

@ -425,9 +425,9 @@ Status AlbumOp::LoadIntTensor(const nlohmann::json &json_obj, uint32_t col_num,
// to take a reference to a column descriptor?
// the design of this class is to make the code more readable, forgoing minor perfomance gain like
// getting rid of duplicated checks
Status AlbumOp::LoadTensorRow(const std::string &file, TensorRow *row) {
Status AlbumOp::LoadTensorRow(row_id_type row_id, const std::string &file, TensorRow *row) {
// testing here is to just print out file path
(*row) = {};
(*row) = TensorRow(row_id, {});
MS_LOG(INFO) << "Image row file: " << file << ".";
std::ifstream file_handle(folder_path_ + file);
@ -444,7 +444,7 @@ Status AlbumOp::LoadTensorRow(const std::string &file, TensorRow *row) {
// get columns in schema:
int32_t columns = data_schema_->NumColumns();
// loop over each column descriptor, this can optimized by swtich cases
// loop over each column descriptor, this can optimized by switch cases
for (int32_t i = 0; i < columns; i++) {
// special case to handle
if (data_schema_->column(i).name() == "id") {
@ -521,7 +521,7 @@ Status AlbumOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<Dat
TensorRow trow;
for (const int64_t &key : keys) {
RETURN_IF_NOT_OK(this->LoadTensorRow(image_rows_[key], &trow));
RETURN_IF_NOT_OK(this->LoadTensorRow(key, image_rows_[key], &trow));
deq->push_back(std::move(trow));
}
(*db)->set_tensor_table(std::move(deq));

@ -269,10 +269,11 @@ class AlbumOp : public ParallelOp, public RandomAccessOp {
Status LoadIDTensor(const std::string &file, uint32_t col_num, TensorRow *row);
/// \brief Load a tensor row according to a json file
/// \param[in] row_id_type row_id - id for this tensor row
/// \param[in] ImageColumns file Json file location
/// \param[inout] TensorRow row Json content stored into a tensor row
/// \return Status The error code returned
Status LoadTensorRow(const std::string &file, TensorRow *row);
Status LoadTensorRow(row_id_type row_id, const std::string &file, TensorRow *row);
/// \param[in] const std::vector<int64_t> &keys Keys in ioblock
/// \param[inout] std::unique_ptr<DataBuffer> db Databuffer to push to

@ -187,6 +187,10 @@ class ClueOp : public ParallelOp {
/// that this clue op will produce the full set of data into the cache.
void MakeSimpleProducer();
// Op name getter
// @return Name of the current Op
std::string Name() const override { return "ClueOp"; }
// Base-class override for NodePass visitor acceptor.
// @param p - Pointer to the NodePass to be accepted.
// @param modified - Whether this node visit modified the pipeline.

@ -31,8 +31,9 @@ namespace api {
// Constructor for AlbumNode
AlbumNode::AlbumNode(const std::string &dataset_dir, const std::string &data_schema,
const std::vector<std::string> &column_names, bool decode,
const std::shared_ptr<SamplerObj> &sampler)
: dataset_dir_(dataset_dir),
const std::shared_ptr<SamplerObj> &sampler, const std::shared_ptr<DatasetCache> &cache)
: Dataset(std::move(cache)),
dataset_dir_(dataset_dir),
schema_path_(data_schema),
column_names_(column_names),
decode_(decode),
@ -63,6 +64,8 @@ std::vector<std::shared_ptr<DatasetOp>> AlbumNode::Build() {
// Argument that is not exposed to user in the API.
std::set<std::string> extensions = {};
RETURN_EMPTY_IF_ERROR(AddCacheOp(&node_ops));
node_ops.push_back(std::make_shared<AlbumOp>(num_workers_, rows_per_buffer_, dataset_dir_, connector_que_size_,
decode_, extensions, std::move(schema), std::move(sampler_->Build())));
return node_ops;

@ -31,7 +31,8 @@ class AlbumNode : public Dataset {
public:
/// \brief Constructor
AlbumNode(const std::string &dataset_dir, const std::string &data_schema,
const std::vector<std::string> &column_names, bool decode, const std::shared_ptr<SamplerObj> &sampler);
const std::vector<std::string> &column_names, bool decode, const std::shared_ptr<SamplerObj> &sampler,
const std::shared_ptr<DatasetCache> &cache);
/// \brief Destructor
~AlbumNode() = default;

@ -185,15 +185,21 @@ std::vector<std::shared_ptr<DatasetOp>> CLUENode::Build() {
bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);
// ClueOp by itself is a non-mappable dataset that does not support sampling.
// However, if a cache operator is injected at some other place higher in the tree, that cache can
// inherit this sampler from the leaf, providing sampling support from the caching layer.
// That is why we save the sampler here in a leaf node that does not use sampling.
std::shared_ptr<SamplerObj> sampler_ = SelectSampler(num_samples_, shuffle_files, num_shards_, shard_id_);
// Sort the dataset files in a lexicographical order
std::vector<std::string> sorted_dataset_files = dataset_files_;
std::sort(sorted_dataset_files.begin(), sorted_dataset_files.end());
std::shared_ptr<ClueOp> clue_op =
std::make_shared<ClueOp>(num_workers_, rows_per_buffer_, num_samples_, worker_connector_size_, ck_map,
sorted_dataset_files, connector_que_size_, shuffle_files, num_shards_, shard_id_, nullptr);
std::shared_ptr<ClueOp> clue_op = std::make_shared<ClueOp>(
num_workers_, rows_per_buffer_, num_samples_, worker_connector_size_, ck_map, sorted_dataset_files,
connector_que_size_, shuffle_files, num_shards_, shard_id_, std::move(sampler_->Build()));
RETURN_EMPTY_IF_ERROR(clue_op->Init());
if (shuffle_ == ShuffleMode::kGlobal) {
if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal) {
// Inject ShuffleOp
std::shared_ptr<DatasetOp> shuffle_op = nullptr;
int64_t num_rows = 0;

@ -80,6 +80,12 @@ std::vector<std::shared_ptr<DatasetOp>> CSVNode::Build() {
bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);
// CSVOp by itself is a non-mappable dataset that does not support sampling.
// However, if a cache operator is injected at some other place higher in the tree, that cache can
// inherit this sampler from the leaf, providing sampling support from the caching layer.
// That is why we save the sampler here in a leaf node that does not use sampling.
std::shared_ptr<SamplerObj> sampler_ = SelectSampler(num_samples_, shuffle_files, num_shards_, shard_id_);
// Sort the dataset files in a lexicographical order
std::vector<std::string> sorted_dataset_files = dataset_files_;
std::sort(sorted_dataset_files.begin(), sorted_dataset_files.end());
@ -98,11 +104,12 @@ std::vector<std::shared_ptr<DatasetOp>> CSVNode::Build() {
}
}
std::shared_ptr<CsvOp> csv_op = std::make_shared<CsvOp>(
sorted_dataset_files, field_delim_, column_default_list, column_names_, num_workers_, rows_per_buffer_,
num_samples_, worker_connector_size_, connector_que_size_, shuffle_files, num_shards_, shard_id_, nullptr);
std::shared_ptr<CsvOp> csv_op =
std::make_shared<CsvOp>(sorted_dataset_files, field_delim_, column_default_list, column_names_, num_workers_,
rows_per_buffer_, num_samples_, worker_connector_size_, connector_que_size_, shuffle_files,
num_shards_, shard_id_, std::move(sampler_->Build()));
RETURN_EMPTY_IF_ERROR(csv_op->Init());
if (shuffle_ == ShuffleMode::kGlobal) {
if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal) {
// Inject ShuffleOp
std::shared_ptr<DatasetOp> shuffle_op = nullptr;
int64_t num_rows = 0;

@ -59,6 +59,12 @@ std::vector<std::shared_ptr<DatasetOp>> TextFileNode::Build() {
bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);
// TextFileOp by itself is a non-mappable dataset that does not support sampling.
// However, if a cache operator is injected at some other place higher in the tree, that cache can
// inherit this sampler from the leaf, providing sampling support from the caching layer.
// That is why we save the sampler here in a leaf node that does not use sampling.
std::shared_ptr<SamplerObj> sampler_ = SelectSampler(num_samples_, shuffle_files, num_shards_, shard_id_);
// Sort the dataset files in a lexicographical order
std::vector<std::string> sorted_dataset_files = dataset_files_;
std::sort(sorted_dataset_files.begin(), sorted_dataset_files.end());
@ -71,10 +77,10 @@ std::vector<std::shared_ptr<DatasetOp>> TextFileNode::Build() {
// Create and initalize TextFileOp
std::shared_ptr<TextFileOp> text_file_op = std::make_shared<TextFileOp>(
num_workers_, rows_per_buffer_, num_samples_, worker_connector_size_, std::move(schema), sorted_dataset_files,
connector_que_size_, shuffle_files, num_shards_, shard_id_, nullptr);
connector_que_size_, shuffle_files, num_shards_, shard_id_, std::move(sampler_->Build()));
RETURN_EMPTY_IF_ERROR(text_file_op->Init());
if (shuffle_ == ShuffleMode::kGlobal) {
if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal) {
// Inject ShuffleOp
std::shared_ptr<DatasetOp> shuffle_op = nullptr;
int64_t num_rows = 0;

@ -52,14 +52,21 @@ std::vector<std::shared_ptr<DatasetOp>> TFRecordNode::Build() {
bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);
// TFReaderOp by itself is a non-mappable dataset that does not support sampling.
// However, if a cache operator is injected at some other place higher in the tree, that cache can
// inherit this sampler from the leaf, providing sampling support from the caching layer.
// That is why we save the sampler here in a leaf node that does not use sampling.
std::shared_ptr<SamplerObj> sampler_ = SelectSampler(num_samples_, shuffle_files, num_shards_, shard_id_);
// Create and initialize TFReaderOp
std::shared_ptr<TFReaderOp> tf_reader_op = std::make_shared<TFReaderOp>(
num_workers_, worker_connector_size_, rows_per_buffer_, num_samples_, sorted_dir_files, std::move(data_schema),
connector_que_size_, columns_list_, shuffle_files, num_shards_, shard_id_, shard_equal_rows_, nullptr);
std::shared_ptr<TFReaderOp> tf_reader_op =
std::make_shared<TFReaderOp>(num_workers_, worker_connector_size_, rows_per_buffer_, num_samples_, sorted_dir_files,
std::move(data_schema), connector_que_size_, columns_list_, shuffle_files, num_shards_,
shard_id_, shard_equal_rows_, std::move(sampler_->Build()));
RETURN_EMPTY_IF_ERROR(tf_reader_op->Init());
if (shuffle_ == ShuffleMode::kGlobal) {
if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal) {
// Inject ShuffleOp
std::shared_ptr<DatasetOp> shuffle_op = nullptr;

@ -188,10 +188,7 @@ Status CacheTransformPass::CachePass::RunOnNode(std::shared_ptr<ImageFolderOp> n
// Perform leaf node cache transform identification
Status CacheTransformPass::CachePass::RunOnNode(std::shared_ptr<AlbumOp> node, bool *modified) {
if (is_caching_) {
RETURN_STATUS_UNEXPECTED("There is currently no support for AlbumOp under cache.");
}
return Status::OK();
return MappableCacheLeafSetup(std::static_pointer_cast<DatasetOp>(node));
}
// Perform leaf node cache transform identification

@ -144,10 +144,13 @@ std::shared_ptr<SchemaObj> Schema(const std::string &schema_file = "");
/// \param[in] decode the option to decode the images in dataset (default = false)
/// \param[in] sampler Object used to choose samples from the dataset. If sampler is not given,
/// a `RandomSampler` will be used to randomly iterate the entire dataset (default = RandomSampler())
/// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used).
/// The cache feature is under development and is not recommended.
/// \return Shared pointer to the current Dataset
std::shared_ptr<AlbumNode> Album(const std::string &dataset_dir, const std::string &data_schema,
const std::vector<std::string> &column_names = {}, bool decode = false,
const std::shared_ptr<SamplerObj> &sampler = RandomSampler());
const std::shared_ptr<SamplerObj> &sampler = RandomSampler(),
const std::shared_ptr<DatasetCache> &cache = nullptr);
/// \brief Function to create a CelebANode
/// \notes The generated dataset has two columns ['image', 'attr'].
@ -549,6 +552,17 @@ std::shared_ptr<DatasetCache> CreateDatasetCache(session_id_type id, uint64_t me
std::optional<int32_t> prefetch_sz = std::nullopt);
#endif
/// \brief Function to create a sampler for non-mappable dataset (to be used by cache op later).
/// \notes Non-mappable dataset does not directly support a sampler. It has provided sampling arguments (shuffle,
/// num_samples, num_shards, shard_id) and it DOES support sampling if somewhere above it in the pipeline contains
/// a cache. If there is no cache above it, then the sampler is not used.
/// \param[in] num_samples The number of samples to be included in the dataset.
/// \param[in] shuffle If true, the indices are shuffled.
/// \param[in] num_shards Number of shards to divide the dataset into.
/// \param[in] shard_id Shard ID of the current shard within num_shards.
/// \return Shared pointer to the current Sampler.
std::shared_ptr<SamplerObj> SelectSampler(int64_t num_samples, bool shuffle, int32_t num_shards, int32_t shard_id);
/// \brief Function to create a ZipNode
/// \notes Applies zip to the dataset
/// \param[in] datasets List of shared pointers to the datasets that we want to zip

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save