You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
Paddle/python/paddle/v2/tests/test_data_feeder.py

161 lines
5.8 KiB

# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import py_paddle.swig_paddle as api
import numpy as np
from paddle.v2 import data_type
from paddle.v2.data_feeder import DataFeeder
class DataFeederTest(unittest.TestCase):
    """Feed randomly generated batches through ``DataFeeder`` and check slots.

    Every test builds a batch as a list of samples, where each sample is a
    list of feature values.  The second argument passed to ``DataFeeder`` maps
    an input name to the position of that feature inside a sample.
    """

    def dense_reader(self, size):
        """Return ``np.random.random(size)``: random floats as a numpy array."""
        return np.random.random(size)

    def sparse_binary_reader(self, high, size_limit, non_empty=False):
        """Return a random list of integer ids, each drawn from ``[0, high)``.

        The list length is drawn from ``[0, size_limit)`` and may therefore be
        zero; pass ``non_empty=True`` to redraw until the length is non-zero.
        """
        num = np.random.randint(size_limit)  # num could be 0
        while non_empty and num == 0:
            num = np.random.randint(size_limit)
        return np.random.randint(high, size=num).tolist()

    def test_dense_vector(self):
        """Dense samples given as numpy arrays or as lists are fed unchanged."""
        batch_size = 32
        dim = 784

        def compare(batch):
            feeder = DataFeeder([('image', data_type.dense_vector(dim))],
                                {'image': 0})
            arg = feeder(batch)
            output = arg.getSlotValue(0).copyToNumpyMat()
            # Each sample wraps its vector in a one-element list, so the raw
            # array carries an extra axis; bring it to the output's shape and
            # compare element-wise (comparing ``.all()`` results would only
            # compare two booleans).
            expected = np.array(batch, dtype='float32').reshape(output.shape)
            np.testing.assert_allclose(output, expected, rtol=1e-5)

        # test numpy array
        compare([[self.dense_reader(dim)] for _ in range(batch_size)])

        # test list
        compare([[self.dense_reader(dim).tolist()] for _ in range(batch_size)])

    def test_sparse_binary(self):
        """Sparse binary samples keep their column ids, row by row."""
        dim = 10000
        batch_size = 32
        data = [[self.sparse_binary_reader(dim, 50)]
                for _ in range(batch_size)]
        feeder = DataFeeder([('input', data_type.sparse_binary_vector(dim))],
                            {'input': 0})
        arg = feeder(data)
        output = arg.getSlotValue(0)
        self.assertIsInstance(output, api.Matrix)
        for i in range(batch_size):
            self.assertEqual(output.getSparseRowCols(i), data[i][0])

    def test_sparse(self):
        """Sparse (column, value) samples keep their column ids, row by row.

        Only the column ids are verified here; the values are fed but not
        read back.
        """
        dim = 10000
        batch_size = 32
        columns = []
        data = []
        for _ in range(batch_size):
            cols = self.sparse_binary_reader(dim, 40, non_empty=True)
            values = self.dense_reader(len(cols)).tolist()
            columns.append(cols)
            # list(...) keeps the sample a concrete sequence of pairs.
            data.append([list(zip(cols, values))])
        feeder = DataFeeder([('input', data_type.sparse_vector(dim))],
                            {'input': 0})
        arg = feeder(data)
        output = arg.getSlotValue(0)
        self.assertIsInstance(output, api.Matrix)
        for i in range(batch_size):
            self.assertEqual(output.getSparseRowCols(i), columns[i])

    def test_integer(self):
        """Integer samples come back as the same ids in the same order."""
        dim = 100
        batch_size = 32
        index = [[np.random.randint(dim)] for _ in range(batch_size)]
        feeder = DataFeeder([('input', data_type.integer_value(dim))],
                            {'input': 0})
        arg = feeder(index)
        output = arg.getSlotIds(0).copyToNumpyArray()
        expected = np.array(index, dtype='int').flatten()
        # Element-wise comparison; ``.all()`` on each side would collapse the
        # arrays to booleans before comparing.
        np.testing.assert_array_equal(output, expected)

    def test_multiple_slots(self):
        """Several features per sample are routed to the requested slots."""
        batch_size = 2
        data = []
        for _ in range(batch_size):
            each_sample = []
            each_sample.append(np.random.randint(10))  # size of feature 2: 10
            each_sample.append(
                self.sparse_binary_reader(
                    20000, 40, non_empty=True))  # size of feature 1: 20000
            each_sample.append(self.dense_reader(100))  # size of feature 0: 100
            data.append(each_sample)

        # test multiple features
        data_types = [('fea0', data_type.dense_vector(100)),
                      ('fea1', data_type.sparse_binary_vector(20000)),
                      ('fea2', data_type.integer_value(10))]
        feeder = DataFeeder(data_types, {'fea0': 2, 'fea1': 1, 'fea2': 0})
        arg = feeder(data)
        output_dense = arg.getSlotValue(0).copyToNumpyMat()
        output_sparse = arg.getSlotValue(1)
        output_index = arg.getSlotIds(2).copyToNumpyArray()
        for i in range(batch_size):
            # The reader yields float64 while the slot is compared after a
            # numpy copy, so allow for single-precision rounding.
            np.testing.assert_allclose(output_dense[i], data[i][2], rtol=1e-5)
            self.assertEqual(output_sparse.getSparseRowCols(i), data[i][1])
            self.assertEqual(output_index[i], data[i][0])

        # reader returns 3 features, but only use 2 features
        data_types = [('fea0', data_type.dense_vector(100)),
                      ('fea2', data_type.integer_value(10))]
        feeder = DataFeeder(data_types, {'fea0': 2, 'fea2': 0})
        arg = feeder(data)
        output_dense = arg.getSlotValue(0).copyToNumpyMat()
        output_index = arg.getSlotIds(1).copyToNumpyArray()
        for i in range(batch_size):
            np.testing.assert_allclose(output_dense[i], data[i][2], rtol=1e-5)
            self.assertEqual(output_index[i], data[i][0])
if __name__ == '__main__':
    # Initialise Paddle with use_gpu=0 (presumably CPU-only) before any test
    # builds feeder arguments, then hand control to the unittest runner.
    api.initPaddle("--use_gpu=0")
    unittest.main()