@@ -16,6 +16,7 @@

#include <glog/logging.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <memory>
#include <mutex>
#include <thread>
@@ -150,16 +151,41 @@ TEST_F(NCCLTester, ncclAllReduceOp) {
  op2->SetInput("Communicator", {"comm"});
  op2->SetOutput("Out", {"rt"});

  std::vector<f::Scope *> dev_scopes;

  std::vector<std::thread> ths;

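  // Launch one worker thread per GPU; each runs the AllReduce op in its own child scope.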
  for (size_t i = 0; i < gpu_list.size(); ++i) {
    dev_scopes.emplace_back(&g_scope.NewScope());
    std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
                   *op2.get(), &g_scope.NewScope());
                   *op2.get(), dev_scopes[i]);
    ths.emplace_back(std::move(th));
  }

  for (size_t i = 0; i < gpu_list.size(); ++i) {
    ths[i].join();
  }

  // check results
  float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
  for (size_t i = 0; i < dev_scopes.size(); ++i) {
    auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get<f::LoDTensor>();
    auto *rt = recv_tensor.data<float>();

    p::CPUPlace cpu_place;
    auto *result_tensor = dev_scopes[i]->Var("ct")->GetMutable<f::LoDTensor>();
    result_tensor->Resize(kDims);
    auto *ct = result_tensor->mutable_data<float>(cpu_place);

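    // Copy the reduced tensor from GPU memory back to the host buffer before comparing.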
    paddle::memory::Copy(
        cpu_place, ct, p::GPUPlace(gpu_list[i]), rt,
        recv_tensor.numel() * sizeof(float),
        static_cast<p::CUDADeviceContext *>(dev_ctxs[i])->stream());
    for (size_t j = 0; j < f::product(kDims); ++j) {
      ASSERT_NEAR(ct[j], result, 1e-5);
    }
  }
}

// ncclReduceOp with desc
@@ -170,24 +196,76 @@ TEST(NCCL, ncclReduceOp) {
  op2->SetInput("Communicator", {"comm"});
  op2->SetOutput("Out", {"rt"});

  std::vector<f::Scope *> dev_scopes;

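  // Launch one worker thread per GPU; each runs the Reduce op in its own child scope.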
  std::vector<std::thread> ths;
  for (size_t i = 0; i < gpu_list.size(); ++i) {
    dev_scopes.emplace_back(&g_scope.NewScope());
    std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
                   *op2.get(), &g_scope.NewScope());
                   *op2.get(), dev_scopes[i]);
    ths.emplace_back(std::move(th));
  }

  for (size_t i = 0; i < gpu_list.size(); ++i) {
    ths[i].join();
  }

  // check results
  float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
  for (size_t i = 0; i < dev_scopes.size(); ++i) {
    auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get<f::LoDTensor>();
    auto *rt = recv_tensor.data<float>();

    p::CPUPlace cpu_place;
    auto *result_tensor = dev_scopes[i]->Var("ct")->GetMutable<f::LoDTensor>();
    result_tensor->Resize(kDims);
    auto *ct = result_tensor->mutable_data<float>(cpu_place);

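    // Copy the result tensor from GPU memory back to the host buffer before comparing.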
    paddle::memory::Copy(
        cpu_place, ct, p::GPUPlace(gpu_list[i]), rt,
        recv_tensor.numel() * sizeof(float),
        static_cast<p::CUDADeviceContext *>(dev_ctxs[i])->stream());
    for (size_t j = 0; j < f::product(kDims); ++j) {
      ASSERT_NEAR(ct[j], result, 1e-5);
    }
  }
}

// ncclBcastOp with desc
// TEST(NCCL, ncclBcastOp) {
TEST(NCCL, ncclBcastOp) {
  std::unique_ptr<f::OpDescBind> op1(new f::OpDescBind);
  op1->SetType("ncclBcastSend");
  op1->SetInput("X", {"st"});
  op1->SetInput("Communicator", {"comm"});

  std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
  op2->SetType("ncclBcastRecv");
  op2->SetInput("Communicator", {"comm"});
  op2->SetOutput("Out", {"rt"});

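  // Launch receive threads for every GPU in gpu_list except the first one.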
  std::vector<std::thread> ths;
  for (size_t i = 1; i < gpu_list.size(); ++i) {
    std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
                   *op2.get(), &g_scope.NewScope());
    ths.emplace_back(std::move(th));
  }

  for (size_t i = 0; i < ths.size(); ++i) {
    ths[i].join();
  }
}

// joint ncclBcastOp and ncclReduceOp
// TEST(NCCL, MultipleOp) {
// std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
// op2->SetType("ncclBcastSend");
// op2->SetInput("X", {"st"});
// op2->SetInput("Communicator", {"comm"});

// std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
// op2->SetType("ncclBcastRecv");
// op2->SetInput("Communicator", {"comm"});
// op2->SetOutput("Out", {"rt"});

// std::vector<std::thread> ths;