|
|
|
@ -14,6 +14,7 @@
|
|
|
|
|
|
|
|
|
|
import sys
|
|
|
|
|
import unittest
|
|
|
|
|
import multiprocessing
|
|
|
|
|
import numpy as np
|
|
|
|
|
import paddle.fluid as fluid
|
|
|
|
|
from paddle.fluid import core
|
|
|
|
@ -50,6 +51,14 @@ class TestDygraphDataLoaderProcess(unittest.TestCase):
|
|
|
|
|
self.capacity = 2
|
|
|
|
|
|
|
|
|
|
def test_reader_process_loop(self):
|
|
|
|
|
# This unittest's memory mapped files needs to be cleaned manually
|
|
|
|
|
def __clear_process__(util_queue):
|
|
|
|
|
while True:
|
|
|
|
|
try:
|
|
|
|
|
util_queue.get_nowait()
|
|
|
|
|
except queue.Empty:
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
with fluid.dygraph.guard():
|
|
|
|
|
loader = fluid.io.DataLoader.from_generator(
|
|
|
|
|
capacity=self.batch_num + 1, use_multiprocess=True)
|
|
|
|
@ -58,8 +67,16 @@ class TestDygraphDataLoaderProcess(unittest.TestCase):
|
|
|
|
|
places=fluid.CPUPlace())
|
|
|
|
|
loader._data_queue = queue.Queue(self.batch_num + 1)
|
|
|
|
|
loader._reader_process_loop()
|
|
|
|
|
# For clean memory mapped files
|
|
|
|
|
util_queue = multiprocessing.Queue(self.batch_num + 1)
|
|
|
|
|
for _ in range(self.batch_num):
|
|
|
|
|
loader._data_queue.get(timeout=10)
|
|
|
|
|
data = loader._data_queue.get(timeout=10)
|
|
|
|
|
util_queue.put(data)
|
|
|
|
|
|
|
|
|
|
# Clean up memory mapped files
|
|
|
|
|
clear_process = multiprocessing.Process(
|
|
|
|
|
target=__clear_process__, args=(util_queue, ))
|
|
|
|
|
clear_process.start()
|
|
|
|
|
|
|
|
|
|
def test_reader_process_loop_simple_none(self):
|
|
|
|
|
def none_sample_genarator(batch_num):
|
|
|
|
|