You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Paddle/python/paddle/incubate/hapi/tests/test_model.py

485 lines
16 KiB

# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from __future__ import print_function
import unittest
import os
import numpy as np
import shutil
import tempfile
from paddle import fluid
from paddle.nn import Conv2D, Pool2D, Linear, ReLU, Sequential
from paddle.fluid.dygraph.base import to_variable
from paddle.incubate.hapi.model import Model, Input, set_device
from paddle.incubate.hapi.loss import CrossEntropy
from paddle.incubate.hapi.metrics import Accuracy
from paddle.incubate.hapi.datasets import MNIST
from paddle.incubate.hapi.vision.models import LeNet
from paddle.incubate.hapi.distributed import DistributedBatchSampler, prepare_distributed_context
class LeNetDygraph(fluid.dygraph.Layer):
    """LeNet-style convolutional network built directly from dygraph layers.

    Args:
        num_classes (int): Output width of the last ``Linear`` layer. When
            it is not positive, no classifier head is created and
            ``forward`` returns the output of the convolutional stack.
            Default: 10.
        classifier_activation (str): Value passed as ``act`` to the last
            ``Linear`` layer. Default: 'softmax'.
    """

    def __init__(self, num_classes=10, classifier_activation='softmax'):
        super(LeNetDygraph, self).__init__()
        self.num_classes = num_classes
        self.features = Sequential(
            Conv2D(
                1, 6, 3, stride=1, padding=1),
            ReLU(),
            Pool2D(2, 'max', 2),
            Conv2D(
                6, 16, 5, stride=1, padding=0),
            ReLU(),
            Pool2D(2, 'max', 2))

        if num_classes > 0:
            self.fc = Sequential(
                Linear(400, 120),
                Linear(120, 84),
                # The head width follows ``num_classes`` (it used to be
                # hard-coded to 10, silently ignoring the argument).
                Linear(
                    84, num_classes, act=classifier_activation))

    def forward(self, inputs):
        """Run the convolutional stack and, if present, the classifier head.

        Args:
            inputs: Batch fed to the first ``Conv2D`` layer (one input
                channel).

        Returns:
            The output of ``self.fc`` when ``num_classes > 0``, otherwise
            the unflattened output of ``self.features``.
        """
        x = self.features(inputs)

        if self.num_classes > 0:
            # Flatten everything after the batch axis before the Linear head.
            x = fluid.layers.flatten(x, 1)
            x = self.fc(x)
        return x
class MnistDataset(MNIST):
    """MNIST wrapper that can be truncated and can omit labels.

    Args:
        mode (str): Forwarded to ``MNIST`` as ``mode``.
        return_label (bool): When True each item is ``(image, label)``;
            otherwise each item is the 1-tuple ``(image,)``. Default: True.
        sample_num (int|None): If truthy, keep only the first
            ``sample_num`` images and labels. Default: None.
    """

    def __init__(self, mode, return_label=True, sample_num=None):
        super(MnistDataset, self).__init__(mode=mode)
        self.return_label = return_label
        if sample_num:
            # Keep images and labels aligned by slicing both the same way.
            self.images = self.images[:sample_num]
            self.labels = self.labels[:sample_num]

    def __getitem__(self, idx):
        """Return sample ``idx`` with the image reshaped to [1, 28, 28]."""
        raw_image = self.images[idx]
        raw_label = self.labels[idx]
        image = np.reshape(raw_image, [1, 28, 28])
        if not self.return_label:
            return (image, )
        return image, np.array(raw_label).astype('int64')

    def __len__(self):
        """Return the number of (possibly truncated) images."""
        return len(self.images)
def compute_acc(pred, label):
    """Return the fraction of samples whose arg-max prediction equals its label.

    Args:
        pred: Array-like of per-class scores; the predicted class is the
            arg-max over the last axis.
        label: Array-like of integer class ids, either flat ``(N,)`` or as a
            column ``(N, 1)``.

    Returns:
        Number of matching samples divided by the number of samples.
    """
    predicted = np.argmax(pred, -1).reshape(-1)
    # Flatten the labels so that a 1-D label vector is compared element-wise
    # instead of being broadcast against a column of predictions into an
    # N x N matrix (which could yield an "accuracy" greater than 1).
    expected = np.asarray(label).reshape(-1)
    hits = predicted == expected
    return np.sum(hits) / hits.shape[0]
