|
|
|
@ -47,6 +47,76 @@ class PyReader(object):
|
|
|
|
|
capacity,
|
|
|
|
|
use_double_buffer=True,
|
|
|
|
|
iterable=True):
|
|
|
|
|
"""
|
|
|
|
|
Create a reader object for data feeding in Python.
|
|
|
|
|
Data would be prefetched using Python thread and be pushed
|
|
|
|
|
into a queue asynchronously. Data in the queue would be extracted
|
|
|
|
|
automatically when `Executor.run(...)` is called.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
feed_list (list(Variable)|tuple(Variable)): feed variable list.
|
|
|
|
|
The variables should be created by :code:`fluid.layers.data()`.
|
|
|
|
|
capacity (int): capacity of the queue maintained in PyReader object.
|
|
|
|
|
use_double_buffer (bool): whether to use double_buffer_reader to
|
|
|
|
|
speed up data feeding.
|
|
|
|
|
iterable (bool): whether the created reader object is iterable.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
reader (Reader): the created reader object.
|
|
|
|
|
|
|
|
|
|
Examples:
|
|
|
|
|
1. If iterable = False, the created PyReader object is almost the
|
|
|
|
|
same as :code:`fluid.layers.py_reader()`. Operators would be
|
|
|
|
|
inserted into the program. User should call :code:`start()`
|
|
|
|
|
before each epoch and catch :code:`fluid.core.EOFException`
|
|
|
|
|
thrown by :code:`Executor.run()` when epoch ends. Once the
|
|
|
|
|
exception is caught, user should call :code:`reset()` to reset
|
|
|
|
|
the reader manually.
|
|
|
|
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
|
|
|
|
image = fluid.layers.data(
|
|
|
|
|
name='image', shape=[784], dtype='float32')
|
|
|
|
|
label = fluid.layers.data(
|
|
|
|
|
name='label', shape=[1], dtype='int64')
|
|
|
|
|
|
|
|
|
|
reader = fluid.io.PyReader(feed_list=[image, label],
|
|
|
|
|
capacity=4, iterable=False)
|
|
|
|
|
reader.decorate_paddle_reader(user_defined_reader)
|
|
|
|
|
... # definition of network is omitted
|
|
|
|
|
executor.run(fluid.default_main_program())
|
|
|
|
|
for _ in range(EPOCH_NUM):
|
|
|
|
|
reader.start()
|
|
|
|
|
while True:
|
|
|
|
|
try:
|
|
|
|
|
executor.run(feed=None, ...)
|
|
|
|
|
except fluid.core.EOFException:
|
|
|
|
|
reader.reset()
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
2. If iterable=True, the created PyReader object is decoupled with
|
|
|
|
|
the program. No operator would be inserted into the program.
|
|
|
|
|
In this case, the created reader is a Python generator, which
|
|
|
|
|
is iterable. User should feed the data yielded from PyReader
|
|
|
|
|
object into :code:`Executor.run(feed=...)`.
|
|
|
|
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
|
|
|
|
image = fluid.layers.data(
|
|
|
|
|
name='image', shape=[784], dtype='float32')
|
|
|
|
|
label = fluid.layers.data(
|
|
|
|
|
name='label', shape=[1], dtype='int64')
|
|
|
|
|
|
|
|
|
|
reader = fluid.io.PyReader(feed_list=[image, label],
|
|
|
|
|
capacity=4, iterable=True)
|
|
|
|
|
reader.decorate_paddle_reader(user_defined_reader,
|
|
|
|
|
places=fluid.cuda_places())
|
|
|
|
|
... # definition of network is omitted
|
|
|
|
|
executor.run(fluid.default_main_program())
|
|
|
|
|
for _ in range(EPOCH_NUM):
|
|
|
|
|
for data in reader():
|
|
|
|
|
executor.run(feed=data, ...)
|
|
|
|
|
"""
|
|
|
|
|
self._tensor_reader = None
|
|
|
|
|
self._thread = None
|
|
|
|
|
self._iterable = iterable
|
|
|
|
@ -161,10 +231,18 @@ class PyReader(object):
|
|
|
|
|
self._thread.join()
|
|
|
|
|
|
|
|
|
|
def start(self):
|
|
|
|
|
'''
|
|
|
|
|
Start the data feeding thread.
|
|
|
|
|
Can only call when the reader object is not iterable.
|
|
|
|
|
'''
|
|
|
|
|
assert not self._iterable, "start() cannot be called when PyReader is iterable"
|
|
|
|
|
self._start()
|
|
|
|
|
|
|
|
|
|
def reset(self):
|
|
|
|
|
'''
|
|
|
|
|
Reset the reader object when :code:`fluid.core.EOFException` raises.
|
|
|
|
|
Can only call when the reader object is not iterable.
|
|
|
|
|
'''
|
|
|
|
|
assert not self._iterable, "reset() cannot be called when PyReader is iterable"
|
|
|
|
|
self._reset()
|
|
|
|
|
|
|
|
|
@ -190,6 +268,18 @@ class PyReader(object):
|
|
|
|
|
self._thread.start()
|
|
|
|
|
|
|
|
|
|
def decorate_paddle_reader(self, reader, places=None):
|
|
|
|
|
'''
|
|
|
|
|
Set the data source of the PyReader object.
|
|
|
|
|
|
|
|
|
|
The provided :code:`reader` should be a Python generator,
|
|
|
|
|
which yields numpy-typed batched data.
|
|
|
|
|
|
|
|
|
|
:code:`places` must be set when the PyReader object is iterable.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
reader (generator): Python generator that yields numpy-typed
|
|
|
|
|
batched data.
|
|
|
|
|
'''
|
|
|
|
|
assert self._tensor_reader is None, \
|
|
|
|
|
"Cannot reset the data source of PyReader"
|
|
|
|
|
with program_guard(Program(), Program()):
|
|
|
|
@ -204,6 +294,18 @@ class PyReader(object):
|
|
|
|
|
self.decorate_tensor_provider(__tensor_reader_impl__, places)
|
|
|
|
|
|
|
|
|
|
def decorate_tensor_provider(self, reader, places=None):
|
|
|
|
|
'''
|
|
|
|
|
Set the data source of the PyReader object.
|
|
|
|
|
|
|
|
|
|
The provided :code:`reader` should be a Python generator,
|
|
|
|
|
which yields LoDTensor-typed batched data.
|
|
|
|
|
|
|
|
|
|
:code:`places` must be set when the PyReader object is iterable.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
reader (generator): Python generator that yields LoDTensor-typed
|
|
|
|
|
batched data.
|
|
|
|
|
'''
|
|
|
|
|
assert self._tensor_reader is None, \
|
|
|
|
|
"Cannot reset the data source of PyReader"
|
|
|
|
|
self._tensor_reader = reader
|
|
|
|
|