Fixed connector_size and monitor.

pull/4950/head
anthonyaje 5 years ago
parent 45757e382c
commit 2d08d83247

@ -32,8 +32,8 @@
namespace mindspore {
namespace dataset {
DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size,
int32_t op_connector_size, bool send_epoch_end)
: PipelineOp(op_connector_size),
bool send_epoch_end)
: PipelineOp(1),
channel_name_(channel_name),
device_type_(device_type),
device_id_(device_id),
@ -55,10 +55,7 @@ DeviceQueueOp::Builder::Builder(int32_t prefetch_size)
: builder_prefetch_size_(prefetch_size),
builder_device_id_(0),
builder_device_type_(DeviceType::CPU),
builder_channel_name_("") {
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
builder_op_connector_size_ = cfg->op_connector_size();
}
builder_channel_name_("") {}
Status DeviceQueueOp::EoeReceived(int32_t worker_id) {
state_ = OpState::kDeOpIdle;

@ -65,11 +65,6 @@ class DeviceQueueOp : public PipelineOp {
return *this;
}
Builder &SetOpConnectorSize(int32_t op_connector_size) {
builder_op_connector_size_ = op_connector_size;
return *this;
}
Builder &SetDeviceType(const std::string &device_type) {
if (device_type == "Ascend") {
builder_device_type_ = DeviceType::Ascend;
@ -96,9 +91,8 @@ class DeviceQueueOp : public PipelineOp {
// to call this Build() method. It will instantiate the DeviceQueueOp
// and return it to caller as a shared pointer.
Status Build(std::shared_ptr<DeviceQueueOp> *ptr) {
*ptr =
std::make_shared<DeviceQueueOp>(builder_channel_name_, builder_device_type_, builder_device_id_,
builder_prefetch_size_, builder_op_connector_size_, builder_send_epoch_end_);
*ptr = std::make_shared<DeviceQueueOp>(builder_channel_name_, builder_device_type_, builder_device_id_,
builder_prefetch_size_, builder_send_epoch_end_);
return Status::OK();
}
@ -107,19 +101,22 @@ class DeviceQueueOp : public PipelineOp {
int32_t builder_device_id_;
DeviceType builder_device_type_;
std::string builder_channel_name_;
int32_t builder_op_connector_size_;
bool builder_send_epoch_end_;
};
// Name: constructor
// Description
DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size,
int32_t op_connector_size, bool send_epoch_end);
bool send_epoch_end);
// Name: destructor
// Description
~DeviceQueueOp();
/// \brief Getter function
/// \return connector size of current op
int32_t ConnectorSize() const { return ChildOpConnectorSize(); }
Status EoeReceived(int32_t worker_id) override;
const int32_t get_prefetch_size() { return prefetch_size_; }

@ -37,7 +37,6 @@ ExecutionTree::ExecutionTree() : id_count_(0) {
tg_ = std::make_unique<TaskGroup>();
tree_state_ = kDeTStateInit;
prepare_flags_ = kDePrepNone;
perf_monitor_ = std::make_unique<Monitor>(this);
profiling_manager_ = std::make_unique<ProfilingManager>(this);
optimize_ = common::GetEnv("OPTIMIZE") == "true" ? true : false;
}
@ -139,7 +138,7 @@ Status ExecutionTree::Launch() {
// Setup profiling manager
RETURN_IF_NOT_OK(profiling_manager_->Initialize());
// Launch Monitor Thread
RETURN_IF_NOT_OK(tg_->CreateAsyncTask("Monitor Thread launched", std::ref(*perf_monitor_)));
RETURN_IF_NOT_OK(profiling_manager_->LaunchMonitor());
}
MS_LOG(DEBUG) << "Printing the tree before launch tasks:\n" << ss.str();

@ -30,7 +30,6 @@ namespace dataset {
// Forward declares
class TaskGroup;
class DatasetOp;
class Monitor;
class ExecutionTree {
public:
@ -269,7 +268,6 @@ class ExecutionTree {
uint32_t prepare_flags_; // Flags used during tree prepare
TreeState tree_state_; // Tracking the current tree state
int32_t num_epochs_; // Total number of epochs to run for this tree
std::unique_ptr<Monitor> perf_monitor_; // Performance Monitor
std::unique_ptr<ProfilingManager> profiling_manager_; // Profiling manager
bool optimize_; // Flag to enable optional optimizations
};

@ -62,24 +62,44 @@ json ConnectorSize::ParseOpInfo(const DatasetOp &node, const std::vector<int32_t
}
// Save profiling data to file
// If the file is already exist (created by other sampling node), simply add the data to metrics field.
Status ConnectorSize::SaveToFile() {
std::ofstream os(file_path_, std::ios::trunc);
uint32_t idx = 0;
Path path = Path(file_path_);
json output;
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
output["sampling_interval"] = cfg->monitor_sampling_interval();
if (path.Exists()) {
MS_LOG(DEBUG) << file_path_ << " exists";
std::ifstream file(file_path_);
file >> output;
} else {
output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
}
uint32_t idx = 0;
// Traverse the ExecutionTree for JSON node generation
for (auto &node : *tree_) {
std::vector<int32_t> cur_queue_size;
std::transform(sample_table_.begin(), sample_table_.end(), std::back_inserter(cur_queue_size),
[&](const ConnectorSizeSample &sample) { return sample[idx]; });
json json_node = ParseOpInfo(node, cur_queue_size);
output["op_info"].push_back(json_node);
if (!path.Exists()) {
json json_node = ParseOpInfo(node, cur_queue_size);
output["op_info"].push_back(json_node);
} else {
if (!node.inlined() && node.Name() != "DeviceQueueOp") {
auto &ops_data = output["op_info"];
ops_data[idx]["metrics"]["output_queue"]["size"] = cur_queue_size;
ops_data[idx]["metrics"]["output_queue"]["length"] = node.ConnectorCapacity();
}
}
idx++;
}
// Discard the content of the file when opening.
std::ofstream os(file_path_, std::ios::trunc);
os << output;
return Status::OK();
}
Status ConnectorSize::Init(const std::string &dir_path, const std::string &device_id) {
file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString();
return Status::OK();

@ -20,6 +20,7 @@
#include <memory>
#include <string>
#include <nlohmann/json.hpp>
#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/engine/perf/connector_throughput.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/util/path.h"
@ -75,8 +76,11 @@ json ConnectorThroughput::ParseOpInfo(const DatasetOp &node, const std::vector<d
json_node["op_type"] = node.Name();
json_node["num_workers"] = node.num_workers();
json metrics;
metrics["output_queue"] = {{"throughput", thr}};
// DeviceQueueOp is a special op,it is not inlined but its output queue is invalid.
// So we should not output its connector throughput.
if (!node.inlined() && node.Name() != "DeviceQueueOp") {
metrics["output_queue"] = {{"throughput", thr}};
}
json_node["metrics"] = metrics;
if (!children_id.empty()) {
json_node["children"] = children_id;
@ -86,10 +90,18 @@ json ConnectorThroughput::ParseOpInfo(const DatasetOp &node, const std::vector<d
}
// Save profiling data to file
// If the file is already exist (created by other sampling node), simply add the data to metrics field.
Status ConnectorThroughput::SaveToFile() {
std::ofstream os(file_path_);
Path path = Path(file_path_);
json output;
output["sampling_interval"] = 10;
if (path.Exists()) {
MS_LOG(DEBUG) << file_path_ << " exists";
std::ifstream file(file_path_);
file >> output;
} else {
output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
}
// Traverse the ExecutionTree for JSON node generation
int col = 0;
for (auto &node : *tree_) {
@ -97,15 +109,27 @@ Status ConnectorThroughput::SaveToFile() {
for (auto i = 0; i < throughput_.size(); i++) {
throughput.push_back(throughput_[col][i]);
}
json json_node = ParseOpInfo(node, throughput);
output["op_info"].push_back(json_node);
if (!path.Exists()) {
json json_node = ParseOpInfo(node, throughput);
output["op_info"].push_back(json_node);
} else {
if (!node.inlined() && node.Name() != "DeviceQueueOp") {
auto &ops_data = output["op_info"];
ops_data[col]["metrics"]["output_queue"]["throughput"] = throughput;
}
}
col++;
}
// Discard the content of the file when opening.
std::ofstream os(file_path_, std::ios::trunc);
os << output;
return Status::OK();
}
Status ConnectorThroughput::Init(const std::string &dir_path, const std::string &device_id) {
file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + Name() + "_" + device_id + ".json")).toString();
file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString();
return Status::OK();
}
} // namespace dataset

@ -29,6 +29,11 @@
namespace mindspore {
namespace dataset {
// Constructor
ProfilingManager::ProfilingManager(ExecutionTree *tree) : tree_(tree) {
perf_monitor_ = std::make_unique<Monitor>(tree_);
}
bool ProfilingManager::IsProfilingEnable() const {
auto profiling = common::GetEnv("PROFILING_MODE");
if (profiling.empty() || profiling != "true") {
@ -68,6 +73,7 @@ Status ProfilingManager::Initialize() {
// device_queue node is used for graph mode
std::shared_ptr<Tracing> device_queue_tracing = std::make_shared<DeviceQueueTracing>();
RETURN_IF_NOT_OK(RegisterTracingNode(device_queue_tracing));
// dataset_iterator node is used for graph mode
std::shared_ptr<Tracing> dataset_iterator_tracing = std::make_shared<DatasetIteratorTracing>();
RETURN_IF_NOT_OK(RegisterTracingNode(dataset_iterator_tracing));
@ -77,6 +83,13 @@ Status ProfilingManager::Initialize() {
std::shared_ptr<Sampling> connector_thr_sampling = std::make_shared<ConnectorThroughput>(tree_);
RETURN_IF_NOT_OK(RegisterSamplingNode(connector_thr_sampling));
return Status::OK();
}
// Launch monitoring thread.
Status ProfilingManager::LaunchMonitor() {
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Monitor Thread launched", std::ref(*perf_monitor_)));
return Status::OK();
}

@ -78,7 +78,7 @@ class Tracing : public Profiling {
// 4) Manage profiling data serialization process
class ProfilingManager {
public:
explicit ProfilingManager(ExecutionTree *tree) : tree_(tree) {}
explicit ProfilingManager(ExecutionTree *tree);
~ProfilingManager() = default;
@ -105,7 +105,11 @@ class ProfilingManager {
const std::unordered_map<std::string, std::shared_ptr<Sampling>> &GetSamplingNodes() { return sampling_nodes_; }
// Launch monitoring thread.
Status LaunchMonitor();
private:
std::unique_ptr<Monitor> perf_monitor_;
std::unordered_map<std::string, std::shared_ptr<Tracing>> tracing_nodes_;
std::unordered_map<std::string, std::shared_ptr<Sampling>> sampling_nodes_;
@ -138,7 +142,6 @@ class ProfilingTime {
public:
static int64_t GetCurMilliSecond();
};
} // namespace dataset
} // namespace mindspore
#endif

@ -15,6 +15,7 @@
"""
Testing profiling support in DE
"""
import json
import os
import numpy as np
import mindspore.dataset as ds
@ -23,8 +24,7 @@ FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"
PIPELINE_FILE_SIZE = "./pipeline_profiling_1.json"
PIPELINE_FILE_THR = "./pipeline_profiling_Connector_Throughput_Sampling_1.json"
PIPELINE_FILE = "./pipeline_profiling_1.json"
DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_1.txt"
@ -44,10 +44,8 @@ def test_profiling_simple_pipeline():
for _ in data1:
pass
assert os.path.exists(PIPELINE_FILE_SIZE) is True
os.remove(PIPELINE_FILE_SIZE)
assert os.path.exists(PIPELINE_FILE_THR) is True
os.remove(PIPELINE_FILE_THR)
assert os.path.exists(PIPELINE_FILE) is True
os.remove(PIPELINE_FILE)
assert os.path.exists(DATASET_ITERATOR_FILE) is True
os.remove(DATASET_ITERATOR_FILE)
del os.environ['PROFILING_MODE']
@ -57,7 +55,7 @@ def test_profiling_simple_pipeline():
def test_profiling_complex_pipeline():
"""
Generator -> Map ->
-> Zip -> Batch
-> Zip
TFReader -> Shuffle ->
"""
os.environ['PROFILING_MODE'] = 'true'
@ -77,10 +75,17 @@ def test_profiling_complex_pipeline():
for _ in data3:
pass
assert os.path.exists(PIPELINE_FILE_SIZE) is True
os.remove(PIPELINE_FILE_SIZE)
assert os.path.exists(PIPELINE_FILE_THR) is True
os.remove(PIPELINE_FILE_THR)
with open(PIPELINE_FILE) as f:
data = json.load(f)
op_info = data["op_info"]
assert len(op_info) == 5
for i in range(5):
assert "size" in op_info[i]["metrics"]["output_queue"]
assert "length" in op_info[i]["metrics"]["output_queue"]
assert "throughput" in op_info[i]["metrics"]["output_queue"]
assert os.path.exists(PIPELINE_FILE) is True
os.remove(PIPELINE_FILE)
assert os.path.exists(DATASET_ITERATOR_FILE) is True
os.remove(DATASET_ITERATOR_FILE)
del os.environ['PROFILING_MODE']
@ -108,10 +113,8 @@ def test_profiling_sampling_iterval():
for _ in data1:
pass
assert os.path.exists(PIPELINE_FILE_SIZE) is True
os.remove(PIPELINE_FILE_SIZE)
assert os.path.exists(PIPELINE_FILE_THR) is True
os.remove(PIPELINE_FILE_THR)
assert os.path.exists(PIPELINE_FILE) is True
os.remove(PIPELINE_FILE)
assert os.path.exists(DATASET_ITERATOR_FILE) is True
os.remove(DATASET_ITERATOR_FILE)

Loading…
Cancel
Save