Which Flink version are you using?
On 5/11/2021 4:09 PM, sudhansu jena wrote:
Hi Team,
We have recently enabled checkpointing in our Flink job using
FsStateBackend pointing to an S3 bucket.
The sample code that enables checkpointing for the job is below.
Our question: each time we cancel the job and restart it from the Flink
dashboard, a new folder is created in the S3 bucket alongside the old
checkpoint folders. Is there a way to delete these old checkpoint
folders automatically, assuming that only the latest folder will be
used for restoring state?
env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/job-name/", true));
env.enableCheckpointing(1000);
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
Thanks,
Sudhansu