#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import sys
import unittest
import numpy as np
from op_test import OpTest
from test_softmax_op import stable_softmax
import paddle.fluid as fluid


def CTCAlign(input, lod, blank, merge_repeated, padding=0, input_length=None):
    if input_length is None:
        lod0 = lod[0]
        result = []
        cur_offset = 0
        for i in range(len(lod0)):
            prev_token = -1
            for j in range(cur_offset, cur_offset + lod0[i]):
                token = input[j][0]
                if (token != blank) and not (merge_repeated and
                                             token == prev_token):
                    result.append(token)
                prev_token = token
            cur_offset += lod0[i]
        result = np.array(result).reshape([len(result), 1]).astype("int32")
        if len(result) == 0:
            result = np.array([-1])
        return result
    else:
        result = [[] for i in range(len(input))]
        output_length = []
        for i in range(len(input)):
            prev_token = -1
            for j in range(input_length[i][0]):
                token = input[i][j]
                if (token != blank) and not (merge_repeated and
                                             token == prev_token):
                    result[i].append(token)
                prev_token = token
            start = len(result[i])
            output_length.append([start])
            for j in range(start, len(input[i])):
                result[i].append(padding)
        result = np.array(result).reshape(
            [len(input), len(input[0])]).astype("int32")
        output_length = np.array(output_length).reshape(
            [len(input), 1]).astype("int32")

    return result, output_length


class TestCTCAlignOp(OpTest):
    def config(self):
        self.op_type = "ctc_align"
        self.input_lod = [[11, 7]]
        self.blank = 0
        self.merge_repeated = False
        self.input = np.array(
            [0, 1, 2, 2, 0, 4, 0, 4, 5, 0, 6, 6, 0, 0, 7, 7, 7, 0]).reshape(
                [18, 1]).astype("int32")

    def setUp(self):
        self.config()
        output = CTCAlign(self.input, self.input_lod, self.blank,
                          self.merge_repeated)

        self.inputs = {"Input": (self.input, self.input_lod), }
        self.outputs = {"Output": output}
        self.attrs = {
            "blank": self.blank,
            "merge_repeated": self.merge_repeated
        }

    def test_check_output(self):
        self.check_output()
        pass


class TestCTCAlignOpCase1(TestCTCAlignOp):
    def config(self):
        self.op_type = "ctc_align"
        self.input_lod = [[11, 8]]
        self.blank = 0
        self.merge_repeated = True
        self.input = np.array(
            [0, 1, 2, 2, 0, 4, 0, 4, 5, 0, 6, 6, 0, 0, 7, 7, 7, 0, 0]).reshape(
                [19, 1]).astype("int32")


class TestCTCAlignOpCase2(TestCTCAlignOp):
    def config(self):
        self.op_type = "ctc_align"
        self.input_lod = [[4]]
        self.blank = 0
        self.merge_repeated = True
        self.input = np.array([0, 0, 0, 0]).reshape([4, 1]).astype("int32")


class TestCTCAlignPaddingOp(OpTest):
    def config(self):
        self.op_type = "ctc_align"
        self.input_lod = []
        self.blank = 0
        self.padding_value = 0
        self.merge_repeated = True
        self.input = np.array([[0, 2, 4, 4, 0, 6, 3, 6, 6, 0, 0],
                               [1, 1, 3, 0, 0, 4, 5, 6, 0, 0, 0]]).reshape(
                                   [2, 11]).astype("int32")
        self.input_length = np.array([[9], [8]]).reshape([2, 1]).astype("int32")

    def setUp(self):
        self.config()
        output, output_length = CTCAlign(self.input, self.input_lod, self.blank,
                                         self.merge_repeated,
                                         self.padding_value, self.input_length)
        self.inputs = {
            "Input": (self.input, self.input_lod),
            "InputLength": self.input_length
        }
        self.outputs = {"Output": output, "OutputLength": output_length}
        self.attrs = {
            "blank": self.blank,
            "merge_repeated": self.merge_repeated,
            "padding_value": self.padding_value
        }

    def test_check_output(self):
        self.check_output()


class TestCTCAlignOpCase3(TestCTCAlignPaddingOp):
    def config(self):
        self.op_type = "ctc_align"
        self.blank = 0
        self.input_lod = []
        self.merge_repeated = True
        self.padding_value = 0
        self.input = np.array([[0, 1, 2, 2, 0, 4], [0, 4, 5, 0, 6, 0],
                               [0, 7, 7, 7, 0, 0]]).reshape(
                                   [3, 6]).astype("int32")
        self.input_length = np.array([[6], [5],
                                      [4]]).reshape([3, 1]).astype("int32")


class TestCTCAlignOpCase4(TestCTCAlignPaddingOp):
    '''
    # test tensor input which has attr input padding_value
    '''

    def config(self):
        self.op_type = "ctc_align"
        self.blank = 0
        self.input_lod = []
        self.merge_repeated = False
        self.padding_value = 0
        self.input = np.array([[0, 1, 2, 2, 0, 4], [0, 4, 5, 0, 6, 0],
                               [0, 7, 7, 7, 0, 0]]).reshape(
                                   [3, 6]).astype("int32")
        self.input_length = np.array([[6], [5],
                                      [4]]).reshape([3, 1]).astype("int32")


class TestCTCAlignOpCase5(TestCTCAlignPaddingOp):
    def config(self):
        self.op_type = "ctc_align"
        self.blank = 0
        self.input_lod = []
        self.merge_repeated = False
        self.padding_value = 1
        self.input = np.array([[0, 1, 2, 2, 0, 4], [0, 4, 5, 0, 6, 0],
                               [0, 7, 1, 7, 0, 0]]).reshape(
                                   [3, 6]).astype("int32")
        self.input_length = np.array([[6], [5],
                                      [4]]).reshape([3, 1]).astype("int32")


class TestCTCAlignOpApi(unittest.TestCase):
    def test_api(self):
        x = fluid.layers.data('x', shape=[4], dtype='float32')
        y = fluid.layers.ctc_greedy_decoder(x, blank=0)

        x_pad = fluid.layers.data('x_pad', shape=[4, 4], dtype='float32')
        x_pad_len = fluid.layers.data('x_pad_len', shape=[1], dtype='int64')
        y_pad, y_pad_len = fluid.layers.ctc_greedy_decoder(
            x_pad, blank=0, input_length=x_pad_len)

        place = fluid.CPUPlace()
        x_tensor = fluid.create_lod_tensor(
            np.random.rand(8, 4).astype("float32"), [[4, 4]], place)

        x_pad_tensor = np.random.rand(2, 4, 4).astype("float32")
        x_pad_len_tensor = np.array([[4], [4]]).reshape([2, 1]).astype("int64")

        exe = fluid.Executor(place)

        exe.run(fluid.default_startup_program())
        ret = exe.run(feed={
            'x': x_tensor,
            'x_pad': x_pad_tensor,
            'x_pad_len': x_pad_len_tensor
        },
                      fetch_list=[y, y_pad, y_pad_len],
                      return_numpy=False)


if __name__ == "__main__":
    unittest.main()