@@ -102,6 +102,8 @@ def split_dense_variable(var_list,
         the parameter server side can gain better performance. By default
         minimum block size is 1024. The max block size is used to prevent
         very large blocks that may cause send error.
+        :return: A list of VarBlocks. Each VarBlock specifies a shard of
+           the var.
     """
     blocks = []
     for var in var_list:
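For illustration only, here is a standalone sketch of the splitting policy the docstring above describes: aim for roughly one block per pserver while clamping block sizes between the minimum and maximum. The function name and the flat element count are hypothetical; the real split_dense_variable works on Variables and emits block descriptors rather than bare sizes.

# Hypothetical sketch of the block-size policy described above; not the
# transpiler's implementation.
import math


def split_block_sizes(numel, pserver_count, min_block_size=1024,
                      max_block_size=1048576):
    """Return block sizes (summing to numel) for one dense variable."""
    # Prefer one block per pserver, clamped to [min_block_size, max_block_size].
    block_size = int(math.ceil(numel / float(pserver_count)))
    block_size = min(max(block_size, min_block_size), max_block_size)
    sizes = []
    remaining = numel
    while remaining > 0:
        cur = min(block_size, remaining)
        sizes.append(cur)
        remaining -= cur
    return sizes


print(split_block_sizes(numel=5000, pserver_count=4))  # [1250, 1250, 1250, 1250]
print(split_block_sizes(numel=100, pserver_count=4))   # [100]  (below the minimum, kept whole)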
@@ -192,22 +194,24 @@ class DistributeTranspiler:
         self.trainer_id = trainer_id
         pserver_endpoints = pservers.split(",")
 
-        # step1
+        # step1: For large parameters and gradients, split them into smaller
+        # blocks.
         param_list = [pg[0] for pg in params_grads]
         grad_list = [pg[1] for pg in params_grads]
         grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints))
         param_blocks = split_dense_variable(param_list, len(pserver_endpoints))
-        # step2
+        # step2: Create new vars for the parameters and gradients blocks and
+        # add ops to do the split.
         grad_var_mapping = self._append_split_op(program, grad_blocks)
-        # step3
+        param_var_mapping = self._create_vars_from_blocklist(program,
+                                                             param_blocks)
+        # step3: Add gradients as send op inputs and parameters as send
+        # op outputs.
         send_inputs = []
         send_outputs = []
         for b in grad_blocks:  # append by order
             varname, block_id, _ = b.split(":")
             send_inputs.append(grad_var_mapping[varname][int(block_id)])
-
-        param_var_mapping = self._create_vars_from_blocklist(program,
-                                                             param_blocks)
         for b in param_blocks:
             varname, block_id, _ = b.split(":")
             send_outputs.append(param_var_mapping[varname][int(block_id)])
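The two loops above rebuild the send order from block descriptor strings; judging from b.split(":"), each descriptor has the form "varname:block_id:<something>", with the third field ignored here. A small hypothetical example of that bookkeeping (plain strings stand in for Variable objects, and the names are made up):

# Made-up data only; illustrates how the descriptor strings index into the
# per-variable split lists built earlier.
grad_blocks = ["fc_w@GRAD:0:1250", "fc_w@GRAD:1:1250", "fc_b@GRAD:0:10"]
grad_var_mapping = {
    "fc_w@GRAD": ["fc_w@GRAD.block0", "fc_w@GRAD.block1"],
    "fc_b@GRAD": ["fc_b@GRAD.block0"],
}

send_inputs = []
for b in grad_blocks:  # append by order, as in the transpiler code above
    varname, block_id, _ = b.split(":")
    send_inputs.append(grad_var_mapping[varname][int(block_id)])

print(send_inputs)
# ['fc_w@GRAD.block0', 'fc_w@GRAD.block1', 'fc_b@GRAD.block0']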
@@ -237,7 +241,7 @@ class DistributeTranspiler:
                      "RPCClient": rpc_client_var},
             attrs={"endpoints": pserver_endpoints,
                    "epmap": eplist})
-        # step4
+        # step4: Concat the parameters splits together after recv.
         for varname, splited_var in param_var_mapping.iteritems():
             if len(splited_var) <= 1:
                 continue
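Step4 reassembles each parameter from its received splits, which is conceptually a concatenation of the shards. A numpy sketch of the idea, assuming the splits were taken along the first dimension (shapes are invented):

# Conceptual numpy analogue of the concat in step4; not fluid code.
import numpy as np

shard_from_ps0 = np.zeros((1250, 4))
shard_from_ps1 = np.zeros((1250, 4))
full_param = np.concatenate([shard_from_ps0, shard_from_ps1], axis=0)
assert full_param.shape == (2500, 4)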
@@ -258,13 +262,14 @@ class DistributeTranspiler:
     def get_pserver_program(self, endpoint):
         """
         Get pserver side program using the endpoint.
+        TODO(panyx0718): Revisit this assumption. what if #blocks > #pservers.
         NOTE: assume blocks of the same variable is not distributed
         on the same pserver, only change param/grad varnames for
         trainers to fetch.
         """
         # step1
         pserver_program = Program()
-        # step2
+        # step2: Create vars to receive vars at parameter servers.
         recv_inputs = []
         for v in self.param_grad_ep_mapping[endpoint]["params"]:
             self._clone_var(pserver_program.global_block(), v)
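The loop above iterates self.param_grad_ep_mapping[endpoint]["params"], so the mapping apparently groups the parameter and gradient splits by the pserver endpoint that owns them. A hypothetical sketch of that structure (endpoints and names are made up, strings stand in for Variables):

# Hypothetical shape of param_grad_ep_mapping, inferred from the lookups above.
param_grad_ep_mapping = {
    "127.0.0.1:6170": {"params": ["fc_w.block0"], "grads": ["fc_w@GRAD.block0"]},
    "127.0.0.1:6171": {"params": ["fc_w.block1"], "grads": ["fc_w@GRAD.block1"]},
}

endpoint = "127.0.0.1:6170"
for v in param_grad_ep_mapping[endpoint]["params"]:
    print("clone onto pserver program: " + v)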
@@ -278,6 +283,8 @@ class DistributeTranspiler:
                 orig_var_name = v.name[:suff_idx]
             else:
                 orig_var_name = v.name
+            #TODO(panyx0718): Should this be put in the else block below? It's
+            # only used there and it's called single_trainer_var.
             single_trainer_var = pserver_program.global_block().create_var(
                 name=orig_var_name,
                 persistable=True,
@@ -344,7 +351,7 @@ class DistributeTranspiler:
                 self._append_pserver_non_opt_ops(block, op)
 
         append_block = optimize_block
-        # append lr decay ops to the child block if exits
+        # append lr decay ops to the child block if exists
         lr_ops = self._get_lr_ops()
         if len(lr_ops) > 0:
             for _, op in enumerate(lr_ops):
@@ -447,8 +454,10 @@ class DistributeTranspiler:
                                     block_list,
                                     add_trainer_suffix=False):
         """
+        Create vars for each split.
         NOTE: only grads need to be named for different trainers, use
               add_trainer_suffix to rename the grad vars.
+        :return: A dict mapping from original var name to each var split.
         """
         block_map = dict()
         var_mapping = dict()
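Per the docstring, this helper maps each original variable name to its splits, renaming gradient splits per trainer when add_trainer_suffix is set. A hedged standalone sketch of that grouping; the ".blockN" and ".trainer_N" naming here is illustrative, not necessarily the transpiler's exact scheme:

# Illustrative only: group "varname:block_id:size"-style descriptors by their
# original variable name, optionally tagging splits with a trainer suffix.
def build_var_mapping(block_list, trainer_id=0, add_trainer_suffix=False):
    var_mapping = {}
    for block in block_list:
        varname, block_id, _ = block.split(":")
        new_name = "%s.block%s" % (varname, block_id)
        if add_trainer_suffix:
            new_name += ".trainer_%d" % trainer_id
        var_mapping.setdefault(varname, []).append(new_name)
    return var_mapping


print(build_var_mapping(["w:0:1024", "w:1:1024", "b:0:16"]))
# {'w': ['w.block0', 'w.block1'], 'b': ['b.block0']}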
@@ -615,6 +624,7 @@ class DistributeTranspiler:
                     type="sum",
                     inputs={"X": vars2merge},
                     outputs={"Out": merged_var})
+                # TODO(panyx0718): What if it's SELECTED_ROWS.
                 if not merged_var.type == core.VarDesc.VarType.SELECTED_ROWS:
                     optimize_block.append_op(
                         type="scale",
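The "sum" op above merges the gradient copies received from the trainers, and the "scale" op that follows rescales the merged result; the scale attribute itself is outside this hunk, so averaging by the number of trainers is an assumption here. A numpy sketch of that merge:

# Conceptual sketch of sum-then-scale gradient merging; the 1/num_trainers
# factor is an assumption, not taken from the lines shown above.
import numpy as np

per_trainer_grads = [np.array([1.0, 2.0]), np.array([3.0, 4.0])]
merged = np.sum(per_trainer_grads, axis=0)        # what the "sum" op produces
merged = merged * (1.0 / len(per_trainer_grads))  # what the "scale" op produces
print(merged)  # [2. 3.]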
@@ -638,7 +648,7 @@ class DistributeTranspiler:
                     shape=param_block.shape)
                 new_inputs[key] = tmpvar
             elif key == "LearningRate":
-                # leraning rate variable has already be created by non-optimize op,
+                # learning rate variable has already be created by non-optimize op,
                 # don't create it once again.
                 lr_varname = opt_op.input(key)[0]
                 if pserver_block.vars.has_key(lr_varname):
@@ -773,6 +783,7 @@ class DistributeTranspiler:
         return False
 
     def _get_input_map_from_op(self, varmap, op):
+        """Returns a dict from op input name to the vars in varmap."""
         iomap = dict()
         for key in op.input_names:
             vars = []
@@ -785,6 +796,7 @@ class DistributeTranspiler:
         return iomap
 
     def _get_output_map_from_op(self, varmap, op):
+        """Returns a dict from op output name to the vars in varmap."""
         iomap = dict()
         for key in op.output_names:
             vars = []
@@ -812,6 +824,9 @@ class DistributeTranspiler:
                 find_ops.append(op)
         # make a union find struct by the ops in default_main_program
         ufind = UnionFind(block.ops)
+
+        # TODO(panyx0718): If lr_ops connects with other training
+        # ops, could they be considered as lr_ops?
         for op1 in block.ops:
             for op2 in block.ops:
                 # NOTE: we need to skip all optimize ops, since it is connected
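The learning-rate op search builds a union-find over all ops so that ops connected, directly or transitively, to the lr-producing ops end up in the same set. A minimal standalone union-find to illustrate the grouping; this is not Paddle's UnionFind class, and the op names are made up:

# Minimal union-find over hashable items, for illustration only.
class SimpleUnionFind(object):
    def __init__(self, items):
        self._parent = dict((item, item) for item in items)

    def find(self, item):
        while self._parent[item] != item:
            # path halving keeps the trees shallow
            self._parent[item] = self._parent[self._parent[item]]
            item = self._parent[item]
        return item

    def union(self, a, b):
        root_a, root_b = self.find(a), self.find(b)
        if root_a != root_b:
            self._parent[root_a] = root_b

    def is_connected(self, a, b):
        return self.find(a) == self.find(b)


uf = SimpleUnionFind(["lr_decay", "scale_lr", "sgd", "conv2d"])
uf.union("lr_decay", "scale_lr")                 # ops linked through the lr variable
print(uf.is_connected("lr_decay", "scale_lr"))   # True
print(uf.is_connected("lr_decay", "sgd"))        # False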