[ https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479328#comment-17479328 ]
pc wang edited comment on FLINK-25728 at 1/21/22, 3:39 AM:
-----------------------------------------------------------

The issue has the same symptoms as FLINK-24300. Besides the memory issue, the mass of *CompletableFuture$UniRun* instances also causes a long period of high CPU usage after a long idle period.

was (Author: wpc009):
The issue has the same symptoms as FLINK-24300. Besides the memory issue, the mass of *CompletableFuture$UniRun* instances also causes high CPU usage after a long idle period.

> Potential memory leaks in StreamMultipleInputProcessor
> ------------------------------------------------------
>
> Key: FLINK-25728
> URL: https://issues.apache.org/jira/browse/FLINK-25728
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.12.5, 1.15.0, 1.13.5, 1.14.2
> Reporter: pc wang
> Priority: Blocker
> Attachments: flink-completablefuture-issue.tar.xz, image-2022-01-20-18-43-32-816.png
>
>
> We have an application that contains a broadcast process stage. The non-broadcast input carries roughly 10 million messages per second, and the broadcast side is a kind of control stream that rarely has messages flowing through.
> After several hours of running, the TaskManager runs out of heap memory and restarts. We reviewed the application code without finding any relevant issues.
> We found that the time from start to crash was roughly the same on each run. We then took a heap dump before the crash and found a mass of `CompletableFuture$UniRun` instances.
> These `CompletableFuture$UniRun` instances consume several gigabytes of memory.
>
> The following picture is from a heap dump taken from a mock test stream with the same scenario.
> !image-2022-01-20-18-43-32-816.png|width=1161,height=471!
>
> After some source code research, we found that it might be caused by *StreamMultipleInputProcessor.getAvailableFuture()*.
> *StreamMultipleInputProcessor* has multiple *inputProcessors*; its *availableFuture* completes when any of its inputs' *availableFuture* completes.
> The current implementation creates a new *CompletableFuture* on each call and appends a new *CompletableFuture$UniRun* to each delegate inputProcessor's *availableFuture*.
> The issue is caused by *CompletableFuture$UniRun* instances stacking up on the slow inputProcessor's *availableFuture*.
> See the source code below.
> [StreamMultipleInputProcessor.java#L65|https://github.com/wpc009/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java#L65]
> Because each *UniRun* holds a reference to the enclosing *StreamMultipleInputProcessor*'s *availableFuture*, a mass of *CompletableFuture* instances cannot be garbage-collected.
> We made some modifications to *StreamMultipleInputProcessor.getAvailableFuture()* and verified that the issue is gone in our modified version.
> We are willing to open a PR for this fix.
> Heap dump file: [^flink-completablefuture-issue.tar.xz]
> PS: This is a YourKit heap dump. It may not be compatible with HPROF tools.
>

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
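
The stacking described in the issue can be reproduced outside Flink with plain `java.util.concurrent.CompletableFuture`. The following is a minimal sketch, not the Flink code and not the reporter's patch: the class `UniRunStackingDemo`, the helper `RegisterOnce`, and all method names are hypothetical. It assumes a single thread, a "slow" input future that never completes, and a "fast" input that gets a fresh future on every round. `leakyAnyAvailable` mimics the reported pattern; `RegisterOnce` shows one possible mitigation, which attaches a callback to a given pending future at most once.

```java
import java.util.concurrent.CompletableFuture;

class UniRunStackingDemo {

    /** Mimics the reported pattern: every call attaches a fresh callback to every input. */
    static CompletableFuture<Void> leakyAnyAvailable(CompletableFuture<?>... inputs) {
        CompletableFuture<Void> anyAvailable = new CompletableFuture<>();
        for (CompletableFuture<?> input : inputs) {
            // thenRun registers a new dependent on a pending input; it stays there
            // until that input completes, and it keeps anyAvailable reachable.
            input.thenRun(() -> anyAvailable.complete(null));
        }
        return anyAvailable;
    }

    /** Hypothetical mitigation: register on a pending input future at most once. */
    static final class RegisterOnce {
        private final CompletableFuture<?>[] registered;
        private CompletableFuture<Void> anyAvailable = new CompletableFuture<>();

        RegisterOnce(int inputCount) {
            registered = new CompletableFuture<?>[inputCount];
        }

        CompletableFuture<Void> anyAvailable(CompletableFuture<?>... inputs) {
            if (anyAvailable.isDone()) {
                anyAvailable = new CompletableFuture<>();
            }
            for (int i = 0; i < inputs.length; i++) {
                // Skip only when this exact future is already registered and still pending.
                // A completed future runs the callback immediately and adds no dependent.
                if (registered[i] != inputs[i] || inputs[i].isDone()) {
                    registered[i] = inputs[i];
                    inputs[i].thenRun(() -> anyAvailable.complete(null));
                }
            }
            return anyAvailable;
        }
    }

    /** Runs the leaky pattern for the given rounds; returns the dependents left on the slow input. */
    static int leakedDependents(int rounds) {
        CompletableFuture<Void> slow = new CompletableFuture<>(); // never completes
        for (int i = 0; i < rounds; i++) {
            CompletableFuture<Void> fast = new CompletableFuture<>();
            CompletableFuture<Void> any = leakyAnyAvailable(fast, slow);
            fast.complete(null); // the busy input becomes available
            if (!any.isDone()) {
                throw new IllegalStateException("combined future should be complete");
            }
        }
        return slow.getNumberOfDependents();
    }

    /** Same scenario using RegisterOnce; returns the dependents left on the slow input. */
    static int fixedDependents(int rounds) {
        CompletableFuture<Void> slow = new CompletableFuture<>(); // never completes
        RegisterOnce helper = new RegisterOnce(2);
        for (int i = 0; i < rounds; i++) {
            CompletableFuture<Void> fast = new CompletableFuture<>();
            CompletableFuture<Void> any = helper.anyAvailable(fast, slow);
            fast.complete(null);
            if (!any.isDone()) {
                throw new IllegalStateException("combined future should be complete");
            }
        }
        return slow.getNumberOfDependents();
    }

    public static void main(String[] args) {
        System.out.println("leaky pattern, dependents on slow input: " + leakedDependents(100_000));
        System.out.println("register-once, dependents on slow input: " + fixedDependents(100_000));
    }
}
```

`getNumberOfDependents()` is documented as an estimate intended for monitoring, so the counts should be read as indicative. In this single-threaded run, the first count grows with the number of rounds while the second stays constant, which matches the heap dump observation that the retained instances hang off the rarely completing input.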