You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mindspore/tests/st/auto_monad/test_auto_monad_mindtester.py

647 lines
21 KiB

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import os
import pytest
import numpy as np
import mindspore as ms
import mindspore.ops.operations as P
from mindspore.nn import Cell
from mindspore import context, Tensor
from mindspore.common.parameter import Parameter
from mindspore.common.initializer import initializer
from mindspore.train.model import Model
from mindspore.ops.composite import GradOperation
from mindspore.common import ParameterTuple
# Module-wide default: run in graph mode with Ascend as the device target.
# Individual tests below may switch the mode temporarily.
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
class _Grad(Cell):
    """Pair a network with a gradient operation and evaluate the gradient on call.

    Args:
        grad: Gradient operation applied to ``network``; its ``sens_param``
            attribute is mirrored on this cell.
        network: Cell to be differentiated.
        wrt_params: If true, ``network.trainable_params()`` is captured and
            handed to ``grad`` together with the network.
        real_inputs_count: Number of leading call arguments that are network
            inputs; the remaining ones are forwarded together as one extra
            trailing argument. ``None`` forwards all arguments unchanged.
    """

    def __init__(self, grad, network, wrt_params=False, real_inputs_count=None):
        super().__init__()
        self.network = network
        self.grad = grad
        self.sens_param = grad.sens_param
        self.real_inputs_count = real_inputs_count
        self.wrt_params = wrt_params
        if wrt_params:
            self.params = ParameterTuple(network.trainable_params())

    def construct(self, *inputs):
        # No split requested, or the grad op takes no sens input:
        # every argument goes straight to the gradient function.
        if self.real_inputs_count is None or self.sens_param is False:
            if self.wrt_params:
                return self.grad(self.network, self.params)(*inputs)
            return self.grad(self.network)(*inputs)

        # Leading arguments feed the network; the tail is passed on as a
        # single tuple argument after them.
        net_inputs = inputs[:self.real_inputs_count]
        sens_inputs = inputs[self.real_inputs_count:]
        if self.wrt_params:
            return self.grad(self.network, self.params)(*net_inputs, sens_inputs)
        return self.grad(self.network)(*net_inputs, sens_inputs)
class GradOfFirstInput(_Grad):
    """
    Gradient wrapper built on a default ``GradOperation`` (first input only).

    ``sens_param`` and ``real_inputs_count`` are passed through to the
    gradient operation and to ``_Grad`` respectively.
    """

    def __init__(self, network, sens_param=True, real_inputs_count=None):
        first_input_grad = GradOperation(sens_param=sens_param)
        super().__init__(grad=first_input_grad,
                         network=network,
                         real_inputs_count=real_inputs_count)
class GradOfAllInputs(_Grad):
    """
    Gradient wrapper built on ``GradOperation(get_all=True)`` (all inputs).

    ``sens_param`` and ``real_inputs_count`` are passed through to the
    gradient operation and to ``_Grad`` respectively.
    """

    def __init__(self, network, sens_param=True, real_inputs_count=None):
        all_inputs_grad = GradOperation(get_all=True, sens_param=sens_param)
        super().__init__(grad=all_inputs_grad,
                         network=network,
                         real_inputs_count=real_inputs_count)
class GradOfAllInputsAndParams(_Grad):
    """
    Gradient wrapper for all inputs and the network's trainable parameters.

    Uses ``GradOperation(get_all=True, get_by_list=True)`` and asks ``_Grad``
    to collect the parameters (``wrt_params=True``).
    """

    def __init__(self, network, sens_param=True, real_inputs_count=None):
        inputs_and_params_grad = GradOperation(
            get_all=True, get_by_list=True, sens_param=sens_param)
        super().__init__(grad=inputs_and_params_grad,
                         network=network,
                         wrt_params=True,
                         real_inputs_count=real_inputs_count)
