!7179 fix exception not exit

Merge pull request !7179 from kisnwang/fix-exception-not-exit
pull/7179/MERGE
mindspore-ci-bot 4 years ago committed by Gitee
commit 618a876e0c

@ -14,6 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
#include "backend/session/executor.h" #include "backend/session/executor.h"
#include <exception>
#include "runtime/device/kernel_runtime_manager.h" #include "runtime/device/kernel_runtime_manager.h"
#include "backend/session/executor_manager.h" #include "backend/session/executor_manager.h"
#include "utils/comm_manager.h" #include "utils/comm_manager.h"
@ -40,10 +41,7 @@ void UpdateOutputTensors(const VectorRef *outputs,
tensor->set_device_address(address); tensor->set_device_address(address);
} }
if (tensor->NeedSyncDeviceToHostImmediately()) { if (tensor->NeedSyncDeviceToHostImmediately()) {
auto tensor_address = tensor->device_address(); tensor->data_sync(false);
MS_EXCEPTION_IF_NULL(tensor_address);
tensor_address->SyncDeviceToHost(tensor->shape(), LongToSize(tensor->data().nbytes()), tensor->data_type(),
tensor->data_c());
tensor->set_device_address(nullptr); tensor->set_device_address(nullptr);
tensor->set_sync_status(kNeedSyncHostToDevice); tensor->set_sync_status(kNeedSyncHostToDevice);
} }
@ -85,7 +83,11 @@ void BuildGraphTask::Run() {
void RunGraphTask::Run() { void RunGraphTask::Run() {
MS_EXCEPTION_IF_NULL(session_); MS_EXCEPTION_IF_NULL(session_);
session_->RunGraphImpl(graph_id_, input_tensors_, &outputs_); try {
session_->RunGraphImpl(graph_id_, input_tensors_, &outputs_);
} catch (const std::exception &e) {
MsException::GetInstance().SetException();
}
UpdateOutputTensors(&outputs_, tensor_to_node_); UpdateOutputTensors(&outputs_, tensor_to_node_);
for (auto &tensor : input_need_lock_tensors_) { for (auto &tensor : input_need_lock_tensors_) {
tensor->SetNeedWait(false); tensor->SetNeedWait(false);
@ -115,14 +117,6 @@ Executor::Executor(const std::string &device_name, uint32_t device_id) {
Executor::~Executor() { WorkerJoin(); } Executor::~Executor() { WorkerJoin(); }
void Executor::CheckException() {
if (exception_ptr_ != nullptr) {
auto exception_ptr = exception_ptr_;
exception_ptr_ = nullptr;
std::rethrow_exception(exception_ptr);
}
}
void Executor::WorkerJoin() { void Executor::WorkerJoin() {
// Avoid worker thread join itself which will cause deadlock // Avoid worker thread join itself which will cause deadlock
if (worker_->joinable() && worker_->get_id() != std::this_thread::get_id()) { if (worker_->joinable() && worker_->get_id() != std::this_thread::get_id()) {
@ -152,7 +146,7 @@ void Executor::WorkerLoop() {
try { try {
task->Run(); task->Run();
} catch (const std::exception &e) { } catch (const std::exception &e) {
exception_ptr_ = std::current_exception(); MsException::GetInstance().SetException();
} }
if (task->type_ != kRunGraph || task->sync_run_) { if (task->type_ != kRunGraph || task->sync_run_) {
task = nullptr; task = nullptr;
@ -200,48 +194,40 @@ bool Executor::IsTaskReady(const std::shared_ptr<RunGraphTask> &task) {
return true; return true;
} }
GraphId Executor::CompileGraph(const SessionPtr &session, const AnfNodePtrList &lst, const AnfNodePtrList &outputs) { void Executor::SyncRunTask(const std::shared_ptr<Task> &task) {
CheckException();
std::unique_lock<std::mutex> lock(task_mutex_); std::unique_lock<std::mutex> lock(task_mutex_);
ready_tasks_.push(task);
task_cond_var_.notify_all();
sync_cond_var_.wait(lock);
MsException::GetInstance().CheckException();
}
GraphId Executor::CompileGraph(const SessionPtr &session, const AnfNodePtrList &lst, const AnfNodePtrList &outputs) {
auto task = std::make_shared<CompileNodesTask>(); auto task = std::make_shared<CompileNodesTask>();
task->session_ = session; task->session_ = session;
task->nodes_ = lst; task->nodes_ = lst;
task->output_nodes_ = outputs; task->output_nodes_ = outputs;
ready_tasks_.push(task); SyncRunTask(task);
task_cond_var_.notify_all();
sync_cond_var_.wait(lock);
CheckException();
return task->graph_id_; return task->graph_id_;
} }
GraphId Executor::CompileGraph(const SessionPtr &session, NotNull<FuncGraphPtr> func_graph) { GraphId Executor::CompileGraph(const SessionPtr &session, NotNull<FuncGraphPtr> func_graph) {
CheckException();
std::unique_lock<std::mutex> lock(task_mutex_);
auto task = std::make_shared<CompileGraphTask>(); auto task = std::make_shared<CompileGraphTask>();
task->session_ = session; task->session_ = session;
task->func_graph_ = func_graph; task->func_graph_ = func_graph;
ready_tasks_.push(task); SyncRunTask(task);
task_cond_var_.notify_all();
sync_cond_var_.wait(lock);
CheckException();
return task->graph_id_; return task->graph_id_;
} }
void Executor::BuildGraph(const SessionPtr &session, GraphId graphId) { void Executor::BuildGraph(const SessionPtr &session, GraphId graphId) {
CheckException();
std::unique_lock<std::mutex> lock(task_mutex_);
auto task = std::make_shared<BuildGraphTask>(); auto task = std::make_shared<BuildGraphTask>();
task->session_ = session; task->session_ = session;
task->graph_id_ = graphId; task->graph_id_ = graphId;
ready_tasks_.push(task); SyncRunTask(task);
task_cond_var_.notify_all();
sync_cond_var_.wait(lock);
CheckException();
} }
void Executor::RunGraph(const SessionPtr &session, const GraphId &graph_id, void Executor::RunGraph(const SessionPtr &session, const GraphId &graph_id,
const std::vector<tensor::TensorPtr> &inputs, VectorRef *outputs) { const std::vector<tensor::TensorPtr> &inputs, VectorRef *outputs) {
CheckException();
MS_EXCEPTION_IF_NULL(session); MS_EXCEPTION_IF_NULL(session);
MS_EXCEPTION_IF_NULL(outputs); MS_EXCEPTION_IF_NULL(outputs);
auto task = std::make_shared<RunGraphTask>(); auto task = std::make_shared<RunGraphTask>();
@ -251,30 +237,25 @@ void Executor::RunGraph(const SessionPtr &session, const GraphId &graph_id,
session->CreateOutputTensors(graph_id, inputs, outputs, &task->tensor_to_node_); session->CreateOutputTensors(graph_id, inputs, outputs, &task->tensor_to_node_);
task->outputs_ = *outputs; task->outputs_ = *outputs;
task->sync_run_ = true; task->sync_run_ = true;
std::unique_lock<std::mutex> lock(task_mutex_);
ready_tasks_.push(task);
task_cond_var_.notify_all();
mindspore::ScopedLongRunning long_running; mindspore::ScopedLongRunning long_running;
sync_cond_var_.wait(lock); SyncRunTask(task);
CheckException();
} }
void Executor::RunGraphAsync(const SessionPtr &session, const GraphId &graph_id, void Executor::RunGraphAsync(const SessionPtr &session, const GraphId &graph_id,
const std::vector<tensor::TensorPtr> &inputs, VectorRef *outputs) { const std::vector<tensor::TensorPtr> &inputs, VectorRef *outputs) {
CheckException();
MS_EXCEPTION_IF_NULL(session); MS_EXCEPTION_IF_NULL(session);
MS_EXCEPTION_IF_NULL(outputs); MS_EXCEPTION_IF_NULL(outputs);
auto task = std::make_shared<RunGraphTask>(); auto task = std::make_shared<RunGraphTask>();
task->session_ = session; task->session_ = session;
task->graph_id_ = graph_id; task->graph_id_ = graph_id;
task->input_tensors_ = inputs; task->input_tensors_ = inputs;
task->input_need_lock_tensors_ = session->GetNeedLockInputTensors(graph_id, inputs);
// lock inputs // lock inputs
for (auto &tensor : inputs) { for (auto &tensor : inputs) {
if (tensor->NeedWait()) { if (tensor->NeedWait()) {
task->input_need_wait_tensors_.emplace_back(tensor); task->input_need_wait_tensors_.emplace_back(tensor);
} }
} }
task->input_need_lock_tensors_ = session->GetNeedLockInputTensors(graph_id, inputs);
for (auto &tensor : task->input_need_lock_tensors_) { for (auto &tensor : task->input_need_lock_tensors_) {
tensor->SetNeedWait(true); tensor->SetNeedWait(true);
} }
@ -285,12 +266,8 @@ void Executor::RunGraphAsync(const SessionPtr &session, const GraphId &graph_id,
// sync run graph without output tensor(int dataset graph) // sync run graph without output tensor(int dataset graph)
if (!TensorInVector(outputs)) { if (!TensorInVector(outputs)) {
task->sync_run_ = true; task->sync_run_ = true;
std::unique_lock<std::mutex> lock(task_mutex_);
ready_tasks_.push(task);
task_cond_var_.notify_all();
mindspore::ScopedLongRunning long_running; mindspore::ScopedLongRunning long_running;
sync_cond_var_.wait(lock); SyncRunTask(task);
CheckException();
return; return;
} }
@ -307,54 +284,38 @@ void Executor::RunGraphAsync(const SessionPtr &session, const GraphId &graph_id,
void Executor::BuildOp(const SessionPtr &session, OpRunInfo *op_run_info, const GraphInfo &graph_info, void Executor::BuildOp(const SessionPtr &session, OpRunInfo *op_run_info, const GraphInfo &graph_info,
const std::vector<tensor::TensorPtr> &input_tensors, const std::vector<int> &tensors_mask) { const std::vector<tensor::TensorPtr> &input_tensors, const std::vector<int> &tensors_mask) {
CheckException();
std::unique_lock<std::mutex> lock(task_mutex_);
auto task = std::make_shared<BuildOpTask>(); auto task = std::make_shared<BuildOpTask>();
task->session_ = session; task->session_ = session;
task->op_run_info_ = op_run_info; task->op_run_info_ = op_run_info;
task->graph_info_ = graph_info; task->graph_info_ = graph_info;
task->input_tensors_ = input_tensors; task->input_tensors_ = input_tensors;
task->tensors_mask_ = tensors_mask; task->tensors_mask_ = tensors_mask;
ready_tasks_.push(task); SyncRunTask(task);
task_cond_var_.notify_all();
sync_cond_var_.wait(lock);
CheckException();
} }
void Executor::RunOp(const SessionPtr &session, OpRunInfo *op_run_info, const GraphInfo &graph_info, void Executor::RunOp(const SessionPtr &session, OpRunInfo *op_run_info, const GraphInfo &graph_info,
const std::vector<tensor::TensorPtr> &input_tensors, VectorRef *outputs) { const std::vector<tensor::TensorPtr> &input_tensors, VectorRef *outputs) {
CheckException();
std::unique_lock<std::mutex> lock(task_mutex_);
auto task = std::make_shared<RunOpTask>(); auto task = std::make_shared<RunOpTask>();
task->session_ = session; task->session_ = session;
task->op_run_info_ = op_run_info; task->op_run_info_ = op_run_info;
task->graph_info_ = graph_info; task->graph_info_ = graph_info;
task->input_tensors_ = input_tensors; task->input_tensors_ = input_tensors;
ready_tasks_.push(task); SyncRunTask(task);
task_cond_var_.notify_all();
sync_cond_var_.wait(lock);
CheckException();
*outputs = task->outputs_; *outputs = task->outputs_;
} }
bool Executor::CreateCommGroup(const std::string &group_name, std::vector<uint32_t> ranks) { bool Executor::CreateCommGroup(const std::string &group_name, std::vector<uint32_t> ranks) {
std::unique_lock<std::mutex> lock(task_mutex_);
auto task = std::make_shared<CreateCommGroupTask>(); auto task = std::make_shared<CreateCommGroupTask>();
task->group_name_ = group_name; task->group_name_ = group_name;
task->ranks_ = ranks; task->ranks_ = ranks;
ready_tasks_.push(task); SyncRunTask(task);
task_cond_var_.notify_all();
sync_cond_var_.wait(lock);
return task->result_; return task->result_;
} }
bool Executor::DestroyCommGroup(const std::string &group_name) { bool Executor::DestroyCommGroup(const std::string &group_name) {
std::unique_lock<std::mutex> lock(task_mutex_);
auto task = std::make_shared<DestroyCommGroupTask>(); auto task = std::make_shared<DestroyCommGroupTask>();
task->group_name_ = group_name; task->group_name_ = group_name;
ready_tasks_.push(task); SyncRunTask(task);
task_cond_var_.notify_all();
sync_cond_var_.wait(lock);
return task->result_; return task->result_;
} }

@ -26,7 +26,6 @@
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <exception>
#include "backend/session/session_basic.h" #include "backend/session/session_basic.h"
#include "ir/anf.h" #include "ir/anf.h"
#include "ir/tensor.h" #include "ir/tensor.h"
@ -168,6 +167,7 @@ class Executor {
bool DestroyCommGroup(const std::string &group_name); bool DestroyCommGroup(const std::string &group_name);
private: private:
void SyncRunTask(const std::shared_ptr<Task> &task);
void UpdateOutputTensors(VectorRef *outputs, void UpdateOutputTensors(VectorRef *outputs,
const std::map<tensor::TensorPtr, session::KernelWithIndex> &tensor_to_node); const std::map<tensor::TensorPtr, session::KernelWithIndex> &tensor_to_node);
std::vector<std::shared_ptr<RunGraphTask>> GetNewReadyTasks(); std::vector<std::shared_ptr<RunGraphTask>> GetNewReadyTasks();
@ -184,7 +184,6 @@ class Executor {
std::queue<std::shared_ptr<Task>> ready_tasks_; std::queue<std::shared_ptr<Task>> ready_tasks_;
std::list<std::shared_ptr<RunGraphTask>> pending_tasks_; std::list<std::shared_ptr<RunGraphTask>> pending_tasks_;
std::shared_ptr<std::thread> worker_; std::shared_ptr<std::thread> worker_;
std::exception_ptr exception_ptr_{nullptr};
}; };
} // namespace session } // namespace session
} // namespace mindspore } // namespace mindspore

@ -954,10 +954,7 @@ bool TensorNeedSync(const AnfNodePtr &parameter, const tensor::TensorPtr &tensor
} }
auto tensor_address = tensor->device_address(); auto tensor_address = tensor->device_address();
if (tensor_address != device_address) { if (tensor_address != device_address) {
if (tensor_address != nullptr) { tensor->data_sync(false);
tensor_address->SyncDeviceToHost(tensor->shape(), LongToSize(tensor->data().nbytes()), tensor->data_type(),
tensor->data_c());
}
return true; return true;
} }
return false; return false;

@ -251,8 +251,7 @@ void CPUKernelRuntime::BindInputTensorAddressPtr(const session::KernelGraph &ker
MS_EXCEPTION_IF_NULL(address); MS_EXCEPTION_IF_NULL(address);
MS_EXCEPTION_IF_NULL(tensor); MS_EXCEPTION_IF_NULL(tensor);
if (tensor_address != nullptr && tensor_address != address) { if (tensor_address != nullptr && tensor_address != address) {
tensor_address->SyncDeviceToHost(tensor->shape(), LongToSize(tensor->data().nbytes()), tensor->data_type(), tensor->data_sync(false);
tensor->data_c());
} }
if (tensor->data_type() == address->type_id_ || tensor->data_type() == kNumberTypeFloat32 || if (tensor->data_type() == address->type_id_ || tensor->data_type() == kNumberTypeFloat32 ||
tensor->data_type() == kNumberTypeInt32 || tensor->data_type() == kNumberTypeInt64) { tensor->data_type() == kNumberTypeInt32 || tensor->data_type() == kNumberTypeInt64) {

@ -29,6 +29,7 @@
#include "utils/log_adapter.h" #include "utils/log_adapter.h"
#include "base/float16.h" #include "base/float16.h"
#include "utils/shape_utils.h" #include "utils/shape_utils.h"
#include "utils/ms_exception.h"
// brief mindspore namespace. // brief mindspore namespace.
// //
@ -88,6 +89,7 @@ struct WaitEvent {
return; return;
} }
cond_var_.wait(lock, [this] { return !need_wait_; }); cond_var_.wait(lock, [this] { return !need_wait_; });
MsException::GetInstance().CheckException();
} }
void set_need_wait(bool need_wait) { void set_need_wait(bool need_wait) {

@ -0,0 +1,48 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_UTILS_MS_EXCEPTION_H_
#define MINDSPORE_CORE_UTILS_MS_EXCEPTION_H_
#include <exception>
#include <mutex>
#include "utils/ms_utils.h"
namespace mindspore {
// Process-wide holder for an exception raised on a background thread (e.g. the
// session Executor worker), so it can be rethrown later on the thread that is
// waiting for the result. Thread-safe singleton.
class MsException {
 public:
  static MsException &GetInstance() {
    static MsException instance;
    return instance;
  }

  // Capture the exception currently being handled (call from inside a catch
  // block on the worker thread). Overwrites any previously stored exception.
  void SetException() {
    std::lock_guard<std::mutex> lock(lock_);
    exception_ptr_ = std::current_exception();
  }

  // If a worker-side exception was stored, clear it and rethrow it on the
  // calling thread; otherwise do nothing. The stored pointer is swapped out
  // under the lock and rethrown after releasing it, so a handler that calls
  // back into this class cannot deadlock.
  void CheckException() {
    std::exception_ptr ptr = nullptr;
    {
      std::lock_guard<std::mutex> lock(lock_);
      ptr = exception_ptr_;
      exception_ptr_ = nullptr;
    }
    if (ptr != nullptr) {
      std::rethrow_exception(ptr);
    }
  }

 private:
  MsException() = default;
  ~MsException() = default;
  MsException(const MsException &) = delete;
  MsException &operator=(const MsException &) = delete;

  // Guards exception_ptr_: SetException runs on the worker thread while
  // CheckException runs on the thread waiting for the task/tensor.
  std::mutex lock_;
  std::exception_ptr exception_ptr_{nullptr};
};
} // namespace mindspore
#endif // MINDSPORE_CORE_UTILS_MS_EXCEPTION_H_
Loading…
Cancel
Save