You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
graphengine/ge/session/session_manager.cc

423 lines
14 KiB

5 years ago
/**
* Copyright 2020 Huawei Technologies Co., Ltd
5 years ago
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "session/session_manager.h"
#include <memory>
#include <utility>
#include "common/ge/ge_util.h"
#include "framework/common/debug/ge_log.h"
5 years ago
#include "graph/ge_context.h"
#include "graph/load/model_manager/model_manager.h"
#include "graph/manager/util/rt_context_util.h"
5 years ago
using std::map;
using std::string;
using std::vector;
namespace ge {
Status SessionManager::Initialize(const std::map<std::string, std::string> &options) {
if (init_flag_) {
GELOGW("Session Manager has been initialized.");
return SUCCESS;
}
init_flag_ = true;
return SUCCESS;
}
Status SessionManager::Finalize() {
if (!init_flag_) {
GELOGW("Session Manager has not been initialized.");
return SUCCESS;
}
std::lock_guard<std::mutex> lock(mutex_);
for (auto iter = session_manager_map_.begin(); iter != session_manager_map_.end(); ++iter) {
(void)iter->second->Finalize();
}
session_manager_map_.clear();
init_flag_ = false;
return SUCCESS;
}
Status SessionManager::SetRtContext(SessionId session_id, rtContext_t rt_context) {
5 years ago
GELOGI("set rt_context RT_CTX_NORMAL_MODE, device id:%u.", GetContext().DeviceId());
GE_CHK_RT_RET(rtCtxCreate(&rt_context, RT_CTX_NORMAL_MODE, static_cast<int32_t>(GetContext().DeviceId())));
GE_CHK_RT_RET(rtCtxSetCurrent(rt_context));
RtContextUtil::GetInstance().AddRtContext(session_id, rt_context);
5 years ago
return SUCCESS;
}
Status SessionManager::CreateSession(const std::map<std::string, std::string> &options, SessionId &session_id) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
5 years ago
return GE_SESSION_MANAGER_NOT_INIT;
}
SessionId next_session_id = 0;
std::lock_guard<std::mutex> lock(mutex_);
Status nextSessionIdRet = GetNextSessionId(next_session_id);
if (nextSessionIdRet != SUCCESS) {
return nextSessionIdRet;
5 years ago
}
SessionPtr sessionPtr = MakeShared<InnerSession>(next_session_id, options);
if (sessionPtr == nullptr) {
5 years ago
return MEMALLOC_FAILED;
}
Status ret = sessionPtr->Initialize();
5 years ago
if (ret != SUCCESS) {
return ret;
}
(void)session_manager_map_.emplace(std::pair<SessionId, SessionPtr>(next_session_id, sessionPtr));
5 years ago
session_id = next_session_id;
// create a context
ret = SetRtContext(session_id, rtContext_t());
5 years ago
return ret;
}
Status SessionManager::DestroySession(SessionId session_id) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
5 years ago
return GE_SESSION_MANAGER_NOT_INIT;
}
std::lock_guard<std::mutex> lock(mutex_);
std::map<SessionId, SessionPtr>::iterator it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
return GE_SESSION_NOT_EXIST;
}
if (ModelManager::GetInstance() != nullptr) {
ModelManager::GetInstance()->DestroyAicpuSession(session_id);
}
// Unified destruct rt_context
RtContextUtil::GetInstance().DestroyRtContexts(session_id);
5 years ago
SessionPtr innerSession = it->second;
Status ret = innerSession->Finalize();
5 years ago
if (ret != SUCCESS) {
return ret;
}
(void)session_manager_map_.erase(session_id);
return ret;
}
Status SessionManager::GetVariable(SessionId session_id, const std::string &name, Tensor &val) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
5 years ago
return GE_SESSION_MANAGER_NOT_INIT;
}
SessionPtr innerSession = nullptr;
5 years ago
{
std::lock_guard<std::mutex> lock(mutex_);
std::map<SessionId, SessionPtr>::iterator it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
return GE_SESSION_NOT_EXIST;
} else {
innerSession = it->second;
5 years ago
}
}
return innerSession->GetVariable(name, val);
5 years ago
}
Status SessionManager::AddGraph(SessionId session_id, uint32_t graph_id, const Graph &graph) {
std::map<std::string, std::string> options;
return AddGraph(session_id, graph_id, graph, options);
}
Status SessionManager::AddGraph(SessionId session_id, uint32_t graph_id, const Graph &graph,
const std::map<std::string, std::string> &options) {
5 years ago
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
5 years ago
return GE_SESSION_MANAGER_NOT_INIT;
}
SessionPtr innerSession = nullptr;
5 years ago
{
std::lock_guard<std::mutex> lock(mutex_);
std::map<SessionId, SessionPtr>::iterator it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
return GE_SESSION_NOT_EXIST;
} else {
innerSession = it->second;
5 years ago
}
auto compute_graph = GraphUtils::GetComputeGraph(graph);
GE_CHECK_NOTNULL(compute_graph);
5 years ago
std::string session_graph_id = std::to_string(session_id) + "_" + std::to_string(graph_id);
if (!AttrUtils::SetStr(*compute_graph, ATTR_NAME_SESSION_GRAPH_ID, session_graph_id)) {
GELOGW("Set graph session_graph_id attr failed.");
} else {
GELOGD("Set graph session_graph_id attr to [%s]", session_graph_id.c_str());
}
for (auto graph : compute_graph->GetAllSubgraphs()) {
AttrUtils::SetStr(*graph, ATTR_NAME_SESSION_GRAPH_ID, session_graph_id);
}
5 years ago
}
return innerSession->AddGraph(graph_id, graph, options);
5 years ago
}
4 years ago
Status SessionManager::AddGraphWithCopy(SessionId session_id, uint32_t graph_id, const Graph &graph,
const std::map<std::string, std::string> &options) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
4 years ago
return GE_SESSION_MANAGER_NOT_INIT;
}
SessionPtr innerSession = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
std::map<SessionId, SessionPtr>::iterator it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
return GE_SESSION_NOT_EXIST;
} else {
innerSession = it->second;
}
auto compute_graph = GraphUtils::GetComputeGraph(graph);
GE_CHECK_NOTNULL(compute_graph);
std::string session_graph_id = std::to_string(session_id) + "_" + std::to_string(graph_id);
if (!AttrUtils::SetStr(*compute_graph, ATTR_NAME_SESSION_GRAPH_ID, session_graph_id)) {
GELOGW("Set graph session_graph_id attr failed.");
} else {
GELOGD("Set graph session_graph_id attr to [%s]", session_graph_id.c_str());
}
for (auto graph : compute_graph->GetAllSubgraphs()) {
AttrUtils::SetStr(*graph, ATTR_NAME_SESSION_GRAPH_ID, session_graph_id);
}
}
return innerSession->AddGraphWithCopy(graph_id, graph, options);
}
5 years ago
Status SessionManager::RunGraph(SessionId session_id, uint32_t graph_id, const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
5 years ago
return GE_SESSION_MANAGER_NOT_INIT;
}
SessionPtr innerSession = nullptr;
5 years ago
{
std::lock_guard<std::mutex> lock(mutex_);
std::map<SessionId, SessionPtr>::iterator it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
return GE_SESSION_NOT_EXIST;
} else {
innerSession = it->second;
5 years ago
}
}
return innerSession->RunGraph(graph_id, inputs, outputs);
5 years ago
}
Status SessionManager::RemoveGraph(SessionId session_id, uint32_t graph_id) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
5 years ago
return GE_SESSION_MANAGER_NOT_INIT;
}
SessionPtr innerSession = nullptr;
5 years ago
{
std::lock_guard<std::mutex> lock(mutex_);
std::map<SessionId, SessionPtr>::iterator it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
return GE_SESSION_NOT_EXIST;
} else {
innerSession = it->second;
5 years ago
}
}
return innerSession->RemoveGraph(graph_id);
5 years ago
}
bool SessionManager::HasSession(SessionId session_id) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
5 years ago
return false;
}
return session_manager_map_.find(session_id) != session_manager_map_.end();
}
Status SessionManager::GetNextSessionId(SessionId &next_session_id) {
5 years ago
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
5 years ago
return GE_SESSION_MANAGER_NOT_INIT;
}
static SessionId session_id = 0;
next_session_id = session_id++;
return SUCCESS;
}
Status SessionManager::RegisterCallBackFunc(
4 years ago
SessionId session_id, const std::string &key,
const std::function<Status(uint32_t, const std::map<std::string, ge::Tensor> &)> &callback) {
5 years ago
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
5 years ago
return GE_SESSION_MANAGER_NOT_INIT;
}
SessionPtr innerSession = nullptr;
5 years ago
{
std::lock_guard<std::mutex> lock(mutex_);
std::map<SessionId, SessionPtr>::iterator it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
return GE_SESSION_NOT_EXIST;
} else {
innerSession = it->second;
5 years ago
}
}
return innerSession->RegisterCallBackFunc(key, callback);
}
Status SessionManager::RegisterCallBackFunc(
SessionId session_id, const std::string &key,
const std::function<Status(uint32_t, const std::map<AscendString, ge::Tensor> &)> &callback) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
return GE_SESSION_MANAGER_NOT_INIT;
}
SessionPtr innerSession = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
std::map<SessionId, SessionPtr>::iterator it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
return GE_SESSION_NOT_EXIST;
} else {
innerSession = it->second;
}
}
return innerSession->RegisterCallBackFunc(key, callback);
5 years ago
}
Status SessionManager::BuildGraph(SessionId session_id, uint32_t graph_id, const std::vector<InputTensorInfo> &inputs) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
return GE_SESSION_MANAGER_NOT_INIT;
}
SessionPtr innerSession = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
std::map<SessionId, SessionPtr>::iterator it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
return GE_SESSION_NOT_EXIST;
} else {
innerSession = it->second;
}
}
return innerSession->BuildGraph(graph_id, inputs);
}
Status SessionManager::RunGraphAsync(SessionId session_id, uint32_t graph_id,
const std::vector<InputTensorInfo> &inputs, RunAsyncCallback callback) {
5 years ago
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
5 years ago
return GE_SESSION_MANAGER_NOT_INIT;
}
SessionPtr innerSession = nullptr;
5 years ago
{
std::lock_guard<std::mutex> lock(mutex_);
std::map<SessionId, SessionPtr>::iterator it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
return GE_SESSION_NOT_EXIST;
} else {
innerSession = it->second;
5 years ago
}
}
return innerSession->RunGraphAsync(graph_id, inputs, callback);
5 years ago
}
Status SessionManager::GetVariables(SessionId session_id, const std::vector<std::string> &var_names,
std::vector<Tensor> &var_values) {
// step 0: init session manager
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
return GE_SESSION_MANAGER_NOT_INIT;
}
SessionPtr innerSession = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
std::map<SessionId, SessionPtr>::iterator it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
return GE_SESSION_NOT_EXIST;
} else {
innerSession = it->second;
}
}
// step 1: get all variable
std::map<std::string, GeTensorDesc> all_variables;
Status ret = innerSession->GetAllVariables(all_variables);
if (ret != SUCCESS) {
GELOGE(FAILED, "Get all variables failed.");
return FAILED;
}
// srep 2: create check point graph
Graph graph = Graph("checkpoint");
ret = innerSession->GenCheckPointGraph(all_variables, graph);
if (ret != SUCCESS) {
GELOGE(FAILED, "Build check point graph failed.");
return FAILED;
}
// step 3: run check point graph
uint32_t graph_id = GetCurrentSecondTimestap();
ret = AddGraph(session_id, graph_id, graph);
if (ret != SUCCESS) {
GELOGE(FAILED, "Add check point graph failed.");
return FAILED;
}
vector<Tensor> inputs;
vector<Tensor> outputs;
ret = RunGraph(session_id, graph_id, inputs, outputs);
if (ret != SUCCESS) {
GELOGE(FAILED, "Run check point graph failed.");
return FAILED;
}
// step 4: save variables
ret = innerSession->SaveVariables(graph, var_names, outputs, var_values);
GELOGD("[SessionManager] outputs size is [%zu], var values size is [%zu].", outputs.size(), var_values.size());
if (ret != SUCCESS) {
GELOGE(FAILED, "Save variables failed.");
return FAILED;
}
// step 5: remove graph
ret = innerSession->RemoveGraph(graph_id);
if (ret != SUCCESS) {
GELOGE(FAILED, "Remove graph failed.");
return FAILED;
}
return ret;
}
5 years ago
bool SessionManager::IsGraphNeedRebuild(SessionId session_id, uint32_t graph_id) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, "Session manager is not initialized.");
5 years ago
return true;
}
SessionPtr innerSession = nullptr;
5 years ago
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
GELOGE(GE_SESSION_NOT_EXIST, "The session %lu does not exists", session_id);
return true;
} else {
innerSession = it->second;
5 years ago
}
}
return innerSession->IsGraphNeedRebuild(graph_id);
5 years ago
}
} // namespace ge