Hi,

We have been facing a similar problem. We have tried some different
configurations, as proposed in another email thread by Flavio and Kien, but
they didn't work. We have a workaround similar to Flavio's: we restart the
taskmanagers once they reach a memory threshold. We also created a small
test to remove all of our dependencies and leave only Flink's native
libraries. This test reads data from a Kafka topic and writes it back to
another Kafka topic. We cancel the job and start another one every 5
seconds. After ~30 minutes of this, the cluster reaches the OS memory limit
and dies.
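
For reference, the test job is roughly shaped like the sketch below (topic
names, broker address and consumer group are placeholders, and the exact
Kafka connector class depends on the Flink and Kafka versions; this is not
the actual code we run):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaPassThroughTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker
        props.setProperty("group.id", "leak-test");           // placeholder group id

        // Read from one topic and write straight back to another;
        // no keyed state, no windows, no user dependencies.
        DataStream<String> input = env.addSource(
                new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), props));

        input.addSink(
                new FlinkKafkaProducer010<>("output-topic", new SimpleStringSchema(), props));

        env.execute("kafka-pass-through-leak-test");
    }
}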

Currently, we have a test cluster with 8 workers and 8 task slots per node.
We have one job that uses 56 slots, and we cannot execute that job 5 times
in a row because the whole cluster dies. If you want, we can publish our
test job.

Regards,

On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org> wrote:

> @Nico & @Piotr Could you please have a look at this? You both recently
> worked on the network stack and might be most familiar with this.
>
> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> We also have the same problem in production. At the moment the solution is
> to restart the entire Flink cluster after every job..
> We've tried to reproduce this problem with a test (see
> https://issues.apache.org/jira/browse/FLINK-7845) but we don't know
> whether the error produced by the test and the leak are correlated..
>
> Best,
> Flavio
>
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <
> b20926...@cs.hacettepe.edu.tr> wrote:
>
>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>
>>> Do you use any windowing? If yes, could you please share that code? If
>>> there is no stateful operation at all, it's strange where the list
>>> state instances are coming from.
>>>
>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr>
>>> wrote:
>>>
>>>> Hi Ufuk,
>>>>
>>>> We don’t explicitly define any state descriptor. We only use map and
>>>> filter operators. We thought that GC handles clearing Flink’s internal
>>>> state.
>>>> So how can we manage the memory if it is always increasing?
>>>>
>>>> - Ebru
>>>>
>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>>
>>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>>> running.
>>>> This is expected (also in the case of multiple running jobs). The
>>>> screenshots are not helpful in that regard. :-(
>>>>
>>>> What kind of stateful operations are you using? Depending on your use
>>>> case,
>>>> you have to manually call `clear()` on the state instance in order to
>>>> release the managed state.
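>>>>
>>>> For illustration, a minimal sketch of what such a call could look like
>>>> for a keyed ValueState (the function, state name and threshold are
>>>> hypothetical, not taken from the jobs discussed in this thread):
>>>>
>>>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>>>> import org.apache.flink.api.common.state.ValueState;
>>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.util.Collector;
>>>>
>>>> // Hypothetical example: sums values per key and clears the per-key
>>>> // state once a threshold is reached, so the backend can release it.
>>>> // Must be applied on a keyed stream, e.g. stream.keyBy(0).flatMap(...).
>>>> public class SumUntilThreshold
>>>>         extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
>>>>
>>>>     private transient ValueState<Long> sum;
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) {
>>>>         sum = getRuntimeContext().getState(
>>>>                 new ValueStateDescriptor<>("sum", Long.class));
>>>>     }
>>>>
>>>>     @Override
>>>>     public void flatMap(Tuple2<String, Long> in,
>>>>                         Collector<Tuple2<String, Long>> out) throws Exception {
>>>>         long current = (sum.value() == null ? 0L : sum.value()) + in.f1;
>>>>         if (current >= 100L) {                 // hypothetical threshold
>>>>             out.collect(Tuple2.of(in.f0, current));
>>>>             sum.clear();                       // release the managed state for this key
>>>>         } else {
>>>>             sum.update(current);
>>>>         }
>>>>     }
>>>> }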
>>>>
>>>> Best,
>>>>
>>>> Ufuk
>>>>
>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>>
>>>>> Begin forwarded message:
>>>>>
>>>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>>>> Subject: Re: Flink memory leak
>>>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>>>> To: Ufuk Celebi <u...@apache.org>
>>>>>
>>>>> Hi Ufuk,
>>>>>
>>>>> There are three snapshots of the htop output.
>>>>> 1. snapshot shows the initial state.
>>>>> 2. snapshot is after one job has been submitted.
>>>>> 3. snapshot is while that job is running at 15000 EPS. The memory
>>>>> usage is always increasing over time.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> <1.png><2.png><3.png>
>>>>>
>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>>>>
>>>>> Hey Ebru,
>>>>>
>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing
>>>>> this.
>>>>>
>>>>> Since multiple jobs are running, it will be hard to understand which
>>>>> job the state descriptors from the heap snapshot belong to.
>>>>> - Is it possible to isolate the problem and reproduce the behaviour
>>>>> with only a single job?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>>
>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> We are using Flink 1.3.1 in production. We have one job manager and 3
>>>>> task managers in standalone mode. Recently, we've noticed that we have
>>>>> memory related problems. We use Docker containers to serve the Flink
>>>>> cluster. We have 300 slots and 20 jobs are running with a parallelism
>>>>> of 10. Also, the job count may change over time. Taskmanager memory
>>>>> usage always increases, and after job cancellation this memory usage
>>>>> doesn't decrease. We've tried to investigate the problem and we've
>>>>> taken a JVM heap snapshot of the task manager. According to the heap
>>>>> analysis, the possible memory leak was Flink's list state descriptor.
>>>>> But we are not sure that is the cause of our memory problem. How can
>>>>> we solve the problem?
>>>>>
>>>>>
>>>>>
>>>> We have two types of Flink jobs. One has no stateful operators and
>>>> contains only maps and filters, and the other has a time window with a
>>>> count trigger.
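>>>>
>>>> For illustration, the second type of job is roughly shaped like the
>>>> sketch below (the source, key position, window size and trigger count
>>>> are placeholders, not our actual code):
>>>>
>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>> import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
>>>>
>>>> public class WindowedCountTriggerSketch {
>>>>     public static void main(String[] args) throws Exception {
>>>>         StreamExecutionEnvironment env =
>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>         // Placeholder source; the real job reads from Kafka.
>>>>         DataStream<Tuple2<String, Long>> events =
>>>>                 env.fromElements(Tuple2.of("key-a", 1L), Tuple2.of("key-b", 1L));
>>>>
>>>>         events.keyBy(0)                       // key position is a placeholder
>>>>               .timeWindow(Time.minutes(1))    // window size is a placeholder
>>>>               .trigger(CountTrigger.of(1000)) // fire after 1000 elements per window
>>>>               .sum(1)
>>>>               .print();
>>>>
>>>>         env.execute("windowed-count-trigger-sketch");
>>>>     }
>>>> }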
>>>>
>> * We've analysed the JVM heaps again under different conditions. First, we
>> analysed the snapshot taken when no Flink jobs were running on the
>> cluster. (image 1)
>> * Then, we analysed the JVM heap snapshot taken while the Flink job with
>> no stateful operators was running. According to the results, the leak
>> suspect was NetworkBufferPool. (image 2)
>> * In the last analysis, both types of jobs were running, and the leak
>> suspect was again NetworkBufferPool. (image 3)
>> In our system, jobs are regularly cancelled and resubmitted, and we
>> noticed that when a job is submitted some amount of memory is allocated,
>> and after cancellation this allocated memory is never freed. So over time
>> the memory usage keeps increasing and exceeds the limits.
>>
>>>
>
>
>
