You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Paddle/python/paddle/fluid/tests/demo/pyreader.py

118 lines
3.8 KiB

# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import paddle.dataset.mnist as mnist
import paddle
import threading
import numpy
def network(is_train):
reader, queue = fluid.layers.py_reader(
capacity=10,
shapes=((-1, 784), (-1, 1)),
dtypes=('float32', 'int64'),
name="train_reader" if is_train else "test_reader")
img, label = fluid.layers.read_file(fluid.layers.double_buffer(reader))
hidden = img
for i in xrange(2):
hidden = fluid.layers.fc(input=hidden, size=100, act='tanh')
hidden = fluid.layers.dropout(
hidden, dropout_prob=0.5, is_test=not is_train)
prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
loss = fluid.layers.cross_entropy(input=prediction, label=label)
return fluid.layers.mean(loss), queue, reader
def pipe_reader_to_queue(reader_creator, queue):
with fluid.program_guard(fluid.Program(), fluid.Program()):
feeder = fluid.DataFeeder(
feed_list=[
fluid.layers.data(
name='img', dtype='float32', shape=[784]),
fluid.layers.data(
name='label', dtype='int64', shape=[1])
],
place=fluid.CPUPlace())
def __thread_main__():
for data in feeder.decorate_reader(
reader_creator, multi_devices=False)():
tmp = fluid.core.LoDTensorArray()
tmp.append(data['img'])
tmp.append(data['label'])
queue.push(tmp)
queue.close()
th = threading.Thread(target=__thread_main__)
th.start()
return th
def main():
train_prog = fluid.Program()
startup_prog = fluid.Program()
with fluid.program_guard(train_prog, startup_prog):
with fluid.unique_name.guard():
loss, train_queue, train_reader = network(True)
adam = fluid.optimizer.Adam(learning_rate=0.01)
adam.minimize(loss)
test_prog = fluid.Program()
with fluid.program_guard(test_prog, fluid.Program()):
with fluid.unique_name.guard():
test_loss, test_queue, test_reader = network(False)
fluid.Executor(fluid.CUDAPlace(0)).run(startup_prog)
trainer = fluid.ParallelExecutor(
use_cuda=True, loss_name=loss.name, main_program=train_prog)
tester = fluid.ParallelExecutor(
use_cuda=True, share_vars_from=trainer, main_program=test_prog)
for epoch_id in xrange(10):
train_data_thread = pipe_reader_to_queue(
paddle.batch(mnist.train(), 32), train_queue)
try:
while True:
print 'train_loss', numpy.array(
trainer.run(fetch_list=[loss.name]))
except fluid.core.EOFException:
print 'End of epoch', epoch_id
train_reader.reset()
train_data_thread.join()
test_data_thread = pipe_reader_to_queue(
paddle.batch(mnist.test(), 32), test_queue)
try:
while True:
print 'test loss', numpy.array(
tester.run(fetch_list=[test_loss.name]))
except fluid.core.EOFException:
print 'End of testing'
test_reader.reset()
test_data_thread.join()
break
if __name__ == '__main__':
main()