You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Paddle/python/paddle/fluid/tests/unittests/test_multiprocess_dataloade...

128 lines
4.2 KiB

# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import os
import sys
import six
import time
import unittest
import multiprocessing
import numpy as np
import paddle.fluid as fluid
from paddle.io import Dataset, BatchSampler, DataLoader
from paddle.fluid.dygraph.nn import Linear
from paddle.fluid.dygraph.base import to_variable
from test_multiprocess_dataloader_static import RandomDataset, prepare_places
from test_multiprocess_dataloader_static import EPOCH_NUM, BATCH_SIZE, IMAGE_SIZE, SAMPLE_NUM, CLASS_NUM
class SimpleFCNet(fluid.dygraph.Layer):
def __init__(self):
super(SimpleFCNet, self).__init__()
param_attr = fluid.ParamAttr(initializer=fluid.initializer.Constant(
value=0.8))
bias_attr = fluid.ParamAttr(initializer=fluid.initializer.Constant(
value=0.5))
self._fcs = []
in_channel = IMAGE_SIZE
for hidden_size in [10, 20, 30]:
self._fcs.append(
Linear(
in_channel,
hidden_size,
act='tanh',
param_attr=param_attr,
bias_attr=bias_attr))
in_channel = hidden_size
self._fcs.append(
Linear(
in_channel,
CLASS_NUM,
act='softmax',
param_attr=param_attr,
bias_attr=bias_attr))
def forward(self, image):
out = image
for fc in self._fcs:
out = fc(out)
return out
class TestDygraphDataLoader(unittest.TestCase):
def run_main(self, num_workers, places):
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1
with fluid.dygraph.guard(places[0]):
fc_net = SimpleFCNet()
optimizer = fluid.optimizer.Adam(parameter_list=fc_net.parameters())
dataset = RandomDataset(SAMPLE_NUM, CLASS_NUM)
dataloader = DataLoader(
dataset,
places=places,
num_workers=num_workers,
batch_size=BATCH_SIZE,
drop_last=True)
assert len(dataloader) == int(SAMPLE_NUM / BATCH_SIZE)
step_list = []
loss_list = []
start_t = time.time()
for _ in six.moves.range(EPOCH_NUM):
step = 0
for image, label in dataloader():
out = fc_net(image)
loss = fluid.layers.cross_entropy(out, label)
avg_loss = fluid.layers.reduce_mean(loss)
avg_loss.backward()
optimizer.minimize(avg_loss)
fc_net.clear_gradients()
loss_list.append(np.mean(avg_loss.numpy()))
step += 1
step_list.append(step)
end_t = time.time()
ret = {
"time": end_t - start_t,
"step": step_list,
"loss": np.array(loss_list)
}
print("time cost", ret['time'], 'step_list', ret['step'])
return ret
def test_main(self):
# dynamic graph do not run with_data_parallel
for p in prepare_places(False):
results = []
for num_workers in [0, 2]:
print(self.__class__.__name__, p, num_workers)
sys.stdout.flush()
ret = self.run_main(num_workers=num_workers, places=p)
results.append(ret)
diff = np.max(
np.abs(results[0]['loss'] - results[1]['loss']) /
np.abs(results[0]['loss']))
self.assertLess(diff, 1e-2)
if __name__ == '__main__':
unittest.main()