Paddle/python/paddle/fluid/dygraph/static_runner.py

# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import logging
import numpy as np
import os
import six
from . import layers
from .. import core
from .. import framework
from .. import backward
from .base import switch_to_static_graph
from ... import compat as cpt
# DESIGN IDEA: Add a special operator that executes a static program inside the operator.
#
# Op's Inputs:
# - the input variables fed by the user
# - the necessary parameters of the network
# Op's Outputs:
# - the fetched output variables
#
# This op receives a complete program desc, internally creates a scope
# and an executor, and executes the program. Key points:
#
# 1. Data Sharing:
# The VarBase of the dynamic graph is not in the scope, so before the op
# executes the program internally, persistable variables with the same
# names as the feed, parameter, and fetch variables are created in the
# scope, and they share the LoDTensors of the op inputs.
#
# 2. Forward and Backward Separation:
# Because dynamic graph ops perform the forward and backward passes separately,
# the forward program is used as the execution object of the forward op,
# and the backward program is used as the execution object of the grad op.
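#
# As a rough illustrative sketch (placeholder variable names; see ``forward``
# below for the real call), the whole static program is executed by tracing a
# single ``run_program`` op:
#
#   framework._dygraph_tracer().trace_op(
#       type='run_program',
#       inputs={'X': feed_vars, 'Params': param_vars},
#       outputs={'Out': fetch_vars, 'OutScope': scope_var},
#       attrs={'global_block': program_desc.block(0),
#              'start_op_index': 0,
#              'end_op_index': forward_op_num,
#              'is_test': is_test})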
class StaticModelRunner(layers.Layer):
"""
A dynamic graph Layer for loading an inference program and its related parameters,
and then performing fine-tune training or inference.
The loaded program and parameters are those saved by `fluid.io.save_inference_model`.
.. note::
**1. Dynamic graph mode does not support LoDTensor.
All feed targets or parameters of the original static graph model
that depend on LoD are temporarily unavailable.**
**2. All feed targets of the saved inference model need to be given.**
**3. The ``stop_gradient`` information is lost and can not be recovered.**
**4. The parameter's ``trainable`` information is lost and can not be recovered.**
**5. Double gradient model is not supported now.**
**6. Now only supports loading models saved by `fluid.io.save_inference_model`.**
Args:
model_dir(str): The directory path where the model is saved.
model_filename(str, optional): The file name of the saved inference program.
If set to None, the default filename
:code:`__model__` is used.
The default value is None.
params_filename(str, optional): The file name that stores all related parameters.
If set to None, it is assumed that the parameters
were saved in separate files.
The default value is None.
Returns:
Layer: A Layer object that can run the loaded program.
Examples:
.. code-block:: python
import numpy as np
import paddle.fluid as fluid
BATCH_SIZE = 32
BATCH_NUM = 20
SAVE_DIRNAME = "fc.inference.model"
def random_batch_reader():
def _get_random_images_and_labels(image_shape, label_shape):
image = np.random.random(size=image_shape).astype('float32')
label = np.random.random(size=label_shape).astype('int64')
return image, label
def __reader__():
for _ in range(BATCH_NUM):
batch_image, batch_label = _get_random_images_and_labels(
[BATCH_SIZE, 784], [BATCH_SIZE, 1])
yield batch_image, batch_label
return __reader__
def train_and_save_static_model(place):
img = fluid.data(name='img', shape=[None, 784], dtype='float32')
label = fluid.data(name='label', shape=[None, 1], dtype='int64')
pred = fluid.layers.fc(input=img, size=10, act='softmax')
loss = fluid.layers.cross_entropy(input=pred, label=label)
avg_loss = fluid.layers.mean(loss)
optimizer = fluid.optimizer.SGD(learning_rate=0.001)
optimizer.minimize(avg_loss)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
loader = fluid.io.DataLoader.from_generator(
feed_list=[img, label], capacity=5, iterable=True)
loader.set_batch_generator(random_batch_reader(), places=place)
for data in loader():
exe.run(
fluid.default_main_program(),
feed=data,
fetch_list=[avg_loss])
# save model by fluid.io.save_inference_model
fluid.io.save_inference_model(
SAVE_DIRNAME, ["img"], [pred], exe)
# Step 1. train and save inference model in static graph mode
place = fluid.CPUPlace()
train_and_save_static_model(place)
# Step 2. load inference model in dygraph and fine-tune
with fluid.dygraph.guard(place):
fc = fluid.dygraph.static_runner.StaticModelRunner(SAVE_DIRNAME)
sgd = fluid.optimizer.SGD(learning_rate=0.001,
parameter_list=fc.parameters())
train_loader = fluid.io.DataLoader.from_generator(capacity=5)
train_loader.set_batch_generator(
random_batch_reader(), places=place)
for data in train_loader():
img = data[0]
label = data[1]
label.stop_gradient = True
cost = fc(img)
loss = fluid.layers.cross_entropy(cost, label)
avg_loss = fluid.layers.mean(loss)
avg_loss.backward()
sgd.minimize(avg_loss)
"""
def __init__(self, model_dir, model_filename=None, params_filename=None):
super(StaticModelRunner, self).__init__()
# Step 0. key variable definitions
# loaded inference program desc
self._infer_program_desc = None
# recovered train program desc
self._train_program_desc = None
# the program desc that StaticModelRunner actually executes,
# switched between infer and train by train() and eval()
self._trace_program_desc = None
self._inner_scope = core.Scope()
# the layer outputs var desc
self._output_descs = []
# input, output, params name list
self._input_names = []
self._output_names = []
self._param_names = []
# train or eval flag
self._is_test = False
# Step 1. load program desc from disk
# the saved model holds feed, fetch & scale ops, which are not needed and can be removed
self._infer_program_desc = self._load_static_model(model_dir,
model_filename)
# Step 2. load all parameters
self._load_persistable_dict(model_dir, params_filename)
# Step 3. generate backward program desc
self._train_program_desc = self._append_backward_desc()
# Step 4. recheck the parameters' stop_gradient attributes
self._recheck_stop_gradients()
# Step 5. set default mode to train
self.train()
def train(self):
self._is_test = False
self._trace_program_desc = self._train_program_desc
def eval(self):
self._is_test = True
self._trace_program_desc = self._infer_program_desc
def forward(self, *args):
"""
Executes the forward part of the StaticModelRunner Layer.
Generally, it is invoked by calling the Layer object directly.
Args:
args(tuple(np.ndarray|Variable)): the inputs of StaticModelRunner.
The order of input variables needs to be the same as the order
of feed variables when using `save_inference_model` to save model.
Returns:
Variable|list[Variable]: The forward outputs of StaticModelRunner Layer.
If there is only one output, return Variable;
if there are multiple outputs, return list[Variable].
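Examples:
.. code-block:: python

# An illustrative sketch (variable names are placeholders): for a model
# loaded with a single feed target, as in the class-level example above,
# the input is passed positionally.
x = np.random.random(size=(32, 784)).astype('float32')
out = fc(x)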
"""
# Step 1. prepare inputs, outputs, attrs
input_vars = []
for i, value in enumerate(args):
if not isinstance(value, (np.ndarray, core.VarBase)):
raise TypeError(
"The type of inputs.value in StaticModelRunner.forward must be numpy array or Variable(VarBase), but received %s."
% type(value))
# NOTE: In order to unify the API, firstly convert the input to VarBase
if isinstance(value, np.ndarray):
var = core.VarBase(
value=value,
name=self._input_names[i],
persistable=False,
place=framework._current_expected_place(),
zero_copy=True)
else:
var = value
# TODO: the input var may have an important name set by the user, which is overwritten here
var.name = self._input_names[i]
input_vars.append(var)
params = []
for param in self._parameters.values():
params.append(param)
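# create output VarBases according to the fetch var descs recorded when loading the model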
output_vars = []
for var_desc in self._output_descs:
var = core.VarBase(var_desc.dtype(),
var_desc.shape(),
var_desc.name(), var_desc.type(), False)
output_vars.append(var)
# create a scope VarBase to hold the forward intermediate variables needed by backward
tmp_scope_vec = core.VarBase(core.VarDesc.VarType.FP32, [],
"program_out_scope",
core.VarDesc.VarType.STEP_SCOPES, True)
tmp_scope_vec.value().set_scope(self._inner_scope)
# Step 2. run the program by the op
framework._dygraph_tracer().trace_op(
type='run_program',
inputs={'X': input_vars,
'Params': params},
outputs={'Out': output_vars,
'OutScope': tmp_scope_vec},
attrs={
'global_block': self._trace_program_desc.block(0),
'start_op_index': 0,
'end_op_index': self._infer_program_desc.block(0).op_size(),
'is_test': self._is_test
})
# NOTE: [ why we need to set the param's gradient type here ]
# If the user chooses sparse gradient mode, the param's gradient
# will be SelectedRows rather than LoDTensor. But the tracer just
# creates the param's grad VarBase from its forward VarBase (LoDTensor).
# If we don't change the grad_var type here, RunProgramOp would have to
# forcibly transform SelectedRows to LoDTensor, which may not be
# the result the user wants.
for param in params:
grad_name = param.name + core.grad_var_suffix()
grad_var = self._trace_program_desc.block(0).find_var(
cpt.to_bytes(grad_name))
# NOTE: it may be fine if the grad var desc cannot be found, e.g. for batch_norm params
if grad_var is None:
continue
param._set_grad_type(grad_var.type())
# Step 3. prepare outputs: return a single Variable if there is only one output, otherwise a list
outs = output_vars
if len(output_vars) == 1:
outs = output_vars[0]
return outs
def _load_static_model(self, model_dir, model_filename=None):
# Step 1. dir and filename check
load_dirname = os.path.normpath(model_dir)
if not os.path.isdir(load_dirname):
raise ValueError("There is no directory named '%s'" % load_dirname)
if model_filename is not None:
model_filename = os.path.basename(model_filename)
else:
model_filename = "__model__"
model_filename = os.path.join(load_dirname, model_filename)
# Step 2. parse program desc
with open(model_filename, "rb") as f:
program_desc_str = f.read()
program_desc = core.ProgramDesc(program_desc_str)
if not core._is_program_version_supported(program_desc._version()):
raise ValueError("Unsupported program version: %d\n" %
program_desc._version())
# Step 3.
# - remove feed, fetch and useless scale-1 op
# - remove op_callstack attr
ops_to_remove = []
root_block = program_desc.block(0)
for i in six.moves.range(root_block.op_size()):
op = root_block.op(i)
if op.type() == 'feed':
ops_to_remove.append(i)
feed_var_name = cpt.to_bytes(op.input('X')[0])
root_block._remove_var(feed_var_name)
self._input_names.append(cpt.to_bytes(op.output('Out')[0]))
elif op.type() == 'scale' and op.output('Out')[0].startswith(
'save_infer_model/scale_'):
ops_to_remove.append(i)
out_var_name = cpt.to_bytes(op.output('Out')[0])
root_block._remove_var(out_var_name)
self._output_names.append(cpt.to_bytes(op.input('X')[0]))
self._output_descs.append(
root_block.find_var(cpt.to_bytes(op.input('X')[0])))
elif op.type() == 'fetch':
ops_to_remove.append(i)
fetch_var_name = cpt.to_bytes(op.output('Out')[0])
root_block._remove_var(fetch_var_name)
# NOTE: some old pre-trained models have no extra scale op
if not op.input('X')[0].startswith('save_infer_model/scale_'):
self._output_names.append(cpt.to_bytes(op.input('X')[0]))
self._output_descs.append(
root_block.find_var(cpt.to_bytes(op.input('X')[0])))
else:
if op.has_attr("op_callstack"):
op.remove_attr("op_callstack")
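# remove the collected ops in reverse index order so the remaining op indices stay valid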
for op_idx in reversed(ops_to_remove):
root_block._remove_op(op_idx, op_idx + 1)
# NOTE: the feed ops were prepended to the saved program one by one, so they
# are visited above in reverse order of the original feed list; reverse the
# collected names to restore the original feed order
self._input_names.reverse()
return program_desc
@switch_to_static_graph
def _append_backward_desc(self):
assert self._infer_program_desc is not None, "The StaticModelRunner is not initialized properly."
program_desc_copy = core.ProgramDesc(self._infer_program_desc)
# Step 1. set all `is_test` attributes to False
self._change_is_test_status(program_desc_copy, False)
# Step 2. prepare program and related var
# NOTE: To reuse the backward interfaces, a Program is built first.
# Strictly speaking, building a program is not necessary, but skipping it
# would require rewriting a series of append_backward-related methods for
# the program desc. Therefore, to reuse the methods in backward.py, the
# program is built here.
fwd_op_num = program_desc_copy.block(0).op_size()
program = self._build_program_by_desc(program_desc_copy)
# TODO: could the targets be in sub block?
targets = []
for out in self._output_descs:
targets.append(program.global_block().var(out.name()))
# Step 3. append backward
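# NOTE: the returned gradients are not used; this call is made only for its
# side effect of appending grad ops to the program's desc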
backward.gradients(targets=targets, inputs=[])
return program.desc
def _load_persistable_dict(self, model_dir, params_filename=None):
load_dirname = os.path.normpath(model_dir)
assert self._infer_program_desc is not None, "The StaticModelRunner is not initialized properly."
persis_vars = self._get_persis_vars(self._infer_program_desc)
load_var_map = {}
for each_var in persis_vars:
orig_each_name = each_var.name()
# append suffix
self._append_loaded_suffix_to_param(each_var)
# create output varbase
new_var = framework.ParamBase(
shape=each_var.shape(),
dtype=each_var.dtype(),
name=each_var.name(),
type=each_var.type(),
persistable=True)
if params_filename is None:
if not self._is_parameter(each_var):
continue
framework._dygraph_tracer().trace_op(
type='load',
inputs={},
outputs={'Out': new_var},
attrs={
'file_path': os.path.join(load_dirname, orig_each_name)
})
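# the original ``trainable`` information is lost (see the class-level note),
# so loaded parameters are trainable by default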
new_var.stop_gradient = False
self.add_parameter(name=new_var.name, parameter=new_var)
self._param_names.append(new_var.name)
else:
load_var_map[each_var.name()] = new_var
if params_filename is not None:
load_var_list = []
for name in sorted(load_var_map.keys()):
load_var_list.append(load_var_map[name])
framework._dygraph_tracer().trace_op(
type='load_combine',
inputs={},
outputs={'Out': load_var_list},
attrs={
'file_path': os.path.join(load_dirname, params_filename)
})
for each_var in persis_vars:
if not self._is_parameter(each_var):
continue
param = load_var_map[each_var.name()]
param.stop_gradient = False
self.add_parameter(name=param.name, parameter=param)
self._param_names.append(param.name)
def _recheck_stop_gradients(self):
assert self._train_program_desc is not None, "The StaticModelRunner is not initialized properly."
# NOTE: After loading the model, the stop_gradient information
# of the original variable is lost, but if a parameter does not
# have a corresponding @GRAD variable in the backward program,
# it can be said that it is also stop_gradient
all_var_names = self._get_all_var_names(self._train_program_desc)
for param_name in self._parameters:
param_grad_name = param_name + core.grad_var_suffix()
if param_grad_name not in all_var_names:
self._parameters[param_name].stop_gradient = True
def _get_all_var_names(self, program_desc):
all_var_names = set()
for i in six.moves.range(program_desc.num_blocks()):
block = program_desc.block(i)
for var in block.all_vars():
all_var_names.add(var.name())
return all_var_names
def _get_persis_vars(self, program_desc):
persis_vars = []
for i in six.moves.range(program_desc.num_blocks()):
block = program_desc.block(i)
persis_vars.extend(
list(filter(self._is_persistable, block.all_vars())))
return persis_vars
@switch_to_static_graph
def _build_program_by_desc(self, program_desc):
prog = framework.Program()
prog.desc = program_desc
prog.blocks = [
framework.Block(prog, i)
for i in six.moves.range(prog.desc.num_blocks())
]
prog._sync_with_cpp()
return prog
def _is_persistable(self, var_desc):
if var_desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
var_desc.type() == core.VarDesc.VarType.FETCH_LIST or \
var_desc.type() == core.VarDesc.VarType.READER or \
var_desc.type() == core.VarDesc.VarType.RAW:
return False
return var_desc.persistable()
def _is_parameter(self, persis_var_desc):
assert self._infer_program_desc is not None, "The StaticModelRunner is not initialized properly."
# 1. firstly, param should be input of op
input_ops = [] # op can be repeated
for block_idx in six.moves.range(self._infer_program_desc.num_blocks()):
block = self._infer_program_desc.block(block_idx)
for op_idx in six.moves.range(block.op_size()):
op = block.op(op_idx)
# NOTE: parameter is the input of a certain op
if persis_var_desc.name() in op.input_arg_names():
input_ops.append(op)
# 2. secondly, the param should not be the output of any op, unless that op also takes it as an input
for block_idx in six.moves.range(self._infer_program_desc.num_blocks()):
block = self._infer_program_desc.block(block_idx)
for op_idx in six.moves.range(block.op_size()):
op = block.op(op_idx)
if persis_var_desc.name() in op.output_arg_names():
# such as batch_norm_op
if op in input_ops:
continue
else:
return False
return True
def _change_is_test_status(self, program_desc, is_test):
# change all `is_test` attributes
for i in six.moves.range(program_desc.num_blocks()):
block = program_desc.block(i)
for j in six.moves.range(block.op_size()):
op = block.op(j)
if op.has_attr('is_test'):
op._set_attr('is_test', is_test)
def _append_loaded_suffix(self, name):
"""
Append the loaded suffix to the given variable name,
e.g. x ==> x@LOADED
"""
suffix = core.loaded_var_suffix()
name = cpt.to_text(name)
if suffix not in name:
name = name + suffix
return name
def _append_loaded_suffix_to_param(self, param_desc):
old_name = param_desc.name()
new_name = self._append_loaded_suffix(param_desc.name())
param_desc.set_name(new_name)
for block_idx in six.moves.range(self._infer_program_desc.num_blocks()):
block = self._infer_program_desc.block(block_idx)
for op_idx in six.moves.range(block.op_size()):
op = block.op(op_idx)
op._rename_input(old_name, new_name)
op._rename_output(old_name, new_name)