From ae36071fc8e2ed4c881ab5c38b0a2929143650b3 Mon Sep 17 00:00:00 2001 From: xiefangqi Date: Mon, 9 Nov 2020 16:06:47 +0800 Subject: [PATCH] add push opt logic --- .../ccsrc/minddata/dataset/CMakeLists.txt | 32 ++- .../python/bindings/dataset/core/bindings.cc | 1 + .../minddata/dataset/core/config_manager.cc | 2 + .../minddata/dataset/core/config_manager.h | 15 + .../dataset/engine/cache/CMakeLists.txt | 20 +- .../engine/datasetops/device_queue_op.cc | 271 +++++++++++------- .../engine/datasetops/device_queue_op.h | 23 +- .../minddata/dataset/engine/execution_tree.cc | 29 ++ .../minddata/dataset/engine/execution_tree.h | 6 + .../dataset/engine/gpu_item_connector.h | 84 ++++++ .../runtime/device/gpu/blocking_queue.cc | 4 +- .../ccsrc/runtime/device/gpu/blocking_queue.h | 7 +- .../runtime/device/gpu/gpu_buffer_mgr.cc | 2 +- .../ccsrc/runtime/device/gpu/gpu_buffer_mgr.h | 2 +- mindspore/dataset/core/config.py | 22 ++ mindspore/dataset/engine/datasets.py | 10 +- 16 files changed, 386 insertions(+), 144 deletions(-) create mode 100644 mindspore/ccsrc/minddata/dataset/engine/gpu_item_connector.h diff --git a/mindspore/ccsrc/minddata/dataset/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/CMakeLists.txt index 64461cd175..a37e362fb5 100644 --- a/mindspore/ccsrc/minddata/dataset/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/CMakeLists.txt @@ -28,6 +28,26 @@ if (MS_BUILD_GRPC) message(STATUS "Cache is enabled") endif() +if (${CMAKE_SYSTEM_NAME} MATCHES "Linux") + # Try to find numa header file and its library + FIND_PATH(NUMA_INCLUDE_DIR numa.h) + MESSAGE("Numa include dir is: ${NUMA_INCLUDE_DIR}") + + FIND_LIBRARY(NUMA_LIBRARY NAMES libnuma.so) + MESSAGE("Numa library is: ${NUMA_LIBRARY}") + + FIND_PACKAGE_HANDLE_STANDARD_ARGS(NUMA DEFAULT_MSG + NUMA_INCLUDE_DIR + NUMA_LIBRARY + ) + if (NUMA_FOUND) + ADD_DEFINITIONS(-DNUMA_ENABLED) + MESSAGE("Numa package found") + else() + MESSAGE(FATAL_ERROR "Numa package not found, try 'sudo yum install numactl-devel' or 'sudo apt-get install libnuma-dev'") + endif() +endif () + # conde coverage # option(ENABLE_COVERAGE "Enable code coverage report" OFF) # if (ENABLE_COVERAGE) @@ -177,14 +197,18 @@ else () target_link_libraries(_c_dataengine PRIVATE -ldl ${SECUREC_LIBRARY}) endif () target_link_libraries(_c_dataengine PUBLIC mindspore::sentencepiece) -endif () + if (NUMA_FOUND) + target_link_libraries(_c_dataengine PUBLIC numa) + endif() +endif() + target_link_libraries(_c_dataengine PUBLIC mindspore::jpeg_turbo mindspore::turbojpeg mindspore::opencv_core mindspore::opencv_imgcodecs mindspore::opencv_imgproc mindspore::tinyxml2 mindspore::sentencepiece_train ${ICU_LIB}) if (ENABLE_GPUQUE) target_link_libraries(_c_dataengine PRIVATE gpu_queue - ${CUDNN_LIBRARY_PATH} - ${CUDA_PATH}/lib64/libcudart.so - ${CUDA_PATH}/lib64/stubs/libcuda.so) + ${CUDNN_LIBRARY_PATH} + ${CUDA_PATH}/lib64/libcudart.so + ${CUDA_PATH}/lib64/stubs/libcuda.so) endif () if (ENABLE_TDTQUE) diff --git a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc index 6aab9f4f8f..f7c945425d 100644 --- a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc +++ b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/core/bindings.cc @@ -42,6 +42,7 @@ PYBIND_REGISTER(ConfigManager, 0, ([](const py::module *m) { .def("get_op_connector_size", &ConfigManager::op_connector_size) .def("get_rows_per_buffer", &ConfigManager::rows_per_buffer) .def("get_seed", 
&ConfigManager::seed) + .def("set_rank_id", &ConfigManager::set_rank_id) .def("get_worker_connector_size", &ConfigManager::worker_connector_size) .def("set_auto_num_workers", &ConfigManager::set_auto_num_workers) .def("set_auto_worker_config", &ConfigManager::set_auto_worker_config_) diff --git a/mindspore/ccsrc/minddata/dataset/core/config_manager.cc b/mindspore/ccsrc/minddata/dataset/core/config_manager.cc index a8bea9cf05..08015e7527 100644 --- a/mindspore/ccsrc/minddata/dataset/core/config_manager.cc +++ b/mindspore/ccsrc/minddata/dataset/core/config_manager.cc @@ -128,6 +128,8 @@ void ConfigManager::set_op_connector_size(int32_t connector_size) { op_connector uint32_t ConfigManager::seed() const { return seed_; } +void ConfigManager::set_rank_id(uint32_t rank_id) { rank_id_ = rank_id; } + void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; } void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; } diff --git a/mindspore/ccsrc/minddata/dataset/core/config_manager.h b/mindspore/ccsrc/minddata/dataset/core/config_manager.h index 609cee8da0..380074fbf4 100644 --- a/mindspore/ccsrc/minddata/dataset/core/config_manager.h +++ b/mindspore/ccsrc/minddata/dataset/core/config_manager.h @@ -143,6 +143,17 @@ class ConfigManager { /// \param prefetch_size void set_prefetch_size(int32_t prefetch_size); + // getter function + // This rank_id is for numa and device_queue, one process work with only one rank_id + // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES', + // but for distribute scenario, this rank_id come from _get_global_rank() in python + // @return Get the current device id, for one process, it's only with one rank_id. + uint32_t rank_id() const { return rank_id_; } + + // setter function + // @param rank_id - Set the current device id + void set_rank_id(uint32_t rank_id); + uint32_t seed() const; // setter function @@ -196,6 +207,10 @@ class ConfigManager { int32_t num_parallel_workers_; int32_t worker_connector_size_; int32_t op_connector_size_; + // This rank_id is for numa and device_queue, one process work with only one rank_id, + // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES', + // but for distribute scenario, this rank_id come from _get_global_rank() in python + uint32_t rank_id_; uint32_t seed_; uint32_t monitor_sampling_interval_; uint32_t callback_timout_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt index 1318ce63b0..f225daa330 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt @@ -6,26 +6,8 @@ ms_build_flatbuffers("de_tensor.fbs" ${CMAKE_CURRENT_SOURCE_DIR} generated_engin file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc") set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD) -if (${CMAKE_SYSTEM_NAME} MATCHES "Linux") +if (NUMA_FOUND) ADD_DEFINITIONS(-DCACHE_LOCAL_CLIENT) - - # Try to find numa header file and its library - FIND_PATH( NUMA_INCLUDE_DIR numa.h ) - MESSAGE( "Numa include dir is: ${NUMA_INCLUDE_DIR}" ) - - FIND_LIBRARY( NUMA_LIBRARY NAMES libnuma.so ) - MESSAGE( "Numa library is: ${NUMA_LIBRARY}" ) - - FIND_PACKAGE_HANDLE_STANDARD_ARGS( NUMA DEFAULT_MSG - NUMA_INCLUDE_DIR - NUMA_LIBRARY - ) - if ( NUMA_FOUND ) - ADD_DEFINITIONS(-DNUMA_ENABLED) - MESSAGE("Numa package 
found") - else() - MESSAGE(FATAL_ERROR "Numa package not found, try 'sudo yum install numactl-devel' or 'sudo apt-get install libnuma-dev'") - endif() endif () add_library(engine-cache-client OBJECT diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc index 327942d48a..a7ad09987f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include "minddata/dataset/core/config_manager.h" #include "minddata/dataset/core/global_context.h" #include "minddata/dataset/engine/data_buffer.h" @@ -43,6 +44,16 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i stop_send_(false), total_batch_(total_batch), create_data_info_queue_(create_data_info_queue) { +#ifdef ENABLE_GPUQUE + std::shared_ptr cfg = GlobalContext::config_manager(); + rank_id_ = cfg->rank_id(); // Get the current rank_id + // Be careful when try to modified these num_workers_ and queue_capacity_, + // and we suggest num_workers_ * queue_capacity_ not greater than 16, because + // one worker one circular_pool with 1G pin memory, so num_workers_ * queue_capacity_ + // must limit to avoid memory overload + num_workers_ = 2; + queue_capacity_ = 8; +#endif #ifdef ENABLE_TDTQUE ascend_keep_waiting_ = true; #endif @@ -51,9 +62,9 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i DeviceQueueOp::~DeviceQueueOp() {} #ifdef ENABLE_GPUQUE -void DeviceQueueOp::ReleaseData(void *addr) { +void DeviceQueueOp::ReleaseData(void *addr, int32_t worker_id) { if (addr != nullptr) { - pool_->Deallocate(addr); + pool_[worker_id]->Deallocate(addr); } } #endif @@ -96,7 +107,6 @@ Status DeviceQueueOp::operator()() { #endif } else if (device_type_ == DeviceType::GPU) { #ifdef ENABLE_GPUQUE - RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(&pool_, -1, 1024, false, true)); RETURN_IF_NOT_OK(SendDataToGPU()); #endif } else if (device_type_ == DeviceType::CPU) { @@ -226,17 +236,38 @@ Status DeviceQueueOp::GetDataInfo(DATA_INFO *data_info) { #endif #ifdef ENABLE_GPUQUE -Status DeviceQueueOp::SendDataToGPU() { - MS_LOG(INFO) << "Device queue, sending data to GPU."; - int64_t send_batch = 0; - bool is_break_loop = false; - bool is_open = false; - uint32_t handle = INVALID_HANDLE; - auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1); - double batch_start_time, end_time; - int32_t batch_cost, push_cost; +Status DeviceQueueOp::LaunchParallelCopyThread() { + // Without cudaSetDevice cuda memory will allocate on GPU:0 as default + // and will overload in distribute scenario, so don't remove this line + cudaSetDevice(rank_id_); + // CircularPool may not safe under multi-threads scenario, so one worker with one pool + for (int i = 0; i < num_workers_; i++) { + std::shared_ptr pool; + RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(&pool, -1, 1024, false, true)); + pool_.push_back(pool); + } + gpu_item_connector_ = std::make_unique(num_workers_, 1, queue_capacity_); + receive_queues_.Init(num_workers_, queue_capacity_); + RETURN_IF_NOT_OK(receive_queues_.Register(tree_->AllTasks())); + RETURN_IF_NOT_OK( + tree_->LaunchWorkers(num_workers_, std::bind(&DeviceQueueOp::WorkerEntry, this, std::placeholders::_1))); + RETURN_IF_NOT_OK( + tree_->AllTasks()->CreateAsyncTask("Push data to GPU queue", 
std::bind(&DeviceQueueOp::PushDataToGPU, this))); + + return Status::OK(); +} + +Status DeviceQueueOp::PushDataToGPU() { + // Without cudaSetDevice cuda memory will allocate on GPU:0 as default + // and will overload in distribute scenario, so don't remove this line + cudaSetDevice(rank_id_); + TaskManager::FindMe()->Post(); + double batch_start_time = 0.0; + double end_time = 0.0; + int32_t batch_cost = 0; + int32_t push_cost = 0; int32_t connector_size = 0; - int32_t connector_capacity; + int32_t connector_capacity = 0; std::shared_ptr profiling_node; bool isProfilingEnable = tree_->GetProfilingManager()->IsProfilingEnable(); if (isProfilingEnable) { @@ -244,135 +275,161 @@ Status DeviceQueueOp::SendDataToGPU() { RETURN_IF_NOT_OK(tree_->GetProfilingManager()->GetTracingNode(kDeviceQueueTracingName, &node)); profiling_node = std::dynamic_pointer_cast(node); batch_start_time = ProfilingTime::GetCurMilliSecond(); - connector_capacity = ChildOpConnectorCapacity(); + connector_capacity = gpu_item_connector_->capacity(); } + std::vector items; + RETURN_IF_NOT_OK(gpu_item_connector_->Pop(0, &items)); + bool is_open = false; + uint32_t handle = INVALID_HANDLE; + int64_t send_batch = 0; + bool ps_data_prefetch = false; + auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2); + while (!items.empty() && !GpuBufferMgr::GetInstance().IsClosed()) { + if (!is_open) { + std::vector data_size; + for (int32_t index = 0; index < items.size(); index++) { + data_size.push_back(items[index].data_len_); + } + handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, release_function); + if (handle == INVALID_HANDLE) { + return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Failed to open channel for sending data."); + } + is_open = true; + } - std::unique_ptr current_buffer; - RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); - - while (!current_buffer->eof() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) { - while (!current_buffer->eoe() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) { - RETURN_IF_NOT_OK(CheckExceptions(current_buffer)); - TensorRow curr_row; // batch data - for (int row_id = 0; - row_id < current_buffer->NumRows() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed(); row_id++) { - RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &curr_row)); - - std::vector data_size; - for (int i = 0; i < curr_row.size(); i++) { - data_size.push_back(static_cast(curr_row[i]->SizeInBytes())); - } - if (!is_open) { - handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, release_function); - if (handle == INVALID_HANDLE) { - return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Failed to open channel for sending data."); + // Data prefetch only when PS mode enables cache. 
+ if ((!ps_data_prefetch) && (items.size() > 0)) { + ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name_, items[0].data_ptr_, items[0].data_len_); + ps_data_prefetch = true; + } + while (!GpuBufferMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) { + BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME); + if (ret) { + if (ret == BlockQueueStatus_T::ERROR_INPUT) { + return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Invalid input data, please check it."); + } else { + if (!stop_send_) { + MS_LOG(DEBUG) << "Retry pushing data..."; + continue; } - is_open = true; - } - RETURN_IF_NOT_OK(RetryPushGPUData(data_size, curr_row, handle, isProfilingEnable, &push_cost)); - send_batch++; - if (isProfilingEnable) { - end_time = ProfilingTime::GetCurMilliSecond(); - // record push data time - profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch, push_cost); - batch_cost = (int32_t)(end_time - batch_start_time); - // record batch time - profiling_node->Record(TIME, BATCH_TIME, send_batch, batch_cost); - // record pipeline time - profiling_node->Record(TIME, PIPELINE_TIME, send_batch, batch_cost - push_cost); - batch_start_time = end_time; - // record connector depth - profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch, connector_size); - } - if (total_batch_ > 0 && send_batch >= total_batch_) { - is_break_loop = true; break; } - } - if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) { - if (isProfilingEnable) { - connector_size = ChildOpConnectorSize(); - connector_capacity = ChildOpConnectorCapacity(); - } - RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); } else { - is_break_loop = true; + break; } } + send_batch++; + if (isProfilingEnable) { + end_time = ProfilingTime::GetCurMilliSecond(); + // record push data time + profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch, push_cost); + batch_cost = (int32_t)(end_time - batch_start_time); + // record batch time + profiling_node->Record(TIME, BATCH_TIME, send_batch, batch_cost); + // record pipeline time + profiling_node->Record(TIME, PIPELINE_TIME, send_batch, batch_cost - push_cost); + batch_start_time = end_time; + // record connector depth + profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch, connector_size); + connector_size = gpu_item_connector_->size(); + connector_capacity = gpu_item_connector_->capacity(); + } + if (total_batch_ > 0 && send_batch >= total_batch_) { + break; + } if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) { - if (isProfilingEnable) { - connector_size = ChildOpConnectorSize(); - connector_capacity = ChildOpConnectorCapacity(); - } - RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); + RETURN_IF_NOT_OK(gpu_item_connector_->Pop(0, &items)); } else { - is_break_loop = true; + break; } } tree_->SetFinished(); - MS_LOG(INFO) << "Device queue total batch is " << send_batch << "."; + MS_LOG(INFO) << "Device queue send " << send_batch << " batch."; GpuBufferMgr::GetInstance().Close(handle); GpuBufferMgr::GetInstance().CloseConfirm(); return Status::OK(); } -Status DeviceQueueOp::RetryPushGPUData(const std::vector &data_size, const TensorRow &curr_row, uint32_t handle, - bool profiling, int32_t *push_time) { - std::vector items; - double start_time; - bool ps_data_prefetch = false; - for (int i = 0; i < data_size.size(); i++) { - device::DataItemGpu data_item; - data_item.data_len_ = data_size[i]; - data_item.data_ptr_ = nullptr; - items.push_back(data_item); 
+// WorkerEntry of DeviceQueueOp just does multi-threaded memcpy for performance optimization. +Status DeviceQueueOp::WorkerEntry(int32_t worker_id) { + // Without cudaSetDevice cuda memory will allocate on GPU:0 as default + // and will overload in distribute scenario, so don't remove this line + cudaSetDevice(rank_id_); + TaskManager::FindMe()->Post(); + std::unique_ptr current_buffer; + uint32_t batch_num = 0; + RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_buffer)); + while (!current_buffer->quit() && !GpuBufferMgr::GetInstance().IsClosed()) { + TensorRow curr_row; + for (int row_id = 0; row_id < current_buffer->NumRows() && !GpuBufferMgr::GetInstance().IsClosed(); row_id++) { + RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &curr_row)); + std::vector items; + for (int i = 0; i < curr_row.size(); i++) { + device::DataItemGpu data_item; + data_item.data_len_ = static_cast(curr_row[i]->SizeInBytes()); + data_item.data_ptr_ = nullptr; + data_item.worker_id_ = worker_id; + items.push_back(data_item); + } + RETURN_IF_NOT_OK(MallocForGPUData(&items, curr_row, worker_id)); + RETURN_IF_NOT_OK(gpu_item_connector_->Add(worker_id, std::move(items))); + batch_num++; + } + + RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_buffer)); } - while (!GpuBufferMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) { - RETURN_IF_NOT_OK(MallocForGPUData(&items, curr_row)); - if (profiling) { - start_time = ProfilingTime::GetCurMilliSecond(); - } - // Data prefetch only when PS mode enables cache. - if ((!ps_data_prefetch) && (items.size() > 0)) { - ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name_, items[0].data_ptr_, items[0].data_len_); - ps_data_prefetch = true; - } - BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME); - if (profiling) { - double end_time = ProfilingTime::GetCurMilliSecond(); - *push_time = (int32_t)(end_time - start_time); - } - if (ret) { - for (int i = 0; i < items.size(); i++) { - ReleaseData(items[i].data_ptr_); - } - if (ret == BlockQueueStatus_T::ERROR_INPUT) { - return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "Invalid input data, please check it."); + MS_LOG(INFO) << "Device queue worker id " << worker_id << " proc " << batch_num << " batches."; + // Add empty vector as quit flag. 
+ std::vector items; + RETURN_IF_NOT_OK(gpu_item_connector_->Add(worker_id, std::move(items))); + return Status::OK(); +} + +Status DeviceQueueOp::SendDataToGPU() { + RETURN_IF_NOT_OK(LaunchParallelCopyThread()); + MS_LOG(INFO) << "Device queue, sending data to GPU."; + std::unique_ptr current_buffer; + RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); + int64_t num_buf = 0; + bool is_break_loop = false; + while (!current_buffer->eof() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) { + while (!current_buffer->eoe() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) { + RETURN_IF_NOT_OK(CheckExceptions(current_buffer)); + RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(current_buffer))); + if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) { + RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); } else { - if (!stop_send_) { - MS_LOG(DEBUG) << "Retry pushing data..."; - continue; - } - break; + is_break_loop = true; } + } + + if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) { + RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); } else { - break; + is_break_loop = true; } } + + for (uint32_t index = 0; index < num_workers_; index++) { + auto quit = std::make_unique(0, DataBuffer::kDeBFlagQuit); + RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(quit))); + } + + MS_LOG(INFO) << "Device queue receive " << num_buf - num_workers_ << " batch."; return Status::OK(); } -Status DeviceQueueOp::MallocForGPUData(std::vector *items, const TensorRow &curr_row) { +Status DeviceQueueOp::MallocForGPUData(std::vector *items, const TensorRow &curr_row, + const int32_t &worker_id) { int i = 0; for (auto &sub_item : *items) { - RETURN_IF_NOT_OK(pool_->Allocate(sub_item.data_len_, &sub_item.data_ptr_)); + RETURN_IF_NOT_OK(pool_[worker_id]->Allocate(sub_item.data_len_, &sub_item.data_ptr_)); if (sub_item.data_ptr_ == nullptr) { return Status(StatusCode::kOutOfMemory, __LINE__, __FILE__, "Memory malloc failed."); } - (void)memset_s(sub_item.data_ptr_, sub_item.data_len_, 0, sub_item.data_len_); const unsigned char *column_data = curr_row[i]->GetBuffer(); if (memcpy_s(sub_item.data_ptr_, sub_item.data_len_, column_data, static_cast(curr_row[i++]->SizeInBytes())) != 0) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h index 7804e123a3..c96b38ca3b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h @@ -32,8 +32,10 @@ #endif #ifdef ENABLE_GPUQUE +#include "minddata/dataset/engine/gpu_item_connector.h" #include "minddata/dataset/util/circular_pool.h" #include "runtime/device/gpu/gpu_buffer_mgr.h" +#include "ps/ps_cache/ps_data/ps_data_prefetch.h" using mindspore::device::BlockQueueStatus_T; using mindspore::device::GpuBufferMgr; #endif @@ -189,12 +191,21 @@ class DeviceQueueOp : public PipelineOp { #ifdef ENABLE_GPUQUE Status SendDataToGPU(); - Status RetryPushGPUData(const std::vector &data_size, const TensorRow &curr_row, uint32_t handle, - bool profiling, int32_t *push_time); - Status MallocForGPUData(std::vector *items, const TensorRow &curr_row); - void ReleaseData(void *addr); - - std::shared_ptr pool_; + Status MallocForGPUData(std::vector *items, const TensorRow &curr_row, const int32_t &worker_id); + void ReleaseData(void *addr, int32_t worker_id); + Status 
LaunchParallelCopyThread(); + Status PushDataToGPU(); + Status WorkerEntry(int32_t worker_id); + + QueueList> receive_queues_; + std::vector> pool_; + std::unique_ptr gpu_item_connector_; + uint32_t num_workers_; + uint32_t queue_capacity_; + // This rank_id is for device_queue, one process works with only one rank_id, + // for the standalone scenario, this rank_id may come from the env 'CUDA_VISIBLE_DEVICES', + // but for the distributed scenario, this rank_id comes from _get_global_rank() in python + uint32_t rank_id_; #endif Status SendDataToCPU(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc index 2347cf2fab..5e0ce6b2a4 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc @@ -17,6 +17,10 @@ #include #include #include +#include +#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE) +#include +#endif #include "minddata/dataset/engine/datasetops/dataset_op.h" #include "minddata/dataset/engine/datasetops/shuffle_op.h" #include "minddata/dataset/engine/datasetops/device_queue_op.h" @@ -42,6 +46,10 @@ ExecutionTree::ExecutionTree() : id_count_(0), pre_pass_override_(nullptr) { prepare_flags_ = kDePrepNone; profiling_manager_ = std::make_unique(this); optimize_ = common::GetEnv("OPTIMIZE") == "true" ? true : false; +#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE) + std::shared_ptr cfg = GlobalContext::config_manager(); + rank_id_ = cfg->rank_id(); +#endif } // Destructor @@ -137,6 +145,27 @@ Status ExecutionTree::Launch() { // opencv limit too many threads #ifndef ENABLE_ANDROID #if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__) +#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE) + // Here we do the numa bind for performance optimization: our tests show that + // binding when get_dataset_size launches a tree gives better performance than + // binding only when _To_Device launches a tree. The numa bind is a + // process-level bind, covering both cpu and memory, and the numa node is + // chosen with a polling logic: + // numa_bind_id = rank_id_ % (numa_max_node() + 1) + // This has only been verified in the GPU scenario; the D scenario has not + // been tested, so we don't suggest enabling the numa feature there yet. + int numa_node_max_id = numa_max_node(); + if (numa_node_max_id >= 0 && rank_id_ >= 0) { + uint32_t numa_bind_id = static_cast(rank_id_ % (numa_node_max_id + 1)); + auto bm = numa_allocate_nodemask(); + numa_bitmask_clearall(bm); + numa_bitmask_setbit(bm, numa_bind_id); + numa_bind(bm); + numa_bitmask_free(bm); + } else { + RETURN_STATUS_UNEXPECTED("Get numa max node failed."); + } +#endif int32_t thread_num = get_nprocs(); if (thread_num == 0) { std::string err_msg = "Invalid thread number."; diff --git a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h index 1ca698c79a..8afea65e7d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h +++ b/mindspore/ccsrc/minddata/dataset/engine/execution_tree.h @@ -282,6 +282,12 @@ class ExecutionTree { bool optimize_; // Flag to enable optional optimizations std::function pre_pass_override_; // function ptr that overrides pre pass, called in PrePrepare() bool partially_prepare_; // Temp: during migration to IR, if true, run remaining passes. 
+#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE) + // This rank_id is for numa and device_queue, one process work with only one rank_id, + // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES', + // but for distribute scenario, this rank_id come from _get_global_rank() in python + uint32_t rank_id_; +#endif }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/gpu_item_connector.h b/mindspore/ccsrc/minddata/dataset/engine/gpu_item_connector.h new file mode 100644 index 0000000000..6571eff679 --- /dev/null +++ b/mindspore/ccsrc/minddata/dataset/engine/gpu_item_connector.h @@ -0,0 +1,84 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_ +#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_ + +#ifdef ENABLE_GPUQUE +#include +#include +#include +#include +#include "minddata/dataset/engine/connector.h" +#include "minddata/dataset/util/status.h" +#include "minddata/dataset/core/constants.h" +#include "runtime/device/gpu/blocking_queue.h" + +using mindspore::device::DataItemGpu; + +namespace mindspore { +namespace dataset { +class GpuItemConnector : public Connector> { + public: + GpuItemConnector(int32_t num_producers, int32_t num_consumers, int32_t queue_capacity) + : Connector>(num_producers, num_consumers, queue_capacity) { + for (int i = 0; i < num_producers; i++) { + is_queue_finished_.push_back(false); + } + } + + ~GpuItemConnector() = default; + + Status Add(int32_t worker_d, std::vector &&element) noexcept { + return Connector>::Push(worker_d, std::move(element)); + } + + Status Pop(int32_t worker_id, std::vector *result) noexcept override { + { + MS_ASSERT(worker_id < num_consumers_); + std::unique_lock lock(m_); + RETURN_IF_NOT_OK(cv_.Wait(&lock, [this, worker_id]() { return expect_consumer_ == worker_id; })); + if (is_queue_finished_[pop_from_]) { + std::string errMsg = "ERROR: popping from a finished queue in GpuItemConnector"; + RETURN_STATUS_UNEXPECTED(errMsg); + } + + RETURN_IF_NOT_OK(queues_[pop_from_]->PopFront(result)); + if ((*result).empty()) { + is_queue_finished_[pop_from_] = true; + } + + for (int offset = 1; offset <= num_producers_; offset++) { + int32_t nextQueueIndex = (pop_from_ + offset) % num_producers_; + if (is_queue_finished_[nextQueueIndex] == false) { + pop_from_ = nextQueueIndex; + break; + } + } + + expect_consumer_ = (expect_consumer_ + 1) % num_consumers_; + } + + cv_.NotifyAll(); + return Status::OK(); + } + + private: + std::vector is_queue_finished_; +}; +} // namespace dataset +} // namespace mindspore +#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_ +#endif // ENABLE_GPUQUE diff --git a/mindspore/ccsrc/runtime/device/gpu/blocking_queue.cc b/mindspore/ccsrc/runtime/device/gpu/blocking_queue.cc index 6cd18d223f..ff48c6de41 100644 --- a/mindspore/ccsrc/runtime/device/gpu/blocking_queue.cc +++ 
b/mindspore/ccsrc/runtime/device/gpu/blocking_queue.cc @@ -72,7 +72,7 @@ BlockQueueStatus_T GpuQueue::Front(void **addr, size_t *len) const { *len = len_; for (auto item : node_info_[head_].data_) { - host_release_(item.data_ptr_); + host_release_(item.data_ptr_, item.worker_id_); } return SUCCESS; } @@ -105,7 +105,7 @@ BlockQueueStatus_T BlockingQueue::Create(void *addr, const std::vector & return SUCCESS; } -void BlockingQueue::RegisterRelease(const std::function &func) { queue_->RegisterRelease(func); } +void BlockingQueue::RegisterRelease(const std::function &func) { queue_->RegisterRelease(func); } BlockQueueStatus_T BlockingQueue::Push(const std::vector &data, unsigned int) { std::unique_lock locker(mutex_); diff --git a/mindspore/ccsrc/runtime/device/gpu/blocking_queue.h b/mindspore/ccsrc/runtime/device/gpu/blocking_queue.h index 94dee538bf..52bb954d52 100644 --- a/mindspore/ccsrc/runtime/device/gpu/blocking_queue.h +++ b/mindspore/ccsrc/runtime/device/gpu/blocking_queue.h @@ -33,6 +33,7 @@ namespace device { enum BlockQueueStatus_T : int { SUCCESS = 0, QUEUE_NOT_EXIST, HANDLE_NOT_EXIST, ERROR_INPUT, INTERNAL_ERROR, TIMEOUT }; struct DataItemGpu { + int32_t worker_id_; size_t data_len_; void *data_ptr_; }; @@ -42,7 +43,7 @@ class GpuQueue { GpuQueue(void *addr, const std::vector &shape, const size_t &capacity); virtual ~GpuQueue(); - void RegisterRelease(const std::function &func) { host_release_ = func; } + void RegisterRelease(const std::function &func) { host_release_ = func; } inline bool IsEmpty() const { return size_ == 0; } inline bool IsFull() const { return size_ == capacity_; } @@ -69,7 +70,7 @@ class GpuQueue { size_t capacity_; cudaStream_t stream_; std::unique_ptr node_info_; - std::function host_release_; + std::function host_release_; GpuQueue(const GpuQueue &) = delete; GpuQueue &operator=(const GpuQueue &) = delete; @@ -81,7 +82,7 @@ class BlockingQueue { ~BlockingQueue() = default; BlockQueueStatus_T Create(void *addr, const std::vector &shape, const size_t &capacity); - void RegisterRelease(const std::function &func); + void RegisterRelease(const std::function &func); BlockQueueStatus_T Push(const std::vector &data, unsigned int timeout_in_sec); BlockQueueStatus_T Front(void **ptr, size_t *len); BlockQueueStatus_T Pop(); diff --git a/mindspore/ccsrc/runtime/device/gpu/gpu_buffer_mgr.cc b/mindspore/ccsrc/runtime/device/gpu/gpu_buffer_mgr.cc index b03882695e..5d56f1e71c 100644 --- a/mindspore/ccsrc/runtime/device/gpu/gpu_buffer_mgr.cc +++ b/mindspore/ccsrc/runtime/device/gpu/gpu_buffer_mgr.cc @@ -66,7 +66,7 @@ BlockQueueStatus_T GpuBufferMgr::Create(unsigned int device_id, const std::strin } unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name, - const std::vector &shape, const std::function func) { + const std::vector &shape, const std::function func) { set_device(); std::string name = std::to_string(device_id) + std::string("_") + channel_name; if (!name_queue_map_.count(name)) { diff --git a/mindspore/ccsrc/runtime/device/gpu/gpu_buffer_mgr.h b/mindspore/ccsrc/runtime/device/gpu/gpu_buffer_mgr.h index de25b948c6..59febe418c 100644 --- a/mindspore/ccsrc/runtime/device/gpu/gpu_buffer_mgr.h +++ b/mindspore/ccsrc/runtime/device/gpu/gpu_buffer_mgr.h @@ -85,7 +85,7 @@ class GpuBufferMgr { // call for Push thread EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const std::vector &shape, - std::function func); + std::function func); // call for Front/Pop thread EXPORT unsigned int Open(unsigned int 
device_id, const std::string &channel_name, const std::vector &shape); diff --git a/mindspore/dataset/core/config.py b/mindspore/dataset/core/config.py index 54eed4117f..28a5c2cc3e 100644 --- a/mindspore/dataset/core/config.py +++ b/mindspore/dataset/core/config.py @@ -16,6 +16,7 @@ The configuration module provides various functions to set and get the supported configuration parameters, and read a configuration file. """ +import os import random import numpy import mindspore._c_dataengine as cde @@ -29,6 +30,27 @@ UINT32_MAX = 4294967295 _config = cde.GlobalContext.config_manager() +def _init_device_info(): + """ + INTERNAL USE ONLY! + The rank_id needs to be passed into the deeper layers for numa and device_queue. + One process works with only one rank_id. In the standalone scenario, + rank_id may come from the env 'CUDA_VISIBLE_DEVICES'; in the distributed + scenario, rank_id comes from _get_global_rank(). + """ + from mindspore import context + from mindspore.parallel._auto_parallel_context import auto_parallel_context + from mindspore.parallel._utils import _get_global_rank + if context.get_context("device_target") == "GPU": + rank_id = _get_global_rank() + parallel_mode = auto_parallel_context().get_parallel_mode() + if parallel_mode == "stand_alone": + cuda_device_info = os.getenv("CUDA_VISIBLE_DEVICES") + if cuda_device_info: + cuda_id = int(cuda_device_info.split(",")[0].strip()) + if cuda_id != rank_id: + rank_id = cuda_id + _config.set_rank_id(rank_id) def set_seed(seed): """ diff --git a/mindspore/dataset/engine/datasets.py b/mindspore/dataset/engine/datasets.py index 92bc811168..f4101de3d5 100644 --- a/mindspore/dataset/engine/datasets.py +++ b/mindspore/dataset/engine/datasets.py @@ -52,7 +52,7 @@ from .validators import check_batch, check_shuffle, check_map, check_filter, che check_generatordataset, check_sync_wait, check_zip_dataset, check_add_column, check_textfiledataset, check_concat, \ check_random_dataset, check_split, check_bucket_batch_by_length, check_cluedataset, check_save, check_csvdataset, \ check_paddeddataset, check_tuple_iterator, check_dict_iterator, check_schema, check_to_device_send, replace_none -from ..core.config import get_callback_timeout +from ..core.config import get_callback_timeout, _init_device_info from ..core.datatypes import mstype_to_detype, mstypelist_to_detypelist try: @@ -141,11 +141,19 @@ class Dataset: self._sync = False def create_ir_tree(self): + """ + Internal method to create an IR tree. + + Returns: + ir_tree, The object of the IR tree. + dataset, The root dataset of the IR tree. + """ parent = self.parent self.parent = [] dataset = copy.deepcopy(self) ir_tree = dataset.parse_tree() self.parent = parent + _init_device_info() return ir_tree, dataset def parse_tree(self):