diff --git a/python/paddle/v2/reader/decorator.py b/python/paddle/v2/reader/decorator.py index 7e457f987d..27c82c95f7 100644 --- a/python/paddle/v2/reader/decorator.py +++ b/python/paddle/v2/reader/decorator.py @@ -390,8 +390,6 @@ def pipe_reader(left_cmd, if not callable(parser): raise TypeError("parser must be a callable object") - process = subprocess.Popen( - left_cmd.split(" "), bufsize=bufsize, stdout=subprocess.PIPE) # TODO(typhoonzero): add a thread to read stderr # Always init a decompress object is better than @@ -400,6 +398,8 @@ def pipe_reader(left_cmd, 32 + zlib.MAX_WBITS) # offset 32 to skip the header def reader(): + process = subprocess.Popen( + left_cmd.split(" "), bufsize=bufsize, stdout=subprocess.PIPE) remained = "" while True: buff = process.stdout.read(bufsize) diff --git a/python/paddle/v2/reader/tests/decorator_test.py b/python/paddle/v2/reader/tests/decorator_test.py index 5a92951b10..06e14796da 100644 --- a/python/paddle/v2/reader/tests/decorator_test.py +++ b/python/paddle/v2/reader/tests/decorator_test.py @@ -145,5 +145,35 @@ class TestXmap(unittest.TestCase): self.assertEqual(e, mapper(idx)) +class TestPipeReader(unittest.TestCase): + def test_pipe_reader(self): + def simple_parser(lines): + return lines + + import tempfile + + records = [str(i) for i in xrange(5)] + temp = tempfile.NamedTemporaryFile() + try: + with open(temp.name, 'w') as f: + for r in records: + f.write('%s\n' % r) + + cmd = "cat %s" % temp.name + reader = paddle.v2.reader.pipe_reader( + cmd, simple_parser, bufsize=128) + for i in xrange(4): + result = [] + for r in reader(): + result.append(r) + + for idx, e in enumerate(records): + print e, result[idx] + self.assertEqual(e, result[idx]) + finally: + # delete the temporary file + temp.close() + + if __name__ == '__main__': unittest.main()