!1735 profiling for minddata and tdt

Merge pull request !1735 from yanghaitao/yht_profiling
pull/1735/MERGE
mindspore-ci-bot 5 years ago committed by Gitee
commit 09cf02c5b1

@ -152,6 +152,23 @@ class Connector {
return out; return out;
} }
// Get current size of connector.
// @return Sum of the current sizes of all internal queues.
int32_t size() const {
// Local renamed from "size" — the original shadowed the member function itself.
int32_t total_size = 0;
for (int32_t i = 0; i < queues_.size(); ++i) {
total_size += queues_[i]->size();
}
return total_size;
}
// Get total capacity of connector.
// @return Sum of the capacities of all internal queues.
int32_t capacity() const {
// Local renamed from "capacity" — the original shadowed the member function itself.
int32_t total_capacity = 0;
for (int32_t i = 0; i < queues_.size(); ++i) {
total_capacity += queues_[i]->capacity();
}
return total_capacity;
}
// Register the internal resources with Task group for interruption service. // Register the internal resources with Task group for interruption service.
// @param vg // @param vg
// @return // @return

@ -211,6 +211,22 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
// @return - the column name map as a string // @return - the column name map as a string
std::string ColumnNameMapAsString() const; std::string ColumnNameMapAsString() const;
// Getter function
// @return connector size of current op
virtual int32_t ConnectorSize() const { return out_connector_->size(); }
// Getter function
// @return connector capacity of current op
virtual int32_t ConnectorCapacity() const { return out_connector_->capacity(); }
// Getter function
// @return connector size of child op
int32_t ChildOpConnectorSize(int32_t child_index = 0) const { return child_[child_index]->ConnectorSize(); }
// Getter function
// @return connector capacity of child op
int32_t ChildOpConnectorCapacity(int32_t child_index = 0) const { return child_[child_index]->ConnectorCapacity(); }
// Children Getter // Children Getter
// @return Vector or Children // @return Vector or Children
std::vector<std::shared_ptr<DatasetOp>> Children() const { return child_; } std::vector<std::shared_ptr<DatasetOp>> Children() const { return child_; }

@ -25,9 +25,13 @@
#include "dataset/util/status.h" #include "dataset/util/status.h"
#include "dataset/util/task_manager.h" #include "dataset/util/task_manager.h"
#include "dataset/engine/opt/pass.h" #include "dataset/engine/opt/pass.h"
#include "dataset/util/profiling.h"
namespace mindspore { namespace mindspore {
namespace dataset { namespace dataset {
// Formats one profiling record as a space-separated line: "type subtype batch_num value".
// type is a ProfilingType (TIME or CONNECTOR_DEPTH); for TIME records subtype is a
// ProfilingTimeSubType, while for CONNECTOR_DEPTH records the subtype slot carries the
// connector capacity (see the Record call sites in SendDataToAscend).
#define DEVICE_QUEUE_PROFILING_DATA(type, subtype, batch_num, value) \
std::to_string(type) + " " + std::to_string(subtype) + " " + std::to_string(batch_num) + " " + std::to_string(value)
DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size, DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size,
int32_t op_connector_size, int64_t num_batch) int32_t op_connector_size, int64_t num_batch)
: PipelineOp(op_connector_size), : PipelineOp(op_connector_size),
@ -97,7 +101,25 @@ Status DeviceQueueOp::SendDataToAscend() {
MS_LOG(INFO) << "Device queue, sending data to Ascend."; MS_LOG(INFO) << "Device queue, sending data to Ascend.";
int64_t total_batch = 0; int64_t total_batch = 0;
bool is_break_loop = false; bool is_break_loop = false;
double batch_start_time, tdt_start_time, end_time;
int32_t batch_cost, tdt_cost;
int32_t connector_size = 0;
int32_t connector_capacity;
std::shared_ptr<Profiling> profiling_node;
bool isProfilingEnable = ProfilingManager::GetInstance().IsProfilingEnable();
if (isProfilingEnable) {
std::string file_name = "critical_point_profiling";
// Here can determine performance bottleneck is in pipeline or in tdt.
// Context format of this file "type subtype batchnum value"
// type:0: time, 1: queue depth
// subtype:0: pipeline time, 1: push tdt time, 2: all time
// batchnum: batch number
// value: value of time(ms) or queue depth
profiling_node = std::make_shared<Profiling>(file_name, device_id_);
RETURN_IF_NOT_OK(ProfilingManager::GetInstance().RegisterProfilingNode(&profiling_node));
batch_start_time = ProfilingTime::GetCurMilliSecond();
connector_capacity = ChildOpConnectorCapacity();
}
std::unique_ptr<DataBuffer> current_buffer; std::unique_ptr<DataBuffer> current_buffer;
RETURN_IF_NOT_OK(GetNextInput(&current_buffer)); RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
@ -107,20 +129,51 @@ Status DeviceQueueOp::SendDataToAscend() {
TensorRow currRow; TensorRow currRow;
for (int row_id = 0; row_id < current_buffer->NumRows() && !is_break_loop; row_id++) { for (int row_id = 0; row_id < current_buffer->NumRows() && !is_break_loop; row_id++) {
RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow)); RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow));
if (isProfilingEnable) {
tdt_start_time = ProfilingTime::GetCurMilliSecond();
}
auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_); auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_);
if (status == TdtStatus::FAILED) { if (status == TdtStatus::FAILED) {
return Status(StatusCode::kTDTPushFailure, "TDT Push Failed"); return Status(StatusCode::kTDTPushFailure, "TDT Push Failed");
} }
if (isProfilingEnable) {
end_time = ProfilingTime::GetCurMilliSecond();
tdt_cost = (int32_t)(end_time - tdt_start_time);
// record push tdt time
profiling_node->Record(DEVICE_QUEUE_PROFILING_DATA(TIME, TDT_PUSH_TIME, total_batch + 1, tdt_cost));
batch_cost = (int32_t)(end_time - batch_start_time);
// record batch time
profiling_node->Record(DEVICE_QUEUE_PROFILING_DATA(TIME, BATCH_TIME, total_batch + 1, batch_cost));
// record pipeline time
profiling_node->Record(
DEVICE_QUEUE_PROFILING_DATA(TIME, PIPELINE_TIME, total_batch + 1, batch_cost - tdt_cost));
batch_start_time = end_time;
// record connector depth
profiling_node->Record(
DEVICE_QUEUE_PROFILING_DATA(CONNECTOR_DEPTH, connector_capacity, total_batch + 1, connector_size));
}
total_batch++; total_batch++;
if (num_batch_ > 0 && total_batch == num_batch_) { if (num_batch_ > 0 && total_batch == num_batch_) {
is_break_loop = true; is_break_loop = true;
} }
} }
if (isProfilingEnable) {
connector_size = ChildOpConnectorSize();
connector_capacity = ChildOpConnectorCapacity();
}
RETURN_IF_NOT_OK(GetNextInput(&current_buffer)); RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
} }
if (isProfilingEnable) {
connector_size = ChildOpConnectorSize();
connector_capacity = ChildOpConnectorCapacity();
}
RETURN_IF_NOT_OK(GetNextInput(&current_buffer)); RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
} }
if (isProfilingEnable) {
profiling_node->SaveToFile();
}
MS_LOG(INFO) << "Device queue total batch is " << total_batch << ", number of batches is " << num_batch_ << "."; MS_LOG(INFO) << "Device queue total batch is " << total_batch << ", number of batches is " << num_batch_ << ".";
return Status::OK(); return Status::OK();

@ -124,6 +124,10 @@ class RepeatOp : public PipelineOp {
// @return - Status of the node visit. // @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override; Status Accept(NodePass *p, bool *modified) override;
virtual int32_t ConnectorSize() const { return child_[0]->ConnectorSize(); }
virtual int32_t ConnectorCapacity() const { return child_[0]->ConnectorCapacity(); }
private: private:
int32_t max_repeats_; // The number of repeats that the user requested int32_t max_repeats_; // The number of repeats that the user requested
int32_t repeat_count_; // A counter for the current number of executed repeats int32_t repeat_count_; // A counter for the current number of executed repeats

@ -19,6 +19,7 @@
#include "dataset/engine/datasetops/dataset_op.h" #include "dataset/engine/datasetops/dataset_op.h"
#include "dataset/engine/datasetops/shuffle_op.h" #include "dataset/engine/datasetops/shuffle_op.h"
#include "dataset/util/task_manager.h" #include "dataset/util/task_manager.h"
#include "dataset/util/profiling.h"
#include "dataset/engine/opt/util/printer_pass.h" #include "dataset/engine/opt/util/printer_pass.h"

@ -14,4 +14,5 @@ add_library(utils OBJECT
status.cc status.cc
path.cc path.cc
wait_post.cc wait_post.cc
sig_handler.cc) sig_handler.cc
profiling.cc)

