|
|
@ -2195,7 +2195,9 @@ def _pyfunc_worker_exec(index, op_id, mapping, lock, record, *args):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
Internal function for call certain pyfunc in python process.
|
|
|
|
Internal function for call certain pyfunc in python process.
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
try:
|
|
|
|
# Some threads in multiprocess.pool can't process sigint signal,
|
|
|
|
|
|
|
|
# and will occur hang problem, so ctrl+c will pass to parent process.
|
|
|
|
|
|
|
|
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
|
|
|
if record:
|
|
|
|
if record:
|
|
|
|
pid = os.getpid()
|
|
|
|
pid = os.getpid()
|
|
|
|
with lock:
|
|
|
|
with lock:
|
|
|
@ -2203,8 +2205,6 @@ def _pyfunc_worker_exec(index, op_id, mapping, lock, record, *args):
|
|
|
|
data[1].add(pid)
|
|
|
|
data[1].add(pid)
|
|
|
|
mapping[op_id] = data
|
|
|
|
mapping[op_id] = data
|
|
|
|
return _GLOBAL_PYFUNC_LIST[index](*args)
|
|
|
|
return _GLOBAL_PYFUNC_LIST[index](*args)
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
|
|
|
raise Exception("Multiprocess MapOp worker receives KeyboardInterrupt")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# PythonCallable wrapper for multiprocess pyfunc
|
|
|
|
# PythonCallable wrapper for multiprocess pyfunc
|
|
|
@ -3262,6 +3262,10 @@ def _generator_worker_loop(dataset, idx_queue, result_queue, eof, is_multiproces
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
idx = idx_queue.get(timeout=1)
|
|
|
|
idx = idx_queue.get(timeout=1)
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
|
|
|
if is_multiprocessing:
|
|
|
|
|
|
|
|
eof.set()
|
|
|
|
|
|
|
|
idx_queue.cancel_join_thread()
|
|
|
|
|
|
|
|
result_queue.cancel_join_thread()
|
|
|
|
raise Exception("Generator worker receives KeyboardInterrupt.")
|
|
|
|
raise Exception("Generator worker receives KeyboardInterrupt.")
|
|
|
|
except queue.Empty:
|
|
|
|
except queue.Empty:
|
|
|
|
if eof.is_set():
|
|
|
|
if eof.is_set():
|
|
|
@ -3288,6 +3292,10 @@ def _generator_worker_loop(dataset, idx_queue, result_queue, eof, is_multiproces
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
result_queue.put(result, timeout=5)
|
|
|
|
result_queue.put(result, timeout=5)
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
|
|
|
if is_multiprocessing:
|
|
|
|
|
|
|
|
eof.set()
|
|
|
|
|
|
|
|
idx_queue.cancel_join_thread()
|
|
|
|
|
|
|
|
result_queue.cancel_join_thread()
|
|
|
|
raise Exception("Generator worker receives KeyboardInterrupt.")
|
|
|
|
raise Exception("Generator worker receives KeyboardInterrupt.")
|
|
|
|
except queue.Full:
|
|
|
|
except queue.Full:
|
|
|
|
if eof.is_set():
|
|
|
|
if eof.is_set():
|
|
|
|