Hi Sudhansu, if you don't set RETAIN_ON_CANCELLATION, the folder should be cleaned up automatically. If you explicitly want to retain the checkpoint, then there is not much that Flink can do or I may have misunderstood you.
On Tue, May 11, 2021 at 4:09 PM sudhansu jena <sudhansu.jena...@gmail.com> wrote: > Hi Team, > > We have recently enabled Check Pointing in our flink job using > FSStateBackend pointing to S3 bucket. > > Below is the sample code for enabling the checkpointing for the job. The > query is each time we cancel the job and restart from the flink dashboard, > a new folder is getting created along with the old checkpointing folder in > the S3 bucket, So is there a way to get rid of these old checkpointed > folders automatically assuming they are not gonna be used for restoring the > state except the latest folder? > > env.setStateBackend(new > FsStateBackend("s3://flinkcheckpointing/job-name/",true)); > env.enableCheckpointing(1000); > Class<?> unmodColl = > Class.forName("java.util.Collections$UnmodifiableCollection"); > env.getConfig().addDefaultKryoSerializer(unmodColl, > UnmodifiableCollectionsSerializer.class); > CheckpointConfig config = env.getCheckpointConfig(); > > config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); > > > Thanks, > Sudhansu >