@ -0,0 +1,112 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dataset/util/profiling.h"
#include <sys/time.h>
#include <cstdlib>
#include <fstream>
#include "dataset/util/path.h"
#include "common/utils.h"
#include "utils/log_adapter.h"
namespace mindspore {
namespace dataset {
// Constructor for a file-backed profiling node; the actual output path
// (<profiling dir>/<file_name>_<device_id>.txt) is resolved later in Init().
Profiling::Profiling(const std::string &file_name, const int32_t device_id)
: file_name_(file_name), device_id_(device_id) {}
// Resolves the output file path for this profiling node.
// Reads MINDDATA_PROFILING_DIR from the environment, canonicalizes it, and builds
// file_path_ as <dir>/<file_name_>_<device_id_>.txt.
// @return Status - error if the env var is unset or the directory is invalid
Status Profiling::Init() {
std::string dir = common::GetEnv("MINDDATA_PROFILING_DIR");
if (dir.empty()) {
RETURN_STATUS_UNEXPECTED("Profiling dir is not set.");
}
char real_path[PATH_MAX] = {0};
// Reject paths too long for the fixed-size canonicalization buffer.
if (dir.size() >= PATH_MAX) {
RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
}
#if defined(_WIN32) || defined(_WIN64)
if (_fullpath(real_path, common::SafeCStr(dir), PATH_MAX) == nullptr) {
RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
}
#else
// realpath fails for non-existent dirs, so this also validates existence.
if (realpath(common::SafeCStr(dir), real_path) == nullptr) {
RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
}
#endif
file_path_ = (Path(real_path) / Path(file_name_ + "_" + std::to_string(device_id_) + ".txt")).toString();
return Status::OK();
}
// Buffers one already-formatted record line in memory; nothing is written
// to disk until SaveToFile() is called.
// @param data - a formatted profiling record
// @return Status - always OK
Status Profiling::Record(const std::string &data) {
value_.push_back(data);
return Status::OK();
}
// Appends all buffered records to the profiling file, one per line.
// Note the buffer is not cleared and the file is opened in append mode, so
// repeated calls would duplicate earlier records; callers save once at the end.
// @return Status - error if the file name was never set or the file cannot be opened
Status Profiling::SaveToFile() {
if (file_name_.empty()) {
RETURN_STATUS_UNEXPECTED("Profiling file name has not been set.");
}
std::ofstream handle(file_path_, std::ios::app);
if (!handle.is_open()) {
RETURN_STATUS_UNEXPECTED("Profiling file can not be opened.");
}
// const reference avoids copying every record string on iteration.
for (const auto &value : value_) {
handle << value << "\n";
}
handle.close();
return Status::OK();
}
// Meyers singleton accessor; the function-local static is initialized
// exactly once and is thread-safe under C++11 magic statics.
ProfilingManager &ProfilingManager::GetInstance() {
static ProfilingManager instance;
return instance;
}
// Profiling is enabled only when the PROFILING_MODE env var is exactly "true";
// an unset or empty variable counts as disabled. (An empty string can never
// equal "true", so a single comparison covers both cases.)
bool ProfilingManager::IsProfilingEnable() const {
return common::GetEnv("PROFILING_MODE") == "true";
}
// Initializes a profiling node and registers it so SaveProfilingData() flushes it.
// @param node - non-null pointer to a non-null shared_ptr owning the node
// @return Status - error on null input or if the node's Init() fails
Status ProfilingManager::RegisterProfilingNode(std::shared_ptr<Profiling> *node) {
// Guard against a null pointer / empty shared_ptr instead of dereferencing blindly.
if (node == nullptr || *node == nullptr) {
RETURN_STATUS_UNEXPECTED("Profiling node is null.");
}
RETURN_IF_NOT_OK((*node)->Init());
profiling_node_.emplace_back(*node);
return Status::OK();
}
// Flushes every registered profiling node to its file; no-op when profiling
// is disabled via PROFILING_MODE.
// @return Status - first error from any node's SaveToFile, else OK
Status ProfilingManager::SaveProfilingData() {
if (!IsProfilingEnable()) {
return Status::OK();
}
MS_LOG(INFO) << "Start to save profile data.";
// const reference avoids an atomic shared_ptr refcount bump per node.
for (const auto &node : profiling_node_) {
RETURN_IF_NOT_OK(node->SaveToFile());
}
MS_LOG(INFO) << "Save profile data end.";
return Status::OK();
}
// Current wall-clock time in milliseconds since the Unix epoch.
// Uses floating-point arithmetic so the double return keeps sub-millisecond
// precision instead of truncating tv_usec with integer division.
double ProfilingTime::GetCurMilliSecond() {
struct timeval tv = {0, 0};
(void)gettimeofday(&tv, nullptr);
return tv.tv_sec * 1000.0 + tv.tv_usec / 1000.0;
}
} // namespace dataset
} // namespace mindspore

