You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Paddle/paddle/fluid/operators/collective/c_broadcast_op.cu.cc

88 lines
3.1 KiB

supports collective communicated training (#18175) * fix prepare context redundant code problem, optimize executor by caching create_varaiables test=develop * supports collective training in executor * make fetch_list runable with variables, add more unittest for use_program_cache test=develop * fix comment test=develop * use unique name for nccl_id * supports output to stream in program_to_code * insert sync_comm_stream before regularization; add skip_op_callstack capability in program_to_code * set op role in collective training * add collective op role * remove orig file * add build optimizer by strategy * add collective strategy * refine collective strategy * add multi-process role maker * refine strategy building factory so that we can easily plugin more strategy * scale loss grad in collective sgd transpiler * add support for distributed fc * code format * revert some features for dist fc * add support for distributed fc training * fix prepare context redundant code problem, optimize executor by caching create_varaiables test=develop * supports collective training in executor * make fetch_list runable with variables, add more unittest for use_program_cache test=develop * use unique name for nccl_id * supports output to stream in program_to_code * insert sync_comm_stream before regularization; add skip_op_callstack capability in program_to_code * set op role in collective training * add collective op role * fix comment test=develop * remove orig file * add build optimizer by strategy * add collective strategy * refine collective strategy * add multi-process role maker * refine strategy building factory so that we can easily plugin more strategy * scale loss grad in collective sgd transpiler * add support for distributed fc * code format * revert some features for dist fc * add support for distributed fc training * test=develop add collective op unittest standard * test=develop remove the test_collective directory * test=develop remove the test_collective directory * remove slicegather test * code format for reducescatter * update attr of shard_index_op * Modify macro nccl_helper * remove test without distribute * macro collective_helper * marcro update * test=develop update support python3.5 * test=develop change gpu memory use to 0.1 when test * test=develop update ut equal func * test=develop set flags to 1.5 * test=develop fix pickle dumple py35 * test=develop fix divide in slice and add sync_comm_stream update atol and rtol to 1e-05 rm shard_index op and test modify read input from file to read from memory remove origin_program in framework and add i/o in c_sync_calc_stream * test=develop update unittest sync operator I/O
6 years ago
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/collective/c_broadcast_op.h"
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/nccl_helper.h"
#endif
namespace paddle {
namespace operators {
template <typename T>
class CBroadcastOpCUDAKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
auto x = ctx.Input<framework::LoDTensor>("X");
auto out = ctx.Output<framework::LoDTensor>("Out");
int numel = x->numel();
ncclDataType_t dtype = platform::ToNCCLDataType(x->type());
int rid = ctx.Attr<int>("ring_id");
auto comm = platform::NCCLCommContext::Instance().Get(rid);
auto place = ctx.GetPlace();
cudaStream_t stream = nullptr;
if (ctx.Attr<bool>("use_calc_stream")) {
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
stream = static_cast<platform::CUDADeviceContext*>(dev_ctx)->stream();
} else {
stream = comm->stream();
}
int root = ctx.Attr<int>("root");
if (root == comm->rank()) {
PADDLE_ENFORCE(platform::dynload::ncclBcast(
reinterpret_cast<void*>(const_cast<T*>(x->data<T>())), numel, dtype,
root, comm->comm(), stream));
VLOG(3) << "rank " << comm->rank() << " invoke Bcast. sent "
<< x->numel();
if (out != x) {
framework::TensorCopy(
*static_cast<const framework::Tensor*>(x), place,
*platform::DeviceContextPool::Instance().Get(place),
static_cast<framework::Tensor*>(out));
}
} else {
PADDLE_ENFORCE(platform::dynload::ncclBcast(out->mutable_data<T>(place),
numel, dtype, root,
comm->comm(), stream));
VLOG(3) << "rank " << comm->rank() << " invoke Bcast. recieved "
<< framework::product(out->dims());
}
out->Resize(x->dims());
out->set_lod(x->lod());
#else
PADDLE_THROW("PaddlePaddle should compile with GPU.");
#endif
}
};
} // namespace operators
} // namespace paddle
supports collective communicated training (#18175) * fix prepare context redundant code problem, optimize executor by caching create_varaiables test=develop * supports collective training in executor * make fetch_list runable with variables, add more unittest for use_program_cache test=develop * fix comment test=develop * use unique name for nccl_id * supports output to stream in program_to_code * insert sync_comm_stream before regularization; add skip_op_callstack capability in program_to_code * set op role in collective training * add collective op role * remove orig file * add build optimizer by strategy * add collective strategy * refine collective strategy * add multi-process role maker * refine strategy building factory so that we can easily plugin more strategy * scale loss grad in collective sgd transpiler * add support for distributed fc * code format * revert some features for dist fc * add support for distributed fc training * fix prepare context redundant code problem, optimize executor by caching create_varaiables test=develop * supports collective training in executor * make fetch_list runable with variables, add more unittest for use_program_cache test=develop * use unique name for nccl_id * supports output to stream in program_to_code * insert sync_comm_stream before regularization; add skip_op_callstack capability in program_to_code * set op role in collective training * add collective op role * fix comment test=develop * remove orig file * add build optimizer by strategy * add collective strategy * refine collective strategy * add multi-process role maker * refine strategy building factory so that we can easily plugin more strategy * scale loss grad in collective sgd transpiler * add support for distributed fc * code format * revert some features for dist fc * add support for distributed fc training * test=develop add collective op unittest standard * test=develop remove the test_collective directory * test=develop remove the test_collective directory * remove slicegather test * code format for reducescatter * update attr of shard_index_op * Modify macro nccl_helper * remove test without distribute * macro collective_helper * marcro update * test=develop update support python3.5 * test=develop change gpu memory use to 0.1 when test * test=develop update ut equal func * test=develop set flags to 1.5 * test=develop fix pickle dumple py35 * test=develop fix divide in slice and add sync_comm_stream update atol and rtol to 1e-05 rm shard_index op and test modify read input from file to read from memory remove origin_program in framework and add i/o in c_sync_calc_stream * test=develop update unittest sync operator I/O
6 years ago
namespace ops = paddle::operators;
namespace plat = paddle::platform;
REGISTER_OP_CUDA_KERNEL(c_broadcast, ops::CBroadcastOpCUDAKernel<float>,
ops::CBroadcastOpCUDAKernel<double>,
ops::CBroadcastOpCUDAKernel<int>,
ops::CBroadcastOpCUDAKernel<int64_t>,
ops::CBroadcastOpCUDAKernel<plat::float16>);