@@ -14,6 +14,8 @@
#include "paddle/fluid/framework/details/nccl_all_reduce_op_handle.h"
#include <algorithm>
namespace paddle {
namespace framework {
namespace details {
@@ -27,6 +29,32 @@ NCCLAllReduceOpHandle::NCCLAllReduceOpHandle(
  }
}
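// Functor for VisitDataType below: element-wise sums a list of same-shaped
// CPU LoDTensors into a single destination tensor.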
struct ReduceLoDTensor {
  const std::vector<LoDTensor> &src_tensors_;
  LoDTensor &dst_tensor_;

  ReduceLoDTensor(const std::vector<LoDTensor> &src, LoDTensor *dst)
      : src_tensors_(src), dst_tensor_(*dst) {}

  template <typename T>
  void operator()() const {
    PADDLE_ENFORCE(!src_tensors_.empty());
    auto &t0 = src_tensors_[0];
    PADDLE_ENFORCE_NE(t0.numel(), 0);
    dst_tensor_.Resize(t0.dims());
    T *dst = dst_tensor_.mutable_data<T>(platform::CPUPlace());
    std::copy(t0.data<T>(), t0.data<T>() + t0.numel(), dst);
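
    // Accumulate the remaining tensors; each one must match the first
    // tensor's shape and element type.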
    for (size_t i = 1; i < src_tensors_.size(); ++i) {
      auto &t = src_tensors_[i];
      PADDLE_ENFORCE_EQ(t.dims(), t0.dims());
      PADDLE_ENFORCE_EQ(t.type(), t0.type());
      std::transform(t.data<T>(), t.data<T>() + t.numel(), dst, dst,
                     [](T a, T b) -> T { return a + b; });
    }
  }
};
void NCCLAllReduceOpHandle::RunImpl() {
  if (inputs_.size() == 1) {
    return;  // No need to all reduce when GPU count = 1;
@@ -41,37 +69,53 @@ void NCCLAllReduceOpHandle::RunImpl() {
    int dtype = -1;
    size_t numel = 0;

    std::vector<LoDTensor> lod_tensors;

    for (size_t i = 0; i < local_scopes_.size(); ++i) {
      auto *s = local_scopes_[i];

      auto &lod_tensor = s->FindVar(var_name)->Get<LoDTensor>();
      lod_tensors.emplace_back(lod_tensor);
    }
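
    // Pick the reduction path from where the gathered tensors live: NCCL
    // all-reduce for GPU tensors, an element-wise CPU sum otherwise.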
    if (platform::is_gpu_place(lod_tensors[0].place())) {
      std::vector<std::function<void()>> all_reduce_calls;
      for (size_t i = 0; i < local_scopes_.size(); ++i) {
        auto &p = places_[i];
        auto &lod_tensor = lod_tensors[i];
        void *buffer = const_cast<void *>(lod_tensor.data<void>());
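
        // Passing the same pointer as send and receive buffer makes the
        // all-reduce run in place on each device's tensor.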
        if (dtype == -1) {
          dtype = platform::ToNCCLDataType(lod_tensor.type());
        }

        if (numel == 0) {
          numel = static_cast<size_t>(lod_tensor.numel());
        }

        int dev_id = boost::get<platform::CUDAPlace>(p).device;
        auto &nccl_ctx = nccl_ctxs_.at(dev_id);
        auto stream = nccl_ctx.stream();
        auto comm = nccl_ctx.comm_;
        all_reduce_calls.emplace_back([=] {
          PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
              buffer, buffer, numel, static_cast<ncclDataType_t>(dtype),
              ncclSum, comm, stream));
        });
      }
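
      // NCCLGroupGuard wraps the queued calls in ncclGroupStart() /
      // ncclGroupEnd(), so the per-device all-reduces launch as one group.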
      platform::NCCLGroupGuard guard;
      for (auto &call : all_reduce_calls) {
        call();
      }
    } else {  // Special handle CPU only Operator's gradient. Like CRF
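      // VisitDataType instantiates ReduceLoDTensor::operator()<T> for the
      // tensors' runtime element type and sums every copy into trg on CPU.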
      framework::LoDTensor trg;

      // Reduce All Tensor to trg in CPU
      ReduceLoDTensor func(lod_tensors, &trg);
      VisitDataType(ToDataType(lod_tensors[0].type()), func);

      // Copy trg to GPU
    }
  }
}