def dynamic_train(model, dataloader):
    """Make one pass over ``dataloader``, updating ``model`` with Adam.

    Args:
        model: Layer exposing ``parameters()``, ``train()``,
            ``clear_gradients()`` and being callable on a batch of inputs.
        dataloader: Iterable yielding ``(inputs, labels)`` pairs.
    """
    adam = fluid.optimizer.Adam(
        learning_rate=0.001, parameter_list=model.parameters())
    model.train()
    for batch_inputs, batch_labels in dataloader:
        predictions = model(batch_inputs)
        per_sample_loss = fluid.layers.cross_entropy(predictions,
                                                     batch_labels)
        # The per-sample losses are summed (not averaged) before backprop.
        total_loss = fluid.layers.reduce_sum(per_sample_loss)
        total_loss.backward()
        adam.minimize(total_loss)
        model.clear_gradients()
def dynamic_evaluate(model, dataloader):
    """Return the share of correctly classified samples from ``dataloader``.

    Args:
        model: Layer exposing ``eval()`` and callable on a batch of inputs;
            its outputs must provide ``numpy()``.
        dataloader: Iterable yielding ``(inputs, labels)`` pairs and exposing
            a ``dataset`` attribute that supports ``len()``.

    Returns:
        Number of correct predictions divided by ``len(dataloader.dataset)``.
    """
    with fluid.dygraph.no_grad():
        model.eval()
        num_correct = 0
        for batch_inputs, batch_labels in dataloader:
            scores = model(batch_inputs).numpy()
            # Turn the predicted class ids into a column before comparing
            # them with the labels.
            predicted = np.argmax(scores, -1)[:, np.newaxis]
            matches = predicted == batch_labels.numpy()
            num_correct += matches.astype('int').sum()

    return num_correct / len(dataloader.dataset)
@unittest.skipIf(not fluid.is_compiled_with_cuda(),
                 'CPU testing is not supported')
