lincoln-lil commented on code in PR #25119:
URL: https://github.com/apache/flink/pull/25119#discussion_r1700398264

##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java:
##########
@@ -152,6 +152,7 @@ public WindowRankOperatorBuilder windowEndIndex(int windowEndIndex) {
                         outputRankNumber,
                         windowEndIndex,
                         shiftTimeZone);
-        return new WindowAggOperator<>(windowProcessor);
+        // Processing time Window TopN is not supported yet.
+        return new WindowAggOperator<>(windowProcessor, true);

Review Comment:
   Is there an attribute that indicates event time? If so, we can pass a conditional expression here instead of the literal `true`.

##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java:
##########
@@ -222,7 +225,10 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
     @Override
     public void processWatermark(Watermark mark) throws Exception {
         if (mark.getTimestamp() > currentWatermark) {
-            windowProcessor.advanceProgress(mark.getTimestamp());
+            // advance the window processor by timer if this window agg is based on proctime

Review Comment:
   -> 'If this is a proctime window, progress should not be advanced by watermark, or it'll disturb timer-based processing'?

##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala:
##########
@@ -687,6 +688,51 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
     testHarness1.close()
   }
 
+  @TestTemplate
+  def testProcessingTimeWindowAggWithLargeWatermarkArrivesFirst(): Unit = {

Review Comment:
   -> `testProcessingTimeTumbleWindowWithFutureWatermark`?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
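
Note on the second review comment above: the guard it describes can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the code in the PR. `WatermarkGuardSketch`, `Operator`, `onProcessingTime`, and the `isEventTime` field are hypothetical names chosen for illustration, and timestamps are plain `long` values rather than Flink `Watermark` objects. It also assumes the operator keeps tracking the latest watermark for proctime windows while leaving window progress to timers.

```java
public class WatermarkGuardSketch {

    /** Hypothetical stand-in for the operator; not Flink's WindowAggOperator. */
    static class Operator {
        // Assumption: true for event-time (rowtime) windows, false for proctime windows.
        private final boolean isEventTime;
        private long currentWatermark = Long.MIN_VALUE;
        // Models the progress that the window processor has been advanced to.
        private long progress = Long.MIN_VALUE;

        Operator(boolean isEventTime) {
            this.isEventTime = isEventTime;
        }

        /** A watermark advances window progress only for event-time windows. */
        void processWatermark(long timestamp) {
            if (timestamp > currentWatermark) {
                if (isEventTime) {
                    progress = timestamp;
                }
                // The watermark itself is still tracked in both modes.
                currentWatermark = timestamp;
            }
        }

        /** A processing-time timer advances window progress only for proctime windows. */
        void onProcessingTime(long timestamp) {
            if (!isEventTime && timestamp > progress) {
                progress = timestamp;
            }
        }

        long progress() {
            return progress;
        }

        long currentWatermark() {
            return currentWatermark;
        }
    }

    public static void main(String[] args) {
        // Proctime window: a watermark far in the future arrives first.
        Operator proctime = new Operator(false);
        proctime.processWatermark(10_000L);
        System.out.println(proctime.progress() == Long.MIN_VALUE); // prints "true"

        // Only the processing-time timer moves the window forward.
        proctime.onProcessingTime(3_000L);
        System.out.println(proctime.progress()); // prints "3000"

        // Event-time window: the watermark drives progress directly.
        Operator rowtime = new Operator(true);
        rowtime.processWatermark(10_000L);
        System.out.println(rowtime.progress()); // prints "10000"
    }
}
```

In this model, the scenario targeted by the new harness test (a large watermark arriving before any processing-time timer fires) leaves the proctime window's progress untouched until the timer fires.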