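Hi Sudhansu,

I think you do not need to set state.checkpoints.dir in flink-conf.yaml, since the checkpoint path is already set in your code through the FsStateBackend constructor.

Best,
Guowei
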
On Thu, May 13, 2021 at 1:06 PM sudhansu jena <sudhansu.jena...@gmail.com> wrote:

> Hi Team,
>
> We have recently enabled checkpointing in our Flink job using
> FsStateBackend pointing to an S3 bucket.
>
> Below is the sample code for enabling checkpointing through app code. We
> are using Flink version 1.12.2.
>
> env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/job-name/", true));
> env.enableCheckpointing(1000);
> Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
> env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
> CheckpointConfig config = env.getCheckpointConfig();
> config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
>
> The question is: do we still need to set the config below in
> flink-conf.yaml for checkpointing to work?
>
> *state.checkpoints.dir: s3://prod-flink-checkpointing/checkpoint-metadata/*
>
> Thanks,
> Sudhansu