[ 
https://issues.apache.org/jira/browse/FLINK-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587268#comment-16587268
 ] 

Timo Walther commented on FLINK-8897:
-------------------------------------

The following query is also affected by this bug:

{code}
@Test
  def testRunningLast2() = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear

    val data = new mutable.MutableList[(String, Timestamp, Int, Int)]
    data.+=(("ACME", new Timestamp(1000L), 12, 1))
    data.+=(("ACME", new Timestamp(2000L), 17, 2))
    data.+=(("ACME", new Timestamp(3000L), 13, 4))
    data.+=(("ACME", new Timestamp(4000L), 11, 3))
    data.+=(("ACME", new Timestamp(5000L), 20, 4))
    data.+=(("ACME", new Timestamp(6000L), 24, 4))
    data.+=(("ACME", new Timestamp(7000L), 25, 3))
    data.+=(("ACME", new Timestamp(8000L), 19, 8))

    val t = env.fromCollection(data).assignAscendingTimestamps(e => 
e._2.toInstant.toEpochMilli).toTable(tEnv, 'symbol, 'tstamp.rowtime, 'price, 
'tax)
    tEnv.registerTable("Ticker", t)

    val sqlQuery =
      s"""
         |SELECT *
         |FROM (
         |   SELECT symbol, SUM(price) as price, TUMBLE_ROWTIME(tstamp, 
interval '1' second) as rowTime, TUMBLE_START(tstamp, interval '1' second) as 
startTime, TUMBLE_END(tstamp, interval '1' second) as endTime
         |   FROM Ticker
         |   GROUP BY symbol, TUMBLE(tstamp, interval '1' second)
         |)
         |WHERE startTime < endTime
         |""".stripMargin

    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    val expected = List("2,4,5", "2,4,6", "3,4,5", "3,4,6")
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
  }
{code}

> Rowtime materialization causes "mismatched type" AssertionError
> ---------------------------------------------------------------
>
>                 Key: FLINK-8897
>                 URL: https://issues.apache.org/jira/browse/FLINK-8897
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API &amp; SQL
>            Reporter: Xingcan Cui
>            Assignee: Timo Walther
>            Priority: Major
>             Fix For: 1.5.4
>
>
> As raised in [this 
> thread|https://lists.apache.org/thread.html/e2ea38aa7ae224d7481145334955d84243690e9aad10d58310bdb8e7@%3Cuser.flink.apache.org%3E],
>  the query created by the following code will throw a calcite "mismatch type" 
> ({{Timestamp(3)}} and {{TimeIndicator}}) exception.
> {code:java}
> String sql1 = "select id, eventTs as t1, count(*) over (partition by id order 
> by eventTs rows between 100 preceding and current row) as cnt1 from myTable1";
> String sql2 = "select distinct id as r_id, eventTs as t2, count(*) over 
> (partition by id order by eventTs rows between 50 preceding and current row) 
> as cnt2 from myTable2";
> Table left = tableEnv.sqlQuery(sql1);
> Table right = tableEnv.sqlQuery(sql2);
> left.join(right).where("id === r_id && t1 === t2").select("id, 
> t1").writeToSink(...)
> {code}
> The logical plan is as follows.
> {code}
> LogicalProject(id=[$0], t1=[$1])
>   LogicalFilter(condition=[AND(=($0, $3), =($1, $4))])
>     LogicalJoin(condition=[true], joinType=[inner])
>       LogicalAggregate(group=[{0, 1, 2}])
>         LogicalWindow(window#0=[window(partition {0} order by [1] rows 
> between $2 PRECEDING and CURRENT ROW aggs [COUNT()])])
>           LogicalProject(id=[$0], eventTs=[$3])
>             LogicalTableScan(table=[[_DataStreamTable_0]])
>       LogicalAggregate(group=[{0, 1, 2}])
>         LogicalWindow(window#0=[window(partition {0} order by [1] rows 
> between $2 PRECEDING and CURRENT ROW aggs [COUNT()])])
>           LogicalProject(id=[$0], eventTs=[$3])
>             LogicalTableScan(table=[[_DataStreamTable_0]])
> {code}
> That is because the the rowtime field after an aggregation will be 
> materialized while the {{RexInputRef}} type for the filter's operands ({{t1 
> === t2}}) is still {{TimeIndicator}}. We should make them unified.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to