diff --git a/cmake/external_libs/libevent.cmake b/cmake/external_libs/libevent.cmake index e94df0f94b..dcf58cd625 100644 --- a/cmake/external_libs/libevent.cmake +++ b/cmake/external_libs/libevent.cmake @@ -16,7 +16,7 @@ message("libevent using openssl stub dir: " ${openssl_ROOT}) mindspore_add_pkg(libevent VER 2.1.12 - LIBS event event_pthreads event_core + LIBS event event_pthreads event_core event_openssl URL ${REQ_URL} MD5 ${MD5} CMAKE_OPTION -DCMAKE_BUILD_TYPE:STRING=Release -DBUILD_TESTING=OFF -DOPENSSL_ROOT_DIR:PATH=${openssl_ROOT}) @@ -26,3 +26,4 @@ include_directories(${libevent_INC}) add_library(mindspore::event ALIAS libevent::event) add_library(mindspore::event_pthreads ALIAS libevent::event_pthreads) add_library(mindspore::event_core ALIAS libevent::event_core) +add_library(mindspore::event_openssl ALIAS libevent::event_openssl) diff --git a/mindspore/ccsrc/CMakeLists.txt b/mindspore/ccsrc/CMakeLists.txt index 5474f35064..ac14d65412 100644 --- a/mindspore/ccsrc/CMakeLists.txt +++ b/mindspore/ccsrc/CMakeLists.txt @@ -370,8 +370,9 @@ elseif(CMAKE_SYSTEM_NAME MATCHES "Darwin") target_link_libraries(_c_expression PRIVATE -Wl,-force_load mindspore mindspore_core -Wl,-noall_load) else() if(ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU)) + find_package(OpenSSL REQUIRED) target_link_libraries(mindspore proto_input mindspore::protobuf - mindspore::event mindspore::event_pthreads) + mindspore::event mindspore::event_pthreads mindspore::event_openssl OpenSSL::SSL OpenSSL::Crypto) target_link_libraries(mindspore -Wl,--no-as-needed mindspore::event_core ps_cache) if(${ENABLE_IBVERBS} STREQUAL "ON") target_link_libraries(mindspore ibverbs rdmacm) diff --git a/mindspore/ccsrc/ps/CMakeLists.txt b/mindspore/ccsrc/ps/CMakeLists.txt index 6550ebe4e5..26189758da 100644 --- a/mindspore/ccsrc/ps/CMakeLists.txt +++ b/mindspore/ccsrc/ps/CMakeLists.txt @@ -22,7 +22,8 @@ if(NOT (ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU))) list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/http_client.cc") list(REMOVE_ITEM _PS_SRC_FILES "worker.cc") list(REMOVE_ITEM _PS_SRC_FILES "parameter_server.cc") - list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/worker_queue.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/http_request_handler.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/ssl_wrapper.cc") endif() if(NOT ENABLE_D) diff --git a/mindspore/ccsrc/ps/constants.h b/mindspore/ccsrc/ps/constants.h index b6e2824052..1d716334c8 100644 --- a/mindspore/ccsrc/ps/constants.h +++ b/mindspore/ccsrc/ps/constants.h @@ -51,6 +51,9 @@ constexpr char kSparseAdamOp[] = "Adam"; constexpr char kSparseLazyAdamOp[] = "LazyAdam"; constexpr char kSparseFtrlOp[] = "FTRL"; +constexpr char kCertificateChain[] = "server.crt"; +constexpr char kPrivateKey[] = "server.key.unsecure"; + constexpr int64_t kInitWeightsCmd = 10; constexpr int64_t kInitWeightToOptimIdCmd = 11; constexpr int64_t kInitOptimInputsShapeCmd = 12; diff --git a/mindspore/ccsrc/ps/core/communicator/worker_queue.cc b/mindspore/ccsrc/ps/core/communicator/http_request_handler.cc similarity index 65% rename from mindspore/ccsrc/ps/core/communicator/worker_queue.cc rename to mindspore/ccsrc/ps/core/communicator/http_request_handler.cc index 28c1119119..d38d76db69 100644 --- a/mindspore/ccsrc/ps/core/communicator/worker_queue.cc +++ b/mindspore/ccsrc/ps/core/communicator/http_request_handler.cc @@ -14,16 +14,39 @@ * limitations under the License. */ -#include "ps/core/communicator/worker_queue.h" +#include "ps/core/communicator/http_request_handler.h" namespace mindspore { namespace ps { namespace core { -bool WorkerQueue::Initialize(int fd, std::unordered_map handlers) { +bool HttpRequestHandler::Initialize(int fd, const std::unordered_map &handlers) { evbase_ = event_base_new(); MS_EXCEPTION_IF_NULL(evbase_); struct evhttp *http = evhttp_new(evbase_); MS_EXCEPTION_IF_NULL(http); + + SSL_CTX_set_options(SSLWrapper::GetInstance().GetSSLCtx(), + SSL_OP_SINGLE_DH_USE | SSL_OP_SINGLE_ECDH_USE | SSL_OP_NO_SSLv2); + EC_KEY *ecdh = EC_KEY_new_by_curve_name(NID_X9_62_prime256v1); + MS_EXCEPTION_IF_NULL(ecdh); + + if (!SSL_CTX_use_certificate_chain_file(SSLWrapper::GetInstance().GetSSLCtx(), kCertificateChain)) { + MS_LOG(ERROR) << "SSL use certificate chain file failed!"; + return false; + } + + if (!SSL_CTX_use_PrivateKey_file(SSLWrapper::GetInstance().GetSSLCtx(), kPrivateKey, SSL_FILETYPE_PEM)) { + MS_LOG(ERROR) << "SSL use private key file failed!"; + return false; + } + + if (!SSL_CTX_check_private_key(SSLWrapper::GetInstance().GetSSLCtx())) { + MS_LOG(ERROR) << "SSL check private key file failed!"; + return false; + } + + evhttp_set_bevcb(http, BuffereventCallback, SSLWrapper::GetInstance().GetSSLCtx()); + int result = evhttp_accept_socket(http, fd); if (result < 0) { MS_LOG(ERROR) << "Evhttp accept socket failed!"; @@ -56,7 +79,7 @@ bool WorkerQueue::Initialize(int fd, std::unordered_map(arg); + SSL *ssl = SSL_new(ctx); + bufferevent *bev = bufferevent_openssl_socket_new(base, -1, ssl, BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE); + return bev; +} } // namespace core } // namespace ps } // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/communicator/worker_queue.h b/mindspore/ccsrc/ps/core/communicator/http_request_handler.h similarity index 57% rename from mindspore/ccsrc/ps/core/communicator/worker_queue.h rename to mindspore/ccsrc/ps/core/communicator/http_request_handler.h index eb25734ea1..d0d53164d1 100644 --- a/mindspore/ccsrc/ps/core/communicator/worker_queue.h +++ b/mindspore/ccsrc/ps/core/communicator/http_request_handler.h @@ -14,12 +14,14 @@ * limitations under the License. */ -#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_WORKER_QUEUE_H_ -#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_WORKER_QUEUE_H_ +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_REQUEST_HANDLER_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_REQUEST_HANDLER_H_ #include #include #include +#include +#include #include #include @@ -27,19 +29,26 @@ #include "utils/log_adapter.h" #include "ps/core/communicator/http_message_handler.h" +#include "ps/core/communicator/ssl_wrapper.h" +#include "ps/constants.h" namespace mindspore { namespace ps { namespace core { using OnRequestReceive = std::function)>; -class WorkerQueue { + +/* Each thread corresponds to one HttpRequestHandler, which is used to create one eventbase. All eventbase are listened + * on the same fd. Every evhttp_request is executed in one thread. + */ +class HttpRequestHandler { public: - WorkerQueue() : evbase_(nullptr) {} - virtual ~WorkerQueue() = default; + HttpRequestHandler() : evbase_(nullptr) {} + virtual ~HttpRequestHandler() = default; - bool Initialize(int fd, std::unordered_map handlers); + bool Initialize(int fd, const std::unordered_map &handlers); void Run(); void Stop(); + static bufferevent *BuffereventCallback(event_base *base, void *arg); private: struct event_base *evbase_; @@ -47,4 +56,4 @@ class WorkerQueue { } // namespace core } // namespace ps } // namespace mindspore -#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_WORKER_QUEUE_H_ +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_REQUEST_HANDLER_H_ diff --git a/mindspore/ccsrc/ps/core/communicator/http_server.cc b/mindspore/ccsrc/ps/core/communicator/http_server.cc index c1b0261880..ce26ee11bf 100644 --- a/mindspore/ccsrc/ps/core/communicator/http_server.cc +++ b/mindspore/ccsrc/ps/core/communicator/http_server.cc @@ -118,10 +118,10 @@ bool HttpServer::RegisterRoute(const std::string &url, OnRequestReceive *functio bool HttpServer::Start() { MS_LOG(INFO) << "Start http server!"; for (size_t i = 0; i < thread_num_; i++) { - auto worker_queue = std::make_shared(); - worker_queue->Initialize(fd_, request_handlers_); - worker_queues_.push_back(worker_queue); - worker_threads_.emplace_back(std::make_shared(&WorkerQueue::Run, worker_queue)); + auto http_request_handler = std::make_shared(); + http_request_handler->Initialize(fd_, request_handlers_); + http_request_handlers.push_back(http_request_handler); + worker_threads_.emplace_back(std::make_shared(&HttpRequestHandler::Run, http_request_handler)); } return true; } @@ -139,7 +139,7 @@ void HttpServer::Stop() { if (!is_stop_.load()) { for (size_t i = 0; i < thread_num_; i++) { - worker_queues_[i]->Stop(); + http_request_handlers[i]->Stop(); } is_stop_ = true; } diff --git a/mindspore/ccsrc/ps/core/communicator/http_server.h b/mindspore/ccsrc/ps/core/communicator/http_server.h index cbb03f85a1..1d5ab88d86 100644 --- a/mindspore/ccsrc/ps/core/communicator/http_server.h +++ b/mindspore/ccsrc/ps/core/communicator/http_server.h @@ -38,7 +38,7 @@ #include #include -#include "ps/core/communicator/worker_queue.h" +#include "ps/core/communicator/http_request_handler.h" namespace mindspore { namespace ps { @@ -75,7 +75,7 @@ class HttpServer { int request_timeout_; size_t thread_num_; std::vector> worker_threads_; - std::vector> worker_queues_; + std::vector> http_request_handlers; int32_t backlog_; std::unordered_map request_handlers_; int fd_; diff --git a/mindspore/ccsrc/ps/core/communicator/ssl_wrapper.cc b/mindspore/ccsrc/ps/core/communicator/ssl_wrapper.cc new file mode 100644 index 0000000000..28df9165b6 --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/ssl_wrapper.cc @@ -0,0 +1,59 @@ + +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ps/core/communicator/ssl_wrapper.h" + +namespace mindspore { +namespace ps { +namespace core { +SSLWrapper::SSLWrapper() : ssl_ctx_(nullptr) { InitSSL(); } + +SSLWrapper::~SSLWrapper() { CleanSSL(); } + +void SSLWrapper::InitSSL() { + SSL_library_init(); + ERR_load_crypto_strings(); + SSL_load_error_strings(); + OpenSSL_add_all_algorithms(); + int rand = RAND_poll(); + if (rand == 0) { + MS_LOG(ERROR) << "RAND_poll failed"; + } + ssl_ctx_ = SSL_CTX_new(SSLv23_server_method()); + if (!ssl_ctx_) { + MS_LOG(ERROR) << "SSL_CTX_new failed"; + } + X509_STORE *store = SSL_CTX_get_cert_store(ssl_ctx_); + if (X509_STORE_set_default_paths(store) != 1) { + MS_LOG(ERROR) << "X509_STORE_set_default_paths failed"; + } +} + +void SSLWrapper::CleanSSL() { + if (ssl_ctx_ != nullptr) { + SSL_CTX_free(ssl_ctx_); + } + ERR_free_strings(); + EVP_cleanup(); + ERR_remove_thread_state(nullptr); + CRYPTO_cleanup_all_ex_data(); +} + +SSL_CTX *SSLWrapper::GetSSLCtx() { return ssl_ctx_; } +} // namespace core +} // namespace ps +} // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/communicator/ssl_wrapper.h b/mindspore/ccsrc/ps/core/communicator/ssl_wrapper.h new file mode 100644 index 0000000000..147b4aff08 --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/ssl_wrapper.h @@ -0,0 +1,51 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_SSL_WRAPPER_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_SSL_WRAPPER_H_ + +#include +#include +#include + +#include "utils/log_adapter.h" + +namespace mindspore { +namespace ps { +namespace core { +class SSLWrapper { + public: + static SSLWrapper &GetInstance() { + static SSLWrapper instance; + return instance; + } + SSL_CTX *GetSSLCtx(); + + private: + SSLWrapper(); + virtual ~SSLWrapper(); + SSLWrapper(const SSLWrapper &) = delete; + SSLWrapper &operator=(const SSLWrapper &) = delete; + + void InitSSL(); + void CleanSSL(); + + SSL_CTX *ssl_ctx_; +}; +} // namespace core +} // namespace ps +} // namespace mindspore +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_SSL_WRAPPER_H_ diff --git a/tests/ut/cpp/CMakeLists.txt b/tests/ut/cpp/CMakeLists.txt index 27aa68c49f..11d017da30 100644 --- a/tests/ut/cpp/CMakeLists.txt +++ b/tests/ut/cpp/CMakeLists.txt @@ -161,6 +161,8 @@ list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/scheduler.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/optimizer_info.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/optimizer_info_builder.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/worker.cc") +list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/core/communicator/http_request_handler.cc") +list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/core/communicator/http_server.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/parameter_server.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/ps_cache/gpu/gpu_ps_cache.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/ps_cache/ascend/ascend_ps_cache.cc")