control flow: support optimizer called (#21851)

* append optimize op in the grad block of current block if current block is in control flow. test=develop

* add conditional grad op when optimizer used in control flow. test=develop

* add comment and modify typo. test=develop

* fix append_backward to support control flow. test=develop

* add test. test=develop

* fix copy_var_to_parent_block and conditional_block_grad. test=develop

* fix bug: revert to append conditional_block_grad vars to sub grad block. test=develop

* fix bug: revert to assign var to parent block even if var already is in parent block

* fix bug: consider outputs is empty. test=develop

* move _rename_grad_ out. test=develop

* modify code according to reviews from Huihuang. test=develop

* modify code according to reviews from Jinle. test=develop
Branch: release/1.7
Author: liym27 (committed by GitHub)
Parent: 12b2b4b1a1
Commit: 7d8d45991a
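A minimal usage sketch of what this change enables: calling `optimizer.minimize()` inside `fluid.layers.case`. It mirrors the `TestMutiTask` test added below and assumes the Paddle 1.7 fluid API; the shapes and learning rates are illustrative, not part of the change itself.

import numpy as np
import paddle.fluid as fluid
import paddle.fluid.layers as layers
import paddle.fluid.optimizer as optimizer

x = fluid.data(name='x', shape=[1, 784], dtype='float32')
y = fluid.data(name='y', shape=[1, 784], dtype='float32')
switch_id = fluid.data(name='switch_id', shape=[1], dtype='int32')
one = layers.fill_constant(shape=[1], dtype='int32', value=1)

adam = optimizer.Adam(learning_rate=0.001)
adagrad = optimizer.Adagrad(learning_rate=0.001)

def fn_1():
    # optimizer called inside a control-flow branch
    adam.minimize(layers.mean(layers.elementwise_mul(x, y)))

def fn_2():
    adagrad.minimize(layers.mean(layers.elementwise_mul(x, y)))

layers.case(pred_fn_pairs=[(switch_id == one, fn_1)], default=fn_2)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
feed_image = np.random.random(size=[1, 784]).astype('float32')
exe.run(fluid.default_main_program(),
        feed={'x': feed_image, 'y': feed_image,
              'switch_id': np.array([0]).astype('int32')},
        fetch_list=[])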


@@ -2231,6 +2231,14 @@ class Block(object):
         """
         self.desc._set_forward_block_idx(idx)

+    @property
+    def backward_block_idx(self):
+        cur_block_idx = self.idx
+        for block in self.program.blocks:
+            if block.forward_block_idx == cur_block_idx:
+                return block.idx
+        return -1
+
     @property
     def idx(self):
         return self.desc.id

@@ -28,6 +28,8 @@ import warnings
 import six
 from functools import reduce, partial
 from ..data_feeder import convert_dtype, check_type_and_dtype
+from ... import compat as cpt
+from ..backward import _infer_var_data_type_shape_

 __all__ = [
     'While', 'Switch', 'increment', 'array_write', 'create_array', 'less_than',
@@ -1799,6 +1801,9 @@ class ConditionalBlock(object):
                     intermediate.add(out_var_name)
         input_set = set([ipt.name for ipt in self.inputs])

+        # TODO(liym27): Here it is assumed that all params are in the recursive
+        # parent block, but when minimize() is called in control flow, some
+        # params may be in the conditional grad block.
         param_list = [
             parent_block._var_recursive(each_name) for each_name in params
         ]
@@ -1811,7 +1816,7 @@ class ConditionalBlock(object):
         step_scope = parent_block.create_var(
             type=core.VarDesc.VarType.STEP_SCOPES)
-        parent_block.append_op(
+        conditional_block_op = parent_block.append_op(
             type='conditional_block',
             inputs={
                 'Cond': self.inputs,
@@ -1824,6 +1829,90 @@ class ConditionalBlock(object):
                 'is_scalar_condition': self.is_scalar_condition
             })

+        if self.need_append_conditional_block_grad(inside_block):
+            self.append_conditional_block_grad(parent_block, inside_block,
+                                               conditional_block_op)
+
+    def need_append_conditional_block_grad(self, inside_block):
+        grad_sub_block_idx = inside_block.backward_block_idx
+        return grad_sub_block_idx != -1
+
+    def append_conditional_block_grad(self, parent_block, inside_block,
+                                      conditional_block_op):
+        '''
+        Append op `conditional_block_grad` manually.
+        When `optimizer.minimize/append_backward` is called in Paddle control flow,
+        the grad ops are appended before the op `conditional_block` is appended, so
+        the op `conditional_block_grad` cannot be appended while
+        `optimizer.minimize/append_backward` runs. After op `conditional_block`
+        has been appended, `conditional_block_grad` is appended here manually.
+
+        Args:
+            parent_block (Block): The block that `conditional_block_op` belongs to.
+            inside_block (Block): The sub block of `conditional_block_op`.
+            conditional_block_op (Operator): The forward op conditional_block.
+        '''
+        grad_sub_block_idx = inside_block.backward_block_idx
+        grad_sub_block = self.helper.main_program.block(grad_sub_block_idx)
+
+        intermediate = set()
+        params = set()
+
+        for each_op in grad_sub_block.ops:
+            assert isinstance(each_op, Operator)
+            for iname in each_op.input_names:
+                for in_var_name in each_op.input(iname):
+                    if in_var_name not in intermediate:
+                        params.add(in_var_name)
+
+            for oname in each_op.output_names:
+                for out_var_name in each_op.output(oname):
+                    intermediate.add(out_var_name)
+
+        param_list = []
+        for inner_input_name in params:
+            inner_var = parent_block._find_var_recursive(inner_input_name)
+            if inner_var:
+                param_list.append(cpt.to_text(inner_var.name))
+
+        grad_op_desc, op_grad_to_var = core.get_grad_op_desc(
+            conditional_block_op.desc,
+            cpt.to_text(set()), [grad_sub_block.desc])
+
+        # append op_desc in grad_op_descs to target_block
+        op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
+        backward = core.op_proto_and_checker_maker.OpRole.Backward
+        new_op_desc = parent_block.desc.append_op()
+        new_op_desc.copy_from(grad_op_desc[0])
+        new_op_desc._set_attr(op_role_attr_name, backward)
+        # set input and output manually
+        new_op_desc.set_input('Input', param_list)
+        new_op_desc.set_output('Input@GRAD',
+                               [param + "@GRAD" for param in param_list])
+
+        new_vars = set()
+        for grad_var_name in new_op_desc.output_arg_names():
+            if grad_sub_block.desc.has_var_recursive(
+                    cpt.to_bytes(grad_var_name)
+            ) or grad_var_name == core.empty_var_name():
+                continue
+            grad_sub_block.desc.var(cpt.to_bytes(grad_var_name))
+            new_vars.add(grad_var_name)
+            if grad_var_name not in op_grad_to_var:
+                continue
+
+        # infer_shape and infer_type
+        new_op_desc.infer_var_type(grad_sub_block.desc)
+        new_op_desc.infer_shape(grad_sub_block.desc)
+
+        for arg in new_op_desc.output_arg_names():
+            if arg in new_vars:
+                _infer_var_data_type_shape_(arg, grad_sub_block)
+
+        self.helper.main_program._sync_with_cpp()
+

 def copy_var_to_parent_block(var, layer_helper):
     if var is None:
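A hedged inspection sketch of the effect of `append_conditional_block_grad`: after `minimize()` runs inside a `layers.case` branch, a `conditional_block_grad` op should sit next to each forward `conditional_block` op in the parent (here, global) block, while the backward and optimize ops live in the grad sub-blocks. Built on the fluid 1.7 API; the expected op layout is inferred from the hunk above, not captured output.

import paddle.fluid as fluid
import paddle.fluid.layers as layers
import paddle.fluid.optimizer as optimizer

main_prog, startup_prog = fluid.Program(), fluid.Program()
with fluid.program_guard(main_prog, startup_prog):
    x = fluid.data(name='x', shape=[1, 4], dtype='float32')
    y = fluid.data(name='y', shape=[1, 4], dtype='float32')
    flag = fluid.data(name='flag', shape=[1], dtype='int32')
    one = layers.fill_constant(shape=[1], dtype='int32', value=1)
    sgd = optimizer.SGD(learning_rate=0.001)
    adam = optimizer.Adam(learning_rate=0.001)

    def fn_1():
        sgd.minimize(layers.mean(layers.elementwise_mul(x, y)))

    def fn_2():
        adam.minimize(layers.mean(layers.elementwise_mul(x, y)))

    layers.case(pred_fn_pairs=[(flag == one, fn_1)], default=fn_2)

# Block 0 should now hold the forward conditional_block ops plus the manually
# appended conditional_block_grad ops; the grad sub-blocks (blocks whose
# forward_block_idx points back at a branch block) should hold the backward
# and sgd/adam optimize ops (see the Optimizer hunk below).
for block in main_prog.blocks:
    print(block.idx, block.forward_block_idx, [op.type for op in block.ops])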

@@ -422,11 +422,22 @@ class Optimizer(object):
         # for parameters and extend _finish_update method to add custom ops.

         # Allways called under program_guard use global block as loss block
+        # But if current block is in control flow, append optimize op in the
+        # grad block of current block
         global_block = framework.default_main_program().global_block()
-        start = len(global_block.ops)
+
+        target_block = global_block
+        current_block = framework.default_main_program().current_block()
+        if current_block.idx != global_block.idx:
+            assert current_block.backward_block_idx != -1, \
+                "current block is not global_block, but it doesn't have backward block."
+            target_block = framework.default_main_program().blocks[
+                current_block.backward_block_idx]
+
+        start = len(target_block.ops)
         self.helper = LayerHelper(self.__class__.__name__)
         self._create_accumulators(
-            global_block,
+            target_block,
             [p[0] for p in parameters_and_grads if p[0].trainable])
         self._create_global_learning_rate()
@@ -438,7 +449,7 @@ class Optimizer(object):
                 with param_and_grad[0].block.program._optimized_guard(
                         param_and_grad):
                     if param_and_grad[0].trainable is True:
-                        optimize_op = self._append_optimize_op(global_block,
+                        optimize_op = self._append_optimize_op(target_block,
                                                                param_and_grad)
                         optimize_ops.append(optimize_op)
         else:
@@ -448,16 +459,16 @@ class Optimizer(object):
                 with param_and_grad[0].block.program._optimized_guard(
                         param_and_grad), name_scope("optimizer"):
                     if param_and_grad[0].trainable is True:
-                        optimize_op = self._append_optimize_op(global_block,
+                        optimize_op = self._append_optimize_op(target_block,
                                                                param_and_grad)
                         optimize_ops.append(optimize_op)

         # Get custom finish ops for subclasses
         # FIXME: Need to fix this once we figure out how to handle dependencies
-        self._finish_update(global_block, parameters_and_grads)
+        self._finish_update(target_block, parameters_and_grads)

-        end = len(global_block.ops)
-        return global_block._slice_ops(start, end)
+        end = len(target_block.ops)
+        return target_block._slice_ops(start, end)

     def _process_distribute_lookuptable(self, param_grads):
         """
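For readability, the block-routing rule introduced above restated as a standalone helper. The name `_resolve_target_block` is hypothetical and not part of the Paddle API; it only mirrors the logic that `_create_optimization_pass` now runs inline.

def _resolve_target_block(program):
    """Mirror of the block-selection logic added to _create_optimization_pass."""
    global_block = program.global_block()
    current_block = program.current_block()
    # Outside control flow, optimize ops still go to the global block.
    if current_block.idx == global_block.idx:
        return global_block
    # Inside a control-flow sub-block, minimize() must already have built a
    # backward block for it; the optimize ops are appended there instead.
    assert current_block.backward_block_idx != -1, \
        "current block is not global_block, but it doesn't have backward block."
    return program.blocks[current_block.backward_block_idx]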
def _process_distribute_lookuptable(self, param_grads): def _process_distribute_lookuptable(self, param_grads):
""" """
@@ -1904,7 +1915,6 @@ class AdamaxOptimizer(Optimizer):
         """Update Beta1 Power accumulator
         """
         assert isinstance(block, framework.Block)
-        main_block = block.program.global_block()
         for param, grad in parameters_and_grads:
             if grad is None or param.trainable is False:
                 continue
@@ -1912,7 +1922,7 @@ class AdamaxOptimizer(Optimizer):
                     [param, grad]), name_scope('adamx'):
                 beta1_pow_acc = self._get_accumulator(self._beta1_pow_acc_str,
                                                       param)
-                main_block.append_op(
+                block.append_op(
                     type="scale",
                     inputs={"X": beta1_pow_acc},
                     outputs={"Out": beta1_pow_acc},

@@ -22,6 +22,7 @@ import paddle.fluid.core as core
 import paddle.fluid.layers as layers
 from paddle.fluid.framework import Program, program_guard
 from functools import partial
+import paddle.fluid.optimizer as optimizer


 class TestAPICase(unittest.TestCase):
@@ -223,5 +224,52 @@ class TestAPICase_Error(unittest.TestCase):
         self.assertRaises(TypeError, type_error_default)


+# when optimizer in case
+class TestMutiTask(unittest.TestCase):
+    def test_optimizer_in_case(self):
+        BATCH_SIZE = 1
+        INPUT_SIZE = 784
+        EPOCH_NUM = 2
+
+        x = fluid.data(
+            name='x', shape=[BATCH_SIZE, INPUT_SIZE], dtype='float32')
+        y = fluid.data(
+            name='y', shape=[BATCH_SIZE, INPUT_SIZE], dtype='float32')
+
+        switch_id = fluid.data(name='switch_id', shape=[1], dtype='int32')
+
+        one = layers.fill_constant(shape=[1], dtype='int32', value=1)
+        adam = optimizer.Adam(learning_rate=0.001)
+        adagrad = optimizer.Adagrad(learning_rate=0.001)
+
+        def fn_1():
+            sum = layers.elementwise_mul(x, y)
+            loss = layers.mean(sum, name="f_1_loss")
+            adam.minimize(loss)
+
+        def fn_2():
+            sum = layers.elementwise_mul(x, y)
+            loss = layers.mean(sum, name="f_2_loss")
+            adagrad.minimize(loss)
+
+        layers.case(pred_fn_pairs=[(switch_id == one, fn_1)], default=fn_2)
+
+        exe = fluid.Executor(fluid.CPUPlace())
+        exe.run(fluid.default_startup_program())
+
+        for epoch in range(EPOCH_NUM):
+            np.random.seed(epoch)
+            feed_image = np.random.random(
+                size=[BATCH_SIZE, INPUT_SIZE]).astype('float32')
+            main_program = fluid.default_main_program()
+            out = exe.run(main_program,
+                          feed={
+                              'x': feed_image,
+                              'y': feed_image,
+                              'switch_id': np.array([epoch]).astype('int32')
+                          },
+                          fetch_list=[])
+
+
 if __name__ == '__main__':
     unittest.main()

@@ -0,0 +1,229 @@
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import numpy as np
import unittest

import paddle.fluid as fluid
import paddle.fluid.layers as layers
import paddle.fluid.optimizer as optimizer
from paddle.fluid.framework import Program, program_guard
import paddle.fluid.core as core

BATCH_SIZE = 1
INPUT_SIZE = 784
CLASS_NUM = 10
FC_SIZE = 40
EPOCH_NUM = 5
LR = 0.001
SEED = 2020


def static(train_data,
           loss_in_switch=True,
           use_cuda=False,
           use_parallel_exe=False):
    startup_program = Program()
    main_program = Program()
    startup_program.random_seed = SEED
    main_program.random_seed = SEED

    with program_guard(main_program, startup_program):

        def double_fc_net(image):
            hidden = layers.fc(
                image,
                size=FC_SIZE,
                act='relu',
                param_attr=fluid.ParamAttr(
                    initializer=fluid.initializer.Constant(value=0.99)),
                bias_attr=fluid.ParamAttr(
                    initializer=fluid.initializer.Constant(value=0.5)),
                name="hidden")

            prediction = layers.fc(
                hidden,
                size=CLASS_NUM,
                act='softmax',
                param_attr=fluid.ParamAttr(
                    initializer=fluid.initializer.Constant(value=1.2)),
                bias_attr=fluid.ParamAttr(
                    initializer=fluid.initializer.Constant(value=0.8)),
                name="prediction")
            return hidden, prediction

        def fn_1(opt, avg_loss=None, pred=None, label=None):
            if avg_loss is None:
                loss = layers.cross_entropy(input=pred, label=label)
                avg_loss = layers.mean(loss, name='mean_cross_entropy_loss')
            opt.minimize(avg_loss)
            return avg_loss

        def fn_2(opt, avg_loss=None, pred=None, label=None):
            if avg_loss is None:
                loss = layers.softmax_with_cross_entropy(
                    logits=pred, label=label)
                avg_loss = layers.mean(loss, name='mean_softmax_loss')
            opt.minimize(avg_loss)
            return avg_loss

        image = fluid.data('image', [BATCH_SIZE, INPUT_SIZE], 'float32')
        label = fluid.data('label', [BATCH_SIZE, 1], 'int64')
        hidden, prediction = double_fc_net(image)

        adam = optimizer.Adam(learning_rate=LR)
        sgd = optimizer.SGD(learning_rate=LR)

        id = fluid.data('id', [1], 'int32')
        two = layers.fill_constant([1], 'int32', 2)
        mod_two = layers.elementwise_mod(id, two) == 0

        if loss_in_switch:
            avg_loss = layers.case([(
                mod_two, lambda: fn_1(adam, None, prediction, label))],
                                   lambda: fn_2(sgd, None, prediction, label))
        else:
            loss_1 = layers.cross_entropy(input=prediction, label=label)
            avg_loss_1 = layers.mean(loss_1)
            loss_2 = layers.softmax_with_cross_entropy(
                logits=prediction, label=label)
            avg_loss_2 = layers.mean(loss_2)
            avg_loss = layers.case([(mod_two, lambda: fn_1(adam, avg_loss_1))],
                                   lambda: fn_2(sgd, avg_loss_2))

    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())

    for epoch in range(EPOCH_NUM):
        feed_image, feed_label = train_data[epoch]
        fetch_list = [hidden, prediction, avg_loss]
        feed = {
            'image': feed_image,
            'label': feed_label,
            'id': np.array([epoch]).astype('int32')
        }
        out = exe.run(main_program, feed=feed, fetch_list=fetch_list)
        out_hidden, out_pred, loss = out

    return out_hidden, out_pred, loss


class DygraphLayer(fluid.dygraph.Layer):
    def __init__(self):
        super(DygraphLayer, self).__init__()
        self.fc_1 = fluid.dygraph.nn.Linear(
            INPUT_SIZE,
            FC_SIZE,
            act='relu',
            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
                value=0.99)),
            bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
                value=0.5)), )

        self.fc_2 = fluid.dygraph.nn.Linear(
            FC_SIZE,
            CLASS_NUM,
            act='softmax',
            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
                value=1.2)),
            bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
                value=0.8)))

    def forward(self, inputs):
        hidden = self.fc_1(inputs)
        prediction = self.fc_2(hidden)
        return hidden, prediction


def dynamic(train_data, use_cuda=False, use_parallel_exe=False):
    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
    with fluid.dygraph.guard(place):
        fluid.default_startup_program().random_seed = SEED
        fluid.default_main_program().random_seed = SEED
        dy_layer = DygraphLayer()
        adam = fluid.optimizer.Adam(
            learning_rate=LR, parameter_list=dy_layer.parameters())
        sgd = fluid.optimizer.SGD(learning_rate=LR,
                                  parameter_list=dy_layer.parameters())

        for epoch in range(EPOCH_NUM):
            image_data, label = train_data[epoch]
            var_input = fluid.dygraph.to_variable(image_data)
            var_label = fluid.dygraph.to_variable(label)
            hidden, prediction = dy_layer(var_input)

            if epoch % 2 == 0:
                cross_entropy_loss = layers.cross_entropy(prediction,
                                                          var_label)
                loss = layers.mean(cross_entropy_loss)
                loss.backward()
                adam.minimize(loss)
            else:
                softmax_loss = layers.softmax_with_cross_entropy(prediction,
                                                                 var_label)
                loss = layers.mean(softmax_loss)
                loss.backward()
                sgd.minimize(loss)

            dy_layer.clear_gradients()
        return hidden.numpy(), prediction.numpy(), loss.numpy()


class TestMultiTask(unittest.TestCase):
    '''
    Compare results of static graph and dynamic graph.
    Todo(liym27): add parallel GPU train.
    '''

    def random_input(self,
                     seed,
                     image_shape=[BATCH_SIZE, INPUT_SIZE],
                     label_shape=[BATCH_SIZE, 1]):
        np.random.seed(seed)
        image_np = np.random.random(size=image_shape).astype('float32')
        np.random.seed(seed)
        label_np = np.random.randint(
            low=0, high=CLASS_NUM - 1, size=label_shape).astype('int64')
        return image_np, label_np

    def init_train_data(self):
        self.train_data = []
        for epoch in range(EPOCH_NUM):
            self.train_data.append(self.random_input(epoch))

    def test_optimzier_in_switch(self):
        self.init_train_data()
        use_cuda = core.is_compiled_with_cuda()
        hidden_2, pre_2, loss_2 = dynamic(self.train_data, use_cuda)
        for loss_in_switch in [True, False]:
            hidden_1, pre_1, loss_1 = static(self.train_data, loss_in_switch,
                                             use_cuda)
            self.assertTrue(
                np.allclose(hidden_1, hidden_2),
                msg='static hidden is {}\ndynamic hidden is {}'.format(
                    hidden_1, hidden_2))
            self.assertTrue(
                np.allclose(pre_1, pre_2),
                msg='static prediction is {}\ndynamic prediction is {}'.format(
                    pre_1, pre_2))
            self.assertTrue(
                np.allclose(loss_1, loss_2),
                msg='static loss is {}\ndynamic loss is {}'.format(loss_1,
                                                                   loss_2))


if __name__ == '__main__':
    unittest.main()