class TestModel(unittest.TestCase):
    """GPU tests of ``Model.fit`` / ``evaluate`` / ``predict`` on LeNet.

    ``setUpClass`` trains ``LeNetDygraph`` with the hand-written loop
    ``dynamic_train`` and stores the resulting accuracy and weights. Each
    test then checks that the high-level ``Model`` API reproduces that
    accuracy, in both dygraph and static graph mode.
    """

    @classmethod
    def setUpClass(cls):
        """Build the datasets/loaders and the reference accuracy and weights."""
        if not fluid.is_compiled_with_cuda():
            # ``self`` does not exist inside a classmethod; raising SkipTest
            # is the supported way to skip a whole class from setUpClass.
            raise unittest.SkipTest(
                'module not tested when ONLY_CPU compling')

        cls.device = set_device('gpu')
        fluid.enable_dygraph(cls.device)
        try:
            sp_num = 1280
            cls.train_dataset = MnistDataset(mode='train', sample_num=sp_num)
            cls.val_dataset = MnistDataset(mode='test', sample_num=sp_num)
            cls.test_dataset = MnistDataset(
                mode='test', return_label=False, sample_num=sp_num)

            cls.train_loader = fluid.io.DataLoader(
                cls.train_dataset, places=cls.device, batch_size=64)
            cls.val_loader = fluid.io.DataLoader(
                cls.val_dataset, places=cls.device, batch_size=64)
            cls.test_loader = fluid.io.DataLoader(
                cls.test_dataset, places=cls.device, batch_size=64)

            seed = 333
            fluid.default_startup_program().random_seed = seed
            fluid.default_main_program().random_seed = seed

            dy_lenet = LeNetDygraph()
            cls.init_param = dy_lenet.state_dict()
            dynamic_train(dy_lenet, cls.train_loader)

            # Reference accuracy that every Model-based test must reproduce.
            cls.acc1 = dynamic_evaluate(dy_lenet, cls.val_loader)

            cls.inputs = [Input([-1, 1, 28, 28], 'float32', name='image')]
            cls.labels = [Input([None, 1], 'int64', name='label')]

            cls.save_dir = tempfile.mkdtemp()
            cls.weight_path = os.path.join(cls.save_dir, 'lenet')
            fluid.dygraph.save_dygraph(dy_lenet.state_dict(),
                                       cls.weight_path)
        finally:
            # Always return to static mode, even if set-up failed, so the
            # global mode does not leak into other test classes.
            fluid.disable_dygraph()

    @classmethod
    def tearDownClass(cls):
        """Delete the temporary directory holding the reference weights."""
        shutil.rmtree(cls.save_dir)

    def test_fit_dygraph(self):
        self.fit(True)

    def test_fit_static(self):
        self.fit(False)

    def test_evaluate_dygraph(self):
        self.evaluate(True)

    def test_evaluate_static(self):
        self.evaluate(False)

    def test_predict_dygraph(self):
        self.predict(True)

    def test_predict_static(self):
        self.predict(False)

    def test_prepare_context(self):
        prepare_distributed_context()

    def fit(self, dynamic):
        """Train LeNet through ``Model.fit`` and compare with ``acc1``.

        Args:
            dynamic (bool): Run in dygraph mode when True, otherwise in
                static graph mode.
        """
        if dynamic:
            fluid.enable_dygraph(self.device)
        try:
            seed = 333
            fluid.default_startup_program().random_seed = seed
            fluid.default_main_program().random_seed = seed

            model = LeNet()
            optim_new = fluid.optimizer.Adam(
                learning_rate=0.001, parameter_list=model.parameters())
            model.prepare(
                optim_new,
                loss_function=CrossEntropy(average=False),
                metrics=Accuracy(),
                inputs=self.inputs,
                labels=self.labels)
            model.fit(self.train_dataset, batch_size=64, shuffle=False)

            result = model.evaluate(self.val_dataset, batch_size=64)
            np.testing.assert_allclose(result['acc'], self.acc1)

            # Also exercise fit() with loaders driven by batch samplers.
            train_sampler = DistributedBatchSampler(
                self.train_dataset, batch_size=64, shuffle=False)
            val_sampler = DistributedBatchSampler(
                self.val_dataset, batch_size=64, shuffle=False)

            train_loader = fluid.io.DataLoader(
                self.train_dataset,
                batch_sampler=train_sampler,
                places=self.device,
                return_list=True)

            val_loader = fluid.io.DataLoader(
                self.val_dataset,
                batch_sampler=val_sampler,
                places=self.device,
                return_list=True)

            model.fit(train_loader, val_loader)
        finally:
            # Restore static mode even when an assertion above fails.
            if dynamic:
                fluid.disable_dygraph()

    def evaluate(self, dynamic):
        """Load the reference weights and check ``Model.evaluate`` accuracy.

        Args:
            dynamic (bool): Run in dygraph mode when True, otherwise in
                static graph mode.
        """
        if dynamic:
            fluid.enable_dygraph(self.device)
        try:
            model = LeNet()
            model.prepare(
                metrics=Accuracy(), inputs=self.inputs, labels=self.labels)
            model.load(self.weight_path)
            result = model.evaluate(self.val_dataset, batch_size=64)
            np.testing.assert_allclose(result['acc'], self.acc1)

            # Also exercise evaluate() with a batch-sampler-driven loader.
            sampler = DistributedBatchSampler(
                self.val_dataset, batch_size=64, shuffle=False)

            val_loader = fluid.io.DataLoader(
                self.val_dataset,
                batch_sampler=sampler,
                places=self.device,
                return_list=True)

            model.evaluate(val_loader)
        finally:
            if dynamic:
                fluid.disable_dygraph()

    def predict(self, dynamic):
        """Load the reference weights and check ``Model.predict`` outputs.

        Args:
            dynamic (bool): Run in dygraph mode when True, otherwise in
                static graph mode.
        """
        if dynamic:
            fluid.enable_dygraph(self.device)
        try:
            model = LeNet()
            model.prepare(inputs=self.inputs)
            model.load(self.weight_path)
            output = model.predict(
                self.test_dataset, batch_size=64, stack_outputs=True)
            np.testing.assert_equal(output[0].shape[0],
                                    len(self.test_dataset))

            # test_dataset and val_dataset are the same 'test' samples, so
            # the validation labels can score the predictions.
            acc = compute_acc(output[0], self.val_dataset.labels)
            np.testing.assert_allclose(acc, self.acc1)

            sampler = DistributedBatchSampler(
                self.test_dataset, batch_size=64, shuffle=False)

            test_loader = fluid.io.DataLoader(
                self.test_dataset,
                batch_sampler=sampler,
                places=self.device,
                return_list=True)

            # NOTE(review): this loader yields images only and no metrics
            # were prepared; presumably ``model.predict`` was intended here.
            # Kept as ``evaluate`` to preserve existing behaviour - confirm.
            model.evaluate(test_loader)
        finally:
            if dynamic:
                fluid.disable_dygraph()
