|
|
|
@ -456,52 +456,124 @@ def py_reader(capacity,
|
|
|
|
|
name=None,
|
|
|
|
|
use_double_buffer=True):
|
|
|
|
|
"""
|
|
|
|
|
Create a reader and blocking queue for data feeding in Python
|
|
|
|
|
Create a Python reader for data feeding in Python
|
|
|
|
|
|
|
|
|
|
This layer returns a Reader Variable and a BlockingQueue.
|
|
|
|
|
The BlockingQueue provides `push()` method to push a `LoDTensorArray`
|
|
|
|
|
object into the queue in Python side. In C++ side, the Reader
|
|
|
|
|
Variable would invoke `pop()` method of the queue to retrieve the
|
|
|
|
|
feeding data. The process of feeding data in Python side and fetching
|
|
|
|
|
data in C++ side can run in parallel. The BlockingQueue should be closed
|
|
|
|
|
using `close()` method when unused.
|
|
|
|
|
This layer returns a Reader Variable.
|
|
|
|
|
The Reader provides :code:`decorate_paddle_reader()` and
|
|
|
|
|
:code:`decorate_tensor_provider()` to set a Python generator as the data
|
|
|
|
|
source in Python side. When :code:`Executor::Run()` is invoked in C++
|
|
|
|
|
side, the data from the generator would be read automatically. Unlike
|
|
|
|
|
:code:`DataFeeder.feed()`, the data reading process and
|
|
|
|
|
:code:`Executor::Run()` process can run in parallel using
|
|
|
|
|
:code:`py_reader`. The :code:`start()` method of the Reader should be
|
|
|
|
|
called when each pass begins, while the :code:`reset()` method should be
|
|
|
|
|
called when the pass ends and :code:`fluid.core.EOFException` raises.
|
|
|
|
|
Note that :code:`Program.clone()` method cannot clone :code:`py_reader`.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
use_double_buffer(bool): Whether use double buffer or not.
|
|
|
|
|
capacity(int): The maximum capacity of the BlockingQueue.
|
|
|
|
|
capacity(int): The buffer capacity maintained by :code:`py_reader`.
|
|
|
|
|
shapes(list|tuple): List of tuples which declaring data shapes.
|
|
|
|
|
dtypes(list|tuple): List of strs which declaring data type.
|
|
|
|
|
lod_levels(list|tuple): List of ints which declaring data lod_level.
|
|
|
|
|
name(basestring): The prefix Python queue name and Reader name. None will
|
|
|
|
|
be generated automatically.
|
|
|
|
|
use_double_buffer(bool): Whether use double buffer or not.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
tuple(Variable, BlockingQueue):
|
|
|
|
|
A Reader Variable from which we can get feeding data.
|
|
|
|
|
|
|
|
|
|
A BlockingQueue object for data feeding.
|
|
|
|
|
Variable: A Reader from which we can get feeding data.
|
|
|
|
|
|
|
|
|
|
Examples:
|
|
|
|
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
1. The basic usage of :code:`py_reader` is as follows:
|
|
|
|
|
|
|
|
|
|
reader, queue = fluid.layers.py_reader(
|
|
|
|
|
capacity=10,
|
|
|
|
|
shapes=[[-1,3,224,224], [-1,1]],
|
|
|
|
|
dtypes=['float32', 'int64'])
|
|
|
|
|
# Via the reader, we can use 'read_file' layer to get data:
|
|
|
|
|
image, label = fluid.layers.read_file(reader)
|
|
|
|
|
|
|
|
|
|
# Via the blocking queue, we can feed data using threads
|
|
|
|
|
def feed_data(queue, feed_images, feed_labels):
|
|
|
|
|
for feed_image, feed_label in zip(feed_images, feed_labels):
|
|
|
|
|
data = core.LoDTensorArray()
|
|
|
|
|
data.append(feed_image)
|
|
|
|
|
data.append(feed_label)
|
|
|
|
|
queue.push(data)
|
|
|
|
|
|
|
|
|
|
thread = threading.Thread(target=feed_data, args=(queue, feed_images, feed_labels))
|
|
|
|
|
thread.start()
|
|
|
|
|
>>> import paddle.v2
|
|
|
|
|
>>> import paddle.fluid as fluid
|
|
|
|
|
>>> import paddle.dataset.mnist as mnist
|
|
|
|
|
>>>
|
|
|
|
|
>>> reader = fluid.layers.py_reader(capacity=64,
|
|
|
|
|
>>> shapes=[(-1,3,224,224), (-1,1)],
|
|
|
|
|
>>> dtypes=['float32', 'int64'])
|
|
|
|
|
>>> reader.decorate_paddle_reader(
|
|
|
|
|
>>> paddle.v2.reader.shuffle(paddle.batch(mnist.train())
|
|
|
|
|
>>>
|
|
|
|
|
>>> img, label = fluid.layers.read_file(reader)
|
|
|
|
|
>>> loss = network(img, label) # some network definition
|
|
|
|
|
>>>
|
|
|
|
|
>>> fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())
|
|
|
|
|
>>>
|
|
|
|
|
>>> exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
|
|
|
|
|
>>> for epoch_id in range(10):
|
|
|
|
|
>>> reader.start()
|
|
|
|
|
>>> try:
|
|
|
|
|
>>> while True:
|
|
|
|
|
>>> exe.run(fetch_list=[loss.name])
|
|
|
|
|
>>> except fluid.core.EOFException:
|
|
|
|
|
>>> reader.reset()
|
|
|
|
|
|
|
|
|
|
2. When training and testing are both performed, two different
|
|
|
|
|
:code:`py_reader` should be created with different names, e.g.:
|
|
|
|
|
|
|
|
|
|
>>> import paddle.v2
|
|
|
|
|
>>> import paddle.fluid as fluid
|
|
|
|
|
>>> import paddle.dataset.mnist as mnist
|
|
|
|
|
>>>
|
|
|
|
|
>>> def network(reader):
|
|
|
|
|
>>> img, label = fluid.layers.read_file(reader)
|
|
|
|
|
>>> # Here, we omitted the network definition
|
|
|
|
|
>>> return loss
|
|
|
|
|
>>>
|
|
|
|
|
>>> train_reader = fluid.layers.py_reader(capacity=64,
|
|
|
|
|
>>> shapes=[(-1,3,224,224), (-1,1)],
|
|
|
|
|
>>> dtypes=['float32', 'int64'],
|
|
|
|
|
>>> name='train_reader')
|
|
|
|
|
>>> train_reader.decorate_paddle_reader(
|
|
|
|
|
>>> paddle.v2.reader.shuffle(paddle.batch(mnist.train())
|
|
|
|
|
>>>
|
|
|
|
|
>>> test_reader = fluid.layers.py_reader(capacity=32,
|
|
|
|
|
>>> shapes=[(-1,3,224,224), (-1,1)],
|
|
|
|
|
>>> dtypes=['float32', 'int64'],
|
|
|
|
|
>>> name='test_reader')
|
|
|
|
|
>>> test_reader.decorate_paddle_reader(paddle.batch(mnist.test(), 512))
|
|
|
|
|
>>>
|
|
|
|
|
>>> # Create train_main_prog and train_startup_prog
|
|
|
|
|
>>> train_main_prog = fluid.Program()
|
|
|
|
|
>>> train_startup_prog = fluid.Program()
|
|
|
|
|
>>> with fluid.program_guard(train_main_prog, train_startup_prog):
|
|
|
|
|
>>> # Use fluid.unique_name.guard() to share parameters with test program
|
|
|
|
|
>>> with fluid.unique_name.guard():
|
|
|
|
|
>>> train_loss = network(train_reader) # some network definition
|
|
|
|
|
>>> adam = fluid.optimizer.Adam(learning_rate=0.01)
|
|
|
|
|
>>> adam.minimize(loss)
|
|
|
|
|
>>>
|
|
|
|
|
>>> # Create test_main_prog and test_startup_prog
|
|
|
|
|
>>> test_main_prog = fluid.Program()
|
|
|
|
|
>>> test_startup_prog = fluid.Program()
|
|
|
|
|
>>> with fluid.program_guard(test_main_prog, test_startup_prog):
|
|
|
|
|
>>> # Use fluid.unique_name.guard() to share parameters with train program
|
|
|
|
|
>>> with fluid.unique_name.guard():
|
|
|
|
|
>>> test_loss = network(test_reader)
|
|
|
|
|
>>>
|
|
|
|
|
>>> fluid.Executor(fluid.CUDAPlace(0)).run(train_startup_prog)
|
|
|
|
|
>>> fluid.Executor(fluid.CUDAPlace(0)).run(test_startup_prog)
|
|
|
|
|
>>>
|
|
|
|
|
>>> train_exe = fluid.ParallelExecutor(use_cuda=True,
|
|
|
|
|
>>> loss_name=train_loss.name, main_program=train_main_prog)
|
|
|
|
|
>>> test_exe = fluid.ParallelExecutor(use_cuda=True,
|
|
|
|
|
>>> loss_name=test_loss.name, main_program=test_main_prog)
|
|
|
|
|
>>> for epoch_id in range(10):
|
|
|
|
|
>>> train_reader.start()
|
|
|
|
|
>>> try:
|
|
|
|
|
>>> while True:
|
|
|
|
|
>>> train_exe.run(fetch_list=[train_loss.name])
|
|
|
|
|
>>> except fluid.core.EOFException:
|
|
|
|
|
>>> train_reader.reset()
|
|
|
|
|
>>>
|
|
|
|
|
>>> test_reader.start()
|
|
|
|
|
>>> try:
|
|
|
|
|
>>> while True:
|
|
|
|
|
>>> test_exe.run(fetch_list=[test_loss.name])
|
|
|
|
|
>>> except fluid.core.EOFException:
|
|
|
|
|
>>> test_reader.reset()
|
|
|
|
|
"""
|
|
|
|
|
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
|
|
|
|
|
shape_concat = []
|
|
|
|
|