diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc index fa92048c49..b36c922b33 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -351,7 +352,10 @@ Status CacheAdminArgHandler::RunCommand() { // have to wait for its complete shutdown because the server will shutdown // the comm layer as soon as the request is received, and we need to wait // on the message queue instead. - // The server will remove the queue and we will then wake up. + // The server will remove the queue and we will then wake up. But on the safe + // side, we will also set up an alarm and kill this process if we hang on + // the message queue. + alarm(15); Status dummy_rc; (void)msg.ReceiveStatus(&dummy_rc); break; diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc index 9c23fec1a1..6c970e4b83 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.cc @@ -230,6 +230,9 @@ Status CacheClient::DestroyCache() { Status CacheClient::GetStat(CacheServiceStat *stat) { SharedLock lck(&mux_); RETURN_UNEXPECTED_IF_NULL(stat); + // GetStat has an external interface, so we have to make sure we have a valid connection id first + CHECK_FAIL_RETURN_UNEXPECTED(server_connection_id_ != 0, "GetStat called but the cache is not in use yet."); + auto rq = std::make_shared(server_connection_id_); RETURN_IF_NOT_OK(PushRequest(rq)); RETURN_IF_NOT_OK(rq->Wait()); diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_pool.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_pool.cc index ae8ea73af9..ef45a8f877 100644 --- 
a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_pool.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_pool.cc @@ -48,12 +48,10 @@ Status CachePool::DoServiceStop() { } sm_.reset(); - value_allocator alloc(mp_); - for (auto &bl : *tree_) { - if (bl.ptr != nullptr) { - alloc.deallocate(bl.ptr, bl.sz); - } - } + // We used to free the memory allocated from each DataLocator, but + // since all of them come from the NumaMemoryPool, we now skip that + // and release the whole NumaMemoryPool instead. Otherwise we would + // have to release each buffer in the DataLocator one by one. tree_.reset(); if (!root_.toString().empty()) { diff --git a/tests/ut/python/cachetests/cachetest_args.sh b/tests/ut/python/cachetests/cachetest_args.sh index 70ad5a18cf..82c4414ad1 100755 --- a/tests/ut/python/cachetests/cachetest_args.sh +++ b/tests/ut/python/cachetests/cachetest_args.sh @@ -175,7 +175,7 @@ num_cpu=$(grep -c processor /proc/cpuinfo) if [ $num_cpu -lt 100 ]; then cmd="${CACHE_ADMIN} --start -w 101" else - cmd="${CACHE_ADMIN} --start -w ${num_cpu}+1" + cmd="${CACHE_ADMIN} --start -w $(($num_cpu+1))" fi CacheAdminCmd "${cmd}" 1 HandleRcExit $? 
0 0 diff --git a/tests/ut/python/dataset/test_cache_nomap.py b/tests/ut/python/dataset/test_cache_nomap.py index 2269ce47f9..52e7ddc650 100644 --- a/tests/ut/python/dataset/test_cache_nomap.py +++ b/tests/ut/python/dataset/test_cache_nomap.py @@ -339,7 +339,7 @@ def test_cache_nomap_basic8(): | TFReader """ - logger.info("Test cache basic 4") + logger.info("Test cache basic 8") if "SESSION_ID" in os.environ: session_id = int(os.environ['SESSION_ID']) else: @@ -355,7 +355,33 @@ def test_cache_nomap_basic8(): logger.info("Number of data in ds1: {} ".format(num_iter)) assert num_iter == 3 - logger.info('test_cache_basic3 Ended.\n') + logger.info('test_cache_basic8 Ended.\n') + + +@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") +def test_cache_nomap_basic9(): + """ + Testing the GetStat interface for getting some info from server, but this should fail if the cache is not created + in a pipeline. + """ + + logger.info("Test cache nomap basic 9") + if "SESSION_ID" in os.environ: + session_id = int(os.environ['SESSION_ID']) + else: + raise RuntimeError("Testcase requires SESSION_ID environment variable") + + some_cache = ds.DatasetCache(session_id=session_id, size=0, spilling=True) + + # Contact the server to get the statistics, this should fail because we have not used this cache in any pipeline + # so there will not be any cache to get stats on. + with pytest.raises(RuntimeError) as e: + stat = some_cache.GetStat() + cache_sz = stat.avg_cache_sz + logger.info("Average row cache size: {}".format(cache_sz)) + assert "Unexpected error" in str(e.value) + + logger.info("test_cache_nomap_basic9 Ended.\n") @pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")