You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
Paddle/python/paddle/fluid/tests/unittests/test_basic_lstm_api.py

306 lines
11 KiB

# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy
import paddle.fluid as fluid
import paddle.fluid.layers as layers
import paddle.fluid.core as core
from paddle.fluid.contrib.layers import basic_lstm
from paddle.fluid.executor import Executor
from paddle.fluid import framework
import numpy as np
# Clamp bounds used by the NumPy reference activations below.
# sigmoid() limits its input to [SIGMOID_THRESHOLD_MIN, SIGMOID_THRESHOLD_MAX]
# before exponentiating.
SIGMOID_THRESHOLD_MIN = -40.0
SIGMOID_THRESHOLD_MAX = 13.0
# tanh() caps the argument it passes to np.exp at this value.
EXP_MAX_INPUT = 40.0
def sigmoid(x):
    """Return the element-wise logistic sigmoid of ``x`` with a clamped input.

    The input is limited to the closed range
    [SIGMOID_THRESHOLD_MIN, SIGMOID_THRESHOLD_MAX] before the exponential is
    taken. ``x`` itself is not modified.
    """
    clamped = np.clip(x, SIGMOID_THRESHOLD_MIN, SIGMOID_THRESHOLD_MAX)
    return 1. / (1. + np.exp(-clamped))
def tanh(x):
    """Return the element-wise hyperbolic tangent of ``x``.

    Computed as ``2 / (1 + exp(-2x)) - 1``. The exponent ``-2x`` is capped at
    EXP_MAX_INPUT so that the argument handed to ``np.exp`` stays bounded for
    very negative inputs.

    Accepts arrays as well as Python / NumPy scalars; ``x`` is not modified.
    """
    # np.minimum works for scalars and 0-d arrays too, whereas a masked
    # in-place assignment on ``-2. * x`` fails for those inputs.
    exponent = np.minimum(-2. * x, EXP_MAX_INPUT)
    return (2. / (1. + np.exp(exponent))) - 1.
