Support unused parameters in dynamic graph distributed (#30224)

revert-31562-mean
ShenLiang 4 years ago committed by GitHub
parent ca74dd3c69
commit a60f17b89d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

File diff suppressed because it is too large Load Diff

@ -18,14 +18,18 @@
#include <iostream>
#include <map>
#include <memory>
#include <queue>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/imperative/layer.h"
#include "paddle/fluid/imperative/op_base.h"
#include "paddle/fluid/imperative/variable_wrapper.h"
#include "paddle/fluid/memory/memory.h"
#include "paddle/fluid/string/string_helper.h"
#if defined(PADDLE_WITH_NCCL)
#include "paddle/fluid/imperative/all_reduce.h"
@ -121,7 +125,7 @@ class Reducer {
const std::vector<std::vector<size_t>>& group_indices,
const std::vector<bool>& is_sparse_gradient,
std::shared_ptr<imperative::ParallelContext> parallel_ctx,
const std::vector<size_t>& group_size_limits);
const std::vector<size_t>& group_size_limits, bool find_unused_vars);
virtual ~Reducer() {}
@ -130,13 +134,18 @@ class Reducer {
void InitializeDenseGroups(const std::vector<size_t>& variable_indices_,
Group* p_group);
void PrepareForBackward();
void PrepareDeps(const std::unordered_set<GradOpNode*>& init_nodes);
void AddDistHook(VariableWrapper* var_warpper, size_t var_index);
void PrepareForBackward(
const std::vector<std::shared_ptr<imperative::VarBase>>& outputs);
void MarkDenseVarReady(size_t var_index, VariableWrapper* var_warpper);
void AddDistHook(size_t var_index);
void MarkSparseVarReady(size_t var_index, VariableWrapper* var_warpper);
// void MarkDenseVarReady(size_t var_index);
// void MarkSparseVarReady(size_t var_index);
void MarkVarReady(const size_t var_index, const bool is_used_var);
void MarkGroupReady(size_t group_index);
@ -148,17 +157,19 @@ class Reducer {
void CreateGroupEvents(int group_num);
inline bool NeedRebuildGroup() { return !has_rebuilt_group_; }
// Reducer Singleton
static std::shared_ptr<Reducer> SetInstance(
const std::vector<std::shared_ptr<imperative::VarBase>>& vars,
const std::vector<std::vector<size_t>>& group_indices,
const std::vector<bool>& is_sparse_gradient,
std::shared_ptr<imperative::ParallelContext> parallel_ctx,
const std::vector<size_t>& group_size_limits) {
const std::vector<size_t>& group_size_limits, bool find_unused_vars) {
if (NULL == s_instance_) {
s_instance_.reset(new paddle::imperative::Reducer(
vars, group_indices, is_sparse_gradient, parallel_ctx,
group_size_limits));
group_size_limits, find_unused_vars));
}
return s_instance_;
}
@ -194,6 +205,14 @@ class Reducer {
std::vector<std::shared_ptr<imperative::VarBase>> rebuild_vars_;
std::vector<int64_t> rebuild_var_indices_;
const std::vector<size_t> group_size_limits_;
// Following variables are to help unused vars
std::unordered_map<GradOpNode*, size_t> node_deps_;
std::unordered_map<VariableWrapper*, size_t> var_index_map_;
std::vector<size_t> unused_vars_;
bool has_marked_unused_vars_{false};
bool find_unused_vars_{false};
bool all_group_ready_{false};
};
std::vector<std::vector<size_t>> AssignGroupBySize(

@ -1358,18 +1358,18 @@ void BindImperative(py::module *m_ptr) {
py::class_<imperative::Reducer, std::shared_ptr<imperative::Reducer>>(
m, "Reducer", R"DOC()DOC")
.def(py::init(
[](const std::vector<std::shared_ptr<imperative::VarBase>> &vars,
const std::vector<std::vector<size_t>> &group_indices,
const std::vector<bool> &is_sparse_gradient,
std::shared_ptr<imperative::ParallelContext> parallel_ctx,
const std::vector<size_t> &group_size_limits) {
return imperative::Reducer::SetInstance(
vars, group_indices, is_sparse_gradient, parallel_ctx,
group_size_limits);
}))
.def(py::init([](
const std::vector<std::shared_ptr<imperative::VarBase>> &vars,
const std::vector<std::vector<size_t>> &group_indices,
const std::vector<bool> &is_sparse_gradient,
std::shared_ptr<imperative::ParallelContext> parallel_ctx,
const std::vector<size_t> &group_size_limits, bool find_unused_vars) {
return imperative::Reducer::SetInstance(
vars, group_indices, is_sparse_gradient, parallel_ctx,
group_size_limits, find_unused_vars);
}))
.def("prepare_for_backward", &imperative::Reducer::PrepareForBackward,
py::call_guard<py::gil_scoped_release>());
py::arg("vars"), py::call_guard<py::gil_scoped_release>());
m.def("assign_group_by_size", &imperative::AssignGroupBySize, py::arg("vars"),
py::arg("is_sparse_gradient"),

@ -26,6 +26,7 @@ from paddle.fluid.dygraph import to_variable, no_grad
from paddle.utils import deprecated
import warnings
import paddle
import itertools
__all__ = ["prepare_context", "ParallelEnv", "DataParallel"]
@ -465,17 +466,32 @@ class DataParallel(layers.Layer):
"ParallelContext must be initialized before. You should use init_parallel_env() before" \
"constructing the DataParallel."
# TODO(shenliang03) "find_unused_vars" interface will be exposed in the future
# to handle control flow to process unused parameters
find_unused_vars = True
self._reducer = core.Reducer(
trainable_parameters,
list(reversed(self.group_indices)), is_sparse_gradient,
parallel_helper.__parallel_ctx__clz__,
[self.last_comm_buffer_size, self.comm_buffer_size])
[self.last_comm_buffer_size, self.comm_buffer_size],
find_unused_vars)
def _find_varbase(self, obj):
if isinstance(obj, core.VarBase):
return [obj]
if isinstance(obj, (list, tuple)):
return itertools.chain(*map(self._find_varbase, obj))
if isinstance(obj, dict):
return itertools.chain(*map(self._find_varbase, obj.values()))
return []
def forward(self, *inputs, **kwargs):
outputs = self._layers(*inputs, **kwargs)
if self._strategy.nranks > 1:
self._reducer.prepare_for_backward()
self._reducer.prepare_for_backward(
list(self._find_varbase(outputs)))
return self._layers(*inputs, **kwargs)
return outputs
@deprecated(
since="2.0.0", reason="This method does not need to be called anymore.")

@ -18,6 +18,7 @@ list(APPEND DIST_TEST_OPS test_parallel_dygraph_transformer)
list(APPEND DIST_TEST_OPS test_fleet_pipeline_meta_optimizer)
list(APPEND DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer)
list(APPEND DIST_TEST_OPS test_gen_nccl_id_op)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_unused_variables)
set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS})
#remove distribute unittests.
list(APPEND MIXED_DIST_TEST_OPS test_dgc_op)
@ -151,6 +152,7 @@ if (NOT ${WITH_GPU})
LIST(REMOVE_ITEM TEST_OPS test_rank_attention_op) # TODO(shenliang03): rank_attention_op support CPU device in future
LIST(REMOVE_ITEM TEST_OPS test_batch_fc_op) # TODO(shenliang03): batch_fc_op support CPU device in future
LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_mnist) # TODO(Yancey1989): parallel dygraph support CPU device in future
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_unused_variables)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_se_resnext)
LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_sparse_embedding)
LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_sparse_embedding_over_height)
@ -813,6 +815,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU AND WITH_NCCL)
if(${NCCL_VERSION} VERSION_GREATER_EQUAL 2212)
set_tests_properties(test_parallel_dygraph_sparse_embedding PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_dygraph_transformer PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_dygraph_unused_variables PROPERTIES TIMEOUT 120)
endif()
endif()
if(WITH_GPU AND NOT WIN32)

