commit 86511f518b
File diff suppressed because it is too large
paddle/fluid/framework/details/data_balance_op_handle.cc
@@ -1,154 +0,0 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/details/data_balance_op_handle.h"
#include <algorithm>
#include "paddle/fluid/framework/details/container_cast.h"

namespace paddle {
namespace framework {
namespace details {

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
DataBalanceOpHandle::DataBalanceOpHandle(
    ir::Node *node, const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places,
    const platform::NCCLContextMap *ctxs)
    : OpHandleBase(node), local_scopes_(local_scopes), places_(places) {
  if (ctxs) {
    for (auto &p : places_) {
      this->SetDeviceContext(p, ctxs->DevCtx(p));
    }
  }
}
#else
DataBalanceOpHandle::DataBalanceOpHandle(
    ir::Node *node, const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places)
    : OpHandleBase(node), local_scopes_(local_scopes), places_(places) {}
#endif

std::string DataBalanceOpHandle::Name() const { return "data balance"; }

std::vector<std::array<int, 3>> DataBalanceOpHandle::GetBalancePlan(
    const std::vector<int> &device_sizes) {
  int device_num = device_sizes.size();
  int total_size = 0;
  int empty_num = 0;
  std::vector<std::array<int, 2>> size_device_vec;
  size_device_vec.reserve(device_num);
  for (int i = 0; i < device_num; ++i) {
    if (device_sizes[i] == 0) {
      ++empty_num;
    }
    total_size += device_sizes[i];
    size_device_vec.push_back({{device_sizes[i], i}});
  }
  std::vector<std::array<int, 3>> res;
  if (empty_num == 0) {
    // No need to do data balance.
    return res;
  }
  if (total_size < device_num) {
    // Not enough data.
    PADDLE_THROW_EOF();
  }
  std::sort(size_device_vec.begin(), size_device_vec.end(),
            [](const std::array<int, 2> &a, const std::array<int, 2> &b) {
              return a[0] > b[0];
            });
  int expected_device_size = total_size / device_num;
  int src_idx = 0;
  for (int dst_idx = device_num - empty_num; dst_idx < device_num; ++dst_idx) {
    if (size_device_vec[src_idx][0] <= expected_device_size) {
      ++src_idx;
      PADDLE_ENFORCE_LT(
          src_idx, device_num - empty_num,
          "In the current strategy an empty tensor should not be a copy "
          "source.");
    }
    size_device_vec[src_idx][0] -= expected_device_size;
    size_device_vec[dst_idx][0] += expected_device_size;
    res.push_back({{size_device_vec[src_idx][1], size_device_vec[dst_idx][1],
                    expected_device_size}});
  }
  return res;
}

void DataBalanceOpHandle::RunImpl() {
  PADDLE_ENFORCE_GT(places_.size(), 1UL,
                    "Data balance can only be enabled when the number of "
                    "places to run is larger than 1.");
  auto in_var_handles = DynamicCast<VarHandle>(this->Inputs());
  auto out_var_handles = DynamicCast<VarHandle>(this->Outputs());
  PADDLE_ENFORCE(in_var_handles.size() % places_.size() == 0);
  PADDLE_ENFORCE_EQ(
      in_var_handles.size(), out_var_handles.size(),
      "The NoDummyInputSize and NoDummyOutputSize should be equal.");
  int data_num = in_var_handles.size() / places_.size();
  WaitInputVarGenerated();
  std::vector<std::vector<LoDTensor *>> lod_tensors(data_num);
  std::vector<int> device_sizes;
  for (int i = 0; i < static_cast<int>(in_var_handles.size()); ++i) {
    PADDLE_ENFORCE_EQ(in_var_handles[i]->name(), out_var_handles[i]->name(),
                      "The names of the input and output should be equal.");
    int place_idx = i / data_num;
    int data_idx = i % data_num;
    auto *local_scope =
        local_scopes_[place_idx]->FindVar(kLocalExecScopeName)->Get<Scope *>();
    auto *tensor_var = local_scope->FindVar(in_var_handles[i]->name());
    PADDLE_ENFORCE(tensor_var->IsType<LoDTensor>());
    auto *tensor = tensor_var->GetMutable<LoDTensor>();
    lod_tensors[data_idx].push_back(tensor);
    int ins_size =
        tensor->lod().empty() ? tensor->dims()[0] : tensor->NumElements();
    if (data_idx == 0) {
      device_sizes.emplace_back(ins_size);
    } else {
      PADDLE_ENFORCE_EQ(
          ins_size, device_sizes.at(place_idx),
          "All data on the same device shall have the same batch size.");
    }
  }
  const auto &balance_plan = GetBalancePlan(device_sizes);

  for (const auto &trans : balance_plan) {
    for (int data_idx = 0; data_idx < data_num; ++data_idx) {
      LoDTensor *src_tensor = lod_tensors[data_idx][trans[0]];
      LoDTensor *dst_tensor = lod_tensors[data_idx][trans[1]];
      int trans_ins_size = trans[2];
      LoD src_lod = src_tensor->lod();
      int src_ins_size =
          src_lod.empty() ? src_tensor->dims()[0] : src_tensor->NumElements();
      int cut_point = src_ins_size - trans_ins_size;
      if (!src_lod.empty()) {
        for (auto &level : src_lod) {
          cut_point = level[cut_point];
        }
      }
      TensorCopySync(src_tensor->Slice(cut_point, src_tensor->dims()[0]),
                     dst_tensor->place(), dst_tensor);
      src_tensor->ShareDataWith(src_tensor->Slice(0, cut_point));
      if (!src_lod.empty()) {
        dst_tensor->set_lod(SliceInLevel(
            src_lod, 0, src_ins_size - trans_ins_size, src_ins_size));
        src_tensor->set_lod(
            SliceInLevel(src_lod, 0, 0, src_ins_size - trans_ins_size));
      }
    }
  }
}

}  // namespace details
}  // namespace framework
}  // namespace paddle
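GetBalancePlan above is a self-contained greedy algorithm: it sorts devices by batch size and moves total/n instances from the fullest devices onto each empty one. Below is a minimal standalone sketch of that logic, with hypothetical names and a tiny main for illustration, outside the Paddle framework:

// Standalone sketch of the greedy balance plan (assumed simplification).
#include <algorithm>
#include <array>
#include <cassert>
#include <iostream>
#include <vector>

std::vector<std::array<int, 3>> GetBalancePlan(const std::vector<int>& sizes) {
  int n = static_cast<int>(sizes.size());
  int total = 0, empty = 0;
  std::vector<std::array<int, 2>> size_dev;  // {size, device_id}
  for (int i = 0; i < n; ++i) {
    if (sizes[i] == 0) ++empty;
    total += sizes[i];
    size_dev.push_back({{sizes[i], i}});
  }
  std::vector<std::array<int, 3>> plan;  // {src_dev, dst_dev, trans_size}
  if (empty == 0) return plan;           // nothing to balance
  assert(total >= n);                    // need at least one instance per device
  std::sort(size_dev.begin(), size_dev.end(),
            [](const std::array<int, 2>& a, const std::array<int, 2>& b) {
              return a[0] > b[0];
            });
  int expected = total / n;
  int src = 0;
  for (int dst = n - empty; dst < n; ++dst) {
    if (size_dev[src][0] <= expected) ++src;  // current source is drained
    size_dev[src][0] -= expected;
    size_dev[dst][0] += expected;
    plan.push_back({{size_dev[src][1], size_dev[dst][1], expected}});
  }
  return plan;
}

int main() {
  // Devices 0..3 hold 6, 4, 2 and 0 instances; device 3 is empty.
  for (const auto& t : GetBalancePlan({6, 4, 2, 0}))
    std::cout << "move " << t[2] << " from dev " << t[0] << " to dev " << t[1]
              << "\n";  // prints: move 3 from dev 0 to dev 3
}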
paddle/fluid/framework/details/data_balance_op_handle.h
@@ -1,59 +0,0 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <array>
#include <string>
#include <vector>
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
#include "paddle/fluid/platform/nccl_helper.h"
#endif

namespace paddle {
namespace framework {
namespace details {

struct DataBalanceOpHandle : public OpHandleBase {
 public:
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  DataBalanceOpHandle(ir::Node *node, const std::vector<Scope *> &local_scopes,
                      const std::vector<platform::Place> &places,
                      const platform::NCCLContextMap *ctxs);
#else
  DataBalanceOpHandle(ir::Node *node, const std::vector<Scope *> &local_scopes,
                      const std::vector<platform::Place> &places);
#endif

  std::string Name() const override;

  bool IsMultiDeviceTransfer() override { return false; }

 protected:
  void RunImpl() override;

 private:
  // std::vector<(src_dev_id, dst_dev_id, trans_size)>
  std::vector<std::array<int, 3>> GetBalancePlan(
      const std::vector<int> &batch_size_per_device);

  const std::vector<Scope *> local_scopes_;
  const std::vector<platform::Place> places_;
};

}  // namespace details
}  // namespace framework
}  // namespace paddle
paddle/fluid/framework/details/fuse_all_reduce_op_pass.cc
@@ -0,0 +1,195 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "paddle/fluid/framework/details/all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/fused_all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/ir/graph_helper.h"

namespace paddle {
namespace framework {
namespace details {

class FuseAllReduceOpPass : public ir::Pass {
 protected:
  std::unique_ptr<ir::Graph> ApplyImpl(
      std::unique_ptr<ir::Graph> graph) const override {
    ir::Graph &result = *graph;

    auto &places = Get<const std::vector<platform::Place>>(kPlaces);
    auto &local_scopes = Get<const std::vector<Scope *>>(kLocalScopes);
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    auto *nccl_ctxs = &Get<platform::NCCLContextMap>(kNCCLCtxs);
#endif

    std::unordered_set<std::string> grads;
    auto &params_grads = result.Get<ParamsAndGrads>(kParamsAndGrads);
    size_t num_of_all_reduce = params_grads.size();
    grads.reserve(num_of_all_reduce);
    for (auto p_g : params_grads) {
      grads.insert(p_g.second);
    }

    size_t num_place = places.size();
    std::unordered_map<std::string, ir::Node *> all_reduce_ops;
    all_reduce_ops.reserve(grads.size());
    for (auto &node : result.Nodes()) {
      if (node->IsOp()) {
        PADDLE_ENFORCE(node->IsWrappedBy<OpHandleBase>());
        auto *all_reduce_op_handle =
            dynamic_cast<AllReduceOpHandle *>(&node->Wrapper<OpHandleBase>());
        if (all_reduce_op_handle) {
          auto inputs = DynamicCast<VarHandle>(all_reduce_op_handle->Inputs());
          PADDLE_ENFORCE_EQ(inputs.size(), num_place);
          // The inputs' names should all be the same.
          auto &grad_name = inputs[0]->name();
          for (size_t i = 1; i < inputs.size(); ++i) {
            PADDLE_ENFORCE_EQ(inputs[i]->name(), grad_name,
                              "The input names should be the same.");
          }
          PADDLE_ENFORCE_NE(grads.count(grad_name), static_cast<size_t>(0));
          all_reduce_ops.emplace(grad_name, node);
        }
      }
    }

    VLOG(10) << "Found all_reduce_ops: " << all_reduce_ops.size();
    if (all_reduce_ops.size() == 0) {
      return std::move(graph);
    }

    PADDLE_ENFORCE_EQ(all_reduce_ops.size(), grads.size(),
                      "The number of all_reduce OpHandles is not equal to the "
                      "number of grads. Maybe some gradients are of sparse "
                      "type, which is not supported currently.");
    VLOG(10) << "Insert fused_all_reduce";

    auto &group_grads_params =
        graph->Get<GroupGradsAndParams>(kGroupGradsAndParams);

    for (auto &group_g_p : group_grads_params) {
      size_t group_size = group_g_p.size();
      PADDLE_ENFORCE_GT(group_size, static_cast<size_t>(0));
      std::vector<ir::Node *> group_all_reduce_ops;
      group_all_reduce_ops.reserve(group_size);
      for (auto &g_p : group_g_p) {
        group_all_reduce_ops.emplace_back(all_reduce_ops.at(g_p.first));
      }
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
      InsertFusedAllReduce(places, local_scopes, group_size,
                           group_all_reduce_ops, nccl_ctxs, &result);
#else
      InsertFusedAllReduce(places, local_scopes, group_size,
                           group_all_reduce_ops, &result);
#endif
    }
    return std::move(graph);
  }

  void InsertFusedAllReduce(const std::vector<platform::Place> &places,
                            const std::vector<Scope *> &local_scopes,
                            const size_t num_of_all_reduce,
                            const std::vector<ir::Node *> &all_reduce_ops,
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
                            const platform::NCCLContextMap *nccl_ctxs,
#endif
                            ir::Graph *result) const {
    std::vector<VarHandleBase *> inputs;
    std::vector<VarHandleBase *> outputs;
    for (auto &op : all_reduce_ops) {
      auto &op_handle = op->Wrapper<OpHandleBase>();
      inputs.insert(inputs.end(), op_handle.Inputs().begin(),
                    op_handle.Inputs().end());
      // Detach this op from the output lists of its input vars.
      for_each(op_handle.Inputs().begin(), op_handle.Inputs().end(),
               [&op_handle](VarHandleBase *var_handle) {
                 var_handle->RemoveOutput(&op_handle, op_handle.Node());
               });

      outputs.insert(outputs.end(), op_handle.Outputs().begin(),
                     op_handle.Outputs().end());
      // Clear the generating op of the output vars.
      for_each(
          op_handle.Outputs().begin(), op_handle.Outputs().end(),
          [](VarHandleBase *var_handle) { var_handle->ClearGeneratedOp(); });

      result->RemoveNode(op_handle.Node());
    }

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
                           local_scopes, nccl_ctxs, result);
#else
    CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
                           local_scopes, result);
#endif
  }

 private:
  void CreateFusedAllReduceOp(const std::vector<VarHandleBase *> &inputs,
                              const std::vector<VarHandleBase *> &outputs,
                              const size_t num_of_all_reduce,
                              const std::vector<platform::Place> &places,
                              const std::vector<Scope *> &local_scopes,
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
                              const platform::NCCLContextMap *nccl_ctxs,
#endif
                              ir::Graph *result) const {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    auto *op_handle = new FusedAllReduceOpHandle(
        result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
        local_scopes, places, num_of_all_reduce, nccl_ctxs);
#else
    auto *op_handle = new FusedAllReduceOpHandle(
        result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
        local_scopes, places, num_of_all_reduce);
#endif

    for (auto in : inputs) {
      op_handle->AddInput(in);
    }

    for (auto out : outputs) {
      op_handle->AddOutput(out);
    }

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    if (!nccl_ctxs) {
      SetCommunicationContext(places, op_handle);
    }
#else
    SetCommunicationContext(places, op_handle);
#endif
  }

  void SetCommunicationContext(const std::vector<platform::Place> &places,
                               FusedAllReduceOpHandle *op_handle) const {
    for (size_t i = 0; i < places.size(); ++i) {
      op_handle->SetDeviceContext(
          places[i], platform::DeviceContextPool::Instance().Get(places[i]));
    }
  }
};

}  // namespace details
}  // namespace framework
}  // namespace paddle

REGISTER_PASS(fuse_all_reduce_op_pass,
              paddle::framework::details::FuseAllReduceOpPass);
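The rewiring step of the pass reduces to collecting the inputs and outputs of every all_reduce op in a group onto one fused op before deleting the originals. A toy sketch of just that step, with hypothetical types rather than the Paddle IR:

// Toy illustration of op fusion by input/output concatenation (assumed types).
#include <iostream>
#include <string>
#include <vector>

struct ToyOp {
  std::string name;
  std::vector<std::string> inputs;
  std::vector<std::string> outputs;
};

ToyOp FuseAllReduce(const std::vector<ToyOp>& group) {
  ToyOp fused{"fused_all_reduce", {}, {}};
  for (const auto& op : group) {
    // The fused op takes over every edge of the ops it replaces.
    fused.inputs.insert(fused.inputs.end(), op.inputs.begin(), op.inputs.end());
    fused.outputs.insert(fused.outputs.end(), op.outputs.begin(),
                         op.outputs.end());
  }
  return fused;
}

int main() {
  std::vector<ToyOp> group = {
      {"all_reduce", {"w0@GRAD(dev0)", "w0@GRAD(dev1)"},
       {"w0@GRAD(dev0)", "w0@GRAD(dev1)"}},
      {"all_reduce", {"w1@GRAD(dev0)", "w1@GRAD(dev1)"},
       {"w1@GRAD(dev0)", "w1@GRAD(dev1)"}},
  };
  ToyOp fused = FuseAllReduce(group);
  std::cout << fused.name << " takes " << fused.inputs.size()
            << " inputs and produces " << fused.outputs.size() << " outputs\n";
}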
paddle/fluid/framework/details/fuse_vars_op_handle.cc
@@ -1,51 +0,0 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/details/fuse_vars_op_handle.h"

namespace paddle {
namespace framework {
namespace details {

void FuseVarsOpHandle::RunImpl() {
  WaitInputVarGenerated(place_);

  auto in_var_handles = DynamicCast<VarHandle>(this->Inputs());
  auto out_var_handles = DynamicCast<VarHandle>(this->Outputs());
  PADDLE_ENFORCE_EQ(in_var_handles.size(), 0UL);
  PADDLE_ENFORCE_EQ(out_var_handles.size() - 1, inputs_numel_.size(), "");

  auto scope = local_scope_->FindVar(kLocalExecScopeName)->Get<Scope *>();

  auto out_var_handle = out_var_handles[0];
  auto out_var = scope->Var(out_var_handle->name());

  auto out_tensor = out_var->GetMutable<LoDTensor>();
  out_tensor->Resize({total_numel_}).mutable_data(this->place_, type_);

  int64_t s = 0;
  for (size_t i = 1; i < out_var_handles.size(); ++i) {
    auto out_name = out_var_handles[i]->name();
    auto out_t = scope->Var(out_name)->GetMutable<LoDTensor>();
    auto numel = this->inputs_numel_.at(out_name);
    out_t->ShareDataWith(out_tensor->Slice(s, s + numel));
    s += numel;
  }
  this->RunAndRecordEvent([] {});
}

std::string FuseVarsOpHandle::Name() const { return "fuse vars"; }
}  // namespace details
}  // namespace framework
}  // namespace paddle
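FuseVarsOpHandle relies on one contiguous allocation being carved into sub-views at running offsets, which is what ShareDataWith on slices achieves above. A plain-C++ sketch of that layout, with hypothetical names and no Paddle types:

// Minimal sketch of a fused buffer carved into named sub-views (assumed setup).
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

int main() {
  // Element counts per variable, mirroring inputs_numel_ above.
  std::map<std::string, int64_t> inputs_numel = {{"w0@GRAD", 4},
                                                 {"w1@GRAD", 6}};
  int64_t total = 0;
  for (const auto& kv : inputs_numel) total += kv.second;

  std::vector<float> fused(total);      // the single backing allocation
  std::map<std::string, float*> views;  // each name aliases a slice of it
  int64_t offset = 0;
  for (const auto& kv : inputs_numel) {
    views[kv.first] = fused.data() + offset;
    offset += kv.second;
  }
  for (const auto& kv : views)
    std::cout << kv.first << " starts at element "
              << (kv.second - fused.data()) << "\n";
}

Because every sub-view aliases the same allocation, a later fused operation can treat the whole buffer as one contiguous region instead of launching one kernel or communication call per variable.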
paddle/fluid/framework/details/fuse_vars_op_handle.h
@@ -1,65 +0,0 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <map>
#include <string>
#include <unordered_map>
#include <vector>

#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/device_context.h"

namespace paddle {
namespace framework {
namespace details {

struct FuseVarsOpHandle : public OpHandleBase {
 public:
  FuseVarsOpHandle(ir::Node *node, Scope *local_scope,
                   const platform::Place &place,
                   const std::unordered_map<std::string, int64_t> &inputs_numel,
                   const proto::VarType::Type var_type)
      : OpHandleBase(node),
        local_scope_(local_scope),
        place_(place),
        inputs_numel_(inputs_numel),
        type_(var_type) {
    total_numel_ = 0;
    for (auto in_numel : inputs_numel) {
      PADDLE_ENFORCE_GT(in_numel.second, 0);
      total_numel_ += in_numel.second;
    }
  }

  std::string Name() const override;

  bool IsMultiDeviceTransfer() override { return false; }

 protected:
  void RunImpl() override;

 private:
  Scope *local_scope_;
  const platform::Place place_;
  const std::unordered_map<std::string, int64_t> inputs_numel_;
  const proto::VarType::Type type_;
  int64_t total_numel_;
};
}  // namespace details
}  // namespace framework
}  // namespace paddle
paddle/fluid/framework/details/fused_all_reduce_op_handle.cc
@@ -0,0 +1,249 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/fused_all_reduce_op_handle.h"
#include <algorithm>
#include <utility>
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/reduce_and_gather.h"
#include "paddle/fluid/framework/details/variable_visitor.h"
#include "paddle/fluid/platform/profiler.h"

DEFINE_bool(skip_fused_all_reduce_check, false,
            "Skip the check that the fused gradients are contiguous and "
            "consistently ordered across devices.");
namespace paddle {
namespace framework {
namespace details {

typedef std::vector<std::vector<std::pair<std::string, const LoDTensor *>>>
    GradientAndLoDTensor;

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
FusedAllReduceOpHandle::FusedAllReduceOpHandle(
    ir::Node *node, const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places, const size_t num_of_all_reduce,
    const platform::NCCLContextMap *ctxs)
    : OpHandleBase(node),
      local_scopes_(local_scopes),
      places_(places),
      num_of_all_reduce_(num_of_all_reduce),
      nccl_ctxs_(ctxs) {
  if (nccl_ctxs_) {
    for (auto &p : places_) {
      this->SetDeviceContext(p, nccl_ctxs_->DevCtx(p));
    }
  }
  PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size());
}
#else

FusedAllReduceOpHandle::FusedAllReduceOpHandle(
    ir::Node *node, const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places, const size_t num_of_all_reduce)
    : OpHandleBase(node),
      local_scopes_(local_scopes),
      places_(places),
      num_of_all_reduce_(num_of_all_reduce) {
  PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size());
}

#endif

void FusedAllReduceOpHandle::RunImpl() {
  platform::RecordEvent record_event(Name());

  VLOG(4) << this->DebugString();

  WaitInputVarGenerated();
  // The input: grad0(dev0), grad0(dev1), grad1(dev0), grad1(dev1)...
  // The output: grad0(dev0), grad0(dev1), grad1(dev0), grad1(dev1)...
  auto in_var_handles = DynamicCast<VarHandle>(this->Inputs());
  auto out_var_handles = DynamicCast<VarHandle>(this->Outputs());

  size_t place_num = places_.size();
  PADDLE_ENFORCE_EQ(
      in_var_handles.size(), place_num * num_of_all_reduce_,
      "The NoDummyInputSize should be equal to the number of places "
      "multiplied by the number of fused all_reduce ops.");
  PADDLE_ENFORCE_EQ(
      in_var_handles.size(), out_var_handles.size(),
      "The NoDummyInputSize and NoDummyOutputSize should be equal.");

  GradientAndLoDTensor grads_tensor;
  grads_tensor.resize(place_num);

  int64_t numel = -1;
  auto dtype = static_cast<framework::proto::VarType::Type>(0);
  for (size_t scope_idx = 0; scope_idx < local_scopes_.size(); ++scope_idx) {
    auto &g_tensor = grads_tensor.at(scope_idx);
    g_tensor.reserve(num_of_all_reduce_);

    GetGradLoDTensor(scope_idx, in_var_handles, out_var_handles, &g_tensor);

    int64_t element_num = 0;
    framework::proto::VarType::Type ele_dtype =
        static_cast<framework::proto::VarType::Type>(0);
    GetDTypeAndNumel(g_tensor, &ele_dtype, &element_num);

    if (numel == -1) {
      numel = element_num;
    }
    if (dtype == static_cast<framework::proto::VarType::Type>(0)) {
      dtype = ele_dtype;
      PADDLE_ENFORCE_NE(ele_dtype,
                        static_cast<framework::proto::VarType::Type>(0));
    }
    PADDLE_ENFORCE_EQ(ele_dtype, dtype);

    // Check whether the address space is contiguous.
    std::sort(
        g_tensor.begin(), g_tensor.end(),
        [](const std::pair<std::string, const LoDTensor *> &grad1,
           const std::pair<std::string, const LoDTensor *> &grad2) -> bool {
          return grad1.second->data<void>() < grad2.second->data<void>();
        });

    for (size_t k = 1; k < g_tensor.size(); ++k) {
      const void *cur_address = g_tensor.at(k - 1).second->data<void>();
      int64_t len = g_tensor.at(k - 1).second->numel();
      auto offset = len * framework::SizeOfType(dtype);
      void *infer_next_address = reinterpret_cast<void *>(
          reinterpret_cast<uintptr_t>(cur_address) + offset);
      const void *next_address = g_tensor.at(k).second->data<void>();

      VLOG(10) << string::Sprintf(
          "Input[%d](%s) address: 0X%02x, Input[%d](%s) address: 0X%02x, Infer "
          "input[%d] address: 0X%02x. The offset: %d",
          k - 1, g_tensor.at(k - 1).first, cur_address, k,
          g_tensor.at(k).first, next_address, k, infer_next_address, offset);
      PADDLE_ENFORCE_EQ(infer_next_address, next_address,
                        "The addresses are not consistent.");
    }
  }

  if (!FLAGS_skip_fused_all_reduce_check) {
    for (size_t scope_idx = 0; scope_idx < place_num; ++scope_idx) {
      for (size_t j = 1; j < num_of_all_reduce_; ++j) {
        PADDLE_ENFORCE_EQ(grads_tensor.at(0).at(j).first,
                          grads_tensor.at(scope_idx).at(j).first);
      }
    }
  }

  std::vector<const void *> lod_tensor_data;
  for (size_t scope_idx = 0; scope_idx < place_num; ++scope_idx) {
    auto data = grads_tensor.at(scope_idx).at(0).second->data<void>();
    lod_tensor_data.emplace_back(data);
  }

  if (platform::is_gpu_place(places_[0])) {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    PADDLE_ENFORCE(nccl_ctxs_, "nccl_ctxs should not be nullptr.");
    int nccl_dtype = platform::ToNCCLDataType(dtype);
    std::vector<std::function<void()>> all_reduce_calls;
    for (size_t i = 0; i < local_scopes_.size(); ++i) {
      auto &p = places_[i];
      void *buffer = const_cast<void *>(lod_tensor_data.at(i));

      int dev_id = boost::get<platform::CUDAPlace>(p).device;
      auto &nccl_ctx = nccl_ctxs_->at(dev_id);
      auto stream = nccl_ctx.stream();
      auto comm = nccl_ctx.comm_;
      all_reduce_calls.emplace_back([=] {
        PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
            buffer, buffer, numel, static_cast<ncclDataType_t>(nccl_dtype),
            ncclSum, comm, stream));
      });
    }

    this->RunAndRecordEvent([&] {
      if (all_reduce_calls.size() == 1UL) {
        // Do not use NCCLGroup when managing NCCL per thread per device.
        all_reduce_calls[0]();
      } else {
        platform::NCCLGroupGuard guard;
        for (auto &call : all_reduce_calls) {
          call();
        }
      }
    });
#else
    PADDLE_THROW("Not compiled with CUDA");
#endif
  } else {
    // Special handling for the gradients of CPU-only operators, like CRF.
    auto grad_name = grads_tensor.at(0).at(0).first;
    auto &trg = *this->local_scopes_[0]
                     ->FindVar(kLocalExecScopeName)
                     ->Get<Scope *>()
                     ->FindVar(grad_name)
                     ->GetMutable<framework::LoDTensor>();

    // Reduce all data to trg on CPU.
    ReduceBufferData func(lod_tensor_data, trg.data<void>(), numel);
    VisitDataType(trg.type(), func);

    for (size_t i = 1; i < local_scopes_.size(); ++i) {
      auto &scope =
          *local_scopes_[i]->FindVar(kLocalExecScopeName)->Get<Scope *>();
      auto &p = places_[i];
      auto *var = scope.FindVar(grad_name);
      auto *dev_ctx = dev_ctxes_.at(p);
      size_t size = numel * SizeOfType(trg.type());
      RunAndRecordEvent(p, [&trg, var, dev_ctx, p, size] {
        auto dst_ptr = var->GetMutable<framework::LoDTensor>()->data<void>();
        platform::CPUPlace cpu_place;
        memory::Copy(cpu_place, dst_ptr, cpu_place, trg.data<void>(), size);
      });
    }
  }
}

void FusedAllReduceOpHandle::GetGradLoDTensor(
    const size_t &scope_idx, const std::vector<VarHandle *> &in_var_handles,
    const std::vector<VarHandle *> &out_var_handles,
    std::vector<std::pair<std::string, const LoDTensor *>> *grad_tensor) const {
  auto *local_scope =
      local_scopes_.at(scope_idx)->FindVar(kLocalExecScopeName)->Get<Scope *>();
  size_t place_num = places_.size();

  for (size_t j = 0; j < in_var_handles.size(); j += place_num) {
    auto var_name = in_var_handles[j]->name();
    PADDLE_ENFORCE_EQ(var_name, out_var_handles[j]->name());
    auto &lod_tensor = local_scope->FindVar(var_name)->Get<LoDTensor>();
    PADDLE_ENFORCE_EQ(lod_tensor.place(), places_.at(scope_idx));
    grad_tensor->emplace_back(std::make_pair(var_name, &lod_tensor));
  }
}

void FusedAllReduceOpHandle::GetDTypeAndNumel(
    const std::vector<std::pair<std::string, const LoDTensor *>> &grad_tensor,
    proto::VarType::Type *dtype, int64_t *numel) const {
  *numel = 0;
  for (size_t i = 0; i < grad_tensor.size(); ++i) {
    // Get the element number.
    int64_t len = grad_tensor.at(i).second->numel();
    PADDLE_ENFORCE_GT(len, 0);
    *numel += len;

    // Get the dtype.
    auto ele_type = grad_tensor.at(i).second->type();
    if (i == 0) {
      *dtype = ele_type;
    }
    PADDLE_ENFORCE_EQ(ele_type, *dtype);
  }
}

std::string FusedAllReduceOpHandle::Name() const { return "fused_all_reduce"; }
}  // namespace details
}  // namespace framework
}  // namespace paddle
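The contiguity check in RunImpl assumes the fused gradients were allocated back to back: after sorting by base address, each tensor must start exactly where the previous one ends. A standalone sketch of that check, with a hypothetical setup in place of real tensors:

// Standalone sketch of the fused-buffer contiguity check (assumed setup).
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

struct View {
  const float* data;
  int64_t numel;
};

bool IsContiguous(std::vector<View> views) {
  std::sort(views.begin(), views.end(),
            [](const View& a, const View& b) { return a.data < b.data; });
  for (size_t k = 1; k < views.size(); ++k) {
    // The next view must begin exactly at the end of the previous one.
    const float* expected = views[k - 1].data + views[k - 1].numel;
    if (views[k].data != expected) return false;
  }
  return true;
}

int main() {
  std::vector<float> buffer(10);
  std::vector<View> ok = {{buffer.data(), 4}, {buffer.data() + 4, 6}};
  std::vector<View> bad = {{buffer.data(), 4}, {buffer.data() + 5, 5}};
  std::cout << std::boolalpha << IsContiguous(ok) << " " << IsContiguous(bad)
            << "\n";  // prints: true false
}

Only when this holds can the single ncclAllReduce over `numel` elements stand in for one call per gradient, which is the entire point of the fusion.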
paddle/fluid/framework/details/fused_all_reduce_op_handle.h
@@ -0,0 +1,76 @@
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <string>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
#include "paddle/fluid/platform/nccl_helper.h"
#endif

namespace paddle {
namespace framework {
namespace details {

struct FusedAllReduceOpHandle : public OpHandleBase {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  FusedAllReduceOpHandle(ir::Node *node,
                         const std::vector<Scope *> &local_scopes,
                         const std::vector<platform::Place> &places,
                         const size_t num_of_all_reduce,
                         const platform::NCCLContextMap *ctxs);
#else
  FusedAllReduceOpHandle(ir::Node *node,
                         const std::vector<Scope *> &local_scopes,
                         const std::vector<platform::Place> &places,
                         const size_t num_of_all_reduce);
#endif
  std::string Name() const override;

  // Delaying and buffering nccl_all_reduce calls together can significantly
  // improve performance. Disable this feature by returning false.
  bool IsMultiDeviceTransfer() override { return true; }

 protected:
  void RunImpl() override;

 private:
  std::vector<Scope *> local_scopes_;
  std::vector<platform::Place> places_;
  size_t num_of_all_reduce_;
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  const platform::NCCLContextMap *nccl_ctxs_;
#endif

  // Check the dtype of the inputs and accumulate their element count.
  void GetDTypeAndNumel(
      const std::vector<std::pair<std::string, const LoDTensor *>> &g_tensor,
      proto::VarType::Type *dtype, int64_t *total_num) const;

  // Get each gradient's name and LoDTensor.
  void GetGradLoDTensor(const size_t &scope_idx,
                        const std::vector<VarHandle *> &in_var_handles,
                        const std::vector<VarHandle *> &out_var_handles,
                        std::vector<std::pair<std::string, const LoDTensor *>>
                            *grad_tensor) const;
};

}  // namespace details
}  // namespace framework
}  // namespace paddle
File diff suppressed because it is too large
Some files were not shown because too many files have changed in this diff