From 960da5cbedda7505c4494cd94f5a76c4069b50ff Mon Sep 17 00:00:00 2001 From: Zhang Qinghua Date: Fri, 24 Jul 2020 11:12:25 +0800 Subject: [PATCH] Decouple the backend TBE from binding Python API. --- mindspore/_extends/remote/__init__.py | 19 ++ .../_extends/remote/kernel_build_server.py | 235 ++++++++++++++++++ .../backend/kernel_compiler/kernel_fusion.cc | 6 +- .../kernel_compiler/tbe/tbe_kernel_build.cc | 1 - .../kernel_compiler/tbe/tbe_kernel_build.h | 1 - .../tbe/tbe_kernel_parallel_build.cc | 67 ++--- .../tbe/tbe_kernel_parallel_build.h | 13 +- .../tbe_kernel_select/tbe_kernel_select.cc | 6 +- .../kernel_compiler/tbe/tbe_python_funcs.cc | 198 --------------- .../kernel_compiler/tbe/tbe_python_funcs.h | 45 ---- .../ccsrc/backend/session/CMakeLists.txt | 1 + .../session/ascend_inference_session.cc | 1 - .../ccsrc/backend/session/ascend_session.cc | 1 - .../backend/session/kernel_build_client.cc | 105 ++++++++ .../backend/session/kernel_build_client.h | 188 ++++++++++++++ mindspore/ccsrc/common/CMakeLists.txt | 15 +- mindspore/ccsrc/common/duplex_pipe.cc | 160 ++++++++++++ mindspore/ccsrc/common/duplex_pipe.h | 123 +++++++++ mindspore/ccsrc/common/duplex_pipe_win.cc | 48 ++++ .../device/ascend/ascend_device_address.cc | 6 +- .../device/ascend/ascend_kernel_runtime.cc | 1 - tests/ut/cpp/CMakeLists.txt | 6 + 22 files changed, 933 insertions(+), 313 deletions(-) create mode 100644 mindspore/_extends/remote/__init__.py create mode 100644 mindspore/_extends/remote/kernel_build_server.py delete mode 100644 mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_python_funcs.cc delete mode 100644 mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_python_funcs.h create mode 100644 mindspore/ccsrc/backend/session/kernel_build_client.cc create mode 100644 mindspore/ccsrc/backend/session/kernel_build_client.h create mode 100644 mindspore/ccsrc/common/duplex_pipe.cc create mode 100644 mindspore/ccsrc/common/duplex_pipe.h create mode 100644 mindspore/ccsrc/common/duplex_pipe_win.cc diff --git a/mindspore/_extends/remote/__init__.py b/mindspore/_extends/remote/__init__.py new file mode 100644 index 0000000000..b786fcce9d --- /dev/null +++ b/mindspore/_extends/remote/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +""" +Server functions. + +Python functions that will be called in the c++ client part of MindSpore. +""" diff --git a/mindspore/_extends/remote/kernel_build_server.py b/mindspore/_extends/remote/kernel_build_server.py new file mode 100644 index 0000000000..8167e672cb --- /dev/null +++ b/mindspore/_extends/remote/kernel_build_server.py @@ -0,0 +1,235 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""kernel build server""" +import os +import sys +import time +from mindspore._extends.parallel_compile.tbe_compiler.tbe_process import create_tbe_parallel_compiler, op_select_format, check_supported + +class TbeBuilder: + """Tbe building wrapper""" + + def __init__(self): + self.tbe_builder = create_tbe_parallel_compiler() + + def start(self, json): + return self.tbe_builder.start_compile_op(json) + + def wait(self): + return self.tbe_builder.wait_one() + + def reset(self): + self.tbe_builder.reset_task_info() + + def exit(self): + self.tbe_builder.exit() + +class Messager: + '''Messager''' + + def __init__(self): + logger.info('[TRACE]', 'Messager init...') + self.message = '' + self.tbe_builder = TbeBuilder() + + def get_message(self): + """ + Get message from remote + + Returns: + message + """ + try: + # Not read by input() anymore + res = self.fin.readline() + if not res: + logger.info('[TRACE]', "read ") + self.exit() + if res[len(res) - 1] == '\n': + res = res[0:len(res)-1] + self.message = res + logger.debug('[IN]', self.message) + except EOFError: + self.exit() + finally: + pass + if self.message == '' or self.message == 'FIN': + self.send_ack() + self.exit() + return self.message + + def send_res(self, res, keep_format=True): + """ + Send result to remote + + Args: + keep_format: True or False + """ + logger.debug('[OUT]', str(res)) + if keep_format: + res_str = str(res).replace('\n', '[LF]').replace('\r', '[CR]').replace(' ', '[SP]') + else: + res_str = str(res).replace('\n', '').replace('\r', '').replace(' ', '') + tag = '[~]' # The same as client kTAG + + # Not write by print(tag + res_str, flush=True) any more + try: + self.fout.write(tag + res_str + "\n") + self.fout.flush() + except BrokenPipeError as err: + logger.info('[TRACE]', 'Write, ' + str(err)) + self.exit() + finally: + pass + + def send_ack(self, success=True): + """ + Send ack to remote + + Args: + success: True or False + """ + if success: + self.send_res('ACK') + else: + self.send_res('ERR') + + def handle(self): + """ + Communicate with remote + """ + arg = self.get_message() + if arg == 'START': + self.send_ack() + json = self.get_message() + res = self.tbe_builder.start(json) + self.send_res(res) + elif arg == 'WAIT': + self.send_ack() + task_id, res, pre = self.tbe_builder.wait() + logger.debug('[TRACE]', str(task_id) + '/' + str(res) + '/' + str(pre)) + if self.get_message() != 'CONT': + self.send_ack(False) + self.exit() + self.send_res(task_id) + if self.get_message() != 'CONT': + self.send_ack(False) + self.exit() + self.send_res(res) + if self.get_message() != 'CONT': + self.send_ack(False) + self.exit() + self.send_res(pre) + elif arg == 'RESET': + self.tbe_builder.reset() + self.send_ack() + elif arg == 'FORMAT': + self.send_ack() + json = self.get_message() + self.send_res(op_select_format(json)) + elif arg == 'SUPPORT': + self.send_ack() + json = self.get_message() + logger.debug('[SUPPORT]', json) + try: + res = check_supported(json) + except json.decoder.JSONDecodeError: + self.send_ack(False) + self.exit() + finally: + pass + self.send_res(res) + else: + self.send_ack(False) + self.exit() + + def loop(self): + """ + Messaging loop + """ + while True: + self.handle() + + def exit(self): + os.close(self.fdin) + os.close(self.fdout) + self.tbe_builder.reset() + self.tbe_builder.exit() + logger.info('[TRACE]', 'Messager Exit...') + exit() + + def run(self, fdin, fdout): + self.fdin = fdin + self.fdout = fdout + self.fin = os.fdopen(fdin, "r") + self.fout = os.fdopen(fdout, "w") + self.loop() + +class Logger: + """ + Replace dummy 'logger' to output log as below: + logger = Logger("remote_kernel_build_" + time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()) + ".log") + """ + def __init__(self, level=1, dumpfile=False, filename='Logger.log'): + """ + Args: + level: 0 for debug and info, 1 for info + dumpfile: if dump log into file + """ + self.level = level + self.dumpfile = dumpfile + if self.dumpfile: + self.log = open(filename, "a") + + def write(self, msg): + self.log.write(msg) + self.flush() + + def writeline(self, tag, msg): + prefix = tag + ' REMOTE(' + str(os.getpid()) + ',python)' + line = prefix + '\t' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ':\t' + msg + print(line, flush=True) + if self.dumpfile: + self.write(line + '\n') + + def debug(self, tag, msg): + if self.level == 0: + self.writeline('[DEBUG]' + tag, msg) + + def info(self, tag, msg): + self.writeline('[INFO]' + tag, msg) + + def flush(self): + self.log.flush() + +class DummyLogger: + """DummyLogger""" + def __init__(self): + pass + + def debug(self, tag, msg): + pass + + def info(self, tag, msg): + pass + +logger = Logger() + +if __name__ == '__main__': + if len(sys.argv) != 3: + raise Exception('Incorrect argv: {}'.format(sys.argv)) + logger.debug('[TRACE]', 'argv: ' + str(sys.argv)) + messager = Messager() + messager.run(int(sys.argv[1]), int(sys.argv[2])) diff --git a/mindspore/ccsrc/backend/kernel_compiler/kernel_fusion.cc b/mindspore/ccsrc/backend/kernel_compiler/kernel_fusion.cc index 0045e49bef..60a95f5c12 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/kernel_fusion.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/kernel_fusion.cc @@ -101,14 +101,14 @@ std::map KernelFusion(const std::vector int build_failed_num = 0; while (!build_manger->IsAllTaskFinish()) { int task_id = -1; - char *task_result = nullptr; - char *pre_build_result = nullptr; + std::string task_result; + std::string pre_build_result; auto ret = build_manger->WaitOne(&task_id, &task_result, &pre_build_result); if (!ret) { MS_EXCEPTION(ArgumentError) << "Build Failed. wait one ret:" << ret << ", task id:" << task_id; } - if ((task_result != nullptr) && (strcmp(task_result, "Success") != 0)) { + if (task_result != "Success") { MS_LOG(INFO) << "Fusion warning: Fuison op build failed, err log: " << task_result << " change to single op build."; build_failed_num++; diff --git a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_build.cc b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_build.cc index 7e92013657..c6b62a1766 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_build.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_build.cc @@ -22,7 +22,6 @@ #include "frontend/parallel/ops_info/ops_utils.h" #include "backend/session/anf_runtime_algorithm.h" #include "backend/kernel_compiler/tbe/tbe_adapter.h" -#include "backend/kernel_compiler/tbe/tbe_python_funcs.h" #include "backend/kernel_compiler/tbe/tbe_convert_utils.h" #include "backend/kernel_compiler/tbe/tbe_utils.h" diff --git a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_build.h b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_build.h index 6447c6cff6..3a00169632 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_build.h +++ b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_build.h @@ -26,7 +26,6 @@ #include #include "ir/dtype.h" #include "backend/kernel_compiler/kernel.h" -#include "pybind11/stl.h" #include "backend/kernel_compiler/oplib/oplib.h" #include "backend/kernel_compiler/tbe/tbe_adapter.h" diff --git a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_parallel_build.cc b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_parallel_build.cc index 48223f40c6..f694142763 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_parallel_build.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_parallel_build.cc @@ -29,18 +29,12 @@ #include "backend/kernel_compiler/tbe/tbe_kernel_mod.h" #include "backend/session/anf_runtime_algorithm.h" #include "./common.h" -#include "backend/kernel_compiler/tbe/tbe_python_funcs.h" #include "backend/kernel_compiler/tbe/tbe_convert_utils.h" #include "backend/kernel_compiler/tbe/tbe_utils.h" namespace mindspore { namespace kernel { using mindspore::kernel::tbe::TbeUtils; -constexpr auto kParallelCompileModule = "mindspore._extends.parallel_compile.tbe_compiler.tbe_process"; -constexpr auto kCreateParallelCompiler = "create_tbe_parallel_compiler"; -constexpr auto kStartCompileOp = "start_compile_op"; -constexpr auto kWaitOne = "wait_one"; -constexpr auto kResetTaskInfo = "reset_task_info"; bool TbeOpParallelPreBuild(const std::vector &anf_nodes) { auto build_manger = std::make_shared(); @@ -61,14 +55,14 @@ bool TbeOpParallelPreBuild(const std::vector &anf_nodes) { } while (!build_manger->IsAllPreTaskFinish()) { int task_id = -1; - char *task_result = nullptr; - char *pre_build_result = nullptr; + std::string task_result; + std::string pre_build_result; auto ret = build_manger->WaitOne(&task_id, &task_result, &pre_build_result); if (!ret) { MS_EXCEPTION(ArgumentError) << "Pre Build Failed. wait one ret:" << ret << ", task id:" << task_id; } - if ((task_result != nullptr) && (strcmp(task_result, "Success") != 0)) { + if (task_result != "Success") { MS_EXCEPTION(ArgumentError) << "task pre compile Failed, task id:" << task_id << ", cause:" << task_result; } @@ -116,14 +110,14 @@ bool TbeOpParallelBuild(const std::vector &anf_nodes) { } while (!build_manger->IsAllTaskFinish()) { int task_id = -1; - char *task_result = nullptr; - char *pre_build_result = nullptr; + std::string task_result; + std::string pre_build_result; auto ret = build_manger->WaitOne(&task_id, &task_result, &pre_build_result); if (!ret) { MS_EXCEPTION(ArgumentError) << "Build Failed. wait one ret:" << ret << ", task id:" << task_id; } - if ((task_result != nullptr) && (strcmp(task_result, "Success") != 0)) { + if (task_result != "Success") { MS_EXCEPTION(ArgumentError) << "task compile Failed, task id:" << task_id << ", cause:" << task_result; } (void)build_manger->TaskFinishProcess(task_id); @@ -131,43 +125,10 @@ bool TbeOpParallelBuild(const std::vector &anf_nodes) { return build_manger->GenSameOpKernelMod(); } -ParallelBuildManager::ParallelBuildManager() { tbe_parallel_compiler_ = TbePythonFuncs::TbeParallelCompiler(); } +ParallelBuildManager::ParallelBuildManager() {} ParallelBuildManager::~ParallelBuildManager() { ResetTaskInfo(); } -int32_t ParallelBuildManager::StartCompileOp(const nlohmann::json &kernel_json) const { - PyObject *pRes = nullptr; - PyObject *pArgs = PyTuple_New(1); - std::string json_str = kernel_json.dump(); - PyObject *arg1 = Py_BuildValue("s", json_str.c_str()); - (void)PyTuple_SetItem(pArgs, 0, arg1); - pRes = PyObject_CallMethod(tbe_parallel_compiler_, kStartCompileOp, "O", pArgs); - if (pRes == nullptr) { - PyErr_Print(); - MS_EXCEPTION(ArgumentError) << "Failed to call function start_compile_op"; - } - int task_id; - (void)PyArg_Parse(pRes, "i", &task_id); - MS_LOG(INFO) << "start compile , task id:" << task_id; - return task_id; -} - -bool ParallelBuildManager::WaitOne(int *task_id, char **task_result, char **pre_build_result) const { - MS_LOG(INFO) << "wait task start."; - MS_EXCEPTION_IF_NULL(task_id); - MS_EXCEPTION_IF_NULL(task_result); - PyObject *pRes = nullptr; - PyObject *pArg = Py_BuildValue("()"); - pRes = PyObject_CallMethod(tbe_parallel_compiler_, kWaitOne, "O", pArg); - if (pRes == nullptr) { - PyErr_Print(); - MS_EXCEPTION(ArgumentError) << "Failed to call function wait_one"; - return false; - } - (void)PyArg_ParseTuple(pRes, "iss", task_id, task_result, pre_build_result); - return true; -} - void ParallelBuildManager::SavePreTaskInfo(int32_t task_id, const mindspore::AnfNodePtr &anf_node) { MS_LOG(INFO) << "SavePreTaskInfo, task id: " << task_id; pre_task_map_[task_id] = anf_node; @@ -310,6 +271,15 @@ KernelModPtr ParallelBuildManager::GenKernelMod(const string &json_name, const s return kernel_mod_ptr; } +int ParallelBuildManager::StartCompileOp(const nlohmann::json &kernel_json) { + return KernelBuildClient::Instance().Start(kernel_json.dump()); +} + +bool ParallelBuildManager::WaitOne(int *task_id, std::string *task_result, std::string *pre_build_result) { + MS_EXCEPTION_IF_NULL(task_id); + return KernelBuildClient::Instance().Wait(task_id, task_result, pre_build_result); +} + void ParallelBuildManager::ResetTaskInfo() { if (task_map_.empty()) { MS_LOG(INFO) << "All tasks are compiled success."; @@ -317,10 +287,7 @@ void ParallelBuildManager::ResetTaskInfo() { } task_map_.clear(); same_op_list_.clear(); - if (tbe_parallel_compiler_ != nullptr) { - PyObject *pArg = Py_BuildValue("()"); - (void)PyObject_CallMethod(tbe_parallel_compiler_, kResetTaskInfo, "O", pArg); - } + KernelBuildClient::Instance().Reset(); } } // namespace kernel } // namespace mindspore diff --git a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_parallel_build.h b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_parallel_build.h index 22b4f77268..a026f186c0 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_parallel_build.h +++ b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_parallel_build.h @@ -21,9 +21,11 @@ #include #include #include -#include "backend/kernel_compiler/kernel.h" -#include "pybind11/stl.h" #include + +#include "backend/kernel_compiler/kernel.h" +#include "backend/session/kernel_build_client.h" + namespace mindspore { namespace kernel { bool TbeOpParallelPreBuild(const std::vector &anf_nodes); @@ -42,7 +44,6 @@ class ParallelBuildManager { public: ParallelBuildManager(); ~ParallelBuildManager(); - int32_t StartCompileOp(const nlohmann::json &kernel_json) const; void SavePreTaskInfo(int32_t task_id, const AnfNodePtr &anf_node); void SaveTaskInfo(int32_t task_id, const AnfNodePtr &anf_node, const std::string &json_name, const std::vector &input_size_list, const std::vector &output_size_list, @@ -54,7 +55,6 @@ class ParallelBuildManager { const std::vector &input_size_list, const std::vector &output_size_list, AnfNode *node) const; - bool WaitOne(int *task_id, char **task_result, char **pre_build_result) const; bool IsAllPreTaskFinish() const; bool IsAllTaskFinish() const; void PreTaskFinishProcess(int32_t task_id, const std::string &pre_build_result); @@ -62,10 +62,13 @@ class ParallelBuildManager { KernelModPtr GenKernelMod(const string &json_name, const string &processor, const std::vector &input_size_list, const std::vector &output_size_list, const KernelPackPtr &kernel_pack) const; + + // Interactive with real backend, who could be implemented by Python. + int StartCompileOp(const nlohmann::json &kernel_json); + bool WaitOne(int *task_id, std::string *task_result, std::string *pre_build_result); void ResetTaskInfo(); private: - PyObject *tbe_parallel_compiler_; std::map pre_task_map_; std::map task_map_; std::vector same_op_list_; diff --git a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_select/tbe_kernel_select.cc b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_select/tbe_kernel_select.cc index bc7cd6b42c..8e923f7449 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_select/tbe_kernel_select.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_select/tbe_kernel_select.cc @@ -24,7 +24,6 @@ #include "backend/kernel_compiler/tbe/tbe_kernel_build.h" #include "nlohmann/json.hpp" #include "utils/context/ms_context.h" -#include "backend/kernel_compiler/tbe/tbe_python_funcs.h" #include "backend/optimizer/common/helper.h" #include "backend/kernel_compiler/tbe/tbe_convert_utils.h" #include "frontend/parallel/ops_info/ops_utils.h" @@ -32,6 +31,7 @@ #include "backend/kernel_compiler/tbe/tbe_kernel_select/tbe_kernel_reduce_selecter.h" #include "backend/kernel_compiler/tbe/tbe_kernel_select/common_utils.h" #include "backend/kernel_compiler/tbe/tbe_kernel_select/tbe_property_checker.h" +#include "backend/session/kernel_build_client.h" namespace mindspore { namespace kernel { @@ -314,7 +314,7 @@ bool TbeKernelSelect::TbeCheckSupported( if (!ret) { MS_LOG(EXCEPTION) << "Gen tbe single kernel json for check support failed."; } - ret = TbePythonFuncs::CheckSupported(kernel_json); + ret = KernelBuildClient::Instance().CheckSupported(kernel_json.dump()); AnfAlgo::SetSelectKernelBuildInfo(kernel_build_info_tmp, cnode_ptr_.get()); return ret; } @@ -488,7 +488,7 @@ std::string TbeKernelSelect::OpSelectFormat() { if (!ret) { MS_LOG(EXCEPTION) << "GenTbeSingleKernelJson failed."; } - res_json_str = TbePythonFuncs::OpSelectFormat(kernel_json); + res_json_str = KernelBuildClient::Instance().SelectFormat(kernel_json.dump()); if (res_json_str.empty()) { MS_LOG(EXCEPTION) << "op select format error."; } diff --git a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_python_funcs.cc b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_python_funcs.cc deleted file mode 100644 index facb07991a..0000000000 --- a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_python_funcs.cc +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Copyright 2019 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "backend/kernel_compiler/tbe/tbe_python_funcs.h" -#include "backend/kernel_compiler/tbe/tbe_utils.h" -#include "common/utils.h" -#include "utils/context/ms_context.h" - -namespace mindspore { -namespace kernel { -using mindspore::kernel::tbe::TbeUtils; -constexpr auto kTbeProcessModule = "mindspore._extends.parallel_compile.tbe_compiler.tbe_process"; -constexpr auto kCreateTbeParallelCompilerFunc = "create_tbe_parallel_compiler"; -constexpr auto kOpSelectFormatFunc = "op_select_format"; -constexpr auto kCheckSupportedFunc = "check_supported"; -constexpr auto kTBEException = "TBEException"; - -PyObject *TbePythonFuncs::pCreateTbeParallelCompilerFunc_ = nullptr; -PyObject *TbePythonFuncs::pTbeCompiler_ = nullptr; -PyObject *TbePythonFuncs::pOpSelectFormatFunc_ = nullptr; -PyObject *TbePythonFuncs::pCheckSupportedFunc_ = nullptr; -bool TbePythonFuncs::Init() { - static bool initialized = false; - if (initialized) { - return true; - } - // Initialize cache - TbeUtils::LoadCache(); - - // tbe_process - PyObject *pTbeProcessModule = nullptr; - pTbeProcessModule = PyImport_ImportModule(kTbeProcessModule); - if (pTbeProcessModule == nullptr) { - MS_LOG(ERROR) << "Failed to import [" << kTbeProcessModule << "] module."; - return false; - } - - pCreateTbeParallelCompilerFunc_ = PyObject_GetAttrString(pTbeProcessModule, kCreateTbeParallelCompilerFunc); - if (pCreateTbeParallelCompilerFunc_ == nullptr) { - MS_LOG(ERROR) << "Failed to transform opModule and FuncName to PyObject, opModule:[" << kTbeProcessModule - << "], FuncName:[" << kCreateTbeParallelCompilerFunc << "]."; - return false; - } - - pTbeCompiler_ = PyEval_CallObject(pCreateTbeParallelCompilerFunc_, nullptr); - if (pTbeCompiler_ == nullptr) { - PyErr_Print(); - MS_EXCEPTION(ArgumentError) << "Failed to call function : create_parallel_compiler."; - return false; - } - - pOpSelectFormatFunc_ = PyObject_GetAttrString(pTbeProcessModule, kOpSelectFormatFunc); - if (pOpSelectFormatFunc_ == nullptr) { - MS_LOG(ERROR) << "Failed to transform opModule and FuncName to PyObject, opModule:[" << kTbeProcessModule - << "], FuncName:[" << kOpSelectFormatFunc << "]."; - return false; - } - - pCheckSupportedFunc_ = PyObject_GetAttrString(pTbeProcessModule, kCheckSupportedFunc); - if (pCheckSupportedFunc_ == nullptr) { - MS_LOG(ERROR) << "Failed to transform opModule and FuncName to PyObject, opModule:[" << kTbeProcessModule - << "], FuncName:[" << kCheckSupportedFunc << "]."; - return false; - } - initialized = true; - MS_LOG(INFO) << "TbePythonFuncs initialized Success."; - return true; -} - -std::string TbePythonFuncs::PyObjectToStr(PyObject *PyObj) { - char *pChar = nullptr; - std::string str_res; - if (PyObj == nullptr) { - MS_LOG(ERROR) << "Input parameter is nullptr."; - return str_res; - } - PyObject *strArgs = PyObject_Str(PyObj); - if (strArgs != nullptr) { - (void)PyArg_Parse(strArgs, "s", &pChar); - } - if (pChar == nullptr) { - MS_LOG(ERROR) << "pChar is nullptr."; - return str_res; - } - str_res = pChar; - return str_res; -} - -std::string TbePythonFuncs::OpSelectFormat(const nlohmann::json &kernel_json) { - PyObject *pArg = nullptr; - PyObject *pRet = nullptr; - std::string res_json_str; - - if (!Init()) { - MS_LOG(ERROR) << "TbePythonFuncs Initialize Failed !"; - return res_json_str; - } - - // assembly Args - pArg = PyTuple_New(1); - std::string json_str = kernel_json.dump(); - (void)PyTuple_SetItem(pArg, 0, Py_BuildValue("s", json_str.c_str())); - if (pArg == nullptr) { - MS_LOG(ERROR) << "Failed to generate parameter from kernel_json to PyObject."; - return res_json_str; - } - - // call functions - if (pOpSelectFormatFunc_ == nullptr) { - MS_LOG(ERROR) << "function is nullptr."; - return res_json_str; - } - - pRet = PyEval_CallObject(pOpSelectFormatFunc_, pArg); - if (pRet == nullptr) { - PyErr_Print(); - MS_EXCEPTION(ArgumentError) << "Failed to call function [" << kOpSelectFormatFunc - << "], function args:" << PyObjectToStr(pArg); - } - - char *pstr = nullptr; - (void)PyArg_Parse(pRet, "s", &pstr); - res_json_str = pstr; - if (res_json_str.compare(0, strlen(kTBEException), kTBEException) == 0) { - MS_EXCEPTION(ArgumentError) << "Failed to call function [" << kOpSelectFormatFunc << "], " << res_json_str - << " ,function args:" << PyObjectToStr(pArg); - } - return res_json_str; -} - -bool TbePythonFuncs::CheckSupported(const nlohmann::json &kernel_json) { - PyObject *pArg = nullptr; - PyObject *pRes = nullptr; - bool ret = false; - - if (!Init()) { - MS_LOG(ERROR) << "TbePythonFuncs Initialize Failed !"; - return ret; - } - // assembly Args - pArg = PyTuple_New(1); - std::string json_str = kernel_json.dump(); - PyObject *arg1 = Py_BuildValue("s", json_str.c_str()); - (void)PyTuple_SetItem(pArg, 0, arg1); - if (pArg == nullptr) { - MS_LOG(ERROR) << "Failed to generate parameter from kernel_json to PyObject."; - return ret; - } - - // call functions - if (pCheckSupportedFunc_ == nullptr) { - MS_LOG(ERROR) << "function is nullptr."; - return ret; - } - - pRes = PyEval_CallObject(pCheckSupportedFunc_, pArg); - if (pRes == nullptr) { - PyErr_Print(); - MS_EXCEPTION(ArgumentError) << "Failed to call function [" << kCheckSupportedFunc - << "], function args: " << PyObjectToStr(pArg); - } - if (PyBool_Check(pRes)) { - ret = PyObject_IsTrue(pRes) != 0; - } else { - char *pstr = nullptr; - (void)PyArg_Parse(pRes, "s", &pstr); - std::string res_str = pstr; - if (res_str.compare(0, strlen(kTBEException), kTBEException) == 0) { - MS_EXCEPTION(ArgumentError) << "Failed to call function [" << kCheckSupportedFunc << "], " << res_str - << ", function args: " << PyObjectToStr(pArg); - } - } - - return ret; -} - -PyObject *TbePythonFuncs::TbeParallelCompiler() { - if (!Init()) { - MS_LOG(ERROR) << "TbePythonFuncs Initialize Failed !"; - return nullptr; - } - return pTbeCompiler_; -} -} // namespace kernel -} // namespace mindspore diff --git a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_python_funcs.h b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_python_funcs.h deleted file mode 100644 index f868fa2d63..0000000000 --- a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_python_funcs.h +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Copyright 2020 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_TBE_TBE_PYTHON_FUNCS_H_ -#define MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_TBE_TBE_PYTHON_FUNCS_H_ - -#include -#include -#include "pybind11/stl.h" -#include "utils/log_adapter.h" - -namespace mindspore { -namespace kernel { -class TbePythonFuncs { - public: - TbePythonFuncs() = default; - ~TbePythonFuncs() = default; - static std::string OpSelectFormat(const nlohmann::json &kernel_json); - static bool CheckSupported(const nlohmann::json &kernel_json); - static PyObject *TbeParallelCompiler(); - - private: - static bool Init(); - static std::string PyObjectToStr(_object *PyObj); - static PyObject *pCreateTbeParallelCompilerFunc_; - static PyObject *pTbeCompiler_; - static PyObject *pOpSelectFormatFunc_; - static PyObject *pCheckSupportedFunc_; -}; -} // namespace kernel -} // namespace mindspore -#endif // MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_TBE_TBE_PYTHON_FUNCS_H_ diff --git a/mindspore/ccsrc/backend/session/CMakeLists.txt b/mindspore/ccsrc/backend/session/CMakeLists.txt index b7b791ada9..b6d340a325 100644 --- a/mindspore/ccsrc/backend/session/CMakeLists.txt +++ b/mindspore/ccsrc/backend/session/CMakeLists.txt @@ -1,4 +1,5 @@ file(GLOB_RECURSE _SESSION_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} + "kernel_build_client.cc" "kernel_graph.cc" "session_basic.cc" "session_factory.cc" diff --git a/mindspore/ccsrc/backend/session/ascend_inference_session.cc b/mindspore/ccsrc/backend/session/ascend_inference_session.cc index ee0816ee9e..1c2cd50707 100644 --- a/mindspore/ccsrc/backend/session/ascend_inference_session.cc +++ b/mindspore/ccsrc/backend/session/ascend_inference_session.cc @@ -24,7 +24,6 @@ #include "backend/session/anf_runtime_algorithm.h" #include "common/utils.h" #include "common/trans.h" -#include "backend/kernel_compiler/tbe/tbe_python_funcs.h" #include "utils/config_manager.h" #include "utils/base_ref_extends.h" diff --git a/mindspore/ccsrc/backend/session/ascend_session.cc b/mindspore/ccsrc/backend/session/ascend_session.cc index cf2f85ed9f..04a7f2f105 100644 --- a/mindspore/ccsrc/backend/session/ascend_session.cc +++ b/mindspore/ccsrc/backend/session/ascend_session.cc @@ -43,7 +43,6 @@ #include "common/utils.h" #include "backend/optimizer/common/helper.h" #include "runtime/device/kernel_runtime_manager.h" -#include "backend/kernel_compiler/tbe/tbe_python_funcs.h" #include "utils/config_manager.h" #include "utils/base_ref_extends.h" #include "debug/tensor_load.h" diff --git a/mindspore/ccsrc/backend/session/kernel_build_client.cc b/mindspore/ccsrc/backend/session/kernel_build_client.cc new file mode 100644 index 0000000000..97f55cb171 --- /dev/null +++ b/mindspore/ccsrc/backend/session/kernel_build_client.cc @@ -0,0 +1,105 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "backend/session/kernel_build_client.h" + +#include +#include + +namespace mindspore { +namespace kernel { +void ReplaceStr(std::string *dest, const std::string &replace, char new_char) { + std::string::size_type start = 0; + while ((start = (*dest).find(replace, start)) != std::string::npos) { + (*dest).replace(start, replace.size(), 1, new_char); + start++; // Replaced 1 charactor. + } +} + +int KernelBuildClient::Start(const std::string &json) { + // Start compiling.. + std::string res = SendRequest(kSTART); + if (res != kACK) { + MS_LOG(ERROR) << "START failed, res: " << res; + return -1; + } + // Send the json data. + res = SendRequest(json); + if (res == kFAILED) { + MS_LOG(ERROR) << "START send data failed, res: " << res; + return -1; + } + // Return task id. + return std::stoi(res); +} + +bool KernelBuildClient::Wait(int *task_id, std::string *task_result, std::string *pre_build_result) { + // Start waiting.. + std::string res = SendRequest(kWAIT); + if (res != kACK) { + MS_LOG(ERROR) << "WAIT failed, res: " << res; + return false; + } + // Request task id. + *task_id = std::stoi(SendRequest(kCONT)); + // Requst task result. + *task_result = SendRequest(kCONT); + // Request prebuild result. + *pre_build_result = SendRequest(kCONT); + return true; +} + +void KernelBuildClient::Reset() { + // Start compiling.. + std::string res = SendRequest(kRESET); + if (res != kACK) { + MS_LOG(EXCEPTION) << "RESET response is: " << res; + } +} + +std::string KernelBuildClient::SelectFormat(const std::string &json) { + // Start compiling.. + std::string res = SendRequest(kFORMAT); + if (res != kACK) { + MS_LOG(ERROR) << "FORMAT failed, res: " << res; + return ""; + } + // Send the json data. + res = SendRequest(json); + if (res == kERR) { + MS_LOG(ERROR) << "FORMAT send data failed, res: " << res; + return ""; + } + return res; +} + +bool KernelBuildClient::CheckSupported(const std::string &json) { + // Checking support.. + std::string res = SendRequest(kSUPPORT); + if (res != kACK) { + MS_LOG(ERROR) << "SUPPORT failed, res: " << res; + return false; + } + // Send the json data. + res = SendRequest(json); + if (res != kTRUE) { + MS_LOG(ERROR) << "SUPPORT send data failed, res: " << res; + return false; + } + return true; +} +} // namespace kernel +} // namespace mindspore diff --git a/mindspore/ccsrc/backend/session/kernel_build_client.h b/mindspore/ccsrc/backend/session/kernel_build_client.h new file mode 100644 index 0000000000..8b38d53e62 --- /dev/null +++ b/mindspore/ccsrc/backend/session/kernel_build_client.h @@ -0,0 +1,188 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_BACKEND_SESSION_KERNEL_BUILD_CLIENT_H_ +#define MINDSPORE_CCSRC_BACKEND_SESSION_KERNEL_BUILD_CLIENT_H_ + +#include +#include +#include +#include + +#include "common/duplex_pipe.h" +#include "utils/log_adapter.h" + +namespace mindspore { +namespace kernel { +void ReplaceStr(std::string *dest, const std::string &replace, char new_char); +class KernelBuildClient { + public: + // Server configure + constexpr inline static auto kEnv = "python"; + constexpr inline static auto kGetPathScript = + "-c " + "\"" + "import pkgutil;" + "path = pkgutil" + ".get_loader(\\\"mindspore._extends.remote.kernel_build_server\\\")" // Server module name + ".get_filename();" + "print('[~]' + path)" + "\""; + + // Receive the response from server + constexpr inline static auto kACK = "ACK"; + constexpr inline static auto kERR = "ERR"; + constexpr inline static auto kFAILED = "-1"; + // Send Finish request to server + constexpr inline static auto kFIN = "FIN"; + + // Send building request to server + constexpr inline static auto kSTART = "START"; + constexpr inline static auto kWAIT = "WAIT"; + constexpr inline static auto kCONT = "CONT"; + constexpr inline static auto kSUCCESS = "Success"; + constexpr inline static auto kRESET = "RESET"; + + // Send server info. query to server + constexpr inline static auto kFORMAT = "FORMAT"; + constexpr inline static auto kSUPPORT = "SUPPORT"; + constexpr inline static auto kTRUE = "True"; + + // Revert \n, \r, [space]. + constexpr inline static auto kLF = "[LF]"; + constexpr inline static auto kCR = "[CR]"; + constexpr inline static auto kSP = "[SP]"; + + // The TAG as prefix of real command from remote. + constexpr inline static auto kTAG = "[~]"; + + constexpr inline static int kBufferSize = 4096; + constexpr inline static unsigned int kTimeOutSeconds = 20; + + static KernelBuildClient &Instance() { + static KernelBuildClient instance; + return instance; + } + std::string GetScriptPath() { + std::string cmd = kEnv; + (void)cmd.append(1, ' ').append(kGetPathScript); + FILE *fpipe = popen(cmd.c_str(), "r"); + if (fpipe == nullptr) { + MS_LOG(EXCEPTION) << "popen failed, " << strerror(errno) << "(" << errno << ")"; + } + bool start = false; + std::string result; + char buf[kBufferSize]; + while (std::fgets(buf, sizeof(buf), fpipe) != nullptr) { + if (std::strncmp(buf, kTAG, std::strlen(kTAG)) == 0) { + start = true; + } + // Filter with 'kTAG' and '\n' + if (start) { + auto size = std::strlen(buf); + bool line_end = buf[size - 1] == '\n'; + result.append(buf, line_end ? size - 1 : size); + if (line_end) { + break; + } + } + } + pclose(fpipe); + const std::string py_suffix = ".py"; + if (result.empty() || result.rfind(py_suffix) != (result.length() - py_suffix.length())) { + MS_LOG(EXCEPTION) << "py file seems incorrect, result: {" << result << "}"; + } + result = result.substr(strlen(kTAG)); + MS_LOG(DEBUG) << "result: " << result; + return result; + } + + void Open() { + if (!init_) { + // Exception's thrown if open failed + if (dp_->Open({kEnv, GetScriptPath()}, true) != -1) { + dp_->SetTimeOutSeconds(kTimeOutSeconds); + dp_->SetTimeOutCallback([this]() { SendRequest(kFIN); }); + init_ = true; + } + } + } + void Close() { + if (init_) { + dp_->Close(); + init_ = false; + } + } + + // Send a request and fetch its response + std::string SendRequest(std::string data) { + Request(data); + return Response(); + } + void Request(std::string req) { + if (!init_) { + MS_LOG(EXCEPTION) << "Try to send request before Open()"; + } + MS_LOG(DEBUG) << "\t[" << req << "]"; + *dp_ << req; + } + std::string Response() { + if (!init_) { + MS_LOG(EXCEPTION) << "Try to get response before Open()"; + } + std::string res; + *dp_ >> res; + // Filter out the interference + auto start = res.find(kTAG); + if (start == std::string::npos) { + MS_LOG(EXCEPTION) << "Response seems incorrect, res: " << res; + } + res = res.substr(start + std::strlen(kTAG), res.size() - start); + // Revert the line feed and space + if (res != kSUCCESS && res != kACK && res != kERR && res != kTRUE) { + ReplaceStr(&res, kLF, '\n'); + ReplaceStr(&res, kSP, ' '); + } + MS_LOG(DEBUG) << "\t[" << res << "]"; + return res; + } + + // Before building. + std::string SelectFormat(const std::string &json); + bool CheckSupported(const std::string &json); + + // Run building. + int Start(const std::string &json); + bool Wait(int *task_id, std::string *task_result, std::string *pre_build_result); + void Reset(); + + KernelBuildClient(const KernelBuildClient &) = delete; + KernelBuildClient &operator=(const KernelBuildClient &) = delete; + + KernelBuildClient(KernelBuildClient &&) = delete; + KernelBuildClient &operator=(KernelBuildClient &&) = delete; + + private: + KernelBuildClient() : init_(false), dp_(std::make_shared()) { Open(); } + ~KernelBuildClient() { Close(); } + + bool init_; + std::shared_ptr dp_; +}; +} // namespace kernel +} // namespace mindspore + +#endif // MINDSPORE_CCSRC_BACKEND_SESSION_KERNEL_BUILD_CLIENT_H_ diff --git a/mindspore/ccsrc/common/CMakeLists.txt b/mindspore/ccsrc/common/CMakeLists.txt index 6673af4b70..cc393db54a 100644 --- a/mindspore/ccsrc/common/CMakeLists.txt +++ b/mindspore/ccsrc/common/CMakeLists.txt @@ -1,3 +1,16 @@ -file(GLOB_RECURSE _COMMON_ALL_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc") +if (CMAKE_SYSTEM_NAME MATCHES "Windows") + file(GLOB_RECURSE _COMMON_ALL_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} + "trans.cc" + "utils.cc" + "duplex_pipe_win.cc" + ) +else() + file(GLOB_RECURSE _COMMON_ALL_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} + "trans.cc" + "utils.cc" + "duplex_pipe.cc" + ) +endif() + set_property(SOURCE ${_COMMON_ALL_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_COMMON) add_library(_mindspore_common_obj OBJECT ${_COMMON_ALL_SRC_FILES}) diff --git a/mindspore/ccsrc/common/duplex_pipe.cc b/mindspore/ccsrc/common/duplex_pipe.cc new file mode 100644 index 0000000000..b41be6e573 --- /dev/null +++ b/mindspore/ccsrc/common/duplex_pipe.cc @@ -0,0 +1,160 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "common/duplex_pipe.h" + +#include +#include +#include +#include + +namespace mindspore { +int DuplexPipe::Open(std::initializer_list arg_list, bool append_fds) { + if (pipe(fd1_) == -1) { + DP_EXCEPTION << "pipe 1 failed, " << strerror(errno) << "(" << errno << ")"; + } + if (pipe(fd2_) == -1) { + close(fd1_[0]); + close(fd1_[1]); + DP_EXCEPTION << "pipe 2 failed, " << strerror(errno) << "(" << errno << ")"; + } + + pid_ = fork(); + if (pid_ < 0) { + close(fd1_[0]); + close(fd1_[1]); + close(fd2_[0]); + close(fd2_[1]); + DP_EXCEPTION << "fork failed, " << strerror(errno) << "(" << errno << ")"; + } else if (pid_ == 0) { // Remote process + DP_INFO << "Remote process, pid: " << getpid() << ", " << fd1_[0] << "/" << fd2_[1]; + remote_stdout_ = dup(STDOUT_FILENO); + remote_stdin_ = dup(STDIN_FILENO); + remote_stderr_ = dup(STDERR_FILENO); + close(fd1_[1]); + close(fd2_[0]); + if (!append_fds) { + dup2(fd1_[0], STDIN_FILENO); + dup2(fd2_[1], STDOUT_FILENO); + } + std::vector args; + std::transform(arg_list.begin(), arg_list.end(), std::back_inserter(args), + [](const std::string &arg) -> const char * { return arg.c_str(); }); + if (append_fds) { + std::string fd10 = std::to_string(fd1_[0]).c_str(); + args.emplace_back(fd10.c_str()); + std::string fd21 = std::to_string(fd2_[1]).c_str(); + args.emplace_back(fd21.c_str()); + } + args.emplace_back(nullptr); + if (execvp(args[0], const_cast(&args[0])) == -1) { + DP_EXCEPTION << "execute " << args[0] << " failed, " << strerror(errno) << "(" << errno << ")"; + } + } else { // Local process + DP_INFO << "Local process, id: " << getpid() << ", " << fd2_[0] << "/" << fd1_[1]; + local_stdout_ = dup(STDOUT_FILENO); + local_stdin_ = dup(STDIN_FILENO); + local_stderr_ = dup(STDERR_FILENO); + close(fd1_[0]); + close(fd2_[1]); + } + return 0; +} + +void DuplexPipe::Write(const std::string &buf, bool flush) { + // Write the string into pipe + if (write(fd1_[1], buf.data(), buf.size()) == -1) { + DP_ERROR << "write failed, error: " << strerror(errno) << "(" << errno << ")"; + return; + } + if (flush) { + // Flush into the pipe + if (write(fd1_[1], "\n", 1) == -1) { + DP_ERROR << "write failed, error: " << strerror(errno) << "(" << errno << ")"; + return; + } + } + DP_DEBUG << "<< [" << buf << "]"; +} + +std::string DuplexPipe::Read() { + // Read the string from pipe + std::string buf; + ssize_t size; + // MAYBE BLOCKED + // Read one line or multiple lines + while (SetTimeOut(), (size = read(fd2_[0], c_buf_, kBufferSize)) > 0) { // Till reading something + CancelTimeOut(); + DP_DEBUG << ">> [" << c_buf_ << "]"; + bool line_end = c_buf_[size - 1] == '\n'; + buf.append(c_buf_, line_end ? size - 1 : size); // Copy without the last '\n' + if (line_end) { + break; + } + } + return buf; +} + +void DuplexPipe::WriteWithStdout(const std::string &buf, bool flush) { + dup2(fd1_[1], STDOUT_FILENO); + // Write the string into pipe + std::cout << buf; + if (flush) { + // Flush into the pipe + std::cout << std::endl; + } + dup2(local_stdout_, STDOUT_FILENO); +} + +std::string DuplexPipe::ReadWithStdin() { + std::string buf; + dup2(fd2_[0], STDIN_FILENO); + // Maybe blocked + SetTimeOut(); + std::getline(std::cin, buf); // Not use 'std::cin >>' to include space + CancelTimeOut(); + dup2(local_stdin_, STDIN_FILENO); + return buf; +} + +DuplexPipe &DuplexPipe::operator<<(const std::string &buf) { + Write(buf); + return *this; +} + +DuplexPipe &DuplexPipe::operator>>(std::string &buf) { + buf = Read(); + return *this; +} + +void DuplexPipe::Close() { + close(fd1_[0]); + close(fd1_[1]); + close(fd2_[0]); + close(fd2_[1]); +} + +void DuplexPipe::Alarm::Set(std::shared_ptr dp, unsigned int interval_secs) { + dp_ = dp; + signal(SIGALRM, SigHandler); + alarm(interval_secs); +} + +void DuplexPipe::Alarm::Cancel() { + alarm(0); + dp_.reset(); +} +} // namespace mindspore diff --git a/mindspore/ccsrc/common/duplex_pipe.h b/mindspore/ccsrc/common/duplex_pipe.h new file mode 100644 index 0000000000..a051e9b560 --- /dev/null +++ b/mindspore/ccsrc/common/duplex_pipe.h @@ -0,0 +1,123 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_COMMON_DUPLEX_PIPE_H_ +#define MINDSPORE_CCSRC_COMMON_DUPLEX_PIPE_H_ + +#include +#include +#include +#include +#include + +#include "utils/log_adapter.h" +#define DP_DEBUG MS_LOG(DEBUG) << "[DuplexPipe] " +#define DP_INFO MS_LOG(INFO) << "[DuplexPipe] " +#define DP_ERROR MS_LOG(ERROR) << "[DuplexPipe] " +#define DP_EXCEPTION MS_LOG(EXCEPTION) << "[DuplexPipe] " + +namespace mindspore { +// A tool to run a command as child process and build a duplex pipe between them. +// Similar to 'popen()', but use duplex not simplex pipe, more like 'socketpair'. +class DuplexPipe : public std::enable_shared_from_this { + public: + constexpr inline static int kBufferSize = 4096; + constexpr inline static unsigned int kTimeOutSeconds = 5; + + DuplexPipe() = default; + ~DuplexPipe() = default; + + // Create a subprocess and open a duplex pipe between local and remote + int Open(std::initializer_list arg_list, bool append_fds = false); + void Close(); + void SetTimeOutSeconds(unsigned int secs) { time_out_secs_ = secs; } + void SetTimeOutCallback(const std::function &cb) { + has_time_out_callback_ = true; + time_out_callback_ = cb; + } + + // Write the 'buf' to remote stdin + void Write(const std::string &buf, bool flush = true); + // Read from remote stdout/stderr into 'c_buf_' + std::string Read(); + + void WriteWithStdout(const std::string &buf, bool flush); + std::string ReadWithStdin(); + + DuplexPipe &operator<<(const std::string &buf); + DuplexPipe &operator>>(std::string &buf); + + private: + void SetTimeOut() { alarm_.Set(shared_from_this(), time_out_secs_); } + void CancelTimeOut() { alarm_.Cancel(); } + void TimeOut() { + if (has_time_out_callback_) { + time_out_callback_(); + } + Close(); + DP_EXCEPTION << "Time out when read from pipe"; + } + + // Subprocess id in parent process, + // otherwise zero in child process. + pid_t pid_; + + // Pipe: { Local:fd1_[1] --> Remote:fd1_[0] } + // Remote:fd1_[0] would be redirected by subprocess's stdin. + // Local:fd1_[1] would be used by 'Write()' as output. + int fd1_[2]; + + // Pipe: { Remote:fd2_[1] --> Local:fd2_[0] } + // Remote:fd2_[1] would be redirected by subprocess's stdout. + // Local:fd2_[0] would be used by 'Read()' as input. + int fd2_[2]; + + // // Used and returned by 'Read()'. + // std::string buf_; + char c_buf_[kBufferSize]; + + int local_stdin_; + int local_stdout_; + int local_stderr_; + int remote_stdin_; + int remote_stdout_; + int remote_stderr_; + + class Alarm { + public: + Alarm() = default; + ~Alarm() = default; + + void Set(std::shared_ptr dp, unsigned int interval_secs); + void Cancel(); + + private: + static void SigHandler(int sig) { + DP_INFO << "Signal: " << sig; + dp_->TimeOut(); + } + + inline static std::shared_ptr dp_; + }; + + unsigned int time_out_secs_ = kTimeOutSeconds; + bool has_time_out_callback_ = false; + std::function time_out_callback_; + Alarm alarm_; +}; +} // namespace mindspore + +#endif // MINDSPORE_CCSRC_COMMON_DUPLEX_PIPE_H_ diff --git a/mindspore/ccsrc/common/duplex_pipe_win.cc b/mindspore/ccsrc/common/duplex_pipe_win.cc new file mode 100644 index 0000000000..7face96522 --- /dev/null +++ b/mindspore/ccsrc/common/duplex_pipe_win.cc @@ -0,0 +1,48 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "common/duplex_pipe.h" + +#include +#include + +namespace mindspore { +int DuplexPipe::Open(std::initializer_list arg_list, bool append_fds) { + DP_EXCEPTION << "Not support for Windows by now."; +} + +void DuplexPipe::Write(const std::string &buf, bool flush) { DP_EXCEPTION << "Not support for Windows by now."; } + +std::string DuplexPipe::Read() { DP_EXCEPTION << "Not support for Windows by now."; } + +void DuplexPipe::WriteWithStdout(const std::string &buf, bool flush) { + DP_EXCEPTION << "Not support for Windows by now."; +} + +std::string DuplexPipe::ReadWithStdin() { DP_EXCEPTION << "Not support for Windows by now."; } + +DuplexPipe &DuplexPipe::operator<<(const std::string &buf) { DP_EXCEPTION << "Not support for Windows by now."; } + +DuplexPipe &DuplexPipe::operator>>(std::string &buf) { DP_EXCEPTION << "Not support for Windows by now."; } + +void DuplexPipe::Close() { DP_EXCEPTION << "Not support for Windows by now."; } + +void DuplexPipe::Alarm::Set(std::shared_ptr dp, unsigned int interval_secs) { + DP_EXCEPTION << "Not support for Windows by now."; +} + +void DuplexPipe::Alarm::Cancel() { DP_EXCEPTION << "Not support for Windows by now."; } +} // namespace mindspore diff --git a/mindspore/ccsrc/runtime/device/ascend/ascend_device_address.cc b/mindspore/ccsrc/runtime/device/ascend/ascend_device_address.cc index 9d4466c46d..7e42763fe8 100644 --- a/mindspore/ccsrc/runtime/device/ascend/ascend_device_address.cc +++ b/mindspore/ccsrc/runtime/device/ascend/ascend_device_address.cc @@ -376,13 +376,13 @@ kernel::KernelModPtr AscendDeviceAddress::CompileTransDataAndObtainKernelMod(con } while (!build_manager->IsAllTaskFinish()) { int task_id = -1; - char *task_result = nullptr; - char *pre_build_result = nullptr; + std::string task_result; + std::string pre_build_result; auto ret = build_manager->WaitOne(&task_id, &task_result, &pre_build_result); if (!ret) { MS_EXCEPTION(ArgumentError) << "Build Failed. wait one ret:" << ret << ", task id:" << task_id; } - if ((task_result != nullptr) && (strcmp(task_result, "Success") != 0)) { + if (task_result != "Success") { MS_EXCEPTION(ArgumentError) << "task compile Failed, task id:" << task_id << ", cause:" << task_result; } (void)build_manager->TaskFinishProcess(task_id, false); diff --git a/mindspore/ccsrc/runtime/device/ascend/ascend_kernel_runtime.cc b/mindspore/ccsrc/runtime/device/ascend/ascend_kernel_runtime.cc index 32045eb665..75ce377752 100644 --- a/mindspore/ccsrc/runtime/device/ascend/ascend_kernel_runtime.cc +++ b/mindspore/ccsrc/runtime/device/ascend/ascend_kernel_runtime.cc @@ -37,7 +37,6 @@ #include "backend/session/anf_runtime_algorithm.h" #include "runtime/device/ascend/profiling/profiling_utils.h" #include "backend/kernel_compiler/tbe/tbe_utils.h" -#include "backend/kernel_compiler/tbe/tbe_python_funcs.h" #include "backend/optimizer/mem_reuse/mem_reuse_checker.h" #include "runtime/device/ascend/ascend_memory_manager.h" #include "debug/tensor_load.h" diff --git a/tests/ut/cpp/CMakeLists.txt b/tests/ut/cpp/CMakeLists.txt index f746b323ea..3cb637b982 100644 --- a/tests/ut/cpp/CMakeLists.txt +++ b/tests/ut/cpp/CMakeLists.txt @@ -78,6 +78,7 @@ file(GLOB_RECURSE MINDSPORE_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "../../../mindspore/ccsrc/backend/session/kernel_graph.cc" "../../../mindspore/ccsrc/backend/session/session_basic.cc" "../../../mindspore/ccsrc/backend/session/session_factory.cc" + "../../../mindspore/ccsrc/backend/session/kernel_build_client.cc" "../../../mindspore/ccsrc/vm/*.cc" "../../../mindspore/ccsrc/pipeline/pynative/*.cc" "../../../mindspore/ccsrc/pybind_api/*.cc" @@ -124,6 +125,11 @@ file(GLOB_RECURSE MINDSPORE_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "../../../mindspore/ccsrc/backend/kernel_compiler/cpu/sparse_apply_proximal_adagrad_cpu_kernel.cc" ) +if (CMAKE_SYSTEM_NAME MATCHES "Windows") + list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/common/duplex_pipe.cc") +else() + list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/common/duplex_pipe_win.cc") +endif() list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/debug/dump_proto.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/core/ir/lite/tensor.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/frontend/parallel/strategy_checkpoint/parallel_strategy_checkpoint.cc")