|
|
|
@ -12,7 +12,10 @@
|
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
|
# limitations under the License.
|
|
|
|
|
|
|
|
|
|
__all__ = ['buffered', 'compose', 'chain', 'shuffle', 'ComposeNotAligned']
|
|
|
|
|
__all__ = [
|
|
|
|
|
'map_readers', 'buffered', 'compose', 'chain', 'shuffle',
|
|
|
|
|
'ComposeNotAligned'
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
from Queue import Queue
|
|
|
|
|
from threading import Thread
|
|
|
|
@ -20,20 +23,38 @@ import itertools
|
|
|
|
|
import random
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def map_readers(func, *readers):
|
|
|
|
|
"""
|
|
|
|
|
Creates a data reader that outputs return value of function using
|
|
|
|
|
output of each data readers as arguments.
|
|
|
|
|
|
|
|
|
|
:param func: function to use.
|
|
|
|
|
:param *readers: readers whose outputs will be used as arguments of func.
|
|
|
|
|
:returns: the created data reader.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def reader():
|
|
|
|
|
rs = []
|
|
|
|
|
for r in readers:
|
|
|
|
|
rs.append(r())
|
|
|
|
|
for e in itertools.imap(func, *rs):
|
|
|
|
|
yield e
|
|
|
|
|
|
|
|
|
|
return reader
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def shuffle(reader, buf_size):
|
|
|
|
|
"""Creates a data reader whose data output is suffled.
|
|
|
|
|
"""
|
|
|
|
|
Creates a data reader whose data output is suffled.
|
|
|
|
|
|
|
|
|
|
Output from the iterator that created by original reader will be
|
|
|
|
|
buffered into shuffle buffer, and then shuffled. The size of shuffle buffer
|
|
|
|
|
is determined by argument buf_size.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
reader: the original reader whose output will be
|
|
|
|
|
shuffled.
|
|
|
|
|
buf_size: shuffle buffer size.
|
|
|
|
|
:param reader: the original reader whose output will be shuffled.
|
|
|
|
|
:param buf_size: shuffle buffer size.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
the new reader whose output is shuffled.
|
|
|
|
|
:returns:the new reader whose output is shuffled.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def data_reader():
|
|
|
|
@ -55,8 +76,9 @@ def shuffle(reader, buf_size):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def chain(*readers):
|
|
|
|
|
"""Creates a data reader whose output is the outputs of input data
|
|
|
|
|
readers chained together.
|
|
|
|
|
"""
|
|
|
|
|
Creates a data reader whose output is the outputs of input data
|
|
|
|
|
readers chained together.
|
|
|
|
|
|
|
|
|
|
If input readers output following data entries:
|
|
|
|
|
[0, 0, 0]
|
|
|
|
@ -65,11 +87,8 @@ def chain(*readers):
|
|
|
|
|
The chained reader will output:
|
|
|
|
|
[0, 0, 0, 1, 1, 1, 2, 2, 2]
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
readers: input readers.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
the new data reader.
|
|
|
|
|
:param readers: input readers.
|
|
|
|
|
:returns: the new data reader.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def reader():
|
|
|
|
@ -88,25 +107,23 @@ class ComposeNotAligned(ValueError):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def compose(*readers, **kwargs):
|
|
|
|
|
"""Creates a data reader whose output is the combination of input readers.
|
|
|
|
|
"""
|
|
|
|
|
Creates a data reader whose output is the combination of input readers.
|
|
|
|
|
|
|
|
|
|
If input readers output following data entries:
|
|
|
|
|
(1, 2) 3 (4, 5)
|
|
|
|
|
The composed reader will output:
|
|
|
|
|
(1, 2, 3, 4, 5)
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
*readers: readers that will be composed together.
|
|
|
|
|
check_alignment: If True, will check if input readers are aligned
|
|
|
|
|
correctly. If False, will not check alignment and trailing outputs
|
|
|
|
|
will be discarded. Defaults to True.
|
|
|
|
|
:*readers: readers that will be composed together.
|
|
|
|
|
:check_alignment: if True, will check if input readers are aligned
|
|
|
|
|
correctly. If False, will not check alignment and trailing outputs
|
|
|
|
|
will be discarded. Defaults to True.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
the new data reader.
|
|
|
|
|
:returns: the new data reader.
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
ComposeNotAligned: outputs of readers are not aligned.
|
|
|
|
|
Will not raise when check_alignment is set to False.
|
|
|
|
|
:raises ComposeNotAligned: outputs of readers are not aligned.
|
|
|
|
|
Will not raise when check_alignment is set to False.
|
|
|
|
|
"""
|
|
|
|
|
check_alignment = kwargs.pop('check_alignment', True)
|
|
|
|
|
|
|
|
|
@ -136,18 +153,17 @@ def compose(*readers, **kwargs):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def buffered(reader, size):
|
|
|
|
|
"""Creates a buffered data reader.
|
|
|
|
|
"""
|
|
|
|
|
Creates a buffered data reader.
|
|
|
|
|
|
|
|
|
|
The buffered data reader will read and save data entries into a
|
|
|
|
|
buffer. Reading from the buffered data reader will proceed as long
|
|
|
|
|
as the buffer is not empty.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
reader: the data reader to read from.
|
|
|
|
|
size: max buffer size.
|
|
|
|
|
:param reader: the data reader to read from.
|
|
|
|
|
:param size: max buffer size.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
The buffered data reader.
|
|
|
|
|
:returns: the buffered data reader.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
class EndSignal():
|
|
|
|
|