wuchong commented on a change in pull request #15485:
URL: https://github.com/apache/flink/pull/15485#discussion_r609455324



##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java
##########
@@ -133,10 +134,9 @@ public boolean processElement(RowData key, RowData 
element) throws Exception {
         long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);
         if (!isEventTime) {
             // always register processing time for every element when 
processing time mode
-            timerService.registerProcessingTimeTimer(
-                    sliceEnd, TimeWindowUtil.toEpochMillsForTimer(sliceEnd - 
1, shiftTimeZone));
+            windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);
         }
-        if (isEventTime && sliceEnd - 1 <= currentProgress) {
+        if (isEventTime && toEpochMillsForTimer(sliceEnd - 1, shiftTimeZone) 
<= currentProgress) {

Review comment:
       I noticed that we have many places calling this logic. I think we can 
extract the comparison into a util method in `TimeWindowUtils`, e.g. 
   
   ```java
       // TODO: add javadocs
       public static boolean isWindowFired(
               long windowEnd, long currentProgress, ZoneId shiftTimeZone) {
           long windowTriggerTime = toEpochMillsForTimer(windowEnd - 1, 
shiftTimeZone);
           return currentProgress >= windowTriggerTime;
       }
   ```

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
##########
@@ -81,7 +86,7 @@ public void addElement(RowData key, long sliceEnd, RowData 
element) throws Excep
 
     @Override
     public void advanceProgress(long progress) throws Exception {
-        if (progress >= minTriggerTime) {
+        if (progress >= TimeWindowUtil.toEpochMillsForTimer(minSliceEnd, 
shiftTimeZone)) {

Review comment:
       Should be `toEpochMillsForTimer(minSliceEnd - 1, shiftTimeZone)`.
   Would be better to have a local variable name, e.g. `minTriggerTime`, which 
will help to understand the meaning. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
##########
@@ -177,7 +192,7 @@ private long computeMemorySize() {
     // ------------------------------------------------------------------------
     /** Method to get the next watermark to trigger window. */
     private static long getNextTriggerWatermark(long currentWatermark, long 
interval) {
-        long start = TimeWindow.getWindowStartWithOffset(currentWatermark, 0L, 
interval);
+        long start = getWindowStartWithOffset(currentWatermark, 0L, interval);

Review comment:
       Need to shift `currentWatermark` to UTC millis before getting window 
start?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
##########
@@ -42,30 +44,33 @@
     private final WindowBytesMultiMap recordsBuffer;
     private final WindowKey reuseWindowKey;
     private final AbstractRowDataSerializer<RowData> recordSerializer;
+    private final ZoneId shiftTimeZone;
 
-    private long minTriggerTime = Long.MAX_VALUE;
+    private long minSliceEnd = Long.MAX_VALUE;
 
     public RecordsWindowBuffer(
             Object operatorOwner,
             MemoryManager memoryManager,
             long memorySize,
             WindowCombineFunction combineFunction,
             PagedTypeSerializer<RowData> keySer,
-            AbstractRowDataSerializer<RowData> inputSer) {
+            AbstractRowDataSerializer<RowData> inputSer,
+            ZoneId shiftTimeZone) {
         this.combineFunction = combineFunction;
         this.recordsBuffer =
                 new WindowBytesMultiMap(
                         operatorOwner, memoryManager, memorySize, keySer, 
inputSer.getArity());
         this.recordSerializer = inputSer;
         this.reuseWindowKey = new WindowKeySerializer(keySer).createInstance();
+        this.shiftTimeZone = shiftTimeZone;
     }
 
     @Override
     public void addElement(RowData key, long sliceEnd, RowData element) throws 
Exception {
         // track the lowest trigger time, if watermark exceeds the trigger 
time,
         // it means there are some elements in the buffer belong to a window 
going to be fired,
         // and we need to flush the buffer into state for firing.
-        minTriggerTime = Math.min(sliceEnd - 1, minTriggerTime);
+        minSliceEnd = Math.min(sliceEnd - 1, minSliceEnd);

Review comment:
       Should be `sliceEnd` instead of `sliceEnd - 1`. Otherwise, we shouldn't 
call this `minSliceEnd`.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java
##########
@@ -60,13 +66,17 @@
     private final long rankEnd;
     private final boolean outputRankNumber;
     private final int windowEndIndex;
+    private final ZoneId shiftTimeZone;
 
     // 
----------------------------------------------------------------------------------------
 
     private transient long currentProgress;
 
     private transient Context<Long> ctx;
 
+    private transient InternalTimerService<Long> timerService;

Review comment:
       This member field is not needed anymore. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java
##########
@@ -67,6 +64,8 @@
 
     protected transient InternalTimerService<Long> timerService;

Review comment:
       We don't need this member field anymore. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java
##########
@@ -94,28 +95,33 @@ public void open(TriggerContext ctx) throws Exception {
 
         @Override
         public boolean onElement(Object element, long timestamp, W window) 
throws Exception {
-            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
+            if (triggerTime(window) <= ctx.getCurrentWatermark()) {
                 // if the watermark is already past the window fire immediately
                 return true;
             } else {
-                ctx.registerEventTimeTimer(window.maxTimestamp());
+                ctx.registerEventTimeTimer(triggerTime(window));
                 return false;
             }
         }
 
+        private long triggerTime(W window) {
+            return toEpochMillsForTimer(window.maxTimestamp(), 
ctx.getShiftTimeZone());
+        }
+
         @Override
         public boolean onProcessingTime(long time, W window) throws Exception {
             return false;
         }
 
         @Override
         public boolean onEventTime(long time, W window) throws Exception {
-            return time == window.maxTimestamp();
+            return time == toEpochMillsForTimer(window.maxTimestamp(), 
ctx.getShiftTimeZone());

Review comment:
       Please replace all the `toEpochMillsForTimer(window.maxTimestamp(), 
ctx.getShiftTimeZone())` with `triggerTime(window)`. We can have an abstract 
class for the triggers with a protected `triggerTime` method in it to avoid 
duplicate implement this method.
   
   
   Same to `ProcessingTimeTriggers`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to