You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Paddle/python/paddle/fluid/contrib/reader/distributed_reader.py

67 lines
2.0 KiB

# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
__all__ = ["distributed_sampler"]
def distributed_sampler(reader, batch_size):
"""
Create a distributed reader.
:param reader: the data reader to read from.
:type reader: callable
:param batch_size: the size of the batch
:type batch_size: int
"""
def batch_reader():
if not os.getenv('PADDLE_TRAINER_ID'):
raise RuntimeError(
"The current program is not in distributed mode.")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
def _slice_data(size):
per_node_lines = size // trainer_count
return [
trainer_id * per_node_lines, (trainer_id + 1) * per_node_lines
]
r = reader()
b = []
for instance in r:
b.append(instance)
if len(b) == batch_size:
if len(b) >= trainer_count:
begin, end = _slice_data(len(b))
yield b[begin:end]
b = []
if len(b) >= trainer_count:
begin, end = _slice_data(len(b))
yield b[begin:end]
# Batch size check
batch_size = int(batch_size)
if batch_size <= 0:
raise ValueError("batch_size should be a positive integeral value, "
"but got batch_size={}".format(batch_size))
return batch_reader