|
|
|
@ -12,25 +12,27 @@
|
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
|
# limitations under the License.
|
|
|
|
|
|
|
|
|
|
__all__ = [
|
|
|
|
|
'map_readers', 'buffered', 'compose', 'chain', 'shuffle',
|
|
|
|
|
'ComposeNotAligned', 'batched', 'firstn'
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
import itertools
|
|
|
|
|
import random
|
|
|
|
|
from Queue import Queue
|
|
|
|
|
from threading import Thread
|
|
|
|
|
|
|
|
|
|
__all__ = [
|
|
|
|
|
'map_readers', 'buffered', 'compose', 'chain', 'shuffle',
|
|
|
|
|
'ComposeNotAligned', 'batched', 'firstn'
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def map_readers(func, *readers):
|
|
|
|
|
"""
|
|
|
|
|
Creates a data reader that outputs return value of function using
|
|
|
|
|
output of each data reader as arguments.
|
|
|
|
|
|
|
|
|
|
:param func: function to use.
|
|
|
|
|
:param *readers: readers whose outputs will be used as arguments of func.
|
|
|
|
|
:returns: the created data reader.
|
|
|
|
|
:param func: function to use. The type of func should be (Sample) => Sample
|
|
|
|
|
:type func: callable
|
|
|
|
|
:param readers: readers whose outputs will be used as arguments of func.
|
|
|
|
|
:return: the created data reader.
|
|
|
|
|
:rtype: callable
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def reader():
|
|
|
|
@ -45,16 +47,19 @@ def map_readers(func, *readers):
|
|
|
|
|
|
|
|
|
|
def shuffle(reader, buf_size):
|
|
|
|
|
"""
|
|
|
|
|
Creates a data reader whose data output is suffled.
|
|
|
|
|
Creates a data reader whose data output is shuffled.
|
|
|
|
|
|
|
|
|
|
Output from the iterator that is created by the original reader will be
|
|
|
|
|
buffered into shuffle buffer, and then shuffled. The size of shuffle buffer
|
|
|
|
|
is determined by argument buf_size.
|
|
|
|
|
|
|
|
|
|
:param reader: the original reader whose output will be shuffled.
|
|
|
|
|
:type reader: callable
|
|
|
|
|
:param buf_size: shuffle buffer size.
|
|
|
|
|
:type buf_size: int
|
|
|
|
|
|
|
|
|
|
:returns:the new reader whose output is shuffled.
|
|
|
|
|
:return: the new reader whose output is shuffled.
|
|
|
|
|
:rtype: callable
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def data_reader():
|
|
|
|
@ -88,7 +93,8 @@ def chain(*readers):
|
|
|
|
|
[0, 0, 0, 1, 1, 1, 2, 2, 2]
|
|
|
|
|
|
|
|
|
|
:param readers: input readers.
|
|
|
|
|
:returns: the new data reader.
|
|
|
|
|
:return: the new data reader.
|
|
|
|
|
:rtype: callable
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def reader():
|
|
|
|
@ -115,12 +121,13 @@ def compose(*readers, **kwargs):
|
|
|
|
|
The composed reader will output:
|
|
|
|
|
(1, 2, 3, 4, 5)
|
|
|
|
|
|
|
|
|
|
:*readers: readers that will be composed together.
|
|
|
|
|
:check_alignment: if True, will check if input readers are aligned
|
|
|
|
|
:param readers: readers that will be composed together.
|
|
|
|
|
:param check_alignment: if True, will check if input readers are aligned
|
|
|
|
|
correctly. If False, will not check alignment and trailing outputs
|
|
|
|
|
will be discarded. Defaults to True.
|
|
|
|
|
:type check_alignment: bool
|
|
|
|
|
|
|
|
|
|
:returns: the new data reader.
|
|
|
|
|
:return: the new data reader.
|
|
|
|
|
|
|
|
|
|
:raises ComposeNotAligned: outputs of readers are not aligned.
|
|
|
|
|
Will not raise when check_alignment is set to False.
|
|
|
|
@ -161,7 +168,9 @@ def buffered(reader, size):
|
|
|
|
|
as the buffer is not empty.
|
|
|
|
|
|
|
|
|
|
:param reader: the data reader to read from.
|
|
|
|
|
:type reader: callable
|
|
|
|
|
:param size: max buffer size.
|
|
|
|
|
:type size: int
|
|
|
|
|
|
|
|
|
|
:return: the buffered data reader.
|
|
|
|
|
"""
|
|
|
|
@ -196,9 +205,13 @@ def buffered(reader, size):
|
|
|
|
|
def batched(reader, batch_size):
|
|
|
|
|
"""
|
|
|
|
|
Create a batched reader.
|
|
|
|
|
|
|
|
|
|
:param reader: the data reader to read from.
|
|
|
|
|
:param batch_size: batch_size
|
|
|
|
|
:type reader: callable
|
|
|
|
|
:param batch_size: size of each mini-batch
|
|
|
|
|
:type batch_size: int
|
|
|
|
|
:return: the batched reader.
|
|
|
|
|
:rtype: callable
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def batched_reader():
|
|
|
|
@ -218,6 +231,13 @@ def batched(reader, batch_size):
|
|
|
|
|
def firstn(reader, n):
|
|
|
|
|
"""
|
|
|
|
|
Limit the max number of samples that reader could return.
|
|
|
|
|
|
|
|
|
|
:param reader: the data reader to read from.
|
|
|
|
|
:type reader: callable
|
|
|
|
|
:param n: the max number of samples to return.
|
|
|
|
|
:type n: int
|
|
|
|
|
:return: the decorated reader.
|
|
|
|
|
:rtype: callable
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
# TODO(yuyang18): Check if just drop the reader, could clean the opened
|
|
|
|
|