|
|
|
@ -27,6 +27,7 @@ from . import core
|
|
|
|
|
from . import compiler
|
|
|
|
|
from .. import compat as cpt
|
|
|
|
|
from .trainer_factory import TrainerFactory
|
|
|
|
|
from .trainer_factory import FetchHandlerMonitor
|
|
|
|
|
|
|
|
|
|
__all__ = ['Executor', 'global_scope', 'scope_guard']
|
|
|
|
|
|
|
|
|
@ -377,6 +378,27 @@ def _as_lodtensor(data, place):
|
|
|
|
|
return tensor
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FetchHandler(object):
    """Base class for user-defined handling of fetched variables.

    Subclass this and override :meth:`handler` to consume the fetched
    values; the base implementation does nothing. Call
    :meth:`FetchHandler.help` to print a sample subclass.

    Args:
        fetch_target_names: names of the variables to fetch; stored
            unchanged on the instance.
        period_secs: stored unchanged on the instance (default 60).
            Presumably the interval, in seconds, between two calls of
            ``handler`` -- the consumer lives outside this class, confirm
            there.
        return_np: stored unchanged on the instance (default True).
            Presumably selects whether fetched values are converted to
            numpy arrays before being handed to ``handler`` -- confirm
            against the consumer.
    """

    def __init__(self, fetch_target_names, period_secs=60, return_np=True):
        self.fetch_target_names = fetch_target_names
        self.period_secs = period_secs
        self.return_np = return_np

    def handler(self, fetch_target_vars):
        """Hook invoked with the fetched values; no-op in the base class.

        Args:
            fetch_target_vars: the fetched values. The sample printed by
                ``help()`` indexes it positionally, one entry per fetch
                target.

        Returns:
            None
        """
        return

    @staticmethod
    def help():
        """Print a sample ``FetchHandler`` subclass to standard output."""
        # The sample class name used to be misspelled "FetchHandlerExamlpe";
        # it is printed verbatim for users to copy, so the typo is fixed here.
        print("""
class FetchHandlerExample(FetchHandler):
    def handler(self, fetch_target_vars):
        b_auc = fetch_target_vars[0]
        g_auc = fetch_target_vars[1]

        print("b_auc: {}, g_auc: {} at time: {}".format(b_auc, g_auc, time.ctime()))
""")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Executor(object):
|
|
|
|
|
"""
|
|
|
|
|
An Executor in Python, supports single/multiple-GPU running,
|
|
|
|
@ -918,6 +940,67 @@ class Executor(object):
|
|
|
|
|
trainer._set_fetch_var_and_info(fetch_list, fetch_info, print_period)
|
|
|
|
|
return scope, trainer
|
|
|
|
|
|
|
|
|
|
def _run_from_dataset(self,
                      program=None,
                      dataset=None,
                      scope=None,
                      thread=0,
                      is_infer=False,
                      debug=False,
                      fetch_list=None,
                      fetch_info=None,
                      print_period=100,
                      fetch_handler=None):
    """Run ``program`` over ``dataset``; shared by the train/infer entry points.

    Steps, in order: validate ``dataset``, adjust ``thread`` for pipeline
    programs, prepare the dataset, choose a fetch handler, build the
    trainer, initialise the native trainer instance, then run it while a
    ``FetchHandlerMonitor`` is active on worker scope 0.

    Args:
        program: program to run. Its ``_pipeline_opt`` and ``desc``
            attributes are read here.
        dataset: dataset to run on; must not be None.
        scope: scope forwarded to ``_prepare_trainer``.
        thread: thread count forwarded to ``_prepare_trainer``; replaced
            by the result of ``_adjust_pipeline_resource`` when
            ``program._pipeline_opt`` is truthy.
        is_infer: forwarded to ``trainer._set_infer``.
        debug: forwarded to ``_prepare_trainer``.
        fetch_list: variables (objects with a ``name`` attribute) to
            print through a default handler when ``fetch_handler`` is not
            given.
        fetch_info: labels printed next to each fetched value by the
            default handler; the variable names are used when it is None.
        print_period: passed as ``period_secs`` to the default handler.
        fetch_handler: user supplied ``FetchHandler``; takes precedence
            over ``fetch_list``.

    Returns:
        None

    Raises:
        RuntimeError: if ``dataset`` is None.
    """
    if dataset is None:
        raise RuntimeError("dataset is need and should be initialized")

    # NOTE(review): `program` defaults to None but is dereferenced here and
    # again for `program.desc` below, so callers must pass a real program.
    if program._pipeline_opt:
        thread = self._adjust_pipeline_resource(program._pipeline_opt,
                                                dataset, thread)

    dataset._prepare_to_run()

    if fetch_handler is not None:
        fetch_instance = fetch_handler
    elif fetch_list is not None:
        fetch_target_names = [var.name for var in fetch_list]
        # `fetch_info` defaults to None; indexing it directly raised a
        # TypeError inside the handler. Fall back to the variable names so
        # every fetched value still gets a label.
        labels = fetch_info if fetch_info is not None else fetch_target_names

        class FH(FetchHandler):
            def handler(self, fetch_target_vars):
                for i, value in enumerate(fetch_target_vars):
                    print("{}: \n {}\n".format(labels[i], value))

        # NOTE(review): the public docstrings describe `print_period` as a
        # number of mini-batches, yet it is passed as `period_secs` here --
        # confirm the intended unit.
        fetch_instance = FH(fetch_target_names,
                            period_secs=print_period,
                            return_np=False)
    else:
        # Nothing to fetch: a base handler with no targets.
        fetch_instance = FetchHandler([])

    scope, trainer = self._prepare_trainer(
        program=program,
        dataset=dataset,
        scope=scope,
        thread=thread,
        debug=debug)

    trainer._set_infer(is_infer)
    trainer._gen_trainer_desc()

    self._dump_debug_info(program=program, trainer=trainer)

    trainer_instance = self._default_executor.init_for_dataset(
        program.desc, trainer._desc(), scope, dataset.dataset)

    # Fetched values are read from the scope of worker 0.
    scope0 = trainer_instance.get_worker_scope(0)

    fetch_monitor = FetchHandlerMonitor(scope0, fetch_instance)
    fetch_monitor.start()
    try:
        self._default_executor.run_from_dataset(trainer_instance)
    finally:
        # Stop the monitor and finish the dataset even when the run raises,
        # so a failed run does not leave the monitor active.
        fetch_monitor.stop()
        dataset._finish_to_run()
    return None
|
|
|
|
|
|
|
|
|
|
def infer_from_dataset(self,
|
|
|
|
|
program=None,
|
|
|
|
|
dataset=None,
|
|
|
|
@ -926,7 +1009,8 @@ class Executor(object):
|
|
|
|
|
debug=False,
|
|
|
|
|
fetch_list=None,
|
|
|
|
|
fetch_info=None,
|
|
|
|
|
print_period=100):
|
|
|
|
|
print_period=100,
|
|
|
|
|
fetch_handler=None):
|
|
|
|
|
"""
|
|
|
|
|
The document of infer_from_dataset is almost the same as
|
|
|
|
|
train_from_dataset, except that in distributed training,
|
|
|
|
@ -949,6 +1033,7 @@ class Executor(object):
|
|
|
|
|
will be printed during training, default is None
|
|
|
|
|
fetch_info(String List): print information for each variable, default is None
|
|
|
|
|
print_period(int): the number of mini-batches for each print, default is 100
|
|
|
|
|
fetch_handler(FetchHandler): a user define class for fetch output.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
None
|
|
|
|
@ -973,29 +1058,9 @@ class Executor(object):
|
|
|
|
|
dataset=dataset)
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
if dataset == None:
|
|
|
|
|
raise RuntimeError("dataset is needed and should be initialized")
|
|
|
|
|
|
|
|
|
|
dataset._prepare_to_run()
|
|
|
|
|
scope, trainer = self._prepare_trainer(
|
|
|
|
|
program=program,
|
|
|
|
|
dataset=dataset,
|
|
|
|
|
scope=scope,
|
|
|
|
|
thread=thread,
|
|
|
|
|
debug=debug,
|
|
|
|
|
fetch_list=fetch_list,
|
|
|
|
|
fetch_info=fetch_info,
|
|
|
|
|
print_period=print_period)
|
|
|
|
|
trainer._set_infer(True)
|
|
|
|
|
trainer._gen_trainer_desc()
|
|
|
|
|
self._dump_debug_info(program=program, trainer=trainer)
|
|
|
|
|
dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)
|
|
|
|
|
self._default_executor.run_from_dataset(program.desc, scope,
|
|
|
|
|
dataset.dataset,
|
|
|
|
|
trainer._desc())
|
|
|
|
|
dataset._dynamic_adjust_after_train()
|
|
|
|
|
dataset._finish_to_run()
|
|
|
|
|
return None
|
|
|
|
|
return self._run_from_dataset(program, dataset, scope, thread, True,
|
|
|
|
|
debug, fetch_list, fetch_info,
|
|
|
|
|
print_period, fetch_handler)
|
|
|
|
|
|
|
|
|
|
def train_from_dataset(self,
|
|
|
|
|
program=None,
|
|
|
|
@ -1005,7 +1070,8 @@ class Executor(object):
|
|
|
|
|
debug=False,
|
|
|
|
|
fetch_list=None,
|
|
|
|
|
fetch_info=None,
|
|
|
|
|
print_period=100):
|
|
|
|
|
print_period=100,
|
|
|
|
|
fetch_handler=None):
|
|
|
|
|
"""
|
|
|
|
|
Train from a pre-defined Dataset. Dataset is defined in paddle.fluid.dataset.
|
|
|
|
|
Given a program, either a program or compiled program, train_from_dataset will
|
|
|
|
@ -1032,6 +1098,7 @@ class Executor(object):
|
|
|
|
|
will be printed during training
|
|
|
|
|
fetch_info(String List): print information for each variable
|
|
|
|
|
print_period(int): the number of mini-batches for each print
|
|
|
|
|
fetch_handler(FetchHandler): a user define class for fetch output.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
None
|
|
|
|
@ -1056,29 +1123,6 @@ class Executor(object):
|
|
|
|
|
dataset=dataset)
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
if dataset == None:
|
|
|
|
|
raise RuntimeError("dataset is need and should be initialized")
|
|
|
|
|
|
|
|
|
|
if program._pipeline_opt:
|
|
|
|
|
thread = self._adjust_pipeline_resource(program._pipeline_opt,
|
|
|
|
|
dataset, thread)
|
|
|
|
|
|
|
|
|
|
dataset._prepare_to_run()
|
|
|
|
|
scope, trainer = self._prepare_trainer(
|
|
|
|
|
program=program,
|
|
|
|
|
dataset=dataset,
|
|
|
|
|
scope=scope,
|
|
|
|
|
thread=thread,
|
|
|
|
|
debug=debug,
|
|
|
|
|
fetch_list=fetch_list,
|
|
|
|
|
fetch_info=fetch_info,
|
|
|
|
|
print_period=print_period)
|
|
|
|
|
trainer._gen_trainer_desc()
|
|
|
|
|
self._dump_debug_info(program=program, trainer=trainer)
|
|
|
|
|
dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)
|
|
|
|
|
self._default_executor.run_from_dataset(program.desc, scope,
|
|
|
|
|
dataset.dataset,
|
|
|
|
|
trainer._desc())
|
|
|
|
|
dataset._dynamic_adjust_after_train()
|
|
|
|
|
dataset._finish_to_run()
|
|
|
|
|
return None
|
|
|
|
|
return self._run_from_dataset(program, dataset, scope, thread, False,
|
|
|
|
|
debug, fetch_list, fetch_info,
|
|
|
|
|
print_period, fetch_handler)
|
|
|
|
|