Merge pull request #15114 from sneaxiy/remove_data_balance_unittest
Delete data_balance unittestrevert-15207-remove_op_handle_lock_and_fix_var
commit
ce093d2d5c
@ -1,199 +0,0 @@
|
|||||||
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
from __future__ import print_function
|
|
||||||
|
|
||||||
import unittest
|
|
||||||
import paddle.fluid as fluid
|
|
||||||
import paddle
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
|
|
||||||
class TestDataBalance(unittest.TestCase):
|
|
||||||
def prepare_data(self):
|
|
||||||
def fake_data_generator():
|
|
||||||
for n in range(self.total_ins_num):
|
|
||||||
yield np.ones((3, 4)) * n, n
|
|
||||||
|
|
||||||
# Prepare data
|
|
||||||
with fluid.program_guard(fluid.Program(), fluid.Program()):
|
|
||||||
reader = paddle.batch(
|
|
||||||
fake_data_generator, batch_size=self.batch_size)
|
|
||||||
feeder = fluid.DataFeeder(
|
|
||||||
feed_list=[
|
|
||||||
fluid.layers.data(
|
|
||||||
name='image', shape=[3, 4], dtype='float32'),
|
|
||||||
fluid.layers.data(
|
|
||||||
name='label', shape=[1], dtype='int64'),
|
|
||||||
],
|
|
||||||
place=fluid.CPUPlace())
|
|
||||||
self.num_batches = fluid.recordio_writer.convert_reader_to_recordio_file(
|
|
||||||
self.data_file_name, reader, feeder)
|
|
||||||
|
|
||||||
def prepare_lod_data(self):
|
|
||||||
def fake_data_generator():
|
|
||||||
for n in range(1, self.total_ins_num + 1):
|
|
||||||
d1 = (np.ones((n, 3)) * n).astype('float32')
|
|
||||||
d2 = (np.array(n).reshape((1, 1))).astype('int32')
|
|
||||||
yield d1, d2
|
|
||||||
|
|
||||||
# Prepare lod data
|
|
||||||
with fluid.program_guard(fluid.Program(), fluid.Program()):
|
|
||||||
with fluid.recordio_writer.create_recordio_writer(
|
|
||||||
filename=self.lod_data_file_name) as writer:
|
|
||||||
eof = False
|
|
||||||
generator = fake_data_generator()
|
|
||||||
while (not eof):
|
|
||||||
data_batch = [
|
|
||||||
np.array([]).reshape((0, 3)), np.array([]).reshape(
|
|
||||||
(0, 1))
|
|
||||||
]
|
|
||||||
lod = [0]
|
|
||||||
for _ in range(self.batch_size):
|
|
||||||
try:
|
|
||||||
ins = next(generator)
|
|
||||||
except StopIteration:
|
|
||||||
eof = True
|
|
||||||
break
|
|
||||||
for i, d in enumerate(ins):
|
|
||||||
data_batch[i] = np.concatenate(
|
|
||||||
(data_batch[i], d), axis=0)
|
|
||||||
lod.append(lod[-1] + ins[0].shape[0])
|
|
||||||
if data_batch[0].shape[0] > 0:
|
|
||||||
for i, d in enumerate(data_batch):
|
|
||||||
t = fluid.LoDTensor()
|
|
||||||
t.set(data_batch[i], fluid.CPUPlace())
|
|
||||||
if i == 0:
|
|
||||||
t.set_lod([lod])
|
|
||||||
writer.append_tensor(t)
|
|
||||||
writer.complete_append_tensor()
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.use_cuda = fluid.core.is_compiled_with_cuda()
|
|
||||||
self.data_file_name = './data_balance_test.recordio'
|
|
||||||
self.lod_data_file_name = './data_balance_with_lod_test.recordio'
|
|
||||||
self.total_ins_num = 50
|
|
||||||
self.batch_size = 12
|
|
||||||
self.prepare_data()
|
|
||||||
self.prepare_lod_data()
|
|
||||||
|
|
||||||
def main(self):
|
|
||||||
main_prog = fluid.Program()
|
|
||||||
startup_prog = fluid.Program()
|
|
||||||
with fluid.program_guard(main_prog, startup_prog):
|
|
||||||
data_reader = fluid.layers.io.open_files(
|
|
||||||
filenames=[self.data_file_name],
|
|
||||||
shapes=[[-1, 3, 4], [-1, 1]],
|
|
||||||
lod_levels=[0, 0],
|
|
||||||
dtypes=['float32', 'int64'])
|
|
||||||
if self.use_cuda:
|
|
||||||
data_reader = fluid.layers.double_buffer(data_reader)
|
|
||||||
image, label = fluid.layers.read_file(data_reader)
|
|
||||||
|
|
||||||
place = fluid.CUDAPlace(0) if self.use_cuda else fluid.CPUPlace()
|
|
||||||
exe = fluid.Executor(place)
|
|
||||||
exe.run(startup_prog)
|
|
||||||
|
|
||||||
build_strategy = fluid.BuildStrategy()
|
|
||||||
build_strategy.enable_data_balance = True
|
|
||||||
parallel_exe = fluid.ParallelExecutor(
|
|
||||||
use_cuda=self.use_cuda,
|
|
||||||
main_program=main_prog,
|
|
||||||
build_strategy=build_strategy)
|
|
||||||
|
|
||||||
if (parallel_exe.device_count > self.batch_size):
|
|
||||||
print("WARNING: Unittest TestDataBalance skipped. \
|
|
||||||
For the result is not correct when device count \
|
|
||||||
is larger than batch size.")
|
|
||||||
return
|
|
||||||
fetch_list = [image.name, label.name]
|
|
||||||
|
|
||||||
data_appeared = [False] * self.total_ins_num
|
|
||||||
while (True):
|
|
||||||
try:
|
|
||||||
image_val, label_val = parallel_exe.run(fetch_list,
|
|
||||||
return_numpy=True)
|
|
||||||
except fluid.core.EOFException:
|
|
||||||
break
|
|
||||||
ins_num = image_val.shape[0]
|
|
||||||
broadcasted_label = np.ones(
|
|
||||||
(ins_num, 3, 4)) * label_val.reshape((ins_num, 1, 1))
|
|
||||||
self.assertEqual(image_val.all(), broadcasted_label.all())
|
|
||||||
for l in label_val:
|
|
||||||
self.assertFalse(data_appeared[l[0]])
|
|
||||||
data_appeared[l[0]] = True
|
|
||||||
for i in data_appeared:
|
|
||||||
self.assertTrue(i)
|
|
||||||
|
|
||||||
def main_lod(self):
|
|
||||||
main_prog = fluid.Program()
|
|
||||||
startup_prog = fluid.Program()
|
|
||||||
with fluid.program_guard(main_prog, startup_prog):
|
|
||||||
data_reader = fluid.layers.io.open_files(
|
|
||||||
filenames=[self.lod_data_file_name],
|
|
||||||
shapes=[[-1, 3], [-1, 1]],
|
|
||||||
lod_levels=[1, 0],
|
|
||||||
dtypes=['float32', 'int32'])
|
|
||||||
ins, label = fluid.layers.read_file(data_reader)
|
|
||||||
|
|
||||||
place = fluid.CUDAPlace(0) if self.use_cuda else fluid.CPUPlace()
|
|
||||||
exe = fluid.Executor(place)
|
|
||||||
exe.run(startup_prog)
|
|
||||||
build_strategy = fluid.BuildStrategy()
|
|
||||||
build_strategy.enable_data_balance = True
|
|
||||||
parallel_exe = fluid.ParallelExecutor(
|
|
||||||
use_cuda=self.use_cuda,
|
|
||||||
main_program=main_prog,
|
|
||||||
build_strategy=build_strategy)
|
|
||||||
|
|
||||||
if parallel_exe.device_count > self.batch_size:
|
|
||||||
print("WARNING: Unittest TestDataBalance skipped. \
|
|
||||||
For the result is not correct when device count \
|
|
||||||
is larger than batch size.")
|
|
||||||
exit(0)
|
|
||||||
fetch_list = [ins.name, label.name]
|
|
||||||
|
|
||||||
data_appeared = [False] * self.total_ins_num
|
|
||||||
while (True):
|
|
||||||
try:
|
|
||||||
ins_tensor, label_tensor = parallel_exe.run(
|
|
||||||
fetch_list, return_numpy=False)
|
|
||||||
except fluid.core.EOFException:
|
|
||||||
break
|
|
||||||
|
|
||||||
ins_val = np.array(ins_tensor)
|
|
||||||
label_val = np.array(label_tensor)
|
|
||||||
ins_lod = ins_tensor.lod()[0]
|
|
||||||
self.assertEqual(ins_val.shape[1], 3)
|
|
||||||
self.assertEqual(label_val.shape[1], 1)
|
|
||||||
self.assertEqual(len(ins_lod) - 1, label_val.shape[0])
|
|
||||||
for i in range(0, len(ins_lod) - 1):
|
|
||||||
ins_elem = ins_val[ins_lod[i]:ins_lod[i + 1]][:]
|
|
||||||
label_elem = label_val[i][0]
|
|
||||||
self.assertEqual(ins_elem.all(), label_elem.all())
|
|
||||||
self.assertFalse(data_appeared[int(label_elem - 1)])
|
|
||||||
data_appeared[int(label_elem - 1)] = True
|
|
||||||
|
|
||||||
for i in data_appeared:
|
|
||||||
self.assertTrue(i)
|
|
||||||
|
|
||||||
def test_all(self):
|
|
||||||
self.main()
|
|
||||||
self.main_lod()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
# Disable data balance unittest, because data balance would be removed
|
|
||||||
# unittest.main()
|
|
||||||
pass
|
|
Loading…
Reference in new issue