[Imperative] implement imperative NCCLParallelContext (#16477)
add NCCLParallelContext for parallel dygraph
parent
bb143052cb
commit
b4c3a6aa0b
@@ -0,0 +1,134 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/imperative/nccl_context.h"

namespace paddle {
namespace imperative {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
void NCCLParallelContext::RecvNCCLID(const std::string &ep,
                                     ncclUniqueId *nccl_id) {
  auto addr = paddle::string::Split(ep, ':');
  PADDLE_ENFORCE_EQ(addr.size(), 2UL,
                    "The endpoint should contain host and port: %s", ep);
  std::string host = addr[0];
  int port = std::stoi(addr[1]);

  int server_fd, new_socket;
  struct sockaddr_in address;
  int addrlen = sizeof(address);
  char buffer[1024] = {0};
  int opt = 0;
  // creating socket fd
  if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0)
    PADDLE_THROW("create server fd failed");
  if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt,
                 sizeof(opt)))
    PADDLE_THROW("set socket opt failed");

  address.sin_family = AF_INET;
  address.sin_addr.s_addr = INADDR_ANY;
  address.sin_port = htons(port);

  if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0)
    PADDLE_THROW("binding failed on ep: %s", ep);
  VLOG(3) << "listening on: " << ep;
  if (listen(server_fd, 3) < 0) PADDLE_THROW("listen on server fd failed");

  if ((new_socket =
           accept(server_fd, reinterpret_cast<struct sockaddr *>(&address),
                  reinterpret_cast<socklen_t *>(&addrlen))) < 0)
    PADDLE_THROW("accept the new socket fd failed");

  if (read(new_socket, buffer, 1024) < 0)
    PADDLE_THROW("reading the ncclUniqueId from socket failed");
  VLOG(3) << "received the ncclUniqueId";
  memcpy(nccl_id, buffer, NCCL_UNIQUE_ID_BYTES);

  VLOG(3) << "closing the socket server: " << ep;
  close(server_fd);
}

void NCCLParallelContext::SendNCCLID(const std::string &ep,
                                     ncclUniqueId *nccl_id) {
  auto addr = paddle::string::Split(ep, ':');
  PADDLE_ENFORCE_EQ(addr.size(), 2UL,
                    "The endpoint should contain host and port: %s", ep);
  std::string host = addr[0];
  int port = std::stoi(addr[1]);
  // struct sockaddr_in address;
  int sock = 0;
  struct sockaddr_in serv_addr;
  char buffer[1024] = {0};

  memcpy(buffer, nccl_id, NCCL_UNIQUE_ID_BYTES);
  if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    PADDLE_THROW("create socket failed");

  memset(&serv_addr, '0', sizeof(serv_addr));
  serv_addr.sin_family = AF_INET;
  serv_addr.sin_port = htons(port);

  if (inet_pton(AF_INET, host.c_str(), &serv_addr.sin_addr) <= 0)
    PADDLE_THROW("invalid address: %s", ep);

  while (true) {
    if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
      VLOG(0) << "worker: " << ep
              << " is not ready, will retry after 3 seconds...";
      std::this_thread::sleep_for(std::chrono::seconds(3));
      continue;
    }
    VLOG(3) << "sending the ncclUniqueId to " << ep;
    send(sock, buffer, NCCL_UNIQUE_ID_BYTES, 0);
    break;
  }
}

void NCCLParallelContext::BcastNCCLId(ncclUniqueId *nccl_id, int root) {
  if (strategy_.local_rank_ == root) {
    for (auto ep : strategy_.trainer_endpoints_) {
      if (ep != strategy_.current_endpoint_) SendNCCLID(ep, nccl_id);
    }
  } else {
    RecvNCCLID(strategy_.current_endpoint_, nccl_id);
  }
}

void NCCLParallelContext::Init() {
  ncclUniqueId nccl_id;
  ncclComm_t comm;
  if (strategy_.local_rank_ == 0) {
    // generate the unique ncclid on the root worker
    platform::dynload::ncclGetUniqueId(&nccl_id);
    BcastNCCLId(&nccl_id, 0);
  } else {
    BcastNCCLId(&nccl_id, 0);
  }
  int gpu_id = boost::get<platform::CUDAPlace>(place_).device;
  VLOG(0) << "init nccl context nranks: " << strategy_.nranks_
          << " local rank: " << strategy_.local_rank_ << " gpu id: " << gpu_id;

  PADDLE_ENFORCE(cudaSetDevice(gpu_id));
  PADDLE_ENFORCE(platform::dynload::ncclCommInitRank(
      &comm, strategy_.nranks_, nccl_id, strategy_.local_rank_));

  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
  auto *dev_ctx = static_cast<platform::CUDADeviceContext *>(pool.Get(place_));
  dev_ctx->set_nccl_comm(comm);
}
#endif

}  // namespace imperative
}  // namespace paddle
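Taken together, RecvNCCLID/SendNCCLID implement a small TCP rendezvous and BcastNCCLId/Init build on it: rank 0 calls ncclGetUniqueId and pushes the id to every other trainer endpoint, every other rank listens on its own endpoint until the id arrives, and each rank then binds its GPU and joins the communicator through ncclCommInitRank. A minimal per-rank driver sketch, assuming every rank runs in its own process; only ParallelStrategy, CUDAPlace, NCCLParallelContext, and Init come from this commit, while the helper name and the hard-coded two-trainer endpoints are illustrative:

```cpp
#include "paddle/fluid/imperative/nccl_context.h"

namespace imperative = paddle::imperative;
namespace platform = paddle::platform;

// Hypothetical per-rank setup: describe the job, pick the local GPU, and let
// NCCLParallelContext do the id exchange plus ncclCommInitRank.
void InitNCCLForRank(int local_rank) {
  imperative::ParallelStrategy strategy;
  strategy.nranks_ = 2;
  strategy.local_rank_ = local_rank;
  strategy.trainer_endpoints_ = {"127.0.0.1:9866", "127.0.0.1:9867"};
  strategy.current_endpoint_ = strategy.trainer_endpoints_[local_rank];

  platform::CUDAPlace place(local_rank);  // one GPU per rank
  imperative::NCCLParallelContext ctx(strategy, place);
  ctx.Init();  // blocks until all nranks_ ranks have joined the communicator
}
```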
@@ -0,0 +1,81 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

// network header files
#ifndef _WIN32
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <sys/socket.h>
#endif

#include <string>
#include <vector>

#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/device_context.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/dynload/nccl.h"
#endif
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/split.h"

namespace paddle {
namespace imperative {

struct ParallelStrategy {
  int nranks_{1};
  int local_rank_{0};
  std::vector<std::string> trainer_endpoints_{};
  std::string current_endpoint_{""};
};

class ParallelContext {
 public:
  explicit ParallelContext(const ParallelStrategy& strategy,
                           const platform::Place& place)
      : strategy_(strategy), place_(place) {}

  virtual ~ParallelContext() {}

  virtual void Init() = 0;

 protected:
  ParallelStrategy strategy_;
  platform::Place place_;
};

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
class NCCLParallelContext : ParallelContext {
 public:
  explicit NCCLParallelContext(const ParallelStrategy& strategy,
                               const platform::Place& place)
      : ParallelContext(strategy, place) {}

  ~NCCLParallelContext() {}

  void BcastNCCLId(ncclUniqueId* nccl_id, int root);

  void Init() override;

 protected:
  void RecvNCCLID(const std::string& endpoint, ncclUniqueId* nccl_id);

  void SendNCCLID(const std::string& endpoint, ncclUniqueId* nccl_id);
};
#endif

}  // namespace imperative
}  // namespace paddle
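The header keeps the backend-specific pieces behind the abstract ParallelContext, so callers only deal with a strategy, a place, and a virtual Init(). The Python side's TODO mentions a Gloo-based context for CPU parallelism; purely as a sketch of how another backend might slot into this interface (nothing below is part of the commit, and the class name is made up):

```cpp
#include "paddle/fluid/imperative/nccl_context.h"

// Hypothetical illustration only: a CPU-side context implementing the same
// abstract interface as NCCLParallelContext.
class GlooParallelContextSketch : public paddle::imperative::ParallelContext {
 public:
  GlooParallelContextSketch(const paddle::imperative::ParallelStrategy& strategy,
                            const paddle::platform::Place& place)
      : ParallelContext(strategy, place) {}

  void Init() override {
    // A real backend would rendezvous over strategy_.trainer_endpoints_ here
    // and create its own communicator instead of an NCCL one.
  }
};
```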
@@ -0,0 +1,52 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/imperative/nccl_context.h"
#include "gtest/gtest.h"
#include "paddle/fluid/platform/device_context.h"

namespace imperative = paddle::imperative;
namespace platform = paddle::platform;

imperative::ParallelStrategy GetStrategy(int local_rank) {
  std::vector<std::string> eps = {"127.0.0.1:9866", "127.0.0.1:9867"};
  imperative::ParallelStrategy strategy;
  strategy.trainer_endpoints_ = eps;
  strategy.current_endpoint_ = eps[local_rank];
  strategy.nranks_ = 2;
  strategy.local_rank_ = local_rank;
  return strategy;
}

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
void BcastNCCLId(int local_rank, ncclUniqueId *nccl_id) {
  auto strategy = GetStrategy(local_rank);
  platform::CUDAPlace gpu(local_rank);
  imperative::NCCLParallelContext ctx(strategy, gpu);
  ctx.BcastNCCLId(nccl_id, 0);
}

TEST(BcastNCCLId, Run) {
  ncclUniqueId nccl_id;
  platform::dynload::ncclGetUniqueId(&nccl_id);
  std::thread t(BcastNCCLId, 0, &nccl_id);

  ncclUniqueId recv_nccl_id;
  BcastNCCLId(1, &recv_nccl_id);

  t.join();
  EXPECT_EQ(0, std::memcmp(nccl_id.internal, recv_nccl_id.internal,
                           NCCL_UNIQUE_ID_BYTES));
}
#endif
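The test stops at the id broadcast: both ranks run as two threads of one process and exchange the ncclUniqueId over the loopback endpoints, so it does not need two GPUs. Exercising Init() itself (cudaSetDevice plus the collective ncclCommInitRank) would additionally require a distinct GPU per rank, along the lines of the per-rank driver sketched after the .cc hunk above; presumably that is why the test covers only BcastNCCLId.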
@@ -0,0 +1,60 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from .. import core

__all__ = ["prepare_context"]

ParallelStrategy = core.ParallelStrategy

__parallel_ctx__clz__ = None


def prepare_context(parallel_strategy, place):
    global __parallel_ctx__clz__
    assert __parallel_ctx__clz__ is None, "ParallelContext can only be initialized once."

    if isinstance(place, core.CUDAPlace):
        __parallel_ctx__clz__ = core.NCCLParallelContext(parallel_strategy,
                                                         place)
    else:
        # TODO(Yancey1989): add Gloo Parallel Context to support CPU parallel computation
        assert False, "Only support CUDAPlace for now."
    __parallel_ctx__clz__.init()


class Env(object):
    def __init__(self):
        self._nranks = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
        self._local_rank = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        self._dev_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        self._trainer_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS",
                                            "").split(",")
        self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT", "")

    @property
    def nranks(self):
        return self._nranks

    @property
    def local_rank(self):
        return self._local_rank

    @property
    def dev_id(self):
        return self._dev_id

    @property
    def current_endpoint(self):
        return self._current_endpoint
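prepare_context constructs the core.NCCLParallelContext binding and calls its init(), while Env only collects the job description that a distributed launcher passes in through environment variables; the commit does not yet wire Env into prepare_context. Purely as an illustration of how those variables line up with the C++ ParallelStrategy fields from nccl_context.h (the helper below and the direct getenv mapping are assumptions, not part of the change):

```cpp
#include <cstdlib>
#include <sstream>
#include <string>

#include "paddle/fluid/imperative/nccl_context.h"

// Hypothetical mapping from the launcher's environment variables (the same
// ones Env reads in Python) to the ParallelStrategy defined in nccl_context.h.
paddle::imperative::ParallelStrategy StrategyFromEnv() {
  paddle::imperative::ParallelStrategy strategy;
  if (const char* nranks = std::getenv("PADDLE_TRAINERS_NUM"))
    strategy.nranks_ = std::stoi(nranks);
  if (const char* rank = std::getenv("PADDLE_TRAINER_ID"))
    strategy.local_rank_ = std::stoi(rank);
  if (const char* cur_ep = std::getenv("PADDLE_CURRENT_ENDPOINT"))
    strategy.current_endpoint_ = cur_ep;
  if (const char* eps = std::getenv("PADDLE_TRAINER_ENDPOINTS")) {
    std::stringstream ss(eps);
    std::string ep;
    while (std::getline(ss, ep, ','))
      strategy.trainer_endpoints_.push_back(ep);
  }
  return strategy;
}
```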