wuchong commented on a change in pull request #12680:
URL: https://github.com/apache/flink/pull/12680#discussion_r442216042



##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
##########
@@ -158,6 +158,21 @@ public void processElement(
                                // register event time timer
                                ctx.timerService().registerEventTimeTimer(triggeringTs);
                        }
+                       registerCleanupTimer(ctx, triggeringTs);
+               }
+       }
+
+       private void registerCleanupTimer(
+                       KeyedProcessFunction<K, RowData, RowData>.Context ctx,
+                       long timestamp) throws Exception {
+               // calculate safe timestamp to cleanup states
+               long cleanupTimestamp = timestamp + precedingOffset + 1;
+               // update timestamp and register timer if needed
+               Long curCleanupTimestamp = cleanupTsState.value();
+               if (curCleanupTimestamp == null || curCleanupTimestamp < cleanupTimestamp) {
+                       // we don't delete existing timer since it may delete timer for data processing

Review comment:
       This may cause performance problems if we register a timer for each 
record, because each timer is an entry in the state. A better solution might 
be to use the `InternalTimerService` provided by `AbstractStreamOperator`, 
which can register timers by namespace. We can then use separate namespaces 
for cleanup and data processing. 
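
   A minimal sketch of why separate namespaces help, written in plain Java so 
it runs without Flink. The class `NamespacedTimersSketch` and the namespace 
names `"data"` and `"cleanup"` are hypothetical stand-ins. The two methods only 
mimic the shape of `InternalTimerService#registerEventTimeTimer(namespace, time)` 
and `#deleteEventTimeTimer(namespace, time)`; this is not the Flink implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical stand-in for a per-key timer service that scopes timers by namespace. */
class NamespacedTimersSketch {
    // assumed namespace names, not taken from the PR
    static final String DATA = "data";
    static final String CLEANUP = "cleanup";

    // namespace -> registered event-time timestamps
    private final Map<String, TreeSet<Long>> timers = new HashMap<>();

    void registerEventTimeTimer(String namespace, long time) {
        timers.computeIfAbsent(namespace, ns -> new TreeSet<>()).add(time);
    }

    void deleteEventTimeTimer(String namespace, long time) {
        TreeSet<Long> registered = timers.get(namespace);
        if (registered != null) {
            registered.remove(time);
        }
    }

    boolean isRegistered(String namespace, long time) {
        TreeSet<Long> registered = timers.get(namespace);
        return registered != null && registered.contains(time);
    }

    public static void main(String[] args) {
        NamespacedTimersSketch service = new NamespacedTimersSketch();
        service.registerEventTimeTimer(DATA, 100L);
        service.registerEventTimeTimer(CLEANUP, 100L);
        // deleting the cleanup timer leaves the data timer at the same timestamp alone
        service.deleteEventTimeTimer(CLEANUP, 100L);
        System.out.println("data timer kept: " + service.isRegistered(DATA, 100L));
        System.out.println("cleanup timer kept: " + service.isRegistered(CLEANUP, 100L));
    }
}
```

   With a single shared timer set, the delete above would also drop the data 
timer, which is the situation the code comment in the diff works around.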
   
   Besides, it would be better to treat the cleanup timestamp as a range 
instead of a point, e.g. if the current cleanup timer is in `(timestamp + 
precedingOffset, precedingOffset + precedingOffset * 1.5)` (similar to 
`CleanupState#registerProcessingCleanupTimer`), then we don't need to register 
a new one. This avoids removing and registering a timer for each record and is 
friendlier to the state backend. 
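
   A rough sketch of the range check, in plain Java without Flink 
dependencies. The class `RangeCleanupTimerSketch` and its fields are 
hypothetical stand-ins for `cleanupTsState` and the timer service. The upper 
bound `timestamp + precedingOffset * 1.5` is one reading of the range above and 
is an assumption, as is deleting the old timer, which is only safe if cleanup 
timers are tracked apart from data-processing timers.

```java
import java.util.TreeSet;

/** Hypothetical stand-in for per-key cleanup state; no Flink classes are used. */
class RangeCleanupTimerSketch {
    private final long precedingOffset;
    // stand-in for cleanupTsState (a value state holding the last cleanup timestamp)
    private Long cleanupTs;
    // stand-in for the cleanup timers registered for one key
    private final TreeSet<Long> cleanupTimers = new TreeSet<>();
    // counts how often a timer was actually registered
    private int registrations;

    RangeCleanupTimerSketch(long precedingOffset) {
        this.precedingOffset = precedingOffset;
    }

    /** Registers a new cleanup timer only if the current one fires too early to be safe. */
    void registerCleanupTimer(long timestamp) {
        // earliest safe cleanup time for this record, as computed in the diff
        long minCleanupTs = timestamp + precedingOffset + 1;
        // assumed upper end of the range: roughly 1.5 x precedingOffset after the record
        long maxCleanupTs = timestamp + precedingOffset + precedingOffset / 2 + 1;
        if (cleanupTs == null || cleanupTs < minCleanupTs) {
            if (cleanupTs != null) {
                // assumes cleanup timers are kept apart from data-processing timers
                cleanupTimers.remove(cleanupTs);
            }
            cleanupTimers.add(maxCleanupTs);
            cleanupTs = maxCleanupTs;
            registrations++;
        }
    }

    Long currentCleanupTs() {
        return cleanupTs;
    }

    int registrations() {
        return registrations;
    }

    int pendingTimers() {
        return cleanupTimers.size();
    }

    public static void main(String[] args) {
        RangeCleanupTimerSketch sketch = new RangeCleanupTimerSketch(1000L);
        for (long ts : new long[] {0L, 100L, 500L, 501L}) {
            sketch.registerCleanupTimer(ts);
        }
        System.out.println("registrations: " + sketch.registrations());
        System.out.println("cleanup timestamp: " + sketch.currentCleanupTs());
    }
}
```

   Records whose safe cleanup time still falls before the registered timer 
cause no state or timer updates, so the per-record cost drops to a single state 
read.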
   
   This might be a big refactoring, so I'm fine with adding a TODO comment 
here and creating a follow-up issue for it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

