wuchong commented on a change in pull request #12680: URL: https://github.com/apache/flink/pull/12680#discussion_r442216042
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
##########
@@ -158,6 +158,21 @@ public void processElement(
                 // register event time timer
                 ctx.timerService().registerEventTimeTimer(triggeringTs);
             }
+            registerCleanupTimer(ctx, triggeringTs);
+        }
+    }
+
+    private void registerCleanupTimer(
+            KeyedProcessFunction<K, RowData, RowData>.Context ctx,
+            long timestamp) throws Exception {
+        // calculate safe timestamp to cleanup states
+        long cleanupTimestamp = timestamp + precedingOffset + 1;
+        // update timestamp and register timer if needed
+        Long curCleanupTimestamp = cleanupTsState.value();
+        if (curCleanupTimestamp == null || curCleanupTimestamp < cleanupTimestamp) {
+            // we don't delete existing timer since it may delete timer for data processing

Review comment:
This may cause a performance problem if we register a timer for each record, because each timer is an entry in the state. A better solution might be to use the `InternalTimerService` provided by `AbstractStreamOperator`, which can register timers by namespace, so that we can separate the namespaces for cleanup and data processing.

Besides, it would be better to treat the cleanup timestamp as a range instead of a point, e.g. if the current cleanup timer is in `(timestamp + precedingOffset, precedingOffset + precedingOffset * 1.5)` (similar to `CleanupState#registerProcessingCleanupTimer`), then we don't need to register a new one. This avoids removing and registering a timer for each record and is friendlier to the state backend.

This might be a big refactoring, so I'm fine with adding a TODO comment here and creating a follow-up issue for it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
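
To illustrate the range-based registration suggested in the review comment, here is a minimal plain-Java sketch. It is a simulation under stated assumptions and not the Flink implementation: `CleanupTimerSketch`, its fields, and the half-window slack (`precedingOffset / 2`) are hypothetical, the `TreeSet` only stands in for timers kept in a separate cleanup namespace, and the `Long` field stands in for `cleanupTsState`.

```java
import java.util.TreeSet;

/**
 * Plain-Java simulation (no Flink dependency) of range-based cleanup timer
 * registration. All names are hypothetical stand-ins, not Flink API.
 */
class CleanupTimerSketch {
    private final long precedingOffset;
    // Stand-in for a ValueState<Long> holding the registered cleanup time.
    private Long cleanupTs = null;
    // Stand-in for timers kept in a dedicated "cleanup" namespace.
    private final TreeSet<Long> cleanupTimers = new TreeSet<>();
    private int registrations = 0;

    CleanupTimerSketch(long precedingOffset) {
        this.precedingOffset = precedingOffset;
    }

    /** Returns true if a new cleanup timer was registered for this record. */
    boolean registerCleanupTimer(long timestamp) {
        // Earliest safe cleanup time for this record (same formula as the diff).
        long minCleanupTs = timestamp + precedingOffset + 1;
        // Assumed slack: let cleanup lag by up to half the window.
        long maxCleanupTs = minCleanupTs + precedingOffset / 2;
        if (cleanupTs != null && cleanupTs >= minCleanupTs) {
            return false; // the existing timer already fires late enough
        }
        if (cleanupTs != null) {
            // Safe here because cleanup timers live in their own set.
            cleanupTimers.remove(cleanupTs);
        }
        cleanupTimers.add(maxCleanupTs);
        cleanupTs = maxCleanupTs;
        registrations++;
        return true;
    }

    int registrations() { return registrations; }
    int pendingTimers() { return cleanupTimers.size(); }
    Long currentCleanupTs() { return cleanupTs; }

    public static void main(String[] args) {
        CleanupTimerSketch sketch = new CleanupTimerSketch(1000L);
        for (long ts = 0; ts < 2000; ts++) {
            sketch.registerCleanupTimer(ts);
        }
        System.out.println("records=2000 registrations=" + sketch.registrations()
            + " pending=" + sketch.pendingTimers());
    }
}
```

With a point-based check, every record with a larger timestamp would register a new timer. In this sketch a new timer is registered only once the existing one falls below the safe minimum, and because cleanup timers are tracked separately, the stale one can be removed without touching data-processing timers.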