def _count_unequal_element(data_expected, data_me, rtol, atol):
assert data_expected.shape == data_me.shape
total_count = len(data_expected.flatten())
error = np.abs(data_expected - data_me)
greater = np.greater(error, atol + np.abs(data_me)*rtol)
loss_count = np.count_nonzero(greater)
assert (loss_count/total_count) < rtol, \
"\ndata_expected_std:{0}\ndata_me_error:{1}\nloss:{2}".\
format(data_expected[greater], data_me[greater], error[greater])
def allclose_nparray(data_expected, data_me, rtol, atol, equal_nan=True):
    """Assert that ``data_me`` matches ``data_expected`` within tolerance.

    If the expected data contains NaN, a plain ``np.allclose`` result is
    asserted. Otherwise a failed ``np.allclose`` is passed on to
    ``_count_unequal_element``, which tolerates a small ratio of mismatches.

    Args:
        data_expected: Reference values.
        data_me: Values under test.
        rtol: Relative tolerance.
        atol: Absolute tolerance.
        equal_nan: Forwarded to ``np.allclose``.

    Raises:
        AssertionError: If the comparison fails.
    """
    expected_has_nan = np.any(np.isnan(data_expected))
    within_tolerance = np.allclose(data_expected, data_me, rtol, atol,
                                   equal_nan=equal_nan)
    if expected_has_nan:
        assert within_tolerance
        return
    if not within_tolerance:
        _count_unequal_element(data_expected, data_me, rtol, atol)
class ControlGraphSupportNotEqual(Cell):
    """Cell with three consecutive ``if``/``else`` blocks driven by scalar comparisons.

    ``construct`` returns the tuple ``(out, out2, out3)``:
      * ``out``  is ``input_data + input_data`` when ``x != y``, else ``input_data - input_data``;
      * ``out2`` is ``input_data * input_data`` when ``x == z``, else ``input_data / input_data``;
      * ``out3`` comes from a lambda chosen inside the branch: ``a+a`` when
        ``x == z``, else ``a+a+a``.
    """
    def construct(self, x, y, z, input_data):
        # Branch 1: inequality test selects add vs. subtract.
        if x != y:
            out = input_data + input_data
        else:
            out = input_data - input_data
        # Branch 2: equality test selects multiply vs. divide.
        if x == z:
            out2 = input_data * input_data
        else:
            out2 = input_data / input_data
        # Branch 3: same condition as branch 2, but the work is done by a
        # lambda that is defined and called inside each branch.
        if x == z:
            out3_f = (lambda a: a+a)
            out3 = out3_f(input_data)
        else:
            out3_f = (lambda a: a+a+a)
            out3 = out3_f(input_data)
        return out, out2, out3
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_ctrl_if_while_graph_support_not_equal_true():
    """Run ``ControlGraphSupportNotEqual`` with ``x != y`` and ``x == z`` both true.

    Expected outputs are the "true" branches: sum, product and doubled input.
    """
    zero = np.array(0).astype(np.float32)
    three = np.array(3).astype(np.float32)
    data = np.random.randn(512, 512, 7, 7).astype(np.float32)
    predictor = Model(ControlGraphSupportNotEqual())
    # ``zero`` is passed as both x and z so that x == z holds.
    results = predictor.predict(Tensor(zero), Tensor(three), Tensor(zero), Tensor(data))
    expected = (data + data, data * data, data + data)
    for index, want in enumerate(expected):
        allclose_nparray(want, results[index].asnumpy(), 0.0001, 0.0001)
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_ctrl_if_while_graph_support_not_equal_false():
    """Run ``ControlGraphSupportNotEqual`` with ``x != y`` and ``x == z`` both false.

    Expected outputs are the "else" branches: difference, quotient and tripled input.
    """
    x_val = np.array(0).astype(np.float32)
    y_val = np.array(0).astype(np.float32)
    z_val = np.array(3).astype(np.float32)
    data = np.random.randn(512, 512, 7, 7).astype(np.float32)
    predictor = Model(ControlGraphSupportNotEqual())
    results = predictor.predict(Tensor(x_val), Tensor(y_val), Tensor(z_val), Tensor(data))
    expected = (data - data, data / data, data + data + data)
    for index, want in enumerate(expected):
        allclose_nparray(want, results[index].asnumpy(), 0.0001, 0.0001)
