|
|
|
@ -2654,6 +2654,61 @@ class ExponentialMovingAverage(object):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PipelineOptimizer(object):
|
|
|
|
|
"""
|
|
|
|
|
Pipeline Optimizer
|
|
|
|
|
Train with pipeline mode. The program will be splited by cut_list.
|
|
|
|
|
If the len of cut_list is k, then the whole program (including
|
|
|
|
|
backward part) will be splited to 2*k-1 sections. So the length of place_list
|
|
|
|
|
and concurrency_list must be also 2*k-1.
|
|
|
|
|
Note: Though the asynchronous mode is applied in pipeline training to speed up,
|
|
|
|
|
the final performance depends on the training progress of each pipeline heavily.
|
|
|
|
|
And we will try the synchronous mode in the future
|
|
|
|
|
Args:
|
|
|
|
|
optimizer (Optimizer): The based optimizer, such as SGD
|
|
|
|
|
cut_list (list of Variable list): The cut variable of the main_program
|
|
|
|
|
place_list (list of Place): The place where the section will run on
|
|
|
|
|
concurrency_list (list of int): The concurrency degree
|
|
|
|
|
queue_size (int): Each section will consume scopes from its in-scope queue
|
|
|
|
|
and produce scopes to out-scope queue. And this parameter
|
|
|
|
|
specify the scope queue size. [Optional. Default: 30]
|
|
|
|
|
sync_steps (int): The synchronization steps between different cards. [Optional. Default: 1]
|
|
|
|
|
start_cpu_core_id (int): specify the first cpu core id. [Optional. Default:0]
|
|
|
|
|
Examples:
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
|
|
|
|
|
y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0)
|
|
|
|
|
emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10,2], is_sparse=False)
|
|
|
|
|
emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False)
|
|
|
|
|
concat = layers.concat([emb_x, emb_y], axis=1)
|
|
|
|
|
fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False)
|
|
|
|
|
loss = layers.reduce_mean(fc)
|
|
|
|
|
optimizer = fluid.optimizer.SGD(learning_rate=0.5)
|
|
|
|
|
optimizer = fluid.optimizer.PipelineOptimizer(optimizer,
|
|
|
|
|
cut_list=[[emb_x, emb_y], [loss]],
|
|
|
|
|
place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()],
|
|
|
|
|
concurrency_list=[1, 1, 4],
|
|
|
|
|
queue_size=2,
|
|
|
|
|
sync_steps=1,
|
|
|
|
|
)
|
|
|
|
|
optimizer.minimize(loss)
|
|
|
|
|
place = fluid.CPUPlace()
|
|
|
|
|
exe = fluid.Executor(place)
|
|
|
|
|
exe.run(fluid.default_startup_program())
|
|
|
|
|
filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"]
|
|
|
|
|
dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
|
|
|
|
|
dataset.set_use_var([x,y])
|
|
|
|
|
dataset.set_batch_size(batch_size)
|
|
|
|
|
dataset.set_filelist(filelist)
|
|
|
|
|
exe.train_from_dataset(
|
|
|
|
|
fluid.default_main_program(),
|
|
|
|
|
dataset,
|
|
|
|
|
thread=2,
|
|
|
|
|
debug=False,
|
|
|
|
|
fetch_list=[],
|
|
|
|
|
fetch_info=[],
|
|
|
|
|
print_period=1)
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self,
|
|
|
|
|
optimizer,
|
|
|
|
|
cut_list=None,
|
|
|
|
|