Hi,

Thanks for sharing this job.
Do I need to feed some data into Kafka to reproduce this issue with your script? Does this OOM issue also happen when you are not using the Kafka source/sink?

Piotrek

> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de> wrote:
>
> Hi,
>
> This is the test Flink job we created to trigger this leak:
> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
> And this is the python script we are using to execute the job thousands of times to get the OOM problem:
> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>
> The cluster we used for this has this configuration:
> Instance type: t2.large
> Number of workers: 2
> HeapMemory: 5500
> Number of task slots per node: 4
> TaskManagerMemFraction: 0.5
> NumberOfNetworkBuffers: 2000
>
> We have tried several things: increasing the heap, reducing the heap, a larger memory fraction, changing the value TM_MAX_OFFHEAP_SIZE="2G" in taskmanager.sh; and nothing seems to work.
>
> Thanks for your help.
>
> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>
> On 2017-11-08 15:20, Piotr Nowojski wrote:
>
> Hi Ebru and Javier,
>
> Yes, if you could share this example job it would be helpful.
>
> Ebru: could you explain in a little more detail what your job(s) look like? Could you post some code? If you are just using maps and filters there shouldn't be any network transfers involved, aside from the source and sink functions.
>
> Piotrek
>
> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>
> Hi Javier,
>
> It would be helpful if you shared your test job with us. Which configurations did you try?
>
> -Ebru
>
> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de> wrote:
>
> Hi,
>
> We have been facing a similar problem. We have tried some different configurations, as proposed in another email thread by Flavio and Kien, but they didn't work. We have a workaround similar to Flavio's: we restart the taskmanagers once they reach a memory threshold. We created a small test to remove all of our dependencies and leave only the native Flink libraries. This test reads data from a Kafka topic and writes it back to another topic in Kafka. We cancel the job and start another one every 5 seconds. After ~30 minutes of doing this, the cluster reaches the OS memory limit and dies.
>
> Currently, we have a test cluster with 8 workers and 8 task slots per node. We have one job that uses 56 slots, and we cannot execute that job 5 times in a row because the whole cluster dies. If you want, we can publish our test job.
>
> Regards,
>
> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> @Nico & @Piotr Could you please have a look at this? You both recently worked on the network stack and might be most familiar with this.
>
> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> We also have the same problem in production. At the moment the solution is to restart the entire Flink cluster after every job.
> We've tried to reproduce this problem with a test (see https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't know whether the error produced by the test and the leak are correlated.
>
> Best,
> Flavio
>
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>
> On 2017-11-07 16:53, Ufuk Celebi wrote:
>
> Do you use any windowing? If yes, could you please share that code? If there is no stateful operation at all, it's strange where the list state instances are coming from.
>
> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>
> Hi Ufuk,
>
> We don't explicitly define any state descriptors. We only use map and filter operators. We thought that the GC would handle clearing Flink's internal state. So how can we manage the memory if it is always increasing?
>
> - Ebru
>
> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>
> Hey Ebru, the memory usage might be increasing as long as a job is running. This is expected (also in the case of multiple running jobs). The screenshots are not helpful in that regard. :-(
>
> What kind of stateful operations are you using? Depending on your use case, you have to manually call `clear()` on the state instance in order to release the managed state.
>
> Best,
>
> Ufuk
>
> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>
> Begin forwarded message:
>
> From: ebru <b20926...@cs.hacettepe.edu.tr>
> Subject: Re: Flink memory leak
> Date: 7 November 2017 at 14:09:17 GMT+3
> To: Ufuk Celebi <u...@apache.org>
>
> Hi Ufuk,
>
> There are three snapshots of htop output.
> 1. The first snapshot is the initial state.
> 2. The second snapshot is after one job has been submitted.
> 3. The third snapshot is the output with that one job running at 15000 EPS. And the memory usage is always increasing over time.
>
> <1.png><2.png><3.png>
>
> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>
> Hey Ebru,
>
> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>
> Since multiple jobs are running, it will be hard to understand which job the state descriptors from the heap snapshot belong to.
> - Is it possible to isolate the problem and reproduce the behaviour with only a single job?
>
> – Ufuk
>
> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>
> Hi,
>
> We are using Flink 1.3.1 in production, with one job manager and 3 task managers in standalone mode. Recently, we've noticed that we have memory-related problems. We use Docker containers to serve the Flink cluster. We have 300 slots and 20 jobs running with a parallelism of 10. The job count may also change over time. Taskmanager memory usage always increases, and after job cancellation this memory usage doesn't decrease. We've tried to investigate the problem and took a task manager JVM heap snapshot. According to the JVM heap analysis, the possible memory leak was the Flink list state descriptor. But we are not sure that this is the cause of our memory problem. How can we solve the problem?
>
> We have two types of Flink job. One has no stateful operators and contains only maps and filters; the other has a time window with a count trigger.
>
> * We've analysed the JVM heaps again under different conditions. First we analysed a snapshot taken when no Flink jobs were running on the cluster. (image 1)
> * Then we analysed the JVM heap snapshot taken while the Flink job with no stateful operators was running. According to the results, the leak suspect was NetworkBufferPool. (image 2)
> * In the last analysis, both types of jobs were running and the leak suspect was again NetworkBufferPool. (image 3)
>
> In our system, jobs are regularly cancelled and resubmitted, and we noticed that when a job is submitted some amount of memory is allocated, and after cancellation this allocated memory is never freed. So over time the memory usage keeps increasing and eventually exceeds the limits.
>
> Links:
> ------
> [1] https://issues.apache.org/jira/browse/FLINK-7845
>
> Hi Piotr,
>
> There are two types of jobs. In the first, we use a Kafka source and a Kafka sink; there isn't any window operator. In the second job, we use a Kafka source, a filesystem sink and an Elasticsearch sink, and a window operator for buffering.
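For readers following along, here is a rough, untested sketch of the two job shapes described above, written against Flink 1.3-era DataStream APIs. The class name, topic names, broker address, key selector, window size, count threshold and output path are all placeholders, and the Elasticsearch sink of the second job is left out; the actual test jobs are the ones in the gists linked earlier in the thread.

import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Sketch of the two job shapes discussed in this thread.
// In practice these are submitted as two separate jobs.
public class JobShapes {

    private static Properties kafkaProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
        props.setProperty("group.id", "leak-test");            // placeholder
        return props;
    }

    // Job 1: stateless, only maps and filters, Kafka source -> Kafka sink.
    public static void runStatelessJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), kafkaProps()))
           .filter(value -> !value.isEmpty())
           .map(value -> value.toUpperCase())
           .addSink(new FlinkKafkaProducer010<>("output-topic", new SimpleStringSchema(), kafkaProps()));
        env.execute("stateless-kafka-job");
    }

    // Job 2: Kafka source, time window with a count trigger for buffering,
    // filesystem sink (the Elasticsearch sink is omitted here).
    public static void runWindowedJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), kafkaProps()))
           .keyBy(new KeySelector<String, Integer>() {
               @Override
               public Integer getKey(String value) {
                   return value.hashCode() % 10;               // placeholder key selector
               }
           })
           .timeWindow(Time.minutes(1))                        // placeholder window size
           .trigger(CountTrigger.of(1000))                     // placeholder threshold; a custom trigger
                                                               // replaces the window's default time trigger
           .reduce((a, b) -> a + "\n" + b)
           .writeAsText("/tmp/window-output");                 // placeholder path
        env.execute("windowed-kafka-job");
    }

    public static void main(String[] args) throws Exception {
        if (args.length > 0 && "windowed".equals(args[0])) {
            runWindowedJob();
        } else {
            runStatelessJob();
        }
    }
}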
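On Ufuk's earlier point about manually calling clear() to release managed state: this only applies to explicitly declared keyed state, which the jobs in this thread do not use. Purely as an illustration, a hypothetical operator (the class, state name and flush condition below are made up, and it must run after a keyBy) would release its managed list state like this:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical example: buffers elements per key in managed list state and
// releases the state for the current key when a marker element arrives.
public class BufferingFlatMap extends RichFlatMapFunction<String, String> {

    private transient ListState<String> buffer;

    @Override
    public void open(Configuration parameters) throws Exception {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", String.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        if ("FLUSH".equals(value)) {                 // made-up flush condition
            Iterable<String> buffered = buffer.get();
            if (buffered != null) {
                for (String element : buffered) {
                    out.collect(element);
                }
            }
            buffer.clear();                          // releases the managed state for this key
        } else {
            buffer.add(value);
        }
    }
}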