For further reference, I've created this issue [1] to track the problem.

[1] https://issues.apache.org/jira/browse/FLINK-17073
Cheers,
Till

On Thu, Apr 9, 2020 at 1:20 PM Yun Tang <myas...@live.com> wrote:

> Hi Marc
>
> The leftover 'chk-X' folders, which should have been discarded in the final stage of removing a checkpoint, also suggest that the non-discarded completed checkpoint metadata is what occupies the memory.
>
> If we take your average checkpoint metadata size as 30 KB, 20,000 non-discarded completed checkpoints would occupy about 585 MB of memory (30 KB x 20,000 ≈ 585 MB), which is close to what you observed.
>
> From my point of view, a checkpoint interval of one second is really too frequent and does not make much sense in a production environment.
>
> Best
> Yun Tang
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* Thursday, April 9, 2020 17:41
> *To:* Marc LEGER <maleger...@gmail.com>
> *Cc:* Yun Tang <myas...@live.com>; user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Possible memory leak in JobManager (Flink 1.10.0)?
>
> Thanks for reporting this issue, Marc. From what you've reported, I think Yun is right and that the large memory footprint is caused by CompletedCheckpoints which cannot be removed fast enough. One way to verify this is to enable TRACE logging, because then Flink logs every CompletedCheckpoint when it gets discarded. The line should look like "Executing discard procedure for Checkpoint". The high number of chk-X folders on S3 could be the result of slow discard operations.
>
> If you want, we can also take a look at the logs and, ideally, the heap dump if you can share them with us.
>
> I think one difference between Flink 1.10.0 and 1.7.2 is that in 1.10.0 we are using a fixed thread pool for running the I/O operations, where the number of threads equals the number of cores. In contrast, in Flink 1.7.2 we used a fork-join pool with a max parallelism of 64. This difference could explain the lower throughput of discard operations, because fewer of them can happen in parallel.
>
> Cheers,
> Till
>
> On Thu, Apr 9, 2020 at 10:09 AM Marc LEGER <maleger...@gmail.com> wrote:
>
> Hello Yun,
>
> Thank you for your feedback, please find below my answers to your questions:
>
> 1. I am using incremental state checkpointing with the RocksDB backend and AWS S3 as the distributed file system; everything is configured in flink-conf.yaml as follows:
>
> state.backend: rocksdb
> state.backend.incremental: true
> # placeholders are replaced at deploy time
> state.checkpoints.dir: s3://#S3_BUCKET#/#SERVICE_ID#/flink/checkpoints
> state.backend.rocksdb.localdir: /home/data/flink/rocksdb
>
> Size of the _metadata file in a checkpoint folder for the 3 running jobs:
> - job1: 64 KB
> - job2: 1 KB
> - job3: 10 KB
>
> By the way, I have between 10000 and 20000 "chk-X" folders per job in S3.
>
> 2. Checkpointing is configured to be triggered every second for all the jobs. Only the interval is set; everything else is kept at its default:
>
> executionEnvironment.enableCheckpointing(1000);
>
> Best Regards,
> Marc
>
> On Wed, Apr 8, 2020 at 8:48 PM, Yun Tang <myas...@live.com> wrote:
>
> Hi Marc
>
> I think the occupied memory is due to the to-be-removed completed checkpoints, which are stored in the work queue of the io-executor [1] used by ZooKeeperCompletedCheckpointStore [2]. One clue supporting this is that Executors#newFixedThreadPool creates a ThreadPoolExecutor with an unbounded LinkedBlockingQueue to store runnables.
>
> To figure out the root cause, would you please check the information below:
>
> 1. How large is your checkpoint metadata? You can check the size of {checkpoint-dir}/chk-X/_metadata; it would also help to know which state backend you use.
> 2. What is your checkpoint interval? A small checkpoint interval can accumulate many completed checkpoints that have to be subsumed once a newer checkpoint completes.
>
> [1] https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L260
> [2] https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L234
>
> Best
> Yun Tang
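To make the mechanism Yun describes concrete: Executors.newFixedThreadPool is backed by an unbounded LinkedBlockingQueue, so if completed-checkpoint discards are submitted faster than the pool's threads can run them, the pending discard tasks (and whatever checkpoint metadata they reference) accumulate on the JobManager heap. Below is a minimal standalone Java sketch of that backlog effect, not Flink code: the class name, the task count, and the 500 ms sleep standing in for a slow S3 delete are all illustrative assumptions.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class DiscardBacklogDemo {
    public static void main(String[] args) {
        // Executors.newFixedThreadPool(n) uses an unbounded LinkedBlockingQueue internally,
        // so tasks submitted faster than they complete simply accumulate in memory.
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService ioExecutor = Executors.newFixedThreadPool(cores);

        // Stand-in for a slow checkpoint discard (e.g. deleting many small files on S3).
        Runnable slowDiscard = () -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Submit far more "discards" than the pool can drain; only `cores` of them run at a
        // time, the rest wait in the queue and keep their referenced objects alive.
        for (int i = 0; i < 10_000; i++) {
            ioExecutor.submit(slowDiscard);
        }

        int queued = ((ThreadPoolExecutor) ioExecutor).getQueue().size();
        System.out.println("Discard tasks still waiting in the queue: " + queued);
        ioExecutor.shutdownNow();
    }
}

Each queued Runnable here is tiny; in the JobManager a queued discard retains the metadata of its CompletedCheckpoint, which is consistent with the ~580 MB retained by a ThreadPoolExecutor in Marc's MAT report below.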
> ------------------------------
> *From:* Marc LEGER <maleger...@gmail.com>
> *Sent:* Wednesday, April 8, 2020 16:50
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Possible memory leak in JobManager (Flink 1.10.0)?
>
> Hello,
>
> I am currently testing Flink 1.10.0, but I am facing memory issues with the JobManagers deployed in a standalone cluster configured in HA mode with 3 TaskManagers (and 3 running jobs). I do not reproduce the same issues with Flink 1.7.2.
>
> Basically, whatever the value of the "jobmanager.heap.size" property (I tried 2 GB, then 4 GB, and finally 8 GB), the leader JobManager process eventually consumes all available memory and hangs after a few hours or days (depending on the heap size) before being disassociated from the cluster.
>
> I am using the OpenJ9 JVM with Java 11 on CentOS 7.6 machines:
> openjdk version "11.0.6" 2020-01-14
> OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.6+10)
> Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.18.1, JRE 11 Linux amd64-64-Bit Compressed
>
> I performed a heap dump of the JobManager Java process for analysis and generated a "Leak Suspects" report using Eclipse MAT. The tool detects one main suspect (cf. attached screenshots):
>
> One instance of "java.util.concurrent.ThreadPoolExecutor" loaded by "<system class loader>" occupies 580,468,280 (92.82%) bytes. The instance is referenced by org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices @ 0x8041fb48, loaded by "<system class loader>".
>
> Has anyone already faced such an issue?
>
> Best Regards,
> Marc
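Given Yun's point above that a one-second interval is too aggressive, one job-side mitigation is simply to checkpoint less often and leave room between checkpoints, so the JobManager's cleanup of old completed checkpoints can keep up. Here is a hedged sketch of how Marc's executionEnvironment.enableCheckpointing(1000) could be relaxed; the concrete values (60 s interval, 30 s minimum pause) are illustrative assumptions, not recommendations from the thread.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint once a minute instead of once a second (60_000 ms is an assumed value;
        // choose it based on how much reprocessing is acceptable after a failure).
        env.enableCheckpointing(60_000);

        // Keep a pause between checkpoints and never run them concurrently, so discarding
        // subsumed completed checkpoints can keep pace with creating new ones.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // ... define sources, transformations, sinks and call env.execute() as usual ...
    }
}

A larger interval also directly reduces how many completed checkpoints per hour have to be registered in ZooKeeper and later discarded on S3, which is where the backlog in this thread builds up.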