Hello,

I am trying to configure my Flink (1.18.1) jobs to store checkpoints on S3, but
I am getting the error below.

Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create 
checkpoint storage at checkpoint coordinator side.
...
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 's3a'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop. Please 
ensure that each plugin resides within its own subfolder within the plugins 
directory. See 
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/
 for more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems. For 
a full list of supported file systems, please see 
https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.

The checkpoint configurations on the Flink cluster are these:
      execution.checkpointing.storage: 'filesystem'
      execution.checkpointing.dir: 's3a://my_checkpoints/checkpoints'
      execution.checkpointing.incremental: true
      execution.checkpointing.interval: '3 min'
      execution.checkpointing.mode: 'EXACTLY_ONCE'
      execution.checkpointing.timeout: '10 min'
      execution.checkpointing.min-pause: '1 min'
      execution.checkpointing.max-concurrent-checkpoints: 1
      execution.checkpointing.externalized-checkpoint-retention: 'RETAIN_ON_CANCELLATION'
      fs.s3a.aws.credentials.provider: 'org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider'

And in the Flink jobs:
    stream_exec_env.enable_checkpointing(checkpoint_interval_ms)

    stream_exec_env.get_checkpoint_config().set_checkpoint_timeout(checkpoint_timeout_ms)

    stream_exec_env.get_checkpoint_config().set_max_concurrent_checkpoints(max_concurrent_checkpoints)

    stream_exec_env.get_checkpoint_config().set_min_pause_between_checkpoints(min_pause_between_checkpoints_ms)

    stream_exec_env.get_checkpoint_config().enable_externalized_checkpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
    )

    checkpoint_location = "s3a://my_checkpoints/checkpoints"
    rocks_db_backend = RocksDBStateBackend(checkpoint_location, True)
    stream_exec_env.set_state_backend(rocks_db_backend)

I have also installed these plugins:
RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.18.1/flink-s3-fs-hadoop-1.18.1.jar /opt/flink/plugins/flink-s3-fs-hadoop/

RUN mkdir -p /opt/flink/plugins/flink-s3-fs-presto
ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-s3-fs-presto/1.18.1/flink-s3-fs-presto-1.18.1.jar /opt/flink/plugins/flink-s3-fs-presto/
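
Since the error message asks that each plugin reside within its own subfolder of the plugins directory, the layout inside the container could be checked with something like the sketch below. It is only a rough diagnostic, not part of Flink: the helper names inspect_plugins and stray_jars are made up for illustration, the default path is taken from the Dockerfile lines above, and reading the FLINK_PLUGINS_DIR environment variable is an assumption about how the image is set up. It lists the jars found in each subfolder, whether the current user can read them, and any jars sitting directly in the plugins root.

```python
import os
from pathlib import Path


def inspect_plugins(plugins_dir):
    """Map each plugin subfolder to a list of (jar_name, readable_by_current_user)."""
    root = Path(plugins_dir)
    if not root.is_dir():
        # Nothing to inspect if the directory does not exist.
        return {}
    report = {}
    for sub in sorted(p for p in root.iterdir() if p.is_dir()):
        jars = sorted(sub.glob("*.jar"))
        report[sub.name] = [(jar.name, os.access(jar, os.R_OK)) for jar in jars]
    return report


def stray_jars(plugins_dir):
    """Names of jars placed directly in the plugins root instead of a subfolder."""
    root = Path(plugins_dir)
    if not root.is_dir():
        return []
    return sorted(p.name for p in root.glob("*.jar"))


if __name__ == "__main__":
    # Assumption: the path comes from FLINK_PLUGINS_DIR, else the Dockerfile path above.
    plugins_dir = os.environ.get("FLINK_PLUGINS_DIR", "/opt/flink/plugins")
    for name, jars in inspect_plugins(plugins_dir).items():
        print(name, jars if jars else "(no jars found)")
    for jar in stray_jars(plugins_dir):
        print("jar outside a plugin subfolder:", jar)
```

It would need to be run inside the JobManager and TaskManager containers as the same user that runs the Flink process, so that the readability check reflects what Flink itself sees.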

Are there any misconfigurations or anything missing?

Kind regards
Phil
