The FileSystem class is essentially one big singleton: only one instance of each FileSystem implementation is loaded, shared across all jobs.
For that reason we do not support job-specific FileSystem configurations.
Note that we generally also don't support configuring the FileSystems at runtime; the entire codebase assumes that initialization happens when the process is started.
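
To illustrate the global effect, a rough sketch (jobSpecificConfig and its values are hypothetical; the FileSystem calls are the real static API):

    import java.net.URI;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;

    Configuration jobSpecificConfig = new Configuration();
    jobSpecificConfig.setString("s3.endpoint", "https://minio.example:9000");

    // Re-initializing the FileSystem stack replaces the process-wide settings...
    FileSystem.initialize(jobSpecificConfig);

    // ...so every later lookup, from any job or framework component in this
    // process, is served from jobSpecificConfig, not from the cluster config.
    FileSystem fs = FileSystem.get(URI.create("s3://some-bucket/some-key"));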

You'll need to run that job in a separate cluster.

Overall, this sounds like something that should run externally: assert the preconditions, then configure Flink appropriately, then run the job.
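
For the pre-flight check itself, one option is a standalone Hadoop FileSystem client, which is configured per instance and never touches Flink's global FileSystem state. A minimal sketch, assuming the s3a connector is on the classpath (the fs.s3a.* keys are standard Hadoop options; the method and its parameters are hypothetical):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static void checkSourceReadable(String accessKey, String secretKey,
                                    String endpoint, String uri) throws Exception {
        Configuration conf = new Configuration();
        // Per-instance credentials: they affect only this client, not the cluster.
        conf.set("fs.s3a.access.key", accessKey);
        conf.set("fs.s3a.secret.key", secretKey);
        conf.set("fs.s3a.endpoint", endpoint);

        // newInstance() bypasses Hadoop's JVM-wide FileSystem cache, so closing
        // this client cannot disturb anything else in the process.
        try (FileSystem fs = FileSystem.newInstance(URI.create(uri), conf)) {
            if (!fs.exists(new Path(uri))) {
                throw new IllegalStateException("Missing source: " + uri);
            }
        }
    }

If the check passes, submit the job to a cluster whose configuration carries the matching credentials.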

On 08/03/2022 08:47, Gil De Grove wrote:
Hello everyone,

First of all, sorry for cross-posting; I asked on SO, but David Anderson suggested that I reach out to the community via the mailing list. The SO question is here: https://stackoverflow.com/questions/71381266/using-another-filesystem-configuration-while-creating-a-job

I'll post the answer on SO as soon as I have one :)

I'm posting the content of the question here; if anyone can help, please let me know.


      Summary

We are currently facing an issue with the FileSystem abstraction in Flink. We have a job that can dynamically connect to an S3 source (meaning the source is defined at runtime). We discovered a bug in our code, and it may be due to a wrong assumption about the way the FileSystem works.


      Bug explanation

During the initialization of the job (so in the job manager), we manipulate the FS to check that some files exist, in order to fail gracefully before the job is executed. In our case, we need to set the FS dynamically: it can be HDFS, S3 on AWS, or S3 on MinIO. We want the FS configuration to be specific to the job, and different from the cluster one (different access key, different endpoint, etc.).
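
For reference, the cluster-level settings are the usual flink-conf.yaml S3 options (placeholder values):

    s3.access-key: <cluster-access-key>
    s3.secret-key: <cluster-secret-key>
    s3.endpoint: https://s3.cluster.internal

These are what we would like to override per job.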

Here is an extract of the code we are using to do so:

    private void validateFileSystemAccess(Configuration configuration) throws IOException {
        // Create a plugin manager from the configuration
        PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);

        // Init the FileSystem from the configuration
        FileSystem.initialize(configuration, pluginManager);

        // Validate the FileSystem: an exception is thrown if the FS configuration is wrong
        Path archiverPath = new Path(this.archiverPath);
        archiverPath.getFileSystem().exists(new Path("/"));
    }

After starting that specific kind of job, we notice that:

 1. checkpointing does not work for this job; it fails with a credential
    error.
 2. the job manager can no longer upload the artifacts needed by the
    history server for any already-running job of any kind (not only
    this specific kind of job).

If we do not deploy that kind of job, artifact upload and checkpointing work as expected on the cluster.

We think this issue might come from FileSystem.initialize(), which overrides the configuration for all FileSystems. We suspect that, because of this, the next call to FileSystem.get() returns the FileSystem we configured in validateFileSystemAccess instead of the cluster-configured one.


      Questions

Could our hypothesis be correct? If so, how could we provide a specific configuration for the FileSystem without impacting the whole cluster?

Regards,
Gil
