|
|
|
@ -455,26 +455,84 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False):
|
|
|
|
|
|
|
|
|
|
def multiprocess_reader(readers, use_pipe=True, queue_size=1000):
|
|
|
|
|
"""
|
|
|
|
|
multiprocess_reader use python multi process to read data from readers
|
|
|
|
|
and then use multiprocess.Queue or multiprocess.Pipe to merge all
|
|
|
|
|
data. The process number is equal to the number of input readers, each
|
|
|
|
|
process calls one reader.
|
|
|
|
|
This API uses python ``multiprocessing`` to read data from ``readers`` in parallel,
|
|
|
|
|
and then ``multiprocess.Queue`` or ``multiprocess.Pipe`` is used to merge
|
|
|
|
|
these data. A separate process will be created for each reader in the
|
|
|
|
|
``readers`` list, please guarantee every reader can work independently
|
|
|
|
|
to avoid conflicts in parallel environment.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
``Multiprocess.Queue`` requires the rw access right to /dev/shm, and it's not supported
|
|
|
|
|
in some platforms.
|
|
|
|
|
|
|
|
|
|
Multiprocess.Queue requires the rw access right to /dev/shm, some
|
|
|
|
|
platforms do not support it.
|
|
|
|
|
Parameters:
|
|
|
|
|
readers (list( ``generator`` ) | tuple( ``generator`` )): a python ``generator`` list
|
|
|
|
|
used to read input data
|
|
|
|
|
use_pipe (bool, optional): control the inner API used to implement the multi-processing,
|
|
|
|
|
default True - use ``multiprocess.Pipe`` which is recommended
|
|
|
|
|
queue_size (int, optional): only useful when ``use_pipe`` is False - ``multiprocess.Queue``
|
|
|
|
|
is used, default 1000. Increasing this value can speed up the data reading, and more memory
|
|
|
|
|
will be consumed.
|
|
|
|
|
|
|
|
|
|
you need to create multiple readers first, these readers should be independent
|
|
|
|
|
to each other so that each process can work independently.
|
|
|
|
|
Returns:
|
|
|
|
|
``generator``: a new reader which can be run in parallel
|
|
|
|
|
|
|
|
|
|
An example:
|
|
|
|
|
|
|
|
|
|
Example:
|
|
|
|
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
|
|
|
|
reader0 = reader(["file01", "file02"])
|
|
|
|
|
reader1 = reader(["file11", "file12"])
|
|
|
|
|
reader2 = reader(["file21", "file22"])
|
|
|
|
|
reader = multiprocess_reader([reader0, reader1, reader2],
|
|
|
|
|
queue_size=100, use_pipe=False)
|
|
|
|
|
import paddle.fluid as fluid
|
|
|
|
|
from paddle.fluid.io import multiprocess_reader
|
|
|
|
|
import numpy as np
|
|
|
|
|
|
|
|
|
|
sample_files = ['sample_file_1', 'sample_file_2']
|
|
|
|
|
|
|
|
|
|
def fake_input_files():
|
|
|
|
|
with open(sample_files[0], 'wb') as f:
|
|
|
|
|
np.savez(f, a=np.array([1, 2]), b=np.array([3, 4]), c=np.array([5, 6]), d=np.array([7, 8]))
|
|
|
|
|
with open(sample_files[1], 'wb') as f:
|
|
|
|
|
np.savez(f, a=np.array([9, 10]), b=np.array([11, 12]), c=np.array([13, 14]))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_reader(file_name):
|
|
|
|
|
# load data file
|
|
|
|
|
def _impl():
|
|
|
|
|
data = np.load(file_name)
|
|
|
|
|
for item in sorted(data.files):
|
|
|
|
|
yield data[item],
|
|
|
|
|
return _impl
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
# generate sample input files
|
|
|
|
|
fake_input_files()
|
|
|
|
|
|
|
|
|
|
with fluid.program_guard(fluid.Program(), fluid.Program()):
|
|
|
|
|
place = fluid.CPUPlace()
|
|
|
|
|
# the 1st 2 is batch size
|
|
|
|
|
image = fluid.data(name='image', dtype='int64', shape=[2, 1, 2])
|
|
|
|
|
fluid.layers.Print(image)
|
|
|
|
|
# print detailed tensor info of image variable
|
|
|
|
|
|
|
|
|
|
reader = fluid.io.PyReader(feed_list=[image], capacity=2)
|
|
|
|
|
|
|
|
|
|
decorated_reader = multiprocess_reader(
|
|
|
|
|
[generate_reader(sample_files[0]), generate_reader(sample_files[1])], False)
|
|
|
|
|
|
|
|
|
|
reader.decorate_sample_generator(decorated_reader, batch_size=2, places=[place])
|
|
|
|
|
|
|
|
|
|
exe = fluid.Executor(place)
|
|
|
|
|
exe.run(fluid.default_startup_program())
|
|
|
|
|
|
|
|
|
|
for data in reader():
|
|
|
|
|
res = exe.run(feed=data, fetch_list=[image])
|
|
|
|
|
print(res[0])
|
|
|
|
|
# print below content in this case
|
|
|
|
|
# [[[1 2]], [[3 4]]]
|
|
|
|
|
# [[[5 6]], [[7 8]]]
|
|
|
|
|
# [[[9 10]], [[11 12]]]
|
|
|
|
|
# [13,14] will be dropped
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|