Hi Francesco,

Finalization also includes the removal of older checkpoints (we're only keeping the last N checkpoints), which could be pretty costly in the case of RocksDB (many small files). Can you check how long the removal of old checkpoint files from S3 takes (there might be some rate limiting involved, for example)?
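To make the split explicit, here is a minimal sketch that pulls the two durations out of the "Completed checkpoint" message quoted below and adds them up. The class and method names (`CheckpointLogTimes`, `parseTimes`) are made up for illustration, and the regex only assumes the message format exactly as it appears in this thread, so treat it as a rough helper rather than anything Flink provides.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: splits a "Completed checkpoint"
// log message into its checkpoint and finalization durations.
class CheckpointLogTimes {
    // Assumes the format quoted in this thread:
    // "... checkpointDuration=<n> ms, finalizationTime=<n> ms)."
    private static final Pattern TIMES = Pattern.compile(
            "checkpointDuration=(\\d+) ms, finalizationTime=(\\d+) ms");

    /** Returns {checkpointDurationMs, finalizationTimeMs}, or null if the line does not match. */
    static long[] parseTimes(String logLine) {
        Matcher m = TIMES.matcher(logLine);
        if (!m.find()) {
            return null;
        }
        return new long[] {Long.parseLong(m.group(1)), Long.parseLong(m.group(2))};
    }

    public static void main(String[] args) {
        String line = "Completed checkpoint 2515895 for job fdc83bb7b697b8eab84238cd588805ef "
                + "(7242148389 bytes, checkpointDuration=150047 ms, finalizationTime=178119 ms).";
        long[] t = parseTimes(line);
        long totalMs = t[0] + t[1];
        // Print both parts and their sum so the finalization share is visible.
        System.out.printf("checkpoint=%d ms, finalization=%d ms, total=%d ms%n",
                t[0], t[1], totalMs);
    }
}
```

For the quoted message the two values sum to 328166 ms, which lines up with the ~328 seconds you see in the logs: more than half of that is finalization rather than the checkpoint itself.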
Best,
D.

On Tue, May 2, 2023 at 1:59 PM Francesco Leone <[email protected]> wrote:

> Hi There,
>
> We are facing a problem with the flink checkpoints in our flink cluster
> made of 38 task managers and 2 job managers (HA). We are running Flink
> 1.15.2 on OpenJdk 11 and the checkpoints are stored in AWS S3 with presto.
>
> The overall checkpoint duration is reported as 8 seconds in the Flink UI
> (screenshot attached), but the job manager is reporting a duration of ~328
> seconds in its logs and the checkpoints are taken roughly every 6 minutes
> instead of the configured interval of 1 minute.
>
> JobManager logs are reporting
> message: "Completed checkpoint 2515895 for job
> fdc83bb7b697b8eab84238cd588805ef (7242148389 bytes,
> checkpointDuration=150047 ms, finalizationTime=178119 ms).",
> logger_name: "o.a.f.r.c.CheckpointCoordinator"
>
> ---------------------------------------------------------------
> The checkpoint configuration is
>
> CheckpointConfig config = ...
> config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> config.setMaxConcurrentCheckpoints(1);
>
> checkArgument(maxFailures >= 0);
> config.setTolerableCheckpointFailureNumber(maxFailures);
>
> config.setCheckpointInterval(1000 * 60); //1 minute
> config.setCheckpointTimeout(15 * 1000 * 60); //15 minutes
> config.setMinPauseBetweenCheckpoints(5000);
>
> In the flink-conf.yaml
>
> -----------------------------------------------------------------------------------
> presto.s3.ssl.enabled: true
> presto.s3.sse.enabled: true
> presto.s3.sse.type: S3
> fs.s3a.buffer.dir: /mnt/data/tmp
>
> s3.entropy.key: HASH
> s3.entropy.length: 4
>
> state.backend: rocksdb
> state.checkpoints.dir: s3p://xxxxxxxxxxxxxxxxxxxx/checkpoints/HASH/
>
> state.storage.fs.memory-threshold: 1048576
> state.storage.fs.write-buffer-size: 4194304
> state.savepoints.dir: s3p://xxxxxxxxxxxxxxxxxxxxxx/savepoints/xxxxxxxx
>
> state.checkpoints.num-retained: 4
> state.backend.local-recovery: false
> state.backend.incremental: true
> state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
>
> state.backend.rocksdb.options-factory: custom.RocksDbOptionsFactory
> state.backend.rocksdb.compression: ZSTD_COMPRESSION
>
> execution.checkpointing.unaligned: true
> taskmanager.state.local.root-dirs: /mnt/data/tmp/local
>
> -------------------------------------------------------------------------------------
>
> Thanks
> Kind Regards
>
> Francesco
