You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
714 lines
26 KiB
714 lines
26 KiB
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import numpy as np
|
|
import os
|
|
from ..fluid.layer_helper import LayerHelper
|
|
from ..fluid.framework import Variable, OpProtoHolder, in_dygraph_mode, convert_np_dtype_to_dtype_
|
|
from ..fluid.data_feeder import convert_dtype, check_variable_and_dtype, check_type, check_dtype
|
|
from ..fluid.layers.tensor import fill_constant
|
|
from ..fluid.layers import utils
|
|
from ..fluid.dygraph.parallel import prepare_context
|
|
import paddle
|
|
from .fleet import fleet
|
|
import paddle.fluid as fluid
|
|
import paddle.fluid.core as core
|
|
|
|
__all__ = [
|
|
'broadcast',
|
|
'all_reduce',
|
|
'reduce',
|
|
'all_gather',
|
|
'scatter',
|
|
'barrier',
|
|
'split',
|
|
'ReduceOp',
|
|
]
|
|
|
|
|
|
class ReduceOp:
|
|
"""
|
|
Specify the type of operation used for element-wise reductions.
|
|
It should be one of the following values:
|
|
|
|
ReduceOp.SUM
|
|
|
|
ReduceOp.MAX
|
|
|
|
ReduceOp.MIN
|
|
|
|
ReduceOp.PROD
|
|
|
|
Examples:
|
|
.. code-block:: python
|
|
|
|
import numpy as np
|
|
import paddle
|
|
from paddle.distributed import ReduceOp
|
|
from paddle.distributed import init_parallel_env
|
|
|
|
paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id)
|
|
init_parallel_env()
|
|
if paddle.distributed.ParallelEnv().local_rank == 0:
|
|
np_data = np.array([[4, 5, 6], [4, 5, 6]])
|
|
else:
|
|
np_data = np.array([[1, 2, 3], [1, 2, 3]])
|
|
data = paddle.to_tensor(np_data)
|
|
paddle.distributed.all_reduce(data, op=ReduceOp.SUM)
|
|
out = data.numpy()
|
|
# [[5, 7, 9], [5, 7, 9]]
|
|
"""
|
|
SUM = 0
|
|
MAX = 1
|
|
MIN = 2
|
|
PROD = 3
|
|
|
|
|
|
class _Group():
|
|
"""The abstract representation of group."""
|
|
|
|
def __init__(self, rank, rank_num):
|
|
self.rank = rank
|
|
self.nranks = rank_num
|
|
|
|
|
|
# NOTE(chenweihang): Lazily initialized global group information
|
|
# If we initialize _default_group when import module, it will
|
|
# not update when we use spawn to run multi-process training
|
|
_default_group = None
|
|
|
|
|
|
def _get_global_default_group():
|
|
global _default_group
|
|
if _default_group is None:
|
|
_default_group = _Group(
|
|
int(os.getenv("PADDLE_TRAINER_ID", "0")),
|
|
int(os.getenv("PADDLE_TRAINERS_NUM", "1")))
|
|
return _default_group
|
|
|
|
|
|
def broadcast(tensor, src, group=0):
|
|
"""
|
|
|
|
Broadcast a tensor from the source to all others.
|
|
|
|
Args:
|
|
tensor (Tensor): The Tensor to send if current rank is the source, or the tensor to receive otherwise. Its data type
|
|
should be float16, float32, float64, int32 or int64.
|
|
src (int): The source rank.
|
|
group (int): The process group to work on. It is Optional.
|
|
|
|
Returns:
|
|
None.
|
|
|
|
Examples:
|
|
.. code-block:: python
|
|
|
|
import numpy as np
|
|
import paddle
|
|
from paddle.distributed import init_parallel_env
|
|
|
|
paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id)
|
|
init_parallel_env()
|
|
if paddle.distributed.ParallelEnv().local_rank == 0:
|
|
np_data = np.array([[4, 5, 6], [4, 5, 6]])
|
|
else:
|
|
np_data = np.array([[1, 2, 3], [1, 2, 3]])
|
|
data = paddle.to_tensor(np_data)
|
|
paddle.distributed.broadcast(data, 1)
|
|
out = data.numpy()
|
|
# [[1, 2, 3], [1, 2, 3]]
|
|
"""
|
|
if in_dygraph_mode():
|
|
return core.ops.c_broadcast(tensor, tensor, 'root', src,
|
|
'use_calc_stream', True, 'ring_id', group)
|
|
|
|
op_type = 'c_broadcast'
|
|
check_variable_and_dtype(
|
|
tensor, 'tensor', ['float16', 'float32', 'float64', 'int32', 'int64'],
|
|
'broadcast')
|
|
if not isinstance(src, int) or not isinstance(group, int):
|
|
raise ValueError("Both the type of 'src' and 'group' for broadcast "
|
|
"should be int.")
|
|
|
|
helper = LayerHelper(op_type, **locals())
|
|
helper.append_op(
|
|
type=op_type,
|
|
inputs={'X': [tensor]},
|
|
outputs={'Out': [tensor]},
|
|
attrs={
|
|
'root': src,
|
|
'use_calc_stream': True,
|
|
'ring_id': group,
|
|
})
|
|
|
|
|
|
def all_reduce(tensor, op=ReduceOp.SUM, group=0):
|
|
"""
|
|
|
|
Reduce a tensor over all ranks so that all get the result.
|
|
|
|
Args:
|
|
tensor (Tensor): The input Tensor. It also works as the output Tensor. Its data type
|
|
should be float16, float32, float64, int32 or int64.
|
|
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.Min|ReduceOp.PROD): Optional. The operation used.
|
|
group (int): Optional. The process group to work on.
|
|
|
|
Returns:
|
|
None.
|
|
|
|
Examples:
|
|
.. code-block:: python
|
|
|
|
import numpy as np
|
|
import paddle
|
|
from paddle.distributed import ReduceOp
|
|
from paddle.distributed import init_parallel_env
|
|
|
|
paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id)
|
|
init_parallel_env()
|
|
if paddle.distributed.ParallelEnv().local_rank == 0:
|
|
np_data = np.array([[4, 5, 6], [4, 5, 6]])
|
|
else:
|
|
np_data = np.array([[1, 2, 3], [1, 2, 3]])
|
|
data = paddle.to_tensor(np_data)
|
|
paddle.distributed.all_reduce(data)
|
|
out = data.numpy()
|
|
# [[5, 7, 9], [5, 7, 9]]
|
|
"""
|
|
if in_dygraph_mode():
|
|
if op == ReduceOp.SUM:
|
|
return core.ops.c_allreduce_sum(tensor, tensor, 'use_calc_stream',
|
|
True, 'ring_id', group)
|
|
elif op == ReduceOp.MAX:
|
|
return core.ops.c_allreduce_max(tensor, tensor, 'use_calc_stream',
|
|
True, 'ring_id', group)
|
|
elif op == ReduceOp.MIN:
|
|
return core.ops.c_allreduce_min(tensor, tensor, 'use_calc_stream',
|
|
True, 'ring_id', group)
|
|
elif op == ReduceOp.PROD:
|
|
return core.ops.c_allreduce_prod(tensor, tensor, 'use_calc_stream',
|
|
True, 'ring_id', group)
|
|
else:
|
|
raise ValueError("Unknown parameter: {}.".format(op))
|
|
|
|
check_variable_and_dtype(
|
|
tensor, 'tensor', ['float16', 'float32', 'float64', 'int32', 'int64'],
|
|
'all_reduce')
|
|
if not op in [ReduceOp.SUM, ReduceOp.MAX, ReduceOp.MIN, ReduceOp.PROD]:
|
|
raise ValueError("The op for all_reduce must be one of educeOp.PROD, "
|
|
"ReduceOp.SUM, ReduceOp.MAX, ReduceOp.MIN.")
|
|
if op == ReduceOp.SUM:
|
|
op_type = 'c_allreduce_sum'
|
|
elif op == ReduceOp.MAX:
|
|
op_type = 'c_allreduce_max'
|
|
elif op == ReduceOp.MIN:
|
|
op_type = 'c_allreduce_min'
|
|
elif op == ReduceOp.PROD:
|
|
op_type = 'c_allreduce_prod'
|
|
if not isinstance(group, int):
|
|
raise ValueError("The type of 'group' for all_reduce should be int.")
|
|
helper = LayerHelper(op_type, **locals())
|
|
helper.append_op(
|
|
type=op_type,
|
|
inputs={'X': [tensor]},
|
|
outputs={'Out': [tensor]},
|
|
attrs={'ring_id': group,
|
|
'use_calc_stream': True})
|
|
|
|
|
|
def reduce(tensor, dst, op=ReduceOp.SUM, group=0):
|
|
"""
|
|
|
|
Reduce a tensor to the destination from all others.
|
|
|
|
Args:
|
|
tensor (Tensor): The output Tensor for the destination and the input Tensor otherwise. Its data type
|
|
should be float16, float32, float64, int32 or int64.
|
|
dst (int): The destination rank id.
|
|
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.Min|ReduceOp.PROD): Optional. The operation used.
|
|
group (int): The id of the process group to work on.
|
|
|
|
Returns:
|
|
None.
|
|
|
|
Examples:
|
|
.. code-block:: python
|
|
|
|
import numpy as np
|
|
import paddle
|
|
from paddle.distributed import init_parallel_env
|
|
|
|
paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id)
|
|
init_parallel_env()
|
|
if paddle.distributed.ParallelEnv().local_rank == 0:
|
|
np_data = np.array([[4, 5, 6], [4, 5, 6]])
|
|
else:
|
|
np_data = np.array([[1, 2, 3], [1, 2, 3]])
|
|
data = paddle.to_tensor(np_data)
|
|
paddle.distributed.reduce(data, 0)
|
|
out = data.numpy()
|
|
# [[5, 7, 9], [5, 7, 9]]
|
|
"""
|
|
if in_dygraph_mode():
|
|
if op == ReduceOp.SUM:
|
|
return core.ops.c_reduce_sum(tensor, tensor, 'use_calc_stream',
|
|
True, 'ring_id', group, 'root_id', dst)
|
|
elif op == ReduceOp.MAX:
|
|
return core.ops.c_reduce_max(tensor, tensor, 'use_calc_stream',
|
|
True, 'ring_id', group, 'root_id', dst)
|
|
elif op == ReduceOp.MIN:
|
|
return core.ops.c_reduce_min(tensor, tensor, 'use_calc_stream',
|
|
True, 'ring_id', group, 'root_id', dst)
|
|
elif op == ReduceOp.PROD:
|
|
return core.ops.c_reduce_prod(tensor, tensor, 'use_calc_stream',
|
|
True, 'ring_id', group, 'root_id',
|
|
dst)
|
|
else:
|
|
raise ValueError("Unknown parameter: {}.".format(op))
|
|
|
|
op_type = 'c_reduce'
|
|
check_variable_and_dtype(
|
|
tensor, 'tensor', ['float16', 'float32', 'float64', 'int32', 'int64'],
|
|
'all_reduce')
|
|
if not op in [ReduceOp.SUM, ReduceOp.MAX, ReduceOp.MIN, ReduceOp.PROD]:
|
|
raise ValueError("The op for reduce must be one of educeOp.PROD, "
|
|
"ReduceOp.SUM, ReduceOp.MAX, ReduceOp.MIN.")
|
|
|
|
if op == ReduceOp.SUM:
|
|
op_type = 'c_reduce_sum'
|
|
elif op == ReduceOp.MAX:
|
|
op_type = 'c_reduce_max'
|
|
elif op == ReduceOp.MIN:
|
|
op_type = 'c_reduce_min'
|
|
elif op == ReduceOp.PROD:
|
|
op_type = 'c_reduce_prod'
|
|
|
|
if not isinstance(dst, int) or not isinstance(group, int):
|
|
raise ValueError("Both the type of 'dst' and 'group' for reduce "
|
|
"should be int.")
|
|
helper = LayerHelper(op_type, **locals())
|
|
helper.append_op(
|
|
type=op_type,
|
|
inputs={'X': [tensor]},
|
|
outputs={'Out': [tensor]},
|
|
attrs={
|
|
'ring_id': group,
|
|
'use_calc_stream': True,
|
|
'root_id': dst,
|
|
})
|
|
|
|
|
|
def all_gather(tensor_list, tensor, group=0):
|
|
"""
|
|
|
|
Gather tensors from all participators and all get the result.
|
|
|
|
Args:
|
|
tensor_list (list): A list of output Tensors. Every element in the list must be a Tensor whose data type
|
|
should be float16, float32, float64, int32 or int64.
|
|
tensor (Tensor): The Tensor to send. Its data type
|
|
should be float16, float32, float64, int32 or int64.
|
|
group (int): The id of the process group to work on.
|
|
|
|
Returns:
|
|
None.
|
|
|
|
Examples:
|
|
.. code-block:: python
|
|
|
|
import numpy as np
|
|
import paddle
|
|
from paddle.distributed import init_parallel_env
|
|
|
|
paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id)
|
|
init_parallel_env()
|
|
tensor_list = []
|
|
if paddle.distributed.ParallelEnv().local_rank == 0:
|
|
np_data1 = np.array([[4, 5, 6], [4, 5, 6]])
|
|
np_data2 = np.array([[4, 5, 6], [4, 5, 6]])
|
|
data1 = paddle.to_tensor(np_data1)
|
|
data2 = paddle.to_tensor(np_data2)
|
|
paddle.distributed.all_gather(tensor_list, data1)
|
|
else:
|
|
np_data1 = np.array([[1, 2, 3], [1, 2, 3]])
|
|
np_data2 = np.array([[1, 2, 3], [1, 2, 3]])
|
|
data1 = paddle.to_tensor(np_data1)
|
|
data2 = paddle.to_tensor(np_data2)
|
|
paddle.distributed.all_gather(tensor_list, data2)
|
|
"""
|
|
op_type = 'c_allgather'
|
|
helper = LayerHelper(op_type, **locals())
|
|
out = helper.create_variable_for_type_inference(dtype=tensor.dtype)
|
|
_default_group = _get_global_default_group()
|
|
if in_dygraph_mode():
|
|
core.ops.c_allgather(tensor, out, 'use_calc_stream', True, 'ring_id',
|
|
group, 'nranks', _default_group.nranks)
|
|
else:
|
|
if not isinstance(tensor_list, list):
|
|
raise ValueError("The type of 'tensor_list' for all_gather "
|
|
"should be list.")
|
|
for elem in tensor_list:
|
|
check_variable_and_dtype(
|
|
elem, 'tensor_list',
|
|
['float16', 'float32', 'float64', 'int32', 'int64'],
|
|
'all_gather')
|
|
check_variable_and_dtype(
|
|
tensor, 'tensor',
|
|
['float16', 'float32', 'float64', 'int32', 'int64'], 'all_gather')
|
|
if not isinstance(group, int):
|
|
raise ValueError("The type of 'group' for all_gather "
|
|
"should be int.")
|
|
helper.append_op(
|
|
type=op_type,
|
|
inputs={'X': [tensor]},
|
|
outputs={'Out': [out]},
|
|
attrs={
|
|
'ring_id': group,
|
|
'use_calc_stream': True,
|
|
'nranks': _default_group.nranks
|
|
})
|
|
|
|
tensor_list.extend(paddle.split(out, _default_group.nranks, 0))
|
|
|
|
|
|
def scatter(tensor, tensor_list=None, src=0, group=0):
|
|
"""
|
|
|
|
Scatter a tensor to all participators.
|
|
|
|
Args:
|
|
tensor (Tensor): The output Tensor. Its data type
|
|
should be float16, float32, float64, int32 or int64.
|
|
tensor_list (list): A list of Tensors to scatter. Every element in the list must be a Tensor whose data type
|
|
should be float16, float32, float64, int32 or int64.
|
|
src (int): The source rank id.
|
|
group (int): The id of the process group to work on.
|
|
|
|
Returns:
|
|
None.
|
|
|
|
Examples:
|
|
.. code-block:: python
|
|
|
|
import numpy as np
|
|
import paddle
|
|
from paddle.distributed import init_parallel_env
|
|
|
|
paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id)
|
|
init_parallel_env()
|
|
if paddle.distributed.ParallelEnv().local_rank == 0:
|
|
np_data1 = np.array([7, 8, 9])
|
|
np_data2 = np.array([10, 11, 12])
|
|
else:
|
|
np_data1 = np.array([1, 2, 3])
|
|
np_data2 = np.array([4, 5, 6])
|
|
data1 = paddle.to_tensor(np_data1)
|
|
data2 = paddle.to_tensor(np_data2)
|
|
if paddle.distributed.ParallelEnv().local_rank == 0:
|
|
paddle.distributed.scatter(data1, src=1)
|
|
else:
|
|
paddle.distributed.scatter(data1, tensor_list=[data1, data2], src=1)
|
|
out = data1.numpy()
|
|
"""
|
|
op_type = 'c_scatter'
|
|
_default_group = _get_global_default_group()
|
|
rank = _default_group.rank
|
|
nranks = _default_group.nranks
|
|
if rank != src:
|
|
tensor_list = []
|
|
for _ in range(nranks):
|
|
tensor_list.append(tensor)
|
|
temp = paddle.concat(tensor_list, axis=0)
|
|
if in_dygraph_mode():
|
|
return core.ops.c_scatter(temp, tensor, 'use_calc_stream', True,
|
|
'ring_id', group, 'nranks',
|
|
_default_group.nranks, 'root', src)
|
|
check_variable_and_dtype(
|
|
tensor, 'tensor', ['float16', 'float32', 'float64', 'int32', 'int64'],
|
|
'scatter')
|
|
if not isinstance(group, int) or not isinstance(src, int):
|
|
raise ValueError("Both the type of 'src' and 'group' for scatter "
|
|
"should be int.")
|
|
helper = LayerHelper(op_type, **locals())
|
|
helper.append_op(
|
|
type=op_type,
|
|
inputs={'X': [temp]},
|
|
outputs={'Out': [tensor]},
|
|
attrs={
|
|
'ring_id': group,
|
|
'root': src,
|
|
'use_calc_stream': True,
|
|
'nranks': nranks,
|
|
})
|
|
|
|
|
|
def barrier(group=0):
|
|
"""
|
|
|
|
Barrier among all participators in the group.
|
|
|
|
Args:
|
|
group (int): The id of the process group to work on.
|
|
|
|
Returns:
|
|
None.
|
|
|
|
Examples:
|
|
.. code-block:: python
|
|
|
|
import paddle
|
|
from paddle.distributed import init_parallel_env
|
|
|
|
paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id)
|
|
init_parallel_env()
|
|
paddle.distributed.barrier()
|
|
"""
|
|
op_type = 'barrier'
|
|
temp = fill_constant([1], dtype="int32", value="1")
|
|
if in_dygraph_mode():
|
|
return core.ops.barrier(temp, temp, 'ring_id', group)
|
|
if not isinstance(group, int):
|
|
raise ValueError("The type of 'group' for barrier must be int.")
|
|
helper = LayerHelper(op_type, **locals())
|
|
helper.append_op(
|
|
type=op_type,
|
|
inputs={'X': [temp]},
|
|
outputs={'Out': [temp]},
|
|
attrs={'ring_id': group})
|
|
|
|
|
|
def _parallel_linear(x, num_rows, num_cols, axis, param_attr, bias_attr,
|
|
gather_out, inner_rank, name):
|
|
"""
|
|
Parallel Linear
|
|
"""
|
|
if not name:
|
|
name = "fc_by_row_rank_%d" % inner_rank if axis == 0 else "fc_by_col_rank_%d" % inner_rank
|
|
else:
|
|
name = name + "_by_row_rank_%d" % inner_rank if axis == 0 else name + "_by_col_rank_%d" % inner_rank
|
|
linear = paddle.nn.Linear(
|
|
num_rows,
|
|
num_cols,
|
|
weight_attr=param_attr,
|
|
bias_attr=bias_attr,
|
|
name=name)
|
|
|
|
weight = linear.weight
|
|
weight.is_distributed = True
|
|
linear_out = linear(x)
|
|
startup_block = paddle.static.default_startup_program().global_block()
|
|
main_block = paddle.static.default_main_program().global_block()
|
|
startup_block.vars[weight.name].is_distributed = True
|
|
main_block.vars[weight.name].is_distributed = True
|
|
|
|
if gather_out:
|
|
if axis == 0:
|
|
paddle.distributed.all_reduce(linear_out, group=0)
|
|
else:
|
|
output = []
|
|
paddle.distributed.all_gather(output, linear_out, group=0)
|
|
linear_out = paddle.concat(output, axis=len(linear_out.shape) - 1)
|
|
return linear_out
|
|
|
|
|
|
def _parallel_embedding(x, per_part_embeddings, origin_size, param_attr,
|
|
inner_rank, num_partitions, name):
|
|
"""
|
|
Parallel Embedding
|
|
"""
|
|
if not name:
|
|
name = "emb_rank_%d" % inner_rank
|
|
else:
|
|
name = name + "_rank_%d" % inner_rank
|
|
|
|
origin_num_embeddings = origin_size[0]
|
|
embedding = paddle.nn.Embedding(
|
|
per_part_embeddings,
|
|
origin_size[1],
|
|
padding_idx=per_part_embeddings - 1,
|
|
sparse=False,
|
|
weight_attr=param_attr,
|
|
name=name)
|
|
|
|
origin_input_shape = x.shape
|
|
if len(origin_input_shape) == 2:
|
|
x = paddle.unsqueeze(x, axis=-1)
|
|
else:
|
|
assert origin_input_shape[-1] == 1, (
|
|
"The last dimension size of x must be 1.")
|
|
x_shard = paddle.shard_index(x, origin_num_embeddings, num_partitions,
|
|
inner_rank, per_part_embeddings - 1)
|
|
if len(origin_input_shape) == 2:
|
|
x_shard = paddle.squeeze(x_shard, axis=-1)
|
|
|
|
embedding.weight.is_distributed = True
|
|
emb_out = embedding(x_shard)
|
|
startup_block = paddle.static.default_startup_program().global_block()
|
|
main_block = paddle.static.default_main_program().global_block()
|
|
startup_block.vars[embedding.weight.name].is_distributed = True
|
|
main_block.vars[embedding.weight.name].is_distributed = True
|
|
paddle.distributed.all_reduce(emb_out, group=0)
|
|
return emb_out
|
|
|
|
|
|
def split(x,
|
|
size,
|
|
operation,
|
|
axis=0,
|
|
num_partitions=1,
|
|
gather_out=True,
|
|
weight_attr=None,
|
|
bias_attr=None,
|
|
name=None):
|
|
"""
|
|
|
|
Split the weight of the specified operation into multiple devices
|
|
and do the computation in parallel.
|
|
|
|
Now the following three cases are supported.
|
|
|
|
Case 1: Parallel Embedding
|
|
The weight of the embedding operation is a NxM matrix with N rows and M columns.
|
|
With parallel embedding, the weight is split into num_partitions partitions, each
|
|
of which is a matrix with (N/num_partitions + 1) rows and M column where the last
|
|
row as the padding idx.
|
|
|
|
Suppose we split the NxM weight into two partitons on device_0 and device_1
|
|
respectively. Then, one each device, the final weight has (N/2 + 1) rows with the
|
|
index range from 0 to N/2. On device_0, all values in the input within [0, N/2 -1]
|
|
keep unchanged and all other values are changed to N/2 which is the padding index and
|
|
are mapped to all zeros after embedding. In the same way, on device_1, the value V in the
|
|
input within [N/2, N-1] will be changed to (V - N/2), and all other values are changed
|
|
to N/2 and are mapped to all zeros after embedding. Finally, the results on the two
|
|
devices are sum-reduced.
|
|
|
|
Case 2: Row Parallel Linear
|
|
The weight of the linear operation is a NxM matrix with N rows and M columns.
|
|
With row parallel linear, the weight is split into num_partitions partitions, each
|
|
of which is a matrix with N/num_partitions rows and M column.
|
|
|
|
Case 3: Column Parallel Linear
|
|
The weight of the linear operation is a NxM matrix with N rows and M columns.
|
|
With column parallel linear, the weight is split into num_paratitions partitions, each
|
|
of which is a matrix with N rows and M/num_partitions column.
|
|
|
|
Args:
|
|
x (Tensor): Input tensor. It's data type should be float16, float32, float64, int32 or int64.
|
|
size (list|tuple): A list or tuple with two elements indicating the shape of the weight.
|
|
operation (str): The name of the operation. The supported operations are 'linear' and 'embedding'.
|
|
axis (int, Optional): Indicate along which axis to split the weight. Default: 0.
|
|
num_partitions (int, Optional): How many parts the weight is partitioned. Default: 1.
|
|
gather_out (bool, Optional): Whether to gather the output after computation. By default, the output
|
|
on each partitions will be gathered after computation. Default: True.
|
|
weight_attr (ParamAttr, Optional): The parameter attribute for the learnable
|
|
weights(Parameter) of the specified operation. Default: None.
|
|
bias_attr (ParamAttr, Optional): The parameter attribute for the bias
|
|
of the specified operation. Default: None.
|
|
name (str, Optional): The default value is None. Normally there is no need for user to set this
|
|
property. Default: None. For more information, please refer to :ref:`api_guide_Name`.
|
|
|
|
Returns:
|
|
Tensor.
|
|
|
|
Examples:
|
|
.. code-block:: python
|
|
|
|
import paddle
|
|
from paddle.distributed import init_parallel_env
|
|
|
|
paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id)
|
|
init_parallel_env()
|
|
data = paddle.randint(0, 8, shape=[10,4])
|
|
emb_out = padle.distributed.split(
|
|
data,
|
|
(8, 8),
|
|
operation="embedding",
|
|
num_partitions=2)
|
|
"""
|
|
assert isinstance(size, (list, tuple)), (
|
|
"The type of size for "
|
|
"paddle.distributed.split must be list or tuple.")
|
|
assert len(size) == 2, ("Number of elements in size of "
|
|
"paddle.distributed.split must be two.")
|
|
assert isinstance(operation, str), ("The type of operation for "
|
|
"paddle.distributed.split must be str.")
|
|
supported_operations = [
|
|
'linear',
|
|
'embedding',
|
|
]
|
|
assert operation in supported_operations, (
|
|
"The operation for "
|
|
"paddle.distributed.split must be one of {}.".format(
|
|
supported_operations))
|
|
if in_dygraph_mode():
|
|
rank = paddle.distributed.get_rank()
|
|
nranks = paddle.distributed.get_world_size()
|
|
else:
|
|
assert fleet._role_maker, ("To use paddle.distributed.split, "
|
|
"you must call fleet.init() firstly.")
|
|
rank = fleet.worker_index()
|
|
nranks = fleet.worker_num()
|
|
|
|
# rank within a model parallel group
|
|
inner_rank = rank % num_partitions
|
|
|
|
if operation == "embedding":
|
|
assert axis == 0, ("We only support to split the weight of embedding "
|
|
"along the first axis now.")
|
|
per_part_size = (size[0] + num_partitions - 1) // num_partitions
|
|
last_part_size = size[0] - per_part_size * (num_partitions - 1)
|
|
if inner_rank == num_partitions - 1: per_part_size = last_part_size
|
|
per_part_size += 1 # make the last row as the padding index
|
|
|
|
emb_out = _parallel_embedding(x, per_part_size, size, weight_attr,
|
|
inner_rank, num_partitions, name)
|
|
return emb_out
|
|
else:
|
|
if axis == 0:
|
|
assert size[0] % num_partitions == 0, (
|
|
"Number of rows of the weight for linear ({}) must be"
|
|
" divisible by num_partitions ({})".format(size[0],
|
|
num_partitions))
|
|
per_part_size = size[0] // num_partitions
|
|
linear_size = (per_part_size, size[1])
|
|
assert x.shape[-1] == per_part_size, (
|
|
"The width ({}) of the input "
|
|
"x must be equal to the height ({}) of the weight. Maybe you "
|
|
"should split the input x using paddle.split.".format(
|
|
x.shape[-1], per_part_size))
|
|
|
|
elif axis == 1:
|
|
assert size[1] % num_partitions == 0, (
|
|
"Number of column of the weight for linear ({}) must be"
|
|
" divisible by num_partitions ({})".format(size[1],
|
|
num_partitions))
|
|
per_part_size = size[1] // num_partitions
|
|
linear_size = (size[0], per_part_size)
|
|
else:
|
|
raise ValueError("The value of axis must be 0 or 1, but the value "
|
|
"given is {}.".format(axis))
|
|
|
|
linear_out = _parallel_linear(
|
|
x,
|
|
linear_size[0],
|
|
linear_size[1],
|
|
axis,
|
|
weight_attr,
|
|
bias_attr,
|
|
gather_out,
|
|
inner_rank,
|
|
name=name)
|
|
return linear_out
|