You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							210 lines
						
					
					
						
							7.0 KiB
						
					
					
				
			
		
		
	
	
							210 lines
						
					
					
						
							7.0 KiB
						
					
					
				| /* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| http://www.apache.org/licenses/LICENSE-2.0
 | |
| Unless required by applicable law or agreed
 | |
| to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License. */
 | |
| 
 | |
| #include <functional>
 | |
| 
 | |
| #include "paddle/fluid/framework/lod_tensor.h"
 | |
| #include "paddle/fluid/framework/op_registry.h"
 | |
| #include "paddle/fluid/operators/nccl/nccl_gpu_common.h"
 | |
| 
 | |
| namespace paddle {
 | |
| namespace operators {
 | |
| 
 | |
| using framework::Tensor;
 | |
| using platform::Communicator;
 | |
| using framework::LoDTensor;
 | |
| 
 | |
| template <typename Type>
 | |
| class NCCLTypeWrapper;
 | |
| 
 | |
| template <>
 | |
| class NCCLTypeWrapper<float> {
 | |
|  public:
 | |
|   static const ncclDataType_t type = ncclFloat;
 | |
| };
 | |
| 
 | |
| template <>
 | |
| class NCCLTypeWrapper<double> {
 | |
|  public:
 | |
|   static const ncclDataType_t type = ncclDouble;
 | |
| };
 | |
| 
 | |
| template <typename T>
 | |
| class NCCLAllReduceKernel : public framework::OpKernel<T> {
 | |
|  public:
 | |
|   void Compute(const framework::ExecutionContext& ctx) const override {
 | |
|     PADDLE_ENFORCE(platform::is_gpu_place(ctx.GetPlace()),
 | |
|                    "This kernel only runs on GPU device.");
 | |
| 
 | |
|     auto ins = ctx.MultiInput<LoDTensor>("X");
 | |
|     auto outs = ctx.MultiOutput<LoDTensor>("Out");
 | |
| 
 | |
|     std::string reduction = ctx.Attr<std::string>("reduction");
 | |
|     ncclRedOp_t reduction_op_ = ncclSum;
 | |
| 
 | |
|     if (reduction == "ncclMin") {
 | |
|       reduction_op_ = ncclMin;
 | |
|     } else if (reduction == "ncclMax") {
 | |
|       reduction_op_ = ncclMax;
 | |
|     } else if (reduction == "ncclSum") {
 | |
|       reduction_op_ = ncclSum;
 | |
|     } else if (reduction == "ncclProd") {
 | |
|       reduction_op_ = ncclProd;
 | |
|     } else {
 | |
|       PADDLE_THROW("Invalid reduction. default ncclSum.");
 | |
|     }
 | |
| 
 | |
|     auto* comm = ctx.Input<Communicator>("Communicator");
 | |
| 
 | |
|     auto stream = ctx.cuda_device_context().stream();
 | |
| 
 | |
|     // device id
 | |
|     int gpu_id = boost::get<platform::CUDAPlace>(ctx.GetPlace()).GetDeviceId();
 | |
|     int idx = comm->GetCommId(gpu_id);
 | |
| 
 | |
|     for (size_t i = 0; i < ins.size(); ++i) {
 | |
|       VLOG(1) << "gpu : "
 | |
|               << " invoke allreduce. send " << ins[i]->numel() << " recv "
 | |
|               << outs[i]->numel();
 | |
| 
 | |
|       PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
 | |
|           ins[i]->data<T>(), outs[i]->mutable_data<T>(ctx.GetPlace()),
 | |
|           outs[i]->numel(), NCCLTypeWrapper<T>::type, reduction_op_,
 | |
|           comm->comms_[idx], stream));
 | |
|       PADDLE_ENFORCE(cudaStreamSynchronize(stream));
 | |
| 
 | |
|       VLOG(1) << "gpu : "
 | |
|               << " finished allreduce. send " << ins[i]->numel() << " recv "
 | |
|               << outs[i]->numel();
 | |
|     }
 | |
|   }
 | |
| };
 | |
| 
 | |
| template <typename T>
 | |
