|
|
|
@ -87,6 +87,7 @@ class AsyncExecutor(object):
|
|
|
|
|
|
|
|
|
|
scope = global_scope()
|
|
|
|
|
self.executor = core.AsyncExecutor(scope, p)
|
|
|
|
|
self.instance = None
|
|
|
|
|
|
|
|
|
|
def run(self, program, data_feed, filelist, thread_num, fetch, mode="", debug=False):
|
|
|
|
|
"""
|
|
|
|
@ -154,6 +155,9 @@ class AsyncExecutor(object):
|
|
|
|
|
|
|
|
|
|
def download_data(self, afs_path, local_path, fs_default_name, ugi, process_num=12):
|
|
|
|
|
#hadoop_home = "$HADOOP_HOME"
|
|
|
|
|
if self.instance is None:
|
|
|
|
|
raise ValueError('instance is None, please run config_distributed_nodes init instance')
|
|
|
|
|
|
|
|
|
|
hadoop_home = "~/tools/hadoop-xingtian/hadoop/"
|
|
|
|
|
|
|
|
|
|
configs = {
|
|
|
|
@ -182,15 +186,21 @@ class AsyncExecutor(object):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def get_instance(self):
    """
    Return the object stored in ``self.instance``.

    Returns:
        The current value of ``self.instance``.

    Raises:
        ValueError: if ``self.instance`` is None. The message asks the
            caller to run config_distributed_nodes first.
    """
    if self.instance is None:
        raise ValueError(
            'instance is None, please run config_distributed_nodes init instance'
        )
    return self.instance
|
|
|
|
|
|
|
|
|
|
def stop_server(self):
    """
    Stop the server through ``self.executor``, bracketed by two barriers.

    Calls ``self.instance.barrier_all()``, then calls
    ``self.executor.stop_server()`` only when
    ``self.instance.is_first_worker()`` is true, then calls
    ``self.instance.barrier_all()`` again.

    Raises:
        ValueError: if ``self.instance`` is None. The message asks the
            caller to run config_distributed_nodes first.
    """
    if self.instance is None:
        raise ValueError(
            'instance is None, please run config_distributed_nodes init instance'
        )

    # Original note: "worker do all things" -- presumably this barrier lets
    # every worker finish its work before the server goes away.
    self.instance.barrier_all()

    # Only the first worker issues the stop request.
    if self.instance.is_first_worker():
        self.executor.stop_server()

    # Original note: "sync" -- second barrier after the stop request.
    self.instance.barrier_all()
|
|
|
|
|
|
|
|
|
|
def init_server(self, dist_desc):
|
|
|
|
|
if self.instance is None:
|
|
|
|
|
raise ValueError('instance is None, please run config_distributed_nodes init instance')
|
|
|
|
|
self.executor.init_server(dist_desc, self.instance._rankid)
|
|
|
|
|
ip = self.executor.start_server()
|
|
|
|
|
self.instance.set_ip(ip)
|
|
|
|
@ -204,6 +214,8 @@ class AsyncExecutor(object):
|
|
|
|
|
self.instance.barrier_all() #sync
|
|
|
|
|
|
|
|
|
|
def init_worker(self, dist_desc, startup_program):
|
|
|
|
|
if self.instance is None:
|
|
|
|
|
raise ValueError('instance is None, please run config_distributed_nodes init instance')
|
|
|
|
|
place = core.CPUPlace()
|
|
|
|
|
executor = Executor(place)
|
|
|
|
|
executor.run(startup_program)
|
|
|
|
@ -217,8 +229,12 @@ class AsyncExecutor(object):
|
|
|
|
|
self.instance.barrier_all() #wait init model
|
|
|
|
|
|
|
|
|
|
def init_model(self):
    """
    Forward to ``self.executor.init_model()`` once an instance is set.

    Raises:
        ValueError: if ``self.instance`` is None. The message asks the
            caller to run config_distributed_nodes first.
    """
    if self.instance is None:
        raise ValueError(
            'instance is None, please run config_distributed_nodes init instance'
        )
    self.executor.init_model()
|
|
|
|
|
|
|
|
|
|
def save_model(self, save_path):
    """
    Forward ``save_path`` to ``self.executor.save_model`` once an instance
    is set.

    Args:
        save_path: passed unchanged to ``self.executor.save_model``.

    Raises:
        ValueError: if ``self.instance`` is None. The message asks the
            caller to run config_distributed_nodes first.
    """
    if self.instance is None:
        raise ValueError(
            'instance is None, please run config_distributed_nodes init instance'
        )
    self.executor.save_model(save_path)
|
|
|
|
|
|
|
|
|
|