diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt index 1fe3b1c0d0..1318ce63b0 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt @@ -6,16 +6,26 @@ ms_build_flatbuffers("de_tensor.fbs" ${CMAKE_CURRENT_SOURCE_DIR} generated_engin file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc") set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD) -# Try to find numa header file and its library -find_file(NUMA_HDR NAMES "numa.h") - -if (EXISTS ${NUMA_HDR}) - ADD_DEFINITIONS(-DNUMA_ENABLED) - MESSAGE("Numa package found") -endif () - if (${CMAKE_SYSTEM_NAME} MATCHES "Linux") ADD_DEFINITIONS(-DCACHE_LOCAL_CLIENT) + + # Try to find numa header file and its library + FIND_PATH( NUMA_INCLUDE_DIR numa.h ) + MESSAGE( "Numa include dir is: ${NUMA_INCLUDE_DIR}" ) + + FIND_LIBRARY( NUMA_LIBRARY NAMES libnuma.so ) + MESSAGE( "Numa library is: ${NUMA_LIBRARY}" ) + + FIND_PACKAGE_HANDLE_STANDARD_ARGS( NUMA DEFAULT_MSG + NUMA_INCLUDE_DIR + NUMA_LIBRARY + ) + if ( NUMA_FOUND ) + ADD_DEFINITIONS(-DNUMA_ENABLED) + MESSAGE("Numa package found") + else() + MESSAGE(FATAL_ERROR "Numa package not found, try 'sudo yum install numactl-devel' or 'sudo apt-get install libnuma-dev'") + endif() endif () add_library(engine-cache-client OBJECT @@ -71,7 +81,7 @@ if (ENABLE_CACHE) target_link_libraries(cache_server mindspore::glog) endif () - if (EXISTS ${NUMA_HDR}) + if (NUMA_FOUND) target_link_libraries(cache_server numa) endif () diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin.cc index 92c2380981..7da6c66c2e 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin.cc @@ -32,23 +32,6 @@ int main(int argc, char **argv) { google::InitGoogleLogging(argv[0]); #endif - std::string warningMsg; - warningMsg.reserve(512); - warningMsg += "WARNING:\n"; - warningMsg += "cache_admin and the cache server that it controls are currently only used for experimental research"; - warningMsg += " purposes at this time.\n"; - auto env_enable_cache = std::getenv("MS_ENABLE_CACHE"); - if (env_enable_cache == nullptr || strcmp(env_enable_cache, "TRUE") != 0) { - // temporary disable cache feature in the current release - warningMsg += "This command is currently disabled. Quitting.\n"; - std::cerr << warningMsg << std::endl; - return 0; - } - warningMsg += "It is not intended for general availability yet as it may not be stable. Use it at your own risk.\n"; - - // A warning message until the code is mature enough. - std::cerr << warningMsg << std::endl; - if (argc == 1) { args.Help(); return 0; diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc index b36c922b33..ad203e393f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc @@ -343,7 +343,7 @@ Status CacheAdminArgHandler::RunCommand() { if (rc.IsError()) { msg.RemoveResourcesOnExit(); if (rc.IsNetWorkError()) { - std::string errMsg = "Server is not up or has been shutdown already."; + std::string errMsg = "Server on port " + std::to_string(port_) + " is not up or has been shutdown already."; return Status(StatusCode::kNetWorkError, errMsg); } return rc; @@ -355,9 +355,10 @@ Status CacheAdminArgHandler::RunCommand() { // The server will remove the queue and we will then wake up. But on the safe // side, we will also set up an alarm and kill this proocess if we hang on // the message queue. - alarm(15); + alarm(30); Status dummy_rc; (void)msg.ReceiveStatus(&dummy_rc); + std::cout << "Cache server has been stopped." << std::endl; break; } case CommandId::kCmdGenerateSession: { @@ -384,6 +385,7 @@ Status CacheAdminArgHandler::RunCommand() { CacheClientGreeter comm(hostname_, port_, 1); RETURN_IF_NOT_OK(comm.ServiceStart()); auto rq = std::make_shared(); + std::cout << "Listing sessions for server on port " << port_ << "\n" << std::endl; RETURN_IF_NOT_OK(comm.HandleRequest(rq)); RETURN_IF_NOT_OK(rq->Wait()); std::vector session_info = rq->GetSessionCacheInfo(); @@ -481,12 +483,14 @@ Status CacheAdminArgHandler::StartServer(CommandId command_id) { RETURN_STATUS_UNEXPECTED(err_msg); } msg.resize(n); - std::cout << msg << std::endl; if (WIFEXITED(status)) { auto exit_status = WEXITSTATUS(status); if (exit_status) { - std::string errMsg = "Child exit status " + std::to_string(exit_status); + std::string errMsg = msg + "\nChild exit status " + std::to_string(exit_status); return Status(StatusCode::kUnexpectedError, errMsg); + } else { + // Not an error, some info message goes to stdout + std::cout << msg << std::endl; } } return Status::OK(); @@ -545,14 +549,14 @@ void CacheAdminArgHandler::Help() { std::cerr << " [ [-l | --minloglevel] ]\n"; std::cerr << " Possible values are 0, 1, 2 and 3.\n"; std::cerr << " Default is 1 (info level).\n"; - std::cerr << " [ --list_sessions ]\n"; + std::cerr << " [--list_sessions]\n"; + std::cerr << " [--help]" << std::endl; // Do not expose these option to the user via help or documentation, but the options do exist to aid with // development and tuning. - // std::cerr << " [ [-m | --shared_memory_size] ]\n"; - // std::cerr << " Default is " << kDefaultSharedMemorySizeInGB << " (Gb in unit).\n"; - // std::cerr << " [ [-r | --memory_cap_ratio] ]\n"; - // std::cerr << " Default is " << kMemoryCapRatio << ".\n"; - std::cerr << " [--help]" << std::endl; + // [ [-m | --shared_memory_size] ] + // Default is: kDefaultSharedMemorySizeInGB (Gb in unit) + // [ [-r | --memory_cap_ratio] ] + // Default is kMemoryCapRatio } } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc index 42283dd534..14ae5fc7f2 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc @@ -88,7 +88,7 @@ ds::Status StartServer(int argc, char **argv) { if (child_rc.IsError()) { return child_rc; } - std::cerr << "cache server daemon process has been created as process id: " << pid + std::cerr << "cache server daemon has been created as process id " << pid << " and listening on port " << port << "\nCheck log file for any start up error" << std::endl; signal(SIGCHLD, SIG_IGN); // ignore sig child signal. return ds::Status::OK(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc index 7445c65ddf..0a15e48d65 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc @@ -631,6 +631,7 @@ Status CacheServer::GetCacheMissKeys(CacheRequest *rq, CacheReply *reply) { inline Status GenerateClientSessionID(session_id_type session_id, CacheReply *reply) { reply->set_result(std::to_string(session_id)); + MS_LOG(INFO) << "Server generated new session id " << session_id; return Status::OK(); } diff --git a/mindspore/dataset/engine/cache_client.py b/mindspore/dataset/engine/cache_client.py index 2abe7aa4b3..1a2f90d79f 100644 --- a/mindspore/dataset/engine/cache_client.py +++ b/mindspore/dataset/engine/cache_client.py @@ -15,8 +15,8 @@ """Cache client """ -import os import copy +from mindspore._c_dataengine import CacheClient from ..core.validator_helpers import type_check, check_uint32, check_uint64 @@ -39,12 +39,7 @@ class DatasetCache: self.port = port self.prefetch_size = prefetch_size self.num_connections = num_connections - if os.getenv('MS_ENABLE_CACHE') != 'TRUE': - # temporary disable cache feature in the current release - self.cache_client = None - else: - from mindspore._c_dataengine import CacheClient - self.cache_client = CacheClient(session_id, size, spilling, hostname, port, num_connections, prefetch_size) + self.cache_client = CacheClient(session_id, size, spilling, hostname, port, num_connections, prefetch_size) def GetStat(self): return self.cache_client.GetStat() diff --git a/mindspore/dataset/engine/validators.py b/mindspore/dataset/engine/validators.py index d106d8be47..00b533dab9 100644 --- a/mindspore/dataset/engine/validators.py +++ b/mindspore/dataset/engine/validators.py @@ -30,6 +30,7 @@ from ..core.validator_helpers import parse_user_args, type_check, type_check_lis from . import datasets from . import samplers +from . import cache_client def check_imagefolderdataset(method): @@ -1259,8 +1260,4 @@ def check_paddeddataset(method): def check_cache_option(cache): """Sanity check for cache parameter""" if cache is not None: - if os.getenv('MS_ENABLE_CACHE') != 'TRUE': - # temporary disable cache feature in the current release - raise ValueError("Caching is disabled in the current release.") - from . import cache_client type_check(cache, (cache_client.DatasetCache,), "cache")