mindspore/mindspore/parallel/_tensor.py

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""load tensor and combine tensor"""
import numpy as np
from mindspore.common.tensor import Tensor
from ..communication.management import get_rank


def _get_tensor_strategy(dev_mat, tensor_map):
    """
    Get the split strategy by device arrangement and tensor map.

    Args:
        dev_mat (list): The device matrix.
        tensor_map (list): The map relation between tensor and devices.

    Returns:
        List, the split strategy with the same length as tensor_map.
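
    Examples:
        >>> # Illustrative values only; the first tensor dimension maps to the device
        >>> # dimension of size 2 and the second dimension is not split.
        >>> tensor_strategy = _get_tensor_strategy([2, 4], [1, -1])  # [2, 1]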
"""
tensor_strategy = []
for dim in tensor_map:
if dim == -1:
tensor_strategy.append(1)
else:
tensor_strategy.append(dev_mat[-dim-1])
return tensor_strategy


def _get_tensor_slice_index(device_arrangement, tensor_strategy, tensor_map, rank_index):
    """
    Get the tensor slice index for the local device.

    Args:
        device_arrangement (list): The device matrix.
        tensor_strategy (list): The split strategy of the tensor.
        tensor_map (list): The map relation between tensor and devices.
        rank_index (int): The rank of the local device.

    Returns:
        Integer, the index of the local device for tensor slices.
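
    Examples:
        >>> # Illustrative values only; rank 5 in a 2 x 4 device matrix gets slice 1.
        >>> device_arrangement = [2, 4]
        >>> tensor_strategy = [2, 1]
        >>> tensor_map = [1, -1]
        >>> slice_index = _get_tensor_slice_index(device_arrangement, tensor_strategy, tensor_map, 5)  # 1.0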
"""
device_coordinate = _rank_to_coordinate(rank_index, device_arrangement)
device_coordinate_new = _convert_to_new_device_coordinate(device_coordinate, tensor_map)
tensor_slice_index = _coordinate_to_rank(device_coordinate_new, tensor_strategy)
return tensor_slice_index


def _rank_to_coordinate(rank_index, device_arrangement):
    """
    Convert a rank index to a device coordinate.

    Args:
        rank_index (int): The index of the local device.
        device_arrangement (list): The device matrix.

    Returns:
        List, the coordinate of the local device in the device matrix.
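
    Examples:
        >>> # Illustrative values only; rank 5 in a 2 x 4 device matrix.
        >>> coordinate = _rank_to_coordinate(5, [2, 4])  # array([1., 1.])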
"""
dim_len = len(device_arrangement)
device_coordinate = np.zeros(dim_len)
for i in range(dim_len):
size = device_arrangement[dim_len - 1 - i]
device_coordinate[dim_len - 1 - i] = rank_index % size
rank_index = int(rank_index / size)
return device_coordinate


def _coordinate_to_rank(device_coordinate, device_arrangement):
    """
    Convert a device coordinate to a rank index.

    Args:
        device_coordinate (list): The coordinate of the local device in the device matrix.
        device_arrangement (list): The device matrix.

    Returns:
        Integer, the index of the local device for tensor slices.
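
    Examples:
        >>> # Illustrative values only; the inverse of the _rank_to_coordinate example above.
        >>> rank_index = _coordinate_to_rank([1, 1], [2, 4])  # 5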
"""
rank_index = 0
size = 1
for i in range(len(device_coordinate)):
rank_index += size * device_coordinate[len(device_coordinate) - 1 - i]
size *= device_arrangement[len(device_coordinate) - 1 - i]
return rank_index


def _convert_to_new_device_coordinate(device_coordinate, tensor_map):
    """
    Convert device_coordinate according to the tensor map.

    Args:
        device_coordinate (list): The coordinate of the local device in the device matrix.
        tensor_map (list): The map relation between tensor and devices.

    Returns:
        List, the converted coordinate.
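
    Examples:
        >>> # Illustrative values only; the second tensor dimension is not split (-1),
        >>> # so its coordinate becomes 0.
        >>> new_coordinate = _convert_to_new_device_coordinate([1, 1], [1, -1])  # [1, 0]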
"""
device_coordinate_new = []
for i in range(len(tensor_map)):
if tensor_map[len(tensor_map) - 1 - i] != -1:
device_coordinate_new.insert(0, device_coordinate[len(device_coordinate) - 1 -
tensor_map[len(tensor_map) - 1 - i]])
else:
device_coordinate_new.insert(0, 0)
return device_coordinate_new


def _chunk_tensor(np_tensor, strategy, depth):
    """
    Recursive function to chunk a tensor.

    Args:
        np_tensor (NDarray): The matrix to be split.
        strategy (list): The split strategy with the same size as np_tensor.
        depth (int): Recursion depth.

    Returns:
        NDarray, the split matrix.

    Raises:
        ValueError: If np_tensor can not be split by strategy.
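
    Examples:
        >>> # Illustrative values only; a 4 x 4 matrix split in two along each dimension.
        >>> blocks = _chunk_tensor(np.ones([4, 4]), [2, 2], 2)  # list of four (2, 2) arrays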
"""
output = []
axis = len(np_tensor.shape) - depth
if np_tensor.shape[axis] % strategy[0] != 0:
raise ValueError("np_tensor can not be split by strategy!")
ret = list(np.split(np_tensor, strategy[0], axis))
if depth == 1:
return ret
for ret_ in ret:
output.extend(
_chunk_tensor(ret_, strategy[len(strategy) - depth + 1:len(strategy)], depth - 1))
return output


def _chunk_tensor_by_strategy(np_tensor, strategy):
    """
    Split the input by strategy.

    Args:
        np_tensor (NDarray): The matrix to be split.
        strategy (list): The split strategy with the same size as np_tensor.

    Returns:
        NDarray, the split matrix.

    Raises:
        TypeError: If np_tensor is not ndarray.
        ValueError: If the length of np_tensor does not match the length of strategy.
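
    Examples:
        >>> # Illustrative values only; each dimension of a 32 x 32 matrix is split in two.
        >>> slices = _chunk_tensor_by_strategy(np.ones([32, 32]), [2, 2])  # list of four (16, 16) arrays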
"""
if not isinstance(np_tensor, np.ndarray):
raise TypeError("np_tensor should be ndarray!")
if len(strategy) != len(np_tensor.shape):
raise ValueError("The length of np_tensor does not match the length of strategy!")
return _chunk_tensor(np_tensor, strategy, len(strategy))


def _get_seed(dev_mat, tensor_map):
    """
    Get the random seed for the current slice.

    Args:
        dev_mat (list): The device matrix of devices.
        tensor_map (list): The split strategy of tensor.

    Returns:
        Integer, the local random seed for this device.
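
    Examples:
        >>> # Illustrative values only; get_rank() requires an initialized communication group.
        >>> seed = _get_seed([2, 4], [1, -1])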
"""
rank = get_rank()
tensor_strategy = _get_tensor_strategy(dev_mat, tensor_map)
tensor_slice_seed = _get_tensor_slice_index(dev_mat, tensor_strategy, tensor_map, rank)
return tensor_slice_seed


def _load_tensor(tensor, dev_mat, tensor_map):
    """
    Get the tensor slice of the local device by the device matrix and the tensor map.

    Args:
        tensor (Tensor): The tensor to be split.
        dev_mat (list): The device matrix of devices.
        tensor_map (list): The split strategy of tensor.

    Returns:
        Tensor, the sliced tensor.

    Examples:
        >>> tensor = Tensor(np.ones([32, 32]))
        >>> dev_mat = [2, 4]
        >>> tensor_map = [1, -1]
        >>> tensor_slice = _load_tensor(tensor, dev_mat, tensor_map)
    """
    rank = get_rank()
    tensor_strategy = _get_tensor_strategy(dev_mat, tensor_map)
    tensor_slice_index = _get_tensor_slice_index(dev_mat, tensor_strategy, tensor_map, rank)
    np_tensor = tensor.asnumpy()
    np_tensor_list = _chunk_tensor_by_strategy(np_tensor, tensor_strategy)
    np_tensor_slice = np_tensor_list[int(tensor_slice_index)]
    tensor_slice = Tensor(np_tensor_slice)
    return tensor_slice


def _load_tensor_by_layout(tensor, layout):
    """
    Load tensor by layout.

    Args:
        tensor (Tensor): The input tensor.
        layout (list): The tensor layout in auto parallel.

    Returns:
        Tensor, the sliced tensor.

    Raises:
        TypeError: If layout is not list.
        ValueError: If the length of layout is not 3.
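
    Examples:
        >>> # Illustrative values only; the function reads just layout[0] (dev_mat) and
        >>> # layout[1] (tensor_map), so the third element here is only a placeholder.
        >>> layout = [[2, 4], [1, -1], [16, 32]]
        >>> tensor_slice = _load_tensor_by_layout(Tensor(np.ones([32, 32])), layout)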
"""
if not isinstance(layout, list):
raise TypeError("The layout should be list! layout is {}".format(layout))
if len(layout) != 3:
raise ValueError("The length of layout must be 3! layout is {}".format(layout))
dev_mat = layout[0]
tensor_map = layout[1]
if tensor.size() == 1:
return tensor
return _load_tensor(tensor, dev_mat, tensor_map)


def _reshape_param_data(param_data, dev_mat, tensor_map):
    """
    Combine param slices by the device matrix and the tensor map, used in model parallel scenario.

    Args:
        param_data (Tensor): The tensor to be reshaped, gathered from all the devices by AllGatherParamNet.
        dev_mat (list): The device matrix of devices.
        tensor_map (list): The split strategy of tensor.

    Returns:
        Tensor, the combined tensor with the whole data value.

    Examples:
        >>> param_data = _allgather_param_net(param_data)
        >>> dev_mat = [2, 2]
        >>> tensor_map = [1, 0]
        >>> tensor = _reshape_param_data(param_data, dev_mat, tensor_map)
    """
    device_count = 1
    for dim in dev_mat:
        device_count *= dim
    tensor_slices = np.split(param_data.asnumpy(), device_count, axis=0)
    tensor_strategy = _get_tensor_strategy(dev_mat, tensor_map)
    # get the actual number of slices, since different devices may load the same slice
    slice_count = 1
    for dim in tensor_strategy:
        slice_count *= dim
    # reorder slices and remove duplicates based on device matrix and tensor_map
    tensor_slices_new = list(range(slice_count))
    for i in range(device_count):
        slice_index = _get_tensor_slice_index(dev_mat, tensor_strategy, tensor_map, i)
        tensor_slices_new[int(slice_index)] = np.array(tensor_slices[i])
    # combine slices to generate the complete parameter
    dim_len = len(tensor_strategy)
    for i in range(dim_len):
        ele_count = int(len(tensor_slices_new) / tensor_strategy[dim_len - 1 - i])
        tensor_slices_new_inner = []
        for j in range(ele_count):
            new_tensor = tensor_slices_new[j * tensor_strategy[dim_len - 1 - i]]
            for l in range(j * tensor_strategy[dim_len - 1 - i] + 1,
                           (j + 1) * tensor_strategy[dim_len - 1 - i]):
                new_tensor = np.concatenate((new_tensor, tensor_slices_new[l]), axis=dim_len - 1 - i)
            tensor_slices_new_inner.insert(len(tensor_slices_new_inner), np.array(new_tensor))
        tensor_slices_new = tensor_slices_new_inner
    return Tensor(tensor_slices_new[0])