diff --git a/mindspore/ccsrc/dataset/engine/connector.h b/mindspore/ccsrc/dataset/engine/connector.h index 085b790ec5..cdce592c1b 100644 --- a/mindspore/ccsrc/dataset/engine/connector.h +++ b/mindspore/ccsrc/dataset/engine/connector.h @@ -152,6 +152,23 @@ class Connector { return out; } + // Get current size of connector. + int32_t size() const { + int32_t size = 0; + for (int32_t i = 0; i < queues_.size(); ++i) { + size += queues_[i]->size(); + } + return size; + } + + int32_t capacity() const { + int32_t capacity = 0; + for (int32_t i = 0; i < queues_.size(); ++i) { + capacity += queues_[i]->capacity(); + } + return capacity; + } + // Register the internal resources with Task group for interruption service. // @param vg // @return diff --git a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h index 315dc27219..29b59ba2f7 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h @@ -211,6 +211,22 @@ class DatasetOp : public std::enable_shared_from_this { // @return - the column name map as a string std::string ColumnNameMapAsString() const; + // Getter function + // @return connector size of current op + virtual int32_t ConnectorSize() const { return out_connector_->size(); } + + // Getter function + // @return connector size of current op + virtual int32_t ConnectorCapacity() const { return out_connector_->capacity(); } + + // Getter function + // @return connector size of child op + int32_t ChildOpConnectorSize(int32_t child_index = 0) const { return child_[child_index]->ConnectorSize(); } + + // Getter function + // @return connector capacity of child op + int32_t ChildOpConnectorCapacity(int32_t child_index = 0) const { return child_[child_index]->ConnectorCapacity(); } + // Children Getter // @return Vector or Children std::vector> Children() const { return child_; } diff --git a/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc index bcdb58db24..3857accbef 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc @@ -25,9 +25,13 @@ #include "dataset/util/status.h" #include "dataset/util/task_manager.h" #include "dataset/engine/opt/pass.h" +#include "dataset/util/profiling.h" namespace mindspore { namespace dataset { +#define DEVICE_QUEUE_PROFILING_DATA(type, subtype, batch_num, value) \ + std::to_string(type) + " " + std::to_string(subtype) + " " + std::to_string(batch_num) + " " + std::to_string(value) + DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size, int32_t op_connector_size, int64_t num_batch) : PipelineOp(op_connector_size), @@ -97,7 +101,25 @@ Status DeviceQueueOp::SendDataToAscend() { MS_LOG(INFO) << "Device queue, sending data to Ascend."; int64_t total_batch = 0; bool is_break_loop = false; - + double batch_start_time, tdt_start_time, end_time; + int32_t batch_cost, tdt_cost; + int32_t connector_size = 0; + int32_t connector_capacity; + std::shared_ptr profiling_node; + bool isProfilingEnable = ProfilingManager::GetInstance().IsProfilingEnable(); + if (isProfilingEnable) { + std::string file_name = "critical_point_profiling"; + // Here can determine performance bottleneck is in pipeline or in tdt. + // Context format of this file "type subtype batchnum value" + // type:0: time, 1: queue depth + // subtype:0: pipeline time, 1: push tdt time, 2: all time + // batchnum: batch number + // value: value of time(ms) or queue depth + profiling_node = std::make_shared(file_name, device_id_); + RETURN_IF_NOT_OK(ProfilingManager::GetInstance().RegisterProfilingNode(&profiling_node)); + batch_start_time = ProfilingTime::GetCurMilliSecond(); + connector_capacity = ChildOpConnectorCapacity(); + } std::unique_ptr current_buffer; RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); @@ -107,20 +129,51 @@ Status DeviceQueueOp::SendDataToAscend() { TensorRow currRow; for (int row_id = 0; row_id < current_buffer->NumRows() && !is_break_loop; row_id++) { RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow)); + if (isProfilingEnable) { + tdt_start_time = ProfilingTime::GetCurMilliSecond(); + } auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_); if (status == TdtStatus::FAILED) { return Status(StatusCode::kTDTPushFailure, "TDT Push Failed"); } + + if (isProfilingEnable) { + end_time = ProfilingTime::GetCurMilliSecond(); + tdt_cost = (int32_t)(end_time - tdt_start_time); + // record push tdt time + profiling_node->Record(DEVICE_QUEUE_PROFILING_DATA(TIME, TDT_PUSH_TIME, total_batch + 1, tdt_cost)); + batch_cost = (int32_t)(end_time - batch_start_time); + // record batch time + profiling_node->Record(DEVICE_QUEUE_PROFILING_DATA(TIME, BATCH_TIME, total_batch + 1, batch_cost)); + // record pipeline time + profiling_node->Record( + DEVICE_QUEUE_PROFILING_DATA(TIME, PIPELINE_TIME, total_batch + 1, batch_cost - tdt_cost)); + batch_start_time = end_time; + // record connector depth + profiling_node->Record( + DEVICE_QUEUE_PROFILING_DATA(CONNECTOR_DEPTH, connector_capacity, total_batch + 1, connector_size)); + } total_batch++; if (num_batch_ > 0 && total_batch == num_batch_) { is_break_loop = true; } } + if (isProfilingEnable) { + connector_size = ChildOpConnectorSize(); + connector_capacity = ChildOpConnectorCapacity(); + } RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); } + if (isProfilingEnable) { + connector_size = ChildOpConnectorSize(); + connector_capacity = ChildOpConnectorCapacity(); + } RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); } + if (isProfilingEnable) { + profiling_node->SaveToFile(); + } MS_LOG(INFO) << "Device queue total batch is " << total_batch << ", number of batches is " << num_batch_ << "."; return Status::OK(); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h b/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h index 718bc1922b..1bcfade0b8 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h @@ -124,6 +124,10 @@ class RepeatOp : public PipelineOp { // @return - Status of the node visit. Status Accept(NodePass *p, bool *modified) override; + virtual int32_t ConnectorSize() const { return child_[0]->ConnectorSize(); } + + virtual int32_t ConnectorCapacity() const { return child_[0]->ConnectorCapacity(); } + private: int32_t max_repeats_; // The number of repeats that the user requested int32_t repeat_count_; // A counter for the current number of executed repeats diff --git a/mindspore/ccsrc/dataset/engine/execution_tree.cc b/mindspore/ccsrc/dataset/engine/execution_tree.cc index bcb387082b..acd32b24c2 100644 --- a/mindspore/ccsrc/dataset/engine/execution_tree.cc +++ b/mindspore/ccsrc/dataset/engine/execution_tree.cc @@ -19,6 +19,7 @@ #include "dataset/engine/datasetops/dataset_op.h" #include "dataset/engine/datasetops/shuffle_op.h" #include "dataset/util/task_manager.h" +#include "dataset/util/profiling.h" #include "dataset/engine/opt/util/printer_pass.h" diff --git a/mindspore/ccsrc/dataset/util/CMakeLists.txt b/mindspore/ccsrc/dataset/util/CMakeLists.txt index b36d612435..6f5e37e88f 100644 --- a/mindspore/ccsrc/dataset/util/CMakeLists.txt +++ b/mindspore/ccsrc/dataset/util/CMakeLists.txt @@ -14,4 +14,5 @@ add_library(utils OBJECT status.cc path.cc wait_post.cc - sig_handler.cc) + sig_handler.cc + profiling.cc) diff --git a/mindspore/ccsrc/dataset/util/profiling.cc b/mindspore/ccsrc/dataset/util/profiling.cc new file mode 100644 index 0000000000..8d9aad74f2 --- /dev/null +++ b/mindspore/ccsrc/dataset/util/profiling.cc @@ -0,0 +1,112 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "dataset/util/profiling.h" + +#include +#include +#include +#include "dataset/util/path.h" +#include "common/utils.h" +#include "utils/log_adapter.h" + +namespace mindspore { +namespace dataset { +Profiling::Profiling(const std::string &file_name, const int32_t device_id) + : file_name_(file_name), device_id_(device_id) {} + +Status Profiling::Init() { + std::string dir = common::GetEnv("MINDDATA_PROFILING_DIR"); + if (dir.empty()) { + RETURN_STATUS_UNEXPECTED("Profiling dir is not set."); + } + char real_path[PATH_MAX] = {0}; + if (dir.size() >= PATH_MAX) { + RETURN_STATUS_UNEXPECTED("Profiling dir is invalid."); + } +#if defined(_WIN32) || defined(_WIN64) + if (_fullpath(real_path, common::SafeCStr(dir), PATH_MAX) == nullptr) { + RETURN_STATUS_UNEXPECTED("Profiling dir is invalid."); + } +#else + if (realpath(common::SafeCStr(dir), real_path) == nullptr) { + RETURN_STATUS_UNEXPECTED("Profiling dir is invalid."); + } +#endif + file_path_ = (Path(real_path) / Path(file_name_ + "_" + std::to_string(device_id_) + ".txt")).toString(); + return Status::OK(); +} + +Status Profiling::Record(const std::string &data) { + value_.emplace_back(data); + return Status::OK(); +} + +Status Profiling::SaveToFile() { + if (file_name_.empty()) { + RETURN_STATUS_UNEXPECTED("Profiling file name has not been set."); + } + std::ofstream handle(file_path_, std::ios::app); + if (!handle.is_open()) { + RETURN_STATUS_UNEXPECTED("Profiling file can not be opened."); + } + for (auto value : value_) { + handle << value << "\n"; + } + handle.close(); + + return Status::OK(); +} + +ProfilingManager &ProfilingManager::GetInstance() { + static ProfilingManager instance; + return instance; +} + +bool ProfilingManager::IsProfilingEnable() const { + auto profiling = common::GetEnv("PROFILING_MODE"); + if (profiling.empty() || profiling != "true") { + return false; + } + + return true; +} + +Status ProfilingManager::RegisterProfilingNode(std::shared_ptr *node) { + RETURN_IF_NOT_OK((*node)->Init()); + profiling_node_.emplace_back(*node); + return Status::OK(); +} + +Status ProfilingManager::SaveProfilingData() { + if (!IsProfilingEnable()) { + return Status::OK(); + } + MS_LOG(INFO) << "Start to save profile data."; + for (auto node : profiling_node_) { + RETURN_IF_NOT_OK(node->SaveToFile()); + } + MS_LOG(INFO) << "Save profile data end."; + + return Status::OK(); +} + +double ProfilingTime::GetCurMilliSecond() { + struct timeval tv = {0, 0}; + (void)gettimeofday(&tv, nullptr); + return tv.tv_sec * 1000 + tv.tv_usec / 1000; +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/util/profiling.h b/mindspore/ccsrc/dataset/util/profiling.h new file mode 100644 index 0000000000..4de2a1e052 --- /dev/null +++ b/mindspore/ccsrc/dataset/util/profiling.h @@ -0,0 +1,92 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DATASET_UTIL_PROFILE_H_ +#define DATASET_UTIL_PROFILE_H_ + +#include +#include +#include +#include "dataset/util/status.h" + +namespace mindspore { +namespace dataset { +enum ProfilingType { + TIME, + CONNECTOR_DEPTH, +}; + +enum ProfilingTimeSubType { + PIPELINE_TIME, + TDT_PUSH_TIME, + BATCH_TIME, + INVALID_TIME, +}; + +class Profiling { + public: + // Constructor + Profiling() = default; + + // Constructor if need save profile data to file + Profiling(const std::string &file_name, const int32_t device_id); + + // Destructor + ~Profiling() = default; + + Status Init(); + + // Record profile data + Status Record(const std::string &data); + + // Save profile data to file if necessary + Status SaveToFile(); + + private: + std::vector value_; + std::string file_name_; + std::string file_path_; + int32_t device_id_; +}; + +class ProfilingManager { + public: + ProfilingManager() = default; + ~ProfilingManager() = default; + + static ProfilingManager &GetInstance(); + + // Save profile data to file + // @return Status - The error code return + Status SaveProfilingData(); + + // Register profile node to tree + // @param node - Profiling node + // @return Status - The error code return + Status RegisterProfilingNode(std::shared_ptr *node); + + bool IsProfilingEnable() const; + + private: + std::vector> profiling_node_; +}; + +class ProfilingTime { + public: + static double GetCurMilliSecond(); +}; +} // namespace dataset +} // namespace mindspore +#endif diff --git a/mindspore/ccsrc/dataset/util/queue.h b/mindspore/ccsrc/dataset/util/queue.h index b97e6a5c28..9a51565861 100644 --- a/mindspore/ccsrc/dataset/util/queue.h +++ b/mindspore/ccsrc/dataset/util/queue.h @@ -230,6 +230,8 @@ class QueueList { std::unique_ptr> &operator[](const int index) { return queue_list_[index]; } + const std::unique_ptr> &operator[](const int index) const { return queue_list_[index]; } + ~QueueList() = default; private: