@ -1,4 +1,4 @@
# Copyright (c) 20 19 PaddlePaddle Authors. All Rights Reserved.
# Copyright (c) 20 20 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -14,9 +14,17 @@
from __future__ import print_function
import random
import unittest
import numpy as np
import paddle
import paddle . nn as nn
from paddle import Model , set_device
from paddle . static import InputSpec as Input
from paddle . fluid . dygraph import Layer
from paddle . nn import BeamSearchDecoder , dynamic_decode
import paddle . fluid as fluid
import paddle . fluid . layers as layers
import paddle . fluid . core as core
@ -24,6 +32,8 @@ import paddle.fluid.core as core
from paddle . fluid . executor import Executor
from paddle . fluid import framework
paddle . enable_static ( )
class EncoderCell ( layers . RNNCell ) :
def __init__ ( self , num_layers , hidden_size , dropout_prob = 0. ) :
@ -436,6 +446,7 @@ class TestDynamicDecode(unittest.TestCase):
self . exe = Executor ( place )
def test_mle_train ( self ) :
paddle . enable_static ( )
self . model_hparams [ " decoding_strategy " ] = " train_greedy "
agent = SeqPGAgent (
model_cls = Seq2SeqModel ,
@ -468,6 +479,7 @@ class TestDynamicDecode(unittest.TestCase):
( iter_idx , reward . mean ( ) , cost ) )
def test_greedy_train ( self ) :
paddle . enable_static ( )
self . model_hparams [ " decoding_strategy " ] = " infer_greedy "
agent = SeqPGAgent (
model_cls = Seq2SeqModel ,
@ -493,6 +505,7 @@ class TestDynamicDecode(unittest.TestCase):
( iter_idx , reward . mean ( ) , cost ) )
def test_sample_train ( self ) :
paddle . enable_static ( )
self . model_hparams [ " decoding_strategy " ] = " infer_sample "
agent = SeqPGAgent (
model_cls = Seq2SeqModel ,
@ -518,6 +531,8 @@ class TestDynamicDecode(unittest.TestCase):
( iter_idx , reward . mean ( ) , cost ) )
def test_beam_search_infer ( self ) :
paddle . set_default_dtype ( " float32 " )
paddle . enable_static ( )
self . model_hparams [ " decoding_strategy " ] = " beam_search "
main_program = fluid . Program ( )
startup_program = fluid . Program ( )
@ -542,5 +557,154 @@ class TestDynamicDecode(unittest.TestCase):
fetch_list = [ output ] ) [ 0 ]
class ModuleApiTest ( unittest . TestCase ) :
@classmethod
def setUpClass ( cls ) :
cls . _np_rand_state = np . random . get_state ( )
cls . _py_rand_state = random . getstate ( )
cls . _random_seed = 123
np . random . seed ( cls . _random_seed )
random . seed ( cls . _random_seed )
cls . model_cls = type ( cls . __name__ + " Model " , ( Layer , ) , {
" __init__ " : cls . model_init_wrapper ( cls . model_init ) ,
" forward " : cls . model_forward
} )
@classmethod
def tearDownClass ( cls ) :
np . random . set_state ( cls . _np_rand_state )
random . setstate ( cls . _py_rand_state )
@staticmethod
def model_init_wrapper ( func ) :
def __impl__ ( self , * args , * * kwargs ) :
Layer . __init__ ( self )
func ( self , * args , * * kwargs )
return __impl__
@staticmethod
def model_init ( model , * args , * * kwargs ) :
raise NotImplementedError (
" model_init acts as `Model.__init__`, thus must implement it " )
@staticmethod
def model_forward ( model , * args , * * kwargs ) :
return model . module ( * args , * * kwargs )
def make_inputs ( self ) :
# TODO(guosheng): add default from `self.inputs`
raise NotImplementedError (
" model_inputs makes inputs for model, thus must implement it " )
def setUp ( self ) :
"""
For the model which wraps the module to be tested :
Set input data by ` self . inputs ` list
Set init argument values by ` self . attrs ` list / dict
Set model parameter values by ` self . param_states ` dict
Set expected output data by ` self . outputs ` list
We can create a model instance and run once with these .
"""
self . inputs = [ ]
self . attrs = { }
self . param_states = { }
self . outputs = [ ]
def _calc_output ( self , place , mode = " test " , dygraph = True ) :
if dygraph :
fluid . enable_dygraph ( place )
else :
fluid . disable_dygraph ( )
gen = paddle . manual_seed ( self . _random_seed )
gen . _is_init_py = False
paddle . framework . random . _manual_program_seed ( self . _random_seed )
scope = fluid . core . Scope ( )
with fluid . scope_guard ( scope ) :
layer = self . model_cls ( * * self . attrs ) if isinstance (
self . attrs , dict ) else self . model_cls ( * self . attrs )
model = Model ( layer , inputs = self . make_inputs ( ) )
model . prepare ( )
if self . param_states :
model . load ( self . param_states , optim_state = None )
return model . test_batch ( self . inputs )
def check_output_with_place ( self , place , mode = " test " ) :
dygraph_output = self . _calc_output ( place , mode , dygraph = True )
stgraph_output = self . _calc_output ( place , mode , dygraph = False )
expect_output = getattr ( self , " outputs " , None )
for actual_t , expect_t in zip ( dygraph_output , stgraph_output ) :
self . assertTrue ( np . allclose ( actual_t , expect_t , rtol = 1e-5 , atol = 0 ) )
if expect_output :
for actual_t , expect_t in zip ( dygraph_output , expect_output ) :
self . assertTrue (
np . allclose (
actual_t , expect_t , rtol = 1e-5 , atol = 0 ) )
def check_output ( self ) :
devices = [ " CPU " , " GPU " ] if fluid . is_compiled_with_cuda ( ) else [ " CPU " ]
for device in devices :
place = set_device ( device )
self . check_output_with_place ( place )
class TestBeamSearch ( ModuleApiTest ) :
def setUp ( self ) :
paddle . set_default_dtype ( " float64 " )
shape = ( 8 , 32 )
self . inputs = [
np . random . random ( shape ) . astype ( " float64 " ) ,
np . random . random ( shape ) . astype ( " float64 " )
]
self . outputs = None
self . attrs = {
" vocab_size " : 100 ,
" embed_dim " : 32 ,
" hidden_size " : 32 ,
}
self . param_states = { }
@staticmethod
def model_init ( self ,
vocab_size ,
embed_dim ,
hidden_size ,
bos_id = 0 ,
eos_id = 1 ,
beam_size = 2 ,
max_step_num = 2 ) :
embedder = paddle . fluid . dygraph . Embedding (
size = [ vocab_size , embed_dim ] , dtype = " float64 " )
output_layer = nn . Linear ( hidden_size , vocab_size )
cell = nn . LSTMCell ( embed_dim , hidden_size )
self . max_step_num = max_step_num
self . beam_search_decoder = BeamSearchDecoder (
cell ,
start_token = bos_id ,
end_token = eos_id ,
beam_size = beam_size ,
embedding_fn = embedder ,
output_fn = output_layer )
@staticmethod
def model_forward ( model , init_hidden , init_cell ) :
return dynamic_decode (
model . beam_search_decoder , [ init_hidden , init_cell ] ,
max_step_num = model . max_step_num ,
impute_finished = True ,
is_test = True ) [ 0 ]
def make_inputs ( self ) :
inputs = [
Input ( [ None , self . inputs [ 0 ] . shape [ - 1 ] ] , " float64 " , " init_hidden " ) ,
Input ( [ None , self . inputs [ 1 ] . shape [ - 1 ] ] , " float64 " , " init_cell " ) ,
]
return inputs
def test_check_output ( self ) :
self . check_output ( )
if __name__ == ' __main__ ' :
unittest . main ( )