class ControlBprop(Cell):
    """Branching cell with a hand-written ``bprop``.

    The forward pass has three ``if``/``else`` blocks and returns
    ``(out, out2, out3)``. The custom ``bprop`` ignores ``out`` and ``dout``
    and returns ``(x*2, y*3, z, input_data*5.1)`` as the input gradients.
    """
    def construct(self, x, y, z, input_data):
        # Branch 1: add when x != y, otherwise subtract.
        if x != y:
            out = input_data + input_data
        else:
            out = input_data - input_data
        # Branch 2: multiply when x == z, otherwise divide.
        if x == z:
            out2 = input_data * input_data
        else:
            out2 = input_data / input_data
        # Branch 3: same condition, computed through a lambda defined per branch.
        if x == z:
            out3_f = (lambda a: a+a)
            out3 = out3_f(input_data)
        else:
            out3_f = (lambda a: a+a+a)
            out3 = out3_f(input_data)
        return out, out2, out3
    def bprop(self, x, y, z, input_data, out, dout):
        # One value per forward input, computed from the inputs only.
        return x*2, y*3, z, input_data*5.1
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_ctrl_if_while_bprop_true():
    """Gradients of ``ControlBprop`` must equal what its custom ``bprop`` returns.

    The comparison uses zero tolerances, i.e. exact equality.
    """
    zero = np.array(0).astype(np.float32)
    three = np.array(3).astype(np.float32)
    data = np.random.randn(512, 512, 7, 7).astype(np.float32)
    grad_net = GradOfAllInputs(ControlBprop(), sens_param=False)
    grad_net.set_train()
    # ``zero`` is fed as both x and z.
    grads = grad_net(Tensor(zero), Tensor(three), Tensor(zero), Tensor(data))
    expected = (zero*2, three*3, zero, data*5.1)
    for index, want in enumerate(expected):
        allclose_nparray(want, grads[index].asnumpy(), 0.0, 0.0)
class TwoInput(Cell):
    """Two-input cell that returns ``P.Mul()`` applied to ``x`` and ``y``."""
    def __init__(self):
        super().__init__()
        self.op = P.Mul()
    def construct(self, x, y):
        # The product is bound to ``x`` before being returned.
        x = self.op(x, y)
        return x
class InlineBpropTwoInput1(Cell):
    """Cell whose custom ``bprop`` evaluates a gradient network inside a branch.

    The forward pass calls ``TwoInput`` in both arms of ``if x > y``. The
    ``bprop`` evaluates ``GradOfAllInputs(self.f, sens_param=False)`` in both
    arms of the same condition and returns each of the two gradients
    multiplied by 2.
    """
    def __init__(self):
        super().__init__()
        self.f = TwoInput()
        self.f.set_grad()
        # Gradient of the inner cell w.r.t. all inputs, without a sens input.
        self.grad = GradOfAllInputs(self.f, sens_param=False)
    def construct(self, x, y):
        # Both arms perform the same call; only the control-flow shape differs.
        if x > y:
            x = self.f(x, y)
        else:
            x = self.f(x, y)
        return x
    def bprop(self, x, y, out, dout):
        # ``out`` and ``dout`` are unused; gradients come from self.grad.
        if x > y:
            grads = self.grad(x, y)
        else:
            grads = self.grad(x, y)
        return grads[0]*2, grads[1]*2
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_ctrl_if_while_bprop_inlinebprop_twoinput():
    """Check the gradients produced by ``InlineBpropTwoInput1``'s custom ``bprop``.

    The gradient for each input is expected to be twice the *other* input,
    compared with zero tolerance.
    """
    lhs = Tensor(np.array(2).astype(np.float32))
    rhs = Tensor(np.array(1).astype(np.float32))
    grad_net = GradOfAllInputs(InlineBpropTwoInput1(), sens_param=False)
    grad_net.set_train()
    grads = grad_net(lhs, rhs)
    allclose_nparray(lhs.asnumpy()*2, grads[1].asnumpy(), 0, 0)
    allclose_nparray(rhs.asnumpy()*2, grads[0].asnumpy(), 0, 0)