@ -0,0 +1,92 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_UTIL_PROFILE_H_
#define DATASET_UTIL_PROFILE_H_
#include <string>
#include <vector>
#include <memory>
#include "dataset/util/status.h"
namespace mindspore {
namespace dataset {
// Category of a profiling record (first field of each record line).
enum ProfilingType {
TIME,  // elapsed-time record
CONNECTOR_DEPTH,  // connector queue-depth record
};
// Subtype for TIME records (second field of each time record line).
enum ProfilingTimeSubType {
PIPELINE_TIME,  // pipeline time: batch time minus TDT push time
TDT_PUSH_TIME,  // time spent pushing one row to TDT
BATCH_TIME,  // end-to-end time for one batch
INVALID_TIME,
};
// A single profiling node: buffers formatted record lines in memory and
// writes them to <profiling dir>/<file_name>_<device_id>.txt on demand.
class Profiling {
public:
// Constructor
Profiling() = default;
// Constructor if need save profile data to file
// @param file_name - base name of the output file (device id suffix appended)
// @param device_id - device id used to disambiguate output files
Profiling(const std::string &file_name, const int32_t device_id);
// Destructor
~Profiling() = default;
// Resolve the output path from the MINDDATA_PROFILING_DIR env var.
// @return Status - error if the dir is unset or invalid
Status Init();
// Record profile data
// @param data - one already-formatted record line
Status Record(const std::string &data);
// Save profile data to file if necessary
Status SaveToFile();
private:
std::vector<std::string> value_;  // buffered record lines
std::string file_name_;  // base name of the output file
std::string file_path_;  // full output path, set by Init()
// Default-initialized so a default-constructed node has no indeterminate member.
int32_t device_id_ = 0;
};
// Process-wide singleton registry of profiling nodes; flushes them all
// to disk via SaveProfilingData().
class ProfilingManager {
public:
ProfilingManager() = default;
~ProfilingManager() = default;
// Accessor for the process-wide singleton instance.
static ProfilingManager &GetInstance();
// Save profile data to file
// @return Status - The error code return
Status SaveProfilingData();
// Register profile node to tree
// @param node - Profiling node
// @return Status - The error code return
Status RegisterProfilingNode(std::shared_ptr<Profiling> *node);
// Whether profiling is enabled (driven by the PROFILING_MODE env var).
bool IsProfilingEnable() const;
private:
std::vector<std::shared_ptr<Profiling>> profiling_node_;  // registered nodes
};
// Time helper for profiling measurements.
class ProfilingTime {
public:
// @return current wall-clock time in milliseconds
static double GetCurMilliSecond();
};
} // namespace dataset
} // namespace mindspore
#endif

@ -230,6 +230,8 @@ class QueueList {
std::unique_ptr<Queue<T>> &operator[](const int index) { return queue_list_[index]; } std::unique_ptr<Queue<T>> &operator[](const int index) { return queue_list_[index]; }
// Const overload so a const QueueList can still be indexed for read access.
const std::unique_ptr<Queue<T>> &operator[](const int index) const { return queue_list_[index]; }
~QueueList() = default; ~QueueList() = default;
private: private:

Loading…
Cancel
Save