From 2d08d8324764ed61e061ab3e997e40009a8711fd Mon Sep 17 00:00:00 2001 From: anthonyaje Date: Fri, 21 Aug 2020 13:02:53 -0400 Subject: [PATCH] Fixed connector_size and monitor. --- .../engine/datasetops/device_queue_op.cc | 9 ++--- .../engine/datasetops/device_queue_op.h | 17 ++++----- .../minddata/dataset/engine/execution_tree.cc | 3 +- .../minddata/dataset/engine/execution_tree.h | 2 - .../dataset/engine/perf/connector_size.cc | 32 +++++++++++++--- .../engine/perf/connector_throughput.cc | 38 +++++++++++++++---- .../minddata/dataset/engine/perf/profiling.cc | 13 +++++++ .../minddata/dataset/engine/perf/profiling.h | 7 +++- tests/ut/python/dataset/test_profiling.py | 33 ++++++++-------- 9 files changed, 104 insertions(+), 50 deletions(-) diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc index 13f2a12338..4f54f89e3a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc @@ -32,8 +32,8 @@ namespace mindspore { namespace dataset { DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size, - int32_t op_connector_size, bool send_epoch_end) - : PipelineOp(op_connector_size), + bool send_epoch_end) + : PipelineOp(1), channel_name_(channel_name), device_type_(device_type), device_id_(device_id), @@ -55,10 +55,7 @@ DeviceQueueOp::Builder::Builder(int32_t prefetch_size) : builder_prefetch_size_(prefetch_size), builder_device_id_(0), builder_device_type_(DeviceType::CPU), - builder_channel_name_("") { - std::shared_ptr cfg = GlobalContext::config_manager(); - builder_op_connector_size_ = cfg->op_connector_size(); -} + builder_channel_name_("") {} Status DeviceQueueOp::EoeReceived(int32_t worker_id) { state_ = OpState::kDeOpIdle; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h index b697bae425..99feb4ea0e 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h @@ -65,11 +65,6 @@ class DeviceQueueOp : public PipelineOp { return *this; } - Builder &SetOpConnectorSize(int32_t op_connector_size) { - builder_op_connector_size_ = op_connector_size; - return *this; - } - Builder &SetDeviceType(const std::string &device_type) { if (device_type == "Ascend") { builder_device_type_ = DeviceType::Ascend; @@ -96,9 +91,8 @@ class DeviceQueueOp : public PipelineOp { // to call this Build() method. It will instantiate the DeviceQueueOp // and return it to caller as a shared pointer. Status Build(std::shared_ptr *ptr) { - *ptr = - std::make_shared(builder_channel_name_, builder_device_type_, builder_device_id_, - builder_prefetch_size_, builder_op_connector_size_, builder_send_epoch_end_); + *ptr = std::make_shared(builder_channel_name_, builder_device_type_, builder_device_id_, + builder_prefetch_size_, builder_send_epoch_end_); return Status::OK(); } @@ -107,19 +101,22 @@ class DeviceQueueOp : public PipelineOp { int32_t builder_device_id_; DeviceType builder_device_type_; std::string builder_channel_name_; - int32_t builder_op_connector_size_; bool builder_send_epoch_end_; }; // Name: constructor // Description DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size, - int32_t op_connector_size, bool send_epoch_end); + bool send_epoch_end); // Name: destructor // Description ~DeviceQueueOp(); + /// \brief Getter function + /// \return connector size of current op + int32_t ConnectorSize() const { return ChildOpConnectorSize(); } + Status EoeReceived(int32_t worker_id) override; const int32_t get_prefetch_size() { return prefetch_size_; } diff --git a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc index 7012d85dd3..6aefe23986 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc @@ -37,7 +37,6 @@ ExecutionTree::ExecutionTree() : id_count_(0) { tg_ = std::make_unique(); tree_state_ = kDeTStateInit; prepare_flags_ = kDePrepNone; - perf_monitor_ = std::make_unique(this); profiling_manager_ = std::make_unique(this); optimize_ = common::GetEnv("OPTIMIZE") == "true" ? true : false; } @@ -139,7 +138,7 @@ Status ExecutionTree::Launch() { // Setup profiling manager RETURN_IF_NOT_OK(profiling_manager_->Initialize()); // Launch Monitor Thread - RETURN_IF_NOT_OK(tg_->CreateAsyncTask("Monitor Thread launched", std::ref(*perf_monitor_))); + RETURN_IF_NOT_OK(profiling_manager_->LaunchMonitor()); } MS_LOG(DEBUG) << "Printing the tree before launch tasks:\n" << ss.str(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h index b0e372595a..0525395c05 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h +++ b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h @@ -30,7 +30,6 @@ namespace dataset { // Forward declares class TaskGroup; class DatasetOp; -class Monitor; class ExecutionTree { public: @@ -269,7 +268,6 @@ class ExecutionTree { uint32_t prepare_flags_; // Flags used during tree prepare TreeState tree_state_; // Tracking the current tree state int32_t num_epochs_; // Total number of epochs to run for this tree - std::unique_ptr perf_monitor_; // Performance Monitor std::unique_ptr profiling_manager_; // Profiling manager bool optimize_; // Flag to enable optional optimizations }; diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc index 20b4908030..2b7d001a77 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc @@ -62,24 +62,44 @@ json ConnectorSize::ParseOpInfo(const DatasetOp &node, const std::vector cfg = GlobalContext::config_manager(); - output["sampling_interval"] = cfg->monitor_sampling_interval(); + if (path.Exists()) { + MS_LOG(DEBUG) << file_path_ << " exists"; + std::ifstream file(file_path_); + file >> output; + } else { + output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval(); + } + + uint32_t idx = 0; // Traverse the ExecutionTree for JSON node generation for (auto &node : *tree_) { std::vector cur_queue_size; std::transform(sample_table_.begin(), sample_table_.end(), std::back_inserter(cur_queue_size), [&](const ConnectorSizeSample &sample) { return sample[idx]; }); - json json_node = ParseOpInfo(node, cur_queue_size); - output["op_info"].push_back(json_node); + if (!path.Exists()) { + json json_node = ParseOpInfo(node, cur_queue_size); + output["op_info"].push_back(json_node); + } else { + if (!node.inlined() && node.Name() != "DeviceQueueOp") { + auto &ops_data = output["op_info"]; + ops_data[idx]["metrics"]["output_queue"]["size"] = cur_queue_size; + ops_data[idx]["metrics"]["output_queue"]["length"] = node.ConnectorCapacity(); + } + } + idx++; } + + // Discard the content of the file when opening. + std::ofstream os(file_path_, std::ios::trunc); os << output; return Status::OK(); } + Status ConnectorSize::Init(const std::string &dir_path, const std::string &device_id) { file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString(); return Status::OK(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.cc index 693fb2d65d..7b6f9682bb 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.cc @@ -20,6 +20,7 @@ #include #include #include +#include "minddata/dataset/core/config_manager.h" #include "minddata/dataset/engine/perf/connector_throughput.h" #include "minddata/dataset/engine/execution_tree.h" #include "minddata/dataset/util/path.h" @@ -75,8 +76,11 @@ json ConnectorThroughput::ParseOpInfo(const DatasetOp &node, const std::vector> output; + } else { + output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval(); + } + // Traverse the ExecutionTree for JSON node generation int col = 0; for (auto &node : *tree_) { @@ -97,15 +109,27 @@ Status ConnectorThroughput::SaveToFile() { for (auto i = 0; i < throughput_.size(); i++) { throughput.push_back(throughput_[col][i]); } - json json_node = ParseOpInfo(node, throughput); - output["op_info"].push_back(json_node); + + if (!path.Exists()) { + json json_node = ParseOpInfo(node, throughput); + output["op_info"].push_back(json_node); + } else { + if (!node.inlined() && node.Name() != "DeviceQueueOp") { + auto &ops_data = output["op_info"]; + ops_data[col]["metrics"]["output_queue"]["throughput"] = throughput; + } + } col++; } + + // Discard the content of the file when opening. + std::ofstream os(file_path_, std::ios::trunc); os << output; return Status::OK(); } + Status ConnectorThroughput::Init(const std::string &dir_path, const std::string &device_id) { - file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + Name() + "_" + device_id + ".json")).toString(); + file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString(); return Status::OK(); } } // namespace dataset diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc index 4fdc6174a3..82a4b6818c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc @@ -29,6 +29,11 @@ namespace mindspore { namespace dataset { +// Constructor +ProfilingManager::ProfilingManager(ExecutionTree *tree) : tree_(tree) { + perf_monitor_ = std::make_unique(tree_); +} + bool ProfilingManager::IsProfilingEnable() const { auto profiling = common::GetEnv("PROFILING_MODE"); if (profiling.empty() || profiling != "true") { @@ -68,6 +73,7 @@ Status ProfilingManager::Initialize() { // device_queue node is used for graph mode std::shared_ptr device_queue_tracing = std::make_shared(); RETURN_IF_NOT_OK(RegisterTracingNode(device_queue_tracing)); + // dataset_iterator node is used for graph mode std::shared_ptr dataset_iterator_tracing = std::make_shared(); RETURN_IF_NOT_OK(RegisterTracingNode(dataset_iterator_tracing)); @@ -77,6 +83,13 @@ Status ProfilingManager::Initialize() { std::shared_ptr connector_thr_sampling = std::make_shared(tree_); RETURN_IF_NOT_OK(RegisterSamplingNode(connector_thr_sampling)); + + return Status::OK(); +} + +// Launch monitoring thread. +Status ProfilingManager::LaunchMonitor() { + RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Monitor Thread launched", std::ref(*perf_monitor_))); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h index b6f0ad2ab7..41564df33b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h +++ b/mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h @@ -78,7 +78,7 @@ class Tracing : public Profiling { // 4) Manage profiling data serialization process class ProfilingManager { public: - explicit ProfilingManager(ExecutionTree *tree) : tree_(tree) {} + explicit ProfilingManager(ExecutionTree *tree); ~ProfilingManager() = default; @@ -105,7 +105,11 @@ class ProfilingManager { const std::unordered_map> &GetSamplingNodes() { return sampling_nodes_; } + // Launch monitoring thread. + Status LaunchMonitor(); + private: + std::unique_ptr perf_monitor_; std::unordered_map> tracing_nodes_; std::unordered_map> sampling_nodes_; @@ -138,7 +142,6 @@ class ProfilingTime { public: static int64_t GetCurMilliSecond(); }; - } // namespace dataset } // namespace mindspore #endif diff --git a/tests/ut/python/dataset/test_profiling.py b/tests/ut/python/dataset/test_profiling.py index 486322cd4a..e013617370 100644 --- a/tests/ut/python/dataset/test_profiling.py +++ b/tests/ut/python/dataset/test_profiling.py @@ -15,6 +15,7 @@ """ Testing profiling support in DE """ +import json import os import numpy as np import mindspore.dataset as ds @@ -23,8 +24,7 @@ FILES = ["../data/dataset/testTFTestAllTypes/test.data"] DATASET_ROOT = "../data/dataset/testTFTestAllTypes/" SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json" -PIPELINE_FILE_SIZE = "./pipeline_profiling_1.json" -PIPELINE_FILE_THR = "./pipeline_profiling_Connector_Throughput_Sampling_1.json" +PIPELINE_FILE = "./pipeline_profiling_1.json" DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_1.txt" @@ -44,10 +44,8 @@ def test_profiling_simple_pipeline(): for _ in data1: pass - assert os.path.exists(PIPELINE_FILE_SIZE) is True - os.remove(PIPELINE_FILE_SIZE) - assert os.path.exists(PIPELINE_FILE_THR) is True - os.remove(PIPELINE_FILE_THR) + assert os.path.exists(PIPELINE_FILE) is True + os.remove(PIPELINE_FILE) assert os.path.exists(DATASET_ITERATOR_FILE) is True os.remove(DATASET_ITERATOR_FILE) del os.environ['PROFILING_MODE'] @@ -57,7 +55,7 @@ def test_profiling_simple_pipeline(): def test_profiling_complex_pipeline(): """ Generator -> Map -> - -> Zip -> Batch + -> Zip TFReader -> Shuffle -> """ os.environ['PROFILING_MODE'] = 'true' @@ -77,10 +75,17 @@ def test_profiling_complex_pipeline(): for _ in data3: pass - assert os.path.exists(PIPELINE_FILE_SIZE) is True - os.remove(PIPELINE_FILE_SIZE) - assert os.path.exists(PIPELINE_FILE_THR) is True - os.remove(PIPELINE_FILE_THR) + with open(PIPELINE_FILE) as f: + data = json.load(f) + op_info = data["op_info"] + assert len(op_info) == 5 + for i in range(5): + assert "size" in op_info[i]["metrics"]["output_queue"] + assert "length" in op_info[i]["metrics"]["output_queue"] + assert "throughput" in op_info[i]["metrics"]["output_queue"] + + assert os.path.exists(PIPELINE_FILE) is True + os.remove(PIPELINE_FILE) assert os.path.exists(DATASET_ITERATOR_FILE) is True os.remove(DATASET_ITERATOR_FILE) del os.environ['PROFILING_MODE'] @@ -108,10 +113,8 @@ def test_profiling_sampling_iterval(): for _ in data1: pass - assert os.path.exists(PIPELINE_FILE_SIZE) is True - os.remove(PIPELINE_FILE_SIZE) - assert os.path.exists(PIPELINE_FILE_THR) is True - os.remove(PIPELINE_FILE_THR) + assert os.path.exists(PIPELINE_FILE) is True + os.remove(PIPELINE_FILE) assert os.path.exists(DATASET_ITERATOR_FILE) is True os.remove(DATASET_ITERATOR_FILE)