@@ -64,7 +64,7 @@ def same_or_split_var(p_name, var_name):
     return p_name == var_name or p_name.startswith(var_name + ".block")
 
 
-def slice_variable(var_list, slice_count, min_block_size=8192):
+def slice_variable(var_list, slice_count, min_block_size):
     """
     We may need to split dense tensor to one or more blocks and put
     them equally onto parameter server. One block is a sub-tensor
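The default value of 8192 is dropped from the signature because the block size now arrives from DistributeTranspilerConfig.min_block_size (added below). The following is only a sketch of the slicing idea described in the docstring, using a hypothetical helper name, not the real body of slice_variable: a variable is cut into at most slice_count blocks, and fewer blocks are produced when they would otherwise fall below min_block_size elements.

# Illustration only -- toy_slice_plan is a hypothetical helper, not Paddle code.
import math

def toy_slice_plan(numel, slice_count, min_block_size):
    # never create more blocks than can each hold min_block_size elements
    max_blocks = max(1, numel // min_block_size)
    split_count = min(slice_count, max_blocks)
    block_size = int(math.ceil(numel / float(split_count)))
    # block sizes; the last block may be smaller than the others
    return [min(block_size, numel - i * block_size) for i in range(split_count)]

# toy_slice_plan(100000, 4, 8192) -> [25000, 25000, 25000, 25000]
# toy_slice_plan(10000, 4, 8192)  -> [10000]  (too small to split further)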
@@ -110,6 +110,22 @@ def slice_variable(var_list, slice_count, min_block_size=8192):
     return blocks
 
 
+class DistributeTranspilerConfig(object):
+    """
+    slice_var_up (bool): Do Tensor slice for pservers, default is True.
+    split_method (PSDispatcher): RoundRobin or HashName can be used;
+        try to choose the best method to balance loads for pservers.
+    min_block_size (int): Minimum number of splitted elements in a block.
+        According to https://github.com/PaddlePaddle/Paddle/issues/8638#issuecomment-369912156
+        we can use bandwidth efficiently when data size is larger than 2MB. If you
+        want to change it, please make sure you read the slice_variable function.
+    """
+
+    slice_var_up = True
+    split_method = None
+    min_block_size = 8192
+
+
 class DistributeTranspiler(object):
     """
     **DistributeTranspiler**
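A minimal usage sketch for the new class, assuming it is importable from the same module as DistributeTranspiler (the import path below is an assumption, not taken from the patch); fields you leave alone keep the defaults listed above.

# Sketch, not part of the patch; the import path is assumed.
from paddle.fluid.transpiler.distribute_transpiler import (
    DistributeTranspiler, DistributeTranspilerConfig)

config = DistributeTranspilerConfig()
config.slice_var_up = False      # keep each parameter/gradient in a single block
config.min_block_size = 8192     # smallest allowed block size, in elements

t = DistributeTranspiler(config=config)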
@@ -146,13 +162,23 @@ class DistributeTranspiler(object):
         trainer_program = t.get_trainer_program()
     """
 
+    def __init__(self, config=None):
+        if config is not None:
+            self.config = config
+        else:
+            self.config = DistributeTranspilerConfig()
+
+        if self.config.split_method is None:
+            self.config.split_method = RoundRobin
+
+        assert (self.config.min_block_size >= 8192)
+        assert (self.config.split_method.__bases__[0] == PSDispatcher)
+
     def transpile(self,
                   trainer_id,
                   program=None,
                   pservers="127.0.0.1:6174",
                   trainers=1,
-                  slice_var_up=True,
-                  split_method=RoundRobin,
                   sync_mode=True):
         """
         Run the transpiler.
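With this constructor, leaving split_method unset falls back to RoundRobin, while a min_block_size below 8192, or a split_method that is not a direct subclass of PSDispatcher, fails immediately. A small sketch of that fail-fast behaviour (reusing the imports from the previous sketch):

# Sketch of the fail-fast checks implied by the asserts above.
config = DistributeTranspilerConfig()
config.min_block_size = 1024          # below the enforced minimum of 8192
try:
    DistributeTranspiler(config=config)
except AssertionError:
    print("min_block_size must be >= 8192")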
@@ -165,12 +191,8 @@ class DistributeTranspiler(object):
             pservers (str): comma separated ip:port string for the pserver
                 list.
             trainers (int): number of trainers in the distributed job.
-            slice_var_up (bool): Do Tensor slice for pservers, default is True.
-            split_method (PSDispatcher): RoundRobin or HashName can be used
-                try to choose the best method to balance loads for pservers.
             sync_mode (bool): Do sync training or not, default is True.
         """
-        assert (split_method.__bases__[0] == PSDispatcher)
         if program is None:
             program = default_main_program()
         self.origin_program = program
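Since transpile() no longer accepts slice_var_up and split_method, existing call sites have to move those arguments onto the config. A before/after sketch (HashName is the alternative dispatcher the config docstring mentions; its import path is assumed, and the other imports come from the earlier sketch):

# Before this change (sketch of the old keyword arguments):
#   t = DistributeTranspiler()
#   t.transpile(trainer_id=0, pservers="127.0.0.1:6174", trainers=2,
#               slice_var_up=False, split_method=HashName)

# After (sketch): the two knobs live on the config instead.
from paddle.fluid.transpiler.ps_dispatcher import HashName  # assumed path

config = DistributeTranspilerConfig()
config.slice_var_up = False
config.split_method = HashName        # must directly subclass PSDispatcher
t = DistributeTranspiler(config=config)
t.transpile(trainer_id=0, pservers="127.0.0.1:6174", trainers=2)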
@@ -181,11 +203,11 @@ class DistributeTranspiler(object):
         self.pserver_endpoints = pserver_endpoints
         self.optimize_ops, self.params_grads = self._get_optimize_pass()
 
-        ps_dispatcher = split_method(self.pserver_endpoints)
+        ps_dispatcher = self.config.split_method(self.pserver_endpoints)
         self.has_distributed_lookup_table = self._has_distributed_lookup_table()
 
         # split and create vars, then put splited vars in dicts for later use.
-        self._init_splited_vars(slice_var_up)
+        self._init_splited_vars()
 
         # step 3.1: insert send op to send gradient vars to parameter servers
         ps_dispatcher.reset()
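The dispatcher is now built from the config rather than from a transpile() argument. For orientation, here is a toy class that mimics only the interface this file relies on (reset(), and dispatch() returning one endpoint per splited var); it is an illustration, not the real RoundRobin implementation:

# Toy stand-in for a RoundRobin-style PSDispatcher; illustration only.
class ToyRoundRobin(object):
    def __init__(self, eplist):
        self._eplist = eplist
        self._idx = 0

    def reset(self):
        self._idx = 0

    def dispatch(self, varlist):
        # hand out one pserver endpoint per splited var, cycling through the list
        eps = []
        for _ in varlist:
            eps.append(self._eplist[self._idx])
            self._idx = (self._idx + 1) % len(self._eplist)
        return eps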
@@ -197,14 +219,14 @@ class DistributeTranspiler(object):
         # fc_b@GRAD_trainer_0, fc_b@GRAD_trainer_1 --> pserver2
         # shuffle the map will avoid the uneven distribution above
         grad_var_mapping_items = self.grad_var_mapping.items()
-        if not slice_var_up:
+        if not self.config.slice_var_up:
             random.seed(self.trainer_num)
             random.shuffle(grad_var_mapping_items)
 
         for orig_varname, splited_vars in grad_var_mapping_items:
             eplist = ps_dispatcher.dispatch(splited_vars)
 
-            if not slice_var_up:
+            if not self.config.slice_var_up:
                 assert (len(splited_vars) == 1)
 
             if len(splited_vars) == 1:
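When slice_var_up is off, each gradient maps to exactly one block (hence the assert), and shuffling the items avoids the uneven distribution described in the comments. Seeding with the trainer count, which is the same value on every trainer, keeps the permutation identical everywhere, so all trainers agree on which pserver receives which variable. A tiny illustration:

# Illustration of the deterministic shuffle; the variable names are made up.
import random

items = ["fc_w@GRAD", "fc_b@GRAD", "emb@GRAD", "softmax_w@GRAD"]
random.seed(2)          # e.g. trainer_num == 2, identical on every trainer
random.shuffle(items)   # -> the same permutation on every trainer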
@@ -627,7 +649,7 @@ class DistributeTranspiler(object):
         ]
         return param_list, grad_list
 
-    def _init_splited_vars(self, slice_var_up):
+    def _init_splited_vars(self):
        # update these mappings for further transpile:
        # 1. param_var_mapping: param var name -> [splited params vars]
        # 2. grad_var_mapping: grad var name -> [splited grads vars]
@@ -651,17 +673,22 @@ class DistributeTranspiler(object):
         param_list, grad_list = self._update_dist_lookup_table_vars(
             param_list, grad_list, self.params_grads)
 
-        if slice_var_up:
+        if self.config.slice_var_up:
             # when we slice var up into blocks, we will slice the var according to
             # pserver services' count. A pserver may have two or more listening ports.
-            grad_blocks = slice_variable(grad_list, len(self.pserver_endpoints))
+            grad_blocks = slice_variable(grad_list,
+                                         len(self.pserver_endpoints),
+                                         self.config.min_block_size)
             param_blocks = slice_variable(param_list,
-                                          len(self.pserver_endpoints))
+                                          len(self.pserver_endpoints),
+                                          self.config.min_block_size)
         else:
             # when we do NOT slice var up into blocks, we will always slice params
             # grads into one block.
-            grad_blocks = slice_variable(grad_list, 1)
-            param_blocks = slice_variable(param_list, 1)
+            grad_blocks = slice_variable(grad_list, 1,
+                                         self.config.min_block_size)
+            param_blocks = slice_variable(param_list, 1,
+                                          self.config.min_block_size)
         assert (len(grad_blocks) == len(param_blocks))
 
         # origin_varname -> [splited_var]
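Both branches now pass config.min_block_size explicitly because slice_variable lost its default; the only other difference between them is the slice count, which is the number of pserver endpoints when slicing is enabled and 1 otherwise. Using the hypothetical toy_slice_plan sketch from earlier to make that concrete:

# Rough illustration with the toy helper defined near the top of this review.
toy_slice_plan(1000 * 1000, 2, 8192)   # slice_var_up on, 2 pservers -> [500000, 500000]
toy_slice_plan(1000 * 1000, 1, 8192)   # slice_var_up off            -> [1000000]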
@@ -1001,6 +1028,7 @@ class DistributeTranspiler(object):
                 shape=splited_shape)  # flattend splited var
             var_mapping[varname].append(var)
         program.global_block().sync_with_cpp()
+
         return var_mapping
 
     def create_splited_vars(self, source_var, block, tag):