[ https://issues.apache.org/jira/browse/FLINK-33192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vidya Sagar Kalvakunta updated FLINK-33192:
-------------------------------------------
    Labels: easyfix  (was: )

> State memory leak in the Window Operator due to unregistered cleanup timer
> --------------------------------------------------------------------------
>
>                 Key: FLINK-33192
>                 URL: https://issues.apache.org/jira/browse/FLINK-33192
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.6, 1.15.4, 1.16.2, 1.17.1
>            Reporter: Vidya Sagar Kalvakunta
>            Priority: Major
>              Labels: easyfix
>
> I have encountered a state memory leak issue in the default window operator.
> If a window is fired immediately after creation and does not emit a result,
> its cleanup timer is never registered, even though the window has already
> been added to the window state.
>
> *Steps to Reproduce:*
> # Write a custom trigger that triggers for every element.
> # Write a custom aggregate function that never produces a result.
> # Use a default tumbling event time window with this custom trigger and
> aggregate function.
> # Publish events spanning multiple time windows.
> # The window state will contain all the windows even after their
> expiry/cleanup time.
>
> *Code with the bug:*
> [https://github.com/apache/flink/blob/cd95b560d0c11a64b42bf6b98107314d32a4de86/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L399-L417]
> {code:java}
> windowState.add(element.getValue());
> if (triggerResult.isFire()) {
>     ACC contents = windowState.get();
>     if (contents == null) {
>         continue; // skips registerCleanupTimer(window) below
>     }
>     emitWindowContents(window, contents);
> }
> if (triggerResult.isPurge()) {
>     windowState.clear();
> }
> registerCleanupTimer(window);{code}
>
> *Expected Result:*
> The cleanup timer should be registered for every window that's added to the
> window state, regardless of whether it emits a result when it’s fired.
>
> *Actual Result:*
> The cleanup timer is not registered for a window when it does not emit a
> result after it’s fired, causing the window state that is already created to
> live on indefinitely.
>
> *Impact:*
> This issue led to a huge state memory leak in our applications and was very
> challenging to identify.
>
> *Fix:*
> There are two ways to fix this issue. I'm willing to create a PR with the fix
> if approved.
> # Register the cleanup timer immediately after a window is added to the
> state.
> {code:java}
> windowState.add(element.getValue());
> registerCleanupTimer(window);{code}
> # Emit the results when the contents are not null and remove the continue
> statement.
> {code:java}
> if (triggerResult.isFire()) {
>     ACC contents = windowState.get();
>     if (contents != null) {
>         emitWindowContents(window, contents);
>     }
> }{code}
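
To make the control flow easier to check, here is a minimal plain-Java sketch of the reported behavior. It is not Flink code: the class `CleanupTimerLeakDemo`, the `Variant` enum, and the methods `leakedWindows` and `aggregateResult` are hypothetical names, and the window state and timer service are modeled with an ordinary map and set. It assumes a trigger that fires on every element and an aggregate that never produces a result, as in the reproduction steps, and compares the current ordering with the two proposed fixes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class CleanupTimerLeakDemo {

    /** BUGGY mirrors the reported ordering; the other two mirror the proposed fixes. */
    enum Variant { BUGGY, REGISTER_EARLY, NO_CONTINUE }

    /**
     * Processes the timestamps with tumbling windows of the given size, then fires
     * every registered cleanup timer. Returns the window starts still held in state.
     */
    static Set<Long> leakedWindows(long[] timestamps, long windowSize, Variant variant) {
        Map<Long, Integer> windowState = new HashMap<>(); // window start -> element count
        Set<Long> cleanupTimers = new HashSet<>();        // window starts that have a timer

        for (long ts : timestamps) {
            long window = ts - (ts % windowSize);       // assumes non-negative timestamps
            windowState.merge(window, 1, Integer::sum); // stands in for windowState.add(...)

            if (variant == Variant.REGISTER_EARLY) {
                cleanupTimers.add(window); // fix 1: register right after the add
            }

            // The trigger fires on every element; the aggregate never yields a result.
            String contents = aggregateResult(windowState.get(window));
            if (contents == null && variant != Variant.NO_CONTINUE) {
                continue; // skips the registration below (harmless under fix 1)
            }
            if (contents != null) {
                System.out.println("emit " + window + ": " + contents);
            }
            cleanupTimers.add(window); // stands in for registerCleanupTimer(window)
        }

        // Simulate every cleanup timer firing: only windows with a timer are dropped.
        for (long window : cleanupTimers) {
            windowState.remove(window);
        }
        return new TreeSet<>(windowState.keySet());
    }

    /** Hypothetical aggregate that never produces a result. */
    static String aggregateResult(int count) {
        return null;
    }

    public static void main(String[] args) {
        long[] timestamps = {1, 12, 25}; // three tumbling windows of size 10
        for (Variant v : Variant.values()) {
            System.out.println(v + " -> " + leakedWindows(timestamps, 10, v));
        }
    }
}
```

In this model the `BUGGY` variant leaves all three windows (0, 10, 20) in state after the timers fire, while both `REGISTER_EARLY` and `NO_CONTINUE` leave none. The sketch only illustrates the ordering problem; it says nothing about which fix is preferable inside the real operator.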