[ https://issues.apache.org/jira/browse/FLINK-33080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Junrui Li updated FLINK-33080: ------------------------------ Description: When we configure the checkpoint storage at the job level, it can only be done through the following method: {code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getCheckpointConfig().setCheckpointStorage(xxx); {code} or configure filesystem storage by config optionĀ CheckpointingOptions.CHECKPOINTS_DIRECTORY through the following method: {code:java} Configuration configuration = new Configuration(); configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);{code} However, configure the other type checkpoint storage by the job-side configuration like the following will not take effect: {code:java} Configuration configuration = new Configuration(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "aaa.bbb.ccc.CustomCheckpointStorage"); configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); {code} This behavior is unexpected, we should allow this way will take effect. was: When we configure the checkpoint storage at the job level, it can only be done through the following method: {code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getCheckpointConfig().setCheckpointStorage(xxx); {code} However, configure the checkpoint storage by the job-side configuration like the following will not take effect: {code:java} Configuration configuration = new Configuration(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); {code} This behavior is unexpected, we should allow this way will take effect. > The checkpoint storage configured in the job level by config option will not > take effect > ---------------------------------------------------------------------------------------- > > Key: FLINK-33080 > URL: https://issues.apache.org/jira/browse/FLINK-33080 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends > Reporter: Junrui Li > Assignee: Junrui Li > Priority: Major > Fix For: 1.19.0 > > > When we configure the checkpoint storage at the job level, it can only be > done through the following method: > {code:java} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.getCheckpointConfig().setCheckpointStorage(xxx); {code} > or configure filesystem storage by config optionĀ > CheckpointingOptions.CHECKPOINTS_DIRECTORY through the following method: > {code:java} > Configuration configuration = new Configuration(); > configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(configuration);{code} > However, configure the other type checkpoint storage by the job-side > configuration like the following will not take effect: > {code:java} > Configuration configuration = new Configuration(); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(configuration); > configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, > "aaa.bbb.ccc.CustomCheckpointStorage"); > configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); > {code} > This behavior is unexpected, we should allow this way will take effect. -- This message was sent by Atlassian Jira (v8.20.10#820010)