Fix the bug of Profiler timestamp variate between each process in multi-card scene

pull/11539/head
gzhcv 4 years ago
parent 27dc6e19a3
commit e823152929

@ -80,7 +80,9 @@ class Profiler:
def __init__(self, **kwargs):
# get device_id and device_target
self._get_devid_and_devtarget()
format_time = int(time.time())
# to avoid get different timestamp between each process in multi-card training,
# set the timestamp which is divisible by 3
format_time = int(time.time() - time.time() % 3)
output_path = kwargs.pop("output_path", f"data-{format_time}")
self._output_path = validate_and_normalize_path(output_path)
self._output_path = os.path.join(self._output_path, f"profiler-{format_time}")
@ -171,105 +173,113 @@ class Profiler:
>>> profiler.analyse()
"""
if self._device_target and self._device_target == "GPU":
if context.get_auto_parallel_context('device_num') > 1 and self._dev_id != get_rank():
self._dev_id = get_rank()
logger.error('Please check the Profiler object initialized after set_auto_parallel_context() '
'and init(). Profiler should be initialized after these code. ')
self._gpu_profiler.stop()
timeline_generator = self._generate_timeline()
# parse minddata pipeline operator and queue for GPU
try:
pipeline_parser = MinddataPipelineParser(self._output_path, self._dev_id, self._output_path)
pipeline_parser.parse()
except ProfilerException as err:
logger.warning(err.message)
# analyse step trace info
try:
self._analyse_step_trace(is_training_mode_flag=timeline_generator.check_op_name('Gradients'))
except ProfilerException as err:
logger.warning(err.message)
os.environ['PROFILING_MODE'] = str("false")
self._gpu_analyse()
elif self._device_target and self._device_target == "Ascend":
release()
self._ascend_analyse()
def _ascend_analyse(self):
"""Collect and analyse ascend performance data"""
release()
job_id = self._get_profiling_job_id()
logger.info("Profiling: job id is %s ", job_id)
source_path = os.path.join(self._output_path, job_id)
# parse hwts.log.data.45.dev file, and get task profiling data
hwts_output_filename = self._hwts_output_filename_target + self._dev_id + ".txt"
hwts_output_filename = os.path.join(self._output_path, hwts_output_filename)
source_path = validate_and_normalize_path(source_path)
hwts_output_filename = validate_and_normalize_path(hwts_output_filename)
hwtslog_parser = HWTSLogParser(source_path, hwts_output_filename)
hwtslog_parser.execute()
# parse Framework file, and get the relation of op and tasks
framework_parser = FrameworkParser(job_id, self._dev_id, self._output_path)
framework_parser.parse()
op_task_dict = framework_parser.to_task_id_full_op_name_dict()
if not op_task_dict:
logger.error("Profiling: fail to parse framework files.")
return
# get op compute time from hwts data and framework data, write output_op_compute_time.txt
opcompute_output_filename = self._opcompute_output_filename_target + self._dev_id + ".txt"
opcompute_output_filename = os.path.join(self._output_path, opcompute_output_filename)
opcompute_output_filename = validate_and_normalize_path(opcompute_output_filename)
optime_parser = OPComputeTimeParser(
hwts_output_filename, opcompute_output_filename,
op_task_dict, self._output_path, self._dev_id
)
optime_parser.execute()
job_id = self._get_profiling_job_id()
logger.info("Profiling: job id is %s ", job_id)
# parse DATA_PREPROCESS.dev.AICPU file, write output_data_preprocess_aicpu_x.txt
output_data_preprocess_aicpu = self._aicpu_op_output_filename_target + self._dev_id + ".txt"
output_data_preprocess_aicpu = os.path.join(self._output_path, output_data_preprocess_aicpu)
output_data_preprocess_aicpu = validate_and_normalize_path(output_data_preprocess_aicpu)
aicpu_data_parser = DataPreProcessParser(source_path, output_data_preprocess_aicpu)
aicpu_data_parser.execute()
source_path = os.path.join(self._output_path, job_id)
# parse hwts.log.data.45.dev file, and get task profiling data
hwts_output_filename = self._hwts_output_filename_target + self._dev_id + ".txt"
hwts_output_filename = os.path.join(self._output_path, hwts_output_filename)
source_path = validate_and_normalize_path(source_path)
hwts_output_filename = validate_and_normalize_path(hwts_output_filename)
hwtslog_parser = HWTSLogParser(source_path, hwts_output_filename)
_ = hwtslog_parser.execute()
# parse Framework file, and get the relation of op and tasks
framework_parser = FrameworkParser(job_id, self._dev_id, self._output_path)
framework_parser.parse()
op_task_dict = framework_parser.to_task_id_full_op_name_dict()
if not op_task_dict:
logger.error("Profiling: fail to parse framework files.")
return
# get op compute time from hwts data and framework data, write output_op_compute_time.txt
opcompute_output_filename = self._opcompute_output_filename_target + self._dev_id + ".txt"
opcompute_output_filename = os.path.join(self._output_path, opcompute_output_filename)
opcompute_output_filename = validate_and_normalize_path(opcompute_output_filename)
optime_parser = OPComputeTimeParser(
hwts_output_filename, opcompute_output_filename,
op_task_dict, self._output_path, self._dev_id
)
optime_parser.execute()
# parse DATA_PREPROCESS.dev.AICPU file, write output_data_preprocess_aicpu_x.txt
output_data_preprocess_aicpu = self._aicpu_op_output_filename_target + self._dev_id + ".txt"
output_data_preprocess_aicpu = os.path.join(self._output_path, output_data_preprocess_aicpu)
output_data_preprocess_aicpu = validate_and_normalize_path(output_data_preprocess_aicpu)
aicpu_data_parser = DataPreProcessParser(source_path, output_data_preprocess_aicpu)
aicpu_data_parser.execute()
# Parsing minddata AICPU profiling
MinddataParser.execute(source_path, self._output_path, self._dev_id)
# parse minddata pipeline operator and queue
try:
pipeline_parser = MinddataPipelineParser(self._output_path, self._dev_id, self._output_path)
pipeline_parser.parse()
except ProfilerException as err:
logger.warning(err.message)
# analyse op compute time info
try:
self._analyser_op_info()
except ProfilerException as err:
logger.warning(err.message)
# analyse step trace info
points = None
try:
points = self._analyse_step_trace(source_path, framework_parser)
except ProfilerException as err:
logger.warning(err.message)
# analyse timeline info
try:
self._analyse_timeline(aicpu_data_parser, optime_parser)
except (ProfilerIOException, ProfilerFileNotFoundException, RuntimeError) as err:
logger.warning('Fail to write timeline data: %s', err)
# analyse memory usage info
try:
self._analyse_memory_usage(points)
except (ProfilerIOException, ProfilerFileNotFoundException, ProfilerRawFileException) as err:
logger.warning(err.message)
os.environ['PROFILING_MODE'] = str("false")
context.set_context(enable_profiling=False)
# Parsing minddata AICPU profiling
MinddataParser.execute(source_path, self._output_path, self._dev_id)
# parse minddata pipeline operator and queue
try:
pipeline_parser = MinddataPipelineParser(self._output_path, self._dev_id, self._output_path)
pipeline_parser.parse()
except ProfilerException as err:
logger.warning(err.message)
# analyse op compute time info
try:
self._analyser_op_info()
except ProfilerException as err:
logger.warning(err.message)
# analyse step trace info
points = None
try:
points = self._analyse_step_trace(source_path, framework_parser)
except ProfilerException as err:
logger.warning(err.message)
# analyse timeline info
try:
self._analyse_timeline(aicpu_data_parser, optime_parser)
except (ProfilerIOException, ProfilerFileNotFoundException, RuntimeError) as err:
logger.warning('Fail to write timeline data: %s', err)
# analyse memory usage info
try:
self._analyse_memory_usage(points)
except (ProfilerIOException, ProfilerFileNotFoundException, ProfilerRawFileException) as err:
logger.warning(err.message)
os.environ['PROFILING_MODE'] = str("false")
context.set_context(enable_profiling=False)
def _gpu_analyse(self):
"""Collect and analyse gpu performance data"""
if context.get_auto_parallel_context('device_num') > 1 and self._dev_id != get_rank():
self._dev_id = get_rank()
logger.error('Please check the Profiler object initialized after set_auto_parallel_context() '
'and init(). Profiler should be initialized after these code. ')
self._gpu_profiler.stop()
timeline_generator = self._generate_timeline()
# parse minddata pipeline operator and queue for GPU
try:
pipeline_parser = MinddataPipelineParser(self._output_path, self._dev_id, self._output_path)
pipeline_parser.parse()
except ProfilerException as err:
logger.warning(err.message)
# analyse step trace info
try:
self._analyse_step_trace(is_training_mode_flag=timeline_generator.check_op_name('Gradients'))
except ProfilerException as err:
logger.warning(err.message)
os.environ['PROFILING_MODE'] = str("false")
def _analyse_step_trace(self, source_path=None, framework_parser=None, is_training_mode_flag=True):
"""

Loading…
Cancel
Save