!8666 cache updates and libnuma build prerequisite

From: @jtnisbet
Reviewed-by: @liucunwei
Signed-off-by: @liucunwei
pull/8666/MERGE
mindspore-ci-bot 4 years ago committed by Gitee
commit 025bb7b125

@ -6,16 +6,26 @@ ms_build_flatbuffers("de_tensor.fbs" ${CMAKE_CURRENT_SOURCE_DIR} generated_engin
file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")
set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD)
# Try to find numa header file and its library
find_file(NUMA_HDR NAMES "numa.h")
if (EXISTS ${NUMA_HDR})
ADD_DEFINITIONS(-DNUMA_ENABLED)
MESSAGE("Numa package found")
endif ()
if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
ADD_DEFINITIONS(-DCACHE_LOCAL_CLIENT)
# Try to find numa header file and its library
FIND_PATH( NUMA_INCLUDE_DIR numa.h )
MESSAGE( "Numa include dir is: ${NUMA_INCLUDE_DIR}" )
FIND_LIBRARY( NUMA_LIBRARY NAMES libnuma.so )
MESSAGE( "Numa library is: ${NUMA_LIBRARY}" )
FIND_PACKAGE_HANDLE_STANDARD_ARGS( NUMA DEFAULT_MSG
NUMA_INCLUDE_DIR
NUMA_LIBRARY
)
if ( NUMA_FOUND )
ADD_DEFINITIONS(-DNUMA_ENABLED)
MESSAGE("Numa package found")
else()
MESSAGE(FATAL_ERROR "Numa package not found, try 'sudo yum install numactl-devel' or 'sudo apt-get install libnuma-dev'")
endif()
endif ()
add_library(engine-cache-client OBJECT
@ -71,7 +81,7 @@ if (ENABLE_CACHE)
target_link_libraries(cache_server mindspore::glog)
endif ()
if (EXISTS ${NUMA_HDR})
if (NUMA_FOUND)
target_link_libraries(cache_server numa)
endif ()

@ -32,23 +32,6 @@ int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]);
#endif
std::string warningMsg;
warningMsg.reserve(512);
warningMsg += "WARNING:\n";
warningMsg += "cache_admin and the cache server that it controls are currently only used for experimental research";
warningMsg += " purposes at this time.\n";
auto env_enable_cache = std::getenv("MS_ENABLE_CACHE");
if (env_enable_cache == nullptr || strcmp(env_enable_cache, "TRUE") != 0) {
// temporary disable cache feature in the current release
warningMsg += "This command is currently disabled. Quitting.\n";
std::cerr << warningMsg << std::endl;
return 0;
}
warningMsg += "It is not intended for general availability yet as it may not be stable. Use it at your own risk.\n";
// A warning message until the code is mature enough.
std::cerr << warningMsg << std::endl;
if (argc == 1) {
args.Help();
return 0;

@ -343,7 +343,7 @@ Status CacheAdminArgHandler::RunCommand() {
if (rc.IsError()) {
msg.RemoveResourcesOnExit();
if (rc.IsNetWorkError()) {
std::string errMsg = "Server is not up or has been shutdown already.";
std::string errMsg = "Server on port " + std::to_string(port_) + " is not up or has been shutdown already.";
return Status(StatusCode::kNetWorkError, errMsg);
}
return rc;
@ -355,9 +355,10 @@ Status CacheAdminArgHandler::RunCommand() {
// The server will remove the queue and we will then wake up. But on the safe
// side, we will also set up an alarm and kill this proocess if we hang on
// the message queue.
alarm(15);
alarm(30);
Status dummy_rc;
(void)msg.ReceiveStatus(&dummy_rc);
std::cout << "Cache server has been stopped." << std::endl;
break;
}
case CommandId::kCmdGenerateSession: {
@ -384,6 +385,7 @@ Status CacheAdminArgHandler::RunCommand() {
CacheClientGreeter comm(hostname_, port_, 1);
RETURN_IF_NOT_OK(comm.ServiceStart());
auto rq = std::make_shared<ListSessionsRequest>();
std::cout << "Listing sessions for server on port " << port_ << "\n" << std::endl;
RETURN_IF_NOT_OK(comm.HandleRequest(rq));
RETURN_IF_NOT_OK(rq->Wait());
std::vector<SessionCacheInfo> session_info = rq->GetSessionCacheInfo();
@ -481,12 +483,14 @@ Status CacheAdminArgHandler::StartServer(CommandId command_id) {
RETURN_STATUS_UNEXPECTED(err_msg);
}
msg.resize(n);
std::cout << msg << std::endl;
if (WIFEXITED(status)) {
auto exit_status = WEXITSTATUS(status);
if (exit_status) {
std::string errMsg = "Child exit status " + std::to_string(exit_status);
std::string errMsg = msg + "\nChild exit status " + std::to_string(exit_status);
return Status(StatusCode::kUnexpectedError, errMsg);
} else {
// Not an error, some info message goes to stdout
std::cout << msg << std::endl;
}
}
return Status::OK();
@ -545,14 +549,14 @@ void CacheAdminArgHandler::Help() {
std::cerr << " [ [-l | --minloglevel] <log level> ]\n";
std::cerr << " Possible values are 0, 1, 2 and 3.\n";
std::cerr << " Default is 1 (info level).\n";
std::cerr << " [ --list_sessions ]\n";
std::cerr << " [--list_sessions]\n";
std::cerr << " [--help]" << std::endl;
// Do not expose these option to the user via help or documentation, but the options do exist to aid with
// development and tuning.
// std::cerr << " [ [-m | --shared_memory_size] <shared memory size> ]\n";
// std::cerr << " Default is " << kDefaultSharedMemorySizeInGB << " (Gb in unit).\n";
// std::cerr << " [ [-r | --memory_cap_ratio] <float percent value>]\n";
// std::cerr << " Default is " << kMemoryCapRatio << ".\n";
std::cerr << " [--help]" << std::endl;
// [ [-m | --shared_memory_size] <shared memory size> ]
// Default is: kDefaultSharedMemorySizeInGB (Gb in unit)
// [ [-r | --memory_cap_ratio] <float percent value>]
// Default is kMemoryCapRatio
}
} // namespace dataset
} // namespace mindspore

@ -88,7 +88,7 @@ ds::Status StartServer(int argc, char **argv) {
if (child_rc.IsError()) {
return child_rc;
}
std::cerr << "cache server daemon process has been created as process id: " << pid
std::cerr << "cache server daemon has been created as process id " << pid << " and listening on port " << port
<< "\nCheck log file for any start up error" << std::endl;
signal(SIGCHLD, SIG_IGN); // ignore sig child signal.
return ds::Status::OK();

@ -631,6 +631,7 @@ Status CacheServer::GetCacheMissKeys(CacheRequest *rq, CacheReply *reply) {
inline Status GenerateClientSessionID(session_id_type session_id, CacheReply *reply) {
reply->set_result(std::to_string(session_id));
MS_LOG(INFO) << "Server generated new session id " << session_id;
return Status::OK();
}

@ -15,8 +15,8 @@
"""Cache client
"""
import os
import copy
from mindspore._c_dataengine import CacheClient
from ..core.validator_helpers import type_check, check_uint32, check_uint64
@ -39,12 +39,7 @@ class DatasetCache:
self.port = port
self.prefetch_size = prefetch_size
self.num_connections = num_connections
if os.getenv('MS_ENABLE_CACHE') != 'TRUE':
# temporary disable cache feature in the current release
self.cache_client = None
else:
from mindspore._c_dataengine import CacheClient
self.cache_client = CacheClient(session_id, size, spilling, hostname, port, num_connections, prefetch_size)
self.cache_client = CacheClient(session_id, size, spilling, hostname, port, num_connections, prefetch_size)
def GetStat(self):
return self.cache_client.GetStat()

@ -30,6 +30,7 @@ from ..core.validator_helpers import parse_user_args, type_check, type_check_lis
from . import datasets
from . import samplers
from . import cache_client
def check_imagefolderdataset(method):
@ -1259,8 +1260,4 @@ def check_paddeddataset(method):
def check_cache_option(cache):
"""Sanity check for cache parameter"""
if cache is not None:
if os.getenv('MS_ENABLE_CACHE') != 'TRUE':
# temporary disable cache feature in the current release
raise ValueError("Caching is disabled in the current release.")
from . import cache_client
type_check(cache, (cache_client.DatasetCache,), "cache")

Loading…
Cancel
Save