|
|
|
@ -98,7 +98,7 @@ class MPIRoleMaker(RoleMakerBase):
|
|
|
|
|
"""
|
|
|
|
|
all_gather(obj) will call MPI's allgather function
|
|
|
|
|
"""
|
|
|
|
|
self.barrier_all()
|
|
|
|
|
self._barrier_all()
|
|
|
|
|
return self.comm_.allgather(obj)
|
|
|
|
|
|
|
|
|
|
def _barrier_all(self):
|
|
|
|
@ -112,7 +112,7 @@ class MPIRoleMaker(RoleMakerBase):
|
|
|
|
|
collect current distributed job's ip list
|
|
|
|
|
"""
|
|
|
|
|
if self.ips_ == None:
|
|
|
|
|
self.ips_ = self.comm_.allgather(self.get_local_ip())
|
|
|
|
|
self.ips_ = self.comm_.allgather(self._get_local_ip())
|
|
|
|
|
return self.ips_
|
|
|
|
|
|
|
|
|
|
def _finalize(self):
|
|
|
|
@ -146,7 +146,7 @@ class MPISymetricRoleMaker(MPIRoleMaker):
|
|
|
|
|
return whether current process is the first worker assigned by role maker
|
|
|
|
|
"""
|
|
|
|
|
if self._check_role_generation():
|
|
|
|
|
return self.is_worker() and 0 == self.worker_index()
|
|
|
|
|
return self._is_worker() and 0 == self._worker_index()
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def _is_worker(self):
|
|
|
|
@ -170,8 +170,8 @@ class MPISymetricRoleMaker(MPIRoleMaker):
|
|
|
|
|
return the current number of worker
|
|
|
|
|
"""
|
|
|
|
|
if self._check_role_generation():
|
|
|
|
|
if self.is_worker():
|
|
|
|
|
return self.get_size() / 2
|
|
|
|
|
if self._is_worker():
|
|
|
|
|
return self._get_size() / 2
|
|
|
|
|
return 0
|
|
|
|
|
|
|
|
|
|
def _server_num(self):
|
|
|
|
@ -179,8 +179,8 @@ class MPISymetricRoleMaker(MPIRoleMaker):
|
|
|
|
|
return the current number of server
|
|
|
|
|
"""
|
|
|
|
|
if self._check_role_generation():
|
|
|
|
|
if self.is_server():
|
|
|
|
|
return self.get_size() / 2
|
|
|
|
|
if self._is_server():
|
|
|
|
|
return self._get_size() / 2
|
|
|
|
|
return 0
|
|
|
|
|
|
|
|
|
|
def _worker_index(self):
|
|
|
|
@ -204,7 +204,7 @@ class MPISymetricRoleMaker(MPIRoleMaker):
|
|
|
|
|
barrier all workers in current distributed job
|
|
|
|
|
"""
|
|
|
|
|
if self._check_role_generation():
|
|
|
|
|
if self.is_worker():
|
|
|
|
|
if self._is_worker():
|
|
|
|
|
self.node_type_comm_.barrier()
|
|
|
|
|
|
|
|
|
|
def _barrier_server(self):
|
|
|
|
@ -212,7 +212,7 @@ class MPISymetricRoleMaker(MPIRoleMaker):
|
|
|
|
|
barrier all servers in current distributed job
|
|
|
|
|
"""
|
|
|
|
|
if self._check_role_generation():
|
|
|
|
|
if self.is_server():
|
|
|
|
|
if self._is_server():
|
|
|
|
|
self.node_type_comm_.barrier()
|
|
|
|
|
|
|
|
|
|
def _generate_role(self):
|
|
|
|
@ -221,10 +221,10 @@ class MPISymetricRoleMaker(MPIRoleMaker):
|
|
|
|
|
"""
|
|
|
|
|
if not self.role_is_generated_:
|
|
|
|
|
# TODO(guru4elephant): only allow to be called once
|
|
|
|
|
self.trainer_endpoints_ = self.get_ips()
|
|
|
|
|
self.pserver_endpoints_ = self.get_ips()
|
|
|
|
|
self.trainer_endpoints_ = self._get_ips()
|
|
|
|
|
self.pserver_endpoints_ = self._get_ips()
|
|
|
|
|
|
|
|
|
|
if 0 == self.get_rank() % self.proc_per_node_ % 2:
|
|
|
|
|
if 0 == self._get_rank() % self.proc_per_node_ % 2:
|
|
|
|
|
self.node_type_ = 0
|
|
|
|
|
else:
|
|
|
|
|
self.node_type_ = 1
|
|
|
|
|