[ https://issues.apache.org/jira/browse/FLINK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17481909#comment-17481909 ]
Piotr Nowojski commented on FLINK-25728:
----------------------------------------

Thanks for reporting and analysing the issue [~wpc009]. Could you maybe rephrase why
{code:java}
inputProcessors[i]
        .getAvailableFuture()
        .thenRun(() -> anyInputAvailable.complete(null))
{code}
is causing this memory leak?

Or maybe let me rephrase it. Let's assume we have two inputs: one is flipping between available and unavailable status, the other is continuously unavailable. Now, for each {{StreamMultipleInputProcessor#getAvailableFuture}} call, we create one {{CompletableFuture<?> anyInputAvailable}}, referenced by TWO {{CompletableFuture$UniRun}} instances, one registered on each input's {{inputProcessors[i].getAvailableFuture()}}. {{anyInputAvailable}} will be returned, and sooner or later it will be completed by the first input, waking up {{StreamTask}} in the process. That's fine. The problem is that in this scenario each of those {{anyInputAvailable}} instances will be referenced forever by the second input's {{inputProcessors[i].getAvailableFuture()}}. As long as the second input is not available, we keep building up the memory leak.

Did I understand the problem correctly?

> Potential memory leaks in StreamMultipleInputProcessor
> ------------------------------------------------------
>
>                 Key: FLINK-25728
>                 URL: https://issues.apache.org/jira/browse/FLINK-25728
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.12.5, 1.15.0, 1.13.5, 1.14.2
>            Reporter: pc wang
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: flink-completablefuture-issue.tar.xz, image-2022-01-20-18-43-32-816.png
>
>
> We have an application that contains a broadcast process stage. The
> non-broadcast input has roughly 10 million messages per second, and the
> broadcast side is a kind of control stream that rarely has any messages
> flowing through.
> After several hours of running, the TaskManager runs out of heap memory
> and restarts.
> We reviewed the application code without finding any relevant issues.
> We found that the time from start to crash was roughly the same on every run.
> We then took a heap dump before the crash and found a large number of
> `CompletableFuture$UniRun` instances.
> These `CompletableFuture$UniRun` instances consume several gigabytes of memory.
>
> The following picture is from the heap dump we took of a mock test stream
> with the same scenario.
> !image-2022-01-20-18-43-32-816.png|width=1161,height=471!
>
> After some source code research, we found that it might be caused by
> *StreamMultipleInputProcessor.getAvailableFuture()*.
> *StreamMultipleInputProcessor* has multiple *inputProcessors*; its
> *availableFuture* is completed when any of its inputs' *availableFuture*
> completes.
> The current implementation creates a new *CompletableFuture* on every call and
> appends a new *CompletableFuture$UniRun* to each delegate inputProcessor's
> *availableFuture*.
> The issue is caused by *CompletableFuture$UniRun* instances stacking up on the
> slow inputProcessor's *availableFuture*.
> See the source code:
> [StreamMultipleInputProcessor.java#L65|https://github.com/wpc009/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java#L65]
> Because each *UniRun* holds a reference to the outer
> *StreamMultipleInputProcessor*'s availableFuture, a large number of
> *CompletableFuture* instances cannot be garbage-collected.
> We made some modifications to the
> *StreamMultipleInputProcessor*.*getAvailableFuture* function and verified that
> the issue is gone in our modified version.
> We are willing to make a PR for this fix.
> Heap Dump File [^flink-completablefuture-issue.tar.xz]
> PS: This is a YourKit heap dump; it may not be compatible with HPROF tools.
> [Sample Code to reproduce the issue|https://github.com/wpc009/flink/blob/FLINK-25728/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/MultipleInputStreamMemoryIssueTest.java]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
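The mechanism Piotr describes (one {{anyInputAvailable}} per call, with a {{UniRun}} parked on every input's future) can be reproduced outside Flink with plain {{CompletableFuture}}s. The sketch below is hypothetical: the class and method names ({{AnyInputAvailabilityDemo}}, {{leakyAnyAvailable}}, {{DedupAnyOf}}, {{parkedOnSlowInput}}) are illustrative and are not Flink's API. {{DedupAnyOf}} is only one possible way to stop the build-up; it is neither the reporter's modification nor necessarily the fix that was eventually merged. The sketch uses the JDK's {{CompletableFuture.getNumberOfDependents()}} to count the callbacks parked on the input that never becomes available.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical, self-contained illustration of the FLINK-25728 pattern (not Flink code).
 * "fast" input: a fresh availability future per round that then becomes available.
 * "slow" input: a single availability future that never completes.
 */
public class AnyInputAvailabilityDemo {

    /** The reported pattern: a new combined future plus a new UniRun on every input, per call. */
    static CompletableFuture<Void> leakyAnyAvailable(CompletableFuture<?>[] inputs) {
        CompletableFuture<Void> anyInputAvailable = new CompletableFuture<>();
        for (CompletableFuture<?> input : inputs) {
            // Each call parks one more UniRun (capturing anyInputAvailable) on this input.
            input.thenRun(() -> anyInputAvailable.complete(null));
        }
        return anyInputAvailable;
    }

    /**
     * Hypothetical alternative: subscribe to each distinct input future only once.
     * A callback completes whichever combined future is current when it fires.
     * This is a sketch and is not hardened for every concurrent interleaving.
     */
    static final class DedupAnyOf {
        private final CompletableFuture<?>[] subscribed;
        private volatile CompletableFuture<Void> combined = CompletableFuture.completedFuture(null);

        DedupAnyOf(int numInputs) {
            subscribed = new CompletableFuture<?>[numInputs];
        }

        synchronized CompletableFuture<Void> getAvailableFuture(CompletableFuture<?>[] inputs) {
            if (combined.isDone()) {
                combined = new CompletableFuture<>(); // start a new "unavailable" period
            }
            for (int i = 0; i < inputs.length; i++) {
                CompletableFuture<?> input = inputs[i];
                if (input.isDone()) {
                    combined.complete(null); // this input is already available
                } else if (input != subscribed[i]) {
                    subscribed[i] = input; // forget the previously subscribed future
                    input.thenRun(this::notifyAvailable);
                }
            }
            return combined;
        }

        private void notifyAvailable() {
            combined.complete(null);
        }
    }

    /** Runs {@code rounds} wake-ups and returns how many callbacks are parked on the slow input. */
    static int parkedOnSlowInput(int rounds, boolean useDedup) {
        CompletableFuture<Void> slow = new CompletableFuture<>(); // never completes
        DedupAnyOf dedup = new DedupAnyOf(2);
        for (int r = 0; r < rounds; r++) {
            CompletableFuture<Void> fast = new CompletableFuture<>();
            CompletableFuture<?>[] inputs = {fast, slow};
            CompletableFuture<Void> any =
                    useDedup ? dedup.getAvailableFuture(inputs) : leakyAnyAvailable(inputs);
            fast.complete(null); // the fast input becomes available and wakes the caller
            if (!any.isDone()) {
                throw new IllegalStateException("caller was not woken up in round " + r);
            }
        }
        return slow.getNumberOfDependents();
    }

    public static void main(String[] args) {
        System.out.println("reported pattern: " + parkedOnSlowInput(100_000, false));
        System.out.println("dedup sketch:     " + parkedOnSlowInput(100_000, true));
    }
}
```

In the reported pattern the count on the slow input grows by one per round (100000 here), and every parked {{UniRun}} keeps its already-completed {{anyInputAvailable}} reachable; with the deduplicating sketch it stays at 1. The trade-off of this hypothetical design is that a late callback completes whatever combined future is current, which can cause an occasional spurious wake-up, so the caller is assumed to re-check input availability after waking up.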