| class NCCLReduceKernel : public framework::OpKernel<T> {
 | |
|  public:
 | |
|   void Compute(const framework::ExecutionContext& ctx) const override {
 | |
|     PADDLE_ENFORCE(platform::is_gpu_place(ctx.GetPlace()),
 | |
|                    "This kernel only runs on GPU device.");
 | |
| 
 | |
|     auto ins = ctx.MultiInput<LoDTensor>("X");  // x0, x1, x2
 | |
|     auto outs = ctx.MultiOutput<LoDTensor>("Out");
 | |
| 
 | |
|     std::string reduction = ctx.Attr<std::string>("reduction");
 | |
|     ncclRedOp_t reduction_op_ = ncclSum;
 | |
| 
 | |
|     if (reduction == "ncclMin") {
 | |
|       reduction_op_ = ncclMin;
 | |
|     } else if (reduction == "ncclMax") {
 | |
|       reduction_op_ = ncclMax;
 | |
|     } else if (reduction == "ncclSum") {
 | |
|       reduction_op_ = ncclSum;
 | |
|     } else if (reduction == "ncclProd") {
 | |
|       reduction_op_ = ncclProd;
 | |
|     } else {
 | |
|       PADDLE_THROW("Invalid reduction. default ncclSum.");
 | |
|     }
 | |
| 
 | |
|     int root = ctx.Attr<int>("root");
 | |
|     auto* comm = ctx.Input<Communicator>("Communicator");
 | |
| 
 | |
|     auto stream = reinterpret_cast<const platform::CUDADeviceContext&>(
 | |
|                       ctx.device_context())
 | |
|                       .stream();
 | |
|     // device id
 | |
|     int gpu_id = boost::get<platform::CUDAPlace>(ctx.GetPlace()).GetDeviceId();
 | |
|     int idx = comm->GetCommId(gpu_id);
 | |
| 
 | |
|     auto ins_names = ctx.Inputs("X");
 | |
|     std::hash<std::string> hasher;
 | |
|     for (size_t i = 0; i < ins.size(); ++i) {
 | |
|       if (root == platform::kInvalidGPUId) {
 | |
|         root = hasher(ins_names[i]) % comm->comms_.size();
 | |
|       }
 | |
|       T* recvbuffer = nullptr;
 | |
|       if (root == gpu_id) {
 | |
|         recvbuffer = outs[i]->mutable_data<T>(ctx.GetPlace());
 | |
|       }
 | |
| 
 | |
|       VLOG(1) << "gpu : " << gpu_id << " invoke reduce. send "
 | |
|               << ins[i]->numel() << " recv " << outs[i]->numel();
 | |
| 
 | |
|       PADDLE_ENFORCE(platform::dynload::ncclReduce(
 | |
|           ins[i]->data<T>(), recvbuffer, ins[i]->numel(),
 | |
|           NCCLTypeWrapper<T>::type, reduction_op_, root, comm->comms_[idx],
 | |
|           stream));
 | |
|       PADDLE_ENFORCE(cudaStreamSynchronize(stream));
 | |
| 
 | |
|       VLOG(1) << "gpu : " << gpu_id << " finished reduce. send "
 | |
|               << ins[i]->numel() << " recv " << outs[i]->numel();
 | |
|     }
 | |
|   }
 | |
| };
 | |
| 
 | |
| template <typename T>
 | |
| class NCCLBcastKernel : public framework::OpKernel<T> {
 | |
|  public:
 | |
|   void Compute(const framework::ExecutionContext& ctx) const override {
 | |
|     PADDLE_ENFORCE(platform::is_gpu_place(ctx.GetPlace()),
 | |
|                    "This kernel only runs on GPU device.");
 | |
| 
 | |
|     int root = ctx.Attr<int>("root");
 | |
| 
 | |
|     auto* comm = ctx.Input<Communicator>("Communicator");
 | |
| 
 | |
|     auto stream = reinterpret_cast<const platform::CUDADeviceContext&>(
 | |
|                       ctx.device_context())
 | |
|                       .stream();
 | |
|     // device id
 | |
|     int gpu_id = boost::get<platform::CUDAPlace>(ctx.GetPlace()).GetDeviceId();
 | |
|     int idx = comm->GetCommId(gpu_id);
 | |
| 
 | |
|     if (idx == root) {
 | |
|       auto ins = ctx.MultiInput<LoDTensor>("X");
 | |
|       for (size_t i = 0; i < ins.size(); ++i) {
 | |
|         VLOG(1) << "gpu : " << gpu_id << " invoke Bcast. send "
 | |
|                 << ins[i]->numel();
 | |
| 
 | |
|         VLOG(1) << " before ncclBcast";
 | |
|         PADDLE_ENFORCE(platform::dynload::ncclBcast(
 | |
|             (void*)ins[i]->data<T>(), ins[i]->numel(), NCCLTypeWrapper<T>::type,
 | |
|             root, comm->comms_[idx], stream));
 | |
|         VLOG(1) << " after ncclBcast";
 | |
|         PADDLE_ENFORCE(cudaStreamSynchronize(stream));
 | |
| 
 | |
|         VLOG(1) << "gpu : " << gpu_id << " finished Bcast.";
 | |
|       }
 | |
|     } else {
 | |
|       auto outs = ctx.MultiOutput<LoDTensor>("Out");
 | |
|       for (size_t i = 0; i < outs.size(); ++i) {
 | |
|         VLOG(1) << "gpu : " << gpu_id << " invoke Bcast. recv buffer "
 | |
|                 << framework::product(outs[i]->dims());
 | |
| 
 | |
|         PADDLE_ENFORCE(platform::dynload::ncclBcast(
 | |
|             outs[i]->mutable_data<T>(ctx.GetPlace()), outs[i]->numel(),
 | |
|             NCCLTypeWrapper<T>::type, root, comm->comms_[idx], stream));
 | |
|         PADDLE_ENFORCE(cudaStreamSynchronize(stream));
 | |
| 
 | |
|         VLOG(1) << "gpu : " << gpu_id << " finished Bcast. recv "
 | |
|                 << outs[i]->numel();
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| };
 | |
| 
 | |
| }  // namespace operators
 | |
| }  // namespace paddle
 | |
| 
 | |
| namespace ops = paddle::operators;
 | |
| REGISTER_OP_CUDA_KERNEL(ncclAllReduce, ops::NCCLAllReduceKernel<float>);
 | |
| REGISTER_OP_CUDA_KERNEL(ncclBcast, ops::NCCLBcastKernel<float>);
 | |
| REGISTER_OP_CUDA_KERNEL(ncclReduce, ops::NCCLReduceKernel<float>);
 |