MindData optimizer infrastructure.

pull/1272/head
Junhan Hu 5 years ago
parent 6cbde2b3bb
commit f44d213503

@ -65,6 +65,7 @@ set(submodules
$<TARGET_OBJECTS:engine-datasetops-source>
$<TARGET_OBJECTS:engine-datasetops-source-sampler>
$<TARGET_OBJECTS:engine-datasetops>
$<TARGET_OBJECTS:engine-opt>
$<TARGET_OBJECTS:engine>
)

@ -1,4 +1,5 @@
add_subdirectory(datasetops)
add_subdirectory(opt)
if (ENABLE_TDTQUE)
add_subdirectory(tdt)
endif ()
@ -14,7 +15,7 @@ add_library(engine OBJECT
target_include_directories(engine PRIVATE ${pybind11_INCLUDE_DIRS})
if (ENABLE_TDTQUE)
add_dependencies(engine engine-datasetops engine-datasetops-source engine-tdt)
add_dependencies(engine engine-datasetops engine-datasetops-source engine-tdt engine-opt)
else()
add_dependencies(engine engine-datasetops engine-datasetops-source)
add_dependencies(engine engine-datasetops engine-datasetops-source engine-opt)
endif ()

@ -22,6 +22,7 @@
#include "dataset/core/pybind_support.h"
#include "dataset/engine/data_buffer.h"
#include "dataset/engine/db_connector.h"
#include "dataset/engine/opt/pass.h"
using float16 = Eigen::half;
@ -462,5 +463,11 @@ Status BatchOp::PadHelper(std::shared_ptr<Tensor> src, std::shared_ptr<Tensor> d
return Status::OK();
}
// NodePass visitor entry point for BatchOp.
Status BatchOp::Accept(NodePass *p, bool *modified) {
  // shared_from_this() yields a base-class pointer; downcast it so the pass
  // is handed the concrete BatchOp type.
  auto self = std::static_pointer_cast<BatchOp>(shared_from_this());
  return p->RunOnNode(self, modified);
}
} // namespace dataset
} // namespace mindspore

@ -192,6 +192,12 @@ class BatchOp : public ParallelOp {
Status PadTensor(std::shared_ptr<Tensor> src, std::shared_ptr<Tensor> *dst, const std::vector<dsize_t> &pad_shape,
float pad_val);
// Base-class override for NodePass visitor acceptor.
// @param p - Pointer to the NodePass to be accepted.
// @param modified - Whether this node visit modified the pipeline.
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;
private:
// recursive helper function. This function could be very expensive if called on a multi-dimensional tensor
// it is only meant to be called by PadTensor.

@ -25,6 +25,7 @@
#include "dataset/engine/datasetops/device_queue_op.h"
#include "dataset/engine/data_buffer.h"
#include "dataset/engine/db_connector.h"
#include "dataset/engine/opt/pass.h"
#include "utils/log_adapter.h"
@ -249,5 +250,11 @@ Status DatasetOp::AssignColMapFromChild() {
}
return Status::OK();
}
Status DatasetOp::Accept(NodePass *p, bool *modified) {
  // DatasetOp is the root of the visitor target hierarchy, so this is the
  // fallback used when a derived operator does not provide its own Accept.
  // The node is handed to the pass as a plain DatasetOp pointer.
  std::shared_ptr<DatasetOp> self = shared_from_this();
  return p->RunOnNode(self, modified);
}
} // namespace dataset
} // namespace mindspore

@ -32,6 +32,8 @@ class ExecutionTree;
class DataBuffer;
class NodePass;
// The base class DatasetOp is the main tree node. It is an abstract class, so
// the actual implementation of the operators will be derived from here.
class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
@ -209,6 +211,16 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
// @return - the column name map as a string
std::string ColumnNameMapAsString() const;
// Getter for the child operators of this node.
// @return Vector of children (returned by value, i.e. a copy of child_)
std::vector<std::shared_ptr<DatasetOp>> Children() const {
  return child_;
}
// Base method for NodePass visit.
// Subclass needs to override this if it requires special node visit access.
// Check "dataset/engine/opt/pass.h" for more details.
// @return Status of the node visit
virtual Status Accept(NodePass *p, bool *modified);
protected:
// Adds a parent operator to this operator
// @notes External callers do not have access to this function.