class ControlOneIfOneParaOneAddn(Cell):
    """Cell mixing a parameter read, ``AddN`` and ``Assign`` under ``if x > y``.

    Args:
        input_shape: Shape of the float32 parameter, which is initialised
            with the value 1 and registered under the name "global_step".
    """
    def __init__(self, input_shape):
        super().__init__()
        self.addn = P.AddN()
        self.assign = P.Assign()
        self.inputdata = Parameter(initializer(
            1, input_shape, ms.float32), name="global_step")
    def construct(self, x, y, input_data):
        # First branch: read the parameter, or sum three copies of the input.
        if x > y:
            out = self.inputdata
        else:
            out = self.addn([input_data, input_data, input_data])
        # Second branch (same condition): overwrite the parameter with the
        # input and return the result of that assignment instead.
        if x > y:
            out = self.assign(self.inputdata, input_data)
        return out
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_ctrl_if_para_addn_true():
    """Run ``ControlOneIfOneParaOneAddn`` with ``x > y`` true.

    The output is expected to match the input data; only index 0 along the
    first axis is compared.
    """
    shape = (1024, 512, 7, 7)
    data = np.random.randn(*shape).astype(np.float32)
    greater = Tensor(1, ms.float32)
    smaller = Tensor(0, ms.float32)
    net = ControlOneIfOneParaOneAddn(shape)
    result = net(greater, smaller, Tensor(data))
    allclose_nparray(data[0], result.asnumpy()[0], 0.0001, 0.0001)
class AddnCell(Cell):
    """Cell that returns ``P.AddN()`` applied to the tuple ``(x, x)``."""
    def __init__(self):
        super().__init__()
        self.addn = P.AddN()
    def construct(self, x):
        # The sum of the two copies is bound to ``x`` before being returned.
        x = self.addn((x, x))
        return x
class SideEffectMemoryCellAddnNet(Cell):
    """Cell that assigns to a parameter and then reads it back in an ``AddN``.

    Holds one float32 parameter "para" (initial value ``[1.0]``) and a nested
    ``AddnCell``.
    """
    def __init__(self):
        super().__init__()
        self.para = Parameter(Tensor([1.0], ms.float32), name="para")
        self.assign = P.Assign()
        self.addn = P.AddN()
        self.addn1 = AddnCell()
    def construct(self, x):
        x = self.addn1(x)
        # The assign result is discarded; the parameter is read on the next line.
        self.assign(self.para, x)
        out = self.addn((self.para, x))
        return out
    def grad_mindspore_impl(self, params, grad_ys):
        # Builds a gradient network over inputs and parameters for this cell
        # and evaluates it; ``grad_ys`` is passed as the second call argument.
        grad_net = GradOfAllInputsAndParams(self)
        grad_net.set_train()
        grad_out = grad_net(params, grad_ys)
        return grad_out
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_grad_memory_addn():
    """Evaluate the gradient of ``SideEffectMemoryCellAddnNet``.

    There is no value check; the test succeeds if the call does not raise.
    """
    sens = Tensor([18.0], ms.float32)
    x = Tensor([9.0], ms.float32)
    SideEffectMemoryCellAddnNet().grad_mindspore_impl(x, sens)
