Hi,

This is the test flink job we created to trigger this leak
https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
And this is the python script we are using to execute the job thousands of
times to get the OOM problem
https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107

The cluster we used for this has this configuration:

   - Instance type: t2.large
   - Number of workers: 2
   - HeapMemory: 5500
   - Number of task slots per node: 4
   - TaskMangMemFraction: 0.5
   - NumberOfNetworkBuffers: 2000

We have tried several things, increasing the heap, reducing the heap, more
memory fraction, changes this value in the taskmanager.sh
"TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.

Thanks for your help.

On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <
b20926...@cs.hacettepe.edu.tr> wrote:

> On 2017-11-08 15:20, Piotr Nowojski wrote:
>
>> Hi Ebru and Javier,
>>
>> Yes, if you could share this example job it would be helpful.
>>
>> Ebru: could you explain in a little more details how does your Job(s)
>> look like? Could you post some code? If you are just using maps and
>> filters there shouldn’t be any network transfers involved, aside
>> from Source and Sink functions.
>>
>> Piotrek
>>
>> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>>
>>> Hi Javier,
>>>
>>> It would be helpful if you share your test job with us.
>>> Which configurations did you try?
>>>
>>> -Ebru
>>>
>>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de>
>>> wrote:
>>>
>>> Hi,
>>>
>>> We have been facing a similar problem. We have tried some different
>>> configurations, as proposed in other email thread by Flavio and
>>> Kien, but it didn't work. We have a workaround similar to the one
>>> that Flavio has, we restart the taskmanagers once they reach a
>>> memory threshold. We created a small test to remove all of our
>>> dependencies and leave only flink native libraries. This test reads
>>> data from a Kafka topic and writes it back to another topic in
>>> Kafka. We cancel the job and start another every 5 seconds. After
>>> ~30 minutes of doing this process, the cluster reaches the OS memory
>>> limit and dies.
>>>
>>> Currently, we have a test cluster with 8 workers and 8 task slots
>>> per node. We have one job that uses 56 slots, and we cannot execute
>>> that job 5 times in a row because the whole cluster dies. If you
>>> want, we can publish our test job.
>>>
>>> Regards,
>>>
>>> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>> @Nico & @Piotr Could you please have a look at this? You both
>>> recently worked on the network stack and might be most familiar with
>>> this.
>>>
>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>> We also have the same problem in production. At the moment the
>>> solution is to restart the entire Flink cluster after every job..
>>> We've tried to reproduce this problem with a test (see
>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't
>>>
>>> know whether the error produced by the test and the leak are
>>> correlated..
>>>
>>> Best,
>>> Flavio
>>>
>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>> Do you use any windowing? If yes, could you please share that code?
>>> If
>>> there is no stateful operation at all, it's strange where the list
>>> state instances are coming from.
>>>
>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr>
>>> wrote:
>>> Hi Ufuk,
>>>
>>> We don’t explicitly define any state descriptor. We only use map
>>> and filters
>>> operator. We thought that gc handle clearing the flink’s internal
>>> states.
>>> So how can we manage the memory if it is always increasing?
>>>
>>> - Ebru
>>>
>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>> running.
>>> This is expected (also in the case of multiple running jobs). The
>>> screenshots are not helpful in that regard. :-(
>>>
>>> What kind of stateful operations are you using? Depending on your
>>> use case,
>>> you have to manually call `clear()` on the state instance in order
>>> to
>>> release the managed state.
>>>
>>> Best,
>>>
>>> Ufuk
>>>
>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>
>>> Begin forwarded message:
>>>
>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>> Subject: Re: Flink memory leak
>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>> To: Ufuk Celebi <u...@apache.org>
>>>
>>> Hi Ufuk,
>>>
>>> There are there snapshots of htop output.
>>> 1. snapshot is initial state.
>>> 2. snapshot is after submitted one job.
>>> 3. Snapshot is the output of the one job with 15000 EPS. And the
>>> memory
>>> usage is always increasing over time.
>>>
>>> <1.png><2.png><3.png>
>>>
>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>> Hey Ebru,
>>>
>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing
>>> this.
>>>
>>> Since multiple jobs are running, it will be hard to understand to
>>> which job the state descriptors from the heap snapshot belong to.
>>> - Is it possible to isolate the problem and reproduce the behaviour
>>> with only a single job?
>>>
>>> – Ufuk
>>>
>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>
>>> Hi,
>>>
>>> We are using Flink 1.3.1 in production, we have one job manager and
>>> 3 task
>>> managers in standalone mode. Recently, we've noticed that we have
>>> memory
>>> related problems. We use docker container to serve Flink cluster. We
>>> have
>>> 300 slots and 20 jobs are running with parallelism of 10. Also the
>>> job
>>> count
>>> may be change over time. Taskmanager memory usage always increases.
>>> After
>>> job cancelation this memory usage doesn't decrease. We've tried to
>>> investigate the problem and we've got the task manager jvm heap
>>> snapshot.
>>> According to the jam heap analysis, possible memory leak was Flink
>>> list
>>> state descriptor. But we are not sure that is the cause of our
>>> memory
>>> problem. How can we solve the problem?
>>>
>>> We have two types of Flink job. One has no state full operator
>>> contains only maps and filters and the other has time window with
>>> count trigger.
>>>
>>  * We've analysed the jvm heaps again in different conditions. First
>> we analysed the snapshot when no flink jobs running on cluster. (image
>> 1)
>> * Then, we analysed the jvm heap snapshot when the flink job that has
>> no state full operator is running. And according to the results, leak
>> suspect was NetworkBufferPool (image 2)
>> *   Last analys, there were both two types of jobs running and leak
>> suspect was again NetworkBufferPool. (image 3)
>> In our system jobs are regularly cancelled and resubmitted so we
>> noticed that when job is submitted some amount of memory allocated and
>> after cancelation this allocated memory never freed. So over time
>> memory usage is always increasing and exceeded the limits.
>>
>>
>>>>
>>
>>
>> Links:
>> ------
>> [1] https://issues.apache.org/jira/browse/FLINK-7845
>>
> Hi Piotr,
>
> There are two types of jobs.
> In first, we use Kafka source and Kafka sink, there isn't any window
> operator.
> In second job, we use Kafka source, filesystem sink and elastic search
> sink and window operator for buffering.
>

Reply via email to