Which Flink version are you using?

On 5/11/2021 4:09 PM, sudhansu jena wrote:
Hi Team,

We recently enabled checkpointing in our Flink job, using FsStateBackend pointing to an S3 bucket.

Below is the sample code that enables checkpointing for the job. Each time we cancel the job and restart it from the Flink dashboard, a new folder is created alongside the old checkpoint folders in the S3 bucket. Is there a way to remove these old checkpoint folders automatically, assuming only the latest folder will ever be used to restore state?

env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/job-name/",true));
env.enableCheckpointing(1000);
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
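
To make the request concrete, here is a rough sketch of the selection logic we have in mind: keep the most recently written folder and treat every other one as stale. This is only an illustration under assumptions. The class StaleCheckpointDirs, the method staleDirs, and the folder names are hypothetical and not part of Flink, and the timestamps would have to come from an S3 listing, which is not shown.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: decides which checkpoint
// folders could be deleted when only the newest one must be kept.
public class StaleCheckpointDirs {

    // Returns every folder name except the one with the newest
    // timestamp, ordered from oldest to newest.
    public static List<String> staleDirs(Map<String, Instant> lastModifiedByDir) {
        List<Map.Entry<String, Instant>> entries =
                new ArrayList<>(lastModifiedByDir.entrySet());
        entries.sort(Map.Entry.comparingByValue());

        List<String> stale = new ArrayList<>();
        // Skip the last entry: it is the newest folder and must be kept.
        for (int i = 0; i < entries.size() - 1; i++) {
            stale.add(entries.get(i).getKey());
        }
        return stale;
    }

    public static void main(String[] args) {
        // Folder names and timestamps are made up for illustration.
        Map<String, Instant> dirs = new HashMap<>();
        dirs.put("run-a", Instant.parse("2021-05-09T10:00:00Z"));
        dirs.put("run-b", Instant.parse("2021-05-10T10:00:00Z"));
        dirs.put("run-c", Instant.parse("2021-05-11T10:00:00Z"));

        // Prints the folders that are candidates for deletion.
        System.out.println(staleDirs(dirs));
    }
}
```

The sketch only selects candidates. The actual deletion against S3 is deliberately left out, since it should run only after confirming that no job still needs those folders.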


Thanks,
Sudhansu

