From 425e5cae49c54f9403d644749dc62ca311efae9a Mon Sep 17 00:00:00 2001 From: chendongsheng Date: Mon, 22 Feb 2021 16:10:50 +0800 Subject: [PATCH] added log callback --- mindspore/ccsrc/ps/CMakeLists.txt | 2 +- mindspore/ccsrc/ps/core/abstract_node.cc | 29 +++---- mindspore/ccsrc/ps/core/abstract_node.h | 2 +- mindspore/ccsrc/ps/core/cluster_config.cc | 85 ------------------- mindspore/ccsrc/ps/core/cluster_config.h | 65 -------------- mindspore/ccsrc/ps/core/cluster_metadata.cc | 82 ++++++++++++++++++ mindspore/ccsrc/ps/core/cluster_metadata.h | 84 ++++++++++++++++++ mindspore/ccsrc/ps/core/comm_util.cc | 24 +++++- mindspore/ccsrc/ps/core/comm_util.h | 4 +- mindspore/ccsrc/ps/core/node.h | 5 +- mindspore/ccsrc/ps/core/node_manager.cc | 8 +- mindspore/ccsrc/ps/core/scheduler_node.cc | 17 ++-- mindspore/ccsrc/ps/core/scheduler_node.h | 4 +- mindspore/ccsrc/ps/core/server_node.cc | 6 ++ mindspore/ccsrc/ps/core/server_node.h | 4 +- mindspore/ccsrc/ps/core/tcp_client.cc | 15 ++-- mindspore/ccsrc/ps/core/tcp_client.h | 4 +- mindspore/ccsrc/ps/core/tcp_server.cc | 27 +++++- mindspore/ccsrc/ps/core/tcp_server.h | 3 +- mindspore/ccsrc/ps/core/worker_node.h | 4 +- .../ps/core/cluster_available_timeout_test.cc | 4 +- ...onfig_test.cc => cluster_metadata_test.cc} | 22 ++--- tests/ut/cpp/ps/core/common_util_test.cc | 2 +- tests/ut/cpp/ps/core/http_client_test.cc | 2 + tests/ut/cpp/ps/core/http_server_test.cc | 5 +- 25 files changed, 292 insertions(+), 217 deletions(-) delete mode 100644 mindspore/ccsrc/ps/core/cluster_config.cc delete mode 100644 mindspore/ccsrc/ps/core/cluster_config.h create mode 100644 mindspore/ccsrc/ps/core/cluster_metadata.cc create mode 100644 mindspore/ccsrc/ps/core/cluster_metadata.h rename tests/ut/cpp/ps/core/{cluster_config_test.cc => cluster_metadata_test.cc} (56%) diff --git a/mindspore/ccsrc/ps/CMakeLists.txt b/mindspore/ccsrc/ps/CMakeLists.txt index 29f1e8cee2..b9c173f645 100644 --- a/mindspore/ccsrc/ps/CMakeLists.txt +++ b/mindspore/ccsrc/ps/CMakeLists.txt @@ -12,7 +12,7 @@ if(NOT (ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU))) list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_client.cc") list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_message_handler.cc") list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_server.cc") - list(REMOVE_ITEM _PS_SRC_FILES "core/cluster_config.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/cluster_metadata.cc") list(REMOVE_ITEM _PS_SRC_FILES "core/node.cc") list(REMOVE_ITEM _PS_SRC_FILES "core/node_manager.cc") list(REMOVE_ITEM _PS_SRC_FILES "ps_cache/ps_cache_manager.cc") diff --git a/mindspore/ccsrc/ps/core/abstract_node.cc b/mindspore/ccsrc/ps/core/abstract_node.cc index fe608f88e8..39e177f2f2 100644 --- a/mindspore/ccsrc/ps/core/abstract_node.cc +++ b/mindspore/ccsrc/ps/core/abstract_node.cc @@ -107,8 +107,8 @@ bool AbstractNode::Send(const NodeRole &node_role, const std::vector & const uint32_t &timeout) { uint64_t request_id = AddMessageTrack(data.size()); - if (rank_ids.size() != data.size()) { - MS_LOG(EXCEPTION) << "The number of rank ids is not equal to the number of data!"; + if (rank_ids.size() != data.size() || rank_ids.size() != lens.size()) { + MS_LOG(EXCEPTION) << "The number of rank ids, data and lens are not equal!"; } for (size_t it = 0; it < rank_ids.size(); ++it) { if (!CommUtil::ValidateRankId(node_role, rank_ids.at(it))) { @@ -235,10 +235,8 @@ uint64_t AbstractNode::CollectiveSendAsync(const enum NodeRole &node_role, const } std::pair AbstractNode::CollectiveReceiveAsync(const enum NodeRole &node_role, - const uint32_t &rank_id, void **output, - size_t *size) { + const uint32_t &rank_id, VectorPtr *output) { MS_EXCEPTION_IF_NULL(output); - MS_EXCEPTION_IF_NULL(size); if (!CommUtil::ValidateRankId(node_role, rank_id)) { MS_LOG(EXCEPTION) << "The node role or rank_id is illegal!"; } @@ -248,8 +246,7 @@ std::pair AbstractNode::CollectiveReceiveAsync(const enum No receive_messages_done_[std::make_pair(rank_id, rank_request_id)] = false; if (received_data_.count(std::make_pair(rank_id, rank_request_id)) > 0) { auto res = received_data_[std::make_pair(rank_id, rank_request_id)]; - *output = res->data(); - *size = res->size(); + *output = res; received_data_.erase(std::make_pair(rank_id, rank_request_id)); receive_messages_done_[std::make_pair(rank_id, rank_request_id)] = true; MS_LOG(DEBUG) << "Receive data from rank id:" << rank_id << ", the rank request id is:" << rank_request_id; @@ -257,8 +254,7 @@ std::pair AbstractNode::CollectiveReceiveAsync(const enum No receive_callbacks_[std::make_pair(rank_id, rank_request_id)] = [=]() mutable { receive_callbacks_mutex_.lock(); auto res = received_data_[std::make_pair(rank_id, rank_request_id)]; - *output = res->data(); - *size = res->size(); + *output = res; received_data_.erase(std::make_pair(rank_id, rank_request_id)); receive_messages_done_[std::make_pair(rank_id, rank_request_id)] = true; MS_LOG(DEBUG) << "Receive data from rank id:" << rank_id << ", the rank request id is:" << rank_request_id; @@ -295,7 +291,7 @@ void AbstractNode::StartHeartbeatTimer(const std::shared_ptr &client) } else { UpdateSchedulerTime(); } - std::this_thread::sleep_for(std::chrono::seconds(ClusterConfig::heartbeat_interval())); + std::this_thread::sleep_for(std::chrono::seconds(ClusterMetadata::instance()->heartbeat_interval())); } }); } @@ -327,7 +323,7 @@ void AbstractNode::UpdateSchedulerTime() { bool AbstractNode::CheckSchedulerTimeout() const { struct timeval current_time {}; (void)gettimeofday(¤t_time, nullptr); - if (scheduler_time_.tv_sec + ClusterConfig::scheduler_timeout() < current_time.tv_sec) { + if (scheduler_time_.tv_sec + ClusterMetadata::instance()->scheduler_timeout() < current_time.tv_sec) { return true; } return false; @@ -414,8 +410,8 @@ bool AbstractNode::WaitForDisconnect(const uint32_t &timeout) { } bool AbstractNode::InitClientToScheduler() { - std::string scheduler_host = ClusterConfig::scheduler_host(); - uint16_t scheduler_port = ClusterConfig::scheduler_port(); + std::string scheduler_host = ClusterMetadata::instance()->scheduler_host(); + uint16_t scheduler_port = ClusterMetadata::instance()->scheduler_port(); client_to_scheduler_ = std::make_shared(scheduler_host, scheduler_port); client_to_scheduler_->SetMessageCallback( [&](std::shared_ptr meta, const Protos &protos, const void *data, size_t size) { @@ -436,7 +432,7 @@ bool AbstractNode::InitClientToScheduler() { }); client_to_scheduler_->set_disconnected_callback([&]() { - std::this_thread::sleep_for(std::chrono::milliseconds(ClusterConfig::connect_interval())); + std::this_thread::sleep_for(std::chrono::milliseconds(ClusterMetadata::instance()->connect_interval())); client_to_scheduler_->Init(); }); return client_to_scheduler_->WaitConnected(); @@ -507,8 +503,7 @@ bool AbstractNode::SendMessageSync(const std::shared_ptr &client, std client->SendMessage(meta, protos, data, size); MS_LOG(DEBUG) << "The node role is:" << CommUtil::NodeRoleToString(node_info_.node_role_) << ", the node id is:" << node_info_.node_id_ << " send the request id is:" << request_id; - bool res = Wait(request_id, timeout); - return res; + return Wait(request_id, timeout); } void AbstractNode::ProcessSendDataResp(std::shared_ptr meta, const Protos &protos, const void *data, @@ -589,7 +584,7 @@ void AbstractNode::RunReceiveCallback(std::shared_ptr meta, const P } received_data_[std::make_pair(rank_id, rank_request_id)] = received_data; MS_LOG(DEBUG) << "Run Receive data callback,the rank id:" << rank_id << ", the rank request id is:" << rank_request_id - << ", the send request id is:" << meta->request_id(); + << ", the send request id is:" << meta->request_id() << " the size is:" << size; auto it = receive_callbacks_.find(std::make_pair(rank_id, rank_request_id)); if (it != receive_callbacks_.end()) { receive_callbacks_mutex_.unlock(); diff --git a/mindspore/ccsrc/ps/core/abstract_node.h b/mindspore/ccsrc/ps/core/abstract_node.h index 3fd0271de6..a4503829bd 100644 --- a/mindspore/ccsrc/ps/core/abstract_node.h +++ b/mindspore/ccsrc/ps/core/abstract_node.h @@ -57,7 +57,7 @@ class AbstractNode : public Node { uint64_t CollectiveSendAsync(const enum NodeRole &node_role, const uint32_t &rank_id, const void *data, size_t size); std::pair CollectiveReceiveAsync(const enum NodeRole &node_role, const uint32_t &rank_id, - void **output, size_t *size); + VectorPtr *output); bool CollectiveWait(std::pair request_id, const uint32_t &timeout = kCommTimeoutInSeconds); protected: diff --git a/mindspore/ccsrc/ps/core/cluster_config.cc b/mindspore/ccsrc/ps/core/cluster_config.cc deleted file mode 100644 index 33bd658c5e..0000000000 --- a/mindspore/ccsrc/ps/core/cluster_config.cc +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Copyright 2020 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "ps/core/cluster_config.h" - -#include - -namespace mindspore { -namespace ps { -namespace core { -uint32_t ClusterConfig::worker_num_ = 0; -uint32_t ClusterConfig::server_num_ = 0; -std::unique_ptr ClusterConfig::scheduler_host_ = nullptr; -uint16_t ClusterConfig::scheduler_port_ = 0; -// The interval for sending heartbeat packets between worker node,server node and scheduler node is 3 seconds. -uint32_t ClusterConfig::heartbeat_interval_ = 3; -// The timeout for worker node and server node sending heartbeat packets to scheduler node is 30 seconds. -uint32_t ClusterConfig::heartbeat_timeout_ = 30; -// Timeout period for cluster preparation is 300 seconds. -uint32_t ClusterConfig::cluster_available_timeout_ = 300; -// The timeout period for the client to connect to the server is 100ms. -uint32_t ClusterConfig::connect_interval_ = 100; -// When the scheduler exits, the worker and server can continue to work for 5 hours -uint32_t ClusterConfig::scheduler_timeout_ = 3600 * 5; - -void ClusterConfig::Init(const uint32_t &worker_num, const uint32_t &server_num, std::string scheduler_host, - const uint16_t &scheduler_port) { - worker_num_ = worker_num; - server_num_ = server_num; - if (!CommUtil::CheckIp(scheduler_host)) { - MS_LOG(EXCEPTION) << "The scheduler_host:" << scheduler_host << " is illegal!"; - } - scheduler_host_ = std::make_unique(scheduler_host); - scheduler_port_ = scheduler_port; -} - -uint32_t ClusterConfig::worker_num() { return worker_num_; } - -uint32_t ClusterConfig::server_num() { return server_num_; } - -uint32_t ClusterConfig::heartbeat_interval() { return heartbeat_interval_; } - -void ClusterConfig::set_heartbeat_interval(const uint32_t &heartbeat_interval) { - heartbeat_interval_ = heartbeat_interval; -} - -std::string ClusterConfig::scheduler_host() { return *scheduler_host_; } - -uint16_t ClusterConfig::scheduler_port() { return scheduler_port_; } - -uint32_t ClusterConfig::heartbeat_timeout() { return heartbeat_timeout_; } - -void ClusterConfig::set_heartbeat_timeout(const uint32_t &heartbeat_timeout) { - heartbeat_interval_ = heartbeat_timeout; -} - -uint32_t ClusterConfig::cluster_available_timeout() { return cluster_available_timeout_; } - -void ClusterConfig::set_cluster_available_timeout(const uint32_t &cluster_available_timeout) { - cluster_available_timeout_ = cluster_available_timeout; -} - -uint32_t ClusterConfig::connect_interval() { return connect_interval_; } - -void ClusterConfig::set_connect_interval(const uint32_t &connect_interval) { connect_interval_ = connect_interval; } - -uint32_t ClusterConfig::scheduler_timeout() { return scheduler_timeout_; } - -void ClusterConfig::set_scheduler_timeout(const uint32_t &scheduler_timeout) { scheduler_timeout_ = scheduler_timeout; } -} // namespace core -} // namespace ps -} // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/cluster_config.h b/mindspore/ccsrc/ps/core/cluster_config.h deleted file mode 100644 index c13c6d0192..0000000000 --- a/mindspore/ccsrc/ps/core/cluster_config.h +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Copyright 2020 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef MINDSPORE_CCSRC_PS_CORE_CLUSTER_CONFIG_H_ -#define MINDSPORE_CCSRC_PS_CORE_CLUSTER_CONFIG_H_ - -#include -#include -#include -#include - -#include "utils/log_adapter.h" -#include "ps/core/comm_util.h" - -namespace mindspore { -namespace ps { -namespace core { -class ClusterConfig { - public: - static void Init(const uint32_t &worker_num, const uint32_t &server_num, std::string scheduler_host, - const uint16_t &scheduler_port); - static uint32_t worker_num(); - static uint32_t server_num(); - static uint32_t heartbeat_interval(); - static void set_heartbeat_interval(const uint32_t &heartbeat_interval); - static std::string scheduler_host(); - static uint16_t scheduler_port(); - static uint32_t heartbeat_timeout(); - static void set_heartbeat_timeout(const uint32_t &heartbeat_timeout); - static uint32_t cluster_available_timeout(); - static void set_cluster_available_timeout(const uint32_t &cluster_available_timeout); - static uint32_t connect_interval(); - static void set_connect_interval(const uint32_t &connect_interval); - static uint32_t scheduler_timeout(); - static void set_scheduler_timeout(const uint32_t &scheduler_timeout); - - private: - static uint32_t worker_num_; - static uint32_t server_num_; - static uint32_t heartbeat_interval_; - static std::unique_ptr scheduler_host_; - static uint16_t scheduler_port_; - static uint32_t heartbeat_timeout_; - static uint32_t cluster_available_timeout_; - static uint32_t connect_interval_; - static uint32_t scheduler_timeout_; -}; -} // namespace core -} // namespace ps -} // namespace mindspore - -#endif // MINDSPORE_CCSRC_PS_CORE_CLUSTER_CONFIG_H_ diff --git a/mindspore/ccsrc/ps/core/cluster_metadata.cc b/mindspore/ccsrc/ps/core/cluster_metadata.cc new file mode 100644 index 0000000000..ccbe6b898c --- /dev/null +++ b/mindspore/ccsrc/ps/core/cluster_metadata.cc @@ -0,0 +1,82 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ps/core/cluster_metadata.h" +#include + +namespace mindspore { +namespace ps { +namespace core { +std::shared_ptr ClusterMetadata::instance() { + static std::shared_ptr metadata_instance = nullptr; + if (metadata_instance == nullptr) { + metadata_instance.reset(new (std::nothrow) ClusterMetadata()); + } + return metadata_instance; +} + +void ClusterMetadata::Init(const uint32_t &worker_num, const uint32_t &server_num, std::string scheduler_host, + const uint16_t &scheduler_port) { + worker_num_ = worker_num; + server_num_ = server_num; + if (!CommUtil::CheckIp(scheduler_host)) { + MS_LOG(EXCEPTION) << "The scheduler_host:" << scheduler_host << " is illegal!"; + } + scheduler_host_ = std::make_unique(scheduler_host); + scheduler_port_ = scheduler_port; +} + +uint32_t ClusterMetadata::worker_num() { return worker_num_; } + +uint32_t ClusterMetadata::server_num() { return server_num_; } + +uint32_t ClusterMetadata::heartbeat_interval() { return heartbeat_interval_; } + +void ClusterMetadata::set_heartbeat_interval(const uint32_t &heartbeat_interval) { + heartbeat_interval_ = heartbeat_interval; +} + +std::string ClusterMetadata::scheduler_host() { + MS_EXCEPTION_IF_NULL(scheduler_host_); + return *scheduler_host_; +} + +uint16_t ClusterMetadata::scheduler_port() { return scheduler_port_; } + +uint32_t ClusterMetadata::heartbeat_timeout() { return heartbeat_timeout_; } + +void ClusterMetadata::set_heartbeat_timeout(const uint32_t &heartbeat_timeout) { + heartbeat_interval_ = heartbeat_timeout; +} + +uint32_t ClusterMetadata::cluster_available_timeout() { return cluster_available_timeout_; } + +void ClusterMetadata::set_cluster_available_timeout(const uint32_t &cluster_available_timeout) { + cluster_available_timeout_ = cluster_available_timeout; +} + +uint32_t ClusterMetadata::connect_interval() { return connect_interval_; } + +void ClusterMetadata::set_connect_interval(const uint32_t &connect_interval) { connect_interval_ = connect_interval; } + +uint32_t ClusterMetadata::scheduler_timeout() { return scheduler_timeout_; } + +void ClusterMetadata::set_scheduler_timeout(const uint32_t &scheduler_timeout) { + scheduler_timeout_ = scheduler_timeout; +} +} // namespace core +} // namespace ps +} // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/cluster_metadata.h b/mindspore/ccsrc/ps/core/cluster_metadata.h new file mode 100644 index 0000000000..47b7651b01 --- /dev/null +++ b/mindspore/ccsrc/ps/core/cluster_metadata.h @@ -0,0 +1,84 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_PS_CORE_CLUSTER_METADATA_H_ +#define MINDSPORE_CCSRC_PS_CORE_CLUSTER_METADATA_H_ + +#include +#include +#include +#include + +#include "utils/log_adapter.h" +#include "ps/core/comm_util.h" + +namespace mindspore { +namespace ps { +namespace core { +class ClusterMetadata { + public: + ~ClusterMetadata() = default; + ClusterMetadata(ClusterMetadata const &) = delete; + ClusterMetadata &operator=(const ClusterMetadata &) = delete; + static std::shared_ptr instance(); + + void Init(const uint32_t &worker_num, const uint32_t &server_num, std::string scheduler_host, + const uint16_t &scheduler_port); + uint32_t worker_num(); + uint32_t server_num(); + uint32_t heartbeat_interval(); + void set_heartbeat_interval(const uint32_t &heartbeat_interval); + std::string scheduler_host(); + uint16_t scheduler_port(); + uint32_t heartbeat_timeout(); + void set_heartbeat_timeout(const uint32_t &heartbeat_timeout); + uint32_t cluster_available_timeout(); + void set_cluster_available_timeout(const uint32_t &cluster_available_timeout); + uint32_t connect_interval(); + void set_connect_interval(const uint32_t &connect_interval); + uint32_t scheduler_timeout(); + void set_scheduler_timeout(const uint32_t &scheduler_timeout); + + private: + ClusterMetadata() + : worker_num_(0), + server_num_(0), + heartbeat_interval_(3), + scheduler_host_(nullptr), + scheduler_port_(0), + heartbeat_timeout_(30), + cluster_available_timeout_(300), + connect_interval_(100), + scheduler_timeout_(3600 * 5) {} + uint32_t worker_num_; + uint32_t server_num_; + // The interval for sending heartbeat packets between worker node,server node and scheduler node is 3 seconds. + uint32_t heartbeat_interval_; + std::unique_ptr scheduler_host_; + uint16_t scheduler_port_; + // The timeout for worker node and server node sending heartbeat packets to scheduler node is 30 seconds. + uint32_t heartbeat_timeout_; + // Timeout period for cluster preparation is 300 seconds. + uint32_t cluster_available_timeout_; + // The timeout period for the client to connect to the server is 100ms. + uint32_t connect_interval_; + // When the scheduler exits, the worker and server can continue to work for 5 hours + uint32_t scheduler_timeout_; +}; +} // namespace core +} // namespace ps +} // namespace mindspore +#endif // MINDSPORE_CCSRC_PS_CORE_CLUSTER_METADATA_H_ diff --git a/mindspore/ccsrc/ps/core/comm_util.cc b/mindspore/ccsrc/ps/core/comm_util.cc index dff9c28d3a..aeac337316 100644 --- a/mindspore/ccsrc/ps/core/comm_util.cc +++ b/mindspore/ccsrc/ps/core/comm_util.cc @@ -122,9 +122,9 @@ std::string CommUtil::NodeRoleToString(const NodeRole &role) { } } bool CommUtil::ValidateRankId(const enum NodeRole &node_role, const uint32_t &rank_id) { - if (node_role == NodeRole::SERVER && (rank_id > ClusterConfig::server_num() - 1)) { + if (node_role == NodeRole::SERVER && (rank_id > ClusterMetadata::instance()->server_num() - 1)) { return false; - } else if (node_role == NodeRole::WORKER && (rank_id > ClusterConfig::worker_num() - 1)) { + } else if (node_role == NodeRole::WORKER && (rank_id > ClusterMetadata::instance()->worker_num() - 1)) { return false; } return true; @@ -139,6 +139,26 @@ bool CommUtil::Retry(const std::function &func, size_t max_attempts, siz } return false; } + +void CommUtil::LogCallback(int severity, const char *msg) { + switch (severity) { + case EVENT_LOG_DEBUG: + MS_LOG(DEBUG) << kLibeventLogPrefix << msg; + break; + case EVENT_LOG_MSG: + MS_LOG(INFO) << kLibeventLogPrefix << msg; + break; + case EVENT_LOG_WARN: + MS_LOG(WARNING) << kLibeventLogPrefix << msg; + break; + case EVENT_LOG_ERR: + MS_LOG(ERROR) << kLibeventLogPrefix << msg; + break; + default: + MS_LOG(WARNING) << kLibeventLogPrefix << msg; + break; + } +} } // namespace core } // namespace ps } // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/comm_util.h b/mindspore/ccsrc/ps/core/comm_util.h index 8ba8efd3a2..2a7e6feb22 100644 --- a/mindspore/ccsrc/ps/core/comm_util.h +++ b/mindspore/ccsrc/ps/core/comm_util.h @@ -49,7 +49,7 @@ #include "proto/comm.pb.h" #include "proto/ps.pb.h" -#include "ps/core/cluster_config.h" +#include "ps/core/cluster_metadata.h" #include "utils/log_adapter.h" namespace mindspore { @@ -65,6 +65,7 @@ constexpr int kGroup5RandomLength = 12; constexpr int kMessageChunkLength = 4096; // The timeout period for the http client to connect to the http server is 120 seconds. constexpr int kConnectionTimeout = 120; +constexpr char kLibeventLogPrefix[] = "[libevent log]:"; class CommUtil { public: @@ -75,6 +76,7 @@ class CommUtil { static std::string NodeRoleToString(const NodeRole &role); static bool ValidateRankId(const enum NodeRole &node_role, const uint32_t &rank_id); static bool Retry(const std::function &func, size_t max_attempts, size_t interval_milliseconds); + static void LogCallback(int severity, const char *msg); private: static std::random_device rd; diff --git a/mindspore/ccsrc/ps/core/node.h b/mindspore/ccsrc/ps/core/node.h index 8a9216c658..8b558ad648 100644 --- a/mindspore/ccsrc/ps/core/node.h +++ b/mindspore/ccsrc/ps/core/node.h @@ -30,11 +30,10 @@ #include #include -#include "ps/core/cluster_config.h" +#include "ps/core/cluster_metadata.h" #include "ps/core/node_info.h" #include "ps/core/tcp_client.h" #include "ps/core/tcp_server.h" -#include "utils/log_adapter.h" namespace mindspore { namespace ps { @@ -55,7 +54,7 @@ class Node { using OnNodeEventMessage = std::function; using MessageCallback = std::function; - virtual bool Start(const uint32_t &timeout = ClusterConfig::cluster_available_timeout()) = 0; + virtual bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) = 0; virtual bool Stop() = 0; virtual bool Finish(const uint32_t &timeout = kTimeoutInSeconds) = 0; diff --git a/mindspore/ccsrc/ps/core/node_manager.cc b/mindspore/ccsrc/ps/core/node_manager.cc index d6eac42cc3..57b45ccebd 100644 --- a/mindspore/ccsrc/ps/core/node_manager.cc +++ b/mindspore/ccsrc/ps/core/node_manager.cc @@ -19,7 +19,9 @@ namespace mindspore { namespace ps { namespace core { -void NodeManager::InitNodeNum() { total_node_num_ = ClusterConfig::server_num() + ClusterConfig::worker_num(); } +void NodeManager::InitNodeNum() { + total_node_num_ = ClusterMetadata::instance()->server_num() + ClusterMetadata::instance()->worker_num(); +} int NodeManager::NextRankId(const RegisterMessage ®ister_message) { std::lock_guard lock(assign_rank_id_mutex_); @@ -92,7 +94,7 @@ void NodeManager::UpdateClusterState() { (void)gettimeofday(¤t_time, nullptr); timeout_nodes_info_.clear(); for (auto it = heartbeats_.begin(); it != heartbeats_.end(); ++it) { - if (it->second.tv_sec + ClusterConfig::heartbeat_timeout() < current_time.tv_sec) { + if (it->second.tv_sec + ClusterMetadata::instance()->heartbeat_timeout() < current_time.tv_sec) { MS_LOG(ERROR) << "The node id:" << it->first << " is timeout!"; timeout_nodes_info_[it->first] = nodes_info_[it->first]; } @@ -118,7 +120,7 @@ void NodeManager::UpdateClusterState() { void NodeManager::CheckClusterTimeout() { if (total_node_num_ != nodes_info_.size()) { - MS_LOG(WARNING) << "The cluster is not ready after " << ClusterConfig::cluster_available_timeout() + MS_LOG(WARNING) << "The cluster is not ready after " << ClusterMetadata::instance()->cluster_available_timeout() << " seconds,so finish the cluster, and change total node number from " << total_node_num_ << " to " << nodes_info_.size(); current_node_num_ = nodes_info_.size(); diff --git a/mindspore/ccsrc/ps/core/scheduler_node.cc b/mindspore/ccsrc/ps/core/scheduler_node.cc index 3561487b63..40dcc9eda0 100644 --- a/mindspore/ccsrc/ps/core/scheduler_node.cc +++ b/mindspore/ccsrc/ps/core/scheduler_node.cc @@ -88,8 +88,8 @@ void SchedulerNode::InitCommandHandler() { void SchedulerNode::CreateTcpServer() { node_manager_.InitNodeNum(); - std::string scheduler_host = ClusterConfig::scheduler_host(); - uint32_t scheduler_port = ClusterConfig::scheduler_port(); + std::string scheduler_host = ClusterMetadata::instance()->scheduler_host(); + uint32_t scheduler_port = ClusterMetadata::instance()->scheduler_port(); server_ = std::make_shared(scheduler_host, scheduler_port); server_->SetMessageCallback([&](std::shared_ptr conn, std::shared_ptr meta, const Protos &protos, const void *data, size_t size) { @@ -149,6 +149,10 @@ void SchedulerNode::ProcessFinish(std::shared_ptr server, std::shared void SchedulerNode::ProcessFetchServers(std::shared_ptr server, std::shared_ptr conn, std::shared_ptr meta, const void *data, size_t size) { + MS_EXCEPTION_IF_NULL(server); + MS_EXCEPTION_IF_NULL(conn); + MS_EXCEPTION_IF_NULL(meta); + MS_EXCEPTION_IF_NULL(data); FetchServersRespMessage fetch_servers_message; std::vector servers_meta_list = node_manager_.FetchServersMeta(); @@ -164,20 +168,21 @@ void SchedulerNode::StartUpdateClusterStateTimer() { auto start_time = std::chrono::steady_clock::now(); while (!is_finish_.load()) { // 1. update cluster timeout - if (!node_manager_.is_cluster_ready() && (std::chrono::steady_clock::now() - start_time > - std::chrono::seconds(ClusterConfig::cluster_available_timeout()))) { + if (!node_manager_.is_cluster_ready() && + (std::chrono::steady_clock::now() - start_time > + std::chrono::seconds(ClusterMetadata::instance()->cluster_available_timeout()))) { node_manager_.CheckClusterTimeout(); } // 2. update cluster state - std::this_thread::sleep_for(std::chrono::seconds(ClusterConfig::heartbeat_interval())); + std::this_thread::sleep_for(std::chrono::seconds(ClusterMetadata::instance()->heartbeat_interval())); node_manager_.UpdateClusterState(); if (node_manager_.is_cluster_ready()) { is_ready_ = true; wait_start_cond_.notify_all(); } if (node_manager_.is_cluster_finish()) { - std::this_thread::sleep_for(std::chrono::seconds(ClusterConfig::heartbeat_interval() * 2)); + std::this_thread::sleep_for(std::chrono::seconds(ClusterMetadata::instance()->heartbeat_interval() * 2)); is_finish_ = true; wait_finish_cond_.notify_all(); } diff --git a/mindspore/ccsrc/ps/core/scheduler_node.h b/mindspore/ccsrc/ps/core/scheduler_node.h index ebbdaf52e3..c3d6e4dfb8 100644 --- a/mindspore/ccsrc/ps/core/scheduler_node.h +++ b/mindspore/ccsrc/ps/core/scheduler_node.h @@ -27,7 +27,7 @@ #include #include -#include "ps/core/cluster_config.h" +#include "ps/core/cluster_metadata.h" #include "ps/core/tcp_client.h" #include "ps/core/tcp_server.h" #include "ps/core/node_manager.h" @@ -45,7 +45,7 @@ class SchedulerNode : public Node { typedef void (SchedulerNode::*ResponseHandler)(std::shared_ptr server, std::shared_ptr conn, std::shared_ptr meta, const void *data, size_t size); - bool Start(const uint32_t &timeout = ClusterConfig::cluster_available_timeout()) override; + bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) override; bool Stop() override; bool Finish(const uint32_t &timeout = kTimeoutInSeconds) override; diff --git a/mindspore/ccsrc/ps/core/server_node.cc b/mindspore/ccsrc/ps/core/server_node.cc index 2ba7ca794b..62b54ad840 100644 --- a/mindspore/ccsrc/ps/core/server_node.cc +++ b/mindspore/ccsrc/ps/core/server_node.cc @@ -110,6 +110,12 @@ void ServerNode::ProcessSendData(std::shared_ptr conn, std::share if (ret != 0) { MS_LOG(EXCEPTION) << "The memcpy_s error, errorno(" << ret << ")"; } + MS_LOG(DEBUG) << "The node role is:" << CommUtil::NodeRoleToString(node_info_.node_role_) + << ", the node id is:" << node_info_.node_id_ << " send the request id is:" << meta->request_id() + << " the current time is:" + << std::chrono::time_point_cast(std::chrono::high_resolution_clock::now()) + .time_since_epoch() + .count(); request_handler_(conn, meta, res, size); } diff --git a/mindspore/ccsrc/ps/core/server_node.h b/mindspore/ccsrc/ps/core/server_node.h index df109e2a5c..5515b5f611 100644 --- a/mindspore/ccsrc/ps/core/server_node.h +++ b/mindspore/ccsrc/ps/core/server_node.h @@ -25,7 +25,7 @@ #include #include -#include "ps/core/cluster_config.h" +#include "ps/core/cluster_metadata.h" #include "ps/core/tcp_client.h" #include "ps/core/tcp_server.h" #include "ps/core/abstract_node.h" @@ -38,7 +38,7 @@ class ServerNode : public AbstractNode { ServerNode() : server_(nullptr), server_thread_(nullptr) {} ~ServerNode() override; - bool Start(const uint32_t &timeout = ClusterConfig::cluster_available_timeout()) override; + bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) override; bool Stop() override; bool Finish(const uint32_t &timeout = kTimeoutInSeconds) override; diff --git a/mindspore/ccsrc/ps/core/tcp_client.cc b/mindspore/ccsrc/ps/core/tcp_client.cc index d0d29121dd..5fb559347b 100644 --- a/mindspore/ccsrc/ps/core/tcp_client.cc +++ b/mindspore/ccsrc/ps/core/tcp_client.cc @@ -88,6 +88,8 @@ void TcpClient::Init() { MS_LOG(EXCEPTION) << "The tcp client ip:" << server_address_ << " is illegal!"; } + event_enable_debug_logging(EVENT_DBG_ALL); + event_set_log_callback(CommUtil::LogCallback); int result = evthread_use_pthreads(); if (result != 0) { MS_LOG(EXCEPTION) << "Use event pthread failed!"; @@ -173,16 +175,11 @@ void TcpClient::ReadCallback(struct bufferevent *bev, void *ctx) { MS_EXCEPTION_IF_NULL(bev); MS_EXCEPTION_IF_NULL(ctx); auto tcp_client = reinterpret_cast(ctx); - struct evbuffer *input = bufferevent_get_input(const_cast(bev)); - MS_EXCEPTION_IF_NULL(input); char read_buffer[kMessageChunkLength]; + int read = 0; - while (EVBUFFER_LENGTH(input) > 0) { - int read = evbuffer_remove(input, &read_buffer, sizeof(read_buffer)); - if (read == -1) { - MS_LOG(EXCEPTION) << "Can not drain data from the event buffer!"; - } + while ((read = bufferevent_read(bev, &read_buffer, sizeof(read_buffer))) > 0) { tcp_client->OnReadHandler(read_buffer, read); } } @@ -312,6 +309,10 @@ bool TcpClient::SendMessage(std::shared_ptr meta, const Protos &pro MS_LOG(ERROR) << "Event buffer add protobuf data failed!"; res = false; } + int result = bufferevent_flush(buffer_event_, EV_READ | EV_WRITE, BEV_FLUSH); + if (result < 0) { + MS_LOG(EXCEPTION) << "Bufferevent flush failed!"; + } bufferevent_unlock(buffer_event_); return res; } diff --git a/mindspore/ccsrc/ps/core/tcp_client.h b/mindspore/ccsrc/ps/core/tcp_client.h index fa0ef8df15..ba22e15a86 100644 --- a/mindspore/ccsrc/ps/core/tcp_client.h +++ b/mindspore/ccsrc/ps/core/tcp_client.h @@ -32,7 +32,7 @@ #include #include -#include "ps/core/cluster_config.h" +#include "ps/core/cluster_metadata.h" #include "utils/convert_utils_base.h" namespace mindspore { @@ -53,7 +53,7 @@ class TcpClient { std::string GetServerAddress() const; void set_disconnected_callback(const OnDisconnected &disconnected); void set_connected_callback(const OnConnected &connected); - bool WaitConnected(const uint32_t &connected_timeout = ClusterConfig::cluster_available_timeout()); + bool WaitConnected(const uint32_t &connected_timeout = ClusterMetadata::instance()->cluster_available_timeout()); void Init(); void StartWithDelay(int seconds); void Stop(); diff --git a/mindspore/ccsrc/ps/core/tcp_server.cc b/mindspore/ccsrc/ps/core/tcp_server.cc index e802ade90c..f16c2c5bec 100644 --- a/mindspore/ccsrc/ps/core/tcp_server.cc +++ b/mindspore/ccsrc/ps/core/tcp_server.cc @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include #include #include @@ -90,7 +92,15 @@ bool TcpConnection::SendMessage(std::shared_ptr meta, const Protos MS_LOG(ERROR) << "Event buffer add protobuf data failed!"; res = false; } + int result = bufferevent_flush(buffer_event_, EV_READ | EV_WRITE, BEV_FLUSH); + if (result < 0) { + MS_LOG(EXCEPTION) << "Bufferevent flush failed!"; + } bufferevent_unlock(buffer_event_); + MS_LOG(DEBUG) << "SendMessage the request id is:" << meta->request_id() << " the current time is:" + << std::chrono::time_point_cast(std::chrono::high_resolution_clock::now()) + .time_since_epoch() + .count(); return res; } @@ -136,6 +146,8 @@ void TcpServer::Init() { MS_LOG(EXCEPTION) << "Use event pthread failed!"; } + event_enable_debug_logging(EVENT_DBG_ALL); + event_set_log_callback(CommUtil::LogCallback); is_stop_ = false; base_ = event_base_new(); MS_EXCEPTION_IF_NULL(base_); @@ -284,7 +296,7 @@ void TcpServer::ListenerCallback(struct evconnlistener *, evutil_socket_t fd, st std::shared_ptr conn = server->onCreateConnection(bev, fd); MS_EXCEPTION_IF_NULL(conn); - + SetTcpNoDelay(fd); server->AddConnection(fd, conn); conn->InitConnection([=](std::shared_ptr meta, const Protos &protos, const void *data, size_t size) { OnServerReceiveMessage on_server_receive = server->GetServerReceive(); @@ -337,6 +349,11 @@ void TcpServer::ReadCallback(struct bufferevent *bev, void *connection) { MS_LOG(EXCEPTION) << "Can not drain data from the event buffer!"; } conn->OnReadHandler(read_buffer, IntToSize(read)); + MS_LOG(DEBUG) << "the current time is:" + << std::chrono::time_point_cast(std::chrono::high_resolution_clock::now()) + .time_since_epoch() + .count() + << " the read size is:" << read; } } @@ -388,6 +405,14 @@ void TcpServer::TimerOnceCallback(evutil_socket_t, int16_t, void *arg) { } } +void TcpServer::SetTcpNoDelay(const evutil_socket_t &fd) { + const int one = 1; + int ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(int)); + if (ret < 0) { + MS_LOG(EXCEPTION) << "Set socket no delay failed!"; + } +} + bool TcpServer::SendMessage(std::shared_ptr conn, std::shared_ptr message) { MS_EXCEPTION_IF_NULL(conn); MS_EXCEPTION_IF_NULL(message); diff --git a/mindspore/ccsrc/ps/core/tcp_server.h b/mindspore/ccsrc/ps/core/tcp_server.h index 9ec5d0f2ba..3c38a6dd40 100644 --- a/mindspore/ccsrc/ps/core/tcp_server.h +++ b/mindspore/ccsrc/ps/core/tcp_server.h @@ -35,7 +35,7 @@ #include #include "ps/core/tcp_message_handler.h" -#include "ps/core/cluster_config.h" +#include "ps/core/cluster_metadata.h" #include "utils/convert_utils_base.h" namespace mindspore { @@ -117,6 +117,7 @@ class TcpServer { static void EventCallback(struct bufferevent *, std::int16_t events, void *server); static void TimerCallback(evutil_socket_t fd, int16_t event, void *arg); static void TimerOnceCallback(evutil_socket_t fd, int16_t event, void *arg); + static void SetTcpNoDelay(const evutil_socket_t &fd); std::shared_ptr onCreateConnection(struct bufferevent *bev, const evutil_socket_t &fd); struct event_base *base_; diff --git a/mindspore/ccsrc/ps/core/worker_node.h b/mindspore/ccsrc/ps/core/worker_node.h index 8608ae430a..baa12b18e2 100644 --- a/mindspore/ccsrc/ps/core/worker_node.h +++ b/mindspore/ccsrc/ps/core/worker_node.h @@ -24,7 +24,7 @@ #include #include -#include "ps/core/cluster_config.h" +#include "ps/core/cluster_metadata.h" #include "ps/core/tcp_client.h" #include "ps/core/tcp_server.h" #include "ps/core/abstract_node.h" @@ -37,7 +37,7 @@ class WorkerNode : public AbstractNode { WorkerNode() = default; ~WorkerNode() override; - bool Start(const uint32_t &timeout = ClusterConfig::cluster_available_timeout()) override; + bool Start(const uint32_t &timeout = ClusterMetadata::instance()->cluster_available_timeout()) override; bool Stop() override; bool Finish(const uint32_t &timeout = kTimeoutInSeconds) override; diff --git a/tests/ut/cpp/ps/core/cluster_available_timeout_test.cc b/tests/ut/cpp/ps/core/cluster_available_timeout_test.cc index 1efa1b15e7..4e66bd4c00 100644 --- a/tests/ut/cpp/ps/core/cluster_available_timeout_test.cc +++ b/tests/ut/cpp/ps/core/cluster_available_timeout_test.cc @@ -31,8 +31,8 @@ class TestClusterAvailableTimeout : public UT::Common { }; TEST_F(TestClusterAvailableTimeout, TestClusterAvailableTimeout) { - ClusterConfig::Init(1, 1, "127.0.0.1", 9999); - ClusterConfig::set_cluster_available_timeout(3); + ClusterMetadata::instance()->Init(1, 1, "127.0.0.1", 9999); + ClusterMetadata::instance()->set_cluster_available_timeout(3); SchedulerNode node; node.Start(); node.Finish(); diff --git a/tests/ut/cpp/ps/core/cluster_config_test.cc b/tests/ut/cpp/ps/core/cluster_metadata_test.cc similarity index 56% rename from tests/ut/cpp/ps/core/cluster_config_test.cc rename to tests/ut/cpp/ps/core/cluster_metadata_test.cc index 904034b5c7..694093e2df 100644 --- a/tests/ut/cpp/ps/core/cluster_config_test.cc +++ b/tests/ut/cpp/ps/core/cluster_metadata_test.cc @@ -18,27 +18,27 @@ #include #include "common/common_test.h" -#include "ps/core/cluster_config.h" +#include "ps/core/cluster_metadata.h" namespace mindspore { namespace ps { namespace core { -class TestClusterConfig : public UT::Common { +class TestClusterMetadata : public UT::Common { public: - TestClusterConfig() = default; - virtual ~TestClusterConfig() = default; + TestClusterMetadata() = default; + virtual ~TestClusterMetadata() = default; void SetUp() override {} void TearDown() override {} }; -TEST_F(TestClusterConfig, HeartbeatInterval) { - ClusterConfig::Init(2, 2, "127.0.0.1", 8080); - EXPECT_TRUE(ClusterConfig::heartbeat_interval() == 3); - ClusterConfig::set_heartbeat_interval(100); - EXPECT_TRUE(ClusterConfig::heartbeat_interval() == 100); - EXPECT_STREQ(ClusterConfig::scheduler_host().c_str(), "127.0.0.1"); - EXPECT_TRUE(ClusterConfig::scheduler_port() == 8080); +TEST_F(TestClusterMetadata, HeartbeatInterval) { + ClusterMetadata::instance()->Init(2, 2, "127.0.0.1", 8080); + EXPECT_TRUE(ClusterMetadata::instance()->heartbeat_interval() == 3); + ClusterMetadata::instance()->set_heartbeat_interval(100); + EXPECT_TRUE(ClusterMetadata::instance()->heartbeat_interval() == 100); + EXPECT_STREQ(ClusterMetadata::instance()->scheduler_host().c_str(), "127.0.0.1"); + EXPECT_TRUE(ClusterMetadata::instance()->scheduler_port() == 8080); } } // namespace core } // namespace ps diff --git a/tests/ut/cpp/ps/core/common_util_test.cc b/tests/ut/cpp/ps/core/common_util_test.cc index 3af77ded51..262cabe53e 100644 --- a/tests/ut/cpp/ps/core/common_util_test.cc +++ b/tests/ut/cpp/ps/core/common_util_test.cc @@ -53,7 +53,7 @@ TEST_F(TestCommUtil, GetAvailableInterfaceAndIP) { } TEST_F(TestCommUtil, ValidateRankId) { - ClusterConfig::Init(3, 2, "127.0.0.1", 9999); + ClusterMetadata::instance()->Init(3, 2, "127.0.0.1", 9999); EXPECT_TRUE(CommUtil::ValidateRankId(NodeRole::WORKER, 2)); EXPECT_FALSE(CommUtil::ValidateRankId(NodeRole::WORKER, 3)); EXPECT_TRUE(CommUtil::ValidateRankId(NodeRole::SERVER, 1)); diff --git a/tests/ut/cpp/ps/core/http_client_test.cc b/tests/ut/cpp/ps/core/http_client_test.cc index ed50abe2f6..2e56a946fb 100644 --- a/tests/ut/cpp/ps/core/http_client_test.cc +++ b/tests/ut/cpp/ps/core/http_client_test.cc @@ -62,6 +62,8 @@ class TestHttpClient : public UT::Common { if (memcpy_s(post_message, len, data, len) != 0) { MS_LOG(EXCEPTION) << "The memset_s error"; } + MS_LOG(WARNING) << "The path param:" << path_param; + MS_LOG(WARNING) << "The header param:" << header_param; EXPECT_STREQ(path_param.c_str(), "value1"); EXPECT_STREQ(header_param.c_str(), "headerValue"); EXPECT_STREQ(post_message, "postKey=postValue"); diff --git a/tests/ut/cpp/ps/core/http_server_test.cc b/tests/ut/cpp/ps/core/http_server_test.cc index e7e1e1fa63..752e20952c 100644 --- a/tests/ut/cpp/ps/core/http_server_test.cc +++ b/tests/ut/cpp/ps/core/http_server_test.cc @@ -95,9 +95,10 @@ class TestHttpServer : public UT::Common { if (memcpy_s(post_message, len, data, len) != 0) { MS_LOG(EXCEPTION) << "The memset_s error"; } + MS_LOG(WARNING) << "The Path param:" << path_param; + MS_LOG(WARNING) << "The header param:" << header_param; EXPECT_STREQ(path_param.c_str(), "value1"); EXPECT_STREQ(header_param.c_str(), "headerValue"); - EXPECT_STREQ(post_param.c_str(), "postValue"); EXPECT_STREQ(post_message, "postKey=postValue"); const std::string rKey("headKey"); @@ -127,7 +128,7 @@ class TestHttpServer : public UT::Common { std::unique_ptr server_; }; -TEST_F(TestHttpServer, httpGetQequest) { +TEST_F(TestHttpServer, httpGetRequest) { char buffer[100]; FILE *file; std::string cmd = "curl -X GET http://127.0.0.1:9999/httpget?key1=value1";