[ https://issues.apache.org/jira/browse/FLINK-18119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136362#comment-17136362 ]
Hyeonseop Lee commented on FLINK-18119:
---------------------------------------

[~libenchao] {{RowTimeRange*Unbounded*PrecedingFunction}} is not the case here. {{RowTimeRange*Bounded*PrecedingFunction}} in the blink runtime of 1.11 still has the issue. It performs cleanup with a processing-time timer when proper {{minRetentionTime}} and {{maxRetentionTime}} are configured, but what I want to improve is to retract records that are no longer required even when state retention is not set (indefinite).

In my case, I first tried setting a non-zero {{minRetentionTime}} to enable cleanup by retention, but that was applied to the whole query and ended up producing a retract stream instead of an append stream. I understand that setting state retention can be a workaround to prevent OOM, but I think functions should keep state as efficiently as possible.

> Fix unlimitedly growing state for time range bounded over aggregate
> -------------------------------------------------------------------
>
>                 Key: FLINK-18119
>                 URL: https://issues.apache.org/jira/browse/FLINK-18119
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.10.1
>            Reporter: Hyeonseop Lee
>            Priority: Major
>
> For a time-range-bounded over aggregation in a streaming query, such as the one below,
> {code:scala}
> table
>   .window(Over partitionBy 'a orderBy 'rowtime preceding 1.hour as 'w)
>   .select('a, aggregateFunction('b) over 'w)
> {code}
> the operator must hold the incoming records of the preceding time range in state, but older records are no longer required and can be cleaned up.
> The current implementation cleans up old records only when a newer record arrives and so tells the operator that enough time has passed. However, the cleanup never happens unless a new record with the same key arrives, so part of the state may never be cleaned up. This leads to unlimitedly growing state, especially when the keyspace changes over time.
>
> Since an aggregate over a bounded preceding time interval does not need old records by its nature, we can improve this by adding a timer that notifies the operator to clean up old records, without changing query results or severely degrading performance.
>
> This is distinct from state retention: state retention forgets state that is expected to be less important in order to reduce memory, so it may change query results. Both enabling and disabling state retention still make sense with this change.
>
> This issue applies to both row-time and processing-time range bounds. That is, we are going to change both RowTimeRangeBoundedPrecedingFunction and ProcTimeRangeBoundedPrecedingFunction in flink-table-runtime-blink. I already have a version with this change running in production and would be glad to contribute it.
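The timer-based cleanup proposed in the description can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Flink code: the class `RangeBoundedOverSketch`, its methods, and the in-memory maps standing in for keyed state and the event-time timer service are all hypothetical. It assumes rows arrive in rowtime order and emits SUM on arrival, which sidesteps the out-of-order handling a real row-time operator needs. Each buffered row registers a cleanup timer at `rowtime + preceding`; when the watermark passes that point, the row is dropped even if no further row for its key ever arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, Flink-free model of a time-range-bounded over aggregate (SUM).
// Plain maps stand in for keyed state and the event-time timer service.
// Assumes rows arrive in rowtime order and emits each result on arrival.
class RangeBoundedOverSketch {
    private final long precedingMs;
    // Per-key buffered rows: rowtime -> values seen at that rowtime.
    private final Map<String, TreeMap<Long, List<Long>>> state = new HashMap<>();
    // Pending cleanup timers: timer timestamp -> keys that registered it.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    RangeBoundedOverSketch(long precedingMs) {
        this.precedingMs = precedingMs;
    }

    // Buffers the row, registers its cleanup timer, and returns
    // SUM(value) over rows of the same key in [rowtime - preceding, rowtime].
    long processElement(String key, long rowtime, long value) {
        TreeMap<Long, List<Long>> rows = state.computeIfAbsent(key, k -> new TreeMap<>());
        rows.computeIfAbsent(rowtime, t -> new ArrayList<>()).add(value);
        // Once the watermark reaches rowtime + preceding, every on-time row still
        // to come has a window starting after rowtime, so this row can be dropped.
        timers.computeIfAbsent(rowtime + precedingMs, t -> new HashSet<>()).add(key);
        long sum = 0;
        for (List<Long> values : rows.subMap(rowtime - precedingMs, true, rowtime, true).values()) {
            for (long v : values) {
                sum += v;
            }
        }
        return sum;
    }

    // Fires every pending timer whose timestamp is <= watermark, in timestamp order.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    // Drops rows with rowtime <= timestamp - preceding, even if no new row for
    // this key ever arrives; removes the key entirely once its buffer is empty.
    private void onTimer(String key, long timestamp) {
        TreeMap<Long, List<Long>> rows = state.get(key);
        if (rows == null) {
            return;
        }
        rows.headMap(timestamp - precedingMs, true).clear();
        if (rows.isEmpty()) {
            state.remove(key);
        }
    }

    int bufferedKeys() {
        return state.size();
    }

    int bufferedRowtimes(String key) {
        TreeMap<Long, List<Long>> rows = state.get(key);
        return rows == null ? 0 : rows.size();
    }

    public static void main(String[] args) {
        RangeBoundedOverSketch op = new RangeBoundedOverSketch(10);
        System.out.println(op.processElement("a", 0, 1)); // prints 1
        System.out.println(op.processElement("a", 5, 2)); // prints 3
        System.out.println(op.processElement("b", 5, 7)); // prints 7
        op.advanceWatermark(100);
        // Both keys are cleaned up although no later row for them arrived.
        System.out.println(op.bufferedKeys()); // prints 0
    }
}
```

Registering one timer per buffered rowtime keeps the model simple; because each timer only touches its own key's buffer, keys that stop receiving rows are still released once the watermark passes, and query results are unaffected since only rows outside every future window are removed.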