class SideEffectIOCellAddnNet(Cell):
    """Cell that prints two parameters before running a nested ``AddnCell``.

    Holds float32 parameters "para1" (``[1.0]``) and "para2" (``[3.0]``);
    neither is used in the returned value.
    """
    def __init__(self):
        super().__init__()
        self.para1 = Parameter(Tensor([1.0], ms.float32), name="para1")
        self.para2 = Parameter(Tensor([3.0], ms.float32), name="para2")
        self.print = P.Print()
        self.addn = AddnCell()
    def construct(self, x):
        # Two print calls precede the computation; their results are unused.
        self.print("para1:", self.para1)
        self.print("para2:", self.para2)
        x = self.addn(x)
        return x
    def grad_mindspore_impl(self, params, grad_ys):
        # Builds a gradient network over inputs and parameters for this cell
        # and evaluates it; ``grad_ys`` is passed as the second call argument.
        grad_net = GradOfAllInputsAndParams(self)
        grad_net.set_train()
        grad_out = grad_net(params, grad_ys)
        return grad_out
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_grad_io_addn():
    """Evaluate the gradient of ``SideEffectIOCellAddnNet``.

    There is no value check; the test succeeds if the call does not raise.
    """
    sens = Tensor([18.0], ms.float32)
    x = Tensor([9.0], ms.float32)
    SideEffectIOCellAddnNet().grad_mindspore_impl(x, sens)
class SideEffectReturnParameterNet(Cell):
    """Cell that returns the result of assigning its input to a parameter.

    Holds one float32 parameter "para" (initial value ``[1.0]``). The
    ``AddN``/``ReLU`` chain in ``construct`` is computed but not returned.
    """
    def __init__(self):
        super().__init__()
        self.para = Parameter(Tensor([1.0], ms.float32), name="para")
        self.assign = P.Assign()
        self.addn = P.AddN()
        self.relu = P.ReLU()
    def construct(self, inputs):
        p1 = self.assign(self.para, inputs)
        # ``out`` is never used after this point; only ``p1`` is returned.
        out = self.addn((inputs, inputs, inputs))
        out = self.relu(out)
        return p1
    def grad_mindspore_impl(self, params, grad_ys):
        # Builds a gradient network over inputs and parameters for this cell
        # and evaluates it; ``grad_ys`` is passed as the second call argument.
        grad_net = GradOfAllInputsAndParams(self)
        grad_net.set_train()
        grad_out = grad_net(params, grad_ys)
        return grad_out
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_grad_read_dependency_return_parameter():
    """Evaluate the gradient of ``SideEffectReturnParameterNet``.

    There is no value check; the test succeeds if the call does not raise.
    """
    sens = Tensor([18.0], ms.float32)
    x = Tensor([9.0], ms.float32)
    SideEffectReturnParameterNet().grad_mindspore_impl(x, sens)
class SideEffectAssignAddnReluReturnParNet(Cell):
    """Cell that assigns its input to a parameter and returns the assign result.

    Holds one float32 parameter "parameter1" (initial value ``[1.0]``). The
    ``AddN``/``ReLU`` chain in ``construct`` is computed but not returned.
    """
    def __init__(self):
        super().__init__()
        self.parameter1 = Parameter(
            Tensor([1.0], ms.float32), name="parameter1")
        self.assign = P.Assign()
        self.addN = P.AddN()
        self.relu = P.ReLU()
    def construct(self, inputs):
        p1 = self.assign(self.parameter1, inputs)
        # ``out`` is never used after this point; only ``p1`` is returned.
        out = self.addN((inputs, inputs, inputs))
        out = self.relu(out)
        return p1
    def grad_mindspore_impl(self, params, grad_ys):
        # Builds a gradient network over inputs and parameters for this cell
        # and evaluates it; ``grad_ys`` is passed as the second call argument.
        grad_net = GradOfAllInputsAndParams(self)
        grad_net.set_train()
        grad_out = grad_net(params, grad_ys)
        return grad_out
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_side_effect_grad_read_dependency_assign_addn_relu_return_parameter():
    """Compare graph-mode and PyNative-mode gradients of the assign/AddN/ReLU net.

    The first element of both gradient groups must agree within 1e-3.
    Graph mode is restored afterwards even if the comparison fails.
    """
    sens = Tensor([18.0], ms.float32)
    x = Tensor([9.0], ms.float32)
    graph_grads = SideEffectAssignAddnReluReturnParNet().grad_mindspore_impl(x, sens)
    # A fresh network is used so the second run starts from the initial parameter.
    pynative_net = SideEffectAssignAddnReluReturnParNet()
    try:
        context.set_context(mode=context.PYNATIVE_MODE)
        pynative_grads = pynative_net.grad_mindspore_impl(x, sens)
        for group in (0, 1):
            allclose_nparray(graph_grads[group][0].asnumpy(),
                             pynative_grads[group][0].asnumpy(), 0.001, 0.001)
    finally:
        context.set_context(mode=context.GRAPH_MODE)
