Hi Ramya,

Increasing the memory of your pod will not give you more JVM heap space.
You will need to configure Flink so it launches the JVM process with more
memory.

In Flink 1.7, you can do this by setting 'jobmanager.heap.size' and
'taskmanager.heap.size' in your 'flink-conf.yaml'. Both default to 1024m.
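
As a rough illustration, the relevant part of 'flink-conf.yaml' could look
like the fragment below. The two keys are the ones named above; the values
are only assumptions for a TaskManager pod of roughly 5GB and should be
adjusted to your own pod limits.

```yaml
# Illustrative values only, not a recommendation for every setup.
# JobManager heap, left at the 1024m default here.
jobmanager.heap.size: 1024m
# TaskManager heap, kept below the pod memory limit to leave room
# for off-heap overhead.
taskmanager.heap.size: 4096m
```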

Please also note that you should not set these two options as high as the
memory of your Kubernetes pod. Flink also has some off-heap memory
overhead, so the total memory consumed by the Flink processes can be
larger than the configured heap size. If it exceeds the pod's memory
limit, Kubernetes may kill your pods.

In our experience, leaving around 20-25% of the pod memory for such
overhead is a good practice. In your case, with roughly 5GB per pod, that
means setting 'taskmanager.heap.size' to about 4GB. If your workload uses
RocksDB, you may need to leave even more room for off-heap memory.
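
The arithmetic behind that number can be sketched in a few lines of
Python. This is only a back-of-the-envelope helper, not anything Flink
provides: the function names are made up for illustration, and it assumes
the "around 5GB" pod means 5120 MB.

```python
def heap_size_mb(pod_memory_mb: int, overhead_percent: int) -> int:
    """Return the heap size in MB left after reserving overhead_percent of the pod."""
    if not 0 <= overhead_percent < 100:
        raise ValueError("overhead_percent must be in [0, 100)")
    # Integer arithmetic avoids floating-point rounding surprises.
    return pod_memory_mb * (100 - overhead_percent) // 100


def heap_config_line(pod_memory_mb: int, overhead_percent: int) -> str:
    """Format the result as a 'taskmanager.heap.size' line for flink-conf.yaml."""
    return f"taskmanager.heap.size: {heap_size_mb(pod_memory_mb, overhead_percent)}m"


if __name__ == "__main__":
    pod_mb = 5 * 1024  # assumption: the "around 5GB" pod is 5120 MB
    for percent in (20, 25):
        print(f"{percent}% overhead -> {heap_config_line(pod_mb, percent)}")
```

With 20% reserved this gives 4096m and with 25% it gives 3840m, which is
where the "about 4GB" figure comes from.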

Thank you~

Xintong Song



On Fri, Jun 12, 2020 at 1:11 PM Ramya Ramamurthy <hair...@gmail.com> wrote:

> Thanks Till.
> Actually, I have around 5GB pods for each TM, and each pod has only one
> slot.
> But the metrics I have pulled are as below, which is slightly confusing.
> They say only ~50MB of heap is committed for the tasks. Would you be able
> to point me to the right configuration to set?
>
> Thanks
> ~Ramya.
>
> [image: image.png]
>
> On Tue, Jun 9, 2020 at 3:12 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Ramya,
>>
>> it looks as if you should give your Flink pods and also the Flink
>> process a bit more memory as the process fails with an out of memory
>> error. You could also try Flink's latest version which comes with
>> native Kubernetes support.
>>
>> Cheers,
>> Till
>>
>> On Tue, Jun 9, 2020 at 8:45 AM Ramya Ramamurthy <hair...@gmail.com>
>> wrote:
>>
>> > Hi,
>> >
>> > My flink jobs are constantly going down beyond an hour with the below
>> > exception.
>> > This is Flink 1.7 on kubes, with checkpoints to Google storage.
>> >
>> > AsynchronousException{java.lang.Exception: Could not materialize
>> > checkpoint 21 for operator Source: Kafka011TableSource(sid, _zpsbd3,
>> > _zpsbd4, _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp,
>> > reason, ts) -> from: (sid, _zpsbd3, _zpsbd6, ts) ->
>> > Timestamps/Watermarks -> where: (<>(sid, _UTF-16LE'7759')), select:
>> > (sid, _zpsbd3, _zpsbd6, ts) -> time attribute: (ts) (5/6).}
>> >         at
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>> >         at
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>> >         at
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>> >         at
>> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> >         at
>> >
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> >         at
>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> >         at java.lang.Thread.run(Thread.java:748)
>> > Caused by: java.lang.Exception: Could not materialize checkpoint 21
>> > for operator Source: Kafka011TableSource(sid, _zpsbd3, _zpsbd4,
>> > _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, reason, ts)
>> > -> from: (sid, _zpsbd3, _zpsbd6, ts) -> Timestamps/Watermarks ->
>> > where: (<>(sid, _UTF-16LE'7759')), select: (sid, _zpsbd3, _zpsbd6, ts)
>> > -> time attribute: (ts) (5/6).
>> >         at
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>> >         ... 6 more
>> > Caused by: java.util.concurrent.ExecutionException:
>> > java.lang.OutOfMemoryError: Java heap space
>> >         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> >         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> >         at
>> > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>> >         at
>> >
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>> >         at
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>> >         ... 5 more
>> > Caused by: java.lang.OutOfMemoryError: Java heap space
>> >         at
>> >
>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:609)
>> >         at
>> >
>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:408)
>> >         at
>> >
>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
>> >         at
>> >
>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:558)
>> >         at
>> >
>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:482)
>> >         at
>> >
>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:599)
>> >         at
>> >
>> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:272)
>> >         ... 4 more
>> >
>> >
>> >
>> > Any help here in understanding this would be highly appreciated.
>> >
>> >
>> > Thanks.
>> >
>>
>
