@@ -89,101 +89,25 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
 
   bool is_forwarding = true;
   for (auto *op : program.Block(0).AllOps()) {
-    bool change_forward = false;
-    if (!is_forwarding) {
-      // FIXME(yy): Do not hard code like this
-      if (op->OutputArgumentNames().size() == 1 &&
-          op->OutputArgumentNames()[0] == GradVarName(loss_var_name_)) {
-        continue;  // Drop fill 1. for backward coeff;
-      }
-    }
-
-    // append send op if program is distributed trainer main program.
-    // always use the first device
-    if (!is_forwarding && op->Type() == "send") {
-      auto &p = places_[0];
-      auto *s = local_scopes_[0];
-      // FIXME(wuyi): send op always copy from GPU 0
-      result.ops_.emplace_back(new SendOpHandle(*op, s, p));
-      // Create inputs for output on original place and no ssa output
-      // is created for send op.
-      CreateOpHandleIOs(&result, *op, p, 0);
-      continue;
-    }
-
-    for (size_t i = 0; i < places_.size(); ++i) {
-      auto &p = places_[i];
-      auto *s = local_scopes_[i];
-
-      result.ops_.emplace_back(new ComputationOpHandle(*op, s, p));
-      auto *op_handle = result.ops_.back().get();
-      CreateOpHandleIOs(&result, *op, p, i);
-
-      auto var_names = op->OutputArgumentNames();
-
-      if (is_forwarding) {
-        if (var_names.size() == 1 && var_names[0] == loss_var_name_) {
-          // Insert ScaleCost OpHandle
-#ifdef PADDLE_WITH_CUDA
-          auto *communication_dev_ctx = nccl_ctxs_->DevCtx(p);
-#else
-          auto *communication_dev_ctx =
-              platform::DeviceContextPool::Instance().Get(platform::CPUPlace());
-#endif
-
-          op_handle = new ScaleLossGradOpHandle(local_scopes_.size(), s, p,
-                                                communication_dev_ctx);
-          result.ops_.emplace_back(op_handle);
-
-          // FIXME: Currently ScaleLossGradOp only use device_count as scale
-          // factor. So it does not depend on any other operators.
-          // VarHandle *loss = GetVarHandle(loss_var_name, place);
-          // loss->pending_ops_.emplace_back(op_handle);
-          // op_handle->inputs_.emplace_back(loss);
-
-          CreateOpOutput(&result, op_handle, GradVarName(loss_var_name_), p, i);
-          change_forward = true;
-        }
-      }
-    }
-
-    if (change_forward) {
+    if (op->Type() == "send") {
+      // append send op if program is distributed trainer main program.
+      // always use the first device
+      CreateSendOp(&result, *op);
+    } else if (IsScaleLossOp(*op)) {
+      CreateScaleLossGradOp(&result);
       is_forwarding = false;
-    }
-
-    if (!is_forwarding) {
-      auto var_names = op->OutputArgumentNames();
-      // Currently, we assume that once gradient is generated, it can be
-      // broadcast, and each gradient is only broadcast once. But there are no
-      // other cases, for example, we need to adjust the gradient according to
-      // the input when we get the gradient, which is not considered at present.
-      for (auto &og : var_names) {
-        if (grad_names_.count(og) != 0 &&
-            og_has_been_broadcast.count(og) == 0) {  // is param grad
-          // Insert NCCL AllReduce Op
-          og_has_been_broadcast.insert(og);
-#ifdef PADDLE_WITH_CUDA
-          result.ops_.emplace_back(
-              new NCCLAllReduceOpHandle(local_scopes_, places_, *nccl_ctxs_));
-          auto *op_handle = result.ops_.back().get();
-
-          for (size_t i = 0; i < places_.size(); ++i) {
-            auto &p = places_[i];
-            auto &vars = result.vars_[i][og];
-
-            if (vars.empty()) {  // This device has no data. continue.
-              continue;
-            }
-            auto &prev_grad = vars[vars.size() - 1];
-            op_handle->AddInput(prev_grad.get());
-
-            auto var = new VarHandle(vars.size() - 1, i, og, p);
-            vars.emplace_back(var);
-            op_handle->AddOutput(var);
+    } else {
+      CreateComputationalOps(&result, *op);
+      if (!is_forwarding) {
+        // Currently, we assume that once gradient is generated, it can be
+        // broadcast, and each gradient is only broadcast once. But there are no
+        // other cases, for example, we need to adjust the gradient according to
+        // the input when we get the gradient, which is not considered at
+        // present.
+        for (auto &og : op->OutputArgumentNames()) {
+          if (IsParameterGradientOnce(og, &og_has_been_broadcast)) {
+            InsertNCCLAllReduceOp(&result, og);
           }
-#else
-          PADDLE_ENFORCE("Not implemented");
-#endif
         }
       }
     }
@@ -207,7 +131,97 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
   }
 
   return std::unique_ptr<SSAGraph>(graph);
-}  // namespace details
+}
+
+void MultiDevSSAGraphBuilder::InsertNCCLAllReduceOp(
+    SSAGraph *result, const std::string &og) const {
+#ifdef PADDLE_WITH_CUDA
+  result->ops_.emplace_back(
+      new NCCLAllReduceOpHandle(local_scopes_, places_, *nccl_ctxs_));
+  auto *op_handle = result->ops_.back().get();
+
+  for (size_t i = 0; i < places_.size(); ++i) {
+    auto &p = places_[i];
+    auto &vars = result->vars_[i][og];
+    if (vars.empty()) {  // This device has no data. continue.
+      continue;
+    }
+    auto &prev_grad = vars[vars.size() - 1];
+    op_handle->AddInput(prev_grad.get());
+
+    auto var = new VarHandle(vars.size() - 1, i, og, p);
+    vars.emplace_back(var);
+    op_handle->AddOutput(var);
+  }
+#else
+  PADDLE_ENFORCE("Not implemented");
+#endif
+}
+
+bool MultiDevSSAGraphBuilder::IsParameterGradientOnce(
+    const std::string &og,
+    std::unordered_set<std::string> *og_has_been_broadcast) const {
+  bool is_pg_once =
+      grad_names_.count(og) != 0 && og_has_been_broadcast->count(og) == 0;
+  if (is_pg_once) {
+    // Insert NCCL AllReduce Op
+    og_has_been_broadcast->insert(og);
+  }
+  return is_pg_once;
+}
+
+void MultiDevSSAGraphBuilder::CreateScaleLossGradOp(SSAGraph *result) const {
+  for (size_t i = 0; i < places_.size(); ++i) {
+    // Insert ScaleCost OpHandle
+#ifdef PADDLE_WITH_CUDA
+    auto *communication_dev_ctx = nccl_ctxs_->DevCtx(places_[i]);
+#else
+    auto *communication_dev_ctx =
+        platform::DeviceContextPool::Instance().Get(platform::CPUPlace());
+#endif
+
+    auto *op_handle =
+        new ScaleLossGradOpHandle(local_scopes_.size(), local_scopes_[i],
+                                  places_[i], communication_dev_ctx);
+    result->ops_.emplace_back(op_handle);
+
+    // FIXME: Currently ScaleLossGradOp only use device_count as scale
+    // factor. So it does not depend on any other operators.
+    // VarHandle *loss = GetVarHandle(loss_var_name, place);
+    // loss->pending_ops_.emplace_back(op_handle);
+    // op_handle->inputs_.emplace_back(loss);
+
+    CreateOpOutput(result, op_handle, GradVarName(loss_var_name_), places_[i],
+                   i);
+  }
+}
+
+void MultiDevSSAGraphBuilder::CreateComputationalOps(SSAGraph *result,
+                                                     const OpDesc &op) const {
+  for (size_t scope_idx = 0; scope_idx < places_.size(); ++scope_idx) {
+    auto p = places_[scope_idx];
+    auto s = local_scopes_[scope_idx];
+    result->ops_.emplace_back(new ComputationOpHandle(op, s, p));
+    CreateOpHandleIOs(result, op, p, scope_idx);
+  }
+}
+
+void MultiDevSSAGraphBuilder::CreateSendOp(SSAGraph *result,
+                                           const OpDesc &op) const {
+  auto &p = places_[0];
+  auto *s = local_scopes_[0];
+  // FIXME(wuyi): send op always copy from GPU 0
+  result->ops_.emplace_back(new SendOpHandle(op, s, p));
+  // Create inputs for output on original place and no ssa output
+  // is created for send op.
+  CreateOpHandleIOs(result, op, p, 0);
+}
+
+bool MultiDevSSAGraphBuilder::IsScaleLossOp(const OpDesc &op) const {
+  // FIXME(yy): Do not hard code like this
+  return op.OutputArgumentNames().size() == 1 &&
+         op.OutputArgumentNames()[0] == GradVarName(loss_var_name_);
+}
 }  // namespace details
 }  // namespace framework
 }  // namespace paddle