!12004 THOR generalization code submission

From: @sl_wang
Reviewed-by: @guoqi1024
Signed-off-by:
pull/12004/MERGE
Committed by mindspore-ci-bot via Gitee (4 years ago)
commit 1f3b059195

@ -18,7 +18,7 @@ Layer.
The high-level components(Cells) used to construct the neural network.
"""
from . import activation, normalization, container, conv, lstm, basic, embedding, pooling, image, quant, math, \
combined, timedistributed
combined, timedistributed, thor_layer
from .activation import *
from .normalization import *
from .container import *
@ -32,6 +32,7 @@ from .quant import *
from .math import *
from .combined import *
from .timedistributed import *
from .thor_layer import *
__all__ = []
__all__.extend(activation.__all__)
@ -47,3 +48,4 @@ __all__.extend(quant.__all__)
__all__.extend(math.__all__)
__all__.extend(combined.__all__)
__all__.extend(timedistributed.__all__)
__all__.extend(thor_layer.__all__)
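The suppressed diff below is presumably the new thor_layer.py itself; later hunks rely on the layer variants it exports. A minimal import sketch, assuming the names used in convert_utils.py further down (nn.Dense_Thor, nn.Conv2d_Thor, nn.Embedding_Thor) and illustrative channel sizes:

import mindspore.nn as nn
# After this export change, the THOR layer variants are reachable from mindspore.nn;
# constructor arguments mirror those used by ConvertNetUntils later in this PR.
dense_thor = nn.Dense_Thor(in_channels=2048, out_channels=1001, activation='relu')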

File diff suppressed because it is too large.

@ -29,6 +29,7 @@ from .rmsprop import RMSProp
from .proximal_ada_grad import ProximalAdagrad
from .lazyadam import LazyAdam
from .ada_grad import Adagrad
from .thor import THOR
__all__ = ['Optimizer', 'Momentum', 'LARS', 'Adam', 'AdamWeightDecay', 'LazyAdam', 'AdamOffload',
'Lamb', 'SGD', 'FTRL', 'RMSProp', 'ProximalAdagrad', 'Adagrad']
'Lamb', 'SGD', 'FTRL', 'RMSProp', 'ProximalAdagrad', 'Adagrad', 'THOR']
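With this change THOR is exported alongside the other optimizers, so it can be imported directly from the optim package (the same import the updated test further down uses):

from mindspore.nn.optim import THOR  # newly exported second-order optimizer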

File diff suppressed because it is too large.

@ -239,6 +239,7 @@ class DistributedGradReducer(Cell):
parameters (list): the parameters to be updated.
mean (bool): When mean is true, the mean coefficient (degree) would apply on gradients. Default: False.
degree (int): The mean coefficient. Usually it equals to device number. Default: None.
fusion_type (int): The type of all reduce fusion. Default: 1.
Raises:
ValueError: If degree is not a int or less than 0.
@ -319,7 +320,7 @@ class DistributedGradReducer(Cell):
256.0
"""
def __init__(self, parameters, mean=True, degree=None):
def __init__(self, parameters, mean=True, degree=None, fusion_type=1):
super(DistributedGradReducer, self).__init__(auto_prefix=False)
self.map_ = C.Map()
if degree is None:
@ -337,7 +338,7 @@ class DistributedGradReducer(Cell):
self.op_list = _init_allreduce_operators(len(parameters), split_indices)
else:
self.split_fusion = False
self.allreduce = AllReduce().add_prim_attr('fusion', 1)
self.allreduce = AllReduce().add_prim_attr('fusion', fusion_type)
self.allgather = AllGather(GlobalComm.WORLD_COMM_GROUP)
ps_filter = lambda x: x.is_param_ps
self.ps_parameters = tuple(ps_filter(x) for x in parameters)
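A minimal sketch of the new fusion_type argument, assuming a data-parallel context with communication already initialized; the toy cell, degree value, and import location are illustrative:

import mindspore.nn as nn
from mindspore.nn import DistributedGradReducer  # import location assumed

net = nn.Dense(4, 3)  # placeholder cell; real use is inside a train cell under data parallel
# When gradient fusion is not split, fusion_type becomes the 'fusion' attribute of the single
# AllReduce, letting THOR's extra all-reduces live in a fusion group other than the default 1.
grad_reducer = DistributedGradReducer(net.trainable_params(), mean=True, degree=8, fusion_type=3)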

@ -0,0 +1,19 @@
# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""convert to second order related classes and functions."""
from .convert_utils import ConvertNetUntils, ConvertModelUtils
__all__ = ["ConvertNetUntils", "ConvertModelUtils"]

@ -0,0 +1,157 @@
# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
convert utils for second order optimizer: thor
"""
import mindspore.nn as nn
import mindspore.common.dtype as mstype
from mindspore import context
class ConvertNetUntils():
"""
Convert net to thor layer net
"""
def __init__(self):
self._convert_method_map = {nn.Dense: self._convert_dense,
nn.Embedding: self._convert_embedding,
nn.Conv2d: self._convert_conv2d}
def _convert_dense(self, subcell):
"""
convert dense cell to second_order cell
"""
weight = subcell.weight
act_name = None
if subcell.activation_flag:
act_class = subcell.activation.__class__.__name__
act_name = act_class.lower()
if subcell.out_channels == 1001:
new_subcell = nn.Dense_Thor(in_channels=subcell.in_channels,
out_channels=subcell.out_channels,
weight_init=weight,
has_bias=subcell.has_bias,
bias_init='zeros',
activation=act_name)
else:
compute_type = mstype.float16
if context.get_context("device_target") == "GPU":
compute_type = mstype.float32
new_subcell = nn.Dense_Thor(in_channels=subcell.in_channels,
out_channels=subcell.out_channels,
weight_init=weight,
has_bias=subcell.has_bias,
bias_init='zeros',
activation=act_name).to_float(compute_type)
if subcell.has_bias:
new_subcell.bias = subcell.bias
return new_subcell
def _convert_embedding(self, subcell):
"""
convert embedding cell to second_order cell
"""
new_subcell = nn.Embedding_Thor(vocab_size=subcell.vocab_size,
embedding_size=subcell.embedding_size,
use_one_hot=False)
new_subcell.embedding_table = subcell.embedding_table
return new_subcell
def _convert_conv2d(self, subcell):
"""
convert conv2d cell to second_order cell
"""
out_channel = subcell.out_channels
in_channel = subcell.in_channels
kernel_size = subcell.kernel_size[0]
stride = subcell.stride
padding = subcell.padding
pad_mode = subcell.pad_mode
has_bias = subcell.has_bias
weight = subcell.weight
new_subcell = nn.Conv2d_Thor(in_channel, out_channel,
kernel_size=kernel_size, stride=stride, padding=padding, pad_mode=pad_mode,
has_bias=has_bias, weight_init=weight)
return new_subcell
def _convert_to_thor_net(self, net):
"""
convert net to thor net
"""
cells = net.name_cells()
change = False
for name in cells:
subcell = cells[name]
if subcell == net:
continue
elif isinstance(subcell, (nn.Dense_Thor, nn.Conv2d_Thor, nn.Embedding_Thor)):
continue
elif isinstance(subcell, (nn.Conv2dTranspose, nn.Conv1d, nn.Conv1dTranspose, nn.BatchNorm1d, nn.GroupNorm,
nn.GlobalBatchNorm, nn.LayerNorm, nn.BatchNorm2d, nn.MaxPool2d)):
continue
elif isinstance(subcell, (nn.Embedding, nn.Dense, nn.Conv2d)):
prefix = subcell.param_prefix
new_subcell = self._convert_method_map[type(subcell)](subcell)
print("subcell name: ", name, "prefix is", prefix, flush=True)
if isinstance(new_subcell, (nn.Dense_Thor, nn.Embedding_Thor, nn.Conv2d_Thor)):
print("convert to thor layer success.", flush=True)
new_subcell.update_parameters_name(prefix + '.')
net.insert_child_to_cell(name, new_subcell)
change = True
else:
self._convert_to_thor_net(subcell)
if isinstance(net, nn.SequentialCell) and change:
print("is nn.SequentialCell and change")
net.cell_list = list(net.cells())
def convert_to_thor_net(self, net):
"""
API for converting a network to a THOR network.
"""
net.update_cell_prefix()
self._convert_to_thor_net(net)
net.update_cell_type("second_order")
class ConvertModelUtils():
"""
convert model to thor model utils
"""
def convert_to_thor_model(self, model, network, loss_fn=None, optimizer=None, metrics=None, amp_level="O0",
loss_scale_manager=None, keep_batchnorm_fp32=False, frequency=834):
"""
API for converting a model to a THOR model.
"""
optim_name = type(optimizer).__name__
if optim_name in ("THOR_Ascend", "THOR_GPU"):
from .model_thor import Model_Thor
if isinstance(network, nn.TrainOneStepCell):
model = Model_Thor(network=network, frequency=frequency)
else:
model = Model_Thor(network=network, loss_fn=loss_fn, optimizer=optimizer, amp_level=amp_level,
loss_scale_manager=loss_scale_manager,
keep_batchnorm_fp32=keep_batchnorm_fp32, metrics=metrics, frequency=frequency)
return model
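A brief usage sketch for the two utilities above; the network, loss, optimizer, and existing model are placeholders, not part of this PR:

# net is a hypothetical cell containing nn.Dense / nn.Conv2d / nn.Embedding sub-cells
ConvertNetUntils().convert_to_thor_net(net)  # swaps supported sub-cells for their *_Thor variants in place

# Wrapping only takes effect when the optimizer is a THOR variant (THOR_Ascend / THOR_GPU);
# otherwise the original model is returned unchanged.
model = ConvertModelUtils().convert_to_thor_model(model=model, network=net, loss_fn=loss_fn,
                                                  optimizer=opt, metrics={'acc'}, amp_level="O2",
                                                  keep_batchnorm_fp32=False, frequency=834)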

@ -0,0 +1,188 @@
# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Dataset help for minddata dataset"""
import math
import os
from mindspore._checkparam import Validator
from mindspore import context
from mindspore.train._utils import _exec_datagraph, _get_types_and_shapes
from mindspore.nn.wrap import GetNextSingleOp
from mindspore.parallel._utils import _get_device_num, _need_to_full, _to_full_shapes
def _send_data(dataset, epoch_num):
"""Engine dataset to write data to tdt queue."""
if not hasattr(dataset, '__has_sent__'):
exec_dataset = dataset.__transfer_dataset__
exec_dataset.send(epoch_num)
dataset.__has_sent__ = True
def _send_data_no_flag(dataset, epoch_num):
"""Engine dataset to write data to tdt queue directly."""
exec_dataset = dataset.__transfer_dataset__
exec_dataset.send(epoch_num)
class DatasetHelper:
"""
Helper function to use the MindData dataset.
According to the context, it adapts the dataset iteration so that the same for-loop can be used in
different contexts.
Note:
The iteration of DatasetHelper provides one epoch of data.
Args:
dataset (DataSet): The training dataset iterator.
dataset_sink_mode (bool): If true, use GetNext to fetch the data; otherwise, feed the data from the host. Default: True.
sink_size (int): Control the amount of data in each sink.
If sink_size=-1, sink the complete dataset for each epoch.
If sink_size>0, sink sink_size data for each epoch. Default: -1.
epoch_num (int): Control the number of epoch data to send. Default: 1.
Examples:
>>> dataset_helper = DatasetHelper(dataset)
>>> for inputs in dataset_helper:
>>> outputs = network(*inputs)
"""
def __init__(self, dataset, dataset_sink_mode=True, sink_size=-1, epoch_num=1, iter_first_order=1):
dataset_sink_mode = Validator.check_bool(dataset_sink_mode)
Validator.check_is_int(sink_size)
if sink_size < -1 or sink_size == 0:
raise ValueError("The sink_size must be -1 or positive, but got sink_size {}.".format(sink_size))
if dataset_sink_mode:
if context.get_context("device_target") == "Ascend":
iterclass = _DatasetIterMSLoopSink
self.iter = iterclass(dataset, sink_size, epoch_num, iter_first_order)
elif context.get_context("device_target") == "GPU":
iterclass = _DatasetIterMS
self.iter = iterclass(dataset, sink_size, epoch_num)
elif context.get_context("device_target") == "CPU":
raise RuntimeError("Currently dataset sink mode is not supported when the device target is CPU.")
def __iter__(self):
return self.iter.__iter__()
# A temp solution for loop sink. Delete later
def types_shapes(self):
"""Get the types and shapes from dataset on the current configuration."""
return self.iter.types_shapes()
def sink_size(self):
"""Get sink_size for each iteration."""
return self.iter.get_sink_size()
def stop_send(self):
"""Free up resources about data sink."""
self.iter.stop_send()
class _DatasetIter:
"""Base iter for dataset helper"""
def __init__(self, dataset, sink_size, epoch_num):
self.dataset = dataset
self.sink_size = sink_size
self.sink_count = 1
if not hasattr(dataset, '__transfer_dataset__'):
if hasattr(dataset, '__loop_size__'):
self.sink_size = dataset.__loop_size__
dataset.__transfer_dataset__ = _exec_datagraph(dataset, self.sink_size)
if not hasattr(dataset, '__no_send__'):
_send_data(dataset, epoch_num)
else:
_send_data_no_flag(dataset, epoch_num)
self.stop_send = dataset.__transfer_dataset__.stop_send
self.dataset_types, self.dataset_shapes = _get_types_and_shapes(dataset)
def __iter__(self):
self.index = 0
return self
def __next__(self):
if self.index >= self.sink_count:
raise StopIteration()
self.index += 1
return self.op()
def types_shapes(self):
return self.dataset_types, self.dataset_shapes
def get_sink_count(self, dataset):
sink_count = 1
if hasattr(dataset, '__loop_size__'):
loop_size = dataset.__loop_size__
if loop_size <= dataset.get_dataset_size() and dataset.get_dataset_size() % loop_size != 0:
raise ValueError(f'Dataset size {dataset.get_dataset_size()} and '
f'sink_size {loop_size} are not matched.')
sink_count = math.ceil(dataset.get_dataset_size() / loop_size)
return sink_count
def get_sink_size(self):
"""get sink_size to device"""
sink_size = 1
if hasattr(self.dataset, '__loop_size__'):
sink_size = self.dataset.__loop_size__
else:
if context.get_context("enable_ge") or context.get_context("device_target") == "Ascend":
if self.sink_size > 0:
sink_size = self.sink_size
else:
sink_size = self.dataset.get_dataset_size()
return sink_size
class _DatasetIterMSLoopSink(_DatasetIter):
"""Iter for context when device_target is Ascend"""
def __init__(self, dataset, sink_size, epoch_num, iter_first_order):
super().__init__(dataset, sink_size, epoch_num)
sink_count = 1
if hasattr(dataset, '__loop_size__'):
loop_size = dataset.__loop_size__ + iter_first_order
sink_count = int(sink_size / loop_size) * 2
self.sink_count = sink_count
ms_role = os.getenv("MS_ROLE")
if ms_role in ("MS_PSERVER", "MS_SCHED"):
self.sink_count = 1
# for self._parallel_mode equal to semi_auto_parallel or auto_parallel, and not using full_batch,
# use a complete tensor to compile, and slice tensor to run. The batch dimension of tensors for
# compile is device_number times the batch dimension of tensors for run. Now only support LoopSink.
if _need_to_full():
device_num = _get_device_num()
self.dataset_shapes = _to_full_shapes(self.dataset_shapes, device_num)
def op():
return tuple()
self.op = op
class _DatasetIterMS(_DatasetIter):
"""Iter for MS when enable_loop_sink is False."""
def __init__(self, dataset, sink_size, epoch_num):
super().__init__(dataset, sink_size, epoch_num)
if sink_size > 0:
self.sink_count = sink_size
else:
self.sink_count = dataset.get_dataset_size()
queue_name = dataset.__transfer_dataset__.queue_name
self.op = GetNextSingleOp(self.dataset_types, self.dataset_shapes, queue_name)
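A short sketch of the sink-mode path with the new iter_first_order argument; the dataset and network are assumptions, and on Ascend this selects _DatasetIterMSLoopSink above:

# dataset: a MindData dataset, network: a compiled train network (both assumed)
dataset_helper = DatasetHelper(dataset, dataset_sink_mode=True, sink_size=-1,
                               epoch_num=1, iter_first_order=833)
for inputs in dataset_helper:
    outputs = network(*inputs)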

@ -0,0 +1,236 @@
# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Model."""
import math
from mindspore.train.callback import RunContext
from mindspore import context
from mindspore.context import ParallelMode
from mindspore.train.model import Model
from mindspore.train.dataset_helper import connect_network_with_dataset
from mindspore.parallel._utils import _need_to_full, _to_full_tensor
from mindspore.common.dtype import pytype_to_dtype
from mindspore._c_expression import init_exec_dataset
from .dataset_helper import DatasetHelper
def _convert_type(types):
"""
Convert from numpy type to tensor type.
Args:
types (list): Numpy type list of element in dataset.
Returns:
list, list of element in dataset.
"""
ms_types = []
for np_type in types:
ms_type = pytype_to_dtype(np_type)
ms_types.append(ms_type)
return ms_types
def _get_types_and_shapes(dataset):
"""Get dataset types and shapes."""
dataset_types = _convert_type(dataset.output_types())
dataset_shapes = dataset.output_shapes()
return dataset_types, dataset_shapes
def _exec_datagraph(exec_dataset, dataset_size, phase='dataset'):
"""Initialize and execute the dataset graph."""
batch_size = exec_dataset.get_batch_size()
input_indexs = exec_dataset.input_indexs
# transform data format
dataset_types, dataset_shapes = _get_types_and_shapes(exec_dataset)
init_exec_dataset(exec_dataset.__transfer_dataset__.queue_name,
dataset_size,
batch_size,
dataset_types,
dataset_shapes,
input_indexs,
phase=phase,
need_run=False)
class Model_Thor(Model):
"""
High-Level API for Training or Testing.
`Model` groups layers into an object with training and inference features.
Args:
network (Cell): A training or testing network.
loss_fn (Cell): Objective function, if loss_fn is None, the
network should contain the logic of loss and grads calculation, and the logic
of parallel if needed. Default: None.
optimizer (Cell): Optimizer for updating the weights. Default: None.
metrics (Union[dict, set]): A Dictionary or a set of metrics to be evaluated by the model during
training and testing. eg: {'accuracy', 'recall'}. Default: None.
eval_network (Cell): Network for evaluation. If not defined, `network` and `loss_fn` would be wrapped as
`eval_network`. Default: None.
eval_indexes (list): When defining the `eval_network`, if `eval_indexes` is None, all outputs of the
`eval_network` would be passed to metrics, otherwise `eval_indexes` must contain three
elements, including the positions of loss value, predicted value and label. The loss
value would be passed to the `Loss` metric, the predicted value and label would be passed
to other metric. Default: None.
amp_level (str): Option for argument `level` in `mindspore.amp.build_train_network`, level for mixed
precision training. Supports [O0, O2, O3]. Default: "O0".
- O0: Do not change.
- O2: Cast network to float16, keep batchnorm run in float32, using dynamic loss scale.
- O3: Cast network to float16, with additional property 'keep_batchnorm_fp32=False'.
O2 is recommended on GPU, O3 is recommended on Ascend.
loss_scale_manager (Union[None, LossScaleManager]): If it is None, the loss would not be scaled. Otherwise,
scale the loss by LossScaleManager. It is a key argument.
e.g. Use `loss_scale_manager=None` to set the value.
keep_batchnorm_fp32 (bool): Keep Batchnorm running in `float32`. If it is set to true, the level setting before
will be overwritten. Default: True.
"""
def __init__(self, network, loss_fn=None, optimizer=None, metrics=None, eval_network=None,
eval_indexes=None, amp_level="O0", frequency=834, **kwargs):
super(Model_Thor, self).__init__(network, loss_fn, optimizer, metrics, eval_network,
eval_indexes, amp_level, **kwargs)
self._frequency = frequency
self._train_network = self._build_train_network()
def _exec_preprocess(self, network, is_train, phase, dataset, dataset_sink_mode, sink_size=-1,
epoch_num=1, iter_first_order=1):
"""Initializes dataset."""
if dataset_sink_mode and not is_train:
dataset.__loop_size__ = 1
dataset_helper = DatasetHelper(dataset, dataset_sink_mode, sink_size, epoch_num, iter_first_order)
if dataset_sink_mode and context.get_context("device_target") != "GPU":
network = connect_network_with_dataset(network, dataset_helper)
network.set_train(is_train)
network.phase = phase
if self._parallel_mode in (ParallelMode.SEMI_AUTO_PARALLEL, ParallelMode.AUTO_PARALLEL):
network.set_auto_parallel()
return dataset_helper, network
def _train_dataset_sink_process(self, epoch, train_dataset, list_callback=None, cb_params=None, sink_size=-1):
"""
Training process. The data would be passed to network through dataset channel.
Args:
epoch (int): Total number of iterations on the data.
train_dataset (Dataset): A training dataset iterator. If there is no
loss_fn, a tuple with multiple data (data1, data2, data3, ...) should be
returned and passed to the network. Otherwise, a tuple (data, label) should
be returned. The data and label would be passed to the network and loss
function respectively.
list_callback (Callback): Executor of callback list. Default: None.
cb_params (_InternalCallbackParam): Callback parameters. Default: None.
sink_size (int): Control the amount of data in each sink. Default: -1.
"""
if sink_size == -1:
epoch_num = epoch
else:
epoch_num = math.ceil(epoch * sink_size / train_dataset.get_dataset_size())
iter_first_order = self._frequency - 1
iter_second_order = 1
train_dataset.__loop_size__ = iter_second_order
dataset_helper, train_network = self._exec_preprocess(self._train_network,
is_train=True,
phase='train',
dataset=train_dataset,
dataset_sink_mode=True,
sink_size=sink_size,
epoch_num=epoch_num,
iter_first_order=iter_first_order)
self._train_network = train_network
cb_params.train_network = self._train_network
cb_params.cur_step_num = 0
run_context = RunContext(cb_params)
list_callback.begin(run_context)
# used to stop training early, such as stopAtTime or stopAtStep
should_stop = False
switch_branch_one = True
index_first_order = 0
train_network_init_flag = True
has_do_dataset_init = False
for i in range(epoch):
cb_params.cur_epoch_num = i + 1
list_callback.epoch_begin(run_context)
# in data sink mode, dataset_helper iterates only once; otherwise it iterates epoch_size times.
for inputs in dataset_helper:
if _need_to_full() and context.get_context("device_target") == "GPU":
inputs = _to_full_tensor(inputs, self._device_number, self._global_rank)
list_callback.step_begin(run_context)
if context.get_context("device_target") == "GPU":
if switch_branch_one:
cb_params.cur_step_num += 1
if train_network_init_flag:
self._train_network.add_flags_recursive(thor=True)
self._train_network.phase = 'train0'
switch_branch_one = not switch_branch_one
outputs = self._train_network(*inputs)
cb_params.net_outputs = outputs
list_callback.step_end(run_context)
else:
cb_params.cur_step_num += 1
if train_network_init_flag:
self._train_network.add_flags_recursive(thor=False)
train_network_init_flag = False
self._train_network.phase = 'train1'
outputs = self._train_network(*inputs)
cb_params.net_outputs = outputs
index_first_order += 1
if index_first_order == iter_first_order:
index_first_order = 0
switch_branch_one = not switch_branch_one
list_callback.step_end(run_context)
else:
if switch_branch_one:
cb_params.cur_step_num += 1
if train_network_init_flag:
self._train_network.add_flags_recursive(thor=True)
self._train_network.phase = 'train0'
else:
cb_params.cur_step_num += iter_first_order
if train_network_init_flag:
self._train_network.add_flags_recursive(thor=False)
train_network_init_flag = False
self._train_network.phase = 'train1'
if not has_do_dataset_init:
_exec_datagraph(train_dataset, iter_first_order, phase='train1_dataset')
has_do_dataset_init = True
switch_branch_one = not switch_branch_one
outputs = self._train_network(*inputs)
cb_params.net_outputs = outputs
list_callback.step_end(run_context)
list_callback.epoch_end(run_context)
should_stop = should_stop or run_context.get_stop_requested()
if should_stop:
break
dataset_helper.stop_send()
list_callback.end(run_context)
__all__ = ["Model_Thor"]
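Worked numbers for the Ascend sink schedule implemented above, using the default frequency of 834 and an assumed per-epoch sink size; purely illustrative:

frequency = 834
iter_first_order = frequency - 1                   # 833 first-order ('train1') steps per THOR period
iter_second_order = 1                              # 1 second-order ('train0') step per THOR period
loop_size = iter_second_order + iter_first_order   # __loop_size__ + iter_first_order = 834
sink_size = 5004                                   # assumed steps per epoch, for illustration only
sink_count = int(sink_size / loop_size) * 2        # 6 * 2 = 12 sinks, alternating train0/train1 graphs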

@ -16,6 +16,10 @@
network config setting, will be used in train.py and eval.py
"""
from easydict import EasyDict as ed
# config optimizer for resnet50, imagenet2012. Momentum is default, Thor is optional.
cfg = ed({
'optimizer': 'Thor',
})
# config for resnet50, cifar10
config1 = ed({
@ -101,3 +105,49 @@ config4 = ed({
"lr_max": 0.3,
"lr_end": 0.0001
})
# config for resnet50, imagenet2012, Ascend 910
config_thor_Ascend = ed({
"class_num": 1001,
"batch_size": 32,
"loss_scale": 128,
"momentum": 0.9,
"weight_decay": 5e-4,
"epoch_size": 45,
"pretrain_epoch_size": 0,
"save_checkpoint": True,
"save_checkpoint_epochs": 2,
"keep_checkpoint_max": 15,
"save_checkpoint_path": "./",
"use_label_smooth": True,
"label_smooth_factor": 0.1,
"lr_init": 0.05803,
"lr_decay": 4.04839,
"lr_end_epoch": 53,
"damping_init": 0.02714,
"damping_decay": 0.50036,
"frequency": 834,
})
# config for resnet50, imagenet2012, GPU
config_thor_gpu = ed({
"class_num": 1001,
"batch_size": 32,
"loss_scale": 128,
"momentum": 0.9,
"weight_decay": 5e-4,
"epoch_size": 40,
"pretrain_epoch_size": 0,
"save_checkpoint": True,
"save_checkpoint_epochs": 1,
"keep_checkpoint_max": 15,
"save_checkpoint_path": "./",
"use_label_smooth": True,
"label_smooth_factor": 0.1,
"lr_init": 0.05672,
"lr_decay": 4.9687,
"lr_end_epoch": 50,
"damping_init": 0.02345,
"damping_decay": 0.5467,
"frequency": 834,
})

@ -205,3 +205,36 @@ def warmup_cosine_annealing_lr(lr, steps_per_epoch, warmup_epochs, max_epoch=120
lr_each_step = np.array(lr_each_step).astype(np.float32)
learning_rate = lr_each_step[global_step:]
return learning_rate
def get_thor_lr(global_step, lr_init, decay, total_epochs, steps_per_epoch, decay_epochs=100):
"""get_model_lr"""
lr_each_step = []
total_steps = steps_per_epoch * total_epochs
for i in range(total_steps):
epoch = (i + 1) / steps_per_epoch
base = (1.0 - float(epoch) / total_epochs) ** decay
lr_local = lr_init * base
if epoch >= decay_epochs:
lr_local = lr_local * 0.5
if epoch >= decay_epochs + 1:
lr_local = lr_local * 0.5
lr_each_step.append(lr_local)
current_step = global_step
lr_each_step = np.array(lr_each_step).astype(np.float32)
learning_rate = lr_each_step[current_step:]
return learning_rate
def get_thor_damping(global_step, damping_init, decay_rate, total_epochs, steps_per_epoch):
"""get_model_damping"""
damping_each_step = []
total_steps = steps_per_epoch * total_epochs
for step in range(total_steps):
epoch = (step + 1) / steps_per_epoch
damping_here = damping_init * (decay_rate ** (epoch / 10))
damping_each_step.append(damping_here)
current_step = global_step
damping_each_step = np.array(damping_each_step).astype(np.float32)
damping_now = damping_each_step[current_step:]
return damping_now
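Usage sketch mirroring the calls added to train.py further down; config comes from src/config.py above and step_size from the dataset (both assumed here):

lr = get_thor_lr(0, config.lr_init, config.lr_decay, config.lr_end_epoch, step_size, decay_epochs=39)
damping = get_thor_damping(0, config.damping_init, config.damping_decay, 70, step_size)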

@ -13,6 +13,7 @@
# limitations under the License.
# ============================================================================
"""ResNet."""
import math
import numpy as np
import mindspore.nn as nn
import mindspore.common.dtype as mstype
@ -36,12 +37,81 @@ def _weight_variable(shape, factor=0.01):
return Tensor(init_value)
def calculate_gain(nonlinearity, param=None):
"""calculate_gain"""
linear_fns = ['linear', 'conv1d', 'conv2d', 'conv3d', 'conv_transpose1d', 'conv_transpose2d', 'conv_transpose3d']
res = 0
if nonlinearity in linear_fns or nonlinearity == 'sigmoid':
res = 1
elif nonlinearity == 'tanh':
res = 5.0 / 3
elif nonlinearity == 'relu':
res = math.sqrt(2.0)
elif nonlinearity == 'leaky_relu':
if param is None:
negative_slope = 0.01
elif not isinstance(param, bool) and isinstance(param, int) or isinstance(param, float):
# True/False are instances of int, hence check above
negative_slope = param
else:
raise ValueError("negative_slope {} not a valid number".format(param))
res = math.sqrt(2.0 / (1 + negative_slope ** 2))
else:
raise ValueError("Unsupported nonlinearity {}".format(nonlinearity))
return res
def _calculate_fan_in_and_fan_out(tensor):
"""_calculate_fan_in_and_fan_out"""
dimensions = len(tensor)
if dimensions < 2:
raise ValueError("Fan in and fan out can not be computed for tensor with fewer than 2 dimensions")
if dimensions == 2: # Linear
fan_in = tensor[1]
fan_out = tensor[0]
else:
num_input_fmaps = tensor[1]
num_output_fmaps = tensor[0]
receptive_field_size = 1
if dimensions > 2:
receptive_field_size = tensor[2] * tensor[3]
fan_in = num_input_fmaps * receptive_field_size
fan_out = num_output_fmaps * receptive_field_size
return fan_in, fan_out
def _calculate_correct_fan(tensor, mode):
mode = mode.lower()
valid_modes = ['fan_in', 'fan_out']
if mode not in valid_modes:
raise ValueError("Mode {} not supported, please use one of {}".format(mode, valid_modes))
fan_in, fan_out = _calculate_fan_in_and_fan_out(tensor)
return fan_in if mode == 'fan_in' else fan_out
def kaiming_normal(inputs_shape, a=0, mode='fan_in', nonlinearity='leaky_relu'):
fan = _calculate_correct_fan(inputs_shape, mode)
gain = calculate_gain(nonlinearity, a)
std = gain / math.sqrt(fan)
return np.random.normal(0, std, size=inputs_shape).astype(np.float32)
def kaiming_uniform(inputs_shape, a=0., mode='fan_in', nonlinearity='leaky_relu'):
fan = _calculate_correct_fan(inputs_shape, mode)
gain = calculate_gain(nonlinearity, a)
std = gain / math.sqrt(fan)
bound = math.sqrt(3.0) * std # Calculate uniform bounds from standard deviation
return np.random.uniform(-bound, bound, size=inputs_shape).astype(np.float32)
def _conv3x3(in_channel, out_channel, stride=1, use_se=False):
if use_se:
weight = _conv_variance_scaling_initializer(in_channel, out_channel, kernel_size=3)
else:
weight_shape = (out_channel, in_channel, 3, 3)
weight = _weight_variable(weight_shape)
weight = Tensor(kaiming_normal(weight_shape, mode="fan_out", nonlinearity='relu'))
return nn.Conv2d(in_channel, out_channel,
kernel_size=3, stride=stride, padding=0, pad_mode='same', weight_init=weight)
@ -51,7 +121,7 @@ def _conv1x1(in_channel, out_channel, stride=1, use_se=False):
weight = _conv_variance_scaling_initializer(in_channel, out_channel, kernel_size=1)
else:
weight_shape = (out_channel, in_channel, 1, 1)
weight = _weight_variable(weight_shape)
weight = Tensor(kaiming_normal(weight_shape, mode="fan_out", nonlinearity='relu'))
return nn.Conv2d(in_channel, out_channel,
kernel_size=1, stride=stride, padding=0, pad_mode='same', weight_init=weight)
@ -61,7 +131,7 @@ def _conv7x7(in_channel, out_channel, stride=1, use_se=False):
weight = _conv_variance_scaling_initializer(in_channel, out_channel, kernel_size=7)
else:
weight_shape = (out_channel, in_channel, 7, 7)
weight = _weight_variable(weight_shape)
weight = Tensor(kaiming_normal(weight_shape, mode="fan_out", nonlinearity='relu'))
return nn.Conv2d(in_channel, out_channel,
kernel_size=7, stride=stride, padding=0, pad_mode='same', weight_init=weight)
@ -82,7 +152,7 @@ def _fc(in_channel, out_channel, use_se=False):
weight = Tensor(np.reshape(weight, (out_channel, in_channel)), dtype=mstype.float32)
else:
weight_shape = (out_channel, in_channel)
weight = _weight_variable(weight_shape)
weight = Tensor(kaiming_uniform(weight_shape, a=math.sqrt(5)))
return nn.Dense(in_channel, out_channel, has_bias=True, weight_init=weight, bias_init=0)
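A quick sanity check of the Kaiming initializers added above; the shapes (a 7x7 stem convolution and the 1001-class fc) are illustrative:

import math

w_conv = kaiming_normal((64, 3, 7, 7), mode="fan_out", nonlinearity='relu')  # fan_out = 64 * 7 * 7 = 3136
w_fc = kaiming_uniform((1001, 2048), a=math.sqrt(5))                         # fan_in = 2048
print(w_conv.shape, w_fc.shape)  # (64, 3, 7, 7) (1001, 2048)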

@ -18,9 +18,10 @@ import argparse
import ast
from mindspore import context
from mindspore import Tensor
from mindspore.nn.optim.momentum import Momentum
from mindspore.nn.optim.momentum import Momentum, THOR
from mindspore.train.model import Model
from mindspore.context import ParallelMode
from mindspore.train.train_thor import ConvertModelUtils
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
from mindspore.nn.loss import SoftmaxCrossEntropyWithLogits
from mindspore.train.loss_scale_manager import FixedLossScaleManager
@ -32,6 +33,7 @@ import mindspore.nn as nn
import mindspore.common.initializer as weight_init
from src.lr_generator import get_lr, warmup_cosine_annealing_lr
from src.CrossEntropySmooth import CrossEntropySmooth
from src.config import cfg
parser = argparse.ArgumentParser(description='Image classification')
parser.add_argument('--net', type=str, default=None, help='Resnet Model, either resnet50 or resnet101')
@ -65,6 +67,12 @@ else:
from src.config import config4 as config
from src.dataset import create_dataset4 as create_dataset
if cfg.optimizer == "Thor":
if args_opt.device_target == "Ascend":
from src.config import config_thor_Ascend as config
else:
from src.config import config_thor_gpu as config
if __name__ == '__main__':
target = args_opt.device_target
@ -124,13 +132,17 @@ if __name__ == '__main__':
cell.weight.dtype))
# init lr
if args_opt.net == "resnet50" or args_opt.net == "se-resnet50":
lr = get_lr(lr_init=config.lr_init, lr_end=config.lr_end, lr_max=config.lr_max,
warmup_epochs=config.warmup_epochs, total_epochs=config.epoch_size, steps_per_epoch=step_size,
lr_decay_mode=config.lr_decay_mode)
if cfg.optimizer == "Thor":
from src.lr_generator import get_thor_lr
lr = get_thor_lr(0, config.lr_init, config.lr_decay, config.lr_end_epoch, step_size, decay_epochs=39)
else:
lr = warmup_cosine_annealing_lr(config.lr, step_size, config.warmup_epochs, config.epoch_size,
config.pretrain_epoch_size * step_size)
if args_opt.net == "resnet50" or args_opt.net == "se-resnet50":
lr = get_lr(lr_init=config.lr_init, lr_end=config.lr_end, lr_max=config.lr_max,
warmup_epochs=config.warmup_epochs, total_epochs=config.epoch_size, steps_per_epoch=step_size,
lr_decay_mode=config.lr_decay_mode)
else:
lr = warmup_cosine_annealing_lr(config.lr, step_size, config.warmup_epochs, config.epoch_size,
config.pretrain_epoch_size * step_size)
lr = Tensor(lr)
# define opt
@ -180,6 +192,16 @@ if __name__ == '__main__':
## fp32 training
opt = Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), lr, config.momentum, config.weight_decay)
model = Model(net, loss_fn=loss, optimizer=opt, metrics={'acc'})
if cfg.optimizer == "Thor" and args_opt.dataset == "imagenet2012":
from src.lr_generator import get_thor_damping
damping = get_thor_damping(0, config.damping_init, config.damping_decay, 70, step_size)
split_indices = [26, 53]
opt = THOR(net, lr, Tensor(damping), config.momentum, config.weight_decay, config.loss_scale,
config.batch_size, split_indices=split_indices)
model = ConvertModelUtils().convert_to_thor_model(model=model, network=net, loss_fn=loss, optimizer=opt,
loss_scale_manager=loss_scale, metrics={'acc'},
amp_level="O2", keep_batchnorm_fp32=False,
frequency=config.frequency)
# define callbacks
time_cb = TimeMonitor(data_size=step_size)

@ -23,7 +23,7 @@ echo "For hyper parameter, please note that you should customize the scripts:
'{CUR_DIR}/scripts/ascend_distributed_launcher/hyper_parameter_config.ini' "
echo "=============================================================================================================="
CUR_DIR=`pwd`
ulimit -s 102400
python ${CUR_DIR}/scripts/ascend_distributed_launcher/get_distribute_pretrain_cmd.py \
--run_script_dir=${CUR_DIR}/run_pretrain.py \
--hyper_parameter_config_dir=${CUR_DIR}/scripts/ascend_distributed_launcher/hyper_parameter_config.ini \

@ -24,6 +24,7 @@ DEVICE_ID=$1
EPOCH_SIZE=$2
DATA_DIR=$3
SCHEMA_DIR=$4
ulimit -s 102400
mkdir -p ms_log
PROJECT_DIR=$(cd "$(dirname "$0")" || exit; pwd)

@ -48,6 +48,12 @@ cfg = edict({
'learning_rate': 2e-5,
'momentum': 0.9,
}),
'Thor': edict({
'momentum': 0.9,
'weight_decay': 5e-4,
'loss_scale': 1.0,
'frequency': 100,
}),
})
'''

@ -22,6 +22,7 @@ import math
import collections
import numpy as np
import mindspore.nn as nn
from mindspore import context
from mindspore import log as logger
from mindspore.ops import operations as P
from mindspore.common.tensor import Tensor
@ -106,11 +107,10 @@ class LossCallBack(Callback):
percent = 1
epoch_num -= 1
print("epoch: {}, current epoch percent: {}, step: {}, outputs are {}"
.format(int(epoch_num), "%.3f" % percent, cb_params.cur_step_num, str(cb_params.net_outputs),
flush=True))
.format(int(epoch_num), "%.3f" % percent, cb_params.cur_step_num, str(cb_params.net_outputs)))
else:
print("epoch: {}, step: {}, outputs are {}".format(cb_params.cur_epoch_num, cb_params.cur_step_num,
str(cb_params.net_outputs), flush=True))
str(cb_params.net_outputs)))
def LoadNewestCkpt(load_finetune_checkpoint_dir, steps_per_epoch, epoch_num, prefix):
"""
@ -181,3 +181,61 @@ def convert_labels_to_index(label_list):
sub_label = pre + label
label2id[sub_label] = index
return label2id
def _get_poly_lr(global_step, lr_init, lr_end, lr_max, warmup_steps, total_steps, poly_power):
"""
generate learning rate array
Args:
global_step(int): current step
lr_init(float): init learning rate
lr_end(float): end learning rate
lr_max(float): max learning rate
warmup_steps(int): number of warmup steps
total_steps(int): total number of training steps
poly_power(int): poly learning rate power
Returns:
np.array, learning rate array
"""
lr_each_step = []
if warmup_steps != 0:
inc_each_step = (float(lr_max) - float(lr_init)) / float(warmup_steps)
else:
inc_each_step = 0
for i in range(total_steps):
if i < warmup_steps:
lr = float(lr_init) + inc_each_step * float(i)
else:
base = (1.0 - (float(i) - float(warmup_steps)) / (float(total_steps) - float(warmup_steps)))
lr = float(lr_max - lr_end) * (base ** poly_power)
lr = lr + lr_end
if lr < 0.0:
lr = 0.0
lr_each_step.append(lr)
learning_rate = np.array(lr_each_step).astype(np.float32)
current_step = global_step
learning_rate = learning_rate[current_step:]
return learning_rate
def get_bert_thor_lr():
if context.get_context("device_target") == "Ascend":
learning_rate = _get_poly_lr(global_step=0, lr_init=0.0, lr_end=3.244018779068399e-05,
lr_max=0.0034022148941459055, warmup_steps=0, total_steps=30000, poly_power=1)
else:
learning_rate = _get_poly_lr(global_step=0, lr_init=0.0, lr_end=1e-6, lr_max=1.7e-3, warmup_steps=0,
total_steps=30000, poly_power=1)
return Tensor(learning_rate)
def get_bert_thor_damping():
if context.get_context("device_target") == "Ascend":
damping = _get_poly_lr(global_step=0, lr_init=0.0, lr_end=1e-6, lr_max=5e-2, warmup_steps=0, total_steps=30000,
poly_power=1)
else:
damping = _get_poly_lr(global_step=0, lr_init=0.0, lr_end=1e-6, lr_max=3.5e-2, warmup_steps=0,
total_steps=30000, poly_power=1)
return Tensor(damping)
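Sketch of how these schedules feed the generalized THOR optimizer, mirroring the updated pretrain test further down; net_with_loss and the hyper-parameter values are taken from that test and are assumptions here:

from mindspore.nn.optim import THOR

lr = get_bert_thor_lr()
damping = get_bert_thor_damping()
# positional arguments follow the test: momentum, weight_decay, loss_scale, batch_size
optimizer = THOR(net_with_loss, lr, damping, 0.9, 5e-4, 1.0, 12,
                 decay_filter=lambda x: 'layernorm' not in x.name.lower() and 'bias' not in x.name.lower(),
                 split_indices=[38, 77])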

@ -28,20 +28,44 @@ from mindspore import log as logger
from mindspore.train.callback import Callback
from mindspore.context import ParallelMode
from mindspore.train.serialization import load_checkpoint, load_param_into_net
from mindspore.nn.optim import THOR
from mindspore.train.model import Model
from mindspore.train.train_thor import ConvertModelUtils
import mindspore.dataset.transforms.c_transforms as C
from model_zoo.official.nlp.bert_thor.src.bert_for_pre_training import BertNetworkWithLoss, BertTrainOneStepCell
from model_zoo.official.nlp.bert_thor.src.bert_net_config import bert_net_cfg
from model_zoo.official.nlp.bert_thor.src.config import cfg
from model_zoo.official.nlp.bert_thor.src.lr_generator import get_bert_lr, get_bert_damping
from model_zoo.official.nlp.bert_thor.src.model_thor import Model
from model_zoo.official.nlp.bert_thor.src.thor_for_bert_arg import THOR
from model_zoo.official.nlp.bert.src.bert_for_pre_training import BertNetworkWithLoss, BertTrainOneStepCell
from model_zoo.official.nlp.bert.src.utils import get_bert_thor_lr, get_bert_thor_damping
from model_zoo.official.nlp.bert.src.bert_model import BertConfig
MINDSPORE_HCCL_CONFIG_PATH = "/home/workspace/mindspore_config/hccl/rank_table_8p.json"
DATASET_PATH = "/home/workspace/mindspore_dataset/bert/thor/en-wiki-512_test_first1wan"
load_checkpoint_path = ""
data_sink_steps = 100
train_steps = 200
batch_size = 12
frequency = 100
momentum = 0.9
weight_decay = 5e-4
loss_scale = 1.0
bert_net_cfg = BertConfig(
seq_length=512,
vocab_size=30522,
hidden_size=1024,
num_hidden_layers=4,
num_attention_heads=16,
intermediate_size=4096,
hidden_act="gelu",
hidden_dropout_prob=0.1,
attention_probs_dropout_prob=0.1,
max_position_embeddings=512,
type_vocab_size=2,
initializer_range=0.02,
use_relative_positions=False,
dtype=mstype.float32,
compute_type=mstype.float16
)
np.random.seed(1)
ds.config.set_seed(1)
@ -113,27 +137,7 @@ def create_bert_dataset(device_num=1, rank=0, do_shuffle="true", data_dir=None,
def _set_bert_all_reduce_split():
"""set bert all_reduce fusion split, support num_hidden_layers is 12 and 24."""
from mindspore.parallel._auto_parallel_context import auto_parallel_context
if bert_net_cfg.num_hidden_layers == 12:
if bert_net_cfg.use_relative_positions:
auto_parallel_context().set_all_reduce_fusion_split_indices([29, 58, 87, 116, 145, 174, 203, 217],
"hccl_world_groupsum1")
auto_parallel_context().set_all_reduce_fusion_split_indices([29, 58, 87, 116, 145, 174, 203, 217],
"hccl_world_groupsum3")
else:
auto_parallel_context().set_all_reduce_fusion_split_indices([28, 55, 82, 109, 136, 163, 190, 205],
"hccl_world_groupsum1")
auto_parallel_context().set_all_reduce_fusion_split_indices([28, 55, 82, 109, 136, 163, 190, 205],
"hccl_world_groupsum3")
elif bert_net_cfg.num_hidden_layers == 24:
if bert_net_cfg.use_relative_positions:
auto_parallel_context().set_all_reduce_fusion_split_indices([30, 90, 150, 210, 270, 330, 390, 421],
"hccl_world_groupsum1")
auto_parallel_context().set_all_reduce_fusion_split_indices([30, 90, 150, 210, 270, 330, 390, 421],
"hccl_world_groupsum3")
else:
auto_parallel_context().set_all_reduce_fusion_split_indices([38, 77], "hccl_world_groupsum1")
auto_parallel_context().set_all_reduce_fusion_split_indices([38, 77], "hccl_world_groupsum3")
context.set_auto_parallel_context(all_reduce_fusion_config=[38, 77])
def train_process_bert_thor(q, device_id, epoch_size, device_num):
@ -153,7 +157,6 @@ def train_process_bert_thor(q, device_id, epoch_size, device_num):
context.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL, gradients_mean=True,
device_num=device_num)
bert_net_cfg.num_hidden_layers = 4
data_set = create_bert_dataset(device_num=device_num, rank=rank, do_shuffle=False, data_dir=DATASET_PATH,
schema_dir=None)
net_with_loss = BertNetworkWithLoss(bert_net_cfg, True)
@ -161,13 +164,12 @@ def train_process_bert_thor(q, device_id, epoch_size, device_num):
new_repeat_count = epoch_size * data_set.get_dataset_size() // data_sink_steps
new_repeat_count = min(new_repeat_count, train_steps // data_sink_steps)
lr = get_bert_lr()
damping = get_bert_damping()
optimizer = THOR(filter(lambda x: x.requires_grad, net_with_loss.get_parameters()), lr, cfg.Thor.momentum,
filter(lambda x: 'matrix_A' in x.name, net_with_loss.get_parameters()),
filter(lambda x: 'matrix_G' in x.name, net_with_loss.get_parameters()),
cfg.Thor.weight_decay, cfg.Thor.loss_scale, bert_net_cfg.num_hidden_layers,
bert_net_cfg.batch_size, damping)
lr = get_bert_thor_lr()
damping = get_bert_thor_damping()
split_indices = [38, 77]
optimizer = THOR(net_with_loss, lr, damping, momentum, weight_decay, loss_scale, batch_size,
decay_filter=lambda x: 'layernorm' not in x.name.lower() and 'bias' not in x.name.lower(),
split_indices=split_indices)
time_monitor_callback = TimeMonitor(data_sink_steps)
loss_callback = LossCallback()
callback = [time_monitor_callback, loss_callback]
@ -177,7 +179,9 @@ def train_process_bert_thor(q, device_id, epoch_size, device_num):
load_param_into_net(net_with_loss, param_dict)
net_with_grads = BertTrainOneStepCell(net_with_loss, optimizer=optimizer)
model = Model(net_with_grads, frequency=cfg.Thor.frequency)
model = Model(net_with_grads)
model = ConvertModelUtils().convert_to_thor_model(model, network=net_with_grads, optimizer=optimizer,
frequency=frequency)
model.train(new_repeat_count, data_set, callbacks=callback, dataset_sink_mode=True, sink_size=data_sink_steps)
loss_list = loss_callback.loss_list
@ -230,9 +234,12 @@ def test_bert_thor_mlperf_8p():
os.system("rm -rf " + str(i))
print("End training...")
assert mean_cost < 64.4
assert mean_loss < 7.9
assert mean_cost < 71.5
assert mean_loss < 8.125
if __name__ == '__main__':
begin = time.time()
test_bert_thor_mlperf_8p()
end = time.time()
print("time span is", end - begin, flush=True)

@ -18,7 +18,7 @@ network config setting, will be used in train.py and eval.py
from easydict import EasyDict as ed
config = ed({
"class_num": 1000,
"class_num": 1001,
"batch_size": 32,
"loss_scale": 128,
"momentum": 0.9,

@ -1,135 +0,0 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""grad reducer cell for distributed training"""
from mindspore.nn.cell import Cell
from mindspore.communication.management import GlobalComm, get_group_size
from mindspore.ops import functional as F, composite as C, operations as P
from mindspore.ops.operations.comm_ops import AllReduce
import mindspore.common.dtype as mstype
reduce_opt = C.MultitypeFuncGraph("reduce_opt")
def _init_allreduce_operators(length, split_indices):
""" initialize allreduce communication operators"""
indices = split_indices[0]
fusion = split_indices[1]
op_list = ()
j = 0
for i in range(length):
if j <= len(indices)-1:
temp = indices[j]
else:
temp = length
if i >= temp:
j = j + 1
fusion = fusion + 1
op = AllReduce('sum', GlobalComm.WORLD_COMM_GROUP)
op.add_prim_attr('fusion', fusion)
op_list = op_list + (op,)
return op_list
@reduce_opt.register("Function", "Number", "Function", "Tensor")
def _tensors_allreduce_mean(mul, degree, allreduce, parameters):
"""
Apply allreduce on parameters.
Args:
mul(Primitive): The mul operator for parameters.
degree (int): The mean coefficient.
allreduce (Primitive): The communication operator for parameters.
parameters (Tensor): The parameters before operation.
Returns:
Tensor, the parameters after operation.
"""
degree = F.scalar_cast(degree, F.dtype(parameters))
parameters = allreduce(parameters)
cast_op = P.Cast()
return mul(parameters, cast_op(F.scalar_to_array(1.0 / degree), F.dtype(parameters)))
_get_datatype = C.MultitypeFuncGraph("_get_datatype")
@_get_datatype.register("Tensor")
def _tensors_get_datatype(parameters):
"""
Acquire parameters datatype.
Args:
parameters (Tensor): The parameters before operation.
Returns:
mstype, the datatype of parameters.
"""
return F.dtype(parameters)
_cast_datatype = C.MultitypeFuncGraph("_cast_datatype")
@_cast_datatype.register("TypeType", "Tensor")
def _tensors_cast_datatype(datatype, parameters):
"""
Cast parameters to datatype.
Args:
datatype (mstype): the destination datatype of parameters.
parameters (Tensor): The parameters before operation.
Returns:
Tensor, the parameters after operation.
"""
return F.cast(parameters, datatype)
class DistributedGradReducerThor(Cell):
"""
A distributed optimizer.
Constructs a parameters reducer Cell, which applies communication and average operations on
single-process parameters values.
Args:
parameter_length (int): length of the parameters to be updated.
split_indices(tuple): parameter split indices.
mean (bool): When mean is true, the mean coefficient (degree) would apply on parameters. Default: False.
degree (int): The mean coefficient. Usually it equals to device number. Default: None.
Raises:
ValueError: If degree is not a int or less than 0.
"""
def __init__(self, parameter_length, split_indices, mean=True, degree=None):
super(DistributedGradReducerThor, self).__init__(auto_prefix=False)
self.hyper_map = C.HyperMap()
self.mul = P.Mul()
if degree is None:
self.degree = get_group_size()
else:
if not isinstance(degree, int) or degree <= 0:
raise ValueError("Parameter 'degree' in DistributedGradReducer should large than 0 and be int")
self.degree = degree
self.mean = mean
self.op_list = _init_allreduce_operators(parameter_length, split_indices)
def construct(self, parameters):
datatypes = self.hyper_map(F.partial(_get_datatype), parameters)
parameters = self.hyper_map(F.partial(_cast_datatype, mstype.float32), parameters)
new_parameters = self.hyper_map(F.partial(reduce_opt, self.mul, self.degree), self.op_list, parameters)
new_parameters = self.hyper_map(F.partial(_cast_datatype), datatypes, new_parameters)
return new_parameters

@ -1,88 +0,0 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""learning rate generator"""
import math
import numpy as np
def get_lr(lr_init, lr_end, lr_max, warmup_epochs, total_epochs, steps_per_epoch, lr_decay_mode):
"""
generate learning rate array
Args:
lr_init(float): init learning rate
lr_end(float): end learning rate
lr_max(float): max learning rate
warmup_epochs(int): number of warmup epochs
total_epochs(int): total epoch of training
steps_per_epoch(int): steps of one epoch
lr_decay_mode(string): learning rate decay mode, including steps, poly, cosine or default
Returns:
np.array, learning rate array
"""
lr_each_step = []
total_steps = steps_per_epoch * total_epochs
warmup_steps = steps_per_epoch * warmup_epochs
if lr_decay_mode == 'steps':
decay_epoch_index = [0.3 * total_steps, 0.6 * total_steps, 0.8 * total_steps]
for i in range(total_steps):
if i < decay_epoch_index[0]:
lr = lr_max
elif i < decay_epoch_index[1]:
lr = lr_max * 0.1
elif i < decay_epoch_index[2]:
lr = lr_max * 0.01
else:
lr = lr_max * 0.001
lr_each_step.append(lr)
elif lr_decay_mode == 'poly':
if warmup_steps != 0:
inc_each_step = (float(lr_max) - float(lr_init)) / float(warmup_steps)
else:
inc_each_step = 0
for i in range(total_steps):
if i < warmup_steps:
lr = float(lr_init) + inc_each_step * float(i)
else:
base = (1.0 - (float(i) - float(warmup_steps)) / (float(total_steps) - float(warmup_steps)))
lr = float(lr_max) * base * base
if lr < 0.0:
lr = 0.0
lr_each_step.append(lr)
elif lr_decay_mode == 'cosine':
decay_steps = total_steps - warmup_steps
for i in range(total_steps):
if i < warmup_steps:
lr_inc = (float(lr_max) - float(lr_init)) / float(warmup_steps)
lr = float(lr_init) + lr_inc * (i + 1)
else:
linear_decay = (total_steps - i) / decay_steps
cosine_decay = 0.5 * (1 + math.cos(math.pi * 2 * 0.47 * i / decay_steps))
decayed = linear_decay * cosine_decay + 0.00001
lr = lr_max * decayed
lr_each_step.append(lr)
else:
for i in range(total_steps):
if i < warmup_steps:
lr = lr_init + (lr_max - lr_init) * i / warmup_steps
else:
lr = lr_max - (lr_max - lr_end) * (i - warmup_steps) / (total_steps - warmup_steps)
lr_each_step.append(lr)
learning_rate = np.array(lr_each_step).astype(np.float32)
return learning_rate

File diff suppressed because it is too large.

@ -1,202 +0,0 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""momentum"""
import mindspore.common.dtype as mstype
from mindspore.common.initializer import initializer
from mindspore.common.parameter import Parameter
from mindspore.common.parameter import ParameterTuple
from mindspore.common.tensor import Tensor
from mindspore.nn.optim.optimizer import Optimizer
from mindspore.ops import functional as F, composite as C, operations as P
from mindspore.parallel._utils import _get_device_num, _get_gradients_mean
from .grad_reducer_thor import DistributedGradReducerThor
momentum_opt = C.MultitypeFuncGraph("momentum_opt")
@momentum_opt.register("Function", "Tensor", "Tensor", "Tensor", "Tensor", "Tensor")
def _tensor_run_opt_ext(opt, learning_rate, momentum, gradient, weight, moment):
"""Apply momentum optimizer to the weight parameter using Tensor."""
success = True
success = F.depend(success, opt(weight, moment, learning_rate, gradient, momentum))
return success
op_add = P.AddN()
apply_decay = C.MultitypeFuncGraph("apply_decay")
@apply_decay.register("Number", "Bool", "Tensor", "Tensor")
def _tensor_apply_decay(weight_decay, if_apply, weight, gradient):
"""Get grad with weight_decay."""
if if_apply:
return op_add((weight * weight_decay, gradient))
return gradient
class THOR(Optimizer):
"""THOR"""
def __init__(self, params, learning_rate, momentum, matrix_A, matrix_G, A_inv_max, G_inv_max, weight_decay=0.0,
loss_scale=1.0,
decay_filter=lambda x: x.name not in []):
super(THOR, self).__init__(learning_rate, params, weight_decay, loss_scale)
if isinstance(momentum, float) and momentum < 0.0:
raise ValueError("momentum should be at least 0.0, but got momentum {}".format(momentum))
self.momentum = Parameter(Tensor(momentum, mstype.float32), name="momentum")
self.params = self.parameters
self.moments = self.params.clone(prefix="moments", init='zeros')
self.hyper_map = C.HyperMap()
self.opt = P.ApplyMomentum()
self.matrix_A = ParameterTuple(matrix_A)
self.matrix_G = ParameterTuple(matrix_G)
self.A_inv_max = ParameterTuple(A_inv_max)
self.G_inv_max = ParameterTuple(G_inv_max)
self.cube_matmul_left = P.CusMatMulCubeFraczLeftCast()
self.cube_matmul_left_fc = P.CusMatMulCubeDenseLeft()
self.cube_matmul_right_fc = P.CusMatMulCubeDenseRight()
self.cube_matmul_right_mul = P.CusMatMulCubeFraczRightMul()
self.transpose = P.Transpose()
self.shape = P.Shape()
self.reshape = P.Reshape()
self.mul = P.Mul()
self.weight_idx = []
for i in range(len(self.params)):
if "conv" in self.params[i].name or "end_point" in self.params[i].name:
self.weight_idx.append(i)
self.weight_idx.append(len(self.params))
self.feature_map = [1.0 / 12544, 1.0 / 3136, 1.0 / 3136, 1.0 / 3136, 1.0 / 3136, 1.0 / 3136, 1.0 / 3136,
1.0 / 3136, 1.0 / 3136, 1.0 / 3136, 1.0 / 3136, 1.0 / 3136,
1.0 / 784, 1.0 / 784, 1.0 / 784, 1.0 / 784, 1.0 / 784, 1.0 / 784, 1.0 / 784, 1.0 / 784,
1.0 / 784, 1.0 / 784, 1.0 / 784, 1.0 / 784, 1.0 / 784,
1.0 / 196, 1.0 / 196, 1.0 / 196, 1.0 / 196, 1.0 / 196, 1.0 / 196, 1.0 / 196, 1.0 / 196,
1.0 / 196, 1.0 / 196, 1.0 / 196, 1.0 / 196, 1.0 / 196, 1.0 / 196, 1.0 / 196, 1.0 / 196,
1.0 / 196, 1.0 / 196, 1.0 / 196,
1.0 / 49, 1.0 / 49, 1.0 / 49, 1.0 / 49, 1.0 / 49, 1.0 / 49, 1.0 / 49, 1.0 / 49, 1.0 / 49,
1.0]
mean = _get_gradients_mean()
degree = _get_device_num()
parameter_length = len(self.feature_map)
self.grad_reducer_Amax = DistributedGradReducerThor(parameter_length, ((27,), 2), mean, degree)
self.grad_reducer_Gmax = DistributedGradReducerThor(parameter_length, ((27,), 4), mean, degree)
self.grad_reducer_A = DistributedGradReducerThor(parameter_length, ((27,), 6), mean, degree)
self.grad_reducer_G = DistributedGradReducerThor(parameter_length, ((27,), 8), mean, degree)
self.matrix_A_inv = ()
self.matrix_G_inv = ()
self.matrix_max_inv = ()
for i in range(54):
self.matrix_max_inv = self.matrix_max_inv + (
Parameter(initializer(1, [1], mstype.float32), name="matrix_max" + str(i), requires_grad=False),)
self.log = P.Log()
self.exp = P.Exp()
self.sqrt = P.Sqrt()
self.matrix_max_inv = ParameterTuple(self.matrix_max_inv)
self.assign = P.Assign()
self.cast = P.Cast()
self.thor = True
self.weight_decay = weight_decay * loss_scale
self.decay_flags = tuple(decay_filter(x) for x in self.parameters)
def construct(self, gradients):
params = self.params
moments = self.moments
if self.thor:
matrix_A_allreduce = ()
matrix_G_allreduce = ()
matrix_A_max_allreduce = ()
matrix_G_max_allreduce = ()
for i in range(54):
g = gradients[i * 3]
matrix_A = self.matrix_A[i]
matrix_G = self.matrix_G[i]
A_max = self.A_inv_max[i]
G_max = self.G_inv_max[i]
matrix_A = F.depend(matrix_A, g)
matrix_G = F.depend(matrix_G, g)
A_max = F.depend(A_max, g)
G_max = F.depend(G_max, g)
matrix_A_allreduce = matrix_A_allreduce + (matrix_A,)
matrix_G_allreduce = matrix_G_allreduce + (matrix_G,)
matrix_A_max_allreduce = matrix_A_max_allreduce + (A_max,)
matrix_G_max_allreduce = matrix_G_max_allreduce + (G_max,)
matrix_A_allreduce = self.grad_reducer_A(matrix_A_allreduce)
matrix_G_allreduce = self.grad_reducer_G(matrix_G_allreduce)
matrix_A_max_allreduce = self.grad_reducer_Amax(matrix_A_max_allreduce)
matrix_G_max_allreduce = self.grad_reducer_Gmax(matrix_G_max_allreduce)
new_grads = ()
for i in range(54):
g = gradients[i * 3]
temp_a = matrix_A_allreduce[i]
temp_g = matrix_G_allreduce[i]
temp_a = self.cast(temp_a, mstype.float32)
temp_g = self.cast(temp_g, mstype.float32)
matrix_A_inv_max = self.log(matrix_A_max_allreduce[i])
matrix_A_inv_max = self.mul(matrix_A_inv_max, -1)
matrix_A_inv_max = self.exp(matrix_A_inv_max)
temp_a = self.mul(temp_a, matrix_A_inv_max)
matrix_G_inv_max = self.log(matrix_G_max_allreduce[i])
matrix_G_inv_max = self.mul(matrix_G_inv_max, -1)
matrix_G_inv_max = self.exp(matrix_G_inv_max)
temp_g = self.mul(temp_g, matrix_G_inv_max)
temp_max = self.mul(matrix_A_max_allreduce[i], matrix_G_max_allreduce[i])
temp_max = self.mul(temp_max, self.feature_map[i])
temp_a = self.cast(temp_a, mstype.float16)
temp_g = self.cast(temp_g, mstype.float16)
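                # index 53 is the final dense ("end_point") layer and uses the dense cube
                # kernels; the other 53 indices are convolution layers.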
if i == 53:
g = self.cube_matmul_left_fc(temp_g, g)
g = self.cube_matmul_right_fc(g, temp_a, temp_max)
else:
g = self.cube_matmul_left(temp_g, g)
g = self.cube_matmul_right_mul(g, temp_a, temp_max)
fake_A = self.assign(self.matrix_A[i], temp_a)
fake_G = self.assign(self.matrix_G[i], temp_g)
fake_max = self.assign(self.matrix_max_inv[i], temp_max)
g = F.depend(g, fake_A)
g = F.depend(g, fake_G)
g = F.depend(g, fake_max)
if i == 53:
new_grads = new_grads + (g,)
else:
new_grads = new_grads + (g, gradients[i * 3 + 1], gradients[i * 3 + 2])
gradients = new_grads
else:
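            # Steps that do not refresh the second-order factors reuse the cached
            # (already normalized) inverses and the stored max scales.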
new_grads = ()
for i in range(54):
g = gradients[i * 3]
matrix_A = self.matrix_A[i]
matrix_G = self.matrix_G[i]
matrix_max = self.matrix_max_inv[i]
matrix_A = F.depend(matrix_A, g)
matrix_G = F.depend(matrix_G, g)
matrix_max = F.depend(matrix_max, g)
if i == 53:
g = self.cube_matmul_left_fc(matrix_G, g)
g = self.cube_matmul_right_fc(g, matrix_A, matrix_max)
new_grads = new_grads + (g,)
else:
g = self.cube_matmul_left(matrix_G, g)
g = self.cube_matmul_right_mul(g, matrix_A, matrix_max)
new_grads = new_grads + (g, gradients[i * 3 + 1], gradients[i * 3 + 2])
gradients = new_grads
if self.weight_decay > 0:
gradients = self.hyper_map(F.partial(apply_decay, self.weight_decay), self.decay_flags,
params, gradients)
gradients = self.scale_grad(gradients)
lr = self.get_lr()
success = self.hyper_map(F.partial(momentum_opt, self.opt, lr, self.momentum), gradients, params, moments)
return success
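
For reference, a rough NumPy sketch of the per-layer preconditioning that the loop above appears
to perform (this is not the fused Ascend cube kernels, and the function name is illustrative):
the cached inverses are divided by their max entries so the fp16 matmuls stay in range, and
A_max * G_max * feature_map[i] is multiplied back afterwards, so the maxes cancel and the net
scaling is the layer's feature-map factor.

import numpy as np

def precondition_layer(grad, a_inv, g_inv, a_max, g_max, spatial_scale):
    """Sketch: grad <- (G_inv @ grad @ A_inv) * (a_max * g_max * spatial_scale)."""
    # exp(-log(x)) == 1/x, the same reciprocal trick used in construct()
    a_norm = a_inv * np.exp(-np.log(a_max))   # A_inv scaled into fp16-safe range
    g_norm = g_inv * np.exp(-np.log(g_max))   # G_inv scaled into fp16-safe range
    scale = a_max * g_max * spatial_scale     # folded back in after the matmuls
    return (g_norm @ grad @ a_norm) * scale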

File diff suppressed because it is too large Load Diff

@ -21,7 +21,8 @@ from multiprocessing import Process, Queue
import pytest
import numpy as np
from mindspore import context, Tensor
from mindspore import context
from mindspore.common.tensor import Tensor
from mindspore.communication.management import init
from mindspore.train.model import Model
from mindspore.context import ParallelMode
@ -29,6 +30,7 @@ from mindspore.train.callback import Callback
from mindspore.train.loss_scale_manager import FixedLossScaleManager
import mindspore.nn as nn
import mindspore.dataset as ds
from mindspore.nn.optim import THOR
from tests.st.networks.models.resnet50.src.resnet import resnet50
from tests.st.networks.models.resnet50.src.dataset import create_dataset
@ -39,7 +41,7 @@ from tests.st.networks.models.resnet50.src.CrossEntropySmooth import CrossEntrop
from tests.st.networks.models.resnet50.src_thor.config import config as thor_config
from tests.st.networks.models.resnet50.src_thor.model_thor import Model as THOR_Model
from tests.st.networks.models.resnet50.src_thor.resnet import resnet50 as resnet50_thor
from tests.st.networks.models.resnet50.src_thor.thor import THOR
MINDSPORE_HCCL_CONFIG_PATH = "/home/workspace/mindspore_config/hccl/rank_tabel_4p/rank_table_4p_1.json"
MINDSPORE_HCCL_CONFIG_PATH_2 = "/home/workspace/mindspore_config/hccl/rank_tabel_4p/rank_table_4p_2.json"
@ -50,7 +52,8 @@ np.random.seed(1)
ds.config.set_seed(1)
os.environ['GLOG_v'] = str(2)
def get_model_lr(global_step, lr_init, decay, total_epochs, steps_per_epoch):
def get_thor_lr(global_step, lr_init, decay, total_epochs, steps_per_epoch, decay_epochs=100):
"""get_model_lr"""
lr_each_step = []
total_steps = steps_per_epoch * total_epochs
@ -58,9 +61,9 @@ def get_model_lr(global_step, lr_init, decay, total_epochs, steps_per_epoch):
epoch = (i + 1) / steps_per_epoch
base = (1.0 - float(epoch) / total_epochs) ** decay
lr_local = lr_init * base
if epoch >= 39:
if epoch >= decay_epochs:
lr_local = lr_local * 0.5
if epoch >= 40:
if epoch >= decay_epochs + 1:
lr_local = lr_local * 0.5
lr_each_step.append(lr_local)
current_step = global_step
@ -69,7 +72,7 @@ def get_model_lr(global_step, lr_init, decay, total_epochs, steps_per_epoch):
return learning_rate
def get_model_damping(global_step, damping_init, decay_rate, total_epochs, steps_per_epoch):
def get_thor_damping(global_step, damping_init, decay_rate, total_epochs, steps_per_epoch):
"""get_model_damping"""
damping_each_step = []
total_steps = steps_per_epoch * total_epochs
@ -77,7 +80,6 @@ def get_model_damping(global_step, damping_init, decay_rate, total_epochs, steps
epoch = (step + 1) / steps_per_epoch
damping_here = damping_init * (decay_rate ** (epoch / 10))
damping_each_step.append(damping_here)
current_step = global_step
damping_each_step = np.array(damping_each_step).astype(np.float32)
damping_now = damping_each_step[current_step:]
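
The two schedule functions above are visible only as diff fragments. A runnable sketch of both
follows, with the loop headers and the trailing slice/return that the hunks elide filled in as
an assumption inferred from the surrounding lines, not copied from the file:

import numpy as np

def get_thor_lr(global_step, lr_init, decay, total_epochs, steps_per_epoch, decay_epochs=100):
    """Polynomial-decay schedule, halved at decay_epochs and halved again one epoch later."""
    lr_each_step = []
    total_steps = steps_per_epoch * total_epochs
    for i in range(total_steps):
        epoch = (i + 1) / steps_per_epoch
        base = (1.0 - float(epoch) / total_epochs) ** decay
        lr_local = lr_init * base
        if epoch >= decay_epochs:
            lr_local = lr_local * 0.5
        if epoch >= decay_epochs + 1:
            lr_local = lr_local * 0.5
        lr_each_step.append(lr_local)
    current_step = global_step
    lr_each_step = np.array(lr_each_step).astype(np.float32)
    return lr_each_step[current_step:]

def get_thor_damping(global_step, damping_init, decay_rate, total_epochs, steps_per_epoch):
    """Damping decayed by decay_rate once per 10 epochs."""
    damping_each_step = []
    total_steps = steps_per_epoch * total_epochs
    for step in range(total_steps):
        epoch = (step + 1) / steps_per_epoch
        damping_each_step.append(damping_init * (decay_rate ** (epoch / 10)))
    current_step = global_step
    damping_each_step = np.array(damping_each_step).astype(np.float32)
    return damping_each_step[current_step:]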
@ -140,6 +142,7 @@ def train_process(q, device_id, epoch_size, device_num, enable_hccl):
init()
# network
net = resnet50(class_num=config.class_num)
# evaluation network
@ -160,7 +163,7 @@ def train_process(q, device_id, epoch_size, device_num, enable_hccl):
eval_interval = config.eval_interval
dataset.__loop_size__ = step_size * eval_interval
# evalutation dataset
# evaluation dataset
eval_dataset = create_dataset(dataset_path=eval_path, do_train=False,
repeat_num=1, batch_size=config.eval_batch_size)
@ -233,16 +236,11 @@ def train_process_thor(q, device_id, epoch_size, device_num, enable_hccl):
os.environ['RANK_SIZE'] = str(device_num)
if enable_hccl:
context.set_auto_parallel_context(device_num=device_num, parallel_mode=ParallelMode.DATA_PARALLEL,
gradients_mean=True, all_reduce_fusion_config=[107])
gradients_mean=True, all_reduce_fusion_config=[85, 160])
init()
# network
damping = get_model_damping(0, 0.03, 0.87, 50, 5004)
net = resnet50_thor(class_num=thor_config.class_num, damping=damping, loss_scale=thor_config.loss_scale,
frequency=thor_config.frequency)
# evaluation network
dist_eval_network = ClassifyCorrectCell(net)
net = resnet50_thor(thor_config.class_num)
if not thor_config.label_smooth:
thor_config.label_smooth_factor = 0.0
@ -258,7 +256,7 @@ def train_process_thor(q, device_id, epoch_size, device_num, enable_hccl):
step_size = dataset.get_dataset_size()
eval_interval = thor_config.eval_interval
# evalutation dataset
# evaluation dataset
eval_dataset = create_dataset(dataset_path=eval_path, do_train=False,
repeat_num=1, batch_size=thor_config.eval_batch_size)
@ -266,16 +264,15 @@ def train_process_thor(q, device_id, epoch_size, device_num, enable_hccl):
loss_scale = FixedLossScaleManager(thor_config.loss_scale, drop_overflow_update=False)
# learning rate
lr = Tensor(get_model_lr(0, 0.045, 6, 70, 5004))
lr = get_thor_lr(0, 0.05803, 4.04839, 53, 5004, decay_epochs=39)
damping = get_thor_damping(0, 0.02714, 0.50036, 70, 5004)
# optimizer
opt = THOR(filter(lambda x: x.requires_grad, net.get_parameters()), lr, thor_config.momentum,
filter(lambda x: 'matrix_A' in x.name, net.get_parameters()),
filter(lambda x: 'matrix_G' in x.name, net.get_parameters()),
filter(lambda x: 'A_inv_max' in x.name, net.get_parameters()),
filter(lambda x: 'G_inv_max' in x.name, net.get_parameters()),
thor_config.weight_decay, thor_config.loss_scale)
split_indices = [26, 53]
opt = THOR(net, Tensor(lr), Tensor(damping), thor_config.momentum, thor_config.weight_decay, thor_config.loss_scale,
thor_config.batch_size, split_indices=split_indices)
# evaluation network
dist_eval_network = ClassifyCorrectCell(net)
# model
model = THOR_Model(net, loss_fn=loss, optimizer=opt, loss_scale_manager=loss_scale, amp_level="O2",
keep_batchnorm_fp32=False,
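
With this change, THOR is constructed from the network itself plus learning-rate and damping
tensors, instead of the hand-filtered matrix_A/matrix_G/A_inv_max/G_inv_max parameter
generators used before. A minimal usage sketch of the new-style call, reusing the config
values from the test above (illustrative only, and it assumes net, thor_config, and the
schedule helpers are already defined as in that test):

from mindspore import Tensor
from mindspore.nn.optim import THOR

lr = Tensor(get_thor_lr(0, 0.05803, 4.04839, 53, 5004, decay_epochs=39))
damping = Tensor(get_thor_damping(0, 0.02714, 0.50036, 70, 5004))
# split_indices appears to group layers for fused all-reduce of the THOR statistics
opt = THOR(net, lr, damping, thor_config.momentum, thor_config.weight_decay,
           thor_config.loss_scale, thor_config.batch_size, split_indices=[26, 53])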
