The Flink version we are using is 1.12.2.

Thanks,
Sudhansu

On Tue, May 11, 2021 at 7:48 PM Chesnay Schepler <ches...@apache.org> wrote:

> Which Flink version are you using?
>
> On 5/11/2021 4:09 PM, sudhansu jena wrote:
> > Hi Team,
> >
> > We have recently enabled Check Pointing in our flink job using
> > FSStateBackend pointing to S3 bucket.
> >
> > Below is the sample code for enabling the checkpointing for the job.
> > The query is each time we cancel the job and restart from the flink
> > dashboard, a new folder is getting created along with the old
> > checkpointing folder in the S3 bucket, So is there a way to get rid of
> > these old checkpointed folders automatically assuming they are not
> > gonna be used for restoring the state except the latest folder?
> >
> > env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/job-name/",true));
> > env.enableCheckpointing(1000);
> > Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
> > env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
> > CheckpointConfig config = env.getCheckpointConfig();
> > config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
> >
> > Thanks,
> > Sudhansu