[ https://issues.apache.org/jira/browse/FLINK-33192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vidya Sagar Kalvakunta updated FLINK-33192:
-------------------------------------------
    Labels: easyfix  (was: )

> State memory leak in the Window Operator due to unregistered cleanup timer
> --------------------------------------------------------------------------
>
>                 Key: FLINK-33192
>                 URL: https://issues.apache.org/jira/browse/FLINK-33192
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.6, 1.15.4, 1.16.2, 1.17.1
>            Reporter: Vidya Sagar Kalvakunta
>            Priority: Major
>              Labels: easyfix
>
> I have encountered a state memory leak in the default window operator. When a
> window fires immediately after it is created but does not emit a result, its
> cleanup timer is never registered, even though the window has already been
> added to the window state.
> *Steps to Reproduce:*
>  # Write a custom trigger that triggers for every element.
>  # Write a custom aggregate function that never produces a result.
>  # Use a default tumbling event time window with this custom trigger and 
> aggregate function.
>  # Publish events spanning multiple time windows.
>  # Observe that the window state still contains all of these windows, even
> after their expiry/cleanup time has passed.
> *Code with the bug:*
> [https://github.com/apache/flink/blob/cd95b560d0c11a64b42bf6b98107314d32a4de86/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L399-L417]
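> {code:java}
> windowState.add(element.getValue());
> if (triggerResult.isFire()) {
>     ACC contents = windowState.get();
>     if (contents == null) {
>         // Jumps to the next window, so registerCleanupTimer(window) below is skipped.
>         continue;
>     }
>     emitWindowContents(window, contents);
> }
> if (triggerResult.isPurge()) {
>     windowState.clear();
> }
> // Only reached if the window did not fire, or fired with non-null contents.
> registerCleanupTimer(window);{code}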
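The control flow in the excerpt above can be reproduced outside Flink with a minimal, stdlib-only Java model. This is a sketch under stated assumptions, not Flink code: BuggyWindowModel, assignWindows, getResult and advanceWatermark are hypothetical names, windows are 10-unit tumbling windows with zero allowed lateness, the trigger is assumed to fire on every element, and the aggregate always returns null, mirroring the reproduction steps. Because continue skips registerCleanupTimer(window), no window ever gets a cleanup timer, and advancing the watermark past every window clears nothing.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, stdlib-only model of the non-merging branch of
// WindowOperator.processElement (not Flink code; all names are illustrative).
class BuggyWindowModel {
    static final long WINDOW_SIZE = 10L;

    // Stand-in for windowState: window start -> accumulator (element count).
    final TreeMap<Long, Long> windowState = new TreeMap<>();
    // Stand-in for the timer service: windows that have a cleanup timer.
    final NavigableSet<Long> cleanupTimers = new TreeSet<>();
    final List<Long> emitted = new ArrayList<>();

    // Tumbling assigner: exactly one window per non-negative timestamp.
    List<Long> assignWindows(long timestamp) {
        return List.of(timestamp - (timestamp % WINDOW_SIZE));
    }

    // Stand-in for an aggregate function that never produces a result.
    Long getResult(long accumulator) {
        return null;
    }

    // Cleanup time = last timestamp covered by the window (zero allowed lateness).
    static long cleanupTime(long window) {
        return window + WINDOW_SIZE - 1;
    }

    void registerCleanupTimer(long window) {
        cleanupTimers.add(window);
    }

    void processElement(long timestamp) {
        for (long window : assignWindows(timestamp)) {
            windowState.merge(window, 1L, Long::sum); // windowState.add(...)
            boolean fire = true;                      // trigger fires on every element
            if (fire) {
                Long contents = getResult(windowState.get(window));
                if (contents == null) {
                    continue;                         // skips registerCleanupTimer below
                }
                emitted.add(contents);
            }
            registerCleanupTimer(window);
        }
    }

    // Fires every cleanup timer at or before the watermark and clears that window's state.
    void advanceWatermark(long watermark) {
        Iterator<Long> it = cleanupTimers.iterator();
        while (it.hasNext()) {
            long window = it.next();
            if (cleanupTime(window) > watermark) {
                break; // timers are ordered by window start, hence by cleanup time
            }
            windowState.remove(window);
            it.remove();
        }
    }

    public static void main(String[] args) {
        BuggyWindowModel op = new BuggyWindowModel();
        for (long ts : new long[] {1, 5, 12, 18, 25}) {
            op.processElement(ts);
        }
        op.advanceWatermark(100);
        System.out.println("windows left in state: " + op.windowState.keySet());
    }
}
{code}

Running main prints windows left in state: [0, 10, 20], i.e. all three windows outlive their cleanup time.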
>  
> *Expected Result:*
> The cleanup timer should be registered for every window that is added to the
> window state, regardless of whether the window emits a result when it fires.
> *Actual Result:*
> The cleanup timer is not registered for a window that fires without emitting
> a result, so the window state that has already been created lives on
> indefinitely.
> *Impact:*
> This issue led to a huge state memory leak in our applications and was very 
> challenging to identify.
>  
> *Fix:*
> There are two ways to fix this issue. I'm willing to create a PR with the fix 
> if approved.
>  # Register the cleanup timer immediately after a window is added to the 
> state.
> {code:java}
> windowState.add(element.getValue());
> registerCleanupTimer(window);{code}
>  # Emit the window contents only when they are not null, and remove the
> continue statement so that registerCleanupTimer(window) is always reached.
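> {code:java}
> if (triggerResult.isFire()) {
>     ACC contents = windowState.get();
>     if (contents != null) {
>         emitWindowContents(window, contents);
>     }
> }
> // The isPurge() check and registerCleanupTimer(window) follow unchanged,
> // so the cleanup timer is now registered even when contents is null.
> {code}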
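Both proposed fixes can be checked with a minimal, stdlib-only Java model of the same branch. This is a hedged sketch, not Flink code: FixedWindowModel, processElementFix1, processElementFix2, getResult and advanceWatermark are hypothetical names, windows are 10-unit tumbling windows with zero allowed lateness, the trigger is assumed to fire on every element, and the aggregate always returns null. In both variants every window gets a cleanup timer, so advancing the watermark past the last window clears all state.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, stdlib-only model of WindowOperator's non-merging branch with
// the two proposed fixes (not Flink code; all names are illustrative).
class FixedWindowModel {
    static final long WINDOW_SIZE = 10L;

    final TreeMap<Long, Long> windowState = new TreeMap<>();  // window start -> count
    final NavigableSet<Long> cleanupTimers = new TreeSet<>(); // windows with a timer
    final List<Long> emitted = new ArrayList<>();

    List<Long> assignWindows(long timestamp) {                // tumbling windows
        return List.of(timestamp - (timestamp % WINDOW_SIZE));
    }

    Long getResult(long accumulator) {                        // aggregate never emits
        return null;
    }

    static long cleanupTime(long window) {                    // zero allowed lateness
        return window + WINDOW_SIZE - 1;
    }

    void registerCleanupTimer(long window) {
        cleanupTimers.add(window);                            // duplicates are ignored
    }

    // Fix 1: register the cleanup timer right after the window is added to state.
    void processElementFix1(long timestamp) {
        for (long window : assignWindows(timestamp)) {
            windowState.merge(window, 1L, Long::sum);
            registerCleanupTimer(window);
            // The trigger fires on every element.
            Long contents = getResult(windowState.get(window));
            if (contents == null) {
                continue;                                     // harmless: timer exists
            }
            emitted.add(contents);
        }
    }

    // Fix 2: emit only non-null contents and drop the continue statement.
    void processElementFix2(long timestamp) {
        for (long window : assignWindows(timestamp)) {
            windowState.merge(window, 1L, Long::sum);
            // The trigger fires on every element.
            Long contents = getResult(windowState.get(window));
            if (contents != null) {
                emitted.add(contents);
            }
            registerCleanupTimer(window);                     // always reached
        }
    }

    // Fires every cleanup timer at or before the watermark and clears that window's state.
    void advanceWatermark(long watermark) {
        Iterator<Long> it = cleanupTimers.iterator();
        while (it.hasNext()) {
            long window = it.next();
            if (cleanupTime(window) > watermark) {
                break;                                        // later timers not due yet
            }
            windowState.remove(window);
            it.remove();
        }
    }

    public static void main(String[] args) {
        FixedWindowModel fix1 = new FixedWindowModel();
        FixedWindowModel fix2 = new FixedWindowModel();
        for (long ts : new long[] {1, 5, 12, 18, 25}) {
            fix1.processElementFix1(ts);
            fix2.processElementFix2(ts);
        }
        fix1.advanceWatermark(100);
        fix2.advanceWatermark(100);
        System.out.println("fix 1 leftover: " + fix1.windowState.keySet());
        System.out.println("fix 2 leftover: " + fix2.windowState.keySet());
    }
}
{code}

Running main prints fix 1 leftover: [] and fix 2 leftover: []. Fix 1 leaves the continue in place but makes it harmless; fix 2 keeps registerCleanupTimer(window) at the end of the loop body and only changes how a null result is handled. In the model, repeating a registration for the same window is a no-op because cleanupTimers is a set.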



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
