|
|
|
@ -14,8 +14,7 @@
|
|
|
|
|
|
|
|
|
|
__all__ = [
|
|
|
|
|
'cache', 'map_readers', 'buffered', 'compose', 'chain', 'shuffle',
|
|
|
|
|
'ComposeNotAligned', 'firstn', 'xmap_readers', 'PipeReader',
|
|
|
|
|
'multiprocess_reader', 'Fake'
|
|
|
|
|
'ComposeNotAligned', 'firstn', 'xmap_readers', 'multiprocess_reader'
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
from threading import Thread
|
|
|
|
@ -517,116 +516,3 @@ def multiprocess_reader(readers, use_pipe=True, queue_size=1000):
|
|
|
|
|
return pipe_reader
|
|
|
|
|
else:
|
|
|
|
|
return queue_reader
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _buf2lines(buf, line_break="\n"):
|
|
|
|
|
# FIXME: line_break should be automatically configured.
|
|
|
|
|
lines = buf.split(line_break)
|
|
|
|
|
return lines[:-1], lines[-1]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PipeReader:
|
|
|
|
|
"""
|
|
|
|
|
PipeReader read data by stream from a command, take it's
|
|
|
|
|
stdout into a pipe buffer and redirect it to the parser to
|
|
|
|
|
parse, then yield data as your desired format.
|
|
|
|
|
|
|
|
|
|
You can using standard linux command or call another program
|
|
|
|
|
to read data, from HDFS, Ceph, URL, AWS S3 etc:
|
|
|
|
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
cmd = "hadoop fs -cat /path/to/some/file"
|
|
|
|
|
cmd = "cat sample_file.tar.gz"
|
|
|
|
|
cmd = "curl http://someurl"
|
|
|
|
|
cmd = "python print_s3_bucket.py"
|
|
|
|
|
|
|
|
|
|
An example:
|
|
|
|
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
|
|
|
|
def example_reader():
|
|
|
|
|
for f in myfiles:
|
|
|
|
|
pr = PipeReader("cat %s"%f)
|
|
|
|
|
for l in pr.get_line():
|
|
|
|
|
sample = l.split(" ")
|
|
|
|
|
yield sample
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, command, bufsize=8192, file_type="plain"):
|
|
|
|
|
if not isinstance(command, str):
|
|
|
|
|
raise TypeError("left_cmd must be a string")
|
|
|
|
|
if file_type == "gzip":
|
|
|
|
|
self.dec = zlib.decompressobj(
|
|
|
|
|
32 + zlib.MAX_WBITS) # offset 32 to skip the header
|
|
|
|
|
self.file_type = file_type
|
|
|
|
|
self.bufsize = bufsize
|
|
|
|
|
self.process = subprocess.Popen(
|
|
|
|
|
command.split(" "), bufsize=bufsize, stdout=subprocess.PIPE)
|
|
|
|
|
|
|
|
|
|
def get_line(self, cut_lines=True, line_break="\n"):
|
|
|
|
|
"""
|
|
|
|
|
:param cut_lines: cut buffer to lines
|
|
|
|
|
:type cut_lines: bool
|
|
|
|
|
:param line_break: line break of the file, like '\\\\n' or '\\\\r'
|
|
|
|
|
:type line_break: string
|
|
|
|
|
|
|
|
|
|
:return: one line or a buffer of bytes
|
|
|
|
|
:rtype: string
|
|
|
|
|
"""
|
|
|
|
|
remained = ""
|
|
|
|
|
while True:
|
|
|
|
|
buff = self.process.stdout.read(self.bufsize)
|
|
|
|
|
if buff:
|
|
|
|
|
if self.file_type == "gzip":
|
|
|
|
|
decomp_buff = cpt.to_text(self.dec.decompress(buff))
|
|
|
|
|
elif self.file_type == "plain":
|
|
|
|
|
decomp_buff = cpt.to_text(buff)
|
|
|
|
|
else:
|
|
|
|
|
raise TypeError("file_type %s is not allowed" %
|
|
|
|
|
self.file_type)
|
|
|
|
|
|
|
|
|
|
if cut_lines:
|
|
|
|
|
lines, remained = _buf2lines(''.join(
|
|
|
|
|
[remained, decomp_buff]), line_break)
|
|
|
|
|
for line in lines:
|
|
|
|
|
yield line
|
|
|
|
|
else:
|
|
|
|
|
yield decomp_buff
|
|
|
|
|
else:
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Fake(object):
|
|
|
|
|
"""
|
|
|
|
|
fake reader will cache the first data it read and yield it out for data_num times.
|
|
|
|
|
It is used to cache a data from real reader and use it for speed testing.
|
|
|
|
|
|
|
|
|
|
:param reader: the origin reader
|
|
|
|
|
:param data_num: times that this reader will yield data.
|
|
|
|
|
|
|
|
|
|
:return: a fake reader.
|
|
|
|
|
|
|
|
|
|
Examples:
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
|
|
|
|
def reader():
|
|
|
|
|
for i in range(10):
|
|
|
|
|
yield i
|
|
|
|
|
|
|
|
|
|
fake_reader = Fake()(reader, 100)
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.data = None
|
|
|
|
|
self.yield_num = 0
|
|
|
|
|
|
|
|
|
|
def __call__(self, reader, data_num):
|
|
|
|
|
def fake_reader():
|
|
|
|
|
if self.data is None:
|
|
|
|
|
self.data = next(reader())
|
|
|
|
|
while self.yield_num < data_num:
|
|
|
|
|
yield self.data
|
|
|
|
|
self.yield_num += 1
|
|
|
|
|
self.yield_num = 0
|
|
|
|
|
|
|
|
|
|
return fake_reader
|
|
|
|
|