class MyModel(Model):
    """Minimal ``Model``: one 20 -> 10 ``Linear`` layer with softmax."""

    def __init__(self):
        super(MyModel, self).__init__()
        self._fc = Linear(20, 10, act='softmax')

    def forward(self, x):
        """Apply the single fully connected layer to ``x``."""
        return self._fc(x)
class TestModelFunction(unittest.TestCase):
    """CPU tests of the batch-level and save/load APIs of ``Model``."""

    def set_seed(self, seed=1024):
        """Assign ``seed`` to the default startup and main programs."""
        fluid.default_startup_program().random_seed = seed
        fluid.default_main_program().random_seed = seed

    def _build_trainable_model(self):
        """Create a ``MyModel`` prepared with SGD and a cross-entropy loss."""
        model = MyModel()
        inputs = [Input([None, 20], 'float32', name='x')]
        labels = [Input([None, 1], 'int64', name='label')]
        optim = fluid.optimizer.SGD(learning_rate=0.001,
                                    parameter_list=model.parameters())
        model.prepare(
            inputs=inputs,
            optimizer=optim,
            loss_function=CrossEntropy(average=False),
            labels=labels)
        return model

    def test_train_batch(self, dynamic=True):
        """``train_batch`` must match a hand-written dygraph training step.

        Args:
            dynamic (bool): Unused; kept for signature compatibility. Both
                dygraph and static mode are always exercised.
        """
        dim = 20
        data = np.random.random(size=(4, dim)).astype(np.float32)
        label = np.random.randint(0, 10, size=(4, 1)).astype(np.int64)

        def get_expect():
            # Reference loss from one manual SGD step in dygraph mode.
            fluid.enable_dygraph(fluid.CPUPlace())
            try:
                self.set_seed()
                m = MyModel()
                optim = fluid.optimizer.SGD(learning_rate=0.001,
                                            parameter_list=m.parameters())
                m.train()
                output = m(to_variable(data))
                l = to_variable(label)
                loss = fluid.layers.cross_entropy(output, l)
                avg_loss = fluid.layers.reduce_sum(loss)
                avg_loss.backward()
                optim.minimize(avg_loss)
                m.clear_gradients()
            finally:
                fluid.disable_dygraph()
            return avg_loss.numpy()

        ref = get_expect()
        for use_dygraph in [True, False]:
            device = set_device('cpu')
            if use_dygraph:
                fluid.enable_dygraph(device)
            try:
                self.set_seed()
                model = MyModel()

                optim2 = fluid.optimizer.SGD(
                    learning_rate=0.001, parameter_list=model.parameters())

                inputs = [Input([None, dim], 'float32', name='x')]
                labels = [Input([None, 1], 'int64', name='label')]
                model.prepare(
                    optim2,
                    loss_function=CrossEntropy(average=False),
                    inputs=inputs,
                    labels=labels,
                    device=device)
                loss, = model.train_batch([data], [label])

                np.testing.assert_allclose(loss.flatten(), ref.flatten())
            finally:
                # Restore static mode even when the comparison fails.
                if use_dygraph:
                    fluid.disable_dygraph()

    def test_test_batch(self, dynamic=True):
        """``test_batch`` must match a direct dygraph forward pass.

        Args:
            dynamic (bool): Unused; kept for signature compatibility. Both
                dygraph and static mode are always exercised.
        """
        dim = 20
        data = np.random.random(size=(4, dim)).astype(np.float32)

        def get_expect():
            # Reference output from calling the layer directly in dygraph.
            fluid.enable_dygraph(fluid.CPUPlace())
            try:
                self.set_seed()
                m = MyModel()
                m.eval()
                output = m(to_variable(data))
            finally:
                fluid.disable_dygraph()
            return output.numpy()

        ref = get_expect()
        for use_dygraph in [True, False]:
            device = set_device('cpu')
            if use_dygraph:
                fluid.enable_dygraph(device)
            try:
                self.set_seed()
                model = MyModel()
                inputs = [Input([None, dim], 'float32', name='x')]
                model.prepare(inputs=inputs, device=device)
                out, = model.test_batch([data])

                np.testing.assert_allclose(out, ref)
            finally:
                if use_dygraph:
                    fluid.disable_dygraph()

    def test_save_load(self):
        """Save then load a prepared model in dygraph and in static mode."""
        for dynamic in [True, False]:
            # A fresh directory per mode: the directory used to be removed
            # inside the loop, so the second pass wrote under a deleted path.
            path = tempfile.mkdtemp()
            device = set_device('cpu')
            if dynamic:
                fluid.enable_dygraph(device)
            try:
                model = self._build_trainable_model()
                weight_path = os.path.join(path, 'test')
                model.save(weight_path)
                model.load(weight_path)
            finally:
                if dynamic:
                    fluid.disable_dygraph()
                shutil.rmtree(path)

    def test_dynamic_save_static_load(self):
        """Weights saved in dygraph mode must load in static graph mode."""
        path = tempfile.mkdtemp()
        try:
            weight_path = os.path.join(path, 'test')

            device = set_device('cpu')
            fluid.enable_dygraph(device)
            try:
                model = self._build_trainable_model()
                model.save(weight_path)
            finally:
                fluid.disable_dygraph()

            model = self._build_trainable_model()
            model.load(weight_path)
        finally:
            shutil.rmtree(path)

    def test_static_save_dynamic_load(self):
        """Weights saved in static graph mode must load in dygraph mode."""
        path = tempfile.mkdtemp()
        try:
            weight_path = os.path.join(path, 'test')

            model = self._build_trainable_model()
            model.save(weight_path)

            device = set_device('cpu')
            fluid.enable_dygraph(device)
            try:
                model = self._build_trainable_model()
                model.load(weight_path)
            finally:
                fluid.disable_dygraph()
        finally:
            shutil.rmtree(path)

    def test_parameters(self):
        """``parameters()`` must expose the 20 x 10 weight first."""
        for dynamic in [True, False]:
            device = set_device('cpu')
            if dynamic:
                fluid.enable_dygraph(device)
            try:
                model = MyModel()
                inputs = [Input([None, 20], 'float32', name='x')]
                model.prepare(inputs=inputs)
                params = model.parameters()
                self.assertEqual(params[0].shape[0], 20)
                self.assertEqual(params[0].shape[1], 10)
            finally:
                if dynamic:
                    fluid.disable_dygraph()

    def test_export_deploy_model(self):
        """An exported inference model must reproduce ``test_batch`` output."""
        model = LeNet()
        inputs = [Input([-1, 1, 28, 28], 'float32', name='image')]
        model.prepare(inputs=inputs)
        # mkdtemp() already creates the directory, so no makedirs is needed.
        save_dir = tempfile.mkdtemp()
        try:
            tensor_img = np.array(
                np.random.random((1, 1, 28, 28)), dtype=np.float32)
            ori_results = model.test_batch(tensor_img)

            model.save_inference_model(save_dir)

            if fluid.is_compiled_with_cuda():
                place = fluid.CUDAPlace(0)
            else:
                place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            [inference_program, feed_target_names, fetch_targets] = (
                fluid.io.load_inference_model(
                    dirname=save_dir, executor=exe))

            results = exe.run(inference_program,
                              feed={feed_target_names[0]: tensor_img},
                              fetch_list=fetch_targets)

            np.testing.assert_allclose(results, ori_results)
        finally:
            # Remove the exported model even when the comparison fails.
            shutil.rmtree(save_dir)
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()