Hi,

We have been facing a similar problem. We have tried several different configurations, as proposed in another email thread by Flavio and Kien, but none of them worked. We have a workaround similar to Flavio's: we restart the taskmanagers once they reach a memory threshold. We created a small test to remove all of our own dependencies and leave only the native Flink libraries. The test reads data from a Kafka topic and writes it back to another Kafka topic. We cancel the job and start another one every 5 seconds. After ~30 minutes of this, the cluster reaches the OS memory limit and dies.
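A stripped-down sketch of what the test job does is below. The broker address, group id and topic names are placeholders, not our real settings; the real test is essentially the same source -> sink pipeline with no user state.

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaPassthroughJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder Kafka settings -- replace with the real broker list / group id.
            Properties consumerProps = new Properties();
            consumerProps.setProperty("bootstrap.servers", "kafka:9092");
            consumerProps.setProperty("group.id", "leak-test");

            // Read strings from one topic and write them back unchanged to another:
            // no user state, no windows, just source -> sink.
            env.addSource(new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), consumerProps))
               .addSink(new FlinkKafkaProducer010<>("kafka:9092", "output-topic", new SimpleStringSchema()));

            env.execute("kafka-passthrough-leak-test");
        }
    }

We submit it, cancel it after ~5 seconds with `bin/flink cancel <jobId>`, and repeat.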
Currently, we have a test cluster with 8 workers and 8 task slots per node. We have one job that uses 56 slots, and we cannot execute that job 5 times in a row because the whole cluster dies. If you want, we can publish our test job.

Regards,

On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org> wrote:

> @Nico & @Piotr Could you please have a look at this? You both recently
> worked on the network stack and might be most familiar with this.
>
> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> We also have the same problem in production. At the moment the solution is
> to restart the entire Flink cluster after every job.
> We've tried to reproduce this problem with a test (see
> https://issues.apache.org/jira/browse/FLINK-7845) but we don't know
> whether the error produced by the test and the leak are correlated.
>
> Best,
> Flavio
>
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <
> b20926...@cs.hacettepe.edu.tr> wrote:
>
>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>
>>> Do you use any windowing? If yes, could you please share that code? If
>>> there is no stateful operation at all, it's strange where the list
>>> state instances are coming from.
>>>
>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>>
>>>> Hi Ufuk,
>>>>
>>>> We don't explicitly define any state descriptor. We only use map and
>>>> filter operators. We thought that GC handles clearing Flink's internal
>>>> state. So how can we manage the memory if it is always increasing?
>>>>
>>>> - Ebru
>>>>
>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>>
>>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>>> running. This is expected (also in the case of multiple running jobs).
>>>> The screenshots are not helpful in that regard. :-(
>>>>
>>>> What kind of stateful operations are you using? Depending on your use
>>>> case, you have to manually call `clear()` on the state instance in
>>>> order to release the managed state.
>>>>
>>>> Best,
>>>>
>>>> Ufuk
>>>>
>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>
>>>>> Begin forwarded message:
>>>>>
>>>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>>>> Subject: Re: Flink memory leak
>>>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>>>> To: Ufuk Celebi <u...@apache.org>
>>>>>
>>>>> Hi Ufuk,
>>>>>
>>>>> There are three snapshots of htop output.
>>>>> 1. snapshot is the initial state.
>>>>> 2. snapshot is after one job has been submitted.
>>>>> 3. snapshot is the output of that one job at 15000 EPS. The memory
>>>>> usage is always increasing over time.
>>>>>
>>>>> <1.png><2.png><3.png>
>>>>>
>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>>>>
>>>>> Hey Ebru,
>>>>>
>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing
>>>>> this.
>>>>>
>>>>> Since multiple jobs are running, it will be hard to understand to
>>>>> which job the state descriptors from the heap snapshot belong.
>>>>> - Is it possible to isolate the problem and reproduce the behaviour
>>>>> with only a single job?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> We are using Flink 1.3.1 in production; we have one job manager and 3
>>>>> task managers in standalone mode. Recently, we've noticed that we have
>>>>> memory-related problems. We use Docker containers to serve the Flink
>>>>> cluster. We have 300 slots and 20 jobs running with a parallelism of 10.
>>>>> The job count may also change over time. Taskmanager memory usage always
>>>>> increases, and after job cancelation this memory usage doesn't decrease.
>>>>> We've tried to investigate the problem and took a task manager JVM heap
>>>>> snapshot. According to the JVM heap analysis, the possible memory leak
>>>>> was the Flink list state descriptor. But we are not sure that is the
>>>>> cause of our memory problem. How can we solve the problem?
>>>>>
>>>> We have two types of Flink job. One has no stateful operators and
>>>> contains only maps and filters, and the other has a time window with a
>>>> count trigger.
>>>
>> * We've analysed the JVM heaps again under different conditions. First we
>> analysed the snapshot when no Flink jobs were running on the cluster. (image 1)
>> * Then, we analysed the JVM heap snapshot while the Flink job that has no
>> stateful operator was running. According to the results, the leak suspect
>> was NetworkBufferPool. (image 2)
>> * In the last analysis, both types of jobs were running and the leak
>> suspect was again NetworkBufferPool. (image 3)
>> In our system, jobs are regularly cancelled and resubmitted, so we noticed
>> that when a job is submitted some amount of memory is allocated, and after
>> cancelation this allocated memory is never freed. So over time the memory
>> usage is always increasing and exceeds the limits.
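P.S. @Ebru: just to make sure we are looking at the same pattern, is your second job type (time window with count trigger) roughly like the sketch below? The key, window size, trigger count and aggregation are placeholders on our side, not your actual job.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

    public class WindowedCountTriggerSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder source; in the real job this would be the Kafka source.
            DataStream<Tuple2<String, Long>> events = env.fromElements(
                    Tuple2.of("key-a", 1L),
                    Tuple2.of("key-b", 1L));

            events
                .keyBy(0)                          // key by the first tuple field
                .timeWindow(Time.minutes(1))       // placeholder window size
                .trigger(CountTrigger.of(1000))    // fire every 1000 elements (placeholder)
                .sum(1)                            // placeholder aggregation
                .print();

            env.execute("windowed-count-trigger-sketch");
        }
    }

If so, note that as far as we understand, CountTrigger only fires the window without purging it, so the window contents stay in list state until the window's own cleanup time. That might explain list state growing while such a job is running, though not the memory that is never freed after cancelation.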