Hi,

This is the test Flink job we created to trigger the leak: https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6

And this is the Python script we are using to execute the job thousands of times until the OOM problem appears: https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
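Roughly, the test job is just a Kafka-to-Kafka passthrough with no stateful operators. A simplified sketch of that kind of job is below; it is not the exact gist, and the topic names, broker address, consumer group and the Kafka 0.10 connector version are placeholders:

// Simplified sketch of the kind of passthrough test job described above (Kafka in,
// Kafka out, no stateful operators). Not the actual gist: topic names, broker
// address and consumer group are placeholders.
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaPassthroughJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // assumed broker address
        props.setProperty("group.id", "leak-test");           // assumed consumer group

        DataStream<String> events =
                env.addSource(new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), props));

        // Write every record straight back out; the Python script then cancels and
        // resubmits this job in a loop until the task managers run out of memory.
        events.addSink(new FlinkKafkaProducer010<>("kafka:9092", "output-topic", new SimpleStringSchema()));

        env.execute("kafka-passthrough-leak-test");
    }
}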
The cluster we used for this has the following configuration:

- Instance type: t2.large
- Number of workers: 2
- Heap memory: 5500
- Number of task slots per node: 4
- TaskManager memory fraction: 0.5
- Number of network buffers: 2000

We have tried several things: increasing the heap, reducing the heap, using a larger memory fraction, and changing TM_MAX_OFFHEAP_SIZE="2G" in taskmanager.sh; nothing seems to work.

Thanks for your help.

On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:

> On 2017-11-08 15:20, Piotr Nowojski wrote:
>
>> Hi Ebru and Javier,
>>
>> Yes, if you could share this example job it would be helpful.
>>
>> Ebru: could you explain in a little more detail what your job(s) look like? Could you post some code? If you are just using maps and filters, there shouldn't be any network transfers involved, aside from the source and sink functions.
>>
>> Piotrek
>>
>> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>
>>> Hi Javier,
>>>
>>> It would be helpful if you share your test job with us. Which configurations did you try?
>>>
>>> -Ebru
>>>
>>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de> wrote:
>>>
>>> Hi,
>>>
>>> We have been facing a similar problem. We have tried some different configurations, as proposed in another email thread by Flavio and Kien, but it didn't work. We have a workaround similar to Flavio's: we restart the task managers once they reach a memory threshold. We created a small test to remove all of our dependencies and leave only Flink's native libraries. This test reads data from a Kafka topic and writes it back to another topic in Kafka. We cancel the job and start another every 5 seconds. After ~30 minutes of doing this, the cluster reaches the OS memory limit and dies.
>>>
>>> Currently, we have a test cluster with 8 workers and 8 task slots per node. We have one job that uses 56 slots, and we cannot execute that job 5 times in a row because the whole cluster dies. If you want, we can publish our test job.
>>>
>>> Regards,
>>>
>>> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> @Nico & @Piotr Could you please have a look at this? You both recently worked on the network stack and might be most familiar with this.
>>>
>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>> We also have the same problem in production. At the moment the solution is to restart the entire Flink cluster after every job. We've tried to reproduce this problem with a test (see https://issues.apache.org/jira/browse/FLINK-7845 [1]), but we don't know whether the error produced by the test and the leak are correlated.
>>>
>>> Best,
>>> Flavio
>>>
>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>>>
>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>>
>>> Do you use any windowing? If yes, could you please share that code? If there is no stateful operation at all, it's strange where the list state instances are coming from.
>>>
>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>>
>>> Hi Ufuk,
>>>
>>> We don't explicitly define any state descriptor. We only use map and filter operators. We thought that GC handles clearing Flink's internal state. So how can we manage the memory if it is always increasing?
>>>
>>> - Ebru
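Ufuk's message quoted just below suggests manually calling clear() on managed state once a key is finished. For reference, a minimal sketch of what that looks like in a keyed function; the ValueState, the threshold and the emit logic are illustrative assumptions, not code from either of the jobs in this thread:

// Illustrative only: a keyed function that keeps a per-key counter and releases it
// with clear() once the key is "done". The state, threshold and emit logic are assumptions.
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class PerKeyCounter extends RichFlatMapFunction<String, String> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1;

        if (updated >= 100) {   // illustrative "done with this key" condition
            out.collect(value + " (seen 100 times)");
            count.clear();      // releases the managed state for this key
        } else {
            count.update(updated);
        }
    }
}

This only works on a keyed stream, e.g. stream.keyBy(...).flatMap(new PerKeyCounter()); Flink's managed keyed state is scoped to the current key and stays around until it is cleared.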
>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>> Hey Ebru, the memory usage might be increasing as long as a job is running. This is expected (also in the case of multiple running jobs). The screenshots are not helpful in that regard. :-(
>>>
>>> What kind of stateful operations are you using? Depending on your use case, you have to manually call `clear()` on the state instance in order to release the managed state.
>>>
>>> Best,
>>>
>>> Ufuk
>>>
>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>>
>>> Begin forwarded message:
>>>
>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>> Subject: Re: Flink memory leak
>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>> To: Ufuk Celebi <u...@apache.org>
>>>
>>> Hi Ufuk,
>>>
>>> There are three snapshots of htop output attached.
>>> 1. The first snapshot is the initial state.
>>> 2. The second snapshot is after one job has been submitted.
>>> 3. The third snapshot is the output of that one job at 15000 EPS. The memory usage is always increasing over time.
>>>
>>> <1.png><2.png><3.png>
>>>
>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>> Hey Ebru,
>>>
>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>>>
>>> Since multiple jobs are running, it will be hard to understand which job the state descriptors from the heap snapshot belong to.
>>> - Is it possible to isolate the problem and reproduce the behaviour with only a single job?
>>>
>>> – Ufuk
>>>
>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>>>
>>> Hi,
>>>
>>> We are using Flink 1.3.1 in production; we have one job manager and 3 task managers in standalone mode. Recently, we've noticed that we have memory-related problems. We use Docker containers to serve the Flink cluster. We have 300 slots, and 20 jobs are running with a parallelism of 10. The job count may also change over time. Task manager memory usage always increases, and after job cancellation this memory usage doesn't decrease. We've tried to investigate the problem and took a JVM heap snapshot of a task manager. According to the JVM heap analysis, the possible memory leak was Flink's list state descriptor, but we are not sure that is the cause of our memory problem. How can we solve the problem?
>>>
>>> We have two types of Flink jobs. One has no stateful operators and contains only maps and filters; the other has a time window with a count trigger.
>>
>> * We've analysed the JVM heaps again under different conditions. First we analysed the snapshot when no Flink jobs were running on the cluster (image 1).
>> * Then we analysed the JVM heap snapshot while the Flink job with no stateful operators was running; according to the results, the leak suspect was NetworkBufferPool (image 2).
>> * In the last analysis, both types of jobs were running, and the leak suspect was again NetworkBufferPool (image 3).
>>
>> In our system, jobs are regularly cancelled and resubmitted, and we noticed that when a job is submitted some amount of memory is allocated, and after cancellation this memory is never freed. So over time the memory usage keeps increasing and exceeds the limits.
>>
>> Links:
>> ------
>> [1] https://issues.apache.org/jira/browse/FLINK-7845
>
> Hi Piotr,
>
> There are two types of jobs.
> In the first, we use a Kafka source and a Kafka sink; there isn't any window operator.
>
> In the second job, we use a Kafka source, a filesystem sink and an Elasticsearch sink, with a window operator for buffering.
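For reference, below is a minimal sketch of what such a buffering job can look like with a processing-time window and a count-based trigger. The key selector, window size, trigger count, topic, broker and the print() sink are illustrative assumptions, and the real job's filesystem and Elasticsearch sinks are omitted. One detail worth keeping in mind next to the list-state suspect mentioned earlier: a bare CountTrigger fires the window without purging it, so the buffered elements stay in the window's list state until the window itself is cleaned up, whereas PurgingTrigger.of(CountTrigger.of(n)) clears the buffer on every firing.

// Illustrative sketch of a "buffering" job: Kafka source, processing-time window,
// count-based trigger. Topic, broker, key selector, sizes and the print() sink are
// assumptions; the real job's filesystem and Elasticsearch sinks are omitted.
import java.util.Properties;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class WindowedBufferJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // assumed broker address
        props.setProperty("group.id", "buffer-job");          // assumed consumer group

        env.addSource(new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props))
           // Illustrative key: first character of the event; the real key is not known.
           .keyBy(new KeySelector<String, String>() {
               @Override
               public String getKey(String value) {
                   return value.isEmpty() ? "" : value.substring(0, 1);
               }
           })
           .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
           // PurgingTrigger clears the buffered elements on each firing; a bare
           // CountTrigger would fire but keep the window's list state around.
           .trigger(PurgingTrigger.of(CountTrigger.of(1000)))
           .reduce(new ReduceFunction<String>() {
               @Override
               public String reduce(String a, String b) {
                   return a + "\n" + b; // illustrative aggregation of the buffered batch
               }
           })
           .print(); // stand-in for the filesystem / Elasticsearch sinks

        env.execute("windowed-buffer-job-sketch");
    }
}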