From a0e75b845a7c4f83e65b2d365e9208e04cf5ff97 Mon Sep 17 00:00:00 2001 From: RobinGrosman Date: Wed, 10 Mar 2021 18:38:11 -0800 Subject: [PATCH] Increase default level of parallelism in minddata pipeline to 8. Adjust memory usage to not increase as parallelism increases. It will stay at the same level it would be with 4 parallelism --- .../ccsrc/minddata/dataset/core/constants.h | 2 +- .../dataset/engine/datasetops/batch_op.cc | 22 ++++++++++++++++--- .../dataset/engine/datasetops/parallel_op.cc | 8 ++++++- mindspore/dataset/core/config.py | 7 +++++- .../cpp/dataset/c_api_dataset_config_test.cc | 2 +- tests/ut/data/dataset/declient.cfg | 2 +- tests/ut/python/dataset/test_config.py | 4 ++-- 7 files changed, 37 insertions(+), 10 deletions(-) diff --git a/mindspore/ccsrc/minddata/dataset/core/constants.h b/mindspore/ccsrc/minddata/dataset/core/constants.h index d749827c88..fc621e00d7 100644 --- a/mindspore/ccsrc/minddata/dataset/core/constants.h +++ b/mindspore/ccsrc/minddata/dataset/core/constants.h @@ -87,7 +87,7 @@ constexpr int64_t kDeMaxFreq = std::numeric_limits::max(); // 92233720 constexpr int64_t kDeMaxTopk = std::numeric_limits::max(); constexpr uint32_t kCfgRowsPerBuffer = 1; -constexpr uint32_t kCfgParallelWorkers = 4; +constexpr uint32_t kCfgParallelWorkers = 8; constexpr uint32_t kCfgWorkerConnectorSize = 16; constexpr uint32_t kCfgOpConnectorSize = 16; constexpr int32_t kCfgDefaultRankId = -1; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc index 3aeb908ea6..9d2c0efb3c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc @@ -77,7 +77,15 @@ BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, pad_info_(pad_map), batch_num_(0), batch_cnt_(0) { - worker_queues_.Init(num_workers, op_queue_size); + // Adjust connector queue size. 
After batch each row is batch_size times larger + int32_t queue_size; + queue_size = std::max(1, op_queue_size / start_batch_size_); + if (num_workers == 1) { + // ensure there is at least 2 queue slots for whole operation. If only 1 worker, increase it to 2 + queue_size = std::max(2, queue_size); + } + + worker_queues_.Init(num_workers, queue_size); } // if PYTHON is disabled. per_batch_map can't be used #else @@ -88,8 +96,16 @@ BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, drop_(drop), pad_(pad), in_col_names_(cols_to_map), - pad_info_(pad_map) { - worker_queues_.Init(num_workers, op_queue_size); + pad_info_(pad_map), + batch_num_(0), + batch_cnt_(0) { + int32_t queue_size; + queue_size = std::max(1, op_queue_size / start_batch_size_); + if (num_workers == 1) { + // ensure there is at least 2 queue slots for whole operation. If only 1 worker, increase it to 2 + queue_size = std::max(2, queue_size); + } + worker_queues_.Init(num_workers, queue_size); } #endif diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.cc index a92690c097..aa4f3a2605 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.cc @@ -33,7 +33,13 @@ ParallelOp::ParallelOp(int32_t num_workers, int32_t op_connector_size, std::shar worker_connector_size_(1), worker_connector_(nullptr), num_workers_paused_(0), - epoch_sync_flag_(false) {} + epoch_sync_flag_(false) { + // reduce excessive memory usage with high parallelism + // when num_workers > 4, reduce op_connector_size to have a similar total size as if there were only 4 workers + if (num_workers_ > 4) { + oc_queue_size_ = std::max(1, op_connector_size * 4 / num_workers_); + } +} // Creates the internal worker connector for the parallel op if the derived class wants to use it Status ParallelOp::CreateWorkerConnector(int32_t 
worker_connector_size) { diff --git a/mindspore/dataset/core/config.py b/mindspore/dataset/core/config.py index 59f76ef3e4..1a7eb3f56d 100644 --- a/mindspore/dataset/core/config.py +++ b/mindspore/dataset/core/config.py @@ -112,11 +112,16 @@ def set_prefetch_size(size): Set the number of rows to be prefetched. Args: - size (int): Total number of rows to be prefetched. + size (int): Total number of rows to be prefetched per operator per parallel worker. Raises: ValueError: If prefetch_size is invalid (<= 0 or > MAX_INT_32). + Note: + Since total memory used for prefetch can grow very large with a high number of workers, + when the number of workers is > 4, the per worker prefetch size will be reduced. The actual + prefetch size at runtime per worker will be prefetch_size * (4 / num_parallel_workers). + Examples: >>> # Set a new global configuration value for the prefetch size. >>> ds.config.set_prefetch_size(1000) diff --git a/tests/ut/cpp/dataset/c_api_dataset_config_test.cc b/tests/ut/cpp/dataset/c_api_dataset_config_test.cc index bca8a78827..80bd63400c 100644 --- a/tests/ut/cpp/dataset/c_api_dataset_config_test.cc +++ b/tests/ut/cpp/dataset/c_api_dataset_config_test.cc @@ -42,7 +42,7 @@ TEST_F(MindDataTestPipeline, TestConfigSetting) { EXPECT_EQ(load_status, true); // Test configuration loaded - EXPECT_EQ(config::get_num_parallel_workers(), 4); + EXPECT_EQ(config::get_num_parallel_workers(), 8); EXPECT_EQ(config::get_prefetch_size(), 16); EXPECT_EQ(config::get_seed(), 5489); EXPECT_EQ(config::get_monitor_sampling_interval(), 15); diff --git a/tests/ut/data/dataset/declient.cfg b/tests/ut/data/dataset/declient.cfg index 36ca5ec3d9..36c8a45532 100644 --- a/tests/ut/data/dataset/declient.cfg +++ b/tests/ut/data/dataset/declient.cfg @@ -1,7 +1,7 @@ { "logFilePath": "/tmp", "rowsPerBuffer": 1, - "numParallelWorkers": 4, + "numParallelWorkers": 8, "workerConnectorSize": 16, "opConnectorSize": 16, "seed": 5489, diff --git a/tests/ut/python/dataset/test_config.py 
b/tests/ut/python/dataset/test_config.py index 632b8a57f1..41f0035201 100644 --- a/tests/ut/python/dataset/test_config.py +++ b/tests/ut/python/dataset/test_config.py @@ -44,7 +44,7 @@ def test_basic(): ds.config.load('../data/dataset/declient.cfg') # assert ds.config.get_rows_per_buffer() == 32 - assert ds.config.get_num_parallel_workers() == 4 + assert ds.config.get_num_parallel_workers() == 8 # assert ds.config.get_worker_connector_size() == 16 assert ds.config.get_prefetch_size() == 16 assert ds.config.get_seed() == 5489 @@ -348,7 +348,7 @@ def test_deterministic_python_seed_multi_thread(): try: np.testing.assert_equal(data1_output, data2_output) except Exception as e: - # expect output to not match during multi-threaded excution + # expect output to not match during multi-threaded execution logger.info("Got an exception in DE: {}".format(str(e))) assert "Array" in str(e)