Fuse AllReduce (#15921)
* fuse all_reduce test=develop
* add fuse_parameter_groups_size test=develop
* Polish code test=develop
* Fix travis-ci test=develop
* Add SetGroupAccordingToLayers and SetGroupAccordingToGroupSize test=develop
* Add SetGroupAccordingToMemorySize test=develop
* fix multi_devices_graph test=develop
* reset params_grads test=develop
* Polish code test=develop

revert-16190-refine_parallel_executor
parent d0ef682552
commit f26ba5bddd
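The pass and op handle added below are driven from Python through the parallel executor's build options; the new test exercises them via a fuse_all_reduce_ops switch. A minimal usage sketch, assuming the switch is exposed as fluid.BuildStrategy().fuse_all_reduce_ops (the flag name here is an assumption, not part of this diff):

    import numpy as np
    import paddle.fluid as fluid

    # Tiny network, mirroring simple_fc_net in the test below.
    img = fluid.layers.data(name='image', shape=[784], dtype='float32')
    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
    prediction = fluid.layers.fc(img, size=10, act='softmax')
    loss = fluid.layers.mean(
        fluid.layers.cross_entropy(input=prediction, label=label))
    fluid.optimizer.SGD(learning_rate=1e-3).minimize(loss)

    use_cuda = fluid.core.is_compiled_with_cuda()
    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
    fluid.Executor(place).run(fluid.default_startup_program())

    build_strategy = fluid.BuildStrategy()
    build_strategy.fuse_all_reduce_ops = True  # assumed flag; enables fuse_all_reduce_op_pass

    pe = fluid.ParallelExecutor(use_cuda=use_cuda,
                                loss_name=loss.name,
                                build_strategy=build_strategy)
    feed = {'image': np.random.random([32, 784]).astype('float32'),
            'label': np.ones([32, 1], dtype='int64')}
    first_loss, = pe.run(feed=feed, fetch_list=[loss.name])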
@@ -0,0 +1,195 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <string>
#include <vector>

#include "paddle/fluid/framework/details/all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/fused_all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/ir/graph_helper.h"

namespace paddle {
namespace framework {
namespace details {

class FuseAllReduceOpPass : public ir::Pass {
 protected:
  std::unique_ptr<ir::Graph> ApplyImpl(
      std::unique_ptr<ir::Graph> graph) const override {
    ir::Graph &result = *graph;

    auto &places = Get<const std::vector<platform::Place>>(kPlaces);
    auto &local_scopes = Get<const std::vector<Scope *>>(kLocalScopes);
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    auto *nccl_ctxs = &Get<platform::NCCLContextMap>(kNCCLCtxs);
#endif

    std::unordered_set<std::string> grads;
    auto &params_grads = result.Get<ParamsAndGrads>(kParamsAndGrads);
    size_t num_of_all_reduce = params_grads.size();
    grads.reserve(num_of_all_reduce);
    for (auto p_g : params_grads) {
      grads.insert(p_g.second);
    }

    size_t num_place = places.size();
    std::unordered_map<std::string, ir::Node *> all_reduce_ops;
    all_reduce_ops.reserve(grads.size());
    for (auto &node : result.Nodes()) {
      if (node->IsOp()) {
        PADDLE_ENFORCE(node->IsWrappedBy<OpHandleBase>());
        auto *all_reduce_op_handle =
            dynamic_cast<AllReduceOpHandle *>(&node->Wrapper<OpHandleBase>());
        if (all_reduce_op_handle) {
          auto inputs = DynamicCast<VarHandle>(all_reduce_op_handle->Inputs());
          PADDLE_ENFORCE_EQ(inputs.size(), num_place);
          // The inputs' name should be the same.
          auto &grad_name = inputs[0]->name();
          for (size_t i = 1; i < inputs.size(); ++i) {
            PADDLE_ENFORCE_EQ(inputs[i]->name(), grad_name,
                              "The input name should be the same.");
          }
          PADDLE_ENFORCE_NE(grads.count(grad_name), static_cast<size_t>(0));
          all_reduce_ops.emplace(grad_name, node);
        }
      }
    }

    VLOG(10) << "Find all_reduce_ops: " << all_reduce_ops.size();
    if (all_reduce_ops.size() == 0) {
      return std::move(graph);
    }

    PADDLE_ENFORCE_EQ(all_reduce_ops.size(), grads.size(),
                      "The number of all_reduce OpHandle is not equal to the "
                      "number of grads. Maybe some gradients are sparse type, "
                      "it is not supported currently.");
    VLOG(10) << "Insert fused_all_reduce";

    auto &group_grads_params =
        graph->Get<GroupGradsAndParams>(kGroupGradsAndParams);

    for (auto &group_g_p : group_grads_params) {
      size_t group_size = group_g_p.size();
      PADDLE_ENFORCE_GT(group_size, static_cast<size_t>(0));
      std::vector<ir::Node *> group_all_reduce_ops;
      group_all_reduce_ops.reserve(group_size);
      for (auto &g_p : group_g_p) {
        group_all_reduce_ops.emplace_back(all_reduce_ops.at(g_p.first));
      }
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
      InsertFusedAllReduce(places, local_scopes, group_size,
                           group_all_reduce_ops, nccl_ctxs, &result);
#else
      InsertFusedAllReduce(places, local_scopes, group_size,
                           group_all_reduce_ops, &result);
#endif
    }
    return std::move(graph);
  }

  void InsertFusedAllReduce(const std::vector<platform::Place> &places,
                            const std::vector<Scope *> &local_scopes,
                            const size_t num_of_all_reduce,
                            const std::vector<ir::Node *> &all_reduce_ops,
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
                            const platform::NCCLContextMap *nccl_ctxs,
#endif
                            ir::Graph *result) const {
    std::vector<VarHandleBase *> inputs;
    std::vector<VarHandleBase *> outputs;
    for (auto &op : all_reduce_ops) {
      auto &op_handle = op->Wrapper<OpHandleBase>();
      inputs.insert(inputs.end(), op_handle.Inputs().begin(),
                    op_handle.Inputs().end());
      // Remove output
      for_each(op_handle.Inputs().begin(), op_handle.Inputs().end(),
               [&op_handle](VarHandleBase *var_handle) {
                 var_handle->RemoveOutput(&op_handle, op_handle.Node());
               });

      outputs.insert(outputs.end(), op_handle.Outputs().begin(),
                     op_handle.Outputs().end());
      // Remove Input
      for_each(
          op_handle.Outputs().begin(), op_handle.Outputs().end(),
          [](VarHandleBase *var_handle) { var_handle->ClearGeneratedOp(); });

      result->RemoveNode(op_handle.Node());
    }

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
                           local_scopes, nccl_ctxs, result);
#else
    CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
                           local_scopes, result);
#endif
  }

 private:
  void CreateFusedAllReduceOp(const std::vector<VarHandleBase *> &inputs,
                              const std::vector<VarHandleBase *> &outputs,
                              const size_t num_of_all_reduce,
                              const std::vector<platform::Place> &places,
                              const std::vector<Scope *> &local_scopes,
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
                              const platform::NCCLContextMap *nccl_ctxs,
#endif
                              ir::Graph *result) const {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    auto *op_handle = new FusedAllReduceOpHandle(
        result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
        local_scopes, places, num_of_all_reduce, nccl_ctxs);
#else
    auto *op_handle = new FusedAllReduceOpHandle(
        result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
        local_scopes, places, num_of_all_reduce);
#endif

    for (auto in : inputs) {
      op_handle->AddInput(in);
    }

    for (auto out : outputs) {
      op_handle->AddOutput(out);
    }

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    if (!nccl_ctxs) {
      SetCommunicationContext(places, op_handle);
    }
#else
    SetCommunicationContext(places, op_handle);
#endif
  }

  void SetCommunicationContext(const std::vector<platform::Place> &places,
                               FusedAllReduceOpHandle *op_handle) const {
    for (size_t i = 0; i < places.size(); ++i) {
      op_handle->SetDeviceContext(
          places[i], platform::DeviceContextPool::Instance().Get(places[i]));
    }
  }
};

}  // namespace details
}  // namespace framework
}  // namespace paddle

REGISTER_PASS(fuse_all_reduce_op_pass,
              paddle::framework::details::FuseAllReduceOpPass);
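The groups consumed above through kGroupGradsAndParams are produced by the grouping helpers named in the commit message (SetGroupAccordingToLayers / SetGroupAccordingToGroupSize / SetGroupAccordingToMemorySize), which live in a separate pass. A conceptual Python sketch of size-based grouping, with an assumed 32 MB threshold, meant only to illustrate the idea rather than the actual C++ helper:

    def group_by_memory_size(grads, group_memory_size=32 << 20):
        """grads: list of (grad_name, size_in_bytes) in layer order."""
        groups, cur, cur_bytes = [], [], 0
        for name, nbytes in grads:
            cur.append(name)
            cur_bytes += nbytes
            if cur_bytes >= group_memory_size:
                groups.append(cur)
                cur, cur_bytes = [], 0
        if cur:
            groups.append(cur)
        return groups

    # Three 20 MB gradients with a 32 MB threshold fuse into two groups.
    print(group_by_memory_size([('g0', 20 << 20), ('g1', 20 << 20), ('g2', 20 << 20)]))
    # [['g0', 'g1'], ['g2']]

Each resulting group is then replaced by a single FusedAllReduceOpHandle below.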
@@ -0,0 +1,248 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/fused_all_reduce_op_handle.h"
#include <algorithm>
#include <utility>
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/reduce_and_gather.h"
#include "paddle/fluid/framework/details/variable_visitor.h"
#include "paddle/fluid/platform/profiler.h"

DEFINE_bool(skip_fused_all_reduce_check, false, "");
namespace paddle {
namespace framework {
namespace details {

typedef std::vector<std::vector<std::pair<std::string, const LoDTensor *>>>
    GradientAndLoDTensor;

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
FusedAllReduceOpHandle::FusedAllReduceOpHandle(
    ir::Node *node, const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places, const size_t num_of_all_reduce,
    const platform::NCCLContextMap *ctxs)
    : OpHandleBase(node),
      local_scopes_(local_scopes),
      places_(places),
      num_of_all_reduce_(num_of_all_reduce),
      nccl_ctxs_(ctxs) {
  if (nccl_ctxs_) {
    for (auto &p : places_) {
      this->SetDeviceContext(p, nccl_ctxs_->DevCtx(p));
    }
  }
  PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size());
}
#else

FusedAllReduceOpHandle::FusedAllReduceOpHandle(
    ir::Node *node, const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places, const size_t num_of_all_reduce)
    : OpHandleBase(node),
      local_scopes_(local_scopes),
      places_(places),
      num_of_all_reduce_(num_of_all_reduce) {
  PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size());
}

#endif

void FusedAllReduceOpHandle::RunImpl() {
  platform::RecordEvent record_event(Name());

  VLOG(4) << this->DebugString();

  WaitInputVarGenerated();
  // The input: grad0(dev0), grad0(dev1), grad1(dev0), grad1(dev1)...
  // The output: grad0(dev0), grad0(dev1), grad1(dev0), grad1(dev1)...
  auto in_var_handles = DynamicCast<VarHandle>(this->Inputs());
  auto out_var_handles = DynamicCast<VarHandle>(this->Outputs());

  size_t place_num = places_.size();
  PADDLE_ENFORCE_EQ(
      in_var_handles.size(), place_num * num_of_all_reduce_,
      "The NoDummyInputSize should be equal to the number of places.");
  PADDLE_ENFORCE_EQ(
      in_var_handles.size(), out_var_handles.size(),
      "The NoDummyInputSize and NoDummyOutputSize should be equal.");

  GradientAndLoDTensor grads_tensor;
  grads_tensor.resize(place_num);

  int64_t numel = -1;
  auto dtype = static_cast<framework::proto::VarType::Type>(0);
  for (size_t scope_idx = 0; scope_idx < local_scopes_.size(); ++scope_idx) {
    auto &g_tensor = grads_tensor.at(scope_idx);
    g_tensor.reserve(num_of_all_reduce_);

    GetGradLoDTensor(scope_idx, in_var_handles, out_var_handles, &g_tensor);

    int64_t element_num = 0;
    framework::proto::VarType::Type ele_dtype =
        static_cast<framework::proto::VarType::Type>(0);
    GetDTypeAndNumel(g_tensor, &ele_dtype, &element_num);

    if (numel == -1) {
      numel = element_num;
    }
    if (dtype == static_cast<framework::proto::VarType::Type>(0)) {
      dtype = ele_dtype;
      PADDLE_ENFORCE_NE(ele_dtype,
                        static_cast<framework::proto::VarType::Type>(0));
    }
    PADDLE_ENFORCE_EQ(ele_dtype, dtype);

    // Check whether the address space is contiguous.
    std::sort(
        g_tensor.begin(), g_tensor.end(),
        [](const std::pair<std::string, const LoDTensor *> &grad1,
           const std::pair<std::string, const LoDTensor *> &grad2) -> bool {
          return grad1.second->data<void>() < grad2.second->data<void>();
        });

    for (size_t k = 1; k < g_tensor.size(); ++k) {
      const void *pre_address = g_tensor.at(k - 1).second->data<void>();
      int64_t len = g_tensor.at(k - 1).second->numel();
      auto offset = len * framework::SizeOfType(dtype);
      void *next_address = reinterpret_cast<void *>(
          reinterpret_cast<uintptr_t>(pre_address) + offset);
      const void *cur_address = g_tensor.at(k).second->data<void>();
      VLOG(10) << k << ", "
               << " pre_address(" << g_tensor.at(k - 1).first
               << "): " << pre_address << ", cur_address("
               << g_tensor.at(k).first << "): " << cur_address
               << ", offset:" << offset << ", " << next_address << ", "
               << cur_address;
      PADDLE_ENFORCE_EQ(next_address, cur_address);
    }
  }

  if (!FLAGS_skip_fused_all_reduce_check) {
    for (size_t scope_idx = 0; scope_idx < place_num; ++scope_idx) {
      for (size_t j = 1; j < num_of_all_reduce_; ++j) {
        PADDLE_ENFORCE_EQ(grads_tensor.at(0).at(j).first,
                          grads_tensor.at(scope_idx).at(j).first);
      }
    }
  }

  std::vector<const void *> lod_tensor_data;
  for (size_t scope_idx = 0; scope_idx < place_num; ++scope_idx) {
    auto data = grads_tensor.at(scope_idx).at(0).second->data<void>();
    lod_tensor_data.emplace_back(data);
  }

  if (platform::is_gpu_place(places_[0])) {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    PADDLE_ENFORCE(nccl_ctxs_, "nccl_ctxs should not be nullptr.");
    int nccl_dtype = platform::ToNCCLDataType(dtype);
    std::vector<std::function<void()>> all_reduce_calls;
    for (size_t i = 0; i < local_scopes_.size(); ++i) {
      auto &p = places_[i];
      void *buffer = const_cast<void *>(lod_tensor_data.at(i));

      int dev_id = boost::get<platform::CUDAPlace>(p).device;
      auto &nccl_ctx = nccl_ctxs_->at(dev_id);
      auto stream = nccl_ctx.stream();
      auto comm = nccl_ctx.comm_;
      all_reduce_calls.emplace_back([=] {
        PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
            buffer, buffer, numel, static_cast<ncclDataType_t>(nccl_dtype),
            ncclSum, comm, stream));
      });
    }

    this->RunAndRecordEvent([&] {
      if (all_reduce_calls.size() == 1UL) {
        // Do not use NCCLGroup when manage NCCL by per thread per device
        all_reduce_calls[0]();
      } else {
        platform::NCCLGroupGuard guard;
        for (auto &call : all_reduce_calls) {
          call();
        }
      }
    });
#else
    PADDLE_THROW("Not compiled with CUDA");
#endif
  } else {
    // Special handle CPU only Operator's gradient. Like CRF
    auto grad_name = grads_tensor.at(0).at(0).first;
    auto &trg = *this->local_scopes_[0]
                     ->FindVar(kLocalExecScopeName)
                     ->Get<Scope *>()
                     ->FindVar(grad_name)
                     ->GetMutable<framework::LoDTensor>();

    // Reduce All data to trg in CPU
    ReduceBufferData func(lod_tensor_data, trg.data<void>(), numel);
    VisitDataType(trg.type(), func);

    for (size_t i = 1; i < local_scopes_.size(); ++i) {
      auto &scope =
          *local_scopes_[i]->FindVar(kLocalExecScopeName)->Get<Scope *>();
      auto &p = places_[i];
      auto *var = scope.FindVar(grad_name);
      auto *dev_ctx = dev_ctxes_.at(p);
      size_t size = numel * SizeOfType(trg.type());
      RunAndRecordEvent(p, [&trg, var, dev_ctx, p, size] {
        auto dst_ptr = var->GetMutable<framework::LoDTensor>()->data<void>();
        platform::CPUPlace cpu_place;
        memory::Copy(cpu_place, dst_ptr, cpu_place, trg.data<void>(), size);
      });
    }
  }
}

void FusedAllReduceOpHandle::GetGradLoDTensor(
    const size_t &scope_idx, const std::vector<VarHandle *> &in_var_handles,
    const std::vector<VarHandle *> &out_var_handles,
    std::vector<std::pair<std::string, const LoDTensor *>> *grad_tensor) const {
  auto *local_scope =
      local_scopes_.at(scope_idx)->FindVar(kLocalExecScopeName)->Get<Scope *>();
  size_t place_num = places_.size();

  for (size_t j = 0; j < in_var_handles.size(); j += place_num) {
    auto var_name = in_var_handles[j]->name();
    PADDLE_ENFORCE_EQ(var_name, out_var_handles[j]->name());
    auto &lod_tensor = local_scope->FindVar(var_name)->Get<LoDTensor>();
    PADDLE_ENFORCE_EQ(lod_tensor.place(), places_.at(scope_idx));
    grad_tensor->emplace_back(std::make_pair(var_name, &lod_tensor));
  }
}

void FusedAllReduceOpHandle::GetDTypeAndNumel(
    const std::vector<std::pair<std::string, const LoDTensor *>> &grad_tensor,
    proto::VarType::Type *dtype, int64_t *numel) const {
  *numel = 0;
  for (size_t i = 0; i < grad_tensor.size(); ++i) {
    // Get element number
    int64_t len = grad_tensor.at(i).second->numel();
    PADDLE_ENFORCE_GT(len, 0);
    *numel += len;

    // Get dtype
    auto ele_type = grad_tensor.at(i).second->type();
    if (i == 0) {
      *dtype = ele_type;
    }
    PADDLE_ENFORCE_EQ(ele_type, *dtype);
  }
}

std::string FusedAllReduceOpHandle::Name() const { return "fused_all_reduce"; }
}  // namespace details
}  // namespace framework
}  // namespace paddle
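RunImpl above relies on each group's gradients being laid out back-to-back in one buffer per device, so a single ncclAllReduce over `numel` elements (or one ReduceBufferData pass on CPU) covers the whole group; the address check sorts the tensors by pointer and verifies that each tensor starts exactly where the previous one ends. A small NumPy sketch of that invariant (illustrative only, not Paddle code):

    import numpy as np

    def check_contiguous(tensors):
        # Same idea as the check in RunImpl: sort by address, then require
        # addr(k) == addr(k-1) + numel(k-1) * sizeof(dtype).
        tensors = sorted(tensors, key=lambda t: t.__array_interface__['data'][0])
        for prev, cur in zip(tensors, tensors[1:]):
            expected = prev.__array_interface__['data'][0] + prev.size * prev.itemsize
            assert cur.__array_interface__['data'][0] == expected, \
                "gradients are not packed into one contiguous buffer"
        return sum(t.size for t in tensors)

    buf = np.zeros(12, dtype=np.float32)      # the fused buffer
    grads = [buf[0:4], buf[4:9], buf[9:12]]   # contiguous views into it
    print(check_contiguous(grads))            # 12 -> one all-reduce covers all three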
@@ -0,0 +1,76 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <string>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
#include "paddle/fluid/platform/nccl_helper.h"
#endif

namespace paddle {
namespace framework {
namespace details {

struct FusedAllReduceOpHandle : public OpHandleBase {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  FusedAllReduceOpHandle(ir::Node *node,
                         const std::vector<Scope *> &local_scopes,
                         const std::vector<platform::Place> &places,
                         const size_t num_of_all_reduce,
                         const platform::NCCLContextMap *ctxs);
#else
  FusedAllReduceOpHandle(ir::Node *node,
                         const std::vector<Scope *> &local_scopes,
                         const std::vector<platform::Place> &places,
                         const size_t num_of_all_reduce);
#endif
  std::string Name() const override;

  // Delaying and buffering the nccl_all_reduce calls together can
  // significantly improve performance. Disable this feature by returning
  // false.
  bool IsMultiDeviceTransfer() override { return true; }

 protected:
  void RunImpl() override;

 private:
  std::vector<Scope *> local_scopes_;
  std::vector<platform::Place> places_;
  size_t num_of_all_reduce_;
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  const platform::NCCLContextMap *nccl_ctxs_;
#endif

  // Get the dtype and the total element count of the input gradients, and
  // check that all of them share the same dtype.
  void GetDTypeAndNumel(
      const std::vector<std::pair<std::string, const LoDTensor *>> &g_tensor,
      proto::VarType::Type *dtype, int64_t *total_num) const;

  // Get gradient's name and LoDTensor
  void GetGradLoDTensor(const size_t &scope_idx,
                        const std::vector<VarHandle *> &in_var_handles,
                        const std::vector<VarHandle *> &out_var_handles,
                        std::vector<std::pair<std::string, const LoDTensor *>>
                            *grad_tensor) const;
};

}  // namespace details
}  // namespace framework
}  // namespace paddle
@@ -0,0 +1,121 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from parallel_executor_test_base import TestParallelExecutorBase
import paddle.fluid as fluid
import paddle.fluid.core as core
import numpy as np
import paddle
import paddle.dataset.mnist as mnist
import unittest
import os


def simple_fc_net(use_feed):
    img = fluid.layers.data(name='image', shape=[784], dtype='float32')
    label = fluid.layers.data(name='label', shape=[1], dtype='int64')

    hidden = img
    for _ in range(4):
        hidden = fluid.layers.fc(
            hidden,
            size=200,
            act='relu',
            bias_attr=fluid.ParamAttr(
                initializer=fluid.initializer.Constant(value=1.0)))
    prediction = fluid.layers.fc(hidden, size=10, act='softmax')
    loss = fluid.layers.cross_entropy(input=prediction, label=label)
    loss = fluid.layers.mean(loss)
    return loss


def fc_with_batchnorm(use_feed):
    img = fluid.layers.data(name='image', shape=[784], dtype='float32')
    label = fluid.layers.data(name='label', shape=[1], dtype='int64')

    hidden = img
    for _ in range(2):
        hidden = fluid.layers.fc(
            hidden,
            size=200,
            act='relu',
            bias_attr=fluid.ParamAttr(
                initializer=fluid.initializer.Constant(value=1.0)))

        hidden = fluid.layers.batch_norm(input=hidden)

    prediction = fluid.layers.fc(hidden, size=10, act='softmax')
    loss = fluid.layers.cross_entropy(input=prediction, label=label)
    loss = fluid.layers.mean(loss)
    return loss


class TestMNIST(TestParallelExecutorBase):
    @classmethod
    def setUpClass(cls):
        os.environ['CPU_NUM'] = str(4)

    def _init_data(self, random=True):
        np.random.seed(5)
        if random:
            img = np.random.random(size=[32, 784]).astype(np.float32)
        else:
            img = np.ones(shape=[32, 784], dtype='float32')
        label = np.ones(shape=[32, 1], dtype='int64')
        return img, label

    def _compare_fuse_all_reduce_ops(self, model, use_cuda, random_data=True):
        if use_cuda and not core.is_compiled_with_cuda():
            return
        img, label = self._init_data(random_data)

        def _optimizer(learning_rate=1e-6):
            optimizer = fluid.optimizer.SGD(
                learning_rate=learning_rate,
                regularization=fluid.regularizer.L2Decay(1e-6))
            return optimizer

        not_fuse_op_first_loss, not_fuse_op_last_loss = self.check_network_convergence(
            model,
            feed_dict={"image": img,
                       "label": label},
            use_cuda=use_cuda,
            fuse_all_reduce_ops=False,
            memory_opt=False,
            optimizer=_optimizer)
        fuse_op_first_loss, fuse_op_last_loss = self.check_network_convergence(
            model,
            feed_dict={"image": img,
                       "label": label},
            use_cuda=use_cuda,
            fuse_all_reduce_ops=True,
            memory_opt=False,
            optimizer=_optimizer)

        for loss in zip(not_fuse_op_first_loss, fuse_op_first_loss):
            self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
        for loss in zip(not_fuse_op_last_loss, fuse_op_last_loss):
            self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)

    def test_simple_fc_with_fuse_op(self):
        self._compare_fuse_all_reduce_ops(simple_fc_net, True)
        self._compare_fuse_all_reduce_ops(simple_fc_net, False)

    def test_batchnorm_fc_with_fuse_op(self):
        self._compare_fuse_all_reduce_ops(fc_with_batchnorm, True)
        self._compare_fuse_all_reduce_ops(fc_with_batchnorm, False)


if __name__ == '__main__':
    unittest.main()