Default to memory-only cache server

pull/11559/head
Lixia Chen 4 years ago
parent 6394ba3974
commit be6d310946

@ -42,14 +42,6 @@ int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]);
#endif
// Create default spilling dir
ds::Path spill_dir = ds::Path(ds::DefaultSpillDir());
rc = spill_dir.CreateDirectories();
if (!rc.IsOk()) {
std::cerr << rc.ToString() << std::endl;
return 1;
}
if (argc == 1) {
args.Help();
return 0;

@ -48,7 +48,7 @@ CacheAdminArgHandler::CacheAdminArgHandler()
log_level_(kDefaultLogLevel),
memory_cap_ratio_(kMemoryCapRatio),
hostname_(kCfgDefaultCacheHost),
spill_dir_(DefaultSpillDir()),
spill_dir_(""),
command_id_(CommandId::kCmdUnknown) {
// Initialize the command mappings
arg_map_["-h"] = ArgValue::kArgHost;
@ -334,32 +334,7 @@ Status CacheAdminArgHandler::RunCommand() {
break;
}
case CommandId::kCmdStop: {
CacheClientGreeter comm(hostname_, port_, 1);
RETURN_IF_NOT_OK(comm.ServiceStart());
SharedMessage msg;
RETURN_IF_NOT_OK(msg.Create());
auto rq = std::make_shared<ServerStopRequest>(msg.GetMsgQueueId());
RETURN_IF_NOT_OK(comm.HandleRequest(rq));
Status rc = rq->Wait();
if (rc.IsError()) {
msg.RemoveResourcesOnExit();
if (rc.IsNetWorkError()) {
std::string errMsg = "Server on port " + std::to_string(port_) + " is not up or has been shutdown already.";
return Status(StatusCode::kNetWorkError, errMsg);
}
return rc;
}
// OK return code only means the server acknowledges our request but we still
// have to wait for its complete shutdown because the server will shutdown
// the comm layer as soon as the request is received, and we need to wait
// on the message queue instead.
// The server will send a message back and remove the queue and we will then wake up. But on the safe
// side, we will also set up an alarm and kill this process if we hang on
// the message queue.
alarm(60);
Status dummy_rc;
(void)msg.ReceiveStatus(&dummy_rc);
std::cout << "Cache server on port " << std::to_string(port_) << " has been stopped successfully." << std::endl;
RETURN_IF_NOT_OK(StopServer(command_id_));
break;
}
case CommandId::kCmdGenerateSession: {
@ -430,6 +405,36 @@ Status CacheAdminArgHandler::RunCommand() {
return Status::OK();
}
/// \brief Ask the running cache server to shut down and wait for the shutdown to complete.
/// \param command_id Id of the command being handled (not consulted by this routine)
/// \return Status object
Status CacheAdminArgHandler::StopServer(CommandId command_id) {
  // Bring up a single-worker comm channel to the server we want to stop.
  CacheClientGreeter comm(hostname_, port_, 1);
  RETURN_IF_NOT_OK(comm.ServiceStart());
  // The server signals its final shutdown through this shared message queue.
  SharedMessage msg;
  RETURN_IF_NOT_OK(msg.Create());
  auto stop_rq = std::make_shared<ServerStopRequest>(msg.GetMsgQueueId());
  RETURN_IF_NOT_OK(comm.HandleRequest(stop_rq));
  Status wait_rc = stop_rq->Wait();
  if (wait_rc.IsError()) {
    // The server will not be replying, so tear down the queue ourselves.
    msg.RemoveResourcesOnExit();
    if (wait_rc.IsNetWorkError()) {
      std::string err_msg = "Server on port " + std::to_string(port_) + " is not up or has been shutdown already.";
      return Status(StatusCode::kNetWorkError, err_msg);
    }
    return wait_rc;
  }
  // An OK return code only means the server acknowledged the request. The server
  // shuts down the comm layer as soon as the request arrives, so we must wait on
  // the message queue for the actual shutdown instead. The server posts a final
  // message and removes the queue, which wakes us up. As a safety net, arm an
  // alarm so this process is killed rather than hanging forever on the queue.
  alarm(60);
  Status dummy_rc;
  (void)msg.ReceiveStatus(&dummy_rc);
  std::cout << "Cache server on port " << std::to_string(port_) << " has been stopped successfully." << std::endl;
  return Status::OK();
}
Status CacheAdminArgHandler::StartServer(CommandId command_id) {
// There currently does not exist any "install path" or method to identify which path the installed binaries will
// exist in. As a temporary approach, we will assume that the server binary shall exist in the same path as the
@ -462,7 +467,6 @@ Status CacheAdminArgHandler::StartServer(CommandId command_id) {
// fork the child process to become the daemon
pid_t pid;
pid = fork();
// failed to fork
if (pid < 0) {
std::string err_msg = "Failed to fork process for cache server: " + std::to_string(errno);
@ -538,7 +542,7 @@ void CacheAdminArgHandler::Help() {
std::cerr << " [[-h | --hostname] <hostname>] Default is " << kCfgDefaultCacheHost << ".\n";
std::cerr << " [[-p | --port] <port number>] Default is " << kCfgDefaultCachePort << ".\n";
std::cerr << " [[-w | --workers] <number of workers>] Default is " << kDefaultNumWorkers << ".\n";
std::cerr << " [[-s | --spilldir] <spilling directory>] Default is " << DefaultSpillDir() << ".\n";
std::cerr << " [[-s | --spilldir] <spilling directory>] Default is no spilling.\n";
std::cerr << " [[-l | --loglevel] <log level>] Default is 1 (warning level).\n";
std::cerr << " [--destroy_session | -d] <session id>\n";
std::cerr << " [[-p | --port] <port number>]\n";

@ -79,6 +79,8 @@ class CacheAdminArgHandler {
Status StartServer(CommandId command_id);
Status StopServer(CommandId command_id);
Status AssignArg(std::string option, int32_t *out_arg, std::stringstream *arg_stream,
CommandId command_id = CommandId::kCmdUnknown);

@ -91,9 +91,6 @@ using worker_id_t = int32_t;
using numa_id_t = int32_t;
using cpu_id_t = int32_t;
/// Return the default spill dir for cache
inline std::string DefaultSpillDir() { return kDefaultPathPrefix; }
/// Return the default log dir for cache
inline std::string DefaultLogDir() { return kDefaultPathPrefix + std::string("/log"); }

@ -125,6 +125,33 @@ class BaseRequest {
/// \return Status object
Status Wait();
/// \brief Return if the request is of row request type
/// \return True if the request is row-related request
bool IsRowRequest() const {
  switch (type_) {
    case RequestType::kBatchCacheRows:
    case RequestType::kBatchFetchRows:
    case RequestType::kInternalCacheRow:
    case RequestType::kInternalFetchRow:
    case RequestType::kCacheRow:
      return true;
    default:
      return false;
  }
}
/// \brief Return if the request is of admin request type
/// \return True if the request is admin-related request
bool IsAdminRequest() const {
  switch (type_) {
    case RequestType::kCreateCache:
    case RequestType::kDestroyCache:
    case RequestType::kGetStat:
    case RequestType::kGetCacheState:
    case RequestType::kAllocateSharedBlock:
    case RequestType::kFreeSharedBlock:
    case RequestType::kCacheSchema:
    case RequestType::kFetchSchema:
    case RequestType::kBuildPhaseDone:
    case RequestType::kToggleWriteMode:
    case RequestType::kConnectReset:
    case RequestType::kStopService:
    case RequestType::kHeartBeat:
    case RequestType::kGetCacheMissKeys:
      return true;
    default:
      return false;
  }
}
/// \brief Return if the request is of session request type
/// \return True if the request is session-related request
bool IsSessionRequest() const {
  switch (type_) {
    case RequestType::kGenerateSessionId:
    case RequestType::kDropSession:
    case RequestType::kListSessions:
      return true;
    default:
      return false;
  }
}
protected:
CacheRequest rq_; // This is what we send to the server
CacheReply reply_; // This is what the server send back

File diff suppressed because it is too large Load Diff

@ -201,6 +201,22 @@ class CacheServer : public Service {
/// \brief Return the memory cap ratio
float GetMemoryCapRatio() const { return memory_cap_ratio_; }
/// \brief Function to handle a row request
/// \param[in] cache_req A row request to handle
/// \param[out] internal_request Indicator if the request is an internal request
/// \return Status object
Status ProcessRowRequest(CacheServerRequest *cache_req, bool *internal_request);
/// \brief Function to handle an admin request
/// \param[in] cache_req An admin request to handle
/// \return Status object
Status ProcessAdminRequest(CacheServerRequest *cache_req);
/// \brief Function to handle a session request
/// \param[in] cache_req A session request to handle
/// \return Status object
Status ProcessSessionRequest(CacheServerRequest *cache_req);
/// \brief How a request is handled.
/// \note that it can be processed immediately by a grpc thread or routed to a server thread
/// which is pinned to some numa node core.
@ -256,6 +272,12 @@ class CacheServer : public Service {
/// \return Pointer to cache service. Null if not found
CacheService *GetService(connection_id_type id) const;
/// \brief Go over the existing cache services and calculate how much memory we have consumed so far; a new cache
/// service can only be created if there is still enough available memory left
/// \param cache_mem_sz Requested memory for a new cache service
/// \return Status object
Status GlobalMemoryCheck(uint64_t cache_mem_sz);
/// \brief Create a cache service. We allow multiple clients to create the same cache service.
/// Subsequent duplicate requests are ignored. The first cache client to create the service will be given
/// a special unique cookie.
@ -314,6 +336,12 @@ class CacheServer : public Service {
/// \return Status object
Status GetStat(CacheRequest *rq, CacheReply *reply);
/// \brief Internal function to get cache state
/// \param rq
/// \param reply
/// \return Status object
Status GetCacheState(CacheRequest *rq, CacheReply *reply);
/// \brief Cache a schema request
/// \param rq
/// \return Status object
@ -411,6 +439,9 @@ class CacheServer : public Service {
/// \return Status object
Status BatchFetch(const std::shared_ptr<flatbuffers::FlatBufferBuilder> &fbb, WritableSlice *out);
Status BatchCacheRows(CacheRequest *rq);
Status InternalFetchRow(CacheRequest *rq);
Status InternalCacheRow(CacheRequest *rq, CacheReply *reply);
};
} // namespace dataset
} // namespace mindspore

@ -15,7 +15,8 @@
# ============================================================================
# source the globals and functions for use with cache testing
SKIP_ADMIN_COUNTER=false
export SKIP_ADMIN_COUNTER=false
declare failed_tests
. cachetest_lib.sh
echo

@ -15,7 +15,8 @@
# ============================================================================
# source the globals and functions for use with cache testing
SKIP_ADMIN_COUNTER=true
export SKIP_ADMIN_COUNTER=true
declare session_id failed_tests
. cachetest_lib.sh
echo
@ -28,8 +29,10 @@ UT_TEST_DIR="${BUILD_PATH}/mindspore/tests/ut/cpp"
DateStamp=$(date +%Y%m%d_%H%M%S);
CPP_TEST_LOG_OUTPUT="/tmp/ut_tests_cache_${DateStamp}.log"
# Start a basic cache server to be used for all tests
StartServer
# start cache server with a spilling path to be used for all tests
cmd="${CACHE_ADMIN} --start -s /tmp"
CacheAdminCmd "${cmd}" 0
sleep 1
HandleRcExit $? 1 1
# Set the environment variable to enable these pytests

@ -15,7 +15,8 @@
# ============================================================================
# source the globals and functions for use with cache testing
SKIP_ADMIN_COUNTER=true
export SKIP_ADMIN_COUNTER=true
declare session_id failed_tests
. cachetest_lib.sh
echo
@ -84,10 +85,6 @@ export SESSION_ID=$session_id
PytestCmd "test_cache_map.py" "test_cache_map_running_twice2"
HandleRcExit $? 0 0
# Set size parameter of DatasetCache to an extra small value
PytestCmd "test_cache_map.py" "test_cache_map_extra_small_size" 1
HandleRcExit $? 0 0
PytestCmd "test_cache_map.py" "test_cache_map_no_image"
HandleRcExit $? 0 0
@ -255,15 +252,6 @@ export SESSION_ID=$session_id
PytestCmd "test_cache_nomap.py" "test_cache_nomap_running_twice2"
HandleRcExit $? 0 0
# Set size parameter of DatasetCache to an extra small value
GetSession
HandleRcExit $? 1 1
export SESSION_ID=$session_id
PytestCmd "test_cache_nomap.py" "test_cache_nomap_extra_small_size" 1
HandleRcExit $? 0 0
DestroySession $session_id
HandleRcExit $? 1 1
# Run two parallel pipelines (sharing cache)
for i in $(seq 1 2)
do
@ -366,7 +354,7 @@ HandleRcExit $? 1 1
export SESSION_ID=$session_id
PytestCmd "test_cache_nomap.py" "test_cache_nomap_session_destroy" &
pid=("$!")
pid=$!
sleep 10
DestroySession $session_id
@ -381,7 +369,7 @@ HandleRcExit $? 1 1
export SESSION_ID=$session_id
PytestCmd "test_cache_nomap.py" "test_cache_nomap_server_stop" &
pid=("$!")
pid=$!
sleep 10
StopServer
@ -417,6 +405,26 @@ HandleRcExit $? 0 0
StopServer
HandleRcExit $? 0 1
# start cache server with a spilling path
cmd="${CACHE_ADMIN} --start -s /tmp"
CacheAdminCmd "${cmd}" 0
sleep 1
HandleRcExit $? 0 0
GetSession
HandleRcExit $? 1 1
export SESSION_ID=$session_id
# Set size parameter of mappable DatasetCache to an extra small value
PytestCmd "test_cache_map.py" "test_cache_map_extra_small_size" 1
HandleRcExit $? 0 0
# Set size parameter of non-mappable DatasetCache to an extra small value
PytestCmd "test_cache_nomap.py" "test_cache_nomap_extra_small_size" 1
HandleRcExit $? 0 0
StopServer
HandleRcExit $? 0 1
unset RUN_CACHE_TEST
unset SESSION_ID

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save