add push opt logic

pull/8378/head
xiefangqi 4 years ago
parent 0b1bc3b9ce
commit ae36071fc8

@@ -28,6 +28,26 @@ if (MS_BUILD_GRPC)
message(STATUS "Cache is enabled")
endif()
if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
# Try to find numa header file and its library
FIND_PATH(NUMA_INCLUDE_DIR numa.h)
MESSAGE("Numa include dir is: ${NUMA_INCLUDE_DIR}")
FIND_LIBRARY(NUMA_LIBRARY NAMES libnuma.so)
MESSAGE("Numa library is: ${NUMA_LIBRARY}")
FIND_PACKAGE_HANDLE_STANDARD_ARGS(NUMA DEFAULT_MSG
NUMA_INCLUDE_DIR
NUMA_LIBRARY
)
if (NUMA_FOUND)
ADD_DEFINITIONS(-DNUMA_ENABLED)
MESSAGE("Numa package found")
else()
MESSAGE(FATAL_ERROR "Numa package not found, try 'sudo yum install numactl-devel' or 'sudo apt-get install libnuma-dev'")
endif()
endif ()
# code coverage
# option(ENABLE_COVERAGE "Enable code coverage report" OFF)
# if (ENABLE_COVERAGE)
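As a hedged aside (not part of the commit): even after the build defines NUMA_ENABLED, libnuma can be present while the running kernel offers no NUMA API, so a typical consumer double-checks with numa_available() at runtime. A minimal standalone sketch, assuming libnuma is installed and the file is compiled with -DNUMA_ENABLED -lnuma:

#include <cstdio>
#ifdef NUMA_ENABLED
#include <numa.h>
#endif

int main() {
#ifdef NUMA_ENABLED
  // numa_available() returns a negative value when the kernel has no NUMA support.
  if (numa_available() < 0) {
    std::puts("libnuma present but NUMA is unusable on this system");
    return 0;
  }
  std::printf("highest numa node id: %d\n", numa_max_node());
#else
  std::puts("built without NUMA support");
#endif
  return 0;
}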
@@ -177,14 +197,18 @@ else ()
target_link_libraries(_c_dataengine PRIVATE -ldl ${SECUREC_LIBRARY})
endif ()
target_link_libraries(_c_dataengine PUBLIC mindspore::sentencepiece)
endif ()
if (NUMA_FOUND)
target_link_libraries(_c_dataengine PUBLIC numa)
endif()
endif()
target_link_libraries(_c_dataengine PUBLIC mindspore::jpeg_turbo mindspore::turbojpeg mindspore::opencv_core mindspore::opencv_imgcodecs
mindspore::opencv_imgproc mindspore::tinyxml2 mindspore::sentencepiece_train ${ICU_LIB})
if (ENABLE_GPUQUE)
target_link_libraries(_c_dataengine PRIVATE gpu_queue
${CUDNN_LIBRARY_PATH}
${CUDA_PATH}/lib64/libcudart.so
${CUDA_PATH}/lib64/stubs/libcuda.so)
endif ()
if (ENABLE_TDTQUE)

@@ -42,6 +42,7 @@ PYBIND_REGISTER(ConfigManager, 0, ([](const py::module *m) {
.def("get_op_connector_size", &ConfigManager::op_connector_size)
.def("get_rows_per_buffer", &ConfigManager::rows_per_buffer)
.def("get_seed", &ConfigManager::seed)
.def("set_rank_id", &ConfigManager::set_rank_id)
.def("get_worker_connector_size", &ConfigManager::worker_connector_size)
.def("set_auto_num_workers", &ConfigManager::set_auto_num_workers)
.def("set_auto_worker_config", &ConfigManager::set_auto_worker_config_)

@@ -128,6 +128,8 @@ void ConfigManager::set_op_connector_size(int32_t connector_size) { op_connector
uint32_t ConfigManager::seed() const { return seed_; }
void ConfigManager::set_rank_id(uint32_t rank_id) { rank_id_ = rank_id; }
void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; }
void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; }

@@ -143,6 +143,17 @@ class ConfigManager {
/// \param prefetch_size
void set_prefetch_size(int32_t prefetch_size);
// getter function
// This rank_id is for numa and device_queue; one process works with only one rank_id.
// In the standalone scenario the rank_id may come from the env var 'CUDA_VISIBLE_DEVICES',
// while in the distributed scenario it comes from _get_global_rank() in Python.
// @return The current rank_id; each process holds exactly one.
uint32_t rank_id() const { return rank_id_; }
// setter function
// @param rank_id - The device id to record as the current rank_id
void set_rank_id(uint32_t rank_id);
uint32_t seed() const;
// setter function
@@ -196,6 +207,10 @@ class ConfigManager {
int32_t num_parallel_workers_;
int32_t worker_connector_size_;
int32_t op_connector_size_;
// This rank_id is for numa and device_queue; one process works with only one rank_id.
// In the standalone scenario the rank_id may come from the env var 'CUDA_VISIBLE_DEVICES',
// while in the distributed scenario it comes from _get_global_rank() in Python.
uint32_t rank_id_;
uint32_t seed_;
uint32_t monitor_sampling_interval_;
uint32_t callback_timout_;

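A hedged illustration (not from the commit) of the flow these new members enable: the Python layer records the process's rank_id once, and C++ consumers such as ExecutionTree and DeviceQueueOp read it back. ToyConfigManager below is a hypothetical stand-in so the snippet compiles on its own:

#include <cstdint>
#include <cstdio>

// Hypothetical stand-in for the real ConfigManager, reduced to the new field.
class ToyConfigManager {
 public:
  void set_rank_id(uint32_t rank_id) { rank_id_ = rank_id; }
  uint32_t rank_id() const { return rank_id_; }

 private:
  uint32_t rank_id_ = 0;
};

int main() {
  ToyConfigManager cfg;
  cfg.set_rank_id(3);  // in MindSpore this is driven by _init_device_info() in Python
  std::printf("rank_id = %u\n", cfg.rank_id());  // later read for numa bind / device queue
  return 0;
}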
@@ -6,26 +6,8 @@ ms_build_flatbuffers("de_tensor.fbs" ${CMAKE_CURRENT_SOURCE_DIR} generated_engin
file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")
set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD)
if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
if (NUMA_FOUND)
ADD_DEFINITIONS(-DCACHE_LOCAL_CLIENT)
# Try to find numa header file and its library
FIND_PATH( NUMA_INCLUDE_DIR numa.h )
MESSAGE( "Numa include dir is: ${NUMA_INCLUDE_DIR}" )
FIND_LIBRARY( NUMA_LIBRARY NAMES libnuma.so )
MESSAGE( "Numa library is: ${NUMA_LIBRARY}" )
FIND_PACKAGE_HANDLE_STANDARD_ARGS( NUMA DEFAULT_MSG
NUMA_INCLUDE_DIR
NUMA_LIBRARY
)
if ( NUMA_FOUND )
ADD_DEFINITIONS(-DNUMA_ENABLED)
MESSAGE("Numa package found")
else()
MESSAGE(FATAL_ERROR "Numa package not found, try 'sudo yum install numactl-devel' or 'sudo apt-get install libnuma-dev'")
endif()
endif ()
add_library(engine-cache-client OBJECT

@@ -32,8 +32,10 @@
#endif
#ifdef ENABLE_GPUQUE
#include "minddata/dataset/engine/gpu_item_connector.h"
#include "minddata/dataset/util/circular_pool.h"
#include "runtime/device/gpu/gpu_buffer_mgr.h"
#include "ps/ps_cache/ps_data/ps_data_prefetch.h"
using mindspore::device::BlockQueueStatus_T;
using mindspore::device::GpuBufferMgr;
#endif
@@ -189,12 +191,21 @@ class DeviceQueueOp : public PipelineOp {
#ifdef ENABLE_GPUQUE
Status SendDataToGPU();
Status RetryPushGPUData(const std::vector<size_t> &data_size, const TensorRow &curr_row, uint32_t handle,
bool profiling, int32_t *push_time);
Status MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row);
void ReleaseData(void *addr);
std::shared_ptr<MemoryPool> pool_;
Status MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row, const int32_t &worker_id);
void ReleaseData(void *addr, int32_t worker_id);
Status LaunchParallelCopyThread();
Status PushDataToGPU();
Status WorkerEntry(int32_t worker_id);
QueueList<std::unique_ptr<DataBuffer>> receive_queues_;
std::vector<std::shared_ptr<MemoryPool>> pool_;
std::unique_ptr<GpuItemConnector> gpu_item_connector_;
uint32_t num_workers_;
uint32_t queue_capacity_;
// This rank_id is for device_queue; one process works with only one rank_id.
// In the standalone scenario the rank_id may come from the env var 'CUDA_VISIBLE_DEVICES',
// while in the distributed scenario it comes from _get_global_rank() in Python.
uint32_t rank_id_;
#endif
Status SendDataToCPU();

@@ -17,6 +17,10 @@
#include <iostream>
#include <string>
#include <utility>
#include <limits>
#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
#include <numa.h>
#endif
#include "minddata/dataset/engine/datasetops/dataset_op.h"
#include "minddata/dataset/engine/datasetops/shuffle_op.h"
#include "minddata/dataset/engine/datasetops/device_queue_op.h"
@@ -42,6 +46,10 @@ ExecutionTree::ExecutionTree() : id_count_(0), pre_pass_override_(nullptr) {
prepare_flags_ = kDePrepNone;
profiling_manager_ = std::make_unique<ProfilingManager>(this);
optimize_ = common::GetEnv("OPTIMIZE") == "true" ? true : false;
#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
rank_id_ = cfg->rank_id();
#endif
}
// Destructor
@@ -137,6 +145,27 @@ Status ExecutionTree::Launch() {
// opencv limit too many threads
#ifndef ENABLE_ANDROID
#if !defined(_WIN32) && !defined(_WIN64) && !defined(__APPLE__)
#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
// Here we do the numa bind as a performance optimization. Based on our test
// results, binding when get_dataset_size launches a tree gives better
// performance than binding only when _To_Device launches a tree. The bind is
// process level, covers both CPU and memory, and picks the numa node with a
// simple polling rule:
// numa_bind_id = rank_id_ % (numa_max_node() + 1)
// So far this has only been validated in the GPU scenario; the D scenario is
// untested, so we do not recommend enabling the numa feature there.
int numa_node_max_id = numa_max_node();
if (numa_node_max_id >= 0) {  // rank_id_ is unsigned, so only the node id needs checking
uint32_t numa_bind_id = static_cast<uint32_t>(rank_id_ % (numa_node_max_id + 1));
auto bm = numa_allocate_nodemask();
numa_bitmask_clearall(bm);
numa_bitmask_setbit(bm, numa_bind_id);
numa_bind(bm);
numa_bitmask_free(bm);
} else {
RETURN_STATUS_UNEXPECTED("Get numa max node failed.");
}
#endif
int32_t thread_num = get_nprocs();
if (thread_num == 0) {
std::string err_msg = "Invalid thread number.";

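To make the polling rule above concrete, here is a standalone sketch (not part of the commit) that stubs numa_max_node() with a fixed value so the rank-to-node mapping can be checked without libnuma:

#include <cstdint>
#include <cstdio>

int main() {
  const int numa_node_max_id = 1;  // pretend numa_max_node() reported node ids 0..1
  for (uint32_t rank_id = 0; rank_id < 8; ++rank_id) {
    uint32_t numa_bind_id = rank_id % static_cast<uint32_t>(numa_node_max_id + 1);
    std::printf("rank %u -> numa node %u\n", rank_id, numa_bind_id);
  }
  return 0;
}

With two nodes, ranks alternate 0, 1, 0, 1, ..., so processes sharing a machine spread evenly across sockets.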
@@ -282,6 +282,12 @@ class ExecutionTree {
bool optimize_; // Flag to enable optional optimizations
std::function<OptPass(OptPass)> pre_pass_override_; // function ptr that overrides pre pass, called in PrePrepare()
bool partially_prepare_; // Temp: during migration to IR, if true, run remaining passes.
#if defined(NUMA_ENABLED) && defined(ENABLE_GPUQUE)
// This rank_id is for numa and device_queue; one process works with only one rank_id.
// In the standalone scenario the rank_id may come from the env var 'CUDA_VISIBLE_DEVICES',
// while in the distributed scenario it comes from _get_global_rank() in Python.
uint32_t rank_id_;
#endif
};
} // namespace dataset
} // namespace mindspore

@@ -0,0 +1,84 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_
#ifdef ENABLE_GPUQUE
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "minddata/dataset/engine/connector.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/core/constants.h"
#include "runtime/device/gpu/blocking_queue.h"
using mindspore::device::DataItemGpu;
namespace mindspore {
namespace dataset {
class GpuItemConnector : public Connector<std::vector<device::DataItemGpu>> {
public:
GpuItemConnector(int32_t num_producers, int32_t num_consumers, int32_t queue_capacity)
: Connector<std::vector<device::DataItemGpu>>(num_producers, num_consumers, queue_capacity) {
for (int i = 0; i < num_producers; i++) {
is_queue_finished_.push_back(false);
}
}
~GpuItemConnector() = default;
Status Add(int32_t worker_id, std::vector<device::DataItemGpu> &&element) noexcept {
return Connector<std::vector<device::DataItemGpu>>::Push(worker_id, std::move(element));
}
Status Pop(int32_t worker_id, std::vector<device::DataItemGpu> *result) noexcept override {
{
MS_ASSERT(worker_id < num_consumers_);
std::unique_lock<std::mutex> lock(m_);
RETURN_IF_NOT_OK(cv_.Wait(&lock, [this, worker_id]() { return expect_consumer_ == worker_id; }));
if (is_queue_finished_[pop_from_]) {
std::string err_msg = "ERROR: popping from a finished queue in GpuItemConnector";
RETURN_STATUS_UNEXPECTED(err_msg);
}
RETURN_IF_NOT_OK(queues_[pop_from_]->PopFront(result));
if (result->empty()) {
is_queue_finished_[pop_from_] = true;
}
for (int offset = 1; offset <= num_producers_; offset++) {
int32_t next_queue_index = (pop_from_ + offset) % num_producers_;
if (!is_queue_finished_[next_queue_index]) {
pop_from_ = next_queue_index;
break;
}
}
expect_consumer_ = (expect_consumer_ + 1) % num_consumers_;
}
cv_.NotifyAll();
return Status::OK();
}
private:
std::vector<bool> is_queue_finished_;
};
} // namespace dataset
} // namespace mindspore
#endif // ENABLE_GPUQUE
#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_

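A hedged, single-threaded model (not the class itself, and without the locking) of the round-robin pop above: the consumer drains the producer queues in turn, marks a queue finished once it yields its end-of-data marker, and skips finished queues when advancing pop_from_. Here -1 stands in for the empty end-of-data element:

#include <cstdio>
#include <deque>
#include <vector>

int main() {
  const int num_producers = 3;
  // Each producer's queue ends with -1, standing in for the empty EOF element.
  std::vector<std::deque<int>> queues = {{10, 11, -1}, {20, -1}, {30, 31, 32, -1}};
  std::vector<bool> finished(num_producers, false);
  int pop_from = 0;
  int remaining = num_producers;
  while (remaining > 0) {
    int item = queues[pop_from].front();
    queues[pop_from].pop_front();
    if (item == -1) {
      finished[pop_from] = true;  // this producer has delivered everything
      --remaining;
    } else {
      std::printf("consumed %d from queue %d\n", item, pop_from);
    }
    // Advance to the next unfinished queue, exactly as in Pop() above.
    for (int offset = 1; offset <= num_producers; ++offset) {
      int next = (pop_from + offset) % num_producers;
      if (!finished[next]) {
        pop_from = next;
        break;
      }
    }
  }
  return 0;
}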
@@ -72,7 +72,7 @@ BlockQueueStatus_T GpuQueue::Front(void **addr, size_t *len) const {
*len = len_;
for (auto item : node_info_[head_].data_) {
host_release_(item.data_ptr_);
host_release_(item.data_ptr_, item.worker_id_);
}
return SUCCESS;
}
@@ -105,7 +105,7 @@ BlockQueueStatus_T BlockingQueue::Create(void *addr, const std::vector<size_t> &
return SUCCESS;
}
void BlockingQueue::RegisterRelease(const std::function<void(void *)> &func) { queue_->RegisterRelease(func); }
void BlockingQueue::RegisterRelease(const std::function<void(void *, int32_t)> &func) { queue_->RegisterRelease(func); }
BlockQueueStatus_T BlockingQueue::Push(const std::vector<DataItemGpu> &data, unsigned int) {
std::unique_lock<std::mutex> locker(mutex_);

@@ -33,6 +33,7 @@ namespace device {
enum BlockQueueStatus_T : int { SUCCESS = 0, QUEUE_NOT_EXIST, HANDLE_NOT_EXIST, ERROR_INPUT, INTERNAL_ERROR, TIMEOUT };
struct DataItemGpu {
int32_t worker_id_;
size_t data_len_;
void *data_ptr_;
};
@@ -42,7 +43,7 @@ class GpuQueue {
GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity);
virtual ~GpuQueue();
void RegisterRelease(const std::function<void(void *)> &func) { host_release_ = func; }
void RegisterRelease(const std::function<void(void *, int32_t)> &func) { host_release_ = func; }
inline bool IsEmpty() const { return size_ == 0; }
inline bool IsFull() const { return size_ == capacity_; }
@@ -69,7 +70,7 @@ class GpuQueue {
size_t capacity_;
cudaStream_t stream_;
std::unique_ptr<NodeInfo[]> node_info_;
std::function<void(void *)> host_release_;
std::function<void(void *, int32_t)> host_release_;
GpuQueue(const GpuQueue &) = delete;
GpuQueue &operator=(const GpuQueue &) = delete;
@@ -81,7 +82,7 @@ class BlockingQueue {
~BlockingQueue() = default;
BlockQueueStatus_T Create(void *addr, const std::vector<size_t> &shape, const size_t &capacity);
void RegisterRelease(const std::function<void(void *)> &func);
void RegisterRelease(const std::function<void(void *, int32_t)> &func);
BlockQueueStatus_T Push(const std::vector<DataItemGpu> &data, unsigned int timeout_in_sec);
BlockQueueStatus_T Front(void **ptr, size_t *len);
BlockQueueStatus_T Pop();

@@ -66,7 +66,7 @@ BlockQueueStatus_T GpuBufferMgr::Create(unsigned int device_id, const std::strin
}
unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name,
const std::vector<size_t> &shape, const std::function<void(void *)> func) {
const std::vector<size_t> &shape, const std::function<void(void *, int32_t)> func) {
set_device();
std::string name = std::to_string(device_id) + std::string("_") + channel_name;
if (!name_queue_map_.count(name)) {

@@ -85,7 +85,7 @@ class GpuBufferMgr {
// call for Push thread
EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const std::vector<size_t> &shape,
std::function<void(void *)> func);
std::function<void(void *, int32_t)> func);
// call for Front/Pop thread
EXPORT unsigned int Open(unsigned int device_id, const std::string &channel_name, const std::vector<size_t> &shape);

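As a hedged aside, a toy model (illustrative names only, not the MindSpore classes) of the widened release callback: the queue now hands back both the host pointer and the worker_id that allocated it, so DeviceQueueOp can return the memory to the matching per-worker pool:

#include <cstdint>
#include <cstdlib>
#include <functional>
#include <vector>

// Hypothetical stand-in for one per-worker memory pool.
struct ToyPool {
  void Free(void *addr) { std::free(addr); }
};

int main() {
  std::vector<ToyPool> pools(4);  // mirrors the new vector of pools in DeviceQueueOp
  // The callback matches the new std::function<void(void *, int32_t)> signature.
  std::function<void(void *, int32_t)> host_release = [&pools](void *addr, int32_t worker_id) {
    pools[worker_id].Free(addr);  // the free goes back to the pool that allocated it
  };
  void *buf = std::malloc(64);  // stands in for MallocForGPUData on worker 2
  host_release(buf, 2);         // GpuQueue::Front() invokes this per data item
  return 0;
}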
@@ -16,6 +16,7 @@
The configuration module provides various functions to set and get the supported
configuration parameters, and read a configuration file.
"""
import os
import random
import numpy
import mindspore._c_dataengine as cde
@@ -29,6 +30,27 @@ UINT32_MAX = 4294967295
_config = cde.GlobalContext.config_manager()
def _init_device_info():
"""
INTERNAL USE ONLY!
The rank_id needs to be passed into the deeper layers for numa and device_queue.
One process works with only one rank_id. In the standalone scenario, the rank_id
may come from the env var 'CUDA_VISIBLE_DEVICES'; in the distributed scenario,
it comes from _get_global_rank().
"""
from mindspore import context
from mindspore.parallel._auto_parallel_context import auto_parallel_context
from mindspore.parallel._utils import _get_global_rank
if context.get_context("device_target") == "GPU":
rank_id = _get_global_rank()
parallel_mode = auto_parallel_context().get_parallel_mode()
if parallel_mode == "stand_alone":
cuda_device_info = os.getenv("CUDA_VISIBLE_DEVICES")
if cuda_device_info:
cuda_id = int(cuda_device_info.split(",")[0].strip())
if cuda_id != rank_id:
rank_id = cuda_id
_config.set_rank_id(rank_id)
def set_seed(seed):
"""

@@ -52,7 +52,7 @@ from .validators import check_batch, check_shuffle, check_map, check_filter, che
check_generatordataset, check_sync_wait, check_zip_dataset, check_add_column, check_textfiledataset, check_concat, \
check_random_dataset, check_split, check_bucket_batch_by_length, check_cluedataset, check_save, check_csvdataset, \
check_paddeddataset, check_tuple_iterator, check_dict_iterator, check_schema, check_to_device_send, replace_none
from ..core.config import get_callback_timeout
from ..core.config import get_callback_timeout, _init_device_info
from ..core.datatypes import mstype_to_detype, mstypelist_to_detypelist
try:
@@ -141,11 +141,19 @@ class Dataset:
self._sync = False
def create_ir_tree(self):
"""
Internal method to create an IR tree.
Returns:
ir_tree, the object of the IR tree.
dataset, the root dataset of the IR tree.
"""
parent = self.parent
self.parent = []
dataset = copy.deepcopy(self)
ir_tree = dataset.parse_tree()
self.parent = parent
_init_device_info()
return ir_tree, dataset
def parse_tree(self):