@ -55,10 +55,18 @@ class SimpleNet(Layer):
dtype=dtype,
default_initializer=paddle.nn.initializer.Uniform(
low=-self.init_scale, high=self.init_scale))
self.tmp = self.create_parameter(
attr=paddle.ParamAttr(),
shape=[self.hidden_size, self.vocab_size],
dtype=dtype,
default_initializer=paddle.nn.initializer.Uniform(
low=-self.init_scale, high=self.init_scale))
def forward(self, input, label):
x_emb = self.embedding(input)
fc = paddle.matmul(x_emb, self.softmax_weight)
# use detach to stop gradient
fc = fc.detach()
fc = paddle.add(fc, self.softmax_bias)
projection = paddle.reshape(fc, shape=[-1, self.vocab_size])
loss = paddle.nn.functional.softmax_with_cross_entropy(

@ -0,0 +1,133 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import numpy as np
import paddle
from test_dist_base import runtime_main, TestParallelDyGraphRunnerBase
from paddle.nn import Layer, Embedding
class SimpleNet(Layer):
def __init__(self,
hidden_size,
vocab_size,
num_steps=20,
init_scale=0.1,
is_sparse=False,
dtype="float32"):
super(SimpleNet, self).__init__()
self.hidden_size = hidden_size
self.vocab_size = vocab_size
self.init_scale = init_scale
self.num_steps = num_steps
self.embedding = Embedding(
self.vocab_size,
self.hidden_size,
sparse=True,
weight_attr=paddle.ParamAttr(
name='embedding_param',
initializer=paddle.nn.initializer.Uniform(
low=-init_scale, high=init_scale)))
self.softmax_weight = self.create_parameter(
attr=paddle.ParamAttr(),
shape=[self.hidden_size, self.vocab_size],
dtype=dtype,
default_initializer=paddle.nn.initializer.Uniform(
low=-self.init_scale, high=self.init_scale))
self.softmax_bias = self.create_parameter(
attr=paddle.ParamAttr(),
shape=[self.vocab_size],
dtype=dtype,
default_initializer=paddle.nn.initializer.Uniform(
low=-self.init_scale, high=self.init_scale))
# add tmp var
self.tmp = self.create_parameter(
attr=paddle.ParamAttr(),
shape=[self.vocab_size],
dtype=dtype,
default_initializer=paddle.nn.initializer.Uniform(
low=-self.init_scale, high=self.init_scale))
def forward(self, input, label):
x_emb = self.embedding(input)
fc = paddle.matmul(x_emb, self.softmax_weight)
# it use stop gradient to block gradient return
fc.stop_gradient = True
fc = paddle.add(fc, self.softmax_bias)
projection = paddle.reshape(fc, shape=[-1, self.vocab_size])
loss = paddle.nn.functional.softmax_with_cross_entropy(
logits=projection, label=label, soft_label=False)
loss = paddle.reshape(loss, shape=[-1, self.num_steps])
loss = paddle.mean(loss, axis=[0])
loss = paddle.sum(loss)
return {"loss": loss}
# global configs
batch_size = 4
batch_num = 200
hidden_size = 10
vocab_size = 1000
num_steps = 3
init_scale = 0.1
def fake_sample_reader():
def __reader__():
for i in range(batch_num):
x_data = np.arange(num_steps).astype('int64')
y_data = np.arange(1, 1 + num_steps).astype('int64')
yield x_data, y_data
return __reader__
class TestSparseEmbeddingUnusedVars(TestParallelDyGraphRunnerBase):
def get_model(self):
model = SimpleNet(
hidden_size=hidden_size,
vocab_size=vocab_size,
num_steps=num_steps,
init_scale=init_scale,
is_sparse=True)
train_reader = paddle.batch(
fake_sample_reader(), batch_size=batch_size, drop_last=True)
optimizer = paddle.optimizer.SGD(learning_rate=0.001,
parameters=model.parameters())
return model, train_reader, optimizer
def run_one_loop(self, model, optimizer, batch):
x_data = np.array([x[0].reshape(3) for x in batch]).astype('int64')
y_data = np.array([x[1].reshape(3) for x in batch]).astype('int64')
x_data = x_data.reshape((-1, num_steps, 1))
y_data = y_data.reshape((-1, 1))
x = paddle.to_tensor(x_data)
y = paddle.to_tensor(y_data)
dy_loss = model(x, y)
return dy_loss["loss"]
if __name__ == "__main__":
runtime_main(TestSparseEmbeddingUnusedVars)

@ -0,0 +1,68 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import sys
import unittest
import paddle.fluid as fluid
from test_dist_base import TestDistBase
from spawn_runner_base import TestDistSpawnRunner
from parallel_dygraph_unused_variables import TestSparseEmbeddingUnusedVars
flag_name = os.path.splitext(__file__)[0]
class TestParallelDygraphMnist(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._nccl2_mode = True
self._dygraph = True
def test_mnist(self):
if fluid.core.is_compiled_with_cuda():
self.check_with_place(
"parallel_dygraph_unused_variables.py",
delta=1e-5,
check_error_log=True,
log_name=flag_name)
class TestSparseEmbeddingUnusedVarsSpawn(TestDistSpawnRunner):
def test_mnist_with_spawn(self):
if fluid.core.is_compiled_with_cuda() and sys.version_info >= (3, 4):
self.check_dist_result_with_spawn(
test_class=TestSparseEmbeddingUnusedVars, delta=1e-5)
class TestFleetDygraphMnist(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._nccl2_mode = True
self._dygraph = True
self._gpu_fleet_api = True
def test_mnist(self):
if fluid.core.is_compiled_with_cuda():
self.check_with_place(
"parallel_dygraph_unused_variables.py",
delta=1e-5,
check_error_log=True,
log_name=flag_name)
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save