Hello everyone,

First of all, sorry for cross-posting: I asked this on SO, but David Anderson suggested I reach out to the community via the mailing list. The SO question is here: https://stackoverflow.com/questions/71381266/using-another-filesystem-configuration-while-creating-a-job
I'll post the answer on SO as soon as I have one :) Here is the content of the question, so if anyone can help, please let me know.

Summary

We are currently facing an issue with the FileSystem abstraction in Flink. We have a job that connects dynamically to an S3 source (meaning the source is defined at runtime). We discovered a bug in our code, and it may come from a wrong assumption about the way the FileSystem works.

Bug explanation

During the initialization of the job (i.e. in the job manager), we manipulate the FileSystem to check that some files exist, so that we can fail gracefully before the job is executed. In our case, we need to set the FileSystem dynamically: it can be HDFS, S3 on AWS, or S3 on MinIO. We want the FileSystem configuration to be specific to the job, and different from the cluster's (different access key, different endpoint, etc.). Here is an extract of the code we use to do so:

    private void validateFileSystemAccess(Configuration configuration) throws IOException {
        // Create a plugin manager from the configuration
        PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
        // Init the FileSystem from the configuration
        FileSystem.initialize(configuration, pluginManager);
        // Validate the FileSystem: an exception is thrown if the FS configuration is wrong
        Path archiverPath = new Path(this.archiverPath);
        archiverPath.getFileSystem().exists(new Path("/"));
    }

After starting that specific kind of job, we notice that:

1. Checkpointing no longer works for this job; it throws a credential error.
2. The job manager can no longer upload the artifacts needed by the history server, and this affects all jobs already running, of any kind (not only this specific kind of job).

If we do not deploy that kind of job, the artifact upload and the checkpointing work as expected on the cluster.

We think this issue comes from FileSystem.initialize(), which overrides the configuration for all FileSystems. We believe that, because of this, subsequent calls to FileSystem.get() return the FileSystem we configured in validateFileSystemAccess instead of the one configured for the cluster (a minimal sketch of this suspicion is in the P.S. below).

Questions

Could our hypothesis be correct? If so, how could we provide a specific configuration for the FileSystem without impacting the whole cluster?

Regards,
Gil
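P.S. To make the suspicion concrete, here is a minimal sketch of what we think happens. This is not our production code: the clusterConf/jobConf names, the endpoint and key values, and the bucket name are made up for illustration, and it assumes a Flink plugins directory is reachable by PluginUtils.createPluginManagerFromRootFolder.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.plugin.PluginManager;
    import org.apache.flink.core.plugin.PluginUtils;

    public class FileSystemInitSketch {
        public static void main(String[] args) throws Exception {
            // Cluster-wide settings (normally loaded from flink-conf.yaml at startup).
            Configuration clusterConf = new Configuration();
            clusterConf.setString("s3.endpoint", "https://s3.amazonaws.com");   // hypothetical values
            clusterConf.setString("s3.access-key", "cluster-access-key");

            // Job-specific settings, as passed to validateFileSystemAccess().
            Configuration jobConf = new Configuration();
            jobConf.setString("s3.endpoint", "https://minio.internal:9000");    // hypothetical values
            jobConf.setString("s3.access-key", "job-access-key");

            PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(jobConf);

            // FileSystem.initialize(...) installs the given configuration for the
            // whole JVM: there is no per-call or per-job scope.
            FileSystem.initialize(jobConf, pluginManager);

            // From now on, every lookup in this process resolves s3:// with the
            // job settings, not the cluster ones.
            FileSystem fs = new Path("s3://some-bucket/").getFileSystem();
        }
    }

If that reading is right, the job manager's own components (checkpoint storage, history server archiver) would pick up the job credentials on their next FileSystem lookup, which would match the two symptoms above.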