From ed55f1b9d49ed3f6609246159594da85810a9a2e Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Fri, 5 Jan 2018 19:25:30 +0800 Subject: [PATCH 01/14] transpiler_split_tensor --- .../paddle/v2/fluid/distribute_transpiler.py | 241 +++++++++++------- 1 file changed, 148 insertions(+), 93 deletions(-) diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 49ece7b725..e5314cf272 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -1,51 +1,20 @@ +from __future__ import print_function import framework from framework import Program, default_main_program, Parameter, Variable import optimizer from layer_helper import LayerHelper +from distributed_spliter import * -def hash_name_to_server(params_grads, pserver_endpoints): - """ - :param param_grads: - :return: a map of pserver endpoint -> - params -> [param list] - grads -> [grad list] - """ +class VarBlock: + def __init__(self, varname, offset, size): + self.varname = varname + # NOTE: real offset is offset * size + self.offset = offset + self.size = size - def _hash_param(param_name, total): - return hash(param_name) % total - - param_grad_map = dict() - for param, grad in params_grads: - if param.trainable is True and grad is not None: - server_id = _hash_param(param.name, len(pserver_endpoints)) - server_for_param = pserver_endpoints[server_id] - if not param_grad_map.has_key(server_for_param): - param_grad_map[server_for_param] = {"params": [], "grads": []} - param_grad_map[server_for_param]["params"].append(param) - param_grad_map[server_for_param]["grads"].append(grad) - - return param_grad_map - - -def round_robin(params_grads, pserver_endpoints): - assert (len(params_grads) > len(pserver_endpoints)) - - param_grad_map = dict() - pserver_idx = 0 - for param, grad in params_grads: - if param.trainable is True: - server_for_param = pserver_endpoints[pserver_idx] - if not param_grad_map.has_key(server_for_param): - param_grad_map[server_for_param] = {"params": [], "grads": []} - - param_grad_map[server_for_param]["params"].append(param) - param_grad_map[server_for_param]["grads"].append(grad) - - pserver_idx += 1 - if pserver_idx >= len(pserver_endpoints): - pserver_idx = 0 - return param_grad_map + def __str__(self): + return "%s:%d:%d" % (self.varname, self.offset, self.size) class DistributeTranspiler: @@ -58,7 +27,6 @@ class DistributeTranspiler: split_method=round_robin): """ Transpile the program to a distributed data-parallelism programs. - The main_program will be transform to use a remote parameter server to do parameter optimization. And the optimization graph will be put in to a parameter server program. @@ -66,45 +34,84 @@ class DistributeTranspiler: Use different methods to split trainable varialbles to different parameter servers. - Example to run: - - exe = fluid.Executor(place) - t = fluid.DistributeTranspiler() - t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1) - - pserver_endpoint = os.getenv("PSERVER") - if pserver_endpoint: - pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops) - exe.run(fluid.default_startup_program()) - exe.run(pserver_prog) - else: - feeder = fluid.DataFeeder(feed_list=[images, label], place=place) - exe.run(fluid.default_startup_program()) - - for pass_id in range(PASS_NUM): - ... 
- :param optimize_ops: op list of optimization, should be the return value of Optimizer.minimize :type optimize_ops: list :param program: program to optimize, default default_main_program :param pservers: parameter server endpoints like "m1:6174,m2:6174" :type pservers: string - :return: return a list of programs """ + assert (callable(split_method)) if program is None: program = default_main_program() self.program = program self.trainers = trainers self.optimize_ops = optimize_ops - self._optimize_distributed( - optimize_ops, - program, - params_grads, - pservers=pservers, - trainers=trainers, - split_method=split_method) + # steps to transpile: + # 1. split variable to multiple blocks, align by product(dim[1:]) (width). + # 2. modify trainer program add split_op to each Grad. + # 3. append send_op to trainer. + # 4. append concat_op to trainer to update local weights. + # 5. create new program as parameter server. + # 5. create parameter server program by split_method generated endpoint->VarBlock + # 6. run compile time infershape for parameter server program + + if kwargs.has_key("split_method"): + split_method = kwargs["split_method"] + else: + split_method = round_robin + pserver_endpoints = kwargs["pservers"].split(",") + + grad2param = dict() + for param, grad in params_and_grads: + grad2param[grad.name()] = param.name() + + # step1 + param_list = [pg[0] for pg in params_and_grads] + grad_list = [pg[1] for pg in params_and_grads] + # TODO: add split selected rows support + grad_blocks = _split_dense_variable(grad_list, len(pserver_endpoints)) + param_blocks = _split_dense_variable(param_list, len(pserver_endpoints)) + ep2gradblock = split_method(grad_blocks, pserver_endpoints) + # self.param_grad_map + # step2 + var2splited = self._split_trainer_vars(program, grad_blocks) + + # step3 + send_inputs = [] + send_outputs = [] + for _, splited in var2splited.iteritems(): + send_inputs.extend(splited) + send_outputs = self._create_vars_from_blocklist(program, param_blocks) + + send_op = program.global_block().append_op( + type="send", + inputs={"X": send_inputs}, + outputs={"Out": send_outputs}, + attrs={"endpoints": pserver_endpoints, + "epmap": epmap}) + + def _create_vars_from_blocklist(self, program, block_list): + block_map = dict() + ret_vars = [] + for block_str in block_list: + varname, offset, size = block_str.split(":") + if not block_map.has_key(varname): + block_map[varname] = [] + block_map[varname].append((long(offset), long(size))) + + for varname, splited in block_map.iteritems(): + orig_var = program.global_block().vars[varname] + for block in splited: + size = block[1] + var = program.global_block().create_var( + name="%s.block%d" % (varname, i), + psersistable=False, + dtype=orig_var.dtype, + shape=[1, size]) # flattend splited var + ret_vars.append(var) + return ret_vars def _clone_param(self, block, v): assert isinstance(v, Parameter) @@ -131,32 +138,80 @@ class DistributeTranspiler: lod_level=var.lod_level, persistable=var.persistable) - def _optimize_distributed(self, optimize_ops, program, params_and_grads, - **kwargs): - if kwargs.has_key("split_method"): - split_method = kwargs["split_method"] - else: - split_method = round_robin + def _split_dense_variable(self, + var_list, + pserver_count, + min_block_size=1024, + max_block_size=1048576): + """ + We may need to split dense tensor to one or several blocks and put + them equally onto parameter server. One block is a sub-tensor + aligned by dim[0] of the tensor. 
+ + We need to have a minimal block size so that the calculations in + the parameter server side can gain better performance. By default + mininum block size is 1024. The max block size is used to prevent + too large block that may causing send error. + """ + block_sizes = [] + blocks = [] + for grad in var_list: + dim1 = reduce(lambda x, y: x * y, grad.shape[1:]) + grad_numel = reduce(lambda x, y: x * y, grad.shape) + if grad_numel < min_block_size: + block_sizes.append(grad_numel) + block_size = grad_numel / min_block_size + if block_size < min_block_size: + block_size = min_block_size + # align by dim1(width) + remains = block_size % dim1 + if remains != 0: + block_size += dim1 - remains + block_sizes.append(block_size) + num_blocks = grad_numel / block_size + print("grad numel :%d, blocksize: %d" % grad_numel, block_size) + for block_id in xrange(num_blocks): + block = VarBlock(grad.name(), block_id, block_size) + blocks.append(str(block)) + return blocks - assert (callable(split_method)) - pserver_endpoints = kwargs["pservers"].split(",") - self.param_grad_map = split_method(params_and_grads, pserver_endpoints) - - send_op_ordered_inputs = [] - send_op_ordered_outputs = [] - epmap = [] - for ep, v in self.param_grad_map.iteritems(): - send_op_ordered_inputs.extend(v["grads"]) - send_op_ordered_outputs.extend(v["params"]) - for i in v["grads"]: - epmap.append(ep) - send_op = program.global_block().append_op( - type="send", - inputs={"X": send_op_ordered_inputs - }, # inputs is a list of tensors to be send - outputs={"Out": send_op_ordered_outputs}, - attrs={"endpoints": pserver_endpoints, - "epmap": epmap}) + def _split_trainer_vars(self, program, gradblocks, params_and_grads): + var2blocks = dict() + splited = dict() + for block_str in gradblocks: + varname, offset, size = block_str.split(":") + if not var2blocks.has_key(varname): + var2blocks[varname] = [] + var2blocks[varname].append((long(offset), long(size))) + for varname, blocks in var2blocks.iteritems(): + orig_var = program.global_block().vars[varname] + split_outs = [] + for i in xrange(len(blocks)): + size = blocks[i][1] + var = program.global_block().create_var( + name="%s.block%d" % (varname, i), + psersistable=False, + dtype=orig_var.dtype, + shape=[1, size]) # flattend splited var + split_outs.append(var) + + splited[varname] = split_outs + program.global_block().append_op( + type="split", + inputs={"X": orig_var}, + outputs={"Out": split_outs}, + attrs={"num": len(blocks)} # assume split evenly + ) + return splited + + def _concat_trainer_vars(self, program, splited): + for varname, to_merge_list in splited.iteritems(): + orig_var = program.global_block().vars[varname] + program.global_block().append_op( + type="concat", + inputs={"X": to_merge_list}, + outputs={"Out": orig_var}, + attrs={}) def get_trainer_program(self): # remove optimize ops and add a send op to main_program From c70ea1cc30cfa54228d10e56934119e3a68e3e51 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Fri, 5 Jan 2018 19:35:52 +0800 Subject: [PATCH 02/14] add splitter --- .../paddle/v2/fluid/distribute_transpiler.py | 1 - python/paddle/v2/fluid/distributed_spliter.py | 38 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) create mode 100644 python/paddle/v2/fluid/distributed_spliter.py diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index e5314cf272..4c90b4a853 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ 
b/python/paddle/v2/fluid/distribute_transpiler.py @@ -80,7 +80,6 @@ class DistributeTranspiler: # step3 send_inputs = [] - send_outputs = [] for _, splited in var2splited.iteritems(): send_inputs.extend(splited) send_outputs = self._create_vars_from_blocklist(program, param_blocks) diff --git a/python/paddle/v2/fluid/distributed_spliter.py b/python/paddle/v2/fluid/distributed_spliter.py new file mode 100644 index 0000000000..e7ba53390d --- /dev/null +++ b/python/paddle/v2/fluid/distributed_spliter.py @@ -0,0 +1,38 @@ +def hash_name(varblocks, pserver_endpoints): + """ + :param varblocks: a list of VarBlock string indicating + sub blocks of variables + :return: a map of pserver endpoint -> varblock_str + """ + + def _hash_block(block_str, total): + return hash(block_str) % total + + ep2block = dict() + for varblock_str in varblocks: + if param.trainable is True and grad is not None: + server_id = _hash_block(varblock_str, len(pserver_endpoints)) + server_for_param = pserver_endpoints[server_id] + if not ep2block.has_key(server_for_param): + ep2block[server_for_param] = [] + ep2block[server_for_param].append(varblock_str) + + return ep2block + + +def round_robin(varblocks, pserver_endpoints): + assert (len(varblocks) > len(pserver_endpoints)) + + ep2block = dict() + pserver_idx = 0 + for varblock_str in varblocks: + if param.trainable is True: + server_for_param = pserver_endpoints[pserver_idx] + if not ep2block.has_key(server_for_param): + ep2block[server_for_param] = [] + ep2block[server_for_param].append(varblock_str) + + pserver_idx += 1 + if pserver_idx >= len(pserver_endpoints): + pserver_idx = 0 + return ep2block From f35c56060c4af6a8b36ceb8ad9021c447d6dc2a0 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 8 Jan 2018 19:59:44 +0800 Subject: [PATCH 03/14] split tensor to pservers --- .../paddle/v2/fluid/distribute_transpiler.py | 187 +++++++------- .../v2/fluid/distribute_transpiler_simple.py | 242 ++++++++++++++++++ python/paddle/v2/fluid/distributed_spliter.py | 51 ++-- .../tests/book_distribute/test_split_var.py | 38 +++ 4 files changed, 398 insertions(+), 120 deletions(-) create mode 100644 python/paddle/v2/fluid/distribute_transpiler_simple.py create mode 100644 python/paddle/v2/fluid/tests/book_distribute/test_split_var.py diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 4c90b4a853..58d32bac12 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -4,6 +4,7 @@ from framework import Program, default_main_program, Parameter, Variable import optimizer from layer_helper import LayerHelper from distributed_spliter import * +import math class VarBlock: @@ -17,6 +18,47 @@ class VarBlock: return "%s:%d:%d" % (self.varname, self.offset, self.size) +def split_dense_variable(var_list, + pserver_count, + min_block_size=1024, + max_block_size=1048576): + """ + We may need to split dense tensor to one or several blocks and put + them equally onto parameter server. One block is a sub-tensor + aligned by dim[0] of the tensor. + + We need to have a minimal block size so that the calculations in + the parameter server side can gain better performance. By default + mininum block size is 1024. The max block size is used to prevent + too large block that may causing send error. 
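    For example (illustrative shapes and names), splitting a 28 x 784
    parameter "fc_w" across 10 pservers gives blocks of 3 rows each
    (3 * 784 = 2352 elements) plus a smaller remainder block:

        split_dense_variable([fc_w], 10)
        # -> ["fc_w:0:2352", "fc_w:1:2352", ..., "fc_w:8:2352", "fc_w:9:784"]

    Each entry is a VarBlock string of the form "varname:block_id:size".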
+ """ + blocks = [] + for var in var_list: + split_count = pserver_count + var_numel = reduce(lambda x, y: x * y, var.shape) + max_pserver_count = int(math.floor(var_numel / float(min_block_size))) + if max_pserver_count == 0: + max_pserver_count = 1 + if max_pserver_count < pserver_count: + split_count = max_pserver_count + block_size = int(math.ceil(var_numel / float(split_count))) + + if len(var.shape) >= 2: + # align by dim1(width) + dim1 = reduce(lambda x, y: x * y, var.shape[1:]) + remains = block_size % dim1 + if remains != 0: + block_size += dim1 - remains + # update split_count after align + split_count = int(math.ceil(var_numel / float(block_size))) + for block_id in xrange(split_count): + curr_block_size = min(block_size, var_numel - ( + (block_id) * block_size)) + block = VarBlock(var.name, block_id, curr_block_size) + blocks.append(str(block)) + return blocks + + class DistributeTranspiler: def transpile(self, optimize_ops, @@ -57,43 +99,49 @@ class DistributeTranspiler: # 5. create parameter server program by split_method generated endpoint->VarBlock # 6. run compile time infershape for parameter server program - if kwargs.has_key("split_method"): - split_method = kwargs["split_method"] - else: - split_method = round_robin - pserver_endpoints = kwargs["pservers"].split(",") - - grad2param = dict() - for param, grad in params_and_grads: - grad2param[grad.name()] = param.name() + pserver_endpoints = pservers.split(",") # step1 - param_list = [pg[0] for pg in params_and_grads] - grad_list = [pg[1] for pg in params_and_grads] + param_list = [pg[0] for pg in params_grads] + grad_list = [pg[1] for pg in params_grads] # TODO: add split selected rows support - grad_blocks = _split_dense_variable(grad_list, len(pserver_endpoints)) - param_blocks = _split_dense_variable(param_list, len(pserver_endpoints)) - ep2gradblock = split_method(grad_blocks, pserver_endpoints) - # self.param_grad_map + grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints)) + param_blocks = split_dense_variable(param_list, len(pserver_endpoints)) # step2 - var2splited = self._split_trainer_vars(program, grad_blocks) + grad_var_mapping = self._append_split_op(program, grad_blocks) # step3 send_inputs = [] - for _, splited in var2splited.iteritems(): + send_outputs = [] + for _, splited in grad_var_mapping.iteritems(): send_inputs.extend(splited) - send_outputs = self._create_vars_from_blocklist(program, param_blocks) + param_var_mapping = self._create_vars_from_blocklist(program, + param_blocks) + for _, splited in param_var_mapping.iteritems(): + send_outputs.extend(splited) + # let send_op know which endpoint to send which var, eplist is of the same + # order of send_inputs. 
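        # Illustrative example (names are made up): with round_robin and two
        # endpoints,
        #   send_inputs  = [w@GRAD.block0, w@GRAD.block1, b@GRAD.block0]
        #   send_outputs = [w.block0,      w.block1,      b.block0]
        #   eplist       = ["ps0:6174",    "ps1:6174",    "ps0:6174"]
        # i.e. send_inputs[i] is pushed to eplist[i], and the updated
        # parameter block comes back in send_outputs[i].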
+ eplist = split_method(send_inputs, pserver_endpoints) send_op = program.global_block().append_op( type="send", inputs={"X": send_inputs}, outputs={"Out": send_outputs}, attrs={"endpoints": pserver_endpoints, - "epmap": epmap}) + "epmap": eplist}) + + # step4 + for varname, splited_var in param_var_mapping.iteritems(): + orig_param = program.global_block().vars[varname] + concat = program.global_block().append_op( + type="concat", + inputs={"X": send_outputs}, + outputs={"Out": orig_param}, + attrs={"axis": 0}) def _create_vars_from_blocklist(self, program, block_list): block_map = dict() - ret_vars = [] + var_mapping = dict() for block_str in block_list: varname, offset, size = block_str.split(":") if not block_map.has_key(varname): @@ -102,15 +150,26 @@ class DistributeTranspiler: for varname, splited in block_map.iteritems(): orig_var = program.global_block().vars[varname] - for block in splited: + orig_shape = orig_var.shape + orig_dim1_flatten = 1 + if len(orig_shape) >= 2: + orig_dim1_flatten = reduce(lambda x, y: x * y, orig_shape[1:]) + var_list = [] + for i, block in enumerate(splited): size = block[1] + rows = size / orig_dim1_flatten + splited_shape = [rows] + if len(orig_shape) >= 2: + splited_shape.extend(orig_shape[1:]) + print("block, splited shape:", block, splited_shape) var = program.global_block().create_var( name="%s.block%d" % (varname, i), psersistable=False, dtype=orig_var.dtype, - shape=[1, size]) # flattend splited var - ret_vars.append(var) - return ret_vars + shape=splited_shape) # flattend splited var + var_list.append(var) + var_mapping[varname] = var_list + return var_mapping def _clone_param(self, block, v): assert isinstance(v, Parameter) @@ -137,80 +196,22 @@ class DistributeTranspiler: lod_level=var.lod_level, persistable=var.persistable) - def _split_dense_variable(self, - var_list, - pserver_count, - min_block_size=1024, - max_block_size=1048576): - """ - We may need to split dense tensor to one or several blocks and put - them equally onto parameter server. One block is a sub-tensor - aligned by dim[0] of the tensor. - - We need to have a minimal block size so that the calculations in - the parameter server side can gain better performance. By default - mininum block size is 1024. The max block size is used to prevent - too large block that may causing send error. 
- """ - block_sizes = [] - blocks = [] - for grad in var_list: - dim1 = reduce(lambda x, y: x * y, grad.shape[1:]) - grad_numel = reduce(lambda x, y: x * y, grad.shape) - if grad_numel < min_block_size: - block_sizes.append(grad_numel) - block_size = grad_numel / min_block_size - if block_size < min_block_size: - block_size = min_block_size - # align by dim1(width) - remains = block_size % dim1 - if remains != 0: - block_size += dim1 - remains - block_sizes.append(block_size) - num_blocks = grad_numel / block_size - print("grad numel :%d, blocksize: %d" % grad_numel, block_size) - for block_id in xrange(num_blocks): - block = VarBlock(grad.name(), block_id, block_size) - blocks.append(str(block)) - return blocks - - def _split_trainer_vars(self, program, gradblocks, params_and_grads): - var2blocks = dict() - splited = dict() - for block_str in gradblocks: - varname, offset, size = block_str.split(":") - if not var2blocks.has_key(varname): - var2blocks[varname] = [] - var2blocks[varname].append((long(offset), long(size))) - for varname, blocks in var2blocks.iteritems(): + def _append_split_op(self, program, gradblocks): + var_mapping = self._create_vars_from_blocklist(program, gradblocks) + for varname, splited_vars in var_mapping.iteritems(): + if len(splited_vars) == 1: + continue orig_var = program.global_block().vars[varname] - split_outs = [] - for i in xrange(len(blocks)): - size = blocks[i][1] - var = program.global_block().create_var( - name="%s.block%d" % (varname, i), - psersistable=False, - dtype=orig_var.dtype, - shape=[1, size]) # flattend splited var - split_outs.append(var) - - splited[varname] = split_outs + sections = [] + for v in splited_vars: + sections.append(v.shape[0]) program.global_block().append_op( type="split", inputs={"X": orig_var}, - outputs={"Out": split_outs}, - attrs={"num": len(blocks)} # assume split evenly + outputs={"Out": splited_vars}, + attrs={"sections": sections} # assume split evenly ) - return splited - - def _concat_trainer_vars(self, program, splited): - for varname, to_merge_list in splited.iteritems(): - orig_var = program.global_block().vars[varname] - program.global_block().append_op( - type="concat", - inputs={"X": to_merge_list}, - outputs={"Out": orig_var}, - attrs={}) + return var_mapping def get_trainer_program(self): # remove optimize ops and add a send op to main_program diff --git a/python/paddle/v2/fluid/distribute_transpiler_simple.py b/python/paddle/v2/fluid/distribute_transpiler_simple.py new file mode 100644 index 0000000000..49ece7b725 --- /dev/null +++ b/python/paddle/v2/fluid/distribute_transpiler_simple.py @@ -0,0 +1,242 @@ +import framework +from framework import Program, default_main_program, Parameter, Variable +import optimizer +from layer_helper import LayerHelper + + +def hash_name_to_server(params_grads, pserver_endpoints): + """ + :param param_grads: + :return: a map of pserver endpoint -> + params -> [param list] + grads -> [grad list] + """ + + def _hash_param(param_name, total): + return hash(param_name) % total + + param_grad_map = dict() + for param, grad in params_grads: + if param.trainable is True and grad is not None: + server_id = _hash_param(param.name, len(pserver_endpoints)) + server_for_param = pserver_endpoints[server_id] + if not param_grad_map.has_key(server_for_param): + param_grad_map[server_for_param] = {"params": [], "grads": []} + param_grad_map[server_for_param]["params"].append(param) + param_grad_map[server_for_param]["grads"].append(grad) + + return param_grad_map + + +def 
round_robin(params_grads, pserver_endpoints): + assert (len(params_grads) > len(pserver_endpoints)) + + param_grad_map = dict() + pserver_idx = 0 + for param, grad in params_grads: + if param.trainable is True: + server_for_param = pserver_endpoints[pserver_idx] + if not param_grad_map.has_key(server_for_param): + param_grad_map[server_for_param] = {"params": [], "grads": []} + + param_grad_map[server_for_param]["params"].append(param) + param_grad_map[server_for_param]["grads"].append(grad) + + pserver_idx += 1 + if pserver_idx >= len(pserver_endpoints): + pserver_idx = 0 + return param_grad_map + + +class DistributeTranspiler: + def transpile(self, + optimize_ops, + params_grads, + program=None, + pservers="127.0.0.1:6174", + trainers=1, + split_method=round_robin): + """ + Transpile the program to a distributed data-parallelism programs. + + The main_program will be transform to use a remote parameter server + to do parameter optimization. And the optimization graph will be put + in to a parameter server program. + + Use different methods to split trainable varialbles to different + parameter servers. + + Example to run: + + exe = fluid.Executor(place) + t = fluid.DistributeTranspiler() + t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1) + + pserver_endpoint = os.getenv("PSERVER") + if pserver_endpoint: + pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops) + exe.run(fluid.default_startup_program()) + exe.run(pserver_prog) + else: + feeder = fluid.DataFeeder(feed_list=[images, label], place=place) + exe.run(fluid.default_startup_program()) + + for pass_id in range(PASS_NUM): + ... + + :param optimize_ops: op list of optimization, should be the + return value of Optimizer.minimize + :type optimize_ops: list + :param program: program to optimize, default default_main_program + :param pservers: parameter server endpoints like "m1:6174,m2:6174" + :type pservers: string + + :return: return a list of programs + """ + if program is None: + program = default_main_program() + self.program = program + self.trainers = trainers + self.optimize_ops = optimize_ops + self._optimize_distributed( + optimize_ops, + program, + params_grads, + pservers=pservers, + trainers=trainers, + split_method=split_method) + + def _clone_param(self, block, v): + assert isinstance(v, Parameter) + new_p = Parameter( + block=block, + shape=v.shape, + dtype=v.dtype, + type=v.type, + lod_level=v.lod_level, + stop_gradient=v.stop_gradient, + trainable=v.trainable, + optimize_attr=v.optimize_attr, + regularizer=v.regularizer, + name=v.name) + block.vars[new_p.name] = new_p + + def _clone_var(self, block, var): + assert isinstance(var, Variable) + return block.create_var( + name=var.name, + shape=var.shape, + dtype=var.dtype, + type=var.type, + lod_level=var.lod_level, + persistable=var.persistable) + + def _optimize_distributed(self, optimize_ops, program, params_and_grads, + **kwargs): + if kwargs.has_key("split_method"): + split_method = kwargs["split_method"] + else: + split_method = round_robin + + assert (callable(split_method)) + pserver_endpoints = kwargs["pservers"].split(",") + self.param_grad_map = split_method(params_and_grads, pserver_endpoints) + + send_op_ordered_inputs = [] + send_op_ordered_outputs = [] + epmap = [] + for ep, v in self.param_grad_map.iteritems(): + send_op_ordered_inputs.extend(v["grads"]) + send_op_ordered_outputs.extend(v["params"]) + for i in v["grads"]: + epmap.append(ep) + send_op = program.global_block().append_op( + type="send", + 
inputs={"X": send_op_ordered_inputs + }, # inputs is a list of tensors to be send + outputs={"Out": send_op_ordered_outputs}, + attrs={"endpoints": pserver_endpoints, + "epmap": epmap}) + + def get_trainer_program(self): + # remove optimize ops and add a send op to main_program + self.program.global_block().delete_ops(self.optimize_ops) + return self.program + + def _create_var_for_trainers(self, block, var, trainers): + var_list = [] + for i in xrange(trainers): + var_each = block.create_var( + name="%s.trainer_%d" % (var.name, i), + psersistable=var.persistable, + dtype=var.dtype, + shape=var.shape) + var_list.append(var_each) + return var_list + + def get_pserver_program(self, endpoint, optimize_ops): + pserver_program = Program() + for v in self.param_grad_map[endpoint]["params"]: + self._clone_param(pserver_program.global_block(), v) + + optimize_sub_program = Program() + grad_var_names = [ + var.name for var in self.param_grad_map[endpoint]["grads"] + ] + for opt_op in optimize_ops: + for _, var in opt_op.inputs.iteritems(): + # NOTE: append operators to merge gradients from multiple + # trainers. If trainers == 1, this is not needed. + if self.trainers > 1 and var.name in grad_var_names: + vars2merge = self._create_var_for_trainers( + optimize_sub_program.global_block(), var, self.trainers) + merged_var = optimize_sub_program.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=var.shape) + optimize_sub_program.global_block().append_op( + type="sum", + inputs={"X": vars2merge}, + outputs={"Out": merged_var}) + optimize_sub_program.global_block().append_op( + type="scale", + inputs={"X": merged_var}, + outputs={"Out": merged_var}, + attrs={"scale": 1.0 / float(self.trainers)}) + else: + optimize_sub_program.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=var.shape) + + if opt_op.inputs.has_key("Grad"): + if opt_op.inputs["Grad"].name in grad_var_names: + optimize_sub_program.global_block().append_op( + type=opt_op.type, + inputs=opt_op.inputs, + outputs=opt_op.outputs, + attrs=opt_op.attrs) + else: + optimize_sub_program.global_block().append_op( + type=opt_op.type, + inputs=opt_op.inputs, + outputs=opt_op.outputs, + attrs=opt_op.attrs) + pserver_program.global_block().append_op( + type="recv", + inputs={"RX": + self.param_grad_map[endpoint]["grads"]}, # grads to recv + outputs={}, + attrs={ + "OptimizeProgram": optimize_sub_program.desc, + "endpoint": endpoint, + "ParamList": + [p.name for p in self.param_grad_map[endpoint]["params"]], + "GradList": + [p.name for p in self.param_grad_map[endpoint]["grads"]], + "Trainers": self.trainers + }) + pserver_program.sync_with_cpp() + return pserver_program diff --git a/python/paddle/v2/fluid/distributed_spliter.py b/python/paddle/v2/fluid/distributed_spliter.py index e7ba53390d..eff30f7bb6 100644 --- a/python/paddle/v2/fluid/distributed_spliter.py +++ b/python/paddle/v2/fluid/distributed_spliter.py @@ -1,38 +1,35 @@ -def hash_name(varblocks, pserver_endpoints): +def hash_name(varlist, pserver_endpoints): """ - :param varblocks: a list of VarBlock string indicating - sub blocks of variables - :return: a map of pserver endpoint -> varblock_str + hash variable names to several endpoints. 
+ + :param varlist: a list of Variables + :return: a map of pserver endpoint -> varname """ def _hash_block(block_str, total): return hash(block_str) % total - ep2block = dict() - for varblock_str in varblocks: - if param.trainable is True and grad is not None: - server_id = _hash_block(varblock_str, len(pserver_endpoints)) - server_for_param = pserver_endpoints[server_id] - if not ep2block.has_key(server_for_param): - ep2block[server_for_param] = [] - ep2block[server_for_param].append(varblock_str) - - return ep2block + eplist = [] + for var in varlist: + server_id = _hash_block(var.name(), len(pserver_endpoints)) + server_for_param = pserver_endpoints[server_id] + eplist.append(server_for_param) + return eplist -def round_robin(varblocks, pserver_endpoints): - assert (len(varblocks) > len(pserver_endpoints)) +def round_robin(varlist, pserver_endpoints): + """ + distribute variables to several endpoints. + """ + assert (len(varlist) > len(pserver_endpoints)) - ep2block = dict() + eplist = [] pserver_idx = 0 - for varblock_str in varblocks: - if param.trainable is True: - server_for_param = pserver_endpoints[pserver_idx] - if not ep2block.has_key(server_for_param): - ep2block[server_for_param] = [] - ep2block[server_for_param].append(varblock_str) + for var in varlist: + server_for_param = pserver_endpoints[pserver_idx] + eplist.append(server_for_param) - pserver_idx += 1 - if pserver_idx >= len(pserver_endpoints): - pserver_idx = 0 - return ep2block + pserver_idx += 1 + if pserver_idx >= len(pserver_endpoints): + pserver_idx = 0 + return eplist diff --git a/python/paddle/v2/fluid/tests/book_distribute/test_split_var.py b/python/paddle/v2/fluid/tests/book_distribute/test_split_var.py new file mode 100644 index 0000000000..1355e13e1c --- /dev/null +++ b/python/paddle/v2/fluid/tests/book_distribute/test_split_var.py @@ -0,0 +1,38 @@ +import math +import unittest +from paddle.v2.fluid.distribute_transpiler import split_dense_variable +import paddle.v2.fluid as fluid +import random + + +class TestSplitVar(unittest.TestCase): + def test_check_output(self): + # split below shapes to 10 servers + shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10]] + expected_sizes = [ + [15], [1024], + [2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 784], + [2040, 2040, 2040, 2040], + [1150, 1150, 1150, 1150, 1150, 1150, 1100] + ] + var_list = [] + program = fluid.Program() + for shape in shapes: + var = program.global_block().create_var( + name=str(random.randint(10000)), + persistable=True, + dtype=core.VarDesc.VarType.LOD_TENSOR, + shape=shape) + var_list.append(var) + blocks = split_dense_variable(var_list, 10) + all_sizes = [] + for s in expected_sizes: + for s2 in s: + all_sizes.append(s2) + for i, block_str in enumerate(blocks): + varname, block_id, size = block_str.split(":") + self.assertEqual(int(size), all_sizes[i]) + + +if __name__ == '__main__': + unittest.main() From 56e758fc10e2db646a82bc8a93c44d6427718c11 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 9 Jan 2018 16:23:41 +0800 Subject: [PATCH 04/14] trainer ok --- .../paddle/v2/fluid/distribute_transpiler.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 58d32bac12..7f3da67463 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -56,6 +56,8 @@ def split_dense_variable(var_list, (block_id) * block_size)) block = 
VarBlock(var.name, block_id, curr_block_size) blocks.append(str(block)) + print("$$ splited var: ", var.name, var.shape, split_count, len(blocks), + block_size) return blocks @@ -132,10 +134,12 @@ class DistributeTranspiler: # step4 for varname, splited_var in param_var_mapping.iteritems(): + if len(splited_var) <= 1: + continue orig_param = program.global_block().vars[varname] concat = program.global_block().append_op( type="concat", - inputs={"X": send_outputs}, + inputs={"X": splited_var}, outputs={"Out": orig_param}, attrs={"axis": 0}) @@ -147,28 +151,29 @@ class DistributeTranspiler: if not block_map.has_key(varname): block_map[varname] = [] block_map[varname].append((long(offset), long(size))) - for varname, splited in block_map.iteritems(): orig_var = program.global_block().vars[varname] + var_mapping[varname] = [] + if len(splited) == 1: + var_mapping[varname] = [orig_var] + continue orig_shape = orig_var.shape orig_dim1_flatten = 1 if len(orig_shape) >= 2: orig_dim1_flatten = reduce(lambda x, y: x * y, orig_shape[1:]) - var_list = [] + for i, block in enumerate(splited): size = block[1] rows = size / orig_dim1_flatten splited_shape = [rows] if len(orig_shape) >= 2: splited_shape.extend(orig_shape[1:]) - print("block, splited shape:", block, splited_shape) var = program.global_block().create_var( name="%s.block%d" % (varname, i), psersistable=False, dtype=orig_var.dtype, shape=splited_shape) # flattend splited var - var_list.append(var) - var_mapping[varname] = var_list + var_mapping[varname].append(var) return var_mapping def _clone_param(self, block, v): @@ -199,7 +204,8 @@ class DistributeTranspiler: def _append_split_op(self, program, gradblocks): var_mapping = self._create_vars_from_blocklist(program, gradblocks) for varname, splited_vars in var_mapping.iteritems(): - if len(splited_vars) == 1: + # variable that don't need to split have empty splited_vars + if len(splited_vars) <= 1: continue orig_var = program.global_block().vars[varname] sections = [] From 9c0b1cf1c2e28f7f6fb7b7a39c8bdffe07981bcd Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 9 Jan 2018 20:32:51 +0800 Subject: [PATCH 05/14] update wip pserver transpile --- .../paddle/v2/fluid/distribute_transpiler.py | 152 ++++++++++++------ 1 file changed, 106 insertions(+), 46 deletions(-) diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 7f3da67463..ac13a7cb60 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -98,8 +98,7 @@ class DistributeTranspiler: # 3. append send_op to trainer. # 4. append concat_op to trainer to update local weights. # 5. create new program as parameter server. - # 5. create parameter server program by split_method generated endpoint->VarBlock - # 6. run compile time infershape for parameter server program + # 6. create parameter server program by split_method generated endpoint->VarBlock pserver_endpoints = pservers.split(",") @@ -124,6 +123,15 @@ class DistributeTranspiler: # let send_op know which endpoint to send which var, eplist is of the same # order of send_inputs. 
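        # The endpoint -> block mapping built just below ends up looking like
        # (illustrative names):
        #   self.param_grad_ep_mapping = {
        #       "ps0:6174": {"params": [w.block0], "grads": [w@GRAD.block0]},
        #       "ps1:6174": {"params": [w.block1], "grads": [w@GRAD.block1]},
        #   }
        # get_pserver_program(endpoint) later reads it to decide which blocks
        # this endpoint owns.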
eplist = split_method(send_inputs, pserver_endpoints) + # create mapping of endpoint -> var to create pserver side program + self.param_grad_ep_mapping = dict() + for i, ep in enumerate(eplist): + param = send_outputs[i] + grad = send_inputs[i] + if not self.param_grad_ep_mapping.has_key(ep): + self.param_grad_ep_mapping[ep] = {"params": [], "grads": []} + self.param_grad_ep_mapping[ep]["params"].append(param) + self.param_grad_ep_mapping[ep]["grads"].append(grad) send_op = program.global_block().append_op( type="send", @@ -235,27 +243,29 @@ class DistributeTranspiler: var_list.append(var_each) return var_list - def get_pserver_program(self, endpoint, optimize_ops): - pserver_program = Program() - for v in self.param_grad_map[endpoint]["params"]: - self._clone_param(pserver_program.global_block(), v) - - optimize_sub_program = Program() - grad_var_names = [ - var.name for var in self.param_grad_map[endpoint]["grads"] - ] - for opt_op in optimize_ops: - for _, var in opt_op.inputs.iteritems(): - # NOTE: append operators to merge gradients from multiple - # trainers. If trainers == 1, this is not needed. - if self.trainers > 1 and var.name in grad_var_names: + def _append_pserver_ops(self, opt_op, endpoint): + new_inputs = dict() + for key, var in opt_op.inputs.iteritems(): + if key == "Grad": + grad_block = None + for g in self.param_grad_ep_mapping[endpoint]["grads"]: + if g.name.startswith(var.name): + grad_block = g + break + if not grad_block: + # do not append this op if current endpoint + # is not dealing with this grad block + return + merged_var = optimize_sub_program.global_block().create_var( + name=grad_block.name, + persistable=grad_block.persistable, + dtype=grad_block.dtype, + shape=grad_block.shape) + # append merging ops if trainers > 1 + if self.trainers > 1: vars2merge = self._create_var_for_trainers( - optimize_sub_program.global_block(), var, self.trainers) - merged_var = optimize_sub_program.global_block().create_var( - name=var.name, - persistable=var.persistable, - dtype=var.dtype, - shape=var.shape) + optimize_sub_program.global_block(), grad_block, + self.trainers) optimize_sub_program.global_block().append_op( type="sum", inputs={"X": vars2merge}, @@ -265,38 +275,88 @@ class DistributeTranspiler: inputs={"X": merged_var}, outputs={"Out": merged_var}, attrs={"scale": 1.0 / float(self.trainers)}) - else: - optimize_sub_program.global_block().create_var( - name=var.name, - persistable=var.persistable, - dtype=var.dtype, - shape=var.shape) + new_inputs[key] = merged_var + elif key == "Param": + # param is already created on global program + param_block = None + for p in self.param_grad_ep_mapping[endpoint]["params"]: + if p.name.startswith(var.name): + param_block = p + break + if not param_block: + return + tmpvar = optimize_sub_program.global_block().create_var( + name=param_block.name, + persistable=param_block.persistable, + dtype=param_block.dtype, + shape=param_block.shape) + new_inputs[key] = tmpvar + else: + tmpvar = optimize_sub_program.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=var.shape) + new_inputs[key] = tmpvar - if opt_op.inputs.has_key("Grad"): - if opt_op.inputs["Grad"].name in grad_var_names: - optimize_sub_program.global_block().append_op( - type=opt_op.type, - inputs=opt_op.inputs, - outputs=opt_op.outputs, - attrs=opt_op.attrs) + # FIXME: change outputs ParamOut + optimize_sub_program.global_block().append_op( + type=opt_op.type, + inputs=new_inputs, + outputs=opt_op.outputs, + 
attrs=opt_op.attrs) + + def _append_pserver_non_opt_ops(self, opt_op): + for _, var in opt_op.inputs.iteritems(): + optimize_sub_program.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=var.shape) + optimize_sub_program.global_block().append_op( + type=opt_op.type, + inputs=new_inputs, + outputs=opt_op.outputs, + attrs=opt_op.attrs) + + def get_pserver_program(self, endpoint, optimize_ops): + """ + get pserver side program by endpoint + + NOTE: assume blocks of the same variable is not distributed + on the same pserver, only change param/grad varnames for + trainers to fetch. For each pserver endpoint, server side + program must be a sub-set of the original optimization program. + """ + # step5 + pserver_program = Program() + for v in self.param_grad_ep_mapping[endpoint]["params"]: + self._clone_param(pserver_program.global_block(), v) + # step6 + optimize_sub_program = Program() + for opt_op in optimize_ops: + if opt_ops.inputs.has_key("Grad"): + # append optimize_op + self._append_pserver_ops(opt_op, endpoint) else: - optimize_sub_program.global_block().append_op( - type=opt_op.type, - inputs=opt_op.inputs, - outputs=opt_op.outputs, - attrs=opt_op.attrs) + self._append_pserver_non_opt_ops(opt_op) + pserver_program.global_block().append_op( type="recv", - inputs={"RX": - self.param_grad_map[endpoint]["grads"]}, # grads to recv + inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"] + }, # grads to recv outputs={}, attrs={ "OptimizeProgram": optimize_sub_program.desc, "endpoint": endpoint, - "ParamList": - [p.name for p in self.param_grad_map[endpoint]["params"]], - "GradList": - [p.name for p in self.param_grad_map[endpoint]["grads"]], + "ParamList": [ + p.name + for p in self.param_grad_ep_mapping[endpoint]["params"] + ], + "GradList": [ + p.name + for p in self.param_grad_ep_mapping[endpoint]["grads"] + ], "Trainers": self.trainers }) pserver_program.sync_with_cpp() From 50a02adf5e08abaa24f4c74d904be833c2f9e68a Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Wed, 10 Jan 2018 11:59:59 +0800 Subject: [PATCH 06/14] transpile program ok --- .../paddle/v2/fluid/distribute_transpiler.py | 93 +++++++++++++------ 1 file changed, 66 insertions(+), 27 deletions(-) diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index ac13a7cb60..76e8734f13 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -114,12 +114,15 @@ class DistributeTranspiler: # step3 send_inputs = [] send_outputs = [] - for _, splited in grad_var_mapping.iteritems(): - send_inputs.extend(splited) + for b in grad_blocks: # append by order + varname, block_id, _ = b.split(":") + send_inputs.append(grad_var_mapping[varname][int(block_id)]) + param_var_mapping = self._create_vars_from_blocklist(program, param_blocks) - for _, splited in param_var_mapping.iteritems(): - send_outputs.extend(splited) + for b in param_blocks: + varname, block_id, _ = b.split(":") + send_outputs.append(param_var_mapping[varname][int(block_id)]) # let send_op know which endpoint to send which var, eplist is of the same # order of send_inputs. 
eplist = split_method(send_inputs, pserver_endpoints) @@ -243,8 +246,37 @@ class DistributeTranspiler: var_list.append(var_each) return var_list - def _append_pserver_ops(self, opt_op, endpoint): + def _get_optimizer_input_shape(self, op_type, varkey, orig_shape, + param_shape): + """ + Returns the shape for optimizer inputs that need to be reshaped when + Param and Grad is splited to multiple servers. + """ + # HACK(typhoonzero): Should use functions of corresponding optimizer in + # optimizer.py to get the shape, do not bind this in the transpiler. + if op_type == "adam": + if varkey in ["Moment1", "Moment2"]: + return param_shape + elif op_type == "adagrad": + if varkey == "Moment": + return param_shape + elif op_type == "adamax": + if varkey in ["Moment", "InfNorm"]: + return param_shape + elif op_type == "momentum": + if varkey == "Velocity": + return param_shape + elif op_type == "": + if varkey == "Moment": + return param_shape + elif op_type == "sgd": + pass + return orig_shape + + def _append_pserver_ops(self, program, opt_op, endpoint): new_inputs = dict() + # update param/grad shape first, then other inputs like + # moment can use the updated shape for key, var in opt_op.inputs.iteritems(): if key == "Grad": grad_block = None @@ -256,7 +288,7 @@ class DistributeTranspiler: # do not append this op if current endpoint # is not dealing with this grad block return - merged_var = optimize_sub_program.global_block().create_var( + merged_var = program.global_block().create_var( name=grad_block.name, persistable=grad_block.persistable, dtype=grad_block.dtype, @@ -264,13 +296,12 @@ class DistributeTranspiler: # append merging ops if trainers > 1 if self.trainers > 1: vars2merge = self._create_var_for_trainers( - optimize_sub_program.global_block(), grad_block, - self.trainers) - optimize_sub_program.global_block().append_op( + program.global_block(), grad_block, self.trainers) + program.global_block().append_op( type="sum", inputs={"X": vars2merge}, outputs={"Out": merged_var}) - optimize_sub_program.global_block().append_op( + program.global_block().append_op( type="scale", inputs={"X": merged_var}, outputs={"Out": merged_var}, @@ -285,37 +316,45 @@ class DistributeTranspiler: break if not param_block: return - tmpvar = optimize_sub_program.global_block().create_var( + tmpvar = program.global_block().create_var( name=param_block.name, persistable=param_block.persistable, dtype=param_block.dtype, shape=param_block.shape) new_inputs[key] = tmpvar - else: - tmpvar = optimize_sub_program.global_block().create_var( - name=var.name, - persistable=var.persistable, - dtype=var.dtype, - shape=var.shape) - new_inputs[key] = tmpvar + + for key, var in opt_op.inputs.iteritems(): + if key in ["Param", "Grad"]: + continue + # update accumulator variable shape + param_shape = new_inputs["Param"].shape + new_shape = self._get_optimizer_input_shape(opt_op.type, key, + var.shape, param_shape) + print("var, new shape", key, var.name, new_shape) + tmpvar = program.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=new_shape) + new_inputs[key] = tmpvar # FIXME: change outputs ParamOut - optimize_sub_program.global_block().append_op( + program.global_block().append_op( type=opt_op.type, inputs=new_inputs, outputs=opt_op.outputs, attrs=opt_op.attrs) - def _append_pserver_non_opt_ops(self, opt_op): + def _append_pserver_non_opt_ops(self, program, opt_op): for _, var in opt_op.inputs.iteritems(): - optimize_sub_program.global_block().create_var( + 
program.global_block().create_var( name=var.name, persistable=var.persistable, dtype=var.dtype, shape=var.shape) - optimize_sub_program.global_block().append_op( + program.global_block().append_op( type=opt_op.type, - inputs=new_inputs, + inputs=opt_op.inputs, outputs=opt_op.outputs, attrs=opt_op.attrs) @@ -331,15 +370,15 @@ class DistributeTranspiler: # step5 pserver_program = Program() for v in self.param_grad_ep_mapping[endpoint]["params"]: - self._clone_param(pserver_program.global_block(), v) + self._clone_var(pserver_program.global_block(), v) # step6 optimize_sub_program = Program() for opt_op in optimize_ops: - if opt_ops.inputs.has_key("Grad"): + if opt_op.inputs.has_key("Grad"): # append optimize_op - self._append_pserver_ops(opt_op, endpoint) + self._append_pserver_ops(optimize_sub_program, opt_op, endpoint) else: - self._append_pserver_non_opt_ops(opt_op) + self._append_pserver_non_opt_ops(optimize_sub_program, opt_op) pserver_program.global_block().append_op( type="recv", From 6fa56b9d014d1c82c1fe41d7395bce8484c4ba2e Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Wed, 10 Jan 2018 20:40:54 +0800 Subject: [PATCH 07/14] left startup program bug --- .../paddle/v2/fluid/distribute_transpiler.py | 82 +++++++++++++------ 1 file changed, 59 insertions(+), 23 deletions(-) diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 76e8734f13..009f079e83 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -56,8 +56,6 @@ def split_dense_variable(var_list, (block_id) * block_size)) block = VarBlock(var.name, block_id, curr_block_size) blocks.append(str(block)) - print("$$ splited var: ", var.name, var.shape, split_count, len(blocks), - block_size) return blocks @@ -126,7 +124,7 @@ class DistributeTranspiler: # let send_op know which endpoint to send which var, eplist is of the same # order of send_inputs. 
eplist = split_method(send_inputs, pserver_endpoints) - # create mapping of endpoint -> var to create pserver side program + # create mapping of endpoint -> splited var to create pserver side program self.param_grad_ep_mapping = dict() for i, ep in enumerate(eplist): param = send_outputs[i] @@ -142,7 +140,6 @@ class DistributeTranspiler: outputs={"Out": send_outputs}, attrs={"endpoints": pserver_endpoints, "epmap": eplist}) - # step4 for varname, splited_var in param_var_mapping.iteritems(): if len(splited_var) <= 1: @@ -187,21 +184,6 @@ class DistributeTranspiler: var_mapping[varname].append(var) return var_mapping - def _clone_param(self, block, v): - assert isinstance(v, Parameter) - new_p = Parameter( - block=block, - shape=v.shape, - dtype=v.dtype, - type=v.type, - lod_level=v.lod_level, - stop_gradient=v.stop_gradient, - trainable=v.trainable, - optimize_attr=v.optimize_attr, - regularizer=v.regularizer, - name=v.name) - block.vars[new_p.name] = new_p - def _clone_var(self, block, var): assert isinstance(var, Variable) return block.create_var( @@ -210,7 +192,9 @@ class DistributeTranspiler: dtype=var.dtype, type=var.type, lod_level=var.lod_level, - persistable=var.persistable) + # HACK: let all param in pserver persistable so child + # program in recv can get them + persistable=True) def _append_split_op(self, program, gradblocks): var_mapping = self._create_vars_from_blocklist(program, gradblocks) @@ -318,9 +302,10 @@ class DistributeTranspiler: return tmpvar = program.global_block().create_var( name=param_block.name, - persistable=param_block.persistable, + persistable=True, dtype=param_block.dtype, shape=param_block.shape) + new_inputs[key] = tmpvar for key, var in opt_op.inputs.iteritems(): @@ -330,7 +315,6 @@ class DistributeTranspiler: param_shape = new_inputs["Param"].shape new_shape = self._get_optimizer_input_shape(opt_op.type, key, var.shape, param_shape) - print("var, new shape", key, var.name, new_shape) tmpvar = program.global_block().create_var( name=var.name, persistable=var.persistable, @@ -338,7 +322,8 @@ class DistributeTranspiler: shape=new_shape) new_inputs[key] = tmpvar - # FIXME: change outputs ParamOut + # change outputs ParamOut variable + opt_op.outputs["ParamOut"] = new_inputs["Param"] program.global_block().append_op( type=opt_op.type, inputs=new_inputs, @@ -380,6 +365,7 @@ class DistributeTranspiler: else: self._append_pserver_non_opt_ops(optimize_sub_program, opt_op) + print("####", optimize_sub_program) pserver_program.global_block().append_op( type="recv", inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"] @@ -400,3 +386,53 @@ class DistributeTranspiler: }) pserver_program.sync_with_cpp() return pserver_program + + def get_startup_program(self, endpoint): + """ + Get startup program for current parameter server. + Modify operator input variables if there are variables that + was splited to several blocks. + """ + s_prog = Program() + orig_s_prog = framework.default_startup_program() + params = self.param_grad_ep_mapping[endpoint]["params"] + + def _get_splited_name_and_shape(varname): + for idx, splited_param in enumerate(params): + pname = splited_param.name + if pname.startswith(varname) and varname != pname: + return pname, splited_param.shape + return "", [] + + # 1. create vars + created_var_map = dict() + for var in params: + print("%%%% append var", var.name, var.shape) + tmpvar = s_prog.global_block().create_var( + name=var.name, + persistable=True, + dtype=var.dtype, + shape=var.shape) + created_var_map[var.name] = tmpvar + + # 2. 
rename op outputs + for op in orig_s_prog.global_block().ops: + new_outputs = dict() + for key, var in op.outputs.iteritems(): + newname, _ = _get_splited_name_and_shape(var.name) + if newname: + new_outputs[key] = created_var_map[newname] + else: + new_outputs[key] = var + # do not append startup op if var is not on this pserver + var_on_pserver = False + for _, var in new_outputs.iteritems(): + if var.name in created_var_map: + var_on_pserver = True + if var_on_pserver: + s_prog.global_block().append_op( + type=op.type, + inputs=op.inputs, + outputs=new_outputs, + attrs=op.attrs) + return s_prog From 2827607fa891328e31f84dc328301754b3c6ba1c Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Wed, 10 Jan 2018 23:04:05 +0800 Subject: [PATCH 08/14] fix startup program shape --- python/paddle/v2/fluid/distribute_transpiler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 009f079e83..b064220ca2 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -365,7 +365,6 @@ class DistributeTranspiler: else: self._append_pserver_non_opt_ops(optimize_sub_program, opt_op) - print("####", optimize_sub_program) pserver_program.global_block().append_op( type="recv", inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"] @@ -407,7 +406,6 @@ class DistributeTranspiler: # 1. create vars created_var_map = dict() for var in params: - print("%%%% append var", var.name, var.shape) tmpvar = s_prog.global_block().create_var( name=var.name, persistable=True, @@ -430,6 +428,8 @@ class DistributeTranspiler: if var.name in created_var_map: var_on_pserver = True if var_on_pserver: + # gaussian_random use attr to determine tensor shape + op.attrs["shape"] = new_outputs["Out"].shape s_prog.global_block().append_op( type=op.type, inputs=op.inputs, From 5325313e4cb1fb8c45cffcc223239cf5d85620af Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Thu, 11 Jan 2018 20:06:13 +0800 Subject: [PATCH 09/14] debugging shape match --- .../paddle/v2/fluid/distribute_transpiler.py | 95 +++++++++++++++---- 1 file changed, 79 insertions(+), 16 deletions(-) diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index b064220ca2..75e103cb80 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -257,7 +257,45 @@ class DistributeTranspiler: pass return orig_shape - def _append_pserver_ops(self, program, opt_op, endpoint): + def _is_op_on_pserver(self, endpoint, all_ops, idx): + """ + Recursively check if the op need to run on current server. + Assume that ops are in the execution order. + """ + param_names = [ + p.name for p in self.param_grad_ep_mapping[endpoint]["params"] + ] + op = all_ops[idx] + if op.inputs.has_key("Param"): + if op.inputs["Param"].name in param_names: + return True + else: + for n in param_names: + if n.startswith(op.inputs["Param"].name+".block") and \ + n != op.inputs["Param"].name: + return True + return False + else: + j = idx - 1 + while j >= 0: + prev_op = all_ops[j] + prev_output_names = [o.name for o in prev_op.outputs.values()] + prev_input_names = [o.name for o in prev_op.inputs.values()] + found1 = False + found2 = False + for _, v in op.inputs.iteritems(): + if v.name in prev_output_names: + found1 = self._is_op_on_pserver(endpoint, all_ops, j) + # later ops may produce output for prev op's next batch use. 
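                # In other words: an op with no "Param" input (e.g. one that
                # only updates an auxiliary variable) runs on this pserver
                # only if its inputs or outputs connect it, directly or
                # transitively, to an earlier op that updates one of this
                # endpoint's param blocks.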
+ for _, v in op.outputs.iteritems(): + if v.name in prev_input_names: + found2 = self._is_op_on_pserver(endpoint, all_ops, j) + if found1 or found2: + return True + j -= 1 + return False + + def _append_pserver_ops(self, program, pserver_program, opt_op, endpoint): new_inputs = dict() # update param/grad shape first, then other inputs like # moment can use the updated shape @@ -321,6 +359,14 @@ class DistributeTranspiler: dtype=var.dtype, shape=new_shape) new_inputs[key] = tmpvar + # create var in pserver program global block. + # TODO(typhoonzero): put blocks in one program to avoid create two + # variables. + pserver_program.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=new_shape) # change outputs ParamOut variable opt_op.outputs["ParamOut"] = new_inputs["Param"] @@ -330,13 +376,18 @@ class DistributeTranspiler: outputs=opt_op.outputs, attrs=opt_op.attrs) - def _append_pserver_non_opt_ops(self, program, opt_op): + def _append_pserver_non_opt_ops(self, program, pserver_program, opt_op): for _, var in opt_op.inputs.iteritems(): program.global_block().create_var( name=var.name, persistable=var.persistable, dtype=var.dtype, shape=var.shape) + pserver_program.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=var.shape) program.global_block().append_op( type=opt_op.type, inputs=opt_op.inputs, @@ -358,13 +409,18 @@ class DistributeTranspiler: self._clone_var(pserver_program.global_block(), v) # step6 optimize_sub_program = Program() - for opt_op in optimize_ops: + for idx, opt_op in enumerate(optimize_ops): + is_op_on_pserver = self._is_op_on_pserver(endpoint, optimize_ops, + idx) + if not is_op_on_pserver: + continue if opt_op.inputs.has_key("Grad"): - # append optimize_op - self._append_pserver_ops(optimize_sub_program, opt_op, endpoint) + self._append_pserver_ops(optimize_sub_program, pserver_program, + opt_op, endpoint) else: - self._append_pserver_non_opt_ops(optimize_sub_program, opt_op) - + self._append_pserver_non_opt_ops(optimize_sub_program, + pserver_program, opt_op) + print("****subprogram", optimize_sub_program) pserver_program.global_block().append_op( type="recv", inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"] @@ -386,7 +442,7 @@ class DistributeTranspiler: pserver_program.sync_with_cpp() return pserver_program - def get_startup_program(self, endpoint): + def get_startup_program(self, endpoint, pserver_program): """ Get startup program for current parameter server. Modify operator input variables if there are variables that @@ -405,13 +461,17 @@ class DistributeTranspiler: # 1. create vars created_var_map = dict() - for var in params: + for _, var in pserver_program.global_block().vars.iteritems(): + print("create var for startup", var.name, var.shape) tmpvar = s_prog.global_block().create_var( name=var.name, - persistable=True, + persistable=var.persistable, dtype=var.dtype, shape=var.shape) created_var_map[var.name] = tmpvar + optimize_op_input_var_names = [ + v.name for v in pserver_program.global_block().vars.values() + ] # 2. 
rename op outputs for op in orig_s_prog.global_block().ops: @@ -423,13 +483,16 @@ class DistributeTranspiler: else: new_outputs[key] = var # do not append startup op if var is not on this pserver - var_on_pserver = False - for _, var in new_outputs.iteritems(): - if var.name in created_var_map: - var_on_pserver = True - if var_on_pserver: + op_on_pserver = False + for _, var in op.outputs.iteritems(): + if var.name in optimize_op_input_var_names: + op_on_pserver = True + break + + if op_on_pserver: # gaussian_random use attr to determine tensor shape - op.attrs["shape"] = new_outputs["Out"].shape + if op.type in ["gaussian_random", "fill_constant"]: + op.attrs["shape"] = new_outputs["Out"].shape s_prog.global_block().append_op( type=op.type, inputs=op.inputs, From 5d901d00bf9c93225548d707b1c3b79634b801b4 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 11 Jan 2018 22:41:24 +0800 Subject: [PATCH 10/14] update --- .../paddle/v2/fluid/distribute_transpiler.py | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 75e103cb80..59e74e0d6f 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -459,9 +459,10 @@ class DistributeTranspiler: return pname, splited_param.shape return "", [] - # 1. create vars + # 1. create vars in pserver program to startup program + pserver_vars = pserver_program.global_block().vars created_var_map = dict() - for _, var in pserver_program.global_block().vars.iteritems(): + for _, var in pserver_vars.iteritems(): print("create var for startup", var.name, var.shape) tmpvar = s_prog.global_block().create_var( name=var.name, @@ -469,30 +470,36 @@ class DistributeTranspiler: dtype=var.dtype, shape=var.shape) created_var_map[var.name] = tmpvar - optimize_op_input_var_names = [ - v.name for v in pserver_program.global_block().vars.values() - ] # 2. 
rename op outputs for op in orig_s_prog.global_block().ops: new_outputs = dict() + # do not append startup op if var is not on this pserver + op_on_pserver = False for key, var in op.outputs.iteritems(): newname, _ = _get_splited_name_and_shape(var.name) if newname: + op_on_pserver = True new_outputs[key] = created_var_map[newname] - else: - new_outputs[key] = var - # do not append startup op if var is not on this pserver - op_on_pserver = False - for _, var in op.outputs.iteritems(): - if var.name in optimize_op_input_var_names: + elif var.name in pserver_vars: op_on_pserver = True - break + new_outputs[key] = pserver_vars[var.name] + + # newname, _ = _get_splited_name_and_shape(var.name) + # if newname: + # print("updating output", newname, created_var_map[newname]) + # new_outputs[key] = created_var_map[newname] + # else: + # print("no update output", key, var) + # new_outputs[key] = var + # if var.name in created_var_map or \ + # newname: + # op_on_pserver = True if op_on_pserver: - # gaussian_random use attr to determine tensor shape if op.type in ["gaussian_random", "fill_constant"]: op.attrs["shape"] = new_outputs["Out"].shape + print("updated shape", op.attrs["shape"]) s_prog.global_block().append_op( type=op.type, inputs=op.inputs, From 5faebab375d5e039f5f7cc3169b8de8167494d31 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Fri, 12 Jan 2018 14:18:55 +0800 Subject: [PATCH 11/14] Done, need support selectedrows --- .../paddle/v2/fluid/distribute_transpiler.py | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 59e74e0d6f..d17f9815cc 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -148,7 +148,7 @@ class DistributeTranspiler: concat = program.global_block().append_op( type="concat", inputs={"X": splited_var}, - outputs={"Out": orig_param}, + outputs={"Out": [orig_param]}, attrs={"axis": 0}) def _create_vars_from_blocklist(self, program, block_list): @@ -420,7 +420,6 @@ class DistributeTranspiler: else: self._append_pserver_non_opt_ops(optimize_sub_program, pserver_program, opt_op) - print("****subprogram", optimize_sub_program) pserver_program.global_block().append_op( type="recv", inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"] @@ -463,7 +462,6 @@ class DistributeTranspiler: pserver_vars = pserver_program.global_block().vars created_var_map = dict() for _, var in pserver_vars.iteritems(): - print("create var for startup", var.name, var.shape) tmpvar = s_prog.global_block().create_var( name=var.name, persistable=var.persistable, @@ -485,21 +483,11 @@ class DistributeTranspiler: op_on_pserver = True new_outputs[key] = pserver_vars[var.name] - # newname, _ = _get_splited_name_and_shape(var.name) - # if newname: - # print("updating output", newname, created_var_map[newname]) - # new_outputs[key] = created_var_map[newname] - # else: - # print("no update output", key, var) - # new_outputs[key] = var - # if var.name in created_var_map or \ - # newname: - # op_on_pserver = True - if op_on_pserver: - if op.type in ["gaussian_random", "fill_constant"]: + if op.type in [ + "gaussian_random", "fill_constant", "uniform_random" + ]: op.attrs["shape"] = new_outputs["Out"].shape - print("updated shape", op.attrs["shape"]) s_prog.global_block().append_op( type=op.type, inputs=op.inputs, From c24da0d3eefa1cf0a9dcee9b96fa120ca2e421b3 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: 
Fri, 12 Jan 2018 16:30:45 +0800 Subject: [PATCH 12/14] update unit test split var --- python/paddle/v2/fluid/tests/CMakeLists.txt | 1 + .../paddle/v2/fluid/tests/book_distribute/test_split_var.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/paddle/v2/fluid/tests/CMakeLists.txt b/python/paddle/v2/fluid/tests/CMakeLists.txt index e795627bfe..9a0240cbf6 100644 --- a/python/paddle/v2/fluid/tests/CMakeLists.txt +++ b/python/paddle/v2/fluid/tests/CMakeLists.txt @@ -5,3 +5,4 @@ foreach(src ${TEST_OPS}) endforeach() add_subdirectory(book) +add_subdirectory(book_distribute) diff --git a/python/paddle/v2/fluid/tests/book_distribute/test_split_var.py b/python/paddle/v2/fluid/tests/book_distribute/test_split_var.py index 1355e13e1c..cfb48a5915 100644 --- a/python/paddle/v2/fluid/tests/book_distribute/test_split_var.py +++ b/python/paddle/v2/fluid/tests/book_distribute/test_split_var.py @@ -2,6 +2,7 @@ import math import unittest from paddle.v2.fluid.distribute_transpiler import split_dense_variable import paddle.v2.fluid as fluid +import paddle.v2.fluid.core as core import random @@ -19,9 +20,9 @@ class TestSplitVar(unittest.TestCase): program = fluid.Program() for shape in shapes: var = program.global_block().create_var( - name=str(random.randint(10000)), + name=str(random.randint(10000, 99999)), persistable=True, - dtype=core.VarDesc.VarType.LOD_TENSOR, + # dtype=core.VarDesc.VarType.LOD_TENSOR, shape=shape) var_list.append(var) blocks = split_dense_variable(var_list, 10) From 06b326b681e2d9a831e02a7f2f0a1e8c18c1bdeb Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Fri, 12 Jan 2018 17:07:11 +0800 Subject: [PATCH 13/14] follow comments --- python/paddle/v2/fluid/__init__.py | 3 ++- python/paddle/v2/fluid/distribute_transpiler_simple.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/python/paddle/v2/fluid/__init__.py b/python/paddle/v2/fluid/__init__.py index 5e01b87198..a14422ee92 100644 --- a/python/paddle/v2/fluid/__init__.py +++ b/python/paddle/v2/fluid/__init__.py @@ -18,13 +18,14 @@ from param_attr import ParamAttr from data_feeder import DataFeeder from core import LoDTensor, CPUPlace, CUDAPlace from distribute_transpiler import DistributeTranspiler +from distribute_transpiler_simple import SimpleDistributeTranspiler import clip Tensor = LoDTensor __all__ = framework.__all__ + executor.__all__ + [ 'io', 'initializer', 'layers', 'nets', 'optimizer', 'backward', 'regularizer', 'LoDTensor', 'CPUPlace', 'CUDAPlace', 'Tensor', 'ParamAttr' - 'DataFeeder', 'clip', 'DistributeTranspiler' + 'DataFeeder', 'clip', 'SimpleDistributeTranspiler', 'DistributeTranspiler' ] diff --git a/python/paddle/v2/fluid/distribute_transpiler_simple.py b/python/paddle/v2/fluid/distribute_transpiler_simple.py index 49ece7b725..32db3df9aa 100644 --- a/python/paddle/v2/fluid/distribute_transpiler_simple.py +++ b/python/paddle/v2/fluid/distribute_transpiler_simple.py @@ -48,7 +48,7 @@ def round_robin(params_grads, pserver_endpoints): return param_grad_map -class DistributeTranspiler: +class SimpleDistributeTranspiler: def transpile(self, optimize_ops, params_grads, From c996eb8a0b89117b843da5f3c0124e94be9ad375 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 15 Jan 2018 11:59:59 +0800 Subject: [PATCH 14/14] rename dist tests --- .../{test_dist_fit_a_line.py => notest_dist_fit_a_line.py} | 0 ...abel_semantic_roles.py => notest_dist_label_semantic_roles.py} | 0 .../{test_dist_word2vec.py => notest_dist_word2vec.py} | 0 ...ment_conv_dist.py => 
notest_understand_sentiment_conv_dist.py} | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename python/paddle/v2/fluid/tests/book_distribute/{test_dist_fit_a_line.py => notest_dist_fit_a_line.py} (100%) rename python/paddle/v2/fluid/tests/book_distribute/{test_dist_label_semantic_roles.py => notest_dist_label_semantic_roles.py} (100%) rename python/paddle/v2/fluid/tests/book_distribute/{test_dist_word2vec.py => notest_dist_word2vec.py} (100%) rename python/paddle/v2/fluid/tests/book_distribute/{test_understand_sentiment_conv_dist.py => notest_understand_sentiment_conv_dist.py} (100%) diff --git a/python/paddle/v2/fluid/tests/book_distribute/test_dist_fit_a_line.py b/python/paddle/v2/fluid/tests/book_distribute/notest_dist_fit_a_line.py similarity index 100% rename from python/paddle/v2/fluid/tests/book_distribute/test_dist_fit_a_line.py rename to python/paddle/v2/fluid/tests/book_distribute/notest_dist_fit_a_line.py diff --git a/python/paddle/v2/fluid/tests/book_distribute/test_dist_label_semantic_roles.py b/python/paddle/v2/fluid/tests/book_distribute/notest_dist_label_semantic_roles.py similarity index 100% rename from python/paddle/v2/fluid/tests/book_distribute/test_dist_label_semantic_roles.py rename to python/paddle/v2/fluid/tests/book_distribute/notest_dist_label_semantic_roles.py diff --git a/python/paddle/v2/fluid/tests/book_distribute/test_dist_word2vec.py b/python/paddle/v2/fluid/tests/book_distribute/notest_dist_word2vec.py similarity index 100% rename from python/paddle/v2/fluid/tests/book_distribute/test_dist_word2vec.py rename to python/paddle/v2/fluid/tests/book_distribute/notest_dist_word2vec.py diff --git a/python/paddle/v2/fluid/tests/book_distribute/test_understand_sentiment_conv_dist.py b/python/paddle/v2/fluid/tests/book_distribute/notest_understand_sentiment_conv_dist.py similarity index 100% rename from python/paddle/v2/fluid/tests/book_distribute/test_understand_sentiment_conv_dist.py rename to python/paddle/v2/fluid/tests/book_distribute/notest_understand_sentiment_conv_dist.py
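The block splitting exercised by test_split_var.py in patch 12 can be illustrated with a small stand-alone sketch. The helper below is only an approximation written for illustration: the min_block_size default, the row-aligned splitting rule, and the (name, offset, size) tuples it returns are assumptions for this example and may not match the real split_dense_variable in distribute_transpiler.py.

# NOTE: illustrative sketch only, not the actual split_dense_variable.
# Each dense variable is cut into row-aligned blocks of roughly equal
# size so the blocks can be spread over pserver_count parameter servers.
import math


def split_blocks(named_shapes, pserver_count, min_block_size=1024):
    blocks = []
    for name, shape in named_shapes:
        numel = 1
        for dim in shape:
            numel *= dim
        # width of one row; splits are aligned to whole rows (dim[0])
        row_width = numel // shape[0]
        split_count = pserver_count
        if numel < min_block_size * pserver_count:
            # small tensors get fewer blocks, possibly just one
            split_count = max(1, numel // min_block_size)
        rows_per_block = int(math.ceil(shape[0] / float(split_count)))
        block_size = rows_per_block * row_width
        offset = 0
        while offset < numel:
            cur_size = min(block_size, numel - offset)
            blocks.append((name, offset, cur_size))
            offset += cur_size
    return blocks


if __name__ == "__main__":
    # a 16x1024 fc weight and a 16-element bias split over 10 pservers
    for block in split_blocks([("fc_w", [16, 1024]), ("fc_b", [16])], 10):
        print(block)

For the 16x1024 weight this yields eight row-aligned blocks of 2048 elements each, while the small bias stays in a single block; this is the kind of block layout that the send, recv and concat ops added in the earlier patches operate on.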
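The startup-program fix in patches 08 through 11 rewrites the shape attribute of initializer ops, because gaussian_random, fill_constant and uniform_random determine their output shape from an op attribute rather than from the output variable. The sketch below shows that idea with a plain dict standing in for a framework operator; the fix_startup_shape helper and its dict layout are made up for this illustration and are not part of the Fluid API.

# NOTE: illustrative sketch only; the dict-based "op" is a stand-in for a
# real framework operator and fix_startup_shape is not a Fluid API.
SHAPE_ATTR_OPS = ["gaussian_random", "fill_constant", "uniform_random"]


def fix_startup_shape(op, block_shapes):
    """Rewrite the shape attr of an initializer op whose output is a
    split parameter block kept on this pserver.

    op: {"type": ..., "out": varname, "attrs": {"shape": [...]}}
    block_shapes: varname -> shape of the block held on this pserver
    """
    if op["type"] in SHAPE_ATTR_OPS and op["out"] in block_shapes:
        op["attrs"]["shape"] = block_shapes[op["out"]]
    return op


if __name__ == "__main__":
    op = {"type": "uniform_random",
          "out": "fc_w.block3",
          "attrs": {"shape": [16, 1024]}}
    # this pserver only holds a 2x1024 slice of fc_w
    print(fix_startup_shape(op, {"fc_w.block3": [2, 1024]}))

Applied to the example op, the attribute is rewritten from [16, 1024] to the [2, 1024] slice actually held by this pserver, so the initializer creates a tensor matching the split block.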