lincoln-lil commented on code in PR #25119:
URL: https://github.com/apache/flink/pull/25119#discussion_r1700398264


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java:
##########
@@ -152,6 +152,7 @@ public WindowRankOperatorBuilder windowEndIndex(int windowEndIndex) {
                         outputRankNumber,
                         windowEndIndex,
                         shiftTimeZone);
-        return new WindowAggOperator<>(windowProcessor);
+        // Processing time Window TopN is not supported yet.
+        return new WindowAggOperator<>(windowProcessor, true);

Review Comment:
   Is there an attribute that indicates whether this window is based on event
   time? If so, we can pass a boolean expression here instead of the literal
   `true`.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java:
##########
@@ -222,7 +225,10 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
     @Override
     public void processWatermark(Watermark mark) throws Exception {
         if (mark.getTimestamp() > currentWatermark) {
-            windowProcessor.advanceProgress(mark.getTimestamp());
+            // advance the window processor by timer if this window agg is based on proctime

Review Comment:
   -> 'If this is a proctime window, progress should not be advanced by watermark, or it'll disturb timer-based processing'?
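
To make the intent of that comment concrete, here is a minimal, self-contained sketch of the guard. It does not use the Flink classes: `WatermarkGuardSketch`, `ProgressTracker`, `SketchOperator`, `onProcessingTime` and the `isEventTime` flag are all hypothetical stand-ins, and the real `WindowAggOperator` may structure this differently.

```java
// Hypothetical, simplified model of the guard discussed above.
// None of these types are Flink classes; they only illustrate the idea.
public class WatermarkGuardSketch {

    /** Stand-in for a window processor that tracks how far time has progressed. */
    static final class ProgressTracker {
        long progress = Long.MIN_VALUE;

        void advanceProgress(long timestamp) {
            // Progress only moves forward.
            if (timestamp > progress) {
                progress = timestamp;
            }
        }
    }

    /** Stand-in for the operator; isEventTime is an assumed constructor flag. */
    static final class SketchOperator {
        private final ProgressTracker tracker = new ProgressTracker();
        private final boolean isEventTime;
        private long currentWatermark = Long.MIN_VALUE;

        SketchOperator(boolean isEventTime) {
            this.isEventTime = isEventTime;
        }

        void processWatermark(long timestamp) {
            if (timestamp > currentWatermark) {
                // For a proctime window, a watermark must not advance progress,
                // otherwise it would disturb timer-based processing.
                if (isEventTime) {
                    tracker.advanceProgress(timestamp);
                }
                currentWatermark = timestamp;
            }
        }

        void onProcessingTime(long timestamp) {
            // For a proctime window, only processing-time timers advance progress.
            if (!isEventTime) {
                tracker.advanceProgress(timestamp);
            }
        }

        long progress() {
            return tracker.progress;
        }
    }

    public static void main(String[] args) {
        SketchOperator proctime = new SketchOperator(false);
        proctime.processWatermark(10_000L); // a watermark far in the future arrives first
        System.out.println(proctime.progress() == Long.MIN_VALUE);
        proctime.onProcessingTime(1_000L);  // the timer then drives progress
        System.out.println(proctime.progress());
    }
}
```

In this sketch, a watermark that arrives before any processing-time timer leaves the proctime operator's progress untouched, while an event-time operator still follows the watermark.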



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala:
##########
@@ -687,6 +688,51 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
     testHarness1.close()
   }
 
+  @TestTemplate
+  def testProcessingTimeWindowAggWithLargeWatermarkArrivesFirst(): Unit = {

Review Comment:
   -> `testProcessingTimeTumbleWindowWithFutureWatermark` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

