Hi Eleanore,

How much memory did you assign to the JM pod? Maybe the limit is so high
that it takes a while before a GC is triggered. Have you checked whether
the same problem also occurs with newer Flink versions?

The difference between checkpoints enabled and disabled is that the JM
needs to do a bit more bookkeeping in order to track the completed
checkpoints. If you are using the HeapStateBackend, then all states smaller
than state.backend.fs.memory-threshold will be inlined, meaning that they
are sent to the JM and stored in the checkpoint metadata file. This can
increase the memory usage of the JM process. Depending on
state.checkpoints.num-retained, this can grow as large as the number of
retained checkpoints times the checkpoint size. However, I doubt that this
adds up to several GB of additional space.
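
Just to illustrate the two options involved (only a sketch; they are
normally set in flink-conf.yaml, and the class name and values below are
placeholders rather than recommendations):

import org.apache.flink.configuration.Configuration;

public class CheckpointMemoryConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // States at or below this threshold are inlined into the checkpoint
        // metadata handled by the JM instead of being written as separate
        // files on the checkpoint storage.
        conf.setString("state.backend.fs.memory-threshold", "1kb");

        // Number of completed checkpoints the JM keeps track of; the
        // retained metadata grows roughly linearly with this value.
        conf.setInteger("state.checkpoints.num-retained", 1);

        System.out.println(conf);
    }
}

If the threshold applies in your setup, lowering it trades more small files
on the checkpoint storage for less state being inlined into the metadata
the JM has to keep.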

In order to better understand the problem, the debug logs of your JM would
be helpful. Also, a heap dump might help point us towards the component
that is eating up so much memory.
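
If exec'ing jmap into the pod (jmap -dump:live,format=b,file=heap.hprof
<pid>) is inconvenient, a dump can also be triggered from inside the JVM.
A rough sketch (class name and output path are placeholders):

import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;

public class HeapDumpExample {
    public static void main(String[] args) throws Exception {
        // The HotSpot diagnostic MXBean is the in-process equivalent of
        // jmap -dump; it dumps the heap of the JVM this code runs in, so it
        // would have to execute inside the JM process to be useful here.
        HotSpotDiagnosticMXBean diagnostics =
            ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);

        // live = true dumps only objects that are still reachable.
        diagnostics.dumpHeap("/tmp/jm-heap.hprof", true);
    }
}

The resulting .hprof file can then be opened with a tool like Eclipse MAT
or VisualVM to see which component holds on to the memory.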

Cheers,
Till

On Thu, Oct 22, 2020 at 4:56 AM Eleanore Jin <eleanore....@gmail.com> wrote:

> Hi all,
>
> I have a Flink job running version 1.10.2. It simply reads from a Kafka
> topic with 96 partitions and outputs to another Kafka topic.
>
> It is running in k8s, with 1 JM (not in HA mode) and 12 task managers,
> each with 4 slots.
> The checkpoints persist snapshots to Azure Blob Storage, with a checkpoint
> interval of 3 seconds, a 10-second timeout, and a minimum pause of 1
> second.
>
> I observed that the job manager pod's memory usage grows over time; any
> hints on why this is the case? Also, the JM's memory usage is significantly
> higher compared to running with checkpointing disabled.
> [image: image.png]
>
> Thanks a lot!
> Eleanore
>
