!8980 {WIP} Minor cache fixes for HQ testers

From: @lixiachen
Reviewed-by: @liucunwei,@jonyguo
Signed-off-by: @liucunwei
pull/8980/MERGE
mindspore-ci-bot 5 years ago committed by Gitee
commit 2fb5ab631d

@ -19,6 +19,8 @@
#include <glog/logging.h>
#endif
#include "minddata/dataset/engine/cache/cache_admin_arg.h"
#include "minddata/dataset/engine/cache/cache_common.h"
#include "minddata/dataset/util/path.h"
namespace ds = mindspore::dataset;
@ -28,10 +30,26 @@ int main(int argc, char **argv) {
std::stringstream arg_stream;
#ifdef USE_GLOG
FLAGS_log_dir = "/tmp";
FLAGS_minloglevel = google::WARNING;
FLAGS_log_dir = ds::DefaultLogDir();
// Create default log dir
ds::Path log_dir = ds::Path(FLAGS_log_dir);
rc = log_dir.CreateDirectories();
if (!rc.IsOk()) {
std::cerr << rc.ToString() << std::endl;
return 1;
}
google::InitGoogleLogging(argv[0]);
#endif
// Create default spilling dir
ds::Path spill_dir = ds::Path(ds::DefaultSpillDir());
rc = spill_dir.CreateDirectories();
if (!rc.IsOk()) {
std::cerr << rc.ToString() << std::endl;
return 1;
}
if (argc == 1) {
args.Help();
return 0;

@ -39,7 +39,6 @@ const int32_t CacheAdminArgHandler::kDefaultNumWorkers = std::thread::hardware_c
? std::thread::hardware_concurrency() / 2
: 1;
const char CacheAdminArgHandler::kServerBinary[] = "cache_server";
const char CacheAdminArgHandler::kDefaultSpillDir[] = "/tmp";
CacheAdminArgHandler::CacheAdminArgHandler()
: port_(kCfgDefaultCachePort),
@ -49,7 +48,7 @@ CacheAdminArgHandler::CacheAdminArgHandler()
log_level_(kDefaultLogLevel),
memory_cap_ratio_(kMemoryCapRatio),
hostname_(kCfgDefaultCacheHost),
spill_dir_(kDefaultSpillDir),
spill_dir_(DefaultSpillDir()),
command_id_(CommandId::kCmdUnknown) {
// Initialize the command mappings
arg_map_["-h"] = ArgValue::kArgHost;
@ -70,7 +69,7 @@ CacheAdminArgHandler::CacheAdminArgHandler()
arg_map_["-m"] = ArgValue::kArgSharedMemorySize;
arg_map_["--shared_memory_size"] = ArgValue::kArgSharedMemorySize;
arg_map_["-l"] = ArgValue::kArgLogLevel;
arg_map_["--minloglevel"] = ArgValue::kArgLogLevel;
arg_map_["--loglevel"] = ArgValue::kArgLogLevel;
arg_map_["-r"] = ArgValue::kArgMemoryCapRatio;
arg_map_["--memory_cap_ratio"] = ArgValue::kArgMemoryCapRatio;
arg_map_["--list_sessions"] = ArgValue::kArgListSessions;
@ -306,6 +305,7 @@ Status CacheAdminArgHandler::Validate() {
// run.
if (command_id_ == CommandId::kCmdUnknown) {
std::string err_msg = "No command provided";
err_msg += "\nPlease try `cache_admin --help` for more information";
return Status(StatusCode::kSyntaxError, err_msg);
}
@ -353,12 +353,12 @@ Status CacheAdminArgHandler::RunCommand() {
// the comm layer as soon as the request is received, and we need to wait
// on the message queue instead.
// The server will remove the queue and we will then wake up. But on the safe
// side, we will also set up an alarm and kill this proocess if we hang on
// side, we will also set up an alarm and kill this process if we hang on
// the message queue.
alarm(30);
Status dummy_rc;
(void)msg.ReceiveStatus(&dummy_rc);
std::cout << "Cache server has been stopped." << std::endl;
std::cout << "Cache server on port " << std::to_string(port_) << " has been stopped successfully." << std::endl;
break;
}
case CommandId::kCmdGenerateSession: {
@ -367,7 +367,8 @@ Status CacheAdminArgHandler::RunCommand() {
auto rq = std::make_shared<GenerateSessionIdRequest>();
RETURN_IF_NOT_OK(comm.HandleRequest(rq));
RETURN_IF_NOT_OK(rq->Wait());
std::cout << "Session: " << rq->GetSessionId() << std::endl;
std::cout << "Session created for server on port " << std::to_string(port_) << ": " << rq->GetSessionId()
<< std::endl;
break;
}
case CommandId::kCmdDestroySession: {
@ -378,7 +379,7 @@ Status CacheAdminArgHandler::RunCommand() {
auto rq = std::make_shared<DropSessionRequest>(cinfo);
RETURN_IF_NOT_OK(comm.HandleRequest(rq));
RETURN_IF_NOT_OK(rq->Wait());
std::cout << "Drop session successful" << std::endl;
std::cout << "Drop session successfully for server on port " << std::to_string(port_) << std::endl;
break;
}
case CommandId::kCmdListSessions: {
@ -545,8 +546,8 @@ void CacheAdminArgHandler::Help() {
std::cerr << " Possible values are in range [1...max(100, Number of CPU)].\n";
std::cerr << " Default is " << kDefaultNumWorkers << ".\n";
std::cerr << " [ [-s | --spilldir] <spilling directory> ]\n";
std::cerr << " Default is " << kDefaultSpillDir << ".\n";
std::cerr << " [ [-l | --minloglevel] <log level> ]\n";
std::cerr << " Default is " << DefaultSpillDir() << ".\n";
std::cerr << " [ [-l | --loglevel] <log level> ]\n";
std::cerr << " Possible values are 0, 1, 2 and 3.\n";
std::cerr << " Default is 1 (info level).\n";
std::cerr << " [--list_sessions]\n";

@ -35,7 +35,6 @@ class CacheAdminArgHandler {
static constexpr int32_t kDefaultLogLevel = 1;
static constexpr float kMemoryCapRatio = 0.8;
static const char kServerBinary[];
static const char kDefaultSpillDir[];
// These are the actual command types to execute
enum class CommandId : int16_t {

@ -48,6 +48,8 @@ constexpr static uint32_t kLocalClientSupport = 1;
constexpr static uint32_t kDataIsInSharedMemory = 2;
/// \brief Size of each message used in message queue.
constexpr static int32_t kSharedMessageSize = 2048;
/// \brief Prefix for default cache spilling path and log path.
/// Shared by DefaultSpillDir(), DefaultLogDir() and PortToUnixSocketPath().
/// constexpr (rather than plain const) makes it a guaranteed compile-time
/// constant; linkage and value are unchanged for existing users.
constexpr char kDefaultPathPrefix[] = "/tmp/mindspore/cache";
/// \brief State of CacheService at the server.
enum class CacheServiceState : int8_t {
@ -70,7 +72,9 @@ inline void Status2CacheReply(const Status &rc, CacheReply *reply) {
/// \brief Generate the unix socket file we use on both client/server side given a tcp/ip port number
/// \param port
/// \return unix socket url
inline std::string PortToUnixSocketPath(int port) { return "/tmp/cache_server_p" + std::to_string(port); }
inline std::string PortToUnixSocketPath(int port) {
  // Build "<prefix>/cache_server_p<port>" via incremental append; identical
  // result to concatenating temporaries, without the intermediate objects.
  std::string socket_path(kDefaultPathPrefix);
  socket_path += "/cache_server_p";
  socket_path += std::to_string(port);
  return socket_path;
}
/// \brief Round up to the next 4k
inline int64_t round_up_4K(int64_t sz) {
@ -87,6 +91,12 @@ using worker_id_t = int32_t;
using numa_id_t = int32_t;
using cpu_id_t = int32_t;
/// \brief Return the default directory used for cache spilling.
/// \return The default spill path (the common cache path prefix).
inline std::string DefaultSpillDir() { return std::string(kDefaultPathPrefix); }
/// \brief Return the default directory used for cache server/admin logs.
/// \return "<default path prefix>/log".
inline std::string DefaultLogDir() {
  std::string log_dir(kDefaultPathPrefix);
  log_dir += "/log";
  return log_dir;
}
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_COMMON_H_

@ -25,6 +25,7 @@ CacheClientGreeter::CacheClientGreeter(const std::string &hostname, int32_t port
// We need to bump up the message size to unlimited. The default receiving
// message limit is 4MB which is not big enough.
args.SetMaxReceiveMessageSize(-1);
MS_LOG(INFO) << "Hostname: " << hostname << ".";
#if CACHE_LOCAL_CLIENT
// Try connect locally to the unix_socket first as the first preference
// Need to resolve hostname to ip address rather than to do a string compare

@ -44,10 +44,6 @@ ds::Status StartServer(int argc, char **argv) {
.SetSharedMemorySizeInGB(strtol(argv[4], nullptr, 10))
.SetMemoryCapRatio(strtof(argv[7], nullptr));
#ifdef USE_GLOG
FLAGS_minloglevel = strtol(argv[5], nullptr, 10);
#endif
auto daemonize_string = argv[6];
bool daemonize = strcmp(daemonize_string, "true") == 0 || strcmp(daemonize_string, "TRUE") == 0 ||
strcmp(daemonize_string, "t") == 0 || strcmp(daemonize_string, "T") == 0;
@ -63,7 +59,16 @@ ds::Status StartServer(int argc, char **argv) {
ds::SharedMessage msg;
if (daemonize) {
#ifdef USE_GLOG
FLAGS_log_dir = "/tmp";
// temporary setting log level to WARNING to avoid logging when creating dir
FLAGS_minloglevel = google::WARNING;
FLAGS_log_dir = ds::DefaultLogDir();
// Create cache server default log dir
ds::Path log_dir = ds::Path(FLAGS_log_dir);
rc = log_dir.CreateDirectories();
if (rc.IsError()) {
return rc;
}
FLAGS_minloglevel = strtol(argv[5], nullptr, 10);
google::InitGoogleLogging(argv[0]);
#endif
rc = msg.Create();
@ -88,8 +93,12 @@ ds::Status StartServer(int argc, char **argv) {
if (child_rc.IsError()) {
return child_rc;
}
std::cerr << "cache server daemon has been created as process id " << pid << " and listening on port " << port
<< "\nCheck log file for any start up error" << std::endl;
std::cout << "Cache server startup completed successfully!\n";
std::cout << "The cache server daemon has been created as process id " << pid << " and listening on port " << port
<< std::endl;
std::cout << "\nRecommendation:\nSince the server is detached into its own daemon process, monitor the server "
"logs (under "
<< ds::DefaultLogDir() << ") for any issues that may happen after startup\n";
signal(SIGCHLD, SIG_IGN); // ignore sig child signal.
return ds::Status::OK();
} else {
@ -109,6 +118,7 @@ ds::Status StartServer(int argc, char **argv) {
}
// Dump the summary
MS_LOG(INFO) << "Logging services started with log level: " << argv[5];
MS_LOG(INFO) << builder << std::endl;
// Create the instance with some sanity checks built in
rc = builder.Build();

@ -1041,7 +1041,8 @@ Status CacheServer::DestroySession(CacheRequest *rq) {
"A destroy cache request has been completed but it had a stale session id " + std::to_string(drop_session_id);
RETURN_STATUS_UNEXPECTED(errMsg);
} else {
std::string errMsg = "Session id " + std::to_string(drop_session_id) + " not found.";
std::string errMsg =
"Session id " + std::to_string(drop_session_id) + " not found in server on port " + std::to_string(port_) + ".";
return Status(StatusCode::kFileNotExist, errMsg);
}
}
@ -1234,7 +1235,7 @@ Status CacheServer::Builder::SanityCheck() {
}
CacheServer::Builder::Builder()
: top_("/tmp"),
: top_(DefaultSpillDir()),
num_workers_(std::thread::hardware_concurrency() / 2),
port_(50052),
shared_memory_sz_in_gb_(kDefaultSharedMemorySize),

@ -84,9 +84,9 @@ Status CacheBase::FetchSamplesToWorkers() {
// Instead of sending sampler id to WorkerEntry, we send them to the Prefetcher which will redirect them
// to the WorkerEntry.
do {
if (AllowCacheMiss() && wait_cnt > 0) {
MS_LOG(WARNING) << "Epoch: " << wait_cnt << " Cache Miss : " << num_cache_miss_
<< " Total number of rows : " << row_cnt_;
if (AllowCacheMiss() && wait_cnt > 0 && wait_cnt % op_num_repeats_per_epoch() == 0) {
MS_LOG(INFO) << "Epoch: " << op_current_epochs_ << " Cache Miss : " << num_cache_miss_
<< " Total number of rows : " << row_cnt_;
}
num_cache_miss_ = 0;
row_cnt_ = 0;
@ -167,8 +167,8 @@ Status CacheBase::FetchSamplesToWorkers() {
}
// Dump the last epoch result (approximately) without waiting for the worker threads to come back.
if (AllowCacheMiss()) {
MS_LOG(WARNING) << "Epoch: " << wait_cnt << " Cache Miss : " << num_cache_miss_
<< " Total number of rows : " << row_cnt_;
MS_LOG(INFO) << "Epoch: " << wait_cnt / op_num_repeats_per_epoch() << " Cache Miss : " << num_cache_miss_
<< " Total number of rows : " << row_cnt_;
}
return Status::OK();
}

Loading…
Cancel
Save