diff --git a/mindspore/ccsrc/profiler/device/gpu/data_saver.cc b/mindspore/ccsrc/profiler/device/gpu/data_saver.cc
index 8f54518747..cddff4938f 100644
--- a/mindspore/ccsrc/profiler/device/gpu/data_saver.cc
+++ b/mindspore/ccsrc/profiler/device/gpu/data_saver.cc
@@ -168,6 +168,7 @@ void DataSaver::WriteFile(std::string out_path_dir) {
   WriteOpType(out_path_dir);
   WriteActivity(out_path_dir);
   WriteOpTimestamp(out_path_dir);
+  WriteStepTrace(out_path_dir);
 }
 
 void DataSaver::WriteOpType(const std::string &saver_base_dir) {
@@ -262,9 +263,56 @@ void DataSaver::WriteOpTimestamp(const std::string &saver_base_dir) {
   ChangeFileMode(file_path);
 }
 
+void DataSaver::WriteStepTrace(const std::string &saver_base_dir) {
+  std::string file_path = saver_base_dir + "/step_trace_profiling_" + device_id_ + ".txt";
+  std::ofstream ofs(file_path);
+  // check if the file is writable
+  if (!ofs.is_open()) {
+    MS_LOG(WARNING) << "Open file '" << file_path << "' failed!";
+    return;
+  }
+
+  // write step trace time info into file
+  uint32_t factor = 10;
+  std::vector<std::string> op_name_arr;
+  op_name_arr.push_back(step_trace_op_name.trace_iter_start);
+  op_name_arr.push_back(step_trace_op_name.trace_fp_start);
+  op_name_arr.push_back(step_trace_op_name.trace_bp_end);
+  op_name_arr.push_back(step_trace_op_name.trace_iter_end);
+  if (!step_trace_op_name.trace_custom_node.empty()) {
+    auto start = step_trace_op_name.trace_custom_node.begin();
+    auto end = step_trace_op_name.trace_custom_node.end();
+    std::copy(start, end, std::back_inserter(op_name_arr));
+  }
+  for (const auto &op_name : op_name_arr) {
+    auto iter_op_timestamp = op_timestamps_map_.find(op_name);
+    if (iter_op_timestamp != op_timestamps_map_.end()) {
+      try {
+        ofs << op_name << " ";
+        for (auto start_end : iter_op_timestamp->second) {
+          // convert the time unit from 1 ns to 10 ns (keep the same as Ascend)
+          uint64_t duration = start_end.duration * kTimeUnit;
+          uint64_t end_timestamp = (duration + start_end.start_timestamp) / factor;
+          uint64_t start_timestamp = start_end.start_timestamp / factor;
+          ofs << start_timestamp << "," << end_timestamp << " ";
+        }
+        ofs << std::endl;
+      } catch (const std::exception &e) {
+        MS_LOG(ERROR) << "Write " << file_path << " failed: " << e.what();
+      }
+    }
+  }
+
+  ofs.close();
+  ChangeFileMode(file_path);
+  MS_LOG(INFO) << "Write step trace infos into file: " << file_path;
+}
+
+void DataSaver::SetStepTraceOpName(ProfilingTraceInfo trace_op_name) { step_trace_op_name = trace_op_name; }
+
 void DataSaver::ChangeFileMode(const std::string &file_path) {
   if (chmod(common::SafeCStr(file_path), S_IRUSR | S_IWUSR) == -1) {
-    MS_LOG(INFO) << "Modify file:" << file_path << " to rw fail.";
+    MS_LOG(WARNING) << "Modify file:" << file_path << " to rw fail.";
     return;
   }
 }
diff --git a/mindspore/ccsrc/profiler/device/gpu/data_saver.h b/mindspore/ccsrc/profiler/device/gpu/data_saver.h
index 2f3ba54ddb..f28f8e0c39 100644
--- a/mindspore/ccsrc/profiler/device/gpu/data_saver.h
+++ b/mindspore/ccsrc/profiler/device/gpu/data_saver.h
@@ -17,6 +17,7 @@
 #ifndef MINDSPORE_DATA_SAVER_H
 #define MINDSPORE_DATA_SAVER_H
 #include
+#include
 #include
 #include
 #include
@@ -124,6 +125,8 @@ class DataSaver {
   void ParseOpInfo(const OpInfoMap &op_info_maps);
 
+  void SetStepTraceOpName(ProfilingTraceInfo trace_op_name);
+
   void ParseEvent(const std::vector<Event> &events);
 
   void WriteFile(std::string out_path);
@@ -145,6 +148,8 @@ class DataSaver {
   void WriteOpTimestamp(const std::string &saver_base_dir);
 
+  void WriteStepTrace(const std::string &saver_base_dir);
+
   void ChangeFileMode(const std::string &file_path);
 
   std::string device_id_;
@@ -152,6 +157,7 @@ class DataSaver {
   OpTypeInfos op_type_infos_;
   OpDetailInfos op_detail_infos_;
   OpTimestampInfo op_timestamps_map_;
+  ProfilingTraceInfo step_trace_op_name;
 };
 }  // namespace gpu
 }  // namespace profiler
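For orientation, `WriteStepTrace` emits one line per trace point: an op name followed by space-separated `start,end` timestamp pairs in 10 ns units. Below is a minimal Python sketch of a reader for that layout; the file contents shown in the comment are invented, not taken from a real run.

```python
# Hypothetical contents of step_trace_profiling_0.txt (10 ns units):
#   Default/GetNext-op0 1000,1200 5000,5200
#   Default/network/conv1/Conv2D-op1 1300,1500 5300,5500
def read_step_trace(path):
    """Parse a step trace file into {op_name: [(start, end), ...]}."""
    trace = {}
    with open(path) as f:
        for line in f:
            fields = line.split()
            if not fields:
                continue
            name, points = fields[0], fields[1:]
            trace[name] = [tuple(int(t) for t in p.split(',')) for p in points]
    return trace
```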
diff --git a/mindspore/ccsrc/profiler/device/gpu/gpu_profiling.cc b/mindspore/ccsrc/profiler/device/gpu/gpu_profiling.cc
index 85b3936916..44a2bc83a9 100644
--- a/mindspore/ccsrc/profiler/device/gpu/gpu_profiling.cc
+++ b/mindspore/ccsrc/profiler/device/gpu/gpu_profiling.cc
@@ -470,6 +470,7 @@ void GPUProfiler::SaveProfileData() {
     MS_LOG(WARNING) << "Profile data path is empty, skip save profile data.";
   } else {
     DataSaver dataSaver;
+    dataSaver.SetStepTraceOpName(step_trace_op_name);
     dataSaver.ParseOpInfo(op_info_map_);
     dataSaver.ParseEvent(events_);
     dataSaver.WriteFile(profile_data_path_);
@@ -649,6 +650,9 @@ void GPUProfiler::HandleActivityRecord(CUpti_Activity *record) {
   AddEvent(std::move(profilingData));
 }
 
+void GPUProfiler::SetStepTraceOpName(ProfilingTraceInfo trace_op_name) { step_trace_op_name = trace_op_name; }
+
 void GPUProfiler::RegisterProfilingOp(std::shared_ptr<ProfilingOp> node) {
   if (profiling_op_.find(node->Name()) != profiling_op_.end()) {
     return;
diff --git a/mindspore/ccsrc/profiler/device/gpu/gpu_profiling.h b/mindspore/ccsrc/profiler/device/gpu/gpu_profiling.h
index d3510d9a27..76ebae26eb 100644
--- a/mindspore/ccsrc/profiler/device/gpu/gpu_profiling.h
+++ b/mindspore/ccsrc/profiler/device/gpu/gpu_profiling.h
@@ -27,6 +27,7 @@
 #include
 #include
 #include
+#include "profiler/device/gpu/gpu_profiling_utils.h"
 
 namespace mindspore {
 namespace profiler {
@@ -144,6 +145,7 @@ class GPUProfiler {
   void OpDataProducerEnd();
   void ProcessEvents();
   void RegisterProfilingOp(std::shared_ptr<ProfilingOp> node);
+  void SetStepTraceOpName(ProfilingTraceInfo trace_op_name);
   std::string ProfileDataPath() const { return profile_data_path_; }
 
  private:
@@ -189,6 +191,7 @@ class GPUProfiler {
   uint64_t op_cupti_time_start_;
   std::string profile_data_path_;
   std::map<std::string, std::shared_ptr<ProfilingOp>> profiling_op_;
+  ProfilingTraceInfo step_trace_op_name;
 };
 }  // namespace gpu
 }  // namespace profiler
diff --git a/mindspore/ccsrc/profiler/device/gpu/gpu_profiling_utils.cc b/mindspore/ccsrc/profiler/device/gpu/gpu_profiling_utils.cc
new file mode 100644
index 0000000000..52a1dfa31b
--- /dev/null
+++ b/mindspore/ccsrc/profiler/device/gpu/gpu_profiling_utils.cc
@@ -0,0 +1,187 @@
+/**
+ * Copyright 2020 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "profiler/device/gpu/gpu_profiling_utils.h"
+#include "backend/kernel_compiler/kernel.h"
+#include "backend/session/anf_runtime_algorithm.h"
+#include "utils/ms_utils.h"
+#include "utils/ms_context.h"
+#include "utils/utils.h"
+
+namespace mindspore {
+namespace profiler {
+namespace gpu {
+constexpr char kFpStartNode[] = "PROFILING_FP_START";
+constexpr char kBpEndNode[] = "PROFILING_BP_END";
+constexpr char kIterEndNode[] = "PROFILING_ITER_END";
+constexpr int fpExistGraphId = 2;
+
+uint32_t ProfilingUtils::last_graph_id = 0;
+bool ProfilingUtils::have_communication_op = false;
+ProfilingTraceInfo ProfilingUtils::profiling_trace = {"", "", "", "", false};
+
+ProfilingTraceInfo ProfilingUtils::GetProfilingTraceFromEnv(NotNull<const session::KernelGraph *> graph_ptr) {
+  MS_LOG(INFO) << "Get current subgraph op name start.";
+  auto &cnode_exec_order = graph_ptr->execution_order();
+  if (cnode_exec_order.empty()) {
+    return profiling_trace;
+  }
+  uint32_t current_graph_id = graph_ptr->graph_id();
+  // a current graph id smaller than the last graph id indicates that all subgraphs have been called
+  if (current_graph_id < last_graph_id) {
+    profiling_trace.IsFirstStepEnd = true;
+    OutputStepTraceOpNameStatus();
+    return profiling_trace;
+  }
+
+  SetTraceIterStart(cnode_exec_order);
+  SetTraceIterEnd(cnode_exec_order);
+  SetTraceFpStart(cnode_exec_order, current_graph_id);
+  SetTraceBpEnd(cnode_exec_order);
+  GetTraceHccl(cnode_exec_order);
+
+  last_graph_id = current_graph_id;
+  return profiling_trace;
+}
+
+void ProfilingUtils::OutputStepTraceOpNameStatus() {
+  if (!profiling_trace.IsValid()) {
+    MS_LOG(ERROR) << "Did not get all the step_trace op names.";
+  }
+  MS_LOG(INFO) << "[profiling] trace_iter_start: " << profiling_trace.trace_iter_start
+               << ", trace_fp_start: " << profiling_trace.trace_fp_start
+               << ", trace_bp_end: " << profiling_trace.trace_bp_end
+               << ", trace_iter_end: " << profiling_trace.trace_iter_end;
+  MS_LOG(INFO) << "Get step_trace op name end.";
+}
+
+void ProfilingUtils::GetTraceHccl(const std::vector<CNodePtr> &cnode_exec_order) {
+  for (const auto &node : cnode_exec_order) {
+    MS_EXCEPTION_IF_NULL(node);
+    if (AnfAlgo::IsCommunicationOp(node)) {
+      if (std::find(profiling_trace.trace_custom_node.begin(), profiling_trace.trace_custom_node.end(),
+                    node->fullname_with_scope()) == profiling_trace.trace_custom_node.end()) {
+        profiling_trace.trace_custom_node.push_back(node->fullname_with_scope());
+      }
+      MS_LOG(INFO) << "[profiling] Get hccl node: " << node->fullname_with_scope();
+    }
+  }
+}
+
+void ProfilingUtils::SetTraceIterStart(const std::vector<CNodePtr> &cnode_exec_order) {
+  if (!profiling_trace.trace_iter_start.empty()) {
+    return;
+  }
+
+  auto first_node = cnode_exec_order.front();
+  MS_EXCEPTION_IF_NULL(first_node);
+  if (AnfAlgo::GetCNodeName(first_node) == kGetNextOpName) {
+    profiling_trace.trace_iter_start = first_node->fullname_with_scope();
+  }
+}
+
+void ProfilingUtils::SetTraceFpStart(const std::vector<CNodePtr> &cnode_exec_order, uint32_t graph_id) {
+  if (!profiling_trace.trace_fp_start.empty()) {
+    return;
+  }
+
+  const char *trace_fp_start = std::getenv(kFpStartNode);
+  if (trace_fp_start != nullptr) {
+    profiling_trace.trace_fp_start = std::string(trace_fp_start);
+    MS_LOG(INFO) << "Set the Fp Start Op Name from Environment Variable: " << profiling_trace.trace_fp_start;
+    return;
+  }
+
+  if (graph_id == fpExistGraphId) {
+    auto first_node = cnode_exec_order.front();
+    MS_EXCEPTION_IF_NULL(first_node);
+    profiling_trace.trace_fp_start = first_node->fullname_with_scope();
+  }
+}
+
+void ProfilingUtils::SetTraceBpEnd(const
std::vector<CNodePtr> &cnode_exec_order) {
+  const char *trace_bp_end = std::getenv(kBpEndNode);
+  if (trace_bp_end != nullptr) {
+    profiling_trace.trace_bp_end = std::string(trace_bp_end);
+    MS_LOG(INFO) << "Set the Bp End Op Name from Environment Variable: " << profiling_trace.trace_bp_end;
+    return;
+  }
+
+  std::string bp_end_str;
+  // the graph may contain a hccl kernel: try to find the last communication op
+  auto iter = cnode_exec_order.rbegin();
+  while (iter != cnode_exec_order.rend()) {
+    if (AnfAlgo::IsCommunicationOp(*iter)) {
+      break;
+    }
+    ++iter;
+  }
+  // if a communication op is found
+  if (iter != cnode_exec_order.rend()) {
+    // store the names of the communication op's input nodes
+    std::set<std::string> ar_input_node_names;
+    for (size_t i = 0; i < AnfAlgo::GetInputTensorNum(*iter); ++i) {
+      auto input_node_with_index = AnfAlgo::GetPrevNodeOutput(*iter, i);
+      auto input_node = input_node_with_index.first;
+      ar_input_node_names.insert(input_node->fullname_with_scope());
+    }
+    // start from the node executed just before the communication op
+    ++iter;
+    // look for one of the stored input names among the preceding nodes
+    while (iter != cnode_exec_order.rend()) {
+      if (ar_input_node_names.find((*iter)->fullname_with_scope()) != ar_input_node_names.end()) {
+        bp_end_str = (*iter)->fullname_with_scope();
+        break;
+      }
+      ++iter;
+    }
+  }
+
+  if (bp_end_str.empty() && !have_communication_op) {
+    bp_end_str = GetGraphSecondLastKernelName(cnode_exec_order);
+  }
+
+  if (!bp_end_str.empty()) {
+    profiling_trace.trace_bp_end = bp_end_str;
+  }
+}
+
+void ProfilingUtils::SetTraceIterEnd(const std::vector<CNodePtr> &cnode_exec_order) {
+  const char *trace_iter_end = std::getenv(kIterEndNode);
+  if (trace_iter_end != nullptr) {
+    profiling_trace.trace_iter_end = std::string(trace_iter_end);
+    MS_LOG(INFO) << "Set the Iter End Op Name from Environment Variable: " << profiling_trace.trace_iter_end;
+    return;
+  }
+
+  auto iter_end = cnode_exec_order.rbegin();
+  profiling_trace.trace_iter_end = (*iter_end)->fullname_with_scope();
+}
+
+std::string ProfilingUtils::GetGraphSecondLastKernelName(const std::vector<CNodePtr> &cnode_exec_order) {
+  std::string second_last_kernel_name;
+  auto iter = cnode_exec_order.rbegin();
+  ++iter;
+  if (iter != cnode_exec_order.rend()) {
+    second_last_kernel_name = (*iter)->fullname_with_scope();
+  }
+
+  return second_last_kernel_name;
+}
+
+}  // namespace gpu
+}  // namespace profiler
+}  // namespace mindspore
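The BP-end search in `SetTraceBpEnd` is the least obvious part of this file, so here is a rough Python rendering of the same reverse scan. The helpers `is_comm_op` and `inputs_of` are illustrative stand-ins, not MindSpore APIs.

```python
def find_bp_end(exec_order, is_comm_op, inputs_of):
    """Mirror SetTraceBpEnd: walk backwards to the last communication op,
    then keep walking backwards until one of its input nodes appears."""
    rev = list(reversed(exec_order))
    for i, op in enumerate(rev):
        if is_comm_op(op):
            wanted = set(inputs_of(op))
            # scan the ops executed before the communication op
            for earlier in rev[i + 1:]:
                if earlier in wanted:
                    return earlier  # bp_end candidate
            break
    return None

# toy usage: AllReduce consumes 'grad2', which executed right before it
order = ['GetNext', 'conv1', 'loss', 'grad1', 'grad2', 'AllReduce', 'opt']
print(find_bp_end(order, lambda op: op == 'AllReduce',
                  lambda op: ['grad2']))  # -> 'grad2'
```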
diff --git a/mindspore/ccsrc/profiler/device/gpu/gpu_profiling_utils.h b/mindspore/ccsrc/profiler/device/gpu/gpu_profiling_utils.h
new file mode 100644
index 0000000000..58a1c79572
--- /dev/null
+++ b/mindspore/ccsrc/profiler/device/gpu/gpu_profiling_utils.h
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2020 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef MINDSPORE_CCSRC_PROFILER_DEVICE_GPU_GPU_PROFILING_UTILS_H_
+#define MINDSPORE_CCSRC_PROFILER_DEVICE_GPU_GPU_PROFILING_UTILS_H_
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "backend/session/kernel_graph.h"
+
+namespace mindspore {
+namespace profiler {
+namespace gpu {
+struct ProfilingTraceInfo {
+  // all of these op names can also be supplied through environment variables
+  // the iteration start op is GetNext
+  std::string trace_iter_start;
+  // the fp start op is the first op of the graph whose ID is 2
+  std::string trace_fp_start;
+  // the bp end op is the input node of the last communication op (if one exists)
+  std::string trace_bp_end;
+  // the iteration end op is the last executed op
+  std::string trace_iter_end;
+
+  bool IsFirstStepEnd;
+
+  // profiling specific ops, such as AllReduce
+  std::vector<std::string> trace_custom_node;
+
+  bool IsValid() const {
+    return !(trace_iter_start.empty() || trace_fp_start.empty() || trace_bp_end.empty() || trace_iter_end.empty());
+  }
+};
+
+class ProfilingUtils {
+ public:
+  ProfilingUtils() = default;
+  ~ProfilingUtils() = default;
+
+  // Get profiling trace points from envs.
+  // export PROFILING_FP_START='full name of the first cnode to execute'
+  // export PROFILING_BP_END='full name of the last backpropagation cnode to execute'
+  // export PROFILING_ITER_END='full name of the last cnode in the graph to execute'
+  static ProfilingTraceInfo GetProfilingTraceFromEnv(NotNull<const session::KernelGraph *> graph_ptr);
+  static void OutputStepTraceOpNameStatus();
+
+  static uint32_t last_graph_id;
+  static bool have_communication_op;
+  static ProfilingTraceInfo profiling_trace;
+
+ private:
+  static void SetTraceIterStart(const std::vector<CNodePtr> &cnode_exec_order);
+  static void SetTraceFpStart(const std::vector<CNodePtr> &cnode_exec_order, uint32_t graph_id);
+  static void SetTraceBpEnd(const std::vector<CNodePtr> &cnode_exec_order);
+  static void SetTraceIterEnd(const std::vector<CNodePtr> &cnode_exec_order);
+  static std::string GetGraphSecondLastKernelName(const std::vector<CNodePtr> &cnode_exec_order);
+  static void GetTraceHccl(const std::vector<CNodePtr> &cnode_exec_order);
+};
+}  // namespace gpu
+}  // namespace profiler
+}  // namespace mindspore
+#endif  // MINDSPORE_CCSRC_PROFILER_DEVICE_GPU_GPU_PROFILING_UTILS_H_
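When auto-detection picks the wrong nodes, the three environment variables documented in the header above override it. A hypothetical way to set them before graph compilation; the node names below are placeholders that must match the full scope names in your own graph.

```python
import os

# Full scope names of the cnodes to pin the trace points to;
# these example names are invented and graph-specific.
os.environ['PROFILING_FP_START'] = 'Default/network/conv1/Conv2D-op1'
os.environ['PROFILING_BP_END'] = 'Default/network/gradConv2D-op42'
os.environ['PROFILING_ITER_END'] = 'Default/optimizer/Momentum-op57'
```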
diff --git a/mindspore/ccsrc/runtime/device/gpu/gpu_kernel_runtime.cc b/mindspore/ccsrc/runtime/device/gpu/gpu_kernel_runtime.cc
index 38facae809..3296746b52 100644
--- a/mindspore/ccsrc/runtime/device/gpu/gpu_kernel_runtime.cc
+++ b/mindspore/ccsrc/runtime/device/gpu/gpu_kernel_runtime.cc
@@ -1,5 +1,5 @@
 /**
- * Copyright 2019 Huawei Technologies Co., Ltd
+ * Copyright 2019-2020 Huawei Technologies Co., Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@
 #include "common/trans.h"
 #include "ir/dtype.h"
 #include "profiler/device/gpu/gpu_profiling.h"
+#include "profiler/device/gpu/gpu_profiling_utils.h"
 #include "utils/shape_utils.h"
 #include "debug/data_dump/dump_json_parser.h"
 #include "backend/kernel_compiler/gpu/gpu_kernel.h"
@@ -586,6 +587,16 @@ bool GPUKernelRuntime::LaunchKernelDynamic(const session::KernelGraph *graph, bo
   auto profiler_inst = profiler::gpu::GPUProfiler::GetInstance();
   MS_EXCEPTION_IF_NULL(profiler_inst);
 
+  static bool FlagGetStepTraceOpName = false;
+  if (!FlagGetStepTraceOpName) {
+    profiler::gpu::ProfilingTraceInfo profiling_trace =
+      profiler::gpu::ProfilingUtils::GetProfilingTraceFromEnv(NOT_NULL(graph));
+    if (profiling_trace.IsFirstStepEnd) {
+      FlagGetStepTraceOpName = true;
+      profiler_inst->SetStepTraceOpName(profiling_trace);
+    }
+  }
+
   for (const auto &kernel : kernels) {
     auto kernel_mod = AnfAlgo::GetKernelMod(kernel);
     MS_EXCEPTION_IF_NULL(kernel_mod);
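The `IsFirstStepEnd` trigger above assumes subgraphs launch in ascending graph-id order within a step, so the first time an id decreases, the first step must be over. A toy illustration of that detection, with made-up graph ids:

```python
def first_step_end(graph_ids):
    """Return the index at which the first step is known to be over:
    the first launch whose graph id is lower than its predecessor's."""
    last = 0
    for i, gid in enumerate(graph_ids):
        if gid < last:
            return i
        last = gid
    return None

# graphs 1 and 2 run in step 1; seeing graph 1 again starts step 2
print(first_step_end([1, 2, 1, 2]))  # -> 2
```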
diff --git a/mindspore/profiler/parser/step_trace_parser.py b/mindspore/profiler/parser/step_trace_parser.py
index 2bb35ae274..471c3ed5d8 100644
--- a/mindspore/profiler/parser/step_trace_parser.py
+++ b/mindspore/profiler/parser/step_trace_parser.py
@@ -33,7 +33,7 @@ StepTraceStruct = namedtuple(
 )
 
 
-class StepTraceParser:
+class BaseStepTraceParser:
     """
     The parser for step trace data.
 
@@ -43,11 +43,6 @@ class StepTraceParser:
         job_id (int): The job id used to define the start of new step. Default: 0.
         skip_first_step (bool): Whether skip the first step or not.
     """
-    _event_size = 20
-    _fp_tag = 1
-    _bp_tag = 2
-    _end_tag = 255
-
     def __init__(self, input_dir, output_file_path, job_id=0, skip_first_step=False):
         self._input_dir = input_dir
         self._output_path = output_file_path
@@ -98,18 +93,6 @@ class StepTraceParser:
         Returns:
             dict, parsed point info.
         """
-        points = {
-            'fp_start': point_info.get(self._fp_tag, ''),
-            'bp_end': point_info.get(self._bp_tag, '')
-        }
-        try:
-            with open(output_path, 'w') as json_file:
-                json.dump(points, json_file)
-            os.chmod(output_path, stat.S_IREAD)
-        except (IOError, OSError) as err:
-            log.warning('Failed to save point info. %s', err)
-            raise ProfilerIOException
-        return points
 
     def update_tag_op_type_map(self, point_info):
         """
@@ -153,17 +136,7 @@ class StepTraceParser:
 
     def _get_step_trace_files(self):
         """Get step trace files."""
-        # step trace files may under $profiler_dir or $profiler_dir/data
-        profiler_dir = self._input_dir
-        step_trace_files = self._search_file(profiler_dir)
-        if not step_trace_files:
-            # try to find step trace files under $profiler_dir/data
-            profiler_dir = os.path.join(profiler_dir, 'data')
-            step_trace_files = self._search_file(profiler_dir)
-        if not step_trace_files:
-            raise ProfilerPathErrorException('Training trace file does not exist.')
-
-        return step_trace_files
+        return self._input_dir
 
     @staticmethod
     def _search_file(input_dir):
@@ -198,19 +171,6 @@ class StepTraceParser:
 
     def _parse(self, source_files):
         """Parse source step trace files."""
-        log.info("Start to parse step trace file.")
-        event_info = {}
-        for source_file in source_files:
-            source_file = validate_and_normalize_path(source_file)
-            with open(source_file, 'rb') as handler:
-                content = handler.read()
-                for step_trace in self._get_next_step_trace(content, event_info):
-                    if self._skip_first_step:
-                        self._skip_first_step = False
-                        continue
-                    self._record_trace_event(step_trace)
-        self._record_average_info()
-        log.info("Finish to parse step trace file.")
 
     def _get_next_step_trace(self, content, event_info):
         """
@@ -337,22 +297,8 @@ class StepTraceParser:
         Returns:
             dict, reduce info.
         """
-        reduce_info = {}
-        if end_point[0] - start_point[0] != 1 or end_point[0] % 2:
-            log.warning("Unmatched reduce event <%s, %s>.", start_point, end_point)
-            return reduce_info
-        op_type = self._tag_map.get(start_point[0])
-        # append field name with op type.
-        if not op_type:
-            log.warning("Can't recognize the inner type for point tag: %d.", start_point[0])
-            field_name += '_parallel'
-        else:
-            field_name += '_' + op_type
-        reduce_info[field_name] = end_point[1] - start_point[1]
-        reduce_info[field_name + '_start_point'] = start_point[1]
-        reduce_info[field_name + '_end_point'] = end_point[1]
-
-        return reduce_info
+        ret_dict = {}
+        return ret_dict
 
     def _record_average_info(self):
         """Calculate average info."""
@@ -383,3 +329,204 @@ class StepTraceParser:
             for row_data in self._result:
                 csv_writer.writerow(row_data)
         os.chmod(self._output_path, stat.S_IREAD)
+
+
+class GpuStepTraceParser(BaseStepTraceParser):
+    """The parser for GPU step trace data."""
+    def record_point_info(self, source_file, output_path):
+        """
+        Record point info into json.
+
+        Args:
+            source_file (str): The file path of step trace original data.
+            output_path (str): The output path for saving point info.
+
+        Returns:
+            dict, parsed point info.
+        """
+        fp_start, bp_end = 1, 2
+        try:
+            with open(source_file, 'r') as f:
+                lines = f.readlines()
+                fp_start_name = lines[fp_start].split()[0]
+                bp_end_name = lines[bp_end].split()[0]
+        except (IOError, OSError) as err:
+            log.warning('Failed to read %s: %s', source_file, err)
+            raise ProfilerIOException
+
+        points = {
+            'fp_start': fp_start_name,
+            'bp_end': bp_end_name
+        }
+        try:
+            with open(output_path, 'w') as json_file:
+                json.dump(points, json_file)
+            os.chmod(output_path, stat.S_IREAD)
+        except (IOError, OSError) as err:
+            log.warning('Failed to save point info. %s', err)
+            raise ProfilerIOException
+        return points
+
+    def _get_step_trace_files(self):
+        """Get step trace files."""
+        return self._input_dir
+
+    def _parse(self, source_file):
+        """Parse source step trace files."""
+        log.info("Start to parse step trace file.")
+        iter_start, fp_start, bp_end, iter_end = 0, 1, 2, 3
+        reduce_start = 4
+        start_time, end_time = 0, 1
+
+        source_file = validate_and_normalize_path(source_file)
+        try:
+            with open(source_file, 'r') as f:
+                lines = f.readlines()
+                step_trace_info_all = [line.strip().split() for line in lines]
+                num_of_step = len(step_trace_info_all[0])
+                # when a profiling step range is set in callback mode, op counts per line may differ; keep the trailing entries
+                step_trace_info_all = [line[-num_of_step:] for line in step_trace_info_all]
+        except (IOError, OSError) as err:
+            log.warning('Failed to read %s: %s', source_file, err)
+            raise ProfilerIOException
+
+        for step_num in range(1, num_of_step):
+            step_trace = {
+                'start': int(step_trace_info_all[iter_start][step_num].split(',')[start_time]),
+                'fp': int(step_trace_info_all[fp_start][step_num].split(',')[start_time]),
+                'bp': int(step_trace_info_all[bp_end][step_num].split(',')[end_time]),
+                'end': int(step_trace_info_all[iter_end][step_num].split(',')[end_time]),
+                'reduce': {}
+            }
+            num_of_step_point = len(step_trace_info_all)
+            if num_of_step_point > reduce_start:
+                reduce_info = {}
+                reduce_time_info = []
+                for reduce_idx in range(reduce_start, num_of_step_point):
+                    cur_reduce_time = step_trace_info_all[reduce_idx][step_num]
+                    reduce_time_info += cur_reduce_time.split(',')
+                reduce_info['ops'] = reduce_time_info
+                step_trace['reduce'] = reduce_info
+            self._record_trace_event(step_trace)
+        self._record_average_info()
+        log.info("Finish to parse step trace file.")
+
+    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
+        """
+        Get single reduce info.
+
+        Args:
+            field_name (str): The field name.
+            start_point (str): Start point time.
+            end_point (str): End point time.
+
+        Returns:
+            dict, reduce info.
+        """
+        reduce_info = {}
+
+        op_type = 'AllReduce'
+        # append the op type to the field name
+        field_name += '_' + op_type
+        reduce_info[field_name] = int(end_point) - int(start_point)
+        reduce_info[field_name + '_start_point'] = start_point
+        reduce_info[field_name + '_end_point'] = end_point
+
+        return reduce_info
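To make `_parse` concrete, here is a hypothetical four-row trace file (rows: iter_start, fp_start, bp_end, iter_end; no reduce rows) and the dict the loop would build for its first step. All timestamps are invented, in 10 ns units.

```python
# Illustrative step_trace_profiling_0.txt: one name column plus two steps.
lines = [
    'Default/GetNext-op0 100,110 600,610',
    'Default/conv1-op1 120,130 620,630',
    'Default/grad2-op9 400,410 900,910',
    'Default/Momentum-op12 490,500 990,1000',
]
rows = [line.split() for line in lines]
# column 0 holds op names, so the first step is column 1
step1 = {
    'start': int(rows[0][1].split(',')[0]),  # iter_start start -> 100
    'fp': int(rows[1][1].split(',')[0]),     # fp_start start   -> 120
    'bp': int(rows[2][1].split(',')[1]),     # bp_end end       -> 410
    'end': int(rows[3][1].split(',')[1]),    # iter_end end     -> 500
}
print(step1)
```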
+
+
+class AscendStepTraceParser(BaseStepTraceParser):
+    """The parser for Ascend step trace data."""
+    _event_size = 20
+    _fp_tag = 1
+    _bp_tag = 2
+    _end_tag = 255
+
+    def record_point_info(self, point_info, output_path):
+        """
+        Record point info into json.
+
+        Args:
+            point_info (dict): The point info about tag id and relative op name.
+            output_path (str): The output path for saving point info.
+
+        Returns:
+            dict, parsed point info.
+        """
+        points = {
+            'fp_start': point_info.get(self._fp_tag, ''),
+            'bp_end': point_info.get(self._bp_tag, '')
+        }
+        try:
+            with open(output_path, 'w') as json_file:
+                json.dump(points, json_file)
+            os.chmod(output_path, stat.S_IREAD)
+        except (IOError, OSError) as err:
+            log.warning('Failed to save point info. %s', err)
+            raise ProfilerIOException
+        return points
+
+    def _get_step_trace_files(self):
+        """Get step trace files."""
+        # step trace files may be under $profiler_dir or $profiler_dir/data
+        profiler_dir = self._input_dir
+        step_trace_files = self._search_file(profiler_dir)
+        if not step_trace_files:
+            # try to find step trace files under $profiler_dir/data
+            profiler_dir = os.path.join(profiler_dir, 'data')
+            step_trace_files = self._search_file(profiler_dir)
+        if not step_trace_files:
+            raise ProfilerPathErrorException('Training trace file does not exist.')
+
+        return step_trace_files
+
+    def _parse(self, source_files):
+        """Parse source step trace files."""
+        log.info("Start to parse step trace file.")
+        event_info = {}
+
+        for source_file in source_files:
+            source_file = validate_and_normalize_path(source_file)
+            try:
+                with open(source_file, 'rb') as handler:
+                    content = handler.read()
+                    for step_trace in self._get_next_step_trace(content, event_info):
+                        if self._skip_first_step:
+                            self._skip_first_step = False
+                            continue
+                        self._record_trace_event(step_trace)
+            except (IOError, OSError) as err:
+                log.warning('Failed to read %s: %s', source_file, err)
+                raise ProfilerIOException
+
+        self._record_average_info()
+        log.info("Finish to parse step trace file.")
+
+    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
+        """
+        Get single reduce info.
+
+        Args:
+            field_name (str): The field name.
+            start_point (Tuple[int, int]): Start point time info, including (tag_id, sys_count).
+            end_point (Tuple[int, int]): End point time info, including (tag_id, sys_count).
+
+        Returns:
+            dict, reduce info.
+        """
+        reduce_info = {}
+        if end_point[0] - start_point[0] != 1 or end_point[0] % 2:
+            log.warning("Unmatched reduce event <%s, %s>.", start_point, end_point)
+            return reduce_info
+        op_type = self._tag_map.get(start_point[0])
+        # append the op type to the field name
+        if not op_type:
+            log.warning("Can't recognize the inner type for point tag: %d.", start_point[0])
+            field_name += '_parallel'
+        else:
+            field_name += '_' + op_type
+        reduce_info[field_name] = end_point[1] - start_point[1]
+        reduce_info[field_name + '_start_point'] = start_point[1]
+        reduce_info[field_name + '_end_point'] = end_point[1]
+
+        return reduce_info
diff --git a/mindspore/profiler/profiling.py b/mindspore/profiler/profiling.py
index f89c177411..0e84dfa85a 100644
--- a/mindspore/profiler/profiling.py
+++ b/mindspore/profiler/profiling.py
@@ -33,7 +33,7 @@ from mindspore.profiler.parser.minddata_parser import MinddataParser
 from mindspore.profiler.parser.minddata_pipeline_parser import \
     MinddataPipelineParser
 from mindspore.profiler.parser.optime_parser import OPComputeTimeParser
-from mindspore.profiler.parser.step_trace_parser import StepTraceParser
+from mindspore.profiler.parser.step_trace_parser import GpuStepTraceParser, AscendStepTraceParser
 from mindspore.nn.cell import Cell
 
 PROFILING_LOG_BASE_PATH = "/var/log/npu/profiling"
@@ -154,6 +154,12 @@ class Profiler:
             except ProfilerException as err:
                 logger.warning(err.message)
 
+            # analyse step trace info
+            try:
+                self._analyse_step_trace()
+            except ProfilerException as err:
+                logger.warning(err.message)
+
             os.environ['PROFILING_MODE'] = str("false")
 
         elif self._device_target and self._device_target == "Ascend":
@@ -227,7 +233,7 @@ class Profiler:
         os.environ['PROFILING_MODE'] = str("false")
         context.set_context(enable_profiling=False)
 
-    def _analyse_step_trace(self, source_path, framework_parser):
+    def _analyse_step_trace(self, source_path=None, framework_parser=None):
         """
         Analyse step trace data and save the result.
 
@@ -247,18 +253,29 @@ class Profiler:
         )
         step_trace_intermediate_file_path = validate_and_normalize_path(step_trace_intermediate_file_path)
         point_info_file_path = validate_and_normalize_path(point_info_file_path)
-        # whether keep the first step
-        skip_first_step_flag = framework_parser.check_op_name(INIT_OP_NAME)
-        point_info = framework_parser.point_info
-        # parser the step trace files and save the result to disk
-        source_path = validate_and_normalize_path(source_path)
-        parser = StepTraceParser(input_dir=source_path,
-                                 output_file_path=step_trace_intermediate_file_path,
-                                 job_id=self._job_id_env,
-                                 skip_first_step=skip_first_step_flag)
-        parser.update_tag_op_type_map(point_info)
-        parser.parse_and_save()
-        point_info = parser.record_point_info(point_info, point_info_file_path)
+
+        if self._device_target and self._device_target == 'GPU':
+            input_file_path = os.path.join(
+                self._output_path,
+                f'step_trace_profiling_{self._dev_id}.txt'
+            )
+            parser = GpuStepTraceParser(input_dir=input_file_path,
+                                        output_file_path=step_trace_intermediate_file_path)
+            parser.parse_and_save()
+            point_info = parser.record_point_info(input_file_path, point_info_file_path)
+        else:
+            # whether to keep the first step
+            skip_first_step_flag = framework_parser.check_op_name(INIT_OP_NAME)
+            point_info = framework_parser.point_info
+            # parse the step trace files and save the result to disk
+            source_path = validate_and_normalize_path(source_path)
+            parser = AscendStepTraceParser(input_dir=source_path,
+                                           output_file_path=step_trace_intermediate_file_path,
+                                           job_id=self._job_id_env,
+                                           skip_first_step=skip_first_step_flag)
+            parser.update_tag_op_type_map(point_info)
+            parser.parse_and_save()
+            point_info = parser.record_point_info(point_info, point_info_file_path)
         # print parser result
         parser.show()
%s", step_trace_intermediate_file_path)