class SideEffectPrintInHighOrdeAddnNet(Cell):
    """Cell that prints its parameters from a helper method before an ``AddN``.

    Holds float32 parameters "parameter1" (``[1.0]``) and "parameter2"
    (``[3.0]``). ``construct`` returns ``AddN((parameter1, x, parameter2))``.
    """
    def __init__(self):
        super().__init__()
        self.parameter1 = Parameter(
            Tensor([1.0], ms.float32), name="parameter1")
        self.parameter2 = Parameter(
            Tensor([3.0], ms.float32), name="parameter2")
        self.assign = P.Assign()
        self.addn = P.AddN()
        self.mul = P.Mul()
        self.print = P.Print()
    def construct(self, x):
        # The helper's return value is discarded; it is called for its prints.
        self.high_order_func()
        out = self.addn((self.parameter1, x, self.parameter2))
        return out
    def high_order_func(self):
        # Prints both parameters and always returns True.
        self.print("parameter1: ", self.parameter1)
        self.print("parameter2: ", self.parameter2)
        return True
    def grad_mindspore_impl(self, params, grad_ys):
        # Builds a gradient network over inputs and parameters for this cell
        # and evaluates it; ``grad_ys`` is passed as the second call argument.
        grad_net = GradOfAllInputsAndParams(self)
        grad_net.set_train()
        grad_out = grad_net(params, grad_ys)
        return grad_out
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_side_effect_high_order_print_in_high_order_net():
    """Compare graph-mode and PyNative-mode forward outputs of the printing net.

    ``print_file_path`` is pointed at a file in the current working directory
    before the runs. Graph mode is restored afterwards even on failure.
    """
    context.set_context(
        print_file_path=os.getcwd()+"/test_side_effect_high_order_print_in_high_order_net.data")
    graph_out = SideEffectPrintInHighOrdeAddnNet()(Tensor([9.0], ms.float32))
    # A fresh network instance is created before switching modes.
    pynative_net = SideEffectPrintInHighOrdeAddnNet()
    try:
        context.set_context(mode=context.PYNATIVE_MODE)
        pynative_out = pynative_net(Tensor([9.0], ms.float32))
        allclose_nparray(graph_out.asnumpy(), pynative_out.asnumpy(), 0.001, 0.001)
    finally:
        context.set_context(mode=context.GRAPH_MODE)