@ -24,6 +24,7 @@
#include "dataset/engine/dataset_iterator.h"
#include "dataset/util/status.h"
#include "dataset/util/task_manager.h"
#include "dataset/engine/opt/pass.h"
#ifdef ENABLE_TDTQUE
#include "tdt/tsd_client.h"
@ -265,5 +266,12 @@ void DeviceQueueOp::Print(std::ostream &out, bool show_all) const {
out << "\nChannel name: " << channel_name_ << "\nPrefetch size: " << prefetch_size_ << "\n\n";
}
}
// NodePass visitor entry point for DeviceQueueOp.
Status DeviceQueueOp::Accept(NodePass *p, bool *modified) {
  // shared_from_this() yields a base-class pointer; downcast it so the pass
  // is handed the concrete DeviceQueueOp type.
  auto self = std::static_pointer_cast<DeviceQueueOp>(shared_from_this());
  return p->RunOnNode(self, modified);
}
} // namespace dataset
} // namespace mindspore

@ -134,6 +134,12 @@ class DeviceQueueOp : public PipelineOp {
Status operator()() override;
// Base-class override for NodePass visitor acceptor.
// @param p - Pointer to the NodePass to be accepted.
// @param modified - Whether this node visit modified the pipeline.
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;
private:
// Name: checkExceptions(DataBuffer);
// Description: Check whether the dataBuffer meets the condition for performing DeviceQueueOp

@ -27,6 +27,7 @@
#include "dataset/engine/data_buffer.h"
#include "dataset/engine/db_connector.h"
#include "dataset/engine/execution_tree.h"
#include "dataset/engine/opt/pass.h"
#include "dataset/kernels/tensor_op.h"
#include "utils/log_adapter.h"
#include "dataset/util/task_manager.h"
@ -259,5 +260,11 @@ Status FilterOp::InvokePredicateFunc(const TensorRow &input, bool *out_predicate
}
return Status(StatusCode::kOK, "FilterOp predicate func call succeed");
}
// NodePass visitor entry point for FilterOp.
Status FilterOp::Accept(NodePass *p, bool *modified) {
  // shared_from_this() yields a base-class pointer; downcast it so the pass
  // is handed the concrete FilterOp type.
  auto self = std::static_pointer_cast<FilterOp>(shared_from_this());
  return p->RunOnNode(self, modified);
}
} // namespace dataset
} // namespace mindspore

@ -121,6 +121,12 @@ class FilterOp : public ParallelOp {
// @param show_all A bool to control if you want to show all info or just a summary.
void Print(std::ostream &out, bool show_all) const override;
// Base-class override for NodePass visitor acceptor.
// @param p - Pointer to the NodePass to be accepted.
// @param modified - Whether this node visit modified the pipeline.
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;
private:
// predicate_func python callable which returns a boolean value.
py::function predicate_func_;

@ -27,6 +27,7 @@
#include "dataset/engine/data_buffer.h"
#include "dataset/engine/db_connector.h"
#include "dataset/engine/execution_tree.h"
#include "dataset/engine/opt/pass.h"
#include "dataset/kernels/tensor_op.h"
#include "utils/log_adapter.h"
#include "dataset/util/task_manager.h"
@ -370,5 +371,11 @@ void MapOp::CreateFinalColMap(std::unordered_map<std::string, int32_t> *col_name
column_name_id_map_ = final_col_name_id_map;
}
}
// NodePass visitor entry point for MapOp.
Status MapOp::Accept(NodePass *p, bool *modified) {
  // shared_from_this() yields a base-class pointer; downcast it so the pass
  // is handed the concrete MapOp type.
  auto self = std::static_pointer_cast<MapOp>(shared_from_this());
  return p->RunOnNode(self, modified);
}
} // namespace dataset
} // namespace mindspore

