For further reference, I've created this issue [1] to track the problem.

[1] https://issues.apache.org/jira/browse/FLINK-17073

Cheers,
Till

On Thu, Apr 9, 2020 at 1:20 PM Yun Tang <myas...@live.com> wrote:

> Hi Marc
>
> The leftover 'chk-X' folders, which should be deleted in the final stage of
> removing a checkpoint, also suggest that the metadata of completed
> checkpoints which were never discarded is what occupies the memory.
>
> If we assume an average checkpoint metadata size of 30 KB, then 20,000
> non-discarded completed checkpoints would occupy roughly 20,000 x 30 KB ≈ 585 MB
> of memory, which is close to what you observed.
>
> From my point of view, a checkpoint interval of one second is far too
> frequent and does not make much sense in a production environment.
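>
> For example (just a sketch; the interval and pause values below are
> illustrative assumptions, not a recommendation tuned for your jobs), you
> could relax the checkpointing configuration along these lines:
>
> // assumes 'env' is the job's StreamExecutionEnvironment
> env.enableCheckpointing(60_000); // e.g. one checkpoint per minute instead of per second
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // give discards time to catch up
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // the default, shown for clarity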
>
> Best
> Yun Tang
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* Thursday, April 9, 2020 17:41
> *To:* Marc LEGER <maleger...@gmail.com>
> *Cc:* Yun Tang <myas...@live.com>; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Possible memory leak in JobManager (Flink 1.10.0)?
>
> Thanks for reporting this issue, Marc. From what you've reported, I think
> Yun is right and that the large memory footprint is caused by
> CompletedCheckpoints which cannot be removed fast enough. One way to verify
> this is to enable TRACE logging, because Flink then logs a line for every
> CompletedCheckpoint when it gets discarded. The line should look like this:
> "Executing discard procedure for Checkpoint". The high number of chk-X
> folders on S3 could be the result of the slow discard operations.
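>
> To get those TRACE lines without enabling TRACE globally, one option
> (assuming the default log4j.properties setup shipped with Flink 1.10;
> adjust this to your own logging configuration) is to raise the level only
> for the checkpoint package:
>
> # hypothetical log4j.properties fragment
> log4j.logger.org.apache.flink.runtime.checkpoint=TRACE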
>
> If you want, we can also take a look at the logs and ideally the heap
> dump, if you can share them with us.
>
> I think one difference between Flink 1.10.0 and 1.7.2 is that in 1.10.0 we
> use a fixed thread pool for running the I/O operations, with as many
> threads as there are cores. In contrast, in Flink 1.7.2 we used a fork-join
> pool with a max parallelism of 64. This difference could explain the lower
> throughput of discard operations, because fewer of them can run in
> parallel.
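>
> To illustrate the difference (this is only a simplified sketch of the two
> pool types, not the actual Flink code paths):
>
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ForkJoinPool;
>
> public class PoolComparison {
>     public static void main(String[] args) {
>         // Flink 1.10-style: one I/O thread per core; additional tasks wait in the queue
>         ExecutorService fixedPool =
>                 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
>
>         // Flink 1.7-style: fork-join pool with a parallelism of 64
>         ForkJoinPool forkJoinPool = new ForkJoinPool(64);
>
>         fixedPool.shutdown();
>         forkJoinPool.shutdown();
>     }
> }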
>
> Cheers,
> Till
>
> On Thu, Apr 9, 2020 at 10:09 AM Marc LEGER <maleger...@gmail.com> wrote:
>
> Hello Yun,
>
> Thank you for your feedback. Please find my answers to your questions
> below:
>
> 1. I am using incremental state checkpointing with the RocksDB backend and
> AWS S3 as the distributed file system. Everything is configured in
> flink-conf.yaml as follows:
>
> state.backend: rocksdb
> state.backend.incremental: true
> # placeholders are replaced at deploy time
> state.checkpoints.dir: s3://#S3_BUCKET#/#SERVICE_ID#/flink/checkpoints
> state.backend.rocksdb.localdir: /home/data/flink/rocksdb
>
> Size of the _metadata file in a checkpoint folder for each of the 3 running jobs:
> - job1: 64 KB
> - job2: 1 KB
> - job3: 10 KB
>
> By the way, I have between 10000 and 20000 "chk-X" folders per job in S3.
>
> 2. Checkpointing is configured to be triggered every second for all the
> jobs. Only the interval is set; everything else is kept at its default:
>
> executionEnvironment.enableCheckpointing(1000);
>
> Best Regards,
> Marc
>
> On Wed, Apr 8, 2020 at 8:48 PM Yun Tang <myas...@live.com> wrote:
>
> Hi Marc
>
> I think the occupied memory is due to the to-be-removed completed
> checkpoints, which are held in the workQueue of the io-executor [1] used by
> ZooKeeperCompletedCheckpointStore [2]. One clue supporting this is that
> Executors#newFixedThreadPool creates a ThreadPoolExecutor backed by an
> unbounded LinkedBlockingQueue to store the submitted runnables.
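>
> As a simplified sketch of how that queue can grow (the 'slowDiscard' task
> and the byte array below are only placeholders for Flink's actual
> CompletedCheckpoint discard logic, not real Flink code):
>
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ThreadPoolExecutor;
>
> public class QueueGrowth {
>     public static void main(String[] args) {
>         // same construction as the io-executor: #cores threads, unbounded LinkedBlockingQueue
>         ExecutorService ioExecutor =
>                 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
>
>         // if discard tasks are submitted faster than they complete (e.g. slow S3 deletes),
>         // each queued task keeps its checkpoint metadata reachable and heap usage grows
>         for (int i = 0; i < 20_000; i++) {
>             byte[] checkpointMeta = new byte[30 * 1024]; // stand-in for ~30 KB of metadata
>             ioExecutor.execute(() -> slowDiscard(checkpointMeta));
>         }
>
>         System.out.println("queued discard tasks: "
>                 + ((ThreadPoolExecutor) ioExecutor).getQueue().size());
>         ioExecutor.shutdownNow();
>     }
>
>     private static void slowDiscard(byte[] meta) {
>         try {
>             Thread.sleep(1_000); // simulate a slow remote delete
>         } catch (InterruptedException e) {
>             Thread.currentThread().interrupt();
>         }
>     }
> }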
>
> To figure out the root cause, could you please check the following:
>
>    1. How large is your checkpoint metadata? You can check the size of
>    {checkpoint-dir}/chk-X/_metadata; knowing which state backend you use
>    would also help here.
>    2. What is your checkpoint interval? A small interval can accumulate many
>    completed checkpoints that need to be subsumed once a newer checkpoint
>    completes.
>
>
> [1]
> https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L260
> [2]
> https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L234
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* Marc LEGER <maleger...@gmail.com>
> *Sent:* Wednesday, April 8, 2020 16:50
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Possible memory leak in JobManager (Flink 1.10.0)?
>
> Hello,
>
> I am currently testing Flink 1.10.0 but I am facing memory issues with
> JobManagers deployed in a standalone cluster configured in HA mode with 3
> TaskManagers (and 3 running jobs).
> I do not reproduce the same issues using Flink 1.7.2.
>
> Basically, whatever the value of the "jobmanager.heap.size" property (I
> tried 2 GB, then 4 GB and finally 8 GB), the leader JobManager process
> eventually consumes all available memory and hangs after a few hours or
> days (depending on the heap size) before being disassociated from the
> cluster.
>
> I am using the OpenJ9 JVM with Java 11 on CentOS 7.6 machines:
> openjdk version "11.0.6" 2020-01-14
> OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.6+10)
> Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.18.1, JRE 11 Linux
> amd64-64-Bit Compressed
>
> I took a heap dump of the JobManager Java process for analysis and
> generated a "Leak Suspects" report using Eclipse MAT.
> The tool detects one main suspect (cf. attached screenshots):
>
> One instance of "java.util.concurrent.ThreadPoolExecutor" loaded by
> "<system class loader>" occupies 580,468,280 (92.82%) bytes. The instance
> is referenced by
> org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices @
> 0x8041fb48 , loaded by "<system class loader>".
>
> Has anyone already faced such an issue?
>
> Best Regards,
> Marc
>
>
