fix dataloader performace decrease & unittest hanging. test=develop (#26739)

revert-26856-strategy_example2
Kaipeng Deng 5 years ago committed by GitHub
parent 1ec30cb160
commit c03092b188
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -608,22 +608,24 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase):
# in _send_idx but will not increase _rcvd_idx, so we check
# whether the worker is still alive here to skip the discarded
# batch indices and increase _rcvd_idx
while self._rcvd_idx < self._send_idx:
info = self._task_infos[self._rcvd_idx]
if len(info) == 2 or self._worker_status[info[0]]:
break
del self._task_infos[self._rcvd_idx]
self._rcvd_idx += 1
self._batches_outstanding -= 1
else:
# NOTE: _rcvd_idx and _send_idx only record batches among
# workers, if batches among workers drained, there
# may also be data in blocking queue
if self._batches_outstanding < len(self._places):
return None
continue
if len(self._task_infos[self._rcvd_idx]) == 2:
if self._dataset_kind == _DatasetKind.ITER:
while self._rcvd_idx < self._send_idx:
info = self._task_infos[self._rcvd_idx]
if len(info) == 2 or self._worker_status[info[0]]:
break
del self._task_infos[self._rcvd_idx]
self._rcvd_idx += 1
self._batches_outstanding -= 1
else:
# NOTE: _rcvd_idx and _send_idx only record batches among
# workers, if batches among workers drained, there
# may also be data in blocking queue
if self._batches_outstanding < len(self._places):
return None
continue
if self._rcvd_idx in self._task_infos and \
len(self._task_infos[self._rcvd_idx]) == 2:
return self._task_infos.pop(self._rcvd_idx)[1]
try:

Loading…
Cancel
Save