@ -171,6 +171,12 @@ class MapOp : public ParallelOp {
// @return the number of threads consuming data from previous op's output Connector.
int32_t num_consumers() const override;
// Base-class override for NodePass visitor acceptor.
// @param p - Pointer to the NodePass to be accepted.
// @param modified - Whether this node visit modified the pipeline.
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;
private:
// Local queues where worker threads can pop from.
// Popping directly from the Connector can block if the previous designated threads haven't pop.

@ -25,6 +25,7 @@
#include "dataset/engine/data_buffer.h"
#include "dataset/engine/db_connector.h"
#include "dataset/engine/execution_tree.h"
#include "dataset/engine/opt/pass.h"
#include "utils/log_adapter.h"
namespace mindspore {
@ -144,5 +145,11 @@ Status ProjectOp::EoeReceived(int32_t worker_id) {
}
Status ProjectOp::EofReceived(int32_t worker_id) { return Status::OK(); }
// NodePass visitor entry point for ProjectOp.
Status ProjectOp::Accept(NodePass *p, bool *modified) {
  // shared_from_this() yields a base-class pointer; downcast it so the pass
  // is handed the concrete ProjectOp type.
  auto self = std::static_pointer_cast<ProjectOp>(shared_from_this());
  return p->RunOnNode(self, modified);
}
} // namespace dataset
} // namespace mindspore

@ -101,6 +101,12 @@ class ProjectOp : public PipelineOp {
// @return Status - The error code returned.
Status EofReceived(int32_t worker_id) override;
// Base-class override for NodePass visitor acceptor.
// @param p - Pointer to the NodePass to be accepted.
// @param modified - Whether this node visit modified the pipeline.
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;
private:
std::vector<std::string> columns_to_project_;
std::vector<int32_t> projected_column_indices_;

@ -24,6 +24,7 @@
#include "dataset/core/global_context.h"
#include "dataset/engine/data_buffer.h"
#include "dataset/engine/db_connector.h"
#include "dataset/engine/opt/pass.h"
#include "utils/log_adapter.h"
namespace mindspore {
@ -170,5 +171,11 @@ Status RenameOp::EoeReceived(int32_t) {
state_ = OpState::kDeOpIdle;
return Status::OK();
}
// NodePass visitor entry point for RenameOp.
Status RenameOp::Accept(NodePass *p, bool *modified) {
  // shared_from_this() yields a base-class pointer; downcast it so the pass
  // is handed the concrete RenameOp type.
  auto self = std::static_pointer_cast<RenameOp>(shared_from_this());
  return p->RunOnNode(self, modified);
}
} // namespace dataset
} // namespace mindspore

@ -110,6 +110,12 @@ class RenameOp : public PipelineOp {
// @return Status - The error code return
Status operator()() override;
// Base-class override for NodePass visitor acceptor.
// @param p - Pointer to the NodePass to be accepted.
// @param modified - Whether this node visit modified the pipeline.
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;
protected:
// Rename core functionality
Status RenameColumns();

@ -21,6 +21,7 @@
#include "dataset/engine/datasetops/repeat_op.h"
#include "dataset/engine/data_buffer.h"
#include "dataset/engine/db_connector.h"
#include "dataset/engine/opt/pass.h"
#include "utils/log_adapter.h"
@ -187,5 +188,11 @@ int32_t RepeatOp::num_producers() const {
return child_[0]->num_producers();
}
}
// NodePass visitor entry point for RepeatOp.
Status RepeatOp::Accept(NodePass *p, bool *modified) {
  // shared_from_this() yields a base-class pointer; downcast it so the pass
  // is handed the concrete RepeatOp type.
  auto self = std::static_pointer_cast<RepeatOp>(shared_from_this());
  return p->RunOnNode(self, modified);
}
} // namespace dataset
} // namespace mindspore

@ -118,6 +118,12 @@ class RepeatOp : public PipelineOp {
// @param workerId - The worker id
int32_t num_producers() const override;
// Base-class override for NodePass visitor acceptor.
// @param p - Pointer to the NodePass to be accepted.
// @param modified - Whether this node visit modified the pipeline.
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;
private:
int32_t max_repeats_; // The number of repeats that the user requested
int32_t repeat_count_; // A counter for the current number of executed repeats

@ -30,6 +30,7 @@
#include "dataset/engine/dataset_iterator.h"
#include "dataset/engine/data_buffer.h"
#include "dataset/engine/db_connector.h"
#include "dataset/engine/opt/pass.h"
#include "dataset/util/random.h"
#include "dataset/util/status.h"
@ -296,5 +297,11 @@ Status ShuffleOp::EoeReceived(int32_t worker_id) {
state_ = OpState::kDeOpIdle;
return Status::OK();
}
// NodePass visitor entry point for ShuffleOp.
Status ShuffleOp::Accept(NodePass *p, bool *modified) {
  // shared_from_this() yields a base-class pointer; downcast it so the pass
  // is handed the concrete ShuffleOp type.
  auto self = std::static_pointer_cast<ShuffleOp>(shared_from_this());
  return p->RunOnNode(self, modified);
}
} // namespace dataset
} // namespace mindspore

