[ 
https://issues.apache.org/jira/browse/FLINK-19926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350250#comment-17350250
 ] 

Shuo Cheng commented on FLINK-19926:
------------------------------------

Hi [~satyamshekhar], I've tested the case you presented above, and the result 
looks correct. To reproduce, add the following code to 
`org.apache.flink.table.planner.runtime.stream.sql.WindowAggregateITCase`. 
I think something else is affecting the result in your case. 
Could you provide a complete and runnable test example?
{code:java}
/**
 * Registers the two input rows and declares table `T0` on top of them.
 *
 * `T0` has a `bigint` column `amount` and a `timestamp(3)` column `rowtime`
 * whose watermark is defined as `rowtime` itself; it is backed by the
 * `values` connector using the data id returned by the registration call.
 */
@Before
def setup(): Unit = {
  // Two rows exactly one day apart: amount 0 at the epoch, amount 1 a day later.
  val rows = List(
    rowOf(0L, LocalDateTime.parse("1970-01-01T00:00:00")),
    rowOf(1L, LocalDateTime.parse("1970-01-02T00:00:00"))
  )
  val dataId = TestValuesTableFactory.registerData(rows)

  // The DDL refers to the registered rows through the 'data-id' option.
  val createTableDdl =
    s"""
       |create table T0(
       |  amount bigint,
       |  rowtime timestamp(3),
       |  watermark for rowtime as rowtime
       |) with (
       |  'connector' = 'values',
       |  'data-id' = '$dataId'
       |)
       |""".stripMargin
  tEnv.executeSql(createTableDdl)
}

/**
 * Reproduction attempt for FLINK-19926 (join on top of a tumble-window
 * aggregation).
 *
 * Creates a view `CTE` that sums `amount` per one-second tumbling window of
 * `T0` and exposes the window end as `_dim0`, then inner-joins the view with
 * itself on `_dim0` and asserts that one joined row is produced per window.
 */
@Test
def testCascadingTumbleWindow1(): Unit = {
  // The SELECT list is kept on one line so that every line of the literal
  // carries the `|` margin that stripMargin expects; a hard-wrapped
  // continuation line would be left without a margin marker.
  val innerSql =
    """
      |create view CTE as
      |SELECT sum(`amount`) as _output, tumble_end(rowtime, interval '1' second) _dim0
      |FROM T0
      |GROUP BY TUMBLE(rowtime, INTERVAL '1' SECOND)
      |""".stripMargin
  tEnv.executeSql(innerSql)

  // Self-join of the windowed view on the window end time.
  val sql =
    """
      |SELECT V0._output as V0_output, V1._output AS V1_output,
      |               V0._dim0 as V0_time, V1._dim0 as V1_time
      |    FROM CTE as V0 INNER JOIN CTE V1 ON V0._dim0 = V1._dim0
      |""".stripMargin

  val sink = new TestingAppendSink
  tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
  env.execute()

  // One row per window; each window end is one second after the row's timestamp.
  val expected = Seq(
    "0,0,1970-01-01T00:00:01,1970-01-01T00:00:01",
    "1,1,1970-01-02T00:00:01,1970-01-02T00:00:01")
  // Both sides are sorted, so the comparison ignores emission order.
  assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
{code}

> Wrong results for join post tumble grouping
> -------------------------------------------
>
>                 Key: FLINK-19926
>                 URL: https://issues.apache.org/jira/browse/FLINK-19926
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.1
>         Environment: Flink version: 1.11.1
>            Reporter: Satyam Shekhar
>            Priority: Minor
>              Labels: auto-deprioritized-major
>
> I have a table T0 with the following schema -
> {code:java}
> root
>   |-- amount: BIGINT
>   |-- timestamp: TIMESTAMP(3)
> {code}
>  
> The table T0 has two rows -
> |amount|timestamp|
> |0|0|
> |1|86400000|
>  
> The following query with tumble grouping returns the wrong result -
> {code:java}
> WITH CTE AS 
>     (SELECT SUM(amount) AS _output, 
>                    TUMBLE_END(`timestamp`, INTERVAL '1' SECOND) AS _dim0
>            FROM T0 GROUP BY TUMBLE(`timestamp`, INTERVAL '1' SECOND))
> SELECT V0._output as V0_output, V1._output AS V1_output, 
>                V0._dim0 as V0_time, V1._dim0 as V1_time
>     FROM CTE as V0 INNER JOIN CTE V1 ON V0._dim0 = V1._dim0
> {code}
>  
>  The returned result is -
> |V0_output|V1_output|V0_time|V1_time|
> |1|1|86401000|86401000|
>  
> The expected result is -
> |V0_output|V1_output|V0_time|V1_time|
> |0|0|1000|1000|
> |1|1|86401000|86401000|
>  
> Running subquery for `CTE` returns the correct result -
> {code:java}
> SELECT SUM(amount) AS _output,
>               TUMBLE_END(`timestamp`, INTERVAL '1' SECOND) AS _dim0
>    FROM T0 GROUP BY TUMBLE(`timestamp`, INTERVAL '1' SECOND)
> {code}
>  
> Result (this is correct) -
> |_output|_dim0|
> |0|1000|
> |1|86401000|



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to