Paddle/python/paddle/v2/fluid/distribute_planner.py


import framework
from backward import append_backward_ops
from regularizer import append_regularization_ops
import optimizer
from layer_helper import LayerHelper
__all__ = ['SGD', 'Momentum', 'Adagrad', 'Adam', 'Adamax', 'DecayedAdagrad']
def hash_name_to_server(parameters_and_grads, pserver_endpoints):
    """Assign each trainable parameter and its gradient to a parameter server
    endpoint by hashing the parameter name.

    Returns (param_map, grad_map), both dicts keyed by endpoint.
    """

    def _hash_param(param_name, total):
        return hash(param_name) % total

    param_map = dict()
    grad_map = dict()
    for param_and_grad in parameters_and_grads:
        if param_and_grad[0].trainable is True and param_and_grad[
                1] is not None:
            server_id = _hash_param(param_and_grad[0].name,
                                    len(pserver_endpoints))
            server_for_param = pserver_endpoints[server_id]
            if param_map.has_key(server_for_param):
                param_map[server_for_param].append(param_and_grad[0])
            else:
                param_map[server_for_param] = [param_and_grad[0]]

            if grad_map.has_key(server_for_param):
                grad_map[server_for_param].append(param_and_grad[1])
            else:
                grad_map[server_for_param] = [param_and_grad[1]]
    return param_map, grad_map
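
# For illustration (endpoint addresses and parameter names below are made up),
# the two returned dicts are keyed by endpoint and stay aligned with each other:
#   param_map = {"127.0.0.1:6000": [w0, w2], "127.0.0.1:6001": [w1]}
#   grad_map  = {"127.0.0.1:6000": [g0, g2], "127.0.0.1:6001": [g1]}
# i.e. each gradient is stored under the same endpoint as its parameter.
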
def round_robin(parameters_and_grads, pserver_endpoints):
    """Assign trainable parameters and their gradients to parameter server
    endpoints in round-robin order.

    Returns (param_map, grad_map), both dicts keyed by endpoint.
    """
    if len(parameters_and_grads) < len(pserver_endpoints):
        raise Exception("the number of parameters is less than the number of "
                        "pserver endpoints")

    param_map = dict()
    grad_map = dict()
    pserver_idx = 0
    for param_and_grad in parameters_and_grads:
        if param_and_grad[0].trainable is True and param_and_grad[
                1] is not None:
            server_for_param = pserver_endpoints[pserver_idx]
            if param_map.has_key(server_for_param):
                param_map[server_for_param].append(param_and_grad[0])
            else:
                param_map[server_for_param] = [param_and_grad[0]]

            if grad_map.has_key(server_for_param):
                grad_map[server_for_param].append(param_and_grad[1])
            else:
                grad_map[server_for_param] = [param_and_grad[1]]

            # Wrap around to the first endpoint once every endpoint is used.
            pserver_idx += 1
            if pserver_idx >= len(pserver_endpoints):
                pserver_idx = 0
    return param_map, grad_map
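
# Round-robin example (hypothetical): with two endpoints, the 1st, 3rd, 5th, ...
# trainable parameters land on the first endpoint and the 2nd, 4th, ... on the
# second. A runnable smoke test exercising both split methods is sketched in
# the __main__ block at the bottom of this file.
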
def _append_sendop_for_trainer(loss,
                               parameters_and_grads,
                               pserver_endpoints,
                               split_method=round_robin):
    """Append one send op per parameter server endpoint to the trainer
    program that owns `loss`."""
    assert (callable(split_method))
    param_map, grad_map = \
        split_method(parameters_and_grads, pserver_endpoints)

    for ep in pserver_endpoints:
        # FIXME(typhoonzero): sends to different servers can run in parallel.
        send_op = loss.block.append_op(
            type="send",
            inputs={"X": param_map[ep]},
            outputs={"Out": param_map[ep]},
            attrs={"endpoint": ep})

    # Only the send op appended for the last endpoint is returned.
    return send_op
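
# Note: after the helper above runs, the trainer program owning `loss` ends
# with one send op per pserver endpoint, carrying only the parameters that
# split_method assigned to that endpoint. The grad_map returned by
# split_method is not wired into the send ops here.
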
class DistributedPlanner(optimizer.Optimizer):
    def __init__(self, global_step=None, parallelism_type='dp'):
        """
        parallelism_type:
            dp: data parallelism
            mp: model parallelism
        """
        super(DistributedPlanner, self).__init__(global_step)
        if parallelism_type == "mp":
            raise NotImplementedError("model parallelism not implemented")
        elif parallelism_type == "dp":
            self.parameter_server_program_map = dict()
            self.worker_program = None
            # List of "ip:port" strings; must be set by the caller before
            # minimize() is called.
            self.pserver_endpoints = None
        else:
            raise NameError("parallelism_type %s not supported" %
                            parallelism_type)
    def create_optimization_pass(self,
                                 parameters_and_grads,
                                 program,
                                 startup_program=None):
        # Create any accumulators
        self.helper = LayerHelper(
            self.__class__.__name__,
            main_program=program,
            startup_program=startup_program)
        self._create_accumulators(program.global_block(),
                                  [p[0] for p in parameters_and_grads])

        optimize_ops = []
        for param_and_grad in parameters_and_grads:
            if param_and_grad[0].trainable is True and param_and_grad[
                    1] is not None:
                optimize_op = self._append_optimize_op(program.global_block(),
                                                       param_and_grad)
                optimize_ops.append(optimize_op)

        # Returned list of ops can include more ops in addition
        # to optimization ops
        return_ops = optimize_ops

        # Get custom finish ops for subclasses
        # FIXME: Need to fix this once we figure out how to handle dependencies
        finish_ops = self._finish_update(program.global_block())
        if finish_ops is not None:
            return_ops += finish_ops

        if self._global_step is not None:
            return_ops.append(
                self._increment_global_step(program.global_block()))
        return return_ops
    def minimize(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None,
                 split_method=round_robin):
        """
        For the distributed case, this call appends the backward ops and then
        appends several send ops at the end of the trainer program, one per
        parameter server. Afterwards, calling get_pserver_program(endpoint)
        returns the program that the corresponding pserver should run.
        """
        params_grads = append_backward_ops(loss, parameter_list, no_grad_set)
        # Add regularization if any
        params_grads = append_regularization_ops(params_grads)
        _append_sendop_for_trainer(loss, params_grads, self.pserver_endpoints,
                                   split_method)
        self.worker_program = loss.block.program

        optimize_sub_program = framework.Program()
        optimize_ops = self.create_optimization_pass(
            params_grads, optimize_sub_program, startup_program)
        param_list = []
        for param_and_grad in params_grads:
            if param_and_grad[0].trainable is True and param_and_grad[
                    1] is not None:
                param_list.append(param_and_grad[0])

        param_map, grad_map = \
            split_method(params_grads, self.pserver_endpoints)

        for ep in self.pserver_endpoints:
            pserver_program = framework.Program()
            self.parameter_server_program_map[ep] = pserver_program
            pserver_program.global_block().append_op(
                type="recv",
                inputs={"RX": param_map[ep]},
                outputs={},
                attrs={
                    "OptimizeBlock": optimize_sub_program.global_block(),
                    "endpoint": ep
                })
        # FIXME(typhoonzero): when to use this return value?
        return None
    def get_pserver_program(self, endpoint):
        return self.parameter_server_program_map.get(endpoint)
SGD = optimizer.SGDOptimizer
Momentum = optimizer.MomentumOptimizer
Adagrad = optimizer.AdagradOptimizer
Adam = optimizer.AdamOptimizer
Adamax = optimizer.AdamaxOptimizer
DecayedAdagrad = optimizer.DecayedAdagradOptimizer
# Re-parent every exported optimizer class onto DistributedPlanner so that
# minimize() dispatches to the distributed implementation above. Note that
# type.__base__ is read-only, so __bases__ is reassigned instead.
for optcls in __all__:
    eval(optcls).__bases__ = (DistributedPlanner, )
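
# Usage sketch (illustrative only; endpoint addresses, variable names and the
# learning rate are made up). On the trainer side a re-based optimizer is used
# as usual, except that pserver_endpoints must be set before minimize() and the
# per-pserver programs can be fetched afterwards:
#
#   sgd = SGD(learning_rate=0.01)
#   sgd.pserver_endpoints = ["127.0.0.1:6000", "127.0.0.1:6001"]
#   sgd.minimize(loss)
#   trainer_prog = sgd.worker_program                          # has send ops
#   pserver_prog = sgd.get_pserver_program("127.0.0.1:6000")   # has a recv op
#
# The smoke test below only exercises the pure-Python split methods with fake
# parameter objects, so it needs no running cluster.
if __name__ == "__main__":
    import collections

    FakeParam = collections.namedtuple("FakeParam", ["name", "trainable"])
    params_grads = [(FakeParam("fc_0.w", True), "fc_0.w@GRAD"),
                    (FakeParam("fc_0.b", True), "fc_0.b@GRAD"),
                    (FakeParam("fc_1.w", True), "fc_1.w@GRAD")]
    pservers = ["127.0.0.1:6000", "127.0.0.1:6001"]

    # Both split methods return ({endpoint: [param, ...]}, {endpoint: [grad, ...]}).
    print(round_robin(params_grads, pservers))
    print(hash_name_to_server(params_grads, pservers))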