@@ -2661,57 +2661,67 @@ class ExponentialMovingAverage(object):
 class PipelineOptimizer(object):
     """
     Pipeline Optimizer
-    Train with pipeline mode. The program will be splited by cut_list.
-    If the len of cut_list is k, then the whole program (including
-    backward part) will be splited to 2*k-1 sections. So the length of place_list
-    and concurrency_list must be also 2*k-1.
-    Note: Though the asynchronous mode is applied in pipeline training to speed up,
+    Train with pipeline mode. The program will be split by cut_list.
+
+    If the length of cut_list is k, then the whole program (including \
+    backward part) will be split into 2*k-1 sections.
+
+    So the length of place_list and concurrency_list must also be 2*k-1.
+
+    Note: Though the asynchronous mode is applied in pipeline training to speed up, \
     the final performance depends on the training progress of each pipeline heavily.
-    And we will try the synchronous mode in the future
+
+    We will try the synchronous mode in the future.
     Args:
-        optimizer (Optimizer): The based optimizer, such as SGD
-        cut_list (list of Variable list): The cut variable of the main_program
-        place_list (list of Place): The place where the section will run on
-        concurrency_list (list of int): The concurrency degree
+        optimizer (Optimizer): The base optimizer, such as SGD.
+        cut_list (list of Variable list): The cut variables of the main_program.
+        place_list (list of Place): The place where each section will run.
+        concurrency_list (list of int): The concurrency degree of each section.
         queue_size (int): Each section will consume scopes from its in-scope queue
                           and produce scopes to out-scope queue. And this parameter
-                          specify the scope queue size. [Optional. Default: 30]
-        sync_steps (int): The synchronization steps between different cards. [Optional. Default: 1]
-        start_cpu_core_id (int): specify the first cpu core id. [Optional. Default:0]
+                          specifies the scope queue size. [Optional. Default: 30].
+        sync_steps (int): The synchronization steps between different cards. [Optional. Default: 1].
+        start_cpu_core_id (int): Specifies the first cpu core id. [Optional. Default: 0].
     Examples:
         .. code-block:: python

-            x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
-            y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0)
-            emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10,2], is_sparse=False)
-            emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False)
-            concat = layers.concat([emb_x, emb_y], axis=1)
-            fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False)
-            loss = layers.reduce_mean(fc)
-            optimizer = fluid.optimizer.SGD(learning_rate=0.5)
-            optimizer = fluid.optimizer.PipelineOptimizer(optimizer,
-                cut_list=[[emb_x, emb_y], [loss]],
-                place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()],
-                concurrency_list=[1, 1, 4],
-                queue_size=2,
-                sync_steps=1,
-                )
-            optimizer.minimize(loss)
-            place = fluid.CPUPlace()
-            exe = fluid.Executor(place)
-            exe.run(fluid.default_startup_program())
-            filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"]
-            dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
-            dataset.set_use_var([x,y])
-            dataset.set_batch_size(batch_size)
-            dataset.set_filelist(filelist)
-            exe.train_from_dataset(
-                fluid.default_main_program(),
-                dataset,
-                thread=2,
-                debug=False,
-                fetch_list=[],
-                fetch_info=[],
-                print_period=1)
+            import paddle.fluid as fluid
+            import paddle.fluid.layers as layers
+
+            x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
+            y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0)
+            emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10,2], is_sparse=False)
+            emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False)
+            concat = layers.concat([emb_x, emb_y], axis=1)
+            fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False)
+            loss = layers.reduce_mean(fc)
+            optimizer = fluid.optimizer.SGD(learning_rate=0.5)
+            optimizer = fluid.optimizer.PipelineOptimizer(optimizer,
+                cut_list=[[emb_x, emb_y], [loss]],
+                place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()],
+                concurrency_list=[1, 1, 4],
+                queue_size=2,
+                sync_steps=1,
+                )
+            optimizer.minimize(loss)
+            place = fluid.CPUPlace()
+            exe = fluid.Executor(place)
+            exe.run(fluid.default_startup_program())
+            batch_size = 32  # assumed value; set your own batch size
+            filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"]
+            dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
+            dataset.set_use_var([x,y])
+            dataset.set_batch_size(batch_size)
+            dataset.set_filelist(filelist)
+            exe.train_from_dataset(
+                fluid.default_main_program(),
+                dataset,
+                thread=2,
+                debug=False,
+                fetch_list=[],
+                fetch_info=[],
+                print_period=1)
     """

     def __init__(self,
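Note (illustration only, not part of the patch): the 2*k-1 rule in the new docstring can be sanity-checked with plain Python. With the example's cut_list (k = 2 cut-variable lists), the program splits into 3 sections, which is why place_list and concurrency_list in the example each carry 3 entries.

    # Plain-Python sketch; the names stand in for the fluid Variables/Places of the example.
    cut_list = [["emb_x", "emb_y"], ["loss"]]    # k = 2 cut points
    k = len(cut_list)
    num_sections = 2 * k - 1                     # forward sections plus mirrored backward sections
    place_list = ["CPUPlace", "CUDAPlace(0)", "CPUPlace"]
    concurrency_list = [1, 1, 4]
    assert len(place_list) == num_sections == 3
    assert len(concurrency_list) == num_sections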
@@ -2731,7 +2741,7 @@ class PipelineOptimizer(object):
         self._sync_steps = sync_steps
         self._start_cpu_core_id = start_cpu_core_id

-    def create_vars(self, block, main_program):
+    def _create_vars(self, block, main_program):
         used_var_set = set()
         for op_idx in range(block.desc.op_size()):
             op_desc = block.desc.op(op_idx)
@@ -2743,7 +2753,7 @@ class PipelineOptimizer(object):
                 source_var = main_program.block(0).var(str(var))
                 block._clone_variable(source_var, False)

-    def extract_section_opt_ops(self, ops, cut_point_name):
+    def _extract_section_opt_ops(self, ops, cut_point_name):
         """
         Extract opt ops in the given section
         """
@@ -2759,7 +2769,7 @@ class PipelineOptimizer(object):
         op_path = [ops[i] for i in range(len(ops)) if relevant_op_flags[i]]
         return op_path

-    def find_input_output(self, ops, name, is_forward=True):
+    def _find_input_output(self, ops, name, is_forward=True):
         """
         Find the inputs or outputs of a section
         """
@@ -2774,7 +2784,7 @@ class PipelineOptimizer(object):
            all_set.update(op.desc.input_arg_names())
         return all_set - part_set

-    def find_persistable_vars(self, ops, whole_parameters):
+    def _find_persistable_vars(self, ops, whole_parameters):
         """
         find the persistable input vars in current section
         """
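Note (illustration only, not part of the patch): _find_input_output above boils down to a set difference. A rough sketch with simplified, assumed op records (not the real fluid op descriptors): a section's external inputs are the arguments its ops reference minus the values produced inside the section.

    # Simplified stand-in records; the real code walks op.desc.*_arg_names().
    ops = [
        {"inputs": ["x", "embx"], "outputs": ["emb_x"]},
        {"inputs": ["emb_x", "fc_w"], "outputs": ["fc_out"]},
    ]
    produced = {name for op in ops for name in op["outputs"]}
    referenced = {name for op in ops for name in op["inputs"]}
    print(sorted(referenced - produced))    # ['embx', 'fc_w', 'x']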
@@ -2802,7 +2812,7 @@ class PipelineOptimizer(object):
            return True
         return False

-    def extract_section_ops(self, ops, cut_point_name):
+    def _extract_section_ops(self, ops, cut_point_name):
         """
         Extract ops in the given section
         """
@@ -2822,11 +2832,11 @@ class PipelineOptimizer(object):
         op_path = [ops[i] for i in range(len(ops)) if relevant_op_flags[i]]
         return op_path

-    def find_section_opt(self, ops, params):
-        res = self.extract_section_opt_ops(ops, params)
+    def _find_section_opt(self, ops, params):
+        res = self._extract_section_opt_ops(ops, params)
         return res

-    def split_program(self, main_program, cut_list):
+    def _split_program(self, main_program, cut_list):
         programs = []
         block = main_program.block(0)
         whole_parameters = [e.name for e in block.all_parameters()]
@@ -2847,24 +2857,24 @@ class PipelineOptimizer(object):
                 "input_set": set(),
                 "output_set": set()
             }
-            cur_ops = self.extract_section_ops(ops, cut_vars)
+            cur_ops = self._extract_section_ops(ops, cut_vars)
             if i == 0:
                 for op in ops:
                     if self._is_lr_role_op(op):
                         cur_ops.append(op)
             #prevent inplace in/out
             program["input_set"].update(
-                self.find_input_output(
+                self._find_input_output(
                     cur_ops, [], is_forward=True))
             for e in cur_ops:
                 ops.remove(e)

             if i < cut_len:
                 sec_params.append(
-                    self.find_persistable_vars(cur_ops, whole_parameters))
+                    self._find_persistable_vars(cur_ops, whole_parameters))
             if i >= cut_len - 1:
-                opt_ops = self.find_section_opt(ops,
-                                                sec_params[2 * cut_len - 2 - i])
+                opt_ops = self._find_section_opt(
+                    ops, sec_params[2 * cut_len - 2 - i])

                 for e in opt_ops:
                     ops.remove(e)
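Note (illustration only, not part of the patch): the index arithmetic in the hunk above, sec_params[2 * cut_len - 2 - i] and sec_params[min(i, 2 * cut_len - 2 - i)], reads as pairing each backward section with the forward section it mirrors, so both draw on the same persistable parameters. A quick check, assuming cut_len = 3 purely for illustration:

    cut_len = 3                         # k
    num_sections = 2 * cut_len - 1      # 5 sections in total
    for i in range(num_sections):
        mirror = 2 * cut_len - 2 - i
        print("section", i, "-> params of section", min(i, mirror))
    # section 0 -> params of section 0
    # section 1 -> params of section 1
    # section 2 -> params of section 2
    # section 3 -> params of section 1
    # section 4 -> params of section 0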
@@ -2875,11 +2885,11 @@ class PipelineOptimizer(object):
                 ap_op = program["program"].block(0).desc.append_op()
                 ap_op.copy_from(op_desc)
             program["input_set"].update(
-                self.find_input_output(
+                self._find_input_output(
                     cur_ops, cut_vars, is_forward=True))
             program["input_set"].update(sec_params[min(i, 2 * cut_len - 2 - i)])
             program["output_set"].update(
-                self.find_input_output(
+                self._find_input_output(
                     cur_ops, cut_vars, is_forward=False))
             programs.append(program)
         program = {
@@ -2894,7 +2904,7 @@ class PipelineOptimizer(object):
         program["input_set"].update(
             [cut_var.name + "@GRAD" for cut_var in cut_list[0]])
         program["input_set"].update(
-            self.find_input_output(
+            self._find_input_output(
                 ops, [], is_forward=True))
         program["input_set"].update(sec_params[0])
         programs.append(program)
@@ -2915,9 +2925,9 @@ class PipelineOptimizer(object):
         self._optimizer.minimize(loss, startup_program, parameter_list,
                                  no_grad_set)
         program = loss.block.program
-        program_list = self.split_program(program, self._cut_list)
+        program_list = self._split_program(program, self._cut_list)
         for p in program_list:
-            self.create_vars(p["program"].block(0), program)
+            self._create_vars(p["program"].block(0), program)
         whole_parameters = [e.name for e in program.block(0).all_parameters()]
         param_need_sync = []
         for i, section_p in enumerate(program_list):