@ -155,6 +155,12 @@ class ShuffleOp : public PipelineOp {
// @return Status - The error code return
Status EoeReceived(int32_t worker_id) override;
// Base-class override for NodePass visitor acceptor.
// @param p - Pointer to the NodePass to be accepted.
// @param modified - Whether this node visit modified the pipeline.
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;
private:
// Private function to add a new row to the shuffle buffer.
// @return Status - The error code return

@ -22,6 +22,7 @@
#include "dataset/engine/datasetops/skip_op.h"
#include "dataset/engine/db_connector.h"
#include "dataset/engine/execution_tree.h"
#include "dataset/engine/opt/pass.h"
#include "utils/log_adapter.h"
@ -128,5 +129,11 @@ Status SkipOp::EofReceived(int32_t worker_id) {
MS_LOG(DEBUG) << "Skip operator EOF received, do nothing now.";
return Status::OK();
}
// NodePass visitor entry point for SkipOp.
Status SkipOp::Accept(NodePass *p, bool *modified) {
  // shared_from_this() yields a base-class pointer; downcast it so the pass
  // is handed the concrete SkipOp type.
  auto self = std::static_pointer_cast<SkipOp>(shared_from_this());
  return p->RunOnNode(self, modified);
}
} // namespace dataset
} // namespace mindspore

@ -74,6 +74,12 @@ class SkipOp : public PipelineOp {
// @param worker_id - The worker id
Status EofReceived(int32_t worker_id) override;
// Base-class override for NodePass visitor acceptor.
// @param p - Pointer to the NodePass to be accepted.
// @param modified - Whether this node visit modified the pipeline.
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;
private:
int32_t max_skips_; // The number of skips that the user requested
int32_t skip_count_; // A counter for the current number of executed skips

@ -20,6 +20,7 @@
#include "dataset/engine/data_buffer.h"
#include "dataset/engine/execution_tree.h"
#include "dataset/util/task_manager.h"
#include "dataset/engine/opt/pass.h"
namespace mindspore {
namespace dataset {
@ -250,5 +251,11 @@ Status GeneratorOp::Reset() {
wp_.Set();
return Status(StatusCode::kOK, "GeneratorOp Reset Succeed");
}
// NodePass visitor entry point for GeneratorOp.
Status GeneratorOp::Accept(NodePass *p, bool *modified) {
  // shared_from_this() yields a base-class pointer; downcast it so the pass
  // is handed the concrete GeneratorOp type.
  auto self = std::static_pointer_cast<GeneratorOp>(shared_from_this());
  return p->RunOnNode(self, modified);
}
} // namespace dataset
} // namespace mindspore

@ -121,6 +121,12 @@ class GeneratorOp : public PipelineOp {
// @return Status - The error code return
Status Reset() override;
// Base-class override for NodePass visitor acceptor.
// @param p - Pointer to the NodePass to be accepted.
// @param modified - Whether this node visit modified the pipeline.
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;
private:
py::function generator_function_;
std::vector<std::string> column_names_;

@ -22,6 +22,7 @@
#include "dataset/engine/datasetops/source/sampler/sequential_sampler.h"
#include "dataset/engine/db_connector.h"
#include "dataset/engine/execution_tree.h"
#include "dataset/engine/opt/pass.h"
namespace mindspore {
namespace dataset {
@ -451,5 +452,11 @@ Status ImageFolderOp::CountRowsAndClasses(const std::string &path, const int64_t
(*num_rows) = (row_cnt / num_dev) + (row_cnt % num_dev == 0 ? 0 : 1);
return Status::OK();
}
// NodePass visitor entry point for ImageFolderOp.
Status ImageFolderOp::Accept(NodePass *p, bool *modified) {
  // shared_from_this() yields a base-class pointer; downcast it so the pass
  // is handed the concrete ImageFolderOp type.
  auto self = std::static_pointer_cast<ImageFolderOp>(shared_from_this());
  return p->RunOnNode(self, modified);
}
} // namespace dataset
} // namespace mindspore

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save