class SideEffectControlFlowAssignDependTwoIfNet(Cell):
    """Cell interleaving three parameter assignments with two ``if`` blocks.

    Holds one float32 parameter "parameter1" (initial value ``[3.0]``). Each
    ``if`` compares the parameter, as last assigned, against ``y``.
    """
    def __init__(self):
        super().__init__()
        self.parameter1 = Parameter(
            Tensor([3.0], ms.float32), name="parameter1")
        self.assign = P.Assign()
        self.mul = P.Mul()
        self.addn = P.AddN()
        self.depend = P.Depend()
    def construct(self, x, y):
        # 1st write: parameter1 <- x.
        self.assign(self.parameter1, x)
        if self.parameter1 > y:
            x = self.mul(x, x)
        # 2nd write: parameter1 <- x (possibly squared above).
        p2 = self.assign(self.parameter1, x)
        if self.parameter1 > y:
            x = self.addn((x, self.parameter1))
        # 3rd write: parameter1 <- x (possibly increased above).
        p3 = self.assign(self.parameter1, x)
        # The Depend result is discarded; x is what gets returned.
        self.depend(p3, p2)
        return x
    def grad_mindspore_impl(self, params1, params2, grad_ys):
        # Builds a gradient network over inputs and parameters for this cell
        # and evaluates it; ``grad_ys`` is passed as the third call argument.
        grad_net = GradOfAllInputsAndParams(self)
        grad_net.set_train()
        grad_out = grad_net(params1, params2, grad_ys)
        return grad_out
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_side_effect_grad_control_flow_assign_depend_of_two_if():
    """Evaluate the gradient of ``SideEffectControlFlowAssignDependTwoIfNet``.

    There is no value check; the test succeeds if the call does not raise.
    """
    sens = Tensor([18.0], ms.float32)
    first = Tensor([9.0], ms.float32)
    second = Tensor([6.0], ms.float32)
    SideEffectControlFlowAssignDependTwoIfNet().grad_mindspore_impl(first, second, sens)
class SideEffectTwoAddnSwitchNet(Cell):
    """Cell that returns the larger-by-``>`` of two ``AddN`` results.

    ``x`` becomes the sum of three copies of the input, ``y`` the sum of two
    copies; ``x`` is returned when ``x > y``, otherwise ``y``.
    """
    def __init__(self):
        super().__init__()
        self.addN = P.AddN()
    def construct(self, x):
        # Keep the original input in ``y`` before ``x`` is overwritten.
        y = x
        x = self.addN((x, x, x))
        y = self.addN((y, y))
        if x > y:
            return x
        return y
    def grad_mindspore_impl(self, params, grad_ys):
        # Builds a gradient network over inputs and parameters for this cell
        # and evaluates it; ``grad_ys`` is passed as the second call argument.
        grad_net = GradOfAllInputsAndParams(self)
        grad_net.set_train()
        grad_out = grad_net(params, grad_ys)
        return grad_out
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_side_effect_grad_two_addn_switch():
    """Compare graph-mode and PyNative-mode gradients of ``SideEffectTwoAddnSwitchNet``.

    The first input gradient from both modes must agree within 1e-3.
    """
    net = SideEffectTwoAddnSwitchNet()
    grad_ys = Tensor([18.0], ms.float32)
    inputs = Tensor([9.0], ms.float32)
    out1 = net.grad_mindspore_impl(inputs, grad_ys)
    net = SideEffectTwoAddnSwitchNet()
    try:
        context.set_context(mode=context.PYNATIVE_MODE)
        out2 = net.grad_mindspore_impl(inputs, grad_ys)
        allclose_nparray(out1[0][0].asnumpy(), out2[0][0].asnumpy(), 0.001, 0.001)
    finally:
        # The mode is process-wide state: put graph mode back so later tests
        # in this module are not silently executed in PyNative mode.
        context.set_context(mode=context.GRAPH_MODE)
