Hello, I am trying to configure my Flink (1.18.1) jobs to store checkpoints on s3 but I am getting the below error.
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side. ... Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/. The checkpoint configurations on the Flink cluster are these: execution.checkpointing.storage: 'filesystem' execution.checkpointing.dir: 's3a://my_checkpoints/checkpoints' execution.checkpointing.incremental: true execution.checkpointing.interval: '3 min' execution.checkpointing.mode: 'EXACTLY_ONCE' execution.checkpointing.timeout: '10 min' execution.checkpointing.min-pause: '1 min' execution.checkpointing.max-concurrent-checkpoints: 1 execution.checkpointing.externalized-checkpoint-retention: 'RETAIN_ON_CANCELLATION' fs.s3a.aws.credentials.provider: 'org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider’ And in the Flink jobs: stream_exec_env.enable_checkpointing(checkpoint_interval_ms) stream_exec_env.get_checkpoint_config().set_checkpoint_timeout(checkpoint_timeout_ms) stream_exec_env.get_checkpoint_config().set_max_concurrent_checkpoints(max_concurrent_checkpoints) stream_exec_env.get_checkpoint_config().set_min_pause_between_checkpoints(min_pause_between_checkpoints_ms) stream_exec_env.get_checkpoint_config().enable_externalized_checkpoints( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION ) checkpoint_location = "s3a://my_checkpoints/checkpoints" rocks_db_backend = RocksDBStateBackend(checkpoint_location, True) stream_exec_env.set_state_backend(rocks_db_backend) I have also installed these plugins: RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.18.1/flink-s3-fs-hadoop-1.18.1.jar /opt/flink/plugins/flink-s3-fs-hadoop/ RUN mkdir -p /opt/flink/plugins/flink-s3-fs-presto ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-s3-fs-presto/1.18.1/flink-s3-fs-presto-1.18.1.jar /opt/flink/plugins/flink-s3-fs-presto/ Are there any misconfigurations or anything missing? Kind regards Phil