commit
cdb315e9d8
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -1,154 +0,0 @@
|
||||
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "paddle/fluid/framework/details/data_balance_op_handle.h"
|
||||
#include <algorithm>
|
||||
#include "paddle/fluid/framework/details/container_cast.h"
|
||||
|
||||
namespace paddle {
|
||||
namespace framework {
|
||||
namespace details {
|
||||
|
||||
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
|
||||
DataBalanceOpHandle::DataBalanceOpHandle(
|
||||
ir::Node *node, const std::vector<Scope *> &local_scopes,
|
||||
const std::vector<platform::Place> &places,
|
||||
const platform::NCCLContextMap *ctxs)
|
||||
: OpHandleBase(node), local_scopes_(local_scopes), places_(places) {
|
||||
if (ctxs) {
|
||||
for (auto &p : places_) {
|
||||
this->SetDeviceContext(p, ctxs->DevCtx(p));
|
||||
}
|
||||
}
|
||||
}
|
||||
#else
|
||||
DataBalanceOpHandle::DataBalanceOpHandle(
|
||||
ir::Node *node, const std::vector<Scope *> &local_scopes,
|
||||
const std::vector<platform::Place> &places)
|
||||
: OpHandleBase(node), local_scopes_(local_scopes), places_(places) {}
|
||||
#endif
|
||||
|
||||
std::string DataBalanceOpHandle::Name() const { return "data balance"; }
|
||||
|
||||
std::vector<std::array<int, 3>> DataBalanceOpHandle::GetBalancePlan(
|
||||
const std::vector<int> &device_sizes) {
|
||||
int device_num = device_sizes.size();
|
||||
int total_size = 0;
|
||||
int empty_num = 0;
|
||||
std::vector<std::array<int, 2>> size_device_vec;
|
||||
size_device_vec.reserve(device_num);
|
||||
for (int i = 0; i < device_num; ++i) {
|
||||
if (device_sizes[i] == 0) {
|
||||
++empty_num;
|
||||
}
|
||||
total_size += device_sizes[i];
|
||||
size_device_vec.push_back({{device_sizes[i], i}});
|
||||
}
|
||||
std::vector<std::array<int, 3>> res;
|
||||
if (empty_num == 0) {
|
||||
// No need to do data balance.
|
||||
return res;
|
||||
}
|
||||
if (total_size < device_num) {
|
||||
// No enough data.
|
||||
PADDLE_THROW_EOF();
|
||||
}
|
||||
std::sort(size_device_vec.begin(), size_device_vec.end(),
|
||||
[](const std::array<int, 2> &a, const std::array<int, 2> &b) {
|
||||
return a[0] > b[0];
|
||||
});
|
||||
int expected_device_size = total_size / device_num;
|
||||
int src_idx = 0;
|
||||
for (int dst_idx = device_num - empty_num; dst_idx < device_num; ++dst_idx) {
|
||||
if (size_device_vec[src_idx][0] <= expected_device_size) {
|
||||
++src_idx;
|
||||
PADDLE_ENFORCE_LT(
|
||||
src_idx, device_num - empty_num,
|
||||
"In current srategy an empty tensor should not be copy source.");
|
||||
}
|
||||
size_device_vec[src_idx][0] -= expected_device_size;
|
||||
size_device_vec[dst_idx][0] += expected_device_size;
|
||||
res.push_back({{size_device_vec[src_idx][1], size_device_vec[dst_idx][1],
|
||||
expected_device_size}});
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
void DataBalanceOpHandle::RunImpl() {
|
||||
PADDLE_ENFORCE_GT(places_.size(), 1UL,
|
||||
"Data balance can only be enabled when the number of "
|
||||
"places to run larger than 1.");
|
||||
auto in_var_handles = DynamicCast<VarHandle>(this->Inputs());
|
||||
auto out_var_handles = DynamicCast<VarHandle>(this->Outputs());
|
||||
PADDLE_ENFORCE(in_var_handles.size() % places_.size() == 0);
|
||||
PADDLE_ENFORCE_EQ(
|
||||
in_var_handles.size(), out_var_handles.size(),
|
||||
"The NoDummyInputSize and NoDummyOutputSize should be equal.");
|
||||
int data_num = in_var_handles.size() / places_.size();
|
||||
WaitInputVarGenerated();
|
||||
std::vector<std::vector<LoDTensor *>> lod_tensors(data_num);
|
||||
std::vector<int> device_sizes;
|
||||
for (int i = 0; i < static_cast<int>(in_var_handles.size()); ++i) {
|
||||
PADDLE_ENFORCE_EQ(in_var_handles[i]->name(), out_var_handles[i]->name(),
|
||||
"The name of input and output should be equal.");
|
||||
int place_idx = i / data_num;
|
||||
int data_idx = i % data_num;
|
||||
auto *local_scope =
|
||||
local_scopes_[place_idx]->FindVar(kLocalExecScopeName)->Get<Scope *>();
|
||||
auto *tensor_var = local_scope->FindVar(in_var_handles[i]->name());
|
||||
PADDLE_ENFORCE(tensor_var->IsType<LoDTensor>());
|
||||
auto *tensor = tensor_var->GetMutable<LoDTensor>();
|
||||
lod_tensors[data_idx].push_back(tensor);
|
||||
int ins_size =
|
||||
tensor->lod().empty() ? tensor->dims()[0] : tensor->NumElements();
|
||||
if (data_idx == 0) {
|
||||
device_sizes.emplace_back(ins_size);
|
||||
} else {
|
||||
PADDLE_ENFORCE_EQ(
|
||||
ins_size, device_sizes.at(place_idx),
|
||||
"All data on the same device shall have the same batch size.");
|
||||
}
|
||||
}
|
||||
const auto &balance_plan = GetBalancePlan(device_sizes);
|
||||
|
||||
for (const auto &trans : balance_plan) {
|
||||
for (int data_idx = 0; data_idx < data_num; ++data_idx) {
|
||||
LoDTensor *src_tensor = lod_tensors[data_idx][trans[0]];
|
||||
LoDTensor *dst_tensor = lod_tensors[data_idx][trans[1]];
|
||||
int trans_ins_size = trans[2];
|
||||
LoD src_lod = src_tensor->lod();
|
||||
int src_ins_size =
|
||||
src_lod.empty() ? src_tensor->dims()[0] : src_tensor->NumElements();
|
||||
int cut_point = src_ins_size - trans_ins_size;
|
||||
if (!src_lod.empty()) {
|
||||
for (auto &level : src_lod) {
|
||||
cut_point = level[cut_point];
|
||||
}
|
||||
}
|
||||
TensorCopySync(src_tensor->Slice(cut_point, src_tensor->dims()[0]),
|
||||
dst_tensor->place(), dst_tensor);
|
||||
src_tensor->ShareDataWith(src_tensor->Slice(0, cut_point));
|
||||
if (!src_lod.empty()) {
|
||||
dst_tensor->set_lod(SliceInLevel(
|
||||
src_lod, 0, src_ins_size - trans_ins_size, src_ins_size));
|
||||
src_tensor->set_lod(
|
||||
SliceInLevel(src_lod, 0, 0, src_ins_size - trans_ins_size));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace details
|
||||
} // namespace framework
|
||||
} // namespace paddle
|
@ -1,59 +0,0 @@
|
||||
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "paddle/fluid/framework/details/op_handle_base.h"
|
||||
#include "paddle/fluid/framework/lod_tensor.h"
|
||||
#include "paddle/fluid/framework/scope.h"
|
||||
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
|
||||
#include "paddle/fluid/platform/nccl_helper.h"
|
||||
#endif
|
||||
|
||||
namespace paddle {
|
||||
namespace framework {
|
||||
namespace details {
|
||||
|
||||
struct DataBalanceOpHandle : public OpHandleBase {
|
||||
public:
|
||||
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
|
||||
DataBalanceOpHandle(ir::Node *node, const std::vector<Scope *> &local_scopes,
|
||||
const std::vector<platform::Place> &places,
|
||||
const platform::NCCLContextMap *ctxs);
|
||||
#else
|
||||
DataBalanceOpHandle(ir::Node *node, const std::vector<Scope *> &local_scopes,
|
||||
const std::vector<platform::Place> &places);
|
||||
#endif
|
||||
|
||||
std::string Name() const override;
|
||||
|
||||
bool IsMultiDeviceTransfer() override { return false; };
|
||||
|
||||
protected:
|
||||
void RunImpl() override;
|
||||
|
||||
private:
|
||||
// std::vector<(src_dev_id, dst_dev_id, trans_size)>
|
||||
std::vector<std::array<int, 3>> GetBalancePlan(
|
||||
const std::vector<int> &batch_size_per_device);
|
||||
|
||||
const std::vector<Scope *> local_scopes_;
|
||||
const std::vector<platform::Place> places_;
|
||||
};
|
||||
|
||||
} // namespace details
|
||||
} // namespace framework
|
||||
} // namespace paddle
|
@ -0,0 +1,195 @@
|
||||
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <algorithm>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "paddle/fluid/framework/details/all_reduce_op_handle.h"
|
||||
#include "paddle/fluid/framework/details/container_cast.h"
|
||||
#include "paddle/fluid/framework/details/fused_all_reduce_op_handle.h"
|
||||
#include "paddle/fluid/framework/details/multi_devices_helper.h"
|
||||
#include "paddle/fluid/framework/ir/graph_helper.h"
|
||||
|
||||
namespace paddle {
|
||||
namespace framework {
|
||||
namespace details {
|
||||
|
||||
class FuseAllReduceOpPass : public ir::Pass {
|
||||
protected:
|
||||
std::unique_ptr<ir::Graph> ApplyImpl(
|
||||
std::unique_ptr<ir::Graph> graph) const override {
|
||||
ir::Graph &result = *graph;
|
||||
|
||||
auto &places = Get<const std::vector<platform::Place>>(kPlaces);
|
||||
auto &local_scopes = Get<const std::vector<Scope *>>(kLocalScopes);
|
||||
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
|
||||
auto *nccl_ctxs = &Get<platform::NCCLContextMap>(kNCCLCtxs);
|
||||
#endif
|
||||
|
||||
std::unordered_set<std::string> grads;
|
||||
auto ¶ms_grads = result.Get<ParamsAndGrads>(kParamsAndGrads);
|
||||
size_t num_of_all_reduce = params_grads.size();
|
||||
grads.reserve(num_of_all_reduce);
|
||||
for (auto p_g : params_grads) {
|
||||
grads.insert(p_g.second);
|
||||
}
|
||||
|
||||
size_t num_place = places.size();
|
||||
std::unordered_map<std::string, ir::Node *> all_reduce_ops;
|
||||
all_reduce_ops.reserve(grads.size());
|
||||
for (auto &node : result.Nodes()) {
|
||||
if (node->IsOp()) {
|
||||
PADDLE_ENFORCE(node->IsWrappedBy<OpHandleBase>());
|
||||
auto *all_reduce_op_handle =
|
||||
dynamic_cast<AllReduceOpHandle *>(&node->Wrapper<OpHandleBase>());
|
||||
if (all_reduce_op_handle) {
|
||||
auto inputs = DynamicCast<VarHandle>(all_reduce_op_handle->Inputs());
|
||||
PADDLE_ENFORCE_EQ(inputs.size(), num_place);
|
||||
// The inputs' name should be the same.
|
||||
auto &grad_name = inputs[0]->name();
|
||||
for (size_t i = 1; i < inputs.size(); ++i) {
|
||||
PADDLE_ENFORCE_EQ(inputs[i]->name(), grad_name,
|
||||
"The input name should be the same.");
|
||||
}
|
||||
PADDLE_ENFORCE_NE(grads.count(grad_name), static_cast<size_t>(0));
|
||||
all_reduce_ops.emplace(grad_name, node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
VLOG(10) << "Find all_reduce_ops: " << all_reduce_ops.size();
|
||||
if (all_reduce_ops.size() == 0) {
|
||||
return std::move(graph);
|
||||
}
|
||||
|
||||
PADDLE_ENFORCE_EQ(all_reduce_ops.size(), grads.size(),
|
||||
"The number of all_reduce OpHandle is not equal to the "
|
||||
"number of grads. Maybe some gradients are sparse type, "
|
||||
"it is not supported currently.");
|
||||
VLOG(10) << "Insert fused_all_reduce";
|
||||
|
||||
auto &group_grads_params =
|
||||
graph->Get<GroupGradsAndParams>(kGroupGradsAndParams);
|
||||
|
||||
for (auto &group_g_p : group_grads_params) {
|
||||
size_t group_size = group_g_p.size();
|
||||
PADDLE_ENFORCE_GT(group_size, static_cast<size_t>(0));
|
||||
std::vector<ir::Node *> group_all_reduce_ops;
|
||||
group_all_reduce_ops.reserve(group_size);
|
||||
for (auto &g_p : group_g_p) {
|
||||
group_all_reduce_ops.emplace_back(all_reduce_ops.at(g_p.first));
|
||||
}
|
||||
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
|
||||
InsertFusedAllReduce(places, local_scopes, group_size,
|
||||
group_all_reduce_ops, nccl_ctxs, &result);
|
||||
#else
|
||||
InsertFusedAllReduce(places, local_scopes, group_size,
|
||||
group_all_reduce_ops, &result);
|
||||
#endif
|
||||
}
|
||||
return std::move(graph);
|
||||
}
|
||||
|
||||
void InsertFusedAllReduce(const std::vector<platform::Place> &places,
|
||||
const std::vector<Scope *> &local_scopes,
|
||||
const size_t num_of_all_reduce,
|
||||
const std::vector<ir::Node *> &all_reduce_ops,
|
||||
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
|
||||
const platform::NCCLContextMap *nccl_ctxs,
|
||||
#endif
|
||||
ir::Graph *result) const {
|
||||
std::vector<VarHandleBase *> inputs;
|
||||
std::vector<VarHandleBase *> outputs;
|
||||
for (auto &op : all_reduce_ops) {
|
||||
auto &op_handle = op->Wrapper<OpHandleBase>();
|
||||
inputs.insert(inputs.end(), op_handle.Inputs().begin(),
|
||||
op_handle.Inputs().end());
|
||||
// Remove output
|
||||
for_each(op_handle.Inputs().begin(), op_handle.Inputs().end(),
|
||||
[&op_handle](VarHandleBase *var_handle) {
|
||||
var_handle->RemoveOutput(&op_handle, op_handle.Node());
|
||||
});
|
||||
|
||||
outputs.insert(outputs.end(), op_handle.Outputs().begin(),
|
||||
op_handle.Outputs().end());
|
||||
// Remove Input
|
||||
for_each(
|
||||
op_handle.Outputs().begin(), op_handle.Outputs().end(),
|
||||
[](VarHandleBase *var_handle) { var_handle->ClearGeneratedOp(); });
|
||||
|
||||
result->RemoveNode(op_handle.Node());
|
||||
}
|
||||
|
||||
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
|
||||
CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
|
||||
local_scopes, nccl_ctxs, result);
|
||||
#else
|
||||
CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
|
||||
local_scopes, result);
|
||||
#endif
|
||||
}
|
||||
|
||||
private:
|
||||
void CreateFusedAllReduceOp(const std::vector<VarHandleBase *> &inputs,
|
||||
const std::vector<VarHandleBase *> &outputs,
|
||||
const size_t num_of_all_reduce,
|
||||
const std::vector<platform::Place> &places,
|
||||
const std::vector<Scope *> &local_scopes,
|
||||
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
|
||||
const platform::NCCLContextMap *nccl_ctxs,
|
||||
#endif
|
||||
ir::Graph *result) const {
|
||||
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
|
||||
auto *op_handle = new FusedAllReduceOpHandle(
|
||||
result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
|
||||
local_scopes, places, num_of_all_reduce, nccl_ctxs);
|
||||
#else
|
||||
auto *op_handle = new FusedAllReduceOpHandle(
|
||||
result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
|
||||
local_scopes, places, num_of_all_reduce);
|
||||
#endif
|
||||
|
||||
for (auto in : inputs) {
|
||||
op_handle->AddInput(in);
|
||||
}
|
||||
|
||||
for (auto out : outputs) {
|
||||
op_handle->AddOutput(out);
|
||||
}
|
||||
|
||||
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
|
||||
if (!nccl_ctxs) {
|
||||
SetCommunicationContext(places, op_handle);
|
||||
}
|
||||
#else
|
||||
SetCommunicationContext(places, op_handle);
|
||||
#endif
|
||||
}
|
||||
|
||||
void SetCommunicationContext(const std::vector<platform::Place> &places,
|
||||
FusedAllReduceOpHandle *op_handle) const {
|
||||
for (size_t i = 0; i < places.size(); ++i) {
|
||||
op_handle->SetDeviceContext(
|
||||
places[i], platform::DeviceContextPool::Instance().Get(places[i]));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace details
|
||||
} // namespace framework
|
||||
} // namespace paddle
|
||||
|
||||
REGISTER_PASS(fuse_all_reduce_op_pass,
|
||||
paddle::framework::details::FuseAllReduceOpPass);
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue