From c9a76ebba2c0b050c157232c13670b17d2ba806d Mon Sep 17 00:00:00 2001 From: wanghaoshuang Date: Tue, 20 Jun 2017 16:03:49 +0800 Subject: [PATCH 1/6] modified xmap reader to process sample by order --- python/paddle/v2/reader/decorator.py | 36 ++++++++++++++++--- .../paddle/v2/reader/tests/decorator_test.py | 18 ++++++++++ 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/python/paddle/v2/reader/decorator.py b/python/paddle/v2/reader/decorator.py index c76faa596c..68ffbd6f3d 100644 --- a/python/paddle/v2/reader/decorator.py +++ b/python/paddle/v2/reader/decorator.py @@ -230,7 +230,7 @@ class XmapEndSignal(): pass -def xmap_readers(mapper, reader, process_num, buffer_size): +def xmap_readers(mapper, reader, process_num, buffer_size, order=False): """ Use multiprocess to map samples from reader by a mapper defined by user. And this function contains a buffered decorator. @@ -242,21 +242,32 @@ def xmap_readers(mapper, reader, process_num, buffer_size): :type process_num: int :param buffer_size: max buffer size :type buffer_size: int + :param order: keep the order of reader + :type order: bool :return: the decarated reader :rtype: callable """ end = XmapEndSignal() in_queue = Queue(buffer_size) out_queue = Queue(buffer_size) - + out_order = [0] # define a worker to read samples from reader to in_queue def read_worker(reader, in_queue): for i in reader(): in_queue.put(i) in_queue.put(end) + + # define a worker to read samples from reader to in_queue with order flag + def order_read_worker(reader, in_queue): + in_order = 0 + for i in reader(): + in_queue.put((in_order,i)) + in_order+=1 + in_queue.put(end) # start a read worker in a thread - t = Thread(target=read_worker, args=(reader, in_queue)) + target = order_read_worker if order else read_worker + t = Thread(target=target, args=(reader, in_queue)) t.daemon = True t.start() @@ -270,12 +281,29 @@ def xmap_readers(mapper, reader, process_num, buffer_size): sample = in_queue.get() in_queue.put(end) out_queue.put(end) + + # define a worker to handle samples from in_queue by mapper + # and put mapped samples into out_queue by order + def order_handle_worker(in_queue, out_queue, mapper, out_order): + ins = in_queue.get() + while not isinstance(ins, XmapEndSignal): + order, sample = ins + r = mapper(sample) + while order != out_order[0]: + pass + out_queue.put(r) + out_order[0] += 1 + ins = in_queue.get() + in_queue.put(end) + out_queue.put(end) # start several handle_workers + target = order_handle_worker if order else handle_worker + args = (in_queue, out_queue, mapper, out_order) if order else (in_queue, out_queue, mapper) workers = [] for i in xrange(process_num): worker = Thread( - target=handle_worker, args=(in_queue, out_queue, mapper)) + target=target, args=args) worker.daemon = True workers.append(worker) for w in workers: diff --git a/python/paddle/v2/reader/tests/decorator_test.py b/python/paddle/v2/reader/tests/decorator_test.py index 734154b979..76db91a44b 100644 --- a/python/paddle/v2/reader/tests/decorator_test.py +++ b/python/paddle/v2/reader/tests/decorator_test.py @@ -120,6 +120,24 @@ class TestShuffle(unittest.TestCase): total += 1 self.assertEqual(total, 10) +class TestXmap(unittest.TestCase): + def test_xmap(self): + def mapper(x): + return (x + 1) + orders = (True, False) + thread_nums = (1, 2, 4, 8, 16) + buffered_size = (1, 2, 4, 8, 16) + for order in orders: + for tNum in thread_nums: + for size in buffered_size: + result = [] + for i in paddle.v2.reader.xmap_readers(mapper, reader_creator_10(), tNum, size, order)(): + result.append(i) + if not order: + result.sort() + for idx, e in enumerate(result): + self.assertEqual(e, mapper(idx)) + if __name__ == '__main__': unittest.main() From 8bc07dee4e3c1d01e0c5f5f229fd13cadc74ace8 Mon Sep 17 00:00:00 2001 From: wanghaoshuang Date: Tue, 20 Jun 2017 16:11:14 +0800 Subject: [PATCH 2/6] format code --- python/paddle/v2/reader/decorator.py | 17 +++++++++-------- python/paddle/v2/reader/tests/decorator_test.py | 8 ++++++-- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/python/paddle/v2/reader/decorator.py b/python/paddle/v2/reader/decorator.py index 68ffbd6f3d..e432003129 100644 --- a/python/paddle/v2/reader/decorator.py +++ b/python/paddle/v2/reader/decorator.py @@ -251,18 +251,19 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False): in_queue = Queue(buffer_size) out_queue = Queue(buffer_size) out_order = [0] + # define a worker to read samples from reader to in_queue def read_worker(reader, in_queue): for i in reader(): in_queue.put(i) in_queue.put(end) - + # define a worker to read samples from reader to in_queue with order flag def order_read_worker(reader, in_queue): in_order = 0 for i in reader(): - in_queue.put((in_order,i)) - in_order+=1 + in_queue.put((in_order, i)) + in_order += 1 in_queue.put(end) # start a read worker in a thread @@ -281,7 +282,7 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False): sample = in_queue.get() in_queue.put(end) out_queue.put(end) - + # define a worker to handle samples from in_queue by mapper # and put mapped samples into out_queue by order def order_handle_worker(in_queue, out_queue, mapper, out_order): @@ -292,18 +293,18 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False): while order != out_order[0]: pass out_queue.put(r) - out_order[0] += 1 + out_order[0] += 1 ins = in_queue.get() in_queue.put(end) out_queue.put(end) # start several handle_workers target = order_handle_worker if order else handle_worker - args = (in_queue, out_queue, mapper, out_order) if order else (in_queue, out_queue, mapper) + args = (in_queue, out_queue, mapper, out_order) if order else ( + in_queue, out_queue, mapper) workers = [] for i in xrange(process_num): - worker = Thread( - target=target, args=args) + worker = Thread(target=target, args=args) worker.daemon = True workers.append(worker) for w in workers: diff --git a/python/paddle/v2/reader/tests/decorator_test.py b/python/paddle/v2/reader/tests/decorator_test.py index 76db91a44b..0bd7733955 100644 --- a/python/paddle/v2/reader/tests/decorator_test.py +++ b/python/paddle/v2/reader/tests/decorator_test.py @@ -120,10 +120,12 @@ class TestShuffle(unittest.TestCase): total += 1 self.assertEqual(total, 10) + class TestXmap(unittest.TestCase): def test_xmap(self): def mapper(x): return (x + 1) + orders = (True, False) thread_nums = (1, 2, 4, 8, 16) buffered_size = (1, 2, 4, 8, 16) @@ -131,13 +133,15 @@ class TestXmap(unittest.TestCase): for tNum in thread_nums: for size in buffered_size: result = [] - for i in paddle.v2.reader.xmap_readers(mapper, reader_creator_10(), tNum, size, order)(): + for i in paddle.v2.reader.xmap_readers(mapper, + reader_creator_10(), + tNum, size, order)(): result.append(i) if not order: result.sort() for idx, e in enumerate(result): self.assertEqual(e, mapper(idx)) - + if __name__ == '__main__': unittest.main() From ff4be82252d797746b3a4169137c7fcfd9ee7039 Mon Sep 17 00:00:00 2001 From: Yi Wang Date: Tue, 20 Jun 2017 17:54:10 -0700 Subject: [PATCH 3/6] Handle multiple processes trying to create the data home directory --- python/paddle/v2/dataset/common.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/python/paddle/v2/dataset/common.py b/python/paddle/v2/dataset/common.py index e09ac1a7a0..72894c24b1 100644 --- a/python/paddle/v2/dataset/common.py +++ b/python/paddle/v2/dataset/common.py @@ -27,13 +27,17 @@ __all__ = ['DATA_HOME', 'download', 'md5file', 'split', 'cluster_files_reader'] DATA_HOME = os.path.expanduser('~/.cache/paddle/dataset') -if not os.path.exists(DATA_HOME): - try: - os.makedirs(DATA_HOME) - except OSError as exc: - if exc.errno != errno.EEXIST: - raise - pass +# When running unit tests, there could be multiple processes that +# trying to create DATA_HOME directory simultaneously, so we cannot +# use a if condition to check for the existence of the directory; +# instead, we use the filesystem as the synchronization mechanism by +# catching returned errors. +try: + os.makedirs(DATA_HOME) +except OSError as exc: + if exc.errno != errno.EEXIST: + raise + pass def md5file(fname): From 09cc4408e5d5424fba49fcacbd813a846413f9cf Mon Sep 17 00:00:00 2001 From: wanghaoshuang Date: Tue, 20 Jun 2017 16:03:49 +0800 Subject: [PATCH 4/6] modified xmap reader to process sample by order --- python/paddle/v2/reader/decorator.py | 36 ++++++++++++++++--- .../paddle/v2/reader/tests/decorator_test.py | 18 ++++++++++ 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/python/paddle/v2/reader/decorator.py b/python/paddle/v2/reader/decorator.py index c76faa596c..68ffbd6f3d 100644 --- a/python/paddle/v2/reader/decorator.py +++ b/python/paddle/v2/reader/decorator.py @@ -230,7 +230,7 @@ class XmapEndSignal(): pass -def xmap_readers(mapper, reader, process_num, buffer_size): +def xmap_readers(mapper, reader, process_num, buffer_size, order=False): """ Use multiprocess to map samples from reader by a mapper defined by user. And this function contains a buffered decorator. @@ -242,21 +242,32 @@ def xmap_readers(mapper, reader, process_num, buffer_size): :type process_num: int :param buffer_size: max buffer size :type buffer_size: int + :param order: keep the order of reader + :type order: bool :return: the decarated reader :rtype: callable """ end = XmapEndSignal() in_queue = Queue(buffer_size) out_queue = Queue(buffer_size) - + out_order = [0] # define a worker to read samples from reader to in_queue def read_worker(reader, in_queue): for i in reader(): in_queue.put(i) in_queue.put(end) + + # define a worker to read samples from reader to in_queue with order flag + def order_read_worker(reader, in_queue): + in_order = 0 + for i in reader(): + in_queue.put((in_order,i)) + in_order+=1 + in_queue.put(end) # start a read worker in a thread - t = Thread(target=read_worker, args=(reader, in_queue)) + target = order_read_worker if order else read_worker + t = Thread(target=target, args=(reader, in_queue)) t.daemon = True t.start() @@ -270,12 +281,29 @@ def xmap_readers(mapper, reader, process_num, buffer_size): sample = in_queue.get() in_queue.put(end) out_queue.put(end) + + # define a worker to handle samples from in_queue by mapper + # and put mapped samples into out_queue by order + def order_handle_worker(in_queue, out_queue, mapper, out_order): + ins = in_queue.get() + while not isinstance(ins, XmapEndSignal): + order, sample = ins + r = mapper(sample) + while order != out_order[0]: + pass + out_queue.put(r) + out_order[0] += 1 + ins = in_queue.get() + in_queue.put(end) + out_queue.put(end) # start several handle_workers + target = order_handle_worker if order else handle_worker + args = (in_queue, out_queue, mapper, out_order) if order else (in_queue, out_queue, mapper) workers = [] for i in xrange(process_num): worker = Thread( - target=handle_worker, args=(in_queue, out_queue, mapper)) + target=target, args=args) worker.daemon = True workers.append(worker) for w in workers: diff --git a/python/paddle/v2/reader/tests/decorator_test.py b/python/paddle/v2/reader/tests/decorator_test.py index 734154b979..76db91a44b 100644 --- a/python/paddle/v2/reader/tests/decorator_test.py +++ b/python/paddle/v2/reader/tests/decorator_test.py @@ -120,6 +120,24 @@ class TestShuffle(unittest.TestCase): total += 1 self.assertEqual(total, 10) +class TestXmap(unittest.TestCase): + def test_xmap(self): + def mapper(x): + return (x + 1) + orders = (True, False) + thread_nums = (1, 2, 4, 8, 16) + buffered_size = (1, 2, 4, 8, 16) + for order in orders: + for tNum in thread_nums: + for size in buffered_size: + result = [] + for i in paddle.v2.reader.xmap_readers(mapper, reader_creator_10(), tNum, size, order)(): + result.append(i) + if not order: + result.sort() + for idx, e in enumerate(result): + self.assertEqual(e, mapper(idx)) + if __name__ == '__main__': unittest.main() From cadea35a107167edee23b8e3ca0a92ca3d85e859 Mon Sep 17 00:00:00 2001 From: wanghaoshuang Date: Tue, 20 Jun 2017 16:11:14 +0800 Subject: [PATCH 5/6] format code --- python/paddle/v2/reader/decorator.py | 17 +++++++++-------- python/paddle/v2/reader/tests/decorator_test.py | 8 ++++++-- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/python/paddle/v2/reader/decorator.py b/python/paddle/v2/reader/decorator.py index 68ffbd6f3d..e432003129 100644 --- a/python/paddle/v2/reader/decorator.py +++ b/python/paddle/v2/reader/decorator.py @@ -251,18 +251,19 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False): in_queue = Queue(buffer_size) out_queue = Queue(buffer_size) out_order = [0] + # define a worker to read samples from reader to in_queue def read_worker(reader, in_queue): for i in reader(): in_queue.put(i) in_queue.put(end) - + # define a worker to read samples from reader to in_queue with order flag def order_read_worker(reader, in_queue): in_order = 0 for i in reader(): - in_queue.put((in_order,i)) - in_order+=1 + in_queue.put((in_order, i)) + in_order += 1 in_queue.put(end) # start a read worker in a thread @@ -281,7 +282,7 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False): sample = in_queue.get() in_queue.put(end) out_queue.put(end) - + # define a worker to handle samples from in_queue by mapper # and put mapped samples into out_queue by order def order_handle_worker(in_queue, out_queue, mapper, out_order): @@ -292,18 +293,18 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False): while order != out_order[0]: pass out_queue.put(r) - out_order[0] += 1 + out_order[0] += 1 ins = in_queue.get() in_queue.put(end) out_queue.put(end) # start several handle_workers target = order_handle_worker if order else handle_worker - args = (in_queue, out_queue, mapper, out_order) if order else (in_queue, out_queue, mapper) + args = (in_queue, out_queue, mapper, out_order) if order else ( + in_queue, out_queue, mapper) workers = [] for i in xrange(process_num): - worker = Thread( - target=target, args=args) + worker = Thread(target=target, args=args) worker.daemon = True workers.append(worker) for w in workers: diff --git a/python/paddle/v2/reader/tests/decorator_test.py b/python/paddle/v2/reader/tests/decorator_test.py index 76db91a44b..0bd7733955 100644 --- a/python/paddle/v2/reader/tests/decorator_test.py +++ b/python/paddle/v2/reader/tests/decorator_test.py @@ -120,10 +120,12 @@ class TestShuffle(unittest.TestCase): total += 1 self.assertEqual(total, 10) + class TestXmap(unittest.TestCase): def test_xmap(self): def mapper(x): return (x + 1) + orders = (True, False) thread_nums = (1, 2, 4, 8, 16) buffered_size = (1, 2, 4, 8, 16) @@ -131,13 +133,15 @@ class TestXmap(unittest.TestCase): for tNum in thread_nums: for size in buffered_size: result = [] - for i in paddle.v2.reader.xmap_readers(mapper, reader_creator_10(), tNum, size, order)(): + for i in paddle.v2.reader.xmap_readers(mapper, + reader_creator_10(), + tNum, size, order)(): result.append(i) if not order: result.sort() for idx, e in enumerate(result): self.assertEqual(e, mapper(idx)) - + if __name__ == '__main__': unittest.main() From d322c94243ef2039c633c3e455a6d3660193804c Mon Sep 17 00:00:00 2001 From: wanghaoshuang Date: Wed, 21 Jun 2017 21:41:53 +0800 Subject: [PATCH 6/6] fix unittest --- python/paddle/v2/reader/tests/decorator_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/v2/reader/tests/decorator_test.py b/python/paddle/v2/reader/tests/decorator_test.py index 0bd7733955..bb3c5d220b 100644 --- a/python/paddle/v2/reader/tests/decorator_test.py +++ b/python/paddle/v2/reader/tests/decorator_test.py @@ -134,7 +134,7 @@ class TestXmap(unittest.TestCase): for size in buffered_size: result = [] for i in paddle.v2.reader.xmap_readers(mapper, - reader_creator_10(), + reader_creator_10(0), tNum, size, order)(): result.append(i) if not order: