@@ -88,6 +88,68 @@ class NormalNumpy(DistributionNumpy):
        return 0.5 * (var_ratio + t1 - 1 - np.log(var_ratio))


class CategoricalNumpy(DistributionNumpy):
    def __init__(self, logits):
        self.logits = np.array(logits).astype('float32')
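
    # Entropy of a categorical distribution parameterised by logits:
    # H(p) = -sum_i p_i * log(p_i), with p = softmax(logits). Subtracting
    # the per-row max before exponentiating is the usual log-sum-exp
    # stabilisation and leaves softmax(logits) unchanged.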
    def entropy(self):
        logits = self.logits - np.max(self.logits, axis=-1, keepdims=True)
        e_logits = np.exp(logits)
        z = np.sum(e_logits, axis=-1, keepdims=True)
        prob = e_logits / z
        return -1. * np.sum(prob * (logits - np.log(z)), axis=-1, keepdims=True)
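
    # KL divergence between two categoricals with logits l and m:
    # KL(p || q) = sum_i p_i * (log p_i - log q_i), where
    # log p_i = l_i - log(sum_j exp(l_j)) after the stabilising max shift,
    # and likewise for q.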
    def kl_divergence(self, other):
        logits = self.logits - np.max(self.logits, axis=-1, keepdims=True)
        other_logits = other.logits - np.max(
            other.logits, axis=-1, keepdims=True)
        e_logits = np.exp(logits)
        other_e_logits = np.exp(other_logits)
        z = np.sum(e_logits, axis=-1, keepdims=True)
        other_z = np.sum(other_e_logits, axis=-1, keepdims=True)
        prob = e_logits / z
        return np.sum(prob * (logits - np.log(z) - other_logits \
                              + np.log(other_z)), axis=-1, keepdims=True)
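

# Numpy ground truth for a Gaussian with a diagonal covariance matrix.
# `scale` is expected to be a [k, k] diagonal matrix (the test below builds
# it with np.diag), which is what lets _det and _inv reduce to elementwise
# operations on the diagonal.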
class MultivariateNormalDiagNumpy(DistributionNumpy):
    def __init__(self, loc, scale):
        self.loc = np.array(loc).astype('float32')
        self.scale = np.array(scale).astype('float32')
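
    # Determinant of a diagonal matrix is the product of its diagonal
    # entries. Adding (one_all - one_diag) turns every off-diagonal zero
    # into a one, so np.prod over the whole matrix reduces to that product,
    # e.g. np.diag([a, b]) becomes [[a, 1], [1, b]] with product a * b.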
    def _det(self, value):
        batch_shape = list(value.shape)
        one_all = np.ones(shape=batch_shape, dtype='float32')
        one_diag = np.eye(batch_shape[0], dtype='float32')
        det_diag = np.prod(value + one_all - one_diag)

        return det_diag
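
    # Inverse of a diagonal matrix: invert the diagonal entries in place.
    # The exponent (one_all - 2 * one_diag) is -1 on the diagonal and +1
    # off it, so diagonal entries are inverted while the off-diagonal zeros
    # stay zero, e.g. np.diag([a, b]) maps to np.diag([1 / a, 1 / b]).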
    def _inv(self, value):
        batch_shape = list(value.shape)
        one_all = np.ones(shape=batch_shape, dtype='float32')
        one_diag = np.eye(batch_shape[0], dtype='float32')
        inv_diag = np.power(value, (one_all - 2 * one_diag))

        return inv_diag
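
    # Differential entropy of a k-dimensional Gaussian:
    # H = 0.5 * (k * (1 + log(2 * pi)) + log(det(Sigma))),
    # where self.scale plays the role of the (diagonal) covariance Sigma,
    # mirroring the formula used by the distribution under test.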
    def entropy(self):
        return 0.5 * (self.scale.shape[0] *
                      (1.0 + np.log(np.array(2 * math.pi).astype('float32'))
                       ) + np.log(self._det(self.scale)))
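
    # Closed-form KL between Gaussians N(mu1, S1) and N(mu2, S2):
    # KL = 0.5 * (tr(S2^-1 S1) + (mu2 - mu1)^T S2^-1 (mu2 - mu1) - k
    #             + log(det(S2)) - log(det(S1))),
    # computed below with the diagonal-only helpers _det and _inv.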
    def kl_divergence(self, other):
        tr_cov_matmul = np.sum(self._inv(other.scale) * self.scale)
        loc_matmul_cov = np.matmul((other.loc - self.loc),
                                   self._inv(other.scale))
        tri_matmul = np.matmul(loc_matmul_cov, (other.loc - self.loc))
        k = list(self.scale.shape)[0]
        ln_cov = np.log(self._det(other.scale)) - np.log(self._det(self.scale))
        kl = 0.5 * (tr_cov_matmul + tri_matmul - k + ln_cov)

        return kl


class DistributionTest(unittest.TestCase):
    def setUp(self, use_gpu=False):
        self.use_gpu = use_gpu

@@ -410,6 +472,105 @@ class DistributionTest(unittest.TestCase):
        np.testing.assert_allclose(
            output_lp_variable, gt_lp, rtol=tolerance, atol=tolerance)
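
    # Builds Categorical entropy/KL ops inside a fluid program, runs them,
    # and checks the outputs against CategoricalNumpy within `tolerance`.
    # The Categorical instances below are constructed directly from the
    # numpy logits; the logits/other_logits data layers are only declared.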
    def test_categorical_distribution(self,
                                      batch_size=2,
                                      dims=3,
                                      tolerance=1e-6):
        test_program = fluid.Program()

        logits_np = np.random.randn(batch_size, dims).astype('float32')
        other_logits_np = np.random.randn(batch_size, dims).astype('float32')

        with fluid.program_guard(test_program):
            logits = layers.data(name='logits', shape=[dims], dtype='float32')
            other_logits = layers.data(
                name='other_logits', shape=[dims], dtype='float32')

            categorical_np = Categorical(logits_np)
            other_categorical_np = Categorical(other_logits_np)

            entropy_np = categorical_np.entropy()
            kl_np = categorical_np.kl_divergence(other_categorical_np)

        self.executor.run(fluid.default_main_program())

        np_categorical = CategoricalNumpy(logits_np)
        np_other_categorical = CategoricalNumpy(other_logits_np)
        gt_entropy_np = np_categorical.entropy()
        gt_kl_np = np_categorical.kl_divergence(np_other_categorical)

        # result calculated by paddle
        [output_entropy_np,
         output_kl_np] = self.executor.run(program=test_program,
                                           feed={'logits': logits_np},
                                           fetch_list=[entropy_np, kl_np])
        np.testing.assert_allclose(
            output_entropy_np, gt_entropy_np, rtol=tolerance)
        np.testing.assert_allclose(output_kl_np, gt_kl_np, rtol=tolerance)
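
    # Same pattern for MultivariateNormalDiag: loc and scale are fed as
    # whole tensors (append_batch_size=False), and scale_np is built with
    # np.diag, so the diagonal-only _det/_inv shortcuts in the numpy
    # reference apply.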
    def test_multivariateNormalDiag_distribution(self,
                                                 batch_size=2,
                                                 tolerance=1e-6):
        test_program = fluid.Program()

        loc_np = np.random.random(batch_size, ).astype('float32')
        scale_np = np.diag(np.random.random(batch_size, )).astype('float32')
        other_loc_np = np.random.random(batch_size, ).astype('float32')
        other_scale_np = np.diag(np.random.random(batch_size, )).astype(
            'float32')

        with fluid.program_guard(test_program):
            loc = layers.data(
                name='loc',
                shape=[batch_size, ],
                dtype='float32',
                append_batch_size=False)
            scale = layers.data(
                name='scale',
                shape=[batch_size, batch_size],
                dtype='float32',
                append_batch_size=False)
            other_loc = layers.data(
                name='other_loc',
                shape=[batch_size, ],
                dtype='float32',
                append_batch_size=False)
            other_scale = layers.data(
                name='other_scale',
                shape=[batch_size, batch_size],
                dtype='float32',
                append_batch_size=False)

            multivariate_np = MultivariateNormalDiag(loc, scale)
            other_multivariate_np = MultivariateNormalDiag(other_loc,
                                                           other_scale)

            entropy_np = multivariate_np.entropy()
            other_entropy_np = other_multivariate_np.entropy()
            kl_np = multivariate_np.kl_divergence(other_multivariate_np)

        self.executor.run(fluid.default_main_program())

        np_multivariate = MultivariateNormalDiagNumpy(loc_np, scale_np)
        np_other_multivariate = MultivariateNormalDiagNumpy(other_loc_np,
                                                            other_scale_np)
        gt_entropy_np = np_multivariate.entropy()
        gt_kl_np = np_multivariate.kl_divergence(np_other_multivariate)

        # result calculated by paddle
        [output_entropy_np,
         output_kl_np] = self.executor.run(program=test_program,
                                           feed={
                                               'loc': loc_np,
                                               'scale': scale_np,
                                               'other_loc': other_loc_np,
                                               'other_scale': other_scale_np
                                           },
                                           fetch_list=[entropy_np, kl_np])
        np.testing.assert_allclose(
            output_entropy_np, gt_entropy_np, rtol=tolerance)
        np.testing.assert_allclose(output_kl_np, gt_kl_np, rtol=tolerance)


if __name__ == '__main__':
    unittest.main()