JING ZHANG created FLINK-24168:
----------------------------------

             Summary: Rowtime type is not correct for windowTableFunction or 
OverAggregate on Match because the output type does not updated after input 
rowtime attribute changed from rowtime to rowtime_ltz
                 Key: FLINK-24168
                 URL: https://issues.apache.org/jira/browse/FLINK-24168
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.14.0
            Reporter: JING ZHANG


The rowtime type is not correct for a windowTableFunction or OverAggregate on top of a Match,
because the output type is not updated after the input rowtime attribute is changed
from rowtime to rowtime_ltz in `RelTimeIndicator`.

The bug can be reproduced with the following two test cases:
{code:java}
/**
 * Reproduction for FLINK-24168: a TUMBLE window table function applied on top of a
 * MATCH_RECOGNIZE result whose ordering column is `ts_ltz`.
 *
 * The MATCH_RECOGNIZE output exposes `MATCH_ROWTIME()` as `matchRowtime`; that column is then
 * used as the time descriptor of the window TVF.
 */
@Test
def testWindowTVFOnMatchRecognizeOnRowtimeLTZ(): Unit = {
  // Stage 1: pattern match over Ticker, partitioned by symbol and ordered by ts_ltz.
  val matchQuery =
    """
       |SELECT
       |  *
       |FROM Ticker
       |MATCH_RECOGNIZE (
       |  PARTITION BY symbol
       |  ORDER BY ts_ltz
       |  MEASURES
       |    A.price as price,
       |    A.tax as tax,
       |    MATCH_ROWTIME() as matchRowtime
       |  ONE ROW PER MATCH
       |  PATTERN (A)
       |  DEFINE
       |    A AS A.price > 0
       |) AS T
       |""".stripMargin
  val matchResult = util.tableEnv.sqlQuery(matchQuery)
  // Make the match result addressable as table "T" for the follow-up query.
  util.tableEnv.registerTable("T", matchResult)

  // Stage 2: tumbling window TVF over T, keyed on matchRowtime.
  // NOTE: the `second))` continuation line carries no `|` margin, so stripMargin leaves it as-is.
  val tumbleQuery =
    """
       |SELECT *
       |FROM TABLE(TUMBLE(TABLE T, DESCRIPTOR(matchRowtime), INTERVAL '3' 
second))
       |""".stripMargin
  util.verifyRelPlanWithType(tumbleQuery)
}

/**
 * Reproduction for FLINK-24168: an OVER aggregate applied on top of a MATCH_RECOGNIZE result
 * whose ordering column is `ts_ltz`.
 *
 * The MATCH_RECOGNIZE output exposes `MATCH_ROWTIME()` as `matchRowtime`; that column is then
 * used as the ORDER BY key of the OVER window.
 */
@Test
def testOverWindowOnMatchRecognizeOnRowtimeLTZ(): Unit = {
  // Stage 1: pattern match over Ticker, partitioned by symbol and ordered by ts_ltz.
  val matchQuery =
    """
       |SELECT
       |  *
       |FROM Ticker
       |MATCH_RECOGNIZE (
       |  PARTITION BY symbol
       |  ORDER BY ts_ltz
       |  MEASURES
       |    A.price as price,
       |    A.tax as tax,
       |    MATCH_ROWTIME() as matchRowtime
       |  ONE ROW PER MATCH
       |  PATTERN (A)
       |  DEFINE
       |    A AS A.price > 0
       |) AS T
       |""".stripMargin
  val matchResult = util.tableEnv.sqlQuery(matchQuery)
  // Make the match result addressable as table "T" for the follow-up query.
  util.tableEnv.registerTable("T", matchResult)

  // Stage 2: running SUM(price) per symbol, ordered by matchRowtime.
  // NOTE: the `as price_sum` continuation line carries no `|` margin, so stripMargin leaves it
  // as-is; the same holds for the whitespace before the closing quotes.
  val overQuery =
    """
      |SELECT
      |  symbol,
      |  price,
      |  tax,
      |  matchRowtime,
      |  SUM(price) OVER (
      |    PARTITION BY symbol ORDER BY matchRowtime RANGE UNBOUNDED PRECEDING) 
as price_sum
      |FROM T
    """.stripMargin
  util.verifyRelPlanWithType(overQuery)
}

{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to