Leonard Xu created FLINK-19271: ---------------------------------- Summary: wrong HOP_PROCTIME output when materialize proctime Key: FLINK-19271 URL: https://issues.apache.org/jira/browse/FLINK-19271 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Leonard Xu
The HOP_PROCTIME is bigger than HOP_END in the following case. The reason is we materialize the process time(HOP_PROCTIME) in the downstream of `WindowAggregate` rather than internal of `WindowAggregate`, this lead the HOP_PROCTIME is bigger than HOP_END forever. And I believe this problem exists in TUMBLE_PROCTIME and SESSION_PROCTIME too, We should use the `HOP_END - 1` as the HOP_PROCTIME when the proctime need materialization. {code:java} # {code} *WindowAggregateITCase* {code:java} @Test def testEventTimeSlidingWindowProcTime(): Unit = { val stream = failingDataSource(data) .assignTimestampsAndWatermarks( new TimestampAndWatermarkWithOffset [(Long, Int, Double, Float, BigDecimal, String, String)](10L)) val table = stream.toTable(tEnv, 'a, 'int, 'double, 'float, 'bigdec, 'string, 'name, 'proctime.proctime()) tEnv.registerTable("T1", table) val sql = """ |SELECT | HOP_START(proctime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND), | HOP_END(proctime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND), | HOP_PROCTIME(proctime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND) |FROM T1 |GROUP BY `string`, HOP(proctime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND) """.stripMargin val sink = new TestingAppendSink tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) env.execute() val expected = Seq( "2020-09-17T07:13:40.348, 2020-09-17T07:13:40.353, 2020-09-17T07:13:43.479", "2020-09-17T07:13:40.348, 2020-09-17T07:13:40.353, 2020-09-17T07:13:43.479", "2020-09-17T07:13:40.352, 2020-09-17T07:13:40.357, 2020-09-17T07:13:44.030") assertEquals(expected.sorted, sink.getAppendResults.sorted) } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)