You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Paddle/python/paddle/v2/fluid/distribute_transpiler.py

297 lines
12 KiB

from __future__ import print_function
import framework
from framework import Program, default_main_program, Parameter, Variable
import optimizer
from layer_helper import LayerHelper
from distributed_spliter import *
class VarBlock:
def __init__(self, varname, offset, size):
self.varname = varname
# NOTE: real offset is offset * size
self.offset = offset
self.size = size
def __str__(self):
return "%s:%d:%d" % (self.varname, self.offset, self.size)
class DistributeTranspiler:
def transpile(self,
optimize_ops,
params_grads,
program=None,
pservers="127.0.0.1:6174",
trainers=1,
split_method=round_robin):
"""
Transpile the program to a distributed data-parallelism programs.
The main_program will be transform to use a remote parameter server
to do parameter optimization. And the optimization graph will be put
in to a parameter server program.
Use different methods to split trainable varialbles to different
parameter servers.
:param optimize_ops: op list of optimization, should be the
return value of Optimizer.minimize
:type optimize_ops: list
:param program: program to optimize, default default_main_program
:param pservers: parameter server endpoints like "m1:6174,m2:6174"
:type pservers: string
:return: return a list of programs
"""
assert (callable(split_method))
if program is None:
program = default_main_program()
self.program = program
self.trainers = trainers
self.optimize_ops = optimize_ops
# steps to transpile:
# 1. split variable to multiple blocks, align by product(dim[1:]) (width).
# 2. modify trainer program add split_op to each Grad.
# 3. append send_op to trainer.
# 4. append concat_op to trainer to update local weights.
# 5. create new program as parameter server.
# 5. create parameter server program by split_method generated endpoint->VarBlock
# 6. run compile time infershape for parameter server program
if kwargs.has_key("split_method"):
split_method = kwargs["split_method"]
else:
split_method = round_robin
pserver_endpoints = kwargs["pservers"].split(",")
grad2param = dict()
for param, grad in params_and_grads:
grad2param[grad.name()] = param.name()
# step1
param_list = [pg[0] for pg in params_and_grads]
grad_list = [pg[1] for pg in params_and_grads]
# TODO: add split selected rows support
grad_blocks = _split_dense_variable(grad_list, len(pserver_endpoints))
param_blocks = _split_dense_variable(param_list, len(pserver_endpoints))
ep2gradblock = split_method(grad_blocks, pserver_endpoints)
# self.param_grad_map
# step2
var2splited = self._split_trainer_vars(program, grad_blocks)
# step3
send_inputs = []
for _, splited in var2splited.iteritems():
send_inputs.extend(splited)
send_outputs = self._create_vars_from_blocklist(program, param_blocks)
send_op = program.global_block().append_op(
type="send",
inputs={"X": send_inputs},
outputs={"Out": send_outputs},
attrs={"endpoints": pserver_endpoints,
"epmap": epmap})
def _create_vars_from_blocklist(self, program, block_list):
block_map = dict()
ret_vars = []
for block_str in block_list:
varname, offset, size = block_str.split(":")
if not block_map.has_key(varname):
block_map[varname] = []
block_map[varname].append((long(offset), long(size)))
for varname, splited in block_map.iteritems():
orig_var = program.global_block().vars[varname]
for block in splited:
size = block[1]
var = program.global_block().create_var(
name="%s.block%d" % (varname, i),
psersistable=False,
dtype=orig_var.dtype,
shape=[1, size]) # flattend splited var
ret_vars.append(var)
return ret_vars
def _clone_param(self, block, v):
assert isinstance(v, Parameter)
new_p = Parameter(
block=block,
shape=v.shape,
dtype=v.dtype,
type=v.type,
lod_level=v.lod_level,
stop_gradient=v.stop_gradient,
trainable=v.trainable,
optimize_attr=v.optimize_attr,
regularizer=v.regularizer,
name=v.name)
block.vars[new_p.name] = new_p
def _clone_var(self, block, var):
assert isinstance(var, Variable)
return block.create_var(
name=var.name,
shape=var.shape,
dtype=var.dtype,
type=var.type,
lod_level=var.lod_level,
persistable=var.persistable)
def _split_dense_variable(self,
var_list,
pserver_count,
min_block_size=1024,
max_block_size=1048576):
"""
We may need to split dense tensor to one or several blocks and put
them equally onto parameter server. One block is a sub-tensor
aligned by dim[0] of the tensor.
We need to have a minimal block size so that the calculations in
the parameter server side can gain better performance. By default
mininum block size is 1024. The max block size is used to prevent
too large block that may causing send error.
"""
block_sizes = []
blocks = []
for grad in var_list:
dim1 = reduce(lambda x, y: x * y, grad.shape[1:])
grad_numel = reduce(lambda x, y: x * y, grad.shape)
if grad_numel < min_block_size:
block_sizes.append(grad_numel)
block_size = grad_numel / min_block_size
if block_size < min_block_size:
block_size = min_block_size
# align by dim1(width)
remains = block_size % dim1
if remains != 0:
block_size += dim1 - remains
block_sizes.append(block_size)
num_blocks = grad_numel / block_size
print("grad numel :%d, blocksize: %d" % grad_numel, block_size)
for block_id in xrange(num_blocks):
block = VarBlock(grad.name(), block_id, block_size)
blocks.append(str(block))
return blocks
def _split_trainer_vars(self, program, gradblocks, params_and_grads):
var2blocks = dict()
splited = dict()
for block_str in gradblocks:
varname, offset, size = block_str.split(":")
if not var2blocks.has_key(varname):
var2blocks[varname] = []
var2blocks[varname].append((long(offset), long(size)))
for varname, blocks in var2blocks.iteritems():
orig_var = program.global_block().vars[varname]
split_outs = []
for i in xrange(len(blocks)):
size = blocks[i][1]
var = program.global_block().create_var(
name="%s.block%d" % (varname, i),
psersistable=False,
dtype=orig_var.dtype,
shape=[1, size]) # flattend splited var
split_outs.append(var)
splited[varname] = split_outs
program.global_block().append_op(
type="split",
inputs={"X": orig_var},
outputs={"Out": split_outs},
attrs={"num": len(blocks)} # assume split evenly
)
return splited
def _concat_trainer_vars(self, program, splited):
for varname, to_merge_list in splited.iteritems():
orig_var = program.global_block().vars[varname]
program.global_block().append_op(
type="concat",
inputs={"X": to_merge_list},
outputs={"Out": orig_var},
attrs={})
def get_trainer_program(self):
# remove optimize ops and add a send op to main_program
self.program.global_block().delete_ops(self.optimize_ops)
return self.program
def _create_var_for_trainers(self, block, var, trainers):
var_list = []
for i in xrange(trainers):
var_each = block.create_var(
name="%s.trainer_%d" % (var.name, i),
psersistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
var_list.append(var_each)
return var_list
def get_pserver_program(self, endpoint, optimize_ops):
pserver_program = Program()
for v in self.param_grad_map[endpoint]["params"]:
self._clone_param(pserver_program.global_block(), v)
optimize_sub_program = Program()
grad_var_names = [
var.name for var in self.param_grad_map[endpoint]["grads"]
]
for opt_op in optimize_ops:
for _, var in opt_op.inputs.iteritems():
# NOTE: append operators to merge gradients from multiple
# trainers. If trainers == 1, this is not needed.
if self.trainers > 1 and var.name in grad_var_names:
vars2merge = self._create_var_for_trainers(
optimize_sub_program.global_block(), var, self.trainers)
merged_var = optimize_sub_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
optimize_sub_program.global_block().append_op(
type="sum",
inputs={"X": vars2merge},
outputs={"Out": merged_var})
optimize_sub_program.global_block().append_op(
type="scale",
inputs={"X": merged_var},
outputs={"Out": merged_var},
attrs={"scale": 1.0 / float(self.trainers)})
else:
optimize_sub_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
if opt_op.inputs.has_key("Grad"):
if opt_op.inputs["Grad"].name in grad_var_names:
optimize_sub_program.global_block().append_op(
type=opt_op.type,
inputs=opt_op.inputs,
outputs=opt_op.outputs,
attrs=opt_op.attrs)
else:
optimize_sub_program.global_block().append_op(
type=opt_op.type,
inputs=opt_op.inputs,
outputs=opt_op.outputs,
attrs=opt_op.attrs)
pserver_program.global_block().append_op(
type="recv",
inputs={"RX":
self.param_grad_map[endpoint]["grads"]}, # grads to recv
outputs={},
attrs={
"OptimizeProgram": optimize_sub_program.desc,
"endpoint": endpoint,
"ParamList":
[p.name for p in self.param_grad_map[endpoint]["params"]],
"GradList":
[p.name for p in self.param_grad_map[endpoint]["grads"]],
"Trainers": self.trainers
})
pserver_program.sync_with_cpp()
return pserver_program