diff --git a/mindspore/ccsrc/dataset/engine/connector.h b/mindspore/ccsrc/dataset/engine/connector.h index cdce592c1b..acd92b4145 100644 --- a/mindspore/ccsrc/dataset/engine/connector.h +++ b/mindspore/ccsrc/dataset/engine/connector.h @@ -102,8 +102,10 @@ class Connector { RETURN_IF_NOT_OK(cv_.Wait(&lk, [this, worker_id]() { return expect_consumer_ == worker_id; })); RETURN_IF_NOT_OK(queues_[pop_from_]->PopFront(result)); pop_from_ = (pop_from_ + 1) % num_producers_; + out_buffers_count_++; expect_consumer_ = (expect_consumer_ + 1) % num_consumers_; } + cv_.NotifyAll(); return Status::OK(); } @@ -119,6 +121,8 @@ class Connector { return (queues_[worker_id]->Add(el)); } + auto out_buffers_count() const { return out_buffers_count_.load(); } + // Add an element into the DbConnector without the overhead of synchronization. // It may block when the internal queue is full. // The element passed to this function will be forwarded into the internal queue. @@ -138,6 +142,7 @@ class Connector { } expect_consumer_ = 0; pop_from_ = 0; + out_buffers_count_ = 0; MS_LOG(DEBUG) << "Connector counters reset."; } @@ -198,6 +203,7 @@ class Connector { // Used in the Pop(), when a thread call pop() but it is not the expect_consumer_. std::mutex m_; CondVar cv_; + std::atomic out_buffers_count_ = 0; }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h index c444004b79..370bf6a1bd 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h +++ b/mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h @@ -222,6 +222,7 @@ class DatasetOp : public std::enable_shared_from_this { // Getter function // @return connector size of current op + int32_t ConnectorSize() const { if (!inlined()) { return out_connector_->size(); @@ -230,6 +231,10 @@ class DatasetOp : public std::enable_shared_from_this { return ChildOpConnectorSize(); } + int64_t ConnectorOutBufferCount() const { + return out_connector_ == nullptr ? int64_t(-1) : static_cast(out_connector_->out_buffers_count()); + } + // Getter function // @return connector size of current op int32_t ConnectorCapacity() const { diff --git a/mindspore/ccsrc/dataset/engine/db_connector.h b/mindspore/ccsrc/dataset/engine/db_connector.h index b1fdd14ab6..54909f51ba 100644 --- a/mindspore/ccsrc/dataset/engine/db_connector.h +++ b/mindspore/ccsrc/dataset/engine/db_connector.h @@ -83,6 +83,7 @@ class DbConnector : public Connector> { expect_consumer_ = (expect_consumer_ + 1) % num_consumers_; } } + out_buffers_count_++; cv_.NotifyAll(); return Status::OK(); } diff --git a/mindspore/ccsrc/dataset/engine/execution_tree.h b/mindspore/ccsrc/dataset/engine/execution_tree.h index e1c5e8ff54..b0391bf77b 100644 --- a/mindspore/ccsrc/dataset/engine/execution_tree.h +++ b/mindspore/ccsrc/dataset/engine/execution_tree.h @@ -88,8 +88,10 @@ class ExecutionTree { bool operator!=(const Iterator &rhs) { return nodes_[ind_] != rhs.nodes_[rhs.ind_]; } + int32_t NumNodes() { return nodes_.size(); } + private: - int ind_; // the cur node our Iterator points to + int32_t ind_; // the cur node our Iterator points to std::vector> nodes_; // store the nodes in post order void PostOrderTraverse(const std::shared_ptr &); }; diff --git a/mindspore/ccsrc/dataset/engine/perf/CMakeLists.txt b/mindspore/ccsrc/dataset/engine/perf/CMakeLists.txt index 0b67469d2d..e611add983 100644 --- a/mindspore/ccsrc/dataset/engine/perf/CMakeLists.txt +++ b/mindspore/ccsrc/dataset/engine/perf/CMakeLists.txt @@ -3,4 +3,6 @@ add_library(engine-perf OBJECT monitor.cc device_queue_tracing.cc connector_size.cc - dataset_iterator_tracing.cc) + dataset_iterator_tracing.cc + connector_throughput.cc + ) diff --git a/mindspore/ccsrc/dataset/engine/perf/connector_size.cc b/mindspore/ccsrc/dataset/engine/perf/connector_size.cc index 862ec51c49..0bd2754075 100644 --- a/mindspore/ccsrc/dataset/engine/perf/connector_size.cc +++ b/mindspore/ccsrc/dataset/engine/perf/connector_size.cc @@ -14,7 +14,6 @@ * limitations under the License. */ #include "dataset/engine/perf/connector_size.h" - #include #include #include diff --git a/mindspore/ccsrc/dataset/engine/perf/connector_size.h b/mindspore/ccsrc/dataset/engine/perf/connector_size.h index 6840ffe244..2584289fb4 100644 --- a/mindspore/ccsrc/dataset/engine/perf/connector_size.h +++ b/mindspore/ccsrc/dataset/engine/perf/connector_size.h @@ -13,8 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef MINDSPORE_QUEUE_DEPTH_H -#define MINDSPORE_QUEUE_DEPTH_H +#ifndef DATASET_CONNECTOR_SIZE_H +#define DATASET_CONNECTOR_SIZE_H #include #include @@ -50,7 +50,7 @@ class ConnectorSize : public Sampling { // This function samples the connector size of every nodes within the ExecutionTree Status Sample() override; - std::string Name() const override { return kDeviceQueueTracingName; }; + std::string Name() const override { return kConnectorSizeSamplingName; } // Save sampling data to file // @return Status - The error code return @@ -65,6 +65,8 @@ class ConnectorSize : public Sampling { ExecutionTree *tree_ = nullptr; // ExecutionTree pointer ConnectorSizeSampleTable sample_table_; // Dataset structure to store all samples of connector size sampling }; + } // namespace dataset } // namespace mindspore -#endif // MINDSPORE_QUEUE_DEPTH_H + +#endif // DATASET_CONNECTOR_SIZE_H diff --git a/mindspore/ccsrc/dataset/engine/perf/connector_throughput.cc b/mindspore/ccsrc/dataset/engine/perf/connector_throughput.cc new file mode 100644 index 0000000000..4fd59de390 --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/connector_throughput.cc @@ -0,0 +1,109 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include "dataset/engine/perf/connector_throughput.h" +#include "dataset/engine/execution_tree.h" +#include "dataset/util/path.h" + +namespace mindspore { +namespace dataset { + +// temporary helper +int ConnectorThroughput::InitNodes() { + auto it = (*tree_).begin(); + return it.NumNodes(); +} +// Sample action +Status ConnectorThroughput::Sample() { + std::vector out_buffer_count_row(n_nodes_); + std::vector throughput_row(n_nodes_); + TimePoint cur_time; // initialised inside the loop, used outside the loop to update prev sample time. + auto col = 0; + for (const auto &node : *tree_) { + auto cur_out_buffer_count = node.ConnectorOutBufferCount(); + out_buffer_count_row[col] = cur_out_buffer_count; + auto sz = timestamps_.size(); + cur_time = std::chrono::steady_clock::now(); + auto _dt = std::chrono::duration_cast(timestamps_[0][sz - 1] - timestamps_[0][sz - 2]); + auto dt = std::chrono::duration(_dt).count(); + auto prev_out_buffer_count = out_buffer_count_table_[col][out_buffer_count_table_.size() - 1]; + if (dt != 0) { + auto thr = (cur_out_buffer_count - prev_out_buffer_count) / (1000 * dt); + throughput_row[col] = thr; + } else { + throughput_row[col] = -1; + } + col++; + } + std::vector v = {cur_time}; // temporary fix + timestamps_.AddSample(v); + // Push new row of sample + out_buffer_count_table_.AddSample(out_buffer_count_row); + throughput_.AddSample(throughput_row); + return Status::OK(); +} + +json ConnectorThroughput::ParseOpInfo(const DatasetOp &node, const std::vector &thr) { + auto children = node.Children(); + std::vector children_id; + std::transform(children.begin(), children.end(), std::back_inserter(children_id), + [](std::shared_ptr op) -> int32_t { return op->id(); }); + json json_node; + json_node["op_id"] = node.id(); + json_node["op_type"] = node.Name(); + json_node["num_workers"] = node.num_workers(); + json metrics; + metrics["output_queue"] = {{"throughput", thr}}; + + json_node["metrics"] = metrics; + if (!children_id.empty()) { + json_node["children"] = children_id; + } + + return json_node; +} + +// Save profiling data to file +Status ConnectorThroughput::SaveToFile() { + std::ofstream os(file_path_); + json output; + output["sampling_interval"] = 10; + // Traverse the ExecutionTree for JSON node generation + int col = 0; + for (auto &node : *tree_) { + std::vector throughput; + for (auto i = 0; i < throughput_.size(); i++) { + throughput.push_back(throughput_[col][i]); + } + json json_node = ParseOpInfo(node, throughput); + output["op_info"].push_back(json_node); + col++; + } + os << output; + return Status::OK(); +} +Status ConnectorThroughput::Init(const std::string &dir_path, const std::string &device_id) { + file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + Name() + "_" + device_id + ".json")).toString(); + return Status::OK(); +} +} // namespace dataset +} // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/perf/connector_throughput.h b/mindspore/ccsrc/dataset/engine/perf/connector_throughput.h new file mode 100644 index 0000000000..e873eb8315 --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/connector_throughput.h @@ -0,0 +1,100 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DATASET_CONNECTOR_THROUGHPUT_H +#define DATASET_CONNECTOR_THROUGHPUT_H + +#include +#include +#include +#include +#include +#include "dataset/engine/perf/profiling.h" +#include "dataset/engine/perf/perf_data.h" +#include "dataset/engine/perf/cyclic_array.h" +#include "dataset/engine/datasetops/dataset_op.h" + +using json = nlohmann::json; +namespace mindspore { +namespace dataset { +class ExecutionTree; + +// Connector throughput samples the output connector size of each op in the pipeline. +// For the description of the data structure see perf_buffer.h +// It support JSON serialization for external usage. +class ConnectorThroughput : public Sampling { + using OutBufferCount = PerfData>; + using Throughput = PerfData>; + using TimePoint = std::chrono::time_point; + using TimeStamps = PerfData>; + + public: + explicit ConnectorThroughput(ExecutionTree *tree, int64_t max_rows = 1000000) + : tree_(tree), + max_rows_(max_rows), + n_nodes_(InitNodes()), + out_buffer_count_table_(OutBufferCount(max_rows_, n_nodes_)), + throughput_(Throughput(max_rows_, n_nodes_)), + timestamps_(TimeStamps(max_rows_, 1)) { + timestamps_.AddSample(std::vector(1)); + out_buffer_count_table_.AddSample(std::vector(n_nodes_)); + } + // Driver function for connector size sampling. + // This function samples the connector size of every nodes within the ExecutionTree + Status Sample() override; + + /* Status TestPrint() override { + std::ofstream os("performance_monitor.txt"); + if (throughput_.size() == 0) { + os << "data is empty" << std::endl; + return Status::OK(); + } + for (int i = 0; i < throughput_.size(); i++) { + for (int j = 0; j < n_nodes_; j++) { + os << throughput_[j][i] << " "; + } + os << std::endl; + } + return Status::OK(); + };*/ + + // Traverse the tree nodes and count them + int InitNodes(); + + std::string Name() const override { return name_; }; + + // Save sampling data to file + // @return Status - The error code return + Status SaveToFile() override; + + Status Init(const std::string &dir_path, const std::string &device_id); + + json ParseOpInfo(const DatasetOp &node, const std::vector &thr); + + private: + ExecutionTree *tree_ = nullptr; // ExecutionTree pointer + int64_t max_rows_; + int32_t n_nodes_; + OutBufferCount out_buffer_count_table_; + Throughput throughput_; + TimeStamps timestamps_; + std::string name_ = kConnectorThroughputSamplingName; +}; + +} // namespace dataset +} // namespace mindspore + +#endif // DATASET_CONNECTOR_THROUGHPUT_H diff --git a/mindspore/ccsrc/dataset/engine/perf/cyclic_array.h b/mindspore/ccsrc/dataset/engine/perf/cyclic_array.h new file mode 100644 index 0000000000..fa60b401c5 --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/cyclic_array.h @@ -0,0 +1,197 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DATASET_CYCLIC_ARRAY_H +#define DATASET_CYCLIC_ARRAY_H + +#include +#include +#include +#include +#include "dataset/core/constants.h" + +namespace mindspore { +namespace dataset { + +/// \class CyclicArray "include/cyclic_array.h +/// \brief This is a container with a contiguous memory layout that pnly keeps N last entries, +/// when the number of entries exceeds the capacity +/// Must be preallocated +template +class CyclicArray { + public: + using value_type = T; + class Iterator { + // Add operator[] and make fully compliant with random access iterator + // and add a const iterator + // add resize(), empty() + public: + using iterator_category = std::random_access_iterator_tag; + using value_type = CyclicArray::value_type; + using difference_type = std::ptrdiff_t; + using pointer = CyclicArray::value_type *; + using reference = CyclicArray::value_type &; + + Iterator() = default; + + Iterator(dsize_t idx, pointer ptr, dsize_t capacity, dsize_t head) + : cur_idx_(idx), ptr_(ptr), capacity_(capacity), head_(head) {} + + Iterator(const Iterator &rhs) = default; + + ~Iterator() = default; + + Iterator &operator++() { + cur_idx_ = (cur_idx_ + 1) % (capacity_ + 1); + return *this; + } + + Iterator operator++(int) { + Iterator tmp(*this); + cur_idx_ = (cur_idx_ + 1) % (capacity_ + 1); + return tmp; + } + + Iterator &operator--() { + cur_idx_ = (cur_idx_ + capacity_) % (capacity_ + 1); + return *this; + } + + Iterator operator--(int) { + Iterator tmp(*this); + cur_idx_ = (cur_idx_ + capacity_) % (capacity_ + 1); + return tmp; + } + + Iterator operator+(dsize_t x) { return Iterator((cur_idx_ + x) % (capacity_ + 1), ptr_, capacity_, head_); } + + Iterator operator-(dsize_t x) { + return Iterator((cur_idx_ + (capacity_ + 1 - x)) % (capacity_ + 1), ptr_, capacity_, head_); + } + + bool operator<(const Iterator &rhs) { + return (head_ + cur_idx_) % (capacity_ + 1) < (rhs.head_ + rhs.cur_idx_) % (capacity_ + 1); + } + + bool operator>(const Iterator &rhs) { + return (head_ + cur_idx_) % (capacity_ + 1) > (rhs.head_ + rhs.cur_idx_) % (capacity_ + 1); + } + + bool operator>=(const Iterator &rhs) { + return (head_ + cur_idx_) % (capacity_ + 1) >= (rhs.head_ + rhs.cur_idx_) % (capacity_ + 1); + } + + bool operator<=(const Iterator &rhs) { + return (head_ + cur_idx_) % (capacity_ + 1) <= (rhs.head_ + rhs.cur_idx_) % (capacity_ + 1); + } + + difference_type operator-(const Iterator &rhs) { + return (cur_idx_ - rhs.cur_idx_ + capacity_ + 1) % (capacity_ + 1); + } + + reference operator*() { return ptr_[cur_idx_]; } + + pointer operator->() { return &(ptr_[cur_idx_]); } + + bool operator==(const Iterator &rhs) { return cur_idx_ == rhs.cur_idx_; } + + bool operator!=(const Iterator &rhs) { return cur_idx_ != rhs.cur_idx_; } + + private: + dsize_t cur_idx_; + pointer ptr_; + dsize_t capacity_; + dsize_t head_; + }; + + /// \brief Default constructor + CyclicArray() : buf_(nullptr), head_(0), tail_(0), size_(0), capacity_(0) {} + + /// \brief Constructor + /// \param[in] capacity + explicit CyclicArray(dsize_t capacity) + : buf_(std::make_unique(capacity + 1)), head_(0), tail_(0), size_(0), capacity_(capacity) {} + + CyclicArray(const CyclicArray &rhs) + : buf_(std::make_unique(rhs.capacity_ + 1)), + head_(rhs.head_), + tail_(rhs.tail_), + size_(rhs.size_), + capacity_(rhs.capacity_) { + std::copy(rhs.begin(), rhs.end(), begin()); + } + + CyclicArray(CyclicArray &&rhs) = default; + + ~CyclicArray() = default; + + /// \brief Iterator begin() + Iterator begin() { return Iterator(head_, buf_.get(), capacity_, head_); } + + /// \brief Iterator end() + Iterator end() { return Iterator(tail_, buf_.get(), capacity_, head_); } + + // not really const. + Iterator begin() const { return Iterator(head_, buf_.get(), capacity_, head_); } + + Iterator end() const { return Iterator(tail_, buf_.get(), capacity_, head_); } + + /// \brief clear the array. Does not deallocate memory, capacity remains the same + void clear() { + head_ = 0; + tail_ = 0; + size_ = 0; + } + + /// \brief returns current size + dsize_t size() { return size_; } + + /// \brief returns capacity + dsize_t capacity() { return capacity_; } + + /// \brief pushes a value + /// \param[in] val value + void push_back(T val) { + buf_[tail_] = val; + if (size_ >= capacity_) { + (tail_ != capacity_) ? tail_++ : tail_ = 0; + (head_ != capacity_) ? head_++ : head_ = 0; + } else { + tail_++; + size_++; + } + } + + /// \brief returns const reference to an element of the array + /// \param[in] idx index of the element + /// \param[out] const T& reference to an element of the array + const T &operator[](dsize_t idx) const { return buf_[(head_ + idx) % (capacity_ + 1)]; } + + /// \brief returns non-const reference to an element of the array + /// \param[in] idx index of the element + /// \param[out] T& reference to an element of the array + T &operator[](dsize_t idx) { return buf_[(head_ + idx) % (capacity_ + 1)]; } + + private: + std::unique_ptr buf_; + dsize_t head_; + dsize_t tail_; + dsize_t size_; + dsize_t capacity_; +}; +} // namespace dataset +} // namespace mindspore +#endif // DATASET_CYCLIC_ARRAY_H diff --git a/mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.h b/mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.h index 00264939fc..129863c6d1 100644 --- a/mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.h +++ b/mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.h @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #ifndef MINDSPORE_DATASET_ITERATOR_TRACING_H #define MINDSPORE_DATASET_ITERATOR_TRACING_H diff --git a/mindspore/ccsrc/dataset/engine/perf/monitor.cc b/mindspore/ccsrc/dataset/engine/perf/monitor.cc index c9dce004b5..8a0d682b81 100644 --- a/mindspore/ccsrc/dataset/engine/perf/monitor.cc +++ b/mindspore/ccsrc/dataset/engine/perf/monitor.cc @@ -28,7 +28,6 @@ Monitor::Monitor(ExecutionTree *tree) : tree_(tree) { max_samples_ = 0; cur_row_ = 0; } - Status Monitor::operator()() { // Register this thread with TaskManager to receive proper interrupt signal. TaskManager::FindMe()->Post(); diff --git a/mindspore/ccsrc/dataset/engine/perf/monitor.h b/mindspore/ccsrc/dataset/engine/perf/monitor.h index 2a482a6ad7..8b4245db8e 100644 --- a/mindspore/ccsrc/dataset/engine/perf/monitor.h +++ b/mindspore/ccsrc/dataset/engine/perf/monitor.h @@ -29,6 +29,7 @@ class ExecutionTree; class Monitor { public: // Monitor object constructor + explicit Monitor(ExecutionTree *tree); Monitor() = default; diff --git a/mindspore/ccsrc/dataset/engine/perf/perf_data.h b/mindspore/ccsrc/dataset/engine/perf/perf_data.h new file mode 100644 index 0000000000..a201d705ea --- /dev/null +++ b/mindspore/ccsrc/dataset/engine/perf/perf_data.h @@ -0,0 +1,88 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DATASET_PERF_DATA_H +#define DATASET_PERF_DATA_H + +#include +#include "dataset/core/constants.h" + +namespace mindspore { +namespace dataset { + +// PerfData is a convenience class to record and store the data produced by Monitor +// and represents a 2D column major table with every column storing samples +// for an operator. The number of rows equals to the number of samples, +// the number of columns equals to the number of operators. +// The capacity is determined on construction and cannot be changed. +// ColumnType can be std::vector or CyclicArray. In case of the latter data can be added +// indefinitely without the risk of overflowing otherwise the capacity must not be exceeded. +// Given PerfData pd(n_rows, n_cols) an element in the column i and row j can be accessed as +// pd[i][j] + +template +class PerfData { + public: + PerfData() = default; + ~PerfData() = default; + PerfData(dsize_t max_rows, dsize_t n_cols) : counter_(0), max_rows_(max_rows), n_cols_(n_cols) { + for (auto i = 0; i < n_cols_; i++) { + data_.push_back(ColumnType(max_rows_)); + } + } + PerfData(const PerfData &rhs) = default; + PerfData(PerfData &&rhs) = default; + + // Adds a row of data + // T must be any container working with range based loops + template + void AddSample(const T &row) { + auto i = 0; + for (const auto &e : row) { + data_[i++].push_back(e); + } + counter_++; + } + + // Fetches a row of data by copy + template + auto Row(dsize_t idx) { + std::vector row(n_cols_); + for (auto i = 0; i < n_cols_; i++) { + row[i] = data_[i][idx]; + } + return row; + } + + // returns a column of data + ColumnType &operator[](size_t idx) { return data_[idx]; } + + const ColumnType &operator[](size_t idx) const { return data_[idx]; } + + dsize_t size() { return counter_ < max_rows_ ? counter_ : max_rows_; } + + dsize_t capacity() { return max_rows_; } + + private: + std::vector data_; + dsize_t counter_; + dsize_t max_rows_; + int n_cols_; +}; + +} // namespace dataset +} // namespace mindspore +#endif // DATASET_PERF_DATA_H diff --git a/mindspore/ccsrc/dataset/engine/perf/profiling.cc b/mindspore/ccsrc/dataset/engine/perf/profiling.cc index 4786b8dd69..66f27c46ba 100644 --- a/mindspore/ccsrc/dataset/engine/perf/profiling.cc +++ b/mindspore/ccsrc/dataset/engine/perf/profiling.cc @@ -14,7 +14,6 @@ * limitations under the License. */ #include "dataset/engine/perf/profiling.h" - #include #include #include @@ -23,6 +22,7 @@ #include "dataset/engine/perf/monitor.h" #include "dataset/engine/perf/device_queue_tracing.h" #include "dataset/engine/perf/connector_size.h" +#include "dataset/engine/perf/connector_throughput.h" #include "dataset/engine/perf/dataset_iterator_tracing.h" #include "utils/log_adapter.h" @@ -72,9 +72,11 @@ Status ProfilingManager::Initialize() { std::shared_ptr dataset_iterator_tracing = std::make_shared(); RETURN_IF_NOT_OK(RegisterTracingNode(dataset_iterator_tracing)); - std::shared_ptr monitor_sampling = std::make_shared(tree_); - RETURN_IF_NOT_OK(RegisterSamplingNode(monitor_sampling)); + std::shared_ptr connector_size_sampling = std::make_shared(tree_); + RETURN_IF_NOT_OK(RegisterSamplingNode(connector_size_sampling)); + std::shared_ptr connector_thr_sampling = std::make_shared(tree_); + RETURN_IF_NOT_OK(RegisterSamplingNode(connector_thr_sampling)); return Status::OK(); } @@ -140,14 +142,15 @@ Status ProfilingManager::SaveProfilingData() { RETURN_IF_NOT_OK(node.second->SaveToFile()); } MS_LOG(INFO) << "Save profiling data end."; - return Status::OK(); } -double ProfilingTime::GetCurMilliSecond() { - struct timeval tv = {0, 0}; - (void)gettimeofday(&tv, nullptr); - return tv.tv_sec * 1000 + tv.tv_usec / 1000; +int64_t ProfilingTime::GetCurMilliSecond() { + // because cpplint does not allow using namespace + using std::chrono::duration_cast; + using std::chrono::milliseconds; + using std::chrono::steady_clock; + return duration_cast(steady_clock::now().time_since_epoch()).count(); } } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/dataset/engine/perf/profiling.h b/mindspore/ccsrc/dataset/engine/perf/profiling.h index d0ea91d566..e38c2d5e54 100644 --- a/mindspore/ccsrc/dataset/engine/perf/profiling.h +++ b/mindspore/ccsrc/dataset/engine/perf/profiling.h @@ -20,6 +20,7 @@ #include #include #include +#include #include "dataset/util/status.h" namespace mindspore { @@ -28,9 +29,10 @@ namespace dataset { class Monitor; class ExecutionTree; -const char kDeviceQueueTracingName[] = "Device Queue Tracing"; -const char kDatasetIteratorTracingName[] = "Dataset Iterator Tracing"; -const char kConnectorSizeSamplingName[] = "Connector Size Sampling"; +const char kDeviceQueueTracingName[] = "Device_Queue_Tracing"; +const char kDatasetIteratorTracingName[] = "Dataset_Iterator_Tracing"; +const char kConnectorSizeSamplingName[] = "Connector_Size_Sampling"; +const char kConnectorThroughputSamplingName[] = "Connector_Throughput_Sampling"; // Profiling is a class of basic unit of profiling action // This base class encapsulate the serialization output logic @@ -59,6 +61,8 @@ class Sampling : public Profiling { public: // Sampling action function. This function will be invoked by performance monitor thread. virtual Status Sample() = 0; + // virtual Status TestPrint() = 0; + virtual ~Sampling() = default; }; // Tracing is class of profiling which record samples upon request. @@ -132,7 +136,7 @@ enum ProfilingTimeSubType { class ProfilingTime { public: - static double GetCurMilliSecond(); + static int64_t GetCurMilliSecond(); }; } // namespace dataset diff --git a/tests/ut/cpp/dataset/CMakeLists.txt b/tests/ut/cpp/dataset/CMakeLists.txt index 8478c8257e..bfdc2b4cb3 100644 --- a/tests/ut/cpp/dataset/CMakeLists.txt +++ b/tests/ut/cpp/dataset/CMakeLists.txt @@ -79,6 +79,8 @@ SET(DE_UT_SRCS mask_test.cc trucate_pair_test.cc concatenate_op_test.cc + cyclic_array_test.cc + perf_data_test.cc ) add_executable(de_ut_tests ${DE_UT_SRCS}) diff --git a/tests/ut/cpp/dataset/cyclic_array_test.cc b/tests/ut/cpp/dataset/cyclic_array_test.cc new file mode 100644 index 0000000000..746e482439 --- /dev/null +++ b/tests/ut/cpp/dataset/cyclic_array_test.cc @@ -0,0 +1,128 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include "common/common.h" +#include "common/cvop_common.h" +#include "gtest/gtest.h" +#include "securec.h" +#include "dataset/util/de_error.h" +#include "dataset/engine/perf/cyclic_array.h" +#include + +using namespace mindspore::dataset; + +class MindDataTestCyclicArray : public UT::Common { + public: + MindDataTestCyclicArray() {} +}; + +TEST_F(MindDataTestCyclicArray, Test1) { + CyclicArray arr(5); + EXPECT_EQ(5, arr.capacity()); + EXPECT_EQ(0, arr.size()); + arr.push_back(0); + EXPECT_EQ(5, arr.capacity()); + EXPECT_EQ(1, arr.size()); + EXPECT_EQ(arr[0], 0); + arr.push_back(1); + EXPECT_EQ(arr[1], 1); + for (auto i = 2; i < 5; i++) { + arr.push_back(i); + } + EXPECT_EQ(arr.capacity(), arr.size()); + EXPECT_EQ(1, arr[1]); + EXPECT_EQ(4, arr[4]); + arr[4] = 42; + EXPECT_EQ(arr[4], 42); + auto a = arr[4]; + EXPECT_EQ(a, 42); + arr.push_back(5); + EXPECT_EQ(arr[0], 1); + EXPECT_EQ(arr[4], 5); + + CyclicArray arr2 = arr; + EXPECT_EQ(arr2.capacity(), arr.capacity()); + EXPECT_EQ(arr2.size(), arr.size()); + auto last = arr2.end(); + auto first = arr2.begin(); + for (auto i = 0; i < arr.size(); i++) { + EXPECT_EQ(arr2[i], arr[i]); + } + + arr.clear(); + EXPECT_EQ(arr.size(), 0); + arr.push_back(42); + arr.push_back(43); + EXPECT_EQ(arr.size(), 2); + EXPECT_EQ(arr.capacity(), 5); + EXPECT_EQ(arr[0], 42); + EXPECT_EQ(arr[1], 43); + auto arr3 = arr; + EXPECT_EQ(arr3.size(), 2); + EXPECT_EQ(arr3.capacity(), 5); + EXPECT_EQ(arr.size(), 2); + EXPECT_EQ(arr.capacity(), 5); + EXPECT_EQ(arr[0], arr3[0]); + EXPECT_EQ(arr[1], arr3[1]); + + arr.clear(); + arr.push_back(21); + arr.push_back(22); + EXPECT_EQ(arr[arr.size() - 1], 22); + for (auto i = 23; i < 27; i++) { + arr.push_back(i); + } + EXPECT_EQ(arr[0], 22); + EXPECT_EQ(arr[arr.size() - 1], 26); +} + +TEST_F(MindDataTestCyclicArray, TestIterator) { + CyclicArray arr(5); + for (auto i = 0; i < arr.capacity(); i++) { + arr.push_back(i); + } + arr.push_back(6); + arr.push_back(7); + auto i = 0; + for (auto it = arr.begin(); it != arr.end(); ++it) { + EXPECT_EQ(*it, arr[i++]); + } + + std::iota(arr.begin(), arr.end(), -4); + EXPECT_EQ(arr[0], -4); + EXPECT_EQ(arr[4], 0); + const auto sz = 1000000; + CyclicArray arr2(sz); + for (auto i = 0; i < sz - 1; i++) { + arr.push_back(0); + } + const auto val = -500000; + std::iota(arr2.begin(), arr2.end() + sz, val); + EXPECT_EQ(*arr2.begin(), val); + std::random_device rd; + std::mt19937 g(rd()); + std::shuffle(arr2.begin(), arr2.end(), g); + std::sort(arr2.begin(), arr2.end(), [](const auto a, const auto b) { return a > b; }); + EXPECT_EQ(*arr2.begin(), val); + const auto new_val = -600000; + for (auto i = 0; i < 100; i++) { + arr2.push_back(new_val); + } + EXPECT_EQ(*(--arr2.end()), new_val); + std::sort(arr2.begin(), arr2.end(), [](const auto a, const auto b) { return a > b; }); + EXPECT_EQ(*arr2.begin(), new_val); +} diff --git a/tests/ut/cpp/dataset/perf_data_test.cc b/tests/ut/cpp/dataset/perf_data_test.cc new file mode 100644 index 0000000000..eaa5e85fa1 --- /dev/null +++ b/tests/ut/cpp/dataset/perf_data_test.cc @@ -0,0 +1,71 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "common/common.h" +#include "common/cvop_common.h" +#include "gtest/gtest.h" +#include "securec.h" +#include "dataset/util/de_error.h" +#include "dataset/engine/perf/cyclic_array.h" +#include "dataset/engine/perf/perf_data.h" + +using namespace mindspore::dataset; + +class MindDataTestPerfData : public UT::Common { + public: + MindDataTestPerfData() {} +}; + +TEST_F(MindDataTestPerfData, Test1) { + PerfData> p1(2, 3); + PerfData> p2(2, 3); + EXPECT_EQ(p1.capacity(), p2.capacity()); + std::vector row = {1, 2, 3}; + p1.AddSample(row); + p2.AddSample(row); + EXPECT_EQ(p1.size(), p2.size()); + p1.AddSample(row); + p2.AddSample(row); + EXPECT_EQ(p1.size(), p2.size()); + row = {4, 5, 6}; + p2.AddSample(row); + auto r1 = p2.Row(static_cast(0)); + for (auto i = 0; i < 3; i++) { + EXPECT_EQ(r1[i], i + 1); + } + + auto r2 = p2.Row(1); + for (auto i = 0; i < 3; i++) { + EXPECT_EQ(r2[i], i + 4); + } + + EXPECT_EQ(p2[0][1], 4); + EXPECT_EQ(p2[1][1], 5); + EXPECT_EQ(p2[2][1], 6); +} + +TEST_F(MindDataTestPerfData, Test2) { + auto pd = PerfData>(1000000, 3); + auto row = {1, 2, 3}; + pd.AddSample(row); + EXPECT_EQ(pd[0][0], 1); + EXPECT_EQ(pd[1][0], 2); + EXPECT_EQ(pd[2][0], 3); + row = {4, 5, 6}; + pd.AddSample(row); + EXPECT_EQ(pd[0][0], 1); + EXPECT_EQ(pd[1][0], 2); + EXPECT_EQ(pd[2][0], 3); +} \ No newline at end of file diff --git a/tests/ut/python/dataset/test_profiling.py b/tests/ut/python/dataset/test_profiling.py index fca0a4c1dc..a4ee68e435 100644 --- a/tests/ut/python/dataset/test_profiling.py +++ b/tests/ut/python/dataset/test_profiling.py @@ -23,7 +23,8 @@ FILES = ["../data/dataset/testTFTestAllTypes/test.data"] DATASET_ROOT = "../data/dataset/testTFTestAllTypes/" SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json" -PIPELINE_FILE = "./pipeline_profiling_1.json" +PIPELINE_FILE_SIZE = "./pipeline_profiling_1.json" +PIPELINE_FILE_THR = "./pipeline_profiling_Connector_Throughput_Sampling_1.json" DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_1.txt" @@ -43,8 +44,10 @@ def test_profiling_simple_pipeline(): for _ in data1: pass - assert os.path.exists(PIPELINE_FILE) is True - os.remove(PIPELINE_FILE) + assert os.path.exists(PIPELINE_FILE_SIZE) is True + os.remove(PIPELINE_FILE_SIZE) + assert os.path.exists(PIPELINE_FILE_THR) is True + os.remove(PIPELINE_FILE_THR) assert os.path.exists(DATASET_ITERATOR_FILE) is True os.remove(DATASET_ITERATOR_FILE) del os.environ['PROFILING_MODE'] @@ -74,8 +77,10 @@ def test_profiling_complex_pipeline(): for _ in data3: pass - assert os.path.exists(PIPELINE_FILE) is True - os.remove(PIPELINE_FILE) + assert os.path.exists(PIPELINE_FILE_SIZE) is True + os.remove(PIPELINE_FILE_SIZE) + assert os.path.exists(PIPELINE_FILE_THR) is True + os.remove(PIPELINE_FILE_THR) assert os.path.exists(DATASET_ITERATOR_FILE) is True os.remove(DATASET_ITERATOR_FILE) del os.environ['PROFILING_MODE'] @@ -103,8 +108,10 @@ def test_profiling_sampling_iterval(): for _ in data1: pass - assert os.path.exists(PIPELINE_FILE) is True - os.remove(PIPELINE_FILE) + assert os.path.exists(PIPELINE_FILE_SIZE) is True + os.remove(PIPELINE_FILE_SIZE) + assert os.path.exists(PIPELINE_FILE_THR) is True + os.remove(PIPELINE_FILE_THR) assert os.path.exists(DATASET_ITERATOR_FILE) is True os.remove(DATASET_ITERATOR_FILE)