You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Paddle/python/paddle/reader/decorator.py

621 lines
18 KiB

# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__all__ = [
'cache', 'map_readers', 'buffered', 'compose', 'chain', 'shuffle',
'ComposeNotAligned', 'firstn', 'xmap_readers', 'multiprocess_reader'
]
from threading import Thread
import subprocess
import multiprocessing
import six
import sys
from six.moves.queue import Queue
from six.moves import zip_longest
from six.moves import map
from six.moves import zip
import itertools
import random
import zlib
import paddle.compat as cpt
def cache(reader):
"""
Cache the reader data into memory.
Be careful that this method may take long time to process,
and consume lots of memory. :code:`reader()` would only
call once.
Args:
reader (generator): a reader object which yields
data each time.
Returns:
generator: a decorated reader object which yields data from cached memory.
"""
all_data = tuple(reader())
def __impl__():
for item in all_data:
yield item
return __impl__
def map_readers(func, *readers):
"""
Creates a data reader that outputs return value of function using
output of each data reader as arguments.
If input readers output the following data entries: 2 3,
and the input func is mul(x, y),
the output of the resulted reader will be 6.
Args:
func: a function to read data and compute result, the output of this function
will be set as the output of the resulted data reader.
readers (Reader|list of Reader): list of readers whose outputs will be used as arguments of func.
Returns:
the resulted data reader (Reader)
Examples:
.. code-block:: python
import paddle.reader
d = {"h": 0, "i": 1}
def func(x):
return d[x]
def reader():
yield "h"
yield "i"
map_reader_result = paddle.reader.map_readers(func, reader)
"""
def reader():
rs = []
for r in readers:
rs.append(r())
for e in map(func, *rs):
yield e
return reader
def shuffle(reader, buf_size):
"""
paddle.fluid.io.shuffle ( :ref:`api_fluid_io_shuffle` ) is recommended to use,
and paddle.reader.shuffle is an alias.
This API creates a decorated reader that outputs the shuffled data.
The output data from the origin reader will be saved into a buffer,
and then shuffle the data. The size of buffer is determined by argument buf_size.
Args:
reader(callable): the original reader whose data will be shuffled.
buf_size(int): the size of shuffled buffer.
Returns:
callable: a decorated reader.
Examples:
.. code-block:: python
import paddle.fluid as fluid
def reader():
for i in range(5):
yield i
shuffled_reader = fluid.io.shuffle(reader, 3)
for e in shuffled_reader():
print(e)
# outputs are 0~4 unordered arrangement
"""
def data_reader():
buf = []
for e in reader():
buf.append(e)
if len(buf) >= buf_size:
random.shuffle(buf)
for b in buf:
yield b
buf = []
if len(buf) > 0:
random.shuffle(buf)
for b in buf:
yield b
return data_reader
def chain(*readers):
"""
Use the input data readers to create a chained data reader. The new created reader
chains the outputs of input readers together as its output.
**Note**:
``paddle.reader.chain`` is the alias of ``paddle.fluid.io.chain``, and
``paddle.fluid.io.chain`` is recommended to use.
For example, if three input readers' outputs are as follows:
[0, 0, 0],
[10, 10, 10],
[20, 20, 20].
The chained reader will output:
[[0, 0, 0], [10, 10, 10], [20, 20, 20]].
Args:
readers(list): input data readers.
Returns:
callable: the new chained data reader.
Examples:
.. code-block:: python
import paddle
def reader_creator_3(start):
def reader():
for i in range(start, start + 3):
yield [i, i, i]
return reader
c = paddle.reader.chain(reader_creator_3(0), reader_creator_3(10), reader_creator_3(20))
for e in c():
print(e)
# Output:
# [0, 0, 0]
# [1, 1, 1]
# [2, 2, 2]
# [10, 10, 10]
# [11, 11, 11]
# [12, 12, 12]
# [20, 20, 20]
# [21, 21, 21]
# [22, 22, 22]
"""
def reader():
rs = []
for r in readers:
rs.append(r())
for e in itertools.chain(*rs):
yield e
return reader
class ComposeNotAligned(ValueError):
pass
def compose(*readers, **kwargs):
"""
Creates a data reader whose output is the combination of input readers.
If input readers output following data entries:
(1, 2) 3 (4, 5)
The composed reader will output:
(1, 2, 3, 4, 5)
Args:
readers (Reader|list of Reader): readers that will be composed together.
check_alignment(bool, optional): Indicates whether the input readers are checked for
alignment. If True, whether input readers are aligned
correctly will be checked, else alignment will not be checkout and trailing outputs
will be discarded. Defaults to True.
Returns:
the new data reader (Reader).
Raises:
ComposeNotAligned: outputs of readers are not aligned. This will not raise if check_alignment is set to False.
Examples:
.. code-block:: python
import paddle.fluid as fluid
def reader_creator_10(dur):
def reader():
for i in range(10):
yield i
return reader
reader = fluid.io.compose(reader_creator_10(0), reader_creator_10(0))
"""
check_alignment = kwargs.pop('check_alignment', True)
def make_tuple(x):
if isinstance(x, tuple):
return x
else:
return (x, )
def reader():
rs = []
for r in readers:
rs.append(r())
if not check_alignment:
for outputs in zip(*rs):
yield sum(list(map(make_tuple, outputs)), ())
else:
for outputs in zip_longest(*rs):
for o in outputs:
if o is None:
# None will be not be present if compose is aligned
raise ComposeNotAligned(
"outputs of readers are not aligned.")
yield sum(list(map(make_tuple, outputs)), ())
return reader
def buffered(reader, size):
"""
Creates a buffered data reader.
The buffered data reader will read and save data entries into a
buffer. Reading from the buffered data reader will proceed as long
as the buffer is not empty.
:param reader: the data reader to read from.
:type reader: callable
:param size: max buffer size.
:type size: int
:returns: the buffered data reader.
"""
class EndSignal():
pass
end = EndSignal()
def read_worker(r, q):
for d in r:
q.put(d)
q.put(end)
def data_reader():
r = reader()
q = Queue(maxsize=size)
t = Thread(
target=read_worker, args=(
r,
q, ))
t.daemon = True
t.start()
e = q.get()
while e != end:
yield e
e = q.get()
return data_reader
8 years ago
def firstn(reader, n):
"""
paddle.fluid.io.firstn ( :ref:`api_fluid_io_firstn` ) is recommended to use,
and paddle.reader.firstn is an alias.
This API creates a decorated reader, and limits the max number of
samples that reader could return.
Args:
reader(callable): the input reader.
n(int): the max number of samples in the reader.
Returns:
callable: the decorated reader.
Examples:
.. code-block:: python
import paddle.fluid as fluid
def reader():
for i in range(100):
yield i
firstn_reader = fluid.io.firstn(reader, 5)
for e in firstn_reader():
print(e)
# the outputs are: 0 1 2 3 4
"""
8 years ago
# TODO(yuyang18): Check if just drop the reader, could clean the opened
# resource or not?
def firstn_reader():
for i, item in enumerate(reader()):
8 years ago
if i == n:
break
yield item
8 years ago
return firstn_reader
class XmapEndSignal():
pass
def xmap_readers(mapper, reader, process_num, buffer_size, order=False):
"""
Use multi-threads to map samples from reader by a mapper defined by user.
Args:
mapper (callable): a function to map the data from reader.
reader (callable): a data reader which yields the data.
process_num (int): thread number to handle original sample.
buffer_size (int): size of the queue to read data in.
order (bool): whether to keep the data order from original reader.
Default False.
Returns:
callable: a decorated reader with data mapping.
"""
end = XmapEndSignal()
8 years ago
# define a worker to read samples from reader to in_queue
def read_worker(reader, in_queue):
for i in reader():
in_queue.put(i)
in_queue.put(end)
8 years ago
# define a worker to read samples from reader to in_queue with order flag
def order_read_worker(reader, in_queue):
in_order = 0
for i in reader():
8 years ago
in_queue.put((in_order, i))
in_order += 1
in_queue.put(end)
# define a worker to handle samples from in_queue by mapper
# and put mapped samples into out_queue
def handle_worker(in_queue, out_queue, mapper):
sample = in_queue.get()
while not isinstance(sample, XmapEndSignal):
r = mapper(sample)
out_queue.put(r)
sample = in_queue.get()
in_queue.put(end)
out_queue.put(end)
8 years ago
# define a worker to handle samples from in_queue by mapper
# and put mapped samples into out_queue by order
def order_handle_worker(in_queue, out_queue, mapper, out_order):
ins = in_queue.get()
while not isinstance(ins, XmapEndSignal):
order, sample = ins
r = mapper(sample)
while order != out_order[0]:
pass
out_queue.put(r)
8 years ago
out_order[0] += 1
ins = in_queue.get()
in_queue.put(end)
out_queue.put(end)
def xreader():
in_queue = Queue(buffer_size)
out_queue = Queue(buffer_size)
out_order = [0]
# start a read worker in a thread
target = order_read_worker if order else read_worker
t = Thread(target=target, args=(reader, in_queue))
t.daemon = True
t.start()
# start several handle_workers
target = order_handle_worker if order else handle_worker
args = (in_queue, out_queue, mapper, out_order) if order else (
in_queue, out_queue, mapper)
workers = []
for i in range(process_num):
worker = Thread(target=target, args=args)
worker.daemon = True
workers.append(worker)
for w in workers:
w.start()
sample = out_queue.get()
while not isinstance(sample, XmapEndSignal):
yield sample
sample = out_queue.get()
finish = 1
while finish < process_num:
sample = out_queue.get()
if isinstance(sample, XmapEndSignal):
finish += 1
else:
yield sample
return xreader
def multiprocess_reader(readers, use_pipe=True, queue_size=1000):
"""
This API use python ``multiprocessing`` to read data from ``readers`` parallelly,
and then ``multiprocess.Queue`` or ``multiprocess.Pipe`` is used to merge
these data. A seperate process will be created for each reader in the
``readers`` list, please guarantee every reader can work independently
to avoid conflicts in parallel environment.
``Multiprocess.Queue`` require the rw access right to /dev/shm, and it's not suppported
in some platforms.
Parameters:
readers (list( ``generator`` ) | tuple( ``generator`` )): a python ``generator`` list
used to read input data
use_pipe (bool, optional): control the inner API used to implement the multi-processing,
default True - use ``multiprocess.Pipe`` which is recommended
queue_size (int, optional): only useful when ``use_pipe`` is False - ``multiprocess.Queue``
is used, default 1000. Increase this value can speed up the data reading, and more memory
will be consumed.
Returns:
``generator``: a new reader which can be run parallelly
Example:
.. code-block:: python
import paddle.fluid as fluid
from paddle.fluid.io import multiprocess_reader
import numpy as np
sample_files = ['sample_file_1', 'sample_file_2']
def fake_input_files():
with open(sample_files[0], 'w') as f:
np.savez(f, a=np.array([1, 2]), b=np.array([3, 4]), c=np.array([5, 6]), d=np.array([7, 8]))
with open(sample_files[1], 'w') as f:
np.savez(f, a=np.array([9, 10]), b=np.array([11, 12]), c=np.array([13, 14]))
def generate_reader(file_name):
# load data file
def _impl():
data = np.load(file_name)
for item in sorted(data.files):
yield data[item],
return _impl
if __name__ == '__main__':
# generate sample input files
fake_input_files()
with fluid.program_guard(fluid.Program(), fluid.Program()):
place = fluid.CPUPlace()
# the 1st 2 is batch size
image = fluid.data(name='image', dtype='int64', shape=[2, 1, 2])
fluid.layers.Print(image)
# print detailed tensor info of image variable
reader = fluid.io.PyReader(feed_list=[image], capacity=2)
decorated_reader = multiprocess_reader(
[generate_reader(sample_files[0]), generate_reader(sample_files[1])], False)
reader.decorate_sample_generator(decorated_reader, batch_size=2, places=[place])
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for data in reader():
res = exe.run(feed=data, fetch_list=[image])
print(res[0])
# print below content in this case
# [[[1 2]], [[3 4]]]
# [[[5 6]], [[7 8]]]
# [[[9 10]], [[11 12]]]
# [13,14] will be dropped
"""
try:
import ujson as json
except Exception as e:
sys.stderr.write("import ujson error: " + str(e) + " use json\n")
import json
assert type(readers) is list and len(readers) > 0
def _read_into_queue(reader, queue):
try:
for sample in reader():
if sample is None:
raise ValueError("sample has None")
queue.put(sample)
queue.put(None)
except:
queue.put("")
six.reraise(*sys.exc_info())
def queue_reader():
queue = multiprocessing.Queue(queue_size)
for reader in readers:
p = multiprocessing.Process(
target=_read_into_queue, args=(reader, queue))
p.start()
reader_num = len(readers)
finish_num = 0
while finish_num < reader_num:
sample = queue.get()
if sample is None:
finish_num += 1
elif sample == "":
raise ValueError("multiprocess reader raises an exception")
else:
yield sample
def _read_into_pipe(reader, conn):
try:
for sample in reader():
if sample is None:
raise ValueError("sample has None!")
conn.send(json.dumps(sample))
conn.send(json.dumps(None))
conn.close()
except:
conn.send(json.dumps(""))
conn.close()
six.reraise(*sys.exc_info())
def pipe_reader():
conns = []
for reader in readers:
parent_conn, child_conn = multiprocessing.Pipe()
conns.append(parent_conn)
p = multiprocessing.Process(
target=_read_into_pipe, args=(reader, child_conn))
p.start()
reader_num = len(readers)
finish_num = 0
conn_to_remove = []
while finish_num < reader_num:
for conn in conn_to_remove:
conns.remove(conn)
conn_to_remove = []
for conn in conns:
sample = json.loads(conn.recv())
if sample is None:
finish_num += 1
conn.close()
conn_to_remove.append(conn)
elif sample == "":
conn.close()
conn_to_remove.append(conn)
raise ValueError("multiprocess reader raises an exception")
else:
yield sample
if use_pipe:
return pipe_reader
else:
return queue_reader