class SideEffectGradIfNet(Cell):
    """Cell whose branch condition compares two of its own parameters.

    Parameters "a" and "b" are float32 arrays of shape ``(1,)`` filled with
    5 and 4 respectively. ``construct`` returns ``relu(x)`` when ``a > b``,
    otherwise ``x + 2``.
    """
    def __init__(self):
        super().__init__()
        self.relu = P.ReLU()
        a = np.full((1,), 5, dtype=np.float32)
        self.a = Parameter(Tensor(a), name="a")
        b = np.full((1,), 4, dtype=np.float32)
        self.b = Parameter(Tensor(b), name="b")
    def construct(self, x):
        # The condition depends only on the parameters, not on the input.
        if self.a > self.b:
            x = self.relu(x)
            out = x
        else:
            out = x + 2
        return out
    def grad_mindspore_impl(self, params, grad_ys):
        # Uses GradOfFirstInput, unlike the other fixtures in this file;
        # ``grad_ys`` is passed as the second call argument.
        grad_net = GradOfFirstInput(self)
        grad_net.set_train()
        grad_out = grad_net(params, grad_ys)
        return grad_out
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_side_effect_grad_if():
    """Compare graph-mode and PyNative-mode gradients of ``SideEffectGradIfNet``.

    The gradients from both modes must agree within 1e-3.
    """
    context.set_context(mode=context.GRAPH_MODE)
    net = SideEffectGradIfNet()
    grad_ys = Tensor([18.0], ms.float32)
    inputs = Tensor([9.0], ms.float32)
    out1 = net.grad_mindspore_impl(inputs, grad_ys)
    net = SideEffectGradIfNet()
    try:
        context.set_context(mode=context.PYNATIVE_MODE)
        out2 = net.grad_mindspore_impl(inputs, grad_ys)
        allclose_nparray(out1.asnumpy(), out2.asnumpy(), 0.001, 0.001)
    finally:
        # The mode is process-wide state: put graph mode back so later tests
        # in this module are not silently executed in PyNative mode.
        context.set_context(mode=context.GRAPH_MODE)
class OneInputBprop(Cell):
    """ReLU cell with a hand-written ``bprop`` that returns ``(5 * x,)``."""
    def __init__(self):
        super().__init__()
        self.op = P.ReLU()
    def construct(self, x):
        return self.op(x)
    def bprop(self, x, out, dout):
        # ``out`` and ``dout`` are ignored; the result is a one-element tuple.
        return (5 * x,)
class HighGrad(Cell):
    """Nest gradient wrappers around a network to form a higher-order gradient.

    Args:
        network: Innermost cell to differentiate.
        grad_list: Wrapper classes applied from the inside out. All but the
            last are built with ``sens_param=False``.
        sens_param: Passed to the last (outermost) wrapper only.
        real_inputs_count: Passed to the last (outermost) wrapper only.
    """

    def __init__(self, network, grad_list, sens_param=False, real_inputs_count=None):
        super().__init__()
        self.grads = [network]
        # Each inner wrapper differentiates whatever was built just before it.
        for wrapper_cls in grad_list[:-1]:
            self.grads.append(wrapper_cls(self.grads[-1], sens_param=False))
        outermost_cls = grad_list[-1]
        self.final_grad = outermost_cls(self.grads[-1],
                                        sens_param=sens_param,
                                        real_inputs_count=real_inputs_count)

    def construct(self, *inputs):
        return self.final_grad(*inputs)
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_highgrad_one_input_sec_grad():
    """Second-order gradient of ``OneInputBprop`` at ``[2, 2]`` must equal ``[5, 5]``."""
    point = Tensor(np.array([2, 2]).astype(np.float32))
    second_order_net = HighGrad(OneInputBprop(), [GradOfFirstInput] * 2)
    second_grad = second_order_net(point)
    expected = np.array([5, 5]).astype(np.float32)
    assert (second_grad.asnumpy() == expected).all()
@pytest.mark.level0
@pytest.mark.platform_arm_ascend_training
@pytest.mark.platform_x86_ascend_training
@pytest.mark.env_onecard
def test_highgrad_one_input_third_grad():
    """Third-order gradient of ``OneInputBprop`` at ``[2, 2]`` must equal ``[0, 0]``."""
    point = Tensor(np.array([2, 2]).astype(np.float32))
    third_order_net = HighGrad(OneInputBprop(), [GradOfFirstInput] * 3)
    result = third_order_net(point)
    expected = np.array([0, 0]).astype(np.float32)
    assert (result.asnumpy() == expected).all()