clean parallel do

test=develop
for_weibo
Xin Pan 6 years ago
parent e3c4b0dace
commit 47ea2534fb

File diff suppressed because it is too large Load Diff

@ -249,69 +249,6 @@ def serialize_op_decs(op_desc):
return proto.__str__()
def _callback_lookup_(op):
"""
Only used in _append_backward_ops_
Build and returns a callback function for certain op. For example
parallel_do: AllReduce
:param op:
:return: callback function
"""
if op.type == 'parallel_do' and op.attr('use_nccl'):
all_vars = op.block.vars
param_names = set(op.input('parameters'))
param_names = [
name for name in param_names
if all_vars[name].stop_gradient is False
]
param_grad_names = [n + "@GRAD" for n in param_names]
class ParallelDoCallBack(object):
def __init__(self, param_grad_names, parallel_scopes_name):
self.has_inserted_nccl_init = False
self.param_grad_names = param_grad_names
self.parallel_scopes_name = parallel_scopes_name
def __call__(self, block, context):
if not self.has_inserted_nccl_init:
op_desc = _create_op_desc_(
"ncclInit",
{"parallel_scopes": self.parallel_scopes_name},
{"Communicator": ['nccl_com__do_not_change_']}, {})
block.program.global_block().desc.append_op().copy_from(
op_desc)
self.has_inserted_nccl_init = True
current_op_desc = context["__current_op_desc__"]
for o_param in current_op_desc.output_names():
for o_argu in current_op_desc.output(o_param):
if o_argu in self.param_grad_names:
allreduce_out_name = o_argu + "__nccl_all_reduce__"
op_desc = _create_op_desc_(
"ncclReduce",
{
"X": [o_argu],
"Communicator":
['nccl_com__do_not_change_']
},
{"Out": [allreduce_out_name]},
{"reduction": "ncclSum",
"root": 0}, )
block.desc.append_op().copy_from(op_desc)
op_desc = _create_op_desc_(
"assign", {"X": [allreduce_out_name]},
{"Out": [o_argu]}, {})
block.desc.append_op().copy_from(op_desc)
return ParallelDoCallBack(param_grad_names,
op.output("parallel_scopes"))
else:
return None
def _append_backward_ops_(block,
ops,
target_block,
@ -349,17 +286,8 @@ def _append_backward_ops_(block,
sub_block = program.block(op._block_attr_id("sub_block"))
grad_sub_block = program._create_block()
grad_sub_block._set_forward_block_idx(sub_block.idx)
cb = _callback_lookup_(op)
if cb is not None:
if callbacks is None:
new_callbacks = [cb]
else:
new_callbacks = callbacks + [_callback_lookup_(op)]
_append_backward_ops_(sub_block, sub_block.ops, grad_sub_block,
no_grad_dict, grad_to_var, new_callbacks)
else:
_append_backward_ops_(sub_block, sub_block.ops, grad_sub_block,
no_grad_dict, grad_to_var, callbacks)
_append_backward_ops_(sub_block, sub_block.ops, grad_sub_block,
no_grad_dict, grad_to_var, callbacks)
program._rollback()
grad_sub_block_list.append(grad_sub_block.desc)
@ -424,9 +352,6 @@ def _append_backward_vars_(block, start_op_idx, grad_to_var, grad_info_map):
# infer_shape and infer_type
op_desc.infer_var_type(block.desc)
op_desc.infer_shape(block.desc)
# ncclInit dones't need to set data_type
if op_desc.type() == 'ncclInit':
continue
for arg in op_desc.output_arg_names():
if arg in new_vars:
_infer_var_data_type_(arg, block)

@ -563,8 +563,8 @@ class Operator(object):
OP_WITHOUT_KERNEL_SET = {
'feed', 'fetch', 'save', 'load', 'recurrent', 'go',
'rnn_memory_helper_grad', 'conditional_block', 'while', 'send', 'recv',
'listen_and_serv', 'parallel_do', 'save_combine', 'load_combine',
'ncclInit', 'select', 'checkpoint_notify', 'gen_nccl_id'
'listen_and_serv', 'save_combine', 'load_combine', 'ncclInit', 'select',
'checkpoint_notify', 'gen_nccl_id'
}
def __init__(self,

@ -226,156 +226,6 @@ class BlockGuard(object):
return True
class ParallelDo(object):
"""
ParallelDo is used to represent multi-thread data parallel processing.
Its vanilla implementation can be shown as the following (:math:`|` means
single thread and :math:`||||` means multiple threads)
.. code-block:: text
In the forward pass
| Split input onto different devices
| Copy parameter onto different devices
|||| Compute forward pass in parallel
| Merge output from different devices
In the backward pass
| Split output@grad onto different devices
|||| Compute backward pass in parallel
| accumulate param@grad from different devices to the first device
| Merge input@grad from different devices
| Copy param@grad to the place of parallel_do_op
Examples:
.. code-block:: python
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
# ParallelDo version & Single-thread version
if thread_num > 1:
places = fluid.layers.get_places(thread_num)
pd = fluid.layers.control_flow.ParallelDo(places)
with pd.do():
images = pd.read_input(images)
label = pd.read_input(label)
predict = cnn_model(images)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
pd.write_output(avg_cost)
avg_cost = pd()
avg_cost = fluid.layers.mean(avg_cost)
else:
predict = cnn_model(images)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
.. warning::
It will be soon deprecated, please use ParallelExecutor instead.
"""
def __init__(self, places, use_nccl=False, name=None):
warnings.warn(
"API ParallelDo is deprecated since 0.15.0. Please use ParallelExecutor instead.",
Warning)
self.helper = LayerHelper("parallel_do", name=name)
self.inputs = []
self.places = places
self.outputs = []
self.status = StaticRNN.BEFORE_RNN_BLOCK
self.use_nccl = use_nccl
def do(self):
return BlockGuardWithCompletion(self)
def parent_block(self):
prog = self.helper.main_program
parent_idx = prog.current_block().parent_idx
assert parent_idx >= 0
parent_block = prog.block(parent_idx)
return parent_block
def __call__(self, *args, **kwargs):
if self.status != StaticRNN.AFTER_RNN_BLOCK:
raise ValueError("RNN output can only be retrieved after rnn block")
if len(self.outputs) == 0:
raise ValueError("RNN has no output")
elif len(self.outputs) == 1:
return self.outputs[0]
else:
return self.outputs
def read_input(self, var):
self.inputs.append(var)
return var
def write_output(self, var):
self.outputs.append(var)
def get_parameters(self):
main_program = self.helper.main_program
current_block = main_program.current_block()
parent_block = self.parent_block()
local_inputs = set()
params = list()
for var in self.inputs:
local_inputs.add(var.name)
for op in current_block.ops:
for iname in op.input_names:
for in_var_name in op.input(iname):
if in_var_name not in local_inputs:
params.append(in_var_name)
for oname in op.output_names:
for out_var_name in op.output(oname):
local_inputs.add(out_var_name)
params = list(set(params))
return [parent_block.var(name) for name in params]
def _complete_op(self):
main_program = self.helper.main_program
current_block = main_program.current_block()
parent_block = self.parent_block()
step_scope = parent_block.create_var(
type=core.VarDesc.VarType.STEP_SCOPES)
self.outputs = [
parent_block.create_var(
name=o.name,
shape=o.shape,
dtype=o.dtype,
lod_level=o.lod_level,
persistable=o.persistable,
stop_gradient=o.stop_gradient) for o in self.outputs
]
inputs = [parent_block.var(i.name) for i in self.inputs]
outputs = [parent_block.var(o.name) for o in self.outputs]
parent_block.append_op(
type='parallel_do',
inputs={
'inputs': inputs,
'parameters': self.get_parameters(),
'places': self.places
},
outputs={'outputs': outputs,
'parallel_scopes': [step_scope]},
attrs={'sub_block': current_block,
'use_nccl': self.use_nccl})
class BlockGuardWithCompletion(BlockGuard):
"""
BlockGuardWithCompletion class.
@ -384,7 +234,7 @@ class BlockGuardWithCompletion(BlockGuard):
"""
def __init__(self, rnn):
if not (isinstance(rnn, StaticRNN) or isinstance(rnn, ParallelDo)):
if not isinstance(rnn, StaticRNN):
raise TypeError(
"BlockGuardWithCompletion takes a StaticRNN or ParallelDo")
super(BlockGuardWithCompletion, self).__init__(rnn.helper.main_program)

@ -15,7 +15,6 @@
from __future__ import print_function
from paddle.fluid.layers.device import get_places
from paddle.fluid.layers.control_flow import ParallelDo
import unittest
import paddle.fluid as fluid
import paddle
@ -147,22 +146,7 @@ def train(word_dict,
cost, acc_out, prediction = net_method(
data, label, input_dim=dict_dim, class_dim=class_dim)
else:
places = get_places()
pd = ParallelDo(places)
with pd.do():
cost, acc, _ = net_method(
pd.read_input(data),
pd.read_input(label),
input_dim=dict_dim,
class_dim=class_dim)
pd.write_output(cost)
pd.write_output(acc)
cost, acc = pd()
cost = fluid.layers.mean(cost)
acc_out = fluid.layers.mean(acc)
prediction = None
assert save_dirname is None
raise NotImplementedError()
adagrad = fluid.optimizer.Adagrad(learning_rate=0.002)
adagrad.minimize(cost)

@ -25,7 +25,6 @@ import numpy
import paddle
import paddle.fluid as fluid
from paddle.fluid.layers.device import get_places
from paddle.fluid.layers.control_flow import ParallelDo
BATCH_SIZE = 64
@ -82,19 +81,7 @@ def train(nn_type,
net_conf = conv_net
if parallel:
places = get_places()
pd = ParallelDo(places)
with pd.do():
img_ = pd.read_input(img)
label_ = pd.read_input(label)
prediction, avg_loss, acc = net_conf(img_, label_)
for o in [avg_loss, acc]:
pd.write_output(o)
avg_loss, acc = pd()
# get mean loss and acc through every devices.
avg_loss = fluid.layers.mean(avg_loss)
acc = fluid.layers.mean(acc)
raise NotImplementedError()
else:
prediction, avg_loss, acc = net_conf(img, label)

@ -17,7 +17,6 @@ from __future__ import print_function
import paddle
import paddle.fluid as fluid
from paddle.fluid.layers.device import get_places
from paddle.fluid.layers.control_flow import ParallelDo
import unittest
import os
import numpy as np
@ -84,18 +83,7 @@ def train(use_cuda, is_sparse, is_parallel, save_dirname, is_local=True):
avg_cost, predict_word = __network__(
[first_word, second_word, third_word, forth_word, next_word])
else:
places = get_places()
pd = ParallelDo(places)
with pd.do():
avg_cost, predict_word = __network__(
list(
map(pd.read_input, [
first_word, second_word, third_word, forth_word,
next_word
])))
pd.write_output(avg_cost)
avg_cost = fluid.layers.mean(pd())
raise NotImplementedError()
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
sgd_optimizer.minimize(avg_cost)

@ -1,87 +0,0 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import math
import sys
import paddle
import paddle.fluid as fluid
from paddle.fluid.layers.device import get_places
from paddle.fluid.layers.control_flow import ParallelDo
# need to fix random seed and training data to compare the loss
# value accurately calculated by the default and the memory optimization
# version.
fluid.default_startup_program().random_seed = 111
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
device_type = 'CPU'
use_nccl = False
place = fluid.CPUPlace()
if fluid.core.is_compiled_with_cuda():
device_type = 'CUDA'
use_nccl = False
place = fluid.CUDAPlace(0)
places = get_places(device_count=0, device_type=device_type)
pd = ParallelDo(places, use_nccl=use_nccl)
with pd.do():
x_ = pd.read_input(x)
y_ = pd.read_input(y)
y_predict = fluid.layers.fc(input=x_, size=1, act=None)
cost = fluid.layers.square_error_cost(input=y_predict, label=y_)
avg_cost = fluid.layers.mean(x=cost)
pd.write_output(avg_cost)
cost = pd()
avg_cost = fluid.layers.mean(x=cost)
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.01)
sgd_optimizer.minimize(avg_cost)
fluid.memory_optimize(fluid.default_main_program(), print_log=True)
# fluid.release_memory(fluid.default_main_program())
BATCH_SIZE = 200
# fix the order of training data
train_reader = paddle.batch(
paddle.dataset.uci_housing.train(), batch_size=BATCH_SIZE, drop_last=False)
# train_reader = paddle.batch(
# paddle.reader.shuffle(
# paddle.dataset.uci_housing.train(), buf_size=500),
# batch_size=BATCH_SIZE)
feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
PASS_NUM = 100
for pass_id in range(PASS_NUM):
for data in train_reader():
avg_loss_value, = exe.run(fluid.default_main_program(),
feed=feeder.feed(data),
fetch_list=[avg_cost])
if avg_loss_value[0] < 10.0:
exit(0) # if avg cost less than 10.0, we think our code is good.
print(avg_loss_value[0])
if math.isnan(float(avg_loss_value)):
sys.exit("got NaN loss, training failed.")
exit(1)

@ -1,235 +0,0 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import paddle.fluid as fluid
from paddle.fluid.layers.device import get_places
from paddle.fluid.layers.control_flow import ParallelDo
import paddle.fluid.profiler as profiler
import numpy
import six
class BaseParallelForTest(unittest.TestCase):
def run_test(self, callback, feed, fetch):
"""
Run the unittest for parallel.for
Args:
callback(callable): A callable function returns a generator. There
are two yields in the generator function. The first yield
returns the data layers, and the second yield returns the loss.
The modified data variables will be sent back during the first
yield.
feed(dict): The executor feeding dictionary.
fetch(list|basestr): The fetch name lists.
Returns:
None
Raises:
AssertionError when the computation of cpu, parallel.for in cpu,
gpu, parallel.for in gpu are different.
"""
cpu = fluid.CPUPlace()
result_cpu = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=cpu,
use_parallel=False)
result_cpu_parallel = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=cpu,
use_parallel=True)
if fluid.core.is_compiled_with_cuda():
gpu = fluid.CUDAPlace(0)
result_gpu = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=gpu,
use_parallel=False,
use_gpu=True)
result_gpu_parallel = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=gpu,
use_parallel=True,
use_gpu=True)
result_gpu_nccl = self._run_test_impl_(
callback=callback,
feed=feed,
fetch=fetch,
place=gpu,
use_parallel=True,
use_nccl=True,
use_gpu=True)
self._assert_same_(fetch, result_cpu, result_cpu_parallel,
result_gpu, result_gpu_parallel, result_gpu_nccl)
else:
self._assert_same_(fetch, result_cpu, result_cpu_parallel)
def _run_test_impl_(self,
callback,
feed,
fetch,
place,
use_parallel=False,
use_nccl=False,
use_gpu=False):
"""
Run a single test, returns the fetch values
Args:
place(Place): the computation place.
use_parallel(bool): Whether use parallel.for or not.
Returns:
Fetched numpy arrays.
"""
if isinstance(fetch, six.string_types):
fetch = [fetch]
main = fluid.Program()
startup = fluid.Program()
# Fix seed
main.random_seed = 10
startup.random_seed = 10
with fluid.program_guard(main, startup):
generator = callback()
# Automatically insert parallel do if use_parallel = True
if use_parallel:
thread_num = fluid.core.get_cuda_device_count(
) if use_gpu else 8
places = get_places(thread_num)
pd = ParallelDo(places, use_nccl=use_nccl)
data = next(generator)
if isinstance(data, fluid.framework.Variable):
data = [data]
with pd.do():
ins = list(map(pd.read_input, data))
if len(ins) == 1:
ins = ins[0]
loss = generator.send(ins) # patch input
pd.write_output(loss)
loss = pd()
else:
data = next(generator)
loss = generator.send(data)
self.assertIsNotNone(loss)
avg_loss = fluid.layers.mean(loss)
fluid.backward.append_backward(loss=avg_loss)
exe = fluid.Executor(place)
exe.run(startup)
if use_gpu:
profile_type = 'GPU'
else:
profile_type = 'CPU'
with profiler.profiler(profile_type, 'total', '/tmp/profiler'):
return exe.run(main, feed=feed, fetch_list=fetch)
def _assert_same_(self, fetch, *args):
"""
Assert the return values of `run_test` are same.
Args:
fetch: Fetch list. Used for print error message
*args: The fetch result lists of each situations.
Returns:
None
Raises:
AssertionError
"""
def _impl_(a, b, fetch_id, item_id):
item_str = [
'CPU', 'ParallelCPU', 'GPU', 'ParallelGPU', 'ParallelGPUNCCL'
]
flag = numpy.allclose(a, b, rtol=0.1, atol=1e-3)
self.assertTrue(flag,
"The {0} are different in {1}, {2} vs {3}".format(
fetch[fetch_id], item_str[item_id], a, b))
for i, items in enumerate(zip(*args)):
self.assertGreater(len(items), 0)
for j in range(1, len(items)):
_impl_(items[0], items[j], fetch_id=i, item_id=j)
class ParallelOpTest(BaseParallelForTest):
@staticmethod
def __network__():
x = fluid.layers.data(shape=[784], dtype='float32', name='img')
x = yield x
hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
hidden = fluid.layers.batch_norm(input=hidden)
loss = fluid.layers.mean(hidden)
yield loss
def test_simple_fc(self):
self.run_test(
callback=self.__network__,
feed={
'img': numpy.random.random(size=(51, 784)).astype('float32')
},
fetch=['fc1.w@GRAD'])
def test_fc_with_tiny_data(self):
self.run_test(
callback=self.__network__,
feed={'img': numpy.random.random(size=(1, 784)).astype('float32')},
fetch=['fc1.w@GRAD'])
class ParallelOpTestMultipleInput(BaseParallelForTest):
@staticmethod
def __network__():
x = fluid.layers.data(
shape=[784], dtype='float32', name='img1', stop_gradient=False)
y = fluid.layers.data(
shape=[784], dtype='float32', name='img2', stop_gradient=False)
yield [x, y]
x = x + y
hidden1 = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
hidden2 = fluid.layers.fc(input=hidden1, size=200, param_attr='fc2.w')
hidden3 = fluid.layers.fc(input=hidden2, size=200, param_attr='fc3.w')
loss = fluid.layers.mean(hidden3)
yield loss
def test_simple_fc(self):
self.run_test(
callback=self.__network__,
feed={
'img1': numpy.random.random(size=(51, 784)).astype('float32'),
'img2': numpy.random.random(size=(51, 784)).astype('float32')
},
fetch=['fc1.w@GRAD', 'fc2.w@GRAD', 'fc3.w@GRAD'])
if __name__ == '__main__':
unittest.main()

@ -35,11 +35,10 @@ dtype_to_size = {
}
SUB_BLOCK_OPS = [
"while", "while_grad", "parallel_do", "parallel_do_grad",
"conditional_block", "conditional_block_grad"
"while", "while_grad", "conditional_block", "conditional_block_grad"
]
SUB_BLOCK_PAIR = [("while", "while_grad"), ("parallel_do", "parallel_do_grad"),
SUB_BLOCK_PAIR = [("while", "while_grad"),
("conditional_block", "conditional_block_grad")]
PRINT_LOG = False

Loading…
Cancel
Save