From 12b4684e2670c675f2b6aeaddb7d82ac90ee813c Mon Sep 17 00:00:00 2001 From: Zirui Wu Date: Tue, 15 Dec 2020 11:22:54 -0500 Subject: [PATCH] address testing ticket add tree_adapter usage flag --- .../minddata/dataset/engine/consumers/tree_consumer.cc | 4 ++-- .../minddata/dataset/engine/consumers/tree_consumer.h | 2 +- .../minddata/dataset/engine/opt/post/auto_worker_pass.cc | 9 ++++++--- mindspore/ccsrc/minddata/dataset/engine/perf/monitor.cc | 4 +--- mindspore/ccsrc/minddata/dataset/engine/perf/monitor.h | 1 - mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc | 6 +++--- mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h | 9 +++++++-- mindspore/dataset/core/config.py | 3 +++ 8 files changed, 23 insertions(+), 15 deletions(-) diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc index bdaec513f0..db7fa79a47 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc @@ -444,7 +444,7 @@ Status SaveToDisk::TransformTensor(const unsigned char *src, const TensorShape & #endif TreeGetters::TreeGetters() : dataset_size_(-1), init_flag_(false), first_row_obtained_(false) { - tree_adapter_ = std::make_unique(); + tree_adapter_ = std::make_unique(TreeAdapter::UsageFlag::kDeGetter); } Status TreeGetters::Init(std::shared_ptr d) { @@ -570,7 +570,7 @@ Status DatasetSizeGetter::Init(std::shared_ptr d) { return Status::OK(); } Status DatasetSizeGetter::DryRun(std::shared_ptr ir_node, int64_t *dataset_size) { - std::shared_ptr tree_adapter = std::make_shared(); + std::shared_ptr tree_adapter = std::make_shared(TreeAdapter::UsageFlag::kDeGetter); tree_adapters_.push_back(tree_adapter); tree_adapter->SetPrePassOverride([](OptPass pre) { pre.push_back( diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.h 
b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.h index 8e0f46a240..ab78eec40a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.h +++ b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.h @@ -228,7 +228,7 @@ class DatasetSizeGetter : public TreeConsumer, public std::enable_shared_from_th private: std::shared_ptr root_; - std::vector> tree_adapters_; + std::vector> tree_adapters_; // this is a vector to handle different branches of zip int64_t dataset_size_; }; diff --git a/mindspore/ccsrc/minddata/dataset/engine/opt/post/auto_worker_pass.cc b/mindspore/ccsrc/minddata/dataset/engine/opt/post/auto_worker_pass.cc index 5a97faee8f..aedcf4400a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/opt/post/auto_worker_pass.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/opt/post/auto_worker_pass.cc @@ -48,8 +48,8 @@ Status AutoWorkerPass::RunOnTree(std::shared_ptr root_ir, bool *mod for (const auto &p : pass.weight_profile_) max_weight = std::max(max_weight, p.second); RETURN_IF_NOT_OK(pass.Run(root_ir, modified)); if (pass.parallel_ops_.size() > 3) { - MS_LOG(WARNING) << "AutoWorkerPass at current stage is only optimized for simple network that has LeafNode, " - << "BatchNode and MapNode. User discretion is advised for usage on other complex networks."; + MS_LOG(WARNING) << "AutoNumWorker right now is only suitable for simple dataset pipelines that has at most, 1 leaf " + << "1 batch and 1 map. 
AutoNumWorker may not be optimal for usage on complex pipelines."; } for (auto &p : pass.parallel_ops_) { @@ -60,8 +60,11 @@ Status AutoWorkerPass::RunOnTree(std::shared_ptr root_ir, bool *mod int32_t cur_node_max = std::ceil(p.second * max_num_workers_ / max_weight); // this will ensure that num_workers will fall with the range of [1,cur_node_max] int32_t cur_node_num_worker = std::max(std::min(num_workers, cur_node_max), min_num_workers_); + + // if the num_worker to set is the same as the original, skip setting and printing the logs + if (cur_node_num_worker == p.first->num_workers()) continue; // log the change via warning msg so user can see what the num_worker is being set for which op - MS_LOG(WARNING) << "num_workers in " << p.first->Name() << " is auto-adjusted from " + MS_LOG(WARNING) << "AutoNumWorker enabled, num_workers in " << p.first->Name() << " is auto-adjusted from " << std::to_string(p.first->num_workers()) + " to " + std::to_string(cur_node_num_worker); p.first->SetNumWorkers(cur_node_num_worker); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.cc index 0e5b31ad0a..fac5f9a1f9 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.cc @@ -13,10 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - -#include -#include "minddata/dataset/core/config_manager.h" #include "minddata/dataset/engine/perf/monitor.h" +#include "minddata/dataset/core/config_manager.h" #include "minddata/dataset/engine/execution_tree.h" namespace mindspore { diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.h b/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.h index 1e669dad71..0b592d1c6d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/monitor.h @@ -29,7 +29,6 @@ class ExecutionTree; class Monitor { public: // Monitor object constructor - explicit Monitor(ExecutionTree *tree); Monitor() = default; diff --git a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc index bfab8ea53a..3570448cfb 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc @@ -29,8 +29,7 @@ namespace mindspore { namespace dataset { -TreeAdapter::TreeAdapter() { - tree_state_ = kCompileStateInit; +TreeAdapter::TreeAdapter(UsageFlag usage) : usage_(usage), tree_state_(kCompileStateInit) { optimize_ = common::GetEnv("OPTIMIZE") == "true"; } @@ -81,7 +80,8 @@ Status TreeAdapter::PostPass(std::shared_ptr ir) { MS_LOG(INFO) << "Running post pass loops."; // AutoWorkerPass should ideally precede CacheTransForm Pass to avoid complications of the setting - if (GlobalContext::config_manager()->auto_num_workers()) { + if (GlobalContext::config_manager()->auto_num_workers() && usage_ == kDeIterator) { + // skip this for getter pass actions.emplace_back(std::make_unique()); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h index 84a08cbc40..2bee9be54a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h +++ b/mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h @@ -33,7 +33,12 @@ class DatasetNode; class 
TreeAdapter { public: - TreeAdapter(); + // this flag is used to indicate the purpose of the creation of this tree adapter (type of the tree_consumer). + // Currently there are 3 types of consumers: Iterator, Getter and TDT/Vocab/Save ... + // To avoid premature optimization, the last type (TDT/Vocab/Save) is regarded as Iterator for now. + enum UsageFlag { kDeIterator = 0, kDeGetter = 1 }; + + explicit TreeAdapter(UsageFlag flag = kDeIterator); ~TreeAdapter() = default; @@ -92,7 +97,7 @@ class TreeAdapter { int32_t cur_connector_size_; // current connector size of root op, used for profiling int32_t cur_connector_capacity_; // current connector capacity of root op, used for profiling std::function pre_pass_override_; // function ptr that overrides pre pass, called in PrePrepare() - + UsageFlag usage_; // usage of this tree adapter (type of consumer) // State flags for the lifecycle of the tree enum CompileState { kCompileStateInit = 0, // The freshly initialized state diff --git a/mindspore/dataset/core/config.py b/mindspore/dataset/core/config.py index 28a5c2cc3e..50cc0657da 100644 --- a/mindspore/dataset/core/config.py +++ b/mindspore/dataset/core/config.py @@ -30,6 +30,7 @@ UINT32_MAX = 4294967295 _config = cde.GlobalContext.config_manager() + def _init_device_info(): """ INTERNAL USE ONLY! @@ -52,6 +53,7 @@ def _init_device_info(): rank_id = cuda_id _config.set_rank_id(rank_id) + def set_seed(seed): """ Set the seed to be used in any random generator. This is used to produce deterministic results. @@ -149,6 +151,7 @@ def set_num_parallel_workers(num): def get_num_parallel_workers(): """ Get the default number of parallel workers. + This is the DEFAULT num_parallel_workers value used for each op; it is not related to the AutoNumWorker feature. Returns: Int, number of parallel workers to be used as a default for each operation