Yes, I tested with just printing the stream, but it can take a lot of time to fail.
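For context, "just printing the stream" means a job as small as the sketch below; this is only an illustration of the shape of that test, not the exact code from the gist:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PrintStreamTestJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // A tiny in-memory stream that is simply printed. The point is only to have
            // a job that can be submitted, cancelled and resubmitted in a loop.
            env.fromElements("a", "b", "c")
               .print();

            env.execute("print-stream-test");
        }
    }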
On Wednesday, 8 November 2017, Piotr Nowojski <pi...@data-artisans.com> wrote:

> Thanks for the quick answer.
>
> So it will also fail after some time with the `fromElements` source instead of Kafka, right?
> Did you also try it without a Kafka producer?
>
> Piotrek
>
> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de> wrote:
>
> Hi,
>
> You don't need data. With data it will die faster. I tested as well with a small data set, using the fromElements source, but it will take some time to die. It's better with some data.
>
> On 8 November 2017 at 14:54, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>
>> Hi,
>>
>> Thanks for sharing this job.
>>
>> Do I need to feed some data into Kafka to reproduce this issue with your script?
>> Does this OOM issue also happen when you are not using the Kafka source/sink?
>>
>> Piotrek
>>
>> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de> wrote:
>>
>> Hi,
>>
>> This is the test Flink job we created to trigger this leak: https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>> And this is the Python script we are using to execute the job thousands of times to reproduce the OOM problem: https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>>
>> The cluster we used for this has the following configuration:
>>
>> Instance type: t2.large
>> Number of workers: 2
>> HeapMemory: 5500
>> Number of task slots per node: 4
>> TaskMangMemFraction: 0.5
>> NumberOfNetworkBuffers: 2000
>>
>> We have tried several things: increasing the heap, reducing the heap, a larger memory fraction, changing the TM_MAX_OFFHEAP_SIZE="2G" value in taskmanager.sh; nothing seems to work.
>>
>> Thanks for your help.
>>
>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>>>
>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>>>
>>>> Hi Ebru and Javier,
>>>>
>>>> Yes, if you could share this example job it would be helpful.
>>>>
>>>> Ebru: could you explain in a little more detail what your job(s) look like? Could you post some code? If you are just using maps and filters there shouldn't be any network transfers involved, aside from source and sink functions.
>>>>
>>>> Piotrek
>>>>
>>>>> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>
>>>>> Hi Javier,
>>>>>
>>>>> It would be helpful if you shared your test job with us.
>>>>> Which configurations did you try?
>>>>>
>>>>> -Ebru
>>>>>
>>>>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> We have been facing a similar problem. We have tried some different configurations, as proposed in another email thread by Flavio and Kien, but it didn't work. We have a workaround similar to the one that Flavio has: we restart the taskmanagers once they reach a memory threshold. We created a small test to remove all of our dependencies and leave only Flink native libraries. This test reads data from a Kafka topic and writes it back to another topic in Kafka. We cancel the job and start another one every 5 seconds. After ~30 minutes of doing this, the cluster reaches the OS memory limit and dies.
>>>>>
>>>>> Currently, we have a test cluster with 8 workers and 8 task slots per node. We have one job that uses 56 slots, and we cannot execute that job 5 times in a row because the whole cluster dies. If you want, we can publish our test job.
>>>>>
>>>>> Regards,
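For anyone who prefers not to open the gists: the test job described above is essentially a Kafka-to-Kafka copy, roughly like the sketch below (assuming the Kafka 0.10 connector; the topic names, broker address and group id are illustrative, not the exact gist contents). The Python script then just submits and cancels a job like this in a loop every few seconds.

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaPassThroughTestJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // illustrative broker address
            props.setProperty("group.id", "leak-test");           // illustrative group id

            // Read strings from one topic and write them back, unchanged, to another.
            DataStream<String> input = env.addSource(
                    new FlinkKafkaConsumer010<>("test-input", new SimpleStringSchema(), props));

            input.addSink(
                    new FlinkKafkaProducer010<>("test-output", new SimpleStringSchema(), props));

            env.execute("kafka-pass-through-test");
        }
    }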
>>>>>
>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>> @Nico & @Piotr Could you please have a look at this? You both recently worked on the network stack and might be most familiar with this.
>>>>>
>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>> We also have the same problem in production. At the moment the solution is to restart the entire Flink cluster after every job.
>>>>> We've tried to reproduce this problem with a test (see https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't know whether the error produced by the test and the leak are correlated.
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>>>>
>>>>> Do you use any windowing? If yes, could you please share that code? If there is no stateful operation at all, it's strange where the list state instances are coming from.
>>>>>
>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>
>>>>> Hi Ufuk,
>>>>>
>>>>> We don't explicitly define any state descriptors. We only use map and filter operators. We thought that GC would handle clearing Flink's internal state. So how can we manage the memory if it is always increasing?
>>>>>
>>>>> - Ebru
>>>>>
>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>>>
>>>>> Hey Ebru, the memory usage might be increasing as long as a job is running. This is expected (also in the case of multiple running jobs). The screenshots are not helpful in that regard. :-(
>>>>>
>>>>> What kind of stateful operations are you using? Depending on your use case, you have to manually call `clear()` on the state instance in order to release the managed state.
>>>>>
>>>>> Best,
>>>>>
>>>>> Ufuk
>>>>>
>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>
>>>>> Begin forwarded message:
>>>>>
>>>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>>>> Subject: Re: Flink memory leak
>>>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>>>> To: Ufuk Celebi <u...@apache.org>
>>>>>
>>>>> Hi Ufuk,
>>>>>
>>>>> There are three snapshots of htop output.
>>>>> 1. snapshot: the initial state.
>>>>> 2. snapshot: after one job has been submitted.
>>>>> 3. snapshot: the same job running at 15000 EPS; the memory usage keeps increasing over time.
>>>>>
>>>>> <1.png><2.png><3.png>
>>>>>
>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>>>>
>>>>> Hey Ebru,
>>>>>
>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>>>>>
>>>>> Since multiple jobs are running, it will be hard to understand which job the state descriptors from the heap snapshot belong to.
>>>>> - Is it possible to isolate the problem and reproduce the behaviour with only a single job?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> We are using Flink 1.3.1 in production; we have one job manager and 3 task managers in standalone mode. Recently, we've noticed that we have memory-related problems. We use Docker containers to serve the Flink cluster. We have 300 slots, and 20 jobs are running with a parallelism of 10. The job count may also change over time. Taskmanager memory usage always increases, and after job cancellation this memory usage doesn't decrease. We've tried to investigate the problem and took a task manager JVM heap snapshot. According to the heap analysis, the possible memory leak was the Flink list state descriptor. But we are not sure that is the cause of our memory problem. How can we solve the problem?
>>>>>
>>>>> We have two types of Flink job. One has no stateful operators and contains only maps and filters; the other has a time window with a count trigger.
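For reference, the second kind of job mentioned here (a keyed time window fired by a count trigger) has roughly the shape of the sketch below. It is a minimal illustration, not ebru's actual code: the key field, window size and trigger count are made up, and the real job reads from Kafka rather than fromElements. The buffered elements are kept as per-key window state until the window is evaluated and eventually cleaned up.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

    public class WindowWithCountTriggerJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Illustrative bounded input; the real jobs read from Kafka instead.
            env.fromElements(Tuple2.of("key-a", 1), Tuple2.of("key-a", 1), Tuple2.of("key-b", 1))
               .keyBy(0)                      // key by the first tuple field
               .timeWindow(Time.minutes(1))   // elements are buffered as per-key window state
               .trigger(CountTrigger.of(2))   // fire once 2 elements are buffered (count is illustrative)
               .sum(1)                        // emit the per-window sum
               .print();

            env.execute("window-count-trigger-test");
        }
    }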
>>>>
>>>> * We've analysed the JVM heaps again under different conditions. First we analysed the snapshot when no Flink jobs were running on the cluster. (image 1)
>>>> * Then we analysed the JVM heap snapshot while the Flink job that has no stateful operators was running. According to the results, the leak suspect was NetworkBufferPool. (image 2)
>>>> * In the last analysis, both types of jobs were running and the leak suspect was again NetworkBufferPool. (image 3)
>>>>
>>>> In our system, jobs are regularly cancelled and resubmitted, and we noticed that when a job is submitted some amount of memory is allocated, and after cancellation this allocated memory is never freed. So over time the memory usage keeps increasing and exceeds the limits.
>>>>
>>>> Links:
>>>> ------
>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845
>>>
>>> Hi Piotr,
>>>
>>> There are two types of jobs.
>>> In the first, we use a Kafka source and a Kafka sink; there isn't any window operator.
>>> In the second job, we use a Kafka source, a filesystem sink and an Elasticsearch sink, plus a window operator for buffering.
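Regarding Ufuk's earlier point about manually calling clear() on a state instance: a minimal sketch of what that looks like in a keyed operator is below. The class name, threshold and output message are made up for illustration; it is not code from any of the jobs discussed in this thread.

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Counts elements per key and releases the keyed state once a key has seen
    // enough elements, instead of keeping that state around forever.
    public class CountAndClear extends RichFlatMapFunction<String, String> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            Long current = count.value();             // null on the first element for a key
            long next = (current == null ? 0L : current) + 1;

            if (next >= 1000) {
                out.collect(value + ": seen 1000 times");
                count.clear();                        // release the managed state for this key
            } else {
                count.update(next);
            }
        }
    }

It would be used after a keyBy, e.g. stream.keyBy(...).flatMap(new CountAndClear()).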