def lstm_np(input,
            init_h,
            init_c,
            hidden_size,
            gate_weight,
            gate_bias,
            num_layers=1,
            batch_first=False,
            is_bidirect=False,
            sequence_length=None,
            forget_bias=1.0):
    """NumPy reference implementation of a stacked, optionally bidirectional LSTM.

    Args:
        input: 3-D array, ``[seq_len, batch, feature]``, or
            ``[batch, seq_len, feature]`` when ``batch_first`` is True.
        init_h: initial hidden state, reshapeable to
            ``[num_layers, num_directions, batch, hidden_size]``, or None for
            zeros.
        init_c: initial cell state, same convention as ``init_h``.
        hidden_size: size of the hidden / cell state.
        gate_weight: sequence of per-layer gate weight matrices, indexed by
            ``direction * num_layers + layer`` (forward layers first).
        gate_bias: sequence of per-layer gate biases, indexed like
            ``gate_weight``.
        num_layers: number of stacked layers per direction.
        batch_first: whether ``input`` (and the returned ``rnn_out``) is
            batch-major.
        is_bidirect: also run a backward pass and concatenate its outputs.
        sequence_length: optional per-sample valid lengths; steps past a
            sample's length leave its hidden and cell state unchanged.
        forget_bias: constant added to the forget gate pre-activation.

    Returns:
        Tuple ``(rnn_out, last_hidden, last_cell)``. ``rnn_out`` holds the
        top layer's hidden state for every step (both directions concatenated
        on the last axis when bidirectional). ``last_hidden`` and
        ``last_cell`` are ``[num_layers * num_directions, batch, hidden_size]``;
        when bidirectional the rows are ordered layer by layer, forward
        before backward within each layer.
    """

    def step(step_in, pre_hidden, pre_cell, gate_w, gate_b):
        # One LSTM cell update; the four gate pre-activations are laid out
        # as [i, j, f, o] along axis 1.
        concat_1 = np.concatenate([step_in, pre_hidden], 1)
        gate_input = np.matmul(concat_1, gate_w)
        gate_input += gate_b
        i, j, f, o = np.split(gate_input, indices_or_sections=4, axis=1)
        new_cell = pre_cell * sigmoid(f + forget_bias) + sigmoid(i) * tanh(j)
        new_hidden = tanh(new_cell) * sigmoid(o)
        return new_hidden, new_cell

    if batch_first:
        # All computation below is time-major; the mask is built time-major
        # further down, so only the input needs transposing here.
        input = np.transpose(input, [1, 0, 2])

    batch_size = input.shape[1]

    mask = None
    if sequence_length is not None:
        max_seq_len = input.shape[0]
        mask = np.zeros([batch_size, max_seq_len])
        for row, valid_len in enumerate(sequence_length):
            mask[row, :valid_len] = 1.0
        mask = np.transpose(mask, [1, 0])

    direc_num = 2 if is_bidirect else 1

    # ``is None`` rather than truthiness: an ndarray has no unambiguous
    # boolean value.
    if init_h is not None:
        init_h = np.reshape(init_h, [num_layers, direc_num, -1, hidden_size])
    else:
        init_h = np.zeros([num_layers, direc_num, batch_size, hidden_size])
    if init_c is not None:
        init_c = np.reshape(init_c, [num_layers, direc_num, -1, hidden_size])
    else:
        init_c = np.zeros([num_layers, direc_num, batch_size, hidden_size])

    def get_single_direction_output(rnn_input, mask=None, direc_index=0):
        # Run every layer over the whole (time-major) sequence for one
        # direction.
        seq_len = rnn_input.shape[0]
        output = []

        # Per-layer running state, seeded from the initial states.
        pre_hidden_array = []
        pre_cell_array = []
        for layer in range(num_layers):
            pre_hidden_array.append(init_h[layer, direc_index])
            pre_cell_array.append(init_c[layer, direc_index])

        for t in range(seq_len):
            step_input = rnn_input[t]

            if mask is not None:
                step_mask = np.reshape(mask[t], [-1, 1])

            for layer in range(num_layers):
                param_index = direc_index * num_layers + layer
                new_hidden, new_cell = step(
                    step_input, pre_hidden_array[layer],
                    pre_cell_array[layer], gate_weight[param_index],
                    gate_bias[param_index])

                if mask is not None:
                    # mask == 1 keeps the new state, mask == 0 carries the
                    # previous state forward unchanged.
                    new_hidden = np.multiply(
                        new_hidden, step_mask) - np.multiply(
                            pre_hidden_array[layer], (step_mask - 1.0))
                    new_cell = np.multiply(
                        new_cell, step_mask) - np.multiply(
                            pre_cell_array[layer], (step_mask - 1.0))

                pre_hidden_array[layer] = new_hidden
                pre_cell_array[layer] = new_cell

                # The next layer consumes this layer's hidden state.
                step_input = new_hidden

            output.append(step_input)

        rnn_out = np.concatenate(output, 0)
        rnn_out = np.reshape(rnn_out, [seq_len, -1, hidden_size])

        last_hidden_out = np.concatenate(pre_hidden_array, 0)
        last_hidden_out = np.reshape(last_hidden_out,
                                     [num_layers, -1, hidden_size])

        last_cell_out = np.concatenate(pre_cell_array, 0)
        last_cell_out = np.reshape(last_cell_out,
                                   [num_layers, -1, hidden_size])

        return rnn_out, last_hidden_out, last_cell_out

    fw_rnn_out, fw_last_hidden, fw_last_cell = get_single_direction_output(
        input, mask, direc_index=0)

    if is_bidirect:
        # The backward direction sees the sequence (and mask) reversed in
        # time; its per-step output is flipped back before concatenation.
        bw_input = input[::-1]
        bw_mask = mask[::-1] if mask is not None else None

        bw_rnn_out, bw_last_hidden, bw_last_cell = \
            get_single_direction_output(bw_input, bw_mask, direc_index=1)
        bw_rnn_out = bw_rnn_out[::-1]

        rnn_out = np.concatenate([fw_rnn_out, bw_rnn_out], 2)

        last_hidden = np.concatenate([fw_last_hidden, bw_last_hidden], 1)
        last_hidden = np.reshape(last_hidden,
                                 [num_layers * direc_num, -1, hidden_size])

        last_cell = np.concatenate([fw_last_cell, bw_last_cell], 1)
        last_cell = np.reshape(last_cell,
                               [num_layers * direc_num, -1, hidden_size])
    else:
        rnn_out = fw_rnn_out
        last_hidden = fw_last_hidden
        last_cell = fw_last_cell

    if batch_first:
        rnn_out = np.transpose(rnn_out, [1, 0, 2])

    return rnn_out, last_hidden, last_cell
