wuchong commented on a change in pull request #15485: URL: https://github.com/apache/flink/pull/15485#discussion_r609455324
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java ########## @@ -133,10 +134,9 @@ public boolean processElement(RowData key, RowData element) throws Exception { long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService); if (!isEventTime) { // always register processing time for every element when processing time mode - timerService.registerProcessingTimeTimer( - sliceEnd, TimeWindowUtil.toEpochMillsForTimer(sliceEnd - 1, shiftTimeZone)); + windowTimerService.registerProcessingTimeWindowTimer(sliceEnd); } - if (isEventTime && sliceEnd - 1 <= currentProgress) { + if (isEventTime && toEpochMillsForTimer(sliceEnd - 1, shiftTimeZone) <= currentProgress) { Review comment: I noticed that we have many places calling this logic. I think we can extract the comparison into a util method in `TimeWindowUtils`, e.g. ```java // TODO: add javadocs public static boolean isWindowFired( long windowEnd, long currentProgress, ZoneId shiftTimeZone) { long windowTriggerTime = toEpochMillsForTimer(windowEnd - 1, shiftTimeZone); return currentProgress >= windowTriggerTime; } ``` ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java ########## @@ -81,7 +86,7 @@ public void addElement(RowData key, long sliceEnd, RowData element) throws Excep @Override public void advanceProgress(long progress) throws Exception { - if (progress >= minTriggerTime) { + if (progress >= TimeWindowUtil.toEpochMillsForTimer(minSliceEnd, shiftTimeZone)) { Review comment: Should be `toEpochMillsForTimer(minSliceEnd - 1, shiftTimeZone)`. Would be better to have a local variable name, e.g. `minTriggerTime`, which will help to understand the meaning. ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java ########## @@ -177,7 +192,7 @@ private long computeMemorySize() { // ------------------------------------------------------------------------ /** Method to get the next watermark to trigger window. */ private static long getNextTriggerWatermark(long currentWatermark, long interval) { - long start = TimeWindow.getWindowStartWithOffset(currentWatermark, 0L, interval); + long start = getWindowStartWithOffset(currentWatermark, 0L, interval); Review comment: Need to shift `currentWatermark` to UTC millis before getting window start? ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java ########## @@ -42,30 +44,33 @@ private final WindowBytesMultiMap recordsBuffer; private final WindowKey reuseWindowKey; private final AbstractRowDataSerializer<RowData> recordSerializer; + private final ZoneId shiftTimeZone; - private long minTriggerTime = Long.MAX_VALUE; + private long minSliceEnd = Long.MAX_VALUE; public RecordsWindowBuffer( Object operatorOwner, MemoryManager memoryManager, long memorySize, WindowCombineFunction combineFunction, PagedTypeSerializer<RowData> keySer, - AbstractRowDataSerializer<RowData> inputSer) { + AbstractRowDataSerializer<RowData> inputSer, + ZoneId shiftTimeZone) { this.combineFunction = combineFunction; this.recordsBuffer = new WindowBytesMultiMap( operatorOwner, memoryManager, memorySize, keySer, inputSer.getArity()); this.recordSerializer = inputSer; this.reuseWindowKey = new WindowKeySerializer(keySer).createInstance(); + this.shiftTimeZone = shiftTimeZone; } @Override public void addElement(RowData key, long sliceEnd, RowData element) throws Exception { // track the lowest trigger time, if watermark exceeds the trigger time, // it means there are some elements in the buffer belong to a window going to be fired, // and we need to flush the buffer into state for firing. - minTriggerTime = Math.min(sliceEnd - 1, minTriggerTime); + minSliceEnd = Math.min(sliceEnd - 1, minSliceEnd); Review comment: Should be `sliceEnd` instead of `sliceEnd - 1`. Otherwise, we shouldn't call this `minSliceEnd`. ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java ########## @@ -60,13 +66,17 @@ private final long rankEnd; private final boolean outputRankNumber; private final int windowEndIndex; + private final ZoneId shiftTimeZone; // ---------------------------------------------------------------------------------------- private transient long currentProgress; private transient Context<Long> ctx; + private transient InternalTimerService<Long> timerService; Review comment: This member field is not needed anymore. ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java ########## @@ -67,6 +64,8 @@ protected transient InternalTimerService<Long> timerService; Review comment: We don't need this member field anymore. ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java ########## @@ -94,28 +95,33 @@ public void open(TriggerContext ctx) throws Exception { @Override public boolean onElement(Object element, long timestamp, W window) throws Exception { - if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { + if (triggerTime(window) <= ctx.getCurrentWatermark()) { // if the watermark is already past the window fire immediately return true; } else { - ctx.registerEventTimeTimer(window.maxTimestamp()); + ctx.registerEventTimeTimer(triggerTime(window)); return false; } } + private long triggerTime(W window) { + return toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone()); + } + @Override public boolean onProcessingTime(long time, W window) throws Exception { return false; } @Override public boolean onEventTime(long time, W window) throws Exception { - return time == window.maxTimestamp(); + return time == toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone()); Review comment: Please replace all the `toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())` with `triggerTime(window)`. We can have an abstract class for the triggers with a protected `triggerTime` method in it to avoid duplicate implement this method. Same to `ProcessingTimeTriggers`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org