!6346 add timeout value to pyfunc

Merge pull request !6346 from yanghaitao/yht_pyfunc_timeout
pull/6346/MERGE
mindspore-ci-bot 4 years ago committed by Gitee
commit e60e4920f9

@ -50,6 +50,11 @@ Status PyFuncOp::Compute(const TensorRow &input, TensorRow *output) {
} else {
ret_py_obj = this->py_func_ptr_();
}
// Object is none if pyfunc timeout
if (ret_py_obj.is_none()) {
MS_LOG(INFO) << "Pyfunc execute time out";
goto TimeoutError;
}
if (output_type_ != DataType::DE_UNKNOWN) {
RETURN_IF_NOT_OK(CastOutput(ret_py_obj, output));
@ -87,6 +92,10 @@ ComputeReturn:
ShapeMisMatch:
ret = Status(StatusCode::kShapeMisMatch, "PyFunc should return a numpy array or a numpy array tuple");
goto ComputeReturn;
TimeoutError:
ret = Status(StatusCode::kTimeOut, "PyFunc timeout");
goto ComputeReturn;
}
Status PyFuncOp::CastOutput(const py::object &ret_py_obj, TensorRow *output) {

@ -81,6 +81,7 @@ enum class StatusCode : char {
kBoundingBoxOutOfBounds = 11,
kBoundingBoxInvalidShape = 12,
kSyntaxError = 13,
kTimeOut = 14,
// Make this error code the last one. Add new error code above it.
kUnexpectedError = 127
};

@ -1975,7 +1975,11 @@ class _PythonCallable:
try:
# This call will send the tensors along with Python callable index to the process pool.
# Block, yield GIL. Current thread will reacquire GIL once result is returned.
return self.pool.apply(_pyfunc_worker_exec, [self.idx, *args])
result = self.pool.apply_async(_pyfunc_worker_exec, [self.idx, *args])
return result.get(60)
except multiprocessing.TimeoutError:
# Ensure c++ pyfunc threads exit normally if python sub-process is killed unnormally.
return None
except KeyboardInterrupt:
self.pool.terminate()
self.pool.join()

Loading…
Cancel
Save