Hi Team,

We have recently enabled Check Pointing in our flink job using
FSStateBackend pointing to S3 bucket.

Below is the sample code for enabling the checkpointing for the job. The
query is each time we cancel the job and restart from the flink dashboard,
a new folder is getting created along with the old checkpointing folder in
the S3 bucket, So is there a way to get rid of these old checkpointed
folders automatically assuming they are not gonna be used for restoring the
state except the latest folder?

env.setStateBackend(new
FsStateBackend("s3://flinkcheckpointing/job-name/",true));
env.enableCheckpointing(1000);
Class<?> unmodColl =
Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodColl,
UnmodifiableCollectionsSerializer.class);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);


Thanks,
Sudhansu

Reply via email to