[ 
https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

pc wang updated FLINK-25728:
----------------------------
    Description: 
We have an application that contains a broadcast process stage. The 
non-broadcast input receives roughly 10 million messages per second, while the 
broadcast side is a control stream that rarely carries any messages.

After several hours of running, the TaskManager runs out of heap memory and 
restarts. We reviewed the application code and found no relevant issues.

We noticed that the time from start to crash was roughly the same on every run. 
We then took a heap dump shortly before a crash and found a huge number of 
`CompletableFuture$UniRun` instances.

These `CompletableFuture$UniRun` instances consume several gigabytes of memory.

 

The following screenshot is from a heap dump taken from a mock test job that 
reproduces the same scenario.

!image-2022-01-20-18-43-32-816.png|width=1161,height=471!

 

After studying the source code, we believe the problem is caused by 
*StreamMultipleInputProcessor.getAvailableFuture()*.

*StreamMultipleInputProcessor* has multiple *inputProcessors*, and its 
*availableFuture* completes when the *availableFuture* of any of its inputs 
completes. 
The current implementation creates a new *CompletableFuture* on each call and 
appends a new *CompletableFuture$UniRun* to each delegate inputProcessor's 
*availableFuture*.
The issue is caused by these *CompletableFuture$UniRun* instances piling up on 
the *availableFuture* of the slow inputProcessor (here, the rarely active 
broadcast input). 
See the source code below.
[StreamMultipleInputProcessor.java#L65|https://github.com/wpc009/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java#L65]

Because each *UniRun* holds a reference to the outer 
*StreamMultipleInputProcessor*'s availableFuture, a huge number of 
*CompletableFuture* instances cannot be garbage-collected.
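The pile-up can be illustrated with a small, self-contained Java model of the pattern described above. This is a simplified sketch, not Flink's actual source: `anyInputAvailable` is a hypothetical stand-in for `getAvailableFuture()`, and the two futures stand in for the fast (non-broadcast) and slow (broadcast) inputs. `CompletableFuture.getNumberOfDependents()` is a standard JDK method that reports how many callbacks are waiting on a future.

```java
import java.util.concurrent.CompletableFuture;

final class UniRunPileUpDemo {
    // Hypothetical, simplified stand-in for getAvailableFuture() (not Flink's code):
    // each call allocates a fresh "any input available" future and attaches one
    // thenRun callback to every input's future. On a still-pending input future,
    // thenRun pushes a CompletableFuture$UniRun onto that future's dependents stack.
    static CompletableFuture<Void> anyInputAvailable(CompletableFuture<?>... inputs) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        for (CompletableFuture<?> input : inputs) {
            input.thenRun(() -> result.complete(null));
        }
        return result;
    }

    public static void main(String[] args) {
        // Broadcast (control) input: its availability future stays pending.
        CompletableFuture<Void> slowInput = new CompletableFuture<>();
        for (int i = 0; i < 100_000; i++) {
            // High-volume input: already available on every round.
            CompletableFuture<Void> fastInput = CompletableFuture.completedFuture(null);
            anyInputAvailable(fastInput, slowInput).join();
        }
        // Every round left one UniRun on slowInput; each one keeps that round's
        // lambda (and the result future it captured) reachable.
        System.out.println(slowInput.getNumberOfDependents()); // prints 100000
    }
}
```

The fast input completes every combined future immediately, yet the callbacks registered on the slow input are never fired or removed, so their number grows with the call count until the slow input finally becomes available.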

We modified the *StreamMultipleInputProcessor*.*getAvailableFuture* function 
and verified that the issue is gone in our modified version.
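The modified code is not included in this report. One possible approach, sketched here under our own assumptions, is to keep a single combined future and attach at most one callback to each distinct input future, so a long-pending input holds one callback instead of one per call. `AnyOfAvailabilityHelper` and its methods are hypothetical names, not Flink API; the sketch assumes `getAvailableFuture` is only called from one thread (the task's mailbox thread) and that an input keeps returning the same future instance while it stays unavailable.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical helper (not Flink's API): exposes one "any input available" future
 * while attaching at most one callback to each distinct input future.
 * Assumes getAvailableFuture is only called from a single thread.
 */
final class AnyOfAvailabilityHelper {
    // Input futures that already carry our callback, one slot per input.
    private final CompletableFuture<?>[] registered;
    // Combined future handed to callers; replaced once it has completed.
    private volatile CompletableFuture<Void> combined = new CompletableFuture<>();

    AnyOfAvailabilityHelper(int numInputs) {
        this.registered = new CompletableFuture<?>[numInputs];
    }

    CompletableFuture<Void> getAvailableFuture(CompletableFuture<?>[] inputFutures) {
        if (combined.isDone()) {
            // The previous round already signalled availability; start a new round.
            combined = new CompletableFuture<>();
        }
        CompletableFuture<Void> current = combined;
        for (int i = 0; i < inputFutures.length; i++) {
            CompletableFuture<?> input = inputFutures[i];
            if (input.isDone()) {
                // Covers inputs whose callback fired before the new round began.
                current.complete(null);
            }
            if (registered[i] != input) {
                registered[i] = input;
                // The callback references the helper, not a particular combined
                // future, so a long-pending input holds exactly one callback.
                input.thenRun(this::signalAvailable);
            }
        }
        return current;
    }

    private void signalAvailable() {
        combined.complete(null);
    }
}
```

Because the callback goes through the helper rather than capturing a specific combined future, replacing the combined future after each completion does not require re-registering on inputs whose futures have not changed.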

We are willing to make a PR for this fix.

 Heap Dump File [^flink-completablefuture-issue.tar.xz] 
 

  was:
We have an application that contains a broadcast process stage. The 
non-broadcast input receives roughly 10 million messages per second, while the 
broadcast side is a control stream that rarely carries any messages.

After several hours of running, the TaskManager runs out of heap memory and 
restarts. We reviewed the application code and found no relevant issues.

We noticed that the time from start to crash was roughly the same on every run. 
We then took a heap dump shortly before a crash and found a huge number of 
`CompletableFuture$UniRun` instances.

These `CompletableFuture$UniRun` instances consume several gigabytes of memory.

 

The following screenshot is from a heap dump taken from a mock test job that 
reproduces the same scenario.

!image-2022-01-20-18-43-32-816.png|width=1161,height=471!

 

After studying the source code, we believe the problem is caused by 
*StreamMultipleInputProcessor.getAvailableFuture()*.

*StreamMultipleInputProcessor* has multiple *inputProcessors*, and its 
*availableFuture* completes when the *availableFuture* of any of its inputs 
completes. 
The current implementation creates a new *CompletableFuture* on each call and 
appends a new *CompletableFuture$UniRun* to each delegate inputProcessor's 
*availableFuture*.
The issue is caused by these *CompletableFuture$UniRun* instances piling up on 
the *availableFuture* of the slow inputProcessor (here, the rarely active 
broadcast input). 
See the source code below.
[StreamMultipleInputProcessor.java#L65|https://github.com/wpc009/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java#L65]

Because each *UniRun* holds a reference to the outer 
*StreamMultipleInputProcessor*'s availableFuture, a huge number of 
*CompletableFuture* instances cannot be garbage-collected.

We modified the *StreamMultipleInputProcessor*.*getAvailableFuture* function 
and verified that the issue is gone in our modified version.

We are willing to make a PR if the community agrees.
 


> StreamMultipleInputProcessor holds mass CompletableFuture instances under 
> certain scenario
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25728
>                 URL: https://issues.apache.org/jira/browse/FLINK-25728
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.12.5, 1.13.5, 1.14.2
>            Reporter: pc wang
>            Priority: Major
>         Attachments: flink-completablefuture-issue.tar.xz, 
> image-2022-01-20-18-43-32-816.png



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
