Unfortunately the issue I opened [1] was not a problem in Flink after all; it
was just caused by an ever-increasing job plan.
So no help from that. Let's hope we can find the real source of the problem.
Maybe using -Djdk.nio.maxCachedBufferSize could help (but I haven't tried it
yet).
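
For reference, here's how that flag could be wired in (a sketch only; the
262144 value, i.e. 256 KB, is just an example cap, not a recommendation):

  # flink.yaml: don't let the JDK cache per-thread direct buffers above 256 KB
  env.java.opts: -Djdk.nio.maxCachedBufferSize=262144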

Best,
Flavio

[1] https://issues.apache.org/jira/browse/FLINK-7845

On Wed, Oct 18, 2017 at 2:07 PM, Kien Truong <duckientru...@gmail.com>
wrote:

> Hi,
>
> We saw a similar issue in one of our jobs, due to a ByteBuffer memory leak
> [1].
>
> We fixed it using the solution in the article, setting
> -Djdk.nio.maxCachedBufferSize.
>
> This property is available since Java 8u102.
>
> Best regards,
>
> Kien
>
> [1] http://www.evanjones.ca/java-bytebuffer-leak.html
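>
> To make the mechanism concrete, here is a minimal Java sketch of the
> behavior that article describes (an illustration under my reading of it,
> not Flink code): channel writes from heap buffers go through temporary
> direct ByteBuffers that the JDK caches per thread.
>
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.file.Paths;
> import java.nio.file.StandardOpenOption;
>
> public class DirectBufferCacheDemo {
>     public static void main(String[] args) throws IOException {
>         try (FileChannel ch = FileChannel.open(Paths.get("/tmp/demo.bin"),
>                 StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
>             // Writing from a heap buffer makes the JDK allocate a direct
>             // buffer of the same size and cache it in the calling thread.
>             ByteBuffer heap = ByteBuffer.allocate(64 * 1024 * 1024); // 64 MB
>             ch.write(heap);
>             // Without -Djdk.nio.maxCachedBufferSize, those 64 MB of native
>             // memory stay cached in this thread until the thread exits.
>         }
>     }
> }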
>
> On 10/18/2017 4:06 PM, Flavio Pompermaier wrote:
>
> We also faced the same problem, but the number of jobs we can run before
> restarting the cluster depends on the volume of data shuffled around the
> network. We even had problems with a single job, and in order to avoid
> OOM issues we had to add some configuration to limit Netty's memory usage,
> i.e.:
>  - Add to flink.yaml -> env.java.opts: -Dio.netty.recycler.maxCapacity.default=1
>  - Edit taskmanager.sh and change TM_MAX_OFFHEAP_SIZE from 8388607T to 5g
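>
> For reference, after those two edits the relevant lines would look roughly
> like this (a sketch; the exact file layout varies by Flink version):
>
>   # flink.yaml
>   env.java.opts: -Dio.netty.recycler.maxCapacity.default=1
>
>   # taskmanager.sh
>   TM_MAX_OFFHEAP_SIZE="5g"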
>
> To that end, we wrote a small test to reproduce the problem and we
> opened an issue for it [1].
> We still don't know whether the problems are related, however.
>
> I hope this helps,
> Flavio
>
> [1] https://issues.apache.org/jira/browse/FLINK-7845
>
> On Wed, Oct 18, 2017 at 10:48 AM, Javier Lopez <javier.lo...@zalando.de>
> wrote:
>
>> Hi Robert,
>>
>> Sorry for the late reply. We ran a lot of tests, trying to identify whether
>> the problem was in our custom sources/sinks, and figured out that none of
>> our custom components is causing it. We came up with a small test and
>> realized that the Flink nodes run out of non-heap JVM memory and crash
>> after the deployment of thousands of jobs.
>>
>> When rapidly deploying thousands or hundreds of thousands of Flink jobs -
>> depending on job complexity in terms of resource consumption - the Flink
>> nodes' non-heap JVM memory consumption grows until there is no memory left
>> on the machine and the Flink process crashes. Both the TaskManagers and the
>> JobManager exhibit the same behavior, though the TaskManagers die faster.
>> The memory consumption doesn't decrease after we stop deploying new jobs,
>> even when the cluster is idle (no running jobs).
>>
>> We could replicate the behavior by rapidly deploying the WordCount job
>> provided in the Quickstart with a Python script, running 24 instances of
>> the deployment script in parallel.
>>
>> The non-heap JVM memory consumption grows faster with more complex jobs,
>> e.g. one reading 10K events from Kafka and printing them to STDOUT ( * ).
>> Thus fewer deployed jobs are needed before the TaskManagers/JobManager die.
>>
>> We run Flink 1.3.2 in standalone mode inside Docker containers on AWS EC2
>> t2.large nodes with 4 GB of RAM. For the test, we used 2 TaskManagers and
>> 1 JobManager.
>>
>> ( * ) We used a slightly modified Python script, which waited 15 seconds
>> after deployment for the 10K events to be read from Kafka and then
>> canceled the freshly deployed job via the Flink REST API.
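>>
>> For illustration, here's a minimal sketch of that kind of submit/wait/cancel
>> loop. Our script is Python and uses the REST API; this hypothetical Java
>> version shells out to the Flink CLI instead, and the paths are assumptions:
>>
>> import java.io.BufferedReader;
>> import java.io.InputStreamReader;
>>
>> public class DeployLoop {
>>     public static void main(String[] args) throws Exception {
>>         final String flink = "/opt/flink/bin/flink"; // assumed install path
>>         final String jar = "/opt/jobs/WordCount.jar"; // assumed job jar
>>         while (true) {
>>             // Submit in detached mode; the CLI prints the assigned JobID.
>>             Process run = new ProcessBuilder(flink, "run", "-d", jar)
>>                     .redirectErrorStream(true).start();
>>             String jobId = null;
>>             try (BufferedReader out = new BufferedReader(
>>                     new InputStreamReader(run.getInputStream()))) {
>>                 String line;
>>                 while ((line = out.readLine()) != null) {
>>                     // e.g. "Job has been submitted with JobID <id>"
>>                     int i = line.indexOf("JobID ");
>>                     if (i >= 0) jobId = line.substring(i + 6).trim();
>>                 }
>>             }
>>             run.waitFor();
>>             Thread.sleep(15_000); // give the job time to read the events
>>             if (jobId != null) {
>>                 new ProcessBuilder(flink, "cancel", jobId).start().waitFor();
>>             }
>>         }
>>     }
>> }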
>>
>> If you want, we can provide the scripts and jobs we used for this test. We
>> have a workaround, which restarts the Flink nodes once a memory threshold
>> is reached, but it has lowered the availability of our services.
>>
>> Thanks for your help.
>>
>> On 30 August 2017 at 10:39, Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> I just saw that your other email is about the same issue.
>>>
>>> Since you've done a heap dump already, did you see any pattern in the
>>> allocated objects? Ideally, none of the classes from your user code should
>>> stick around when no job is running.
>>> What's the size of the heap dump? I'm happy to take a look at it if it's
>>> reasonably small.
>>>
>>> On Wed, Aug 30, 2017 at 10:27 AM, Robert Metzger <rmetz...@apache.org>
>>> wrote:
>>>
>>>> Hi Javier,
>>>>
>>>> I'm not aware of such issues with Flink, but if you could give us some
>>>> more details on your setup, I might get more ideas on what to look for.
>>>>
>>>> Are you using the RocksDBStateBackend? (RocksDB does some JNI
>>>> allocations that could potentially leak memory.)
>>>> Also, are you passing any special garbage collector options? (Maybe
>>>> some classes are not being unloaded.)
>>>> Are you using anything else that is special (such as protobuf or Avro
>>>> formats, or any other big library)?
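>>>>
>>>> One thing that might help narrow down where the non-heap memory goes
>>>> (a suggestion, not something I've verified on your setup): start the
>>>> TaskManager JVM with -XX:NativeMemoryTracking=summary and compare the
>>>> output of "jcmd <pid> VM.native_memory summary" before and after
>>>> deploying jobs, to see which category (Class, Thread, Internal, ...)
>>>> keeps growing.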
>>>>
>>>> Regards,
>>>> Robert
>>>>
>>>>
>>>>
>>>> On Mon, Aug 28, 2017 at 5:04 PM, Javier Lopez <javier.lo...@zalando.de>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> We are starting a lot of Flink (streaming) jobs, and after we have
>>>>> started 200 or more we see that the non-heap memory in the
>>>>> TaskManagers increases a lot, to the point of killing the instances. We
>>>>> found out that every time we start a new job, the committed non-heap
>>>>> memory increases by 5 to 10 MB. Is this expected behavior? Are there
>>>>> ways to prevent it?
>>>>>
>>>>
>>>>
>>>
>>
>


-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809