class TestBasicLSTMApi(unittest.TestCase):
    """Compares the ``basic_lstm`` layer output with the ``lstm_np`` reference."""

    def setUp(self):
        # Network configuration shared by the graph and the NumPy reference.
        self.hidden_size = 10
        self.batch_size = 5
        self.seq_len = 6
        self.num_layers = 2
        self.is_bidirect = True
        self.batch_first = False
        self.forget_bias = 1.0

    def _randomize_gate_params(self, name_prefix, place):
        """Overwrite one direction's gate parameters with random values.

        For each layer the weight and then the bias variable named
        ``<name_prefix><layer>/BasicLSTMUnit_0.{w_0,b_0}`` is replaced in the
        global scope by a float32 array drawn uniformly from [-0.1, 0.1).

        Returns:
            ``(weights, biases)``: two lists with one array per layer, holding
            exactly the values written into the scope.
        """
        weights = []
        biases = []
        scope = fluid.global_scope()
        for layer in range(self.num_layers):
            unit_name = name_prefix + str(layer) + "/BasicLSTMUnit_0"
            # Weight first, then bias, to keep the random draw order stable.
            for suffix, collected in ((".w_0", weights), (".b_0", biases)):
                tensor = scope.find_var(unit_name + suffix).get_tensor()
                # The current value is only read to learn the shape.
                value = np.random.uniform(
                    -0.1, 0.1, size=np.array(tensor).shape).astype('float32')
                tensor.set(value, place)
                collected.append(value)
        return weights, biases

    def test_run(self):
        x = layers.data(
            name='x',
            shape=[-1, self.batch_size, self.hidden_size],
            dtype='float32')
        # NOTE(review): declared as float32 but fed with int64 values below;
        # confirm which dtype basic_lstm expects for sequence_length.
        sequence_length = layers.data(
            name="sequence_length", shape=[-1], dtype='float32')

        rnn_out, last_hidden, last_cell = basic_lstm(
            x,
            None,
            None,
            self.hidden_size,
            num_layers=self.num_layers,
            batch_first=self.batch_first,
            bidirectional=self.is_bidirect,
            sequence_length=sequence_length,
            forget_bias=self.forget_bias)

        last_hidden.persistable = True
        rnn_out.persistable = True

        if core.is_compiled_with_cuda():
            place = core.CUDAPlace(0)
        else:
            place = core.CPUPlace()

        exe = Executor(place)
        exe.run(framework.default_startup_program())

        # Replace the initialised parameters with known random values so the
        # NumPy reference can use the very same weights. Forward layers come
        # first, then the reverse layers, matching lstm_np's indexing.
        gate_weight, gate_bias = self._randomize_gate_params(
            "basic_lstm_layers_", place)
        if self.is_bidirect:
            reverse_weight, reverse_bias = self._randomize_gate_params(
                "basic_lstm_reverse_layers_", place)
            gate_weight.extend(reverse_weight)
            gate_bias.extend(reverse_bias)

        step_input_np = np.random.uniform(-0.1, 0.1, (
            self.seq_len, self.batch_size, self.hidden_size)).astype('float32')
        sequence_length_np = np.random.randint(
            self.seq_len // 2, self.seq_len,
            size=(self.batch_size)).astype('int64')

        out = exe.run(
            feed={'x': step_input_np,
                  'sequence_length': sequence_length_np},
            fetch_list=[rnn_out, last_hidden, last_cell])

        api_rnn_out = out[0]
        api_last_hidden = out[1]
        api_last_cell = out[2]

        np_out = lstm_np(
            step_input_np,
            None,
            None,
            self.hidden_size,
            gate_weight,
            gate_bias,
            num_layers=self.num_layers,
            batch_first=self.batch_first,
            is_bidirect=self.is_bidirect,
            sequence_length=sequence_length_np,
            forget_bias=self.forget_bias)

        self.assertTrue(np.allclose(api_rnn_out, np_out[0], rtol=1e-4, atol=0))
        self.assertTrue(
            np.allclose(
                api_last_hidden, np_out[1], rtol=1e-4, atol=0))
        self.assertTrue(
            np.allclose(
                api_last_cell, np_out[2], rtol=1e-4, atol=0))
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()