Hello everyone,

First of all, sorry for cross-posting: I asked on SO, but David Anderson
suggested I reach out to the community via the mailing list. The link to
the SO question is the following:
https://stackoverflow.com/questions/71381266/using-another-filesystem-configuration-while-creating-a-job

I'll post the answer on SO as soon as I have one :)

I'm posting the content of the question here, so if anyone can help, please
let me know:

Summary

We are currently facing an issue with the FileSystem abstraction in Flink.
We have a job that can dynamically connect to an S3 source (meaning the
source is defined at runtime). We discovered a bug in our code, and it
could be due to an incorrect assumption about how the FileSystem works.

Bug explanation

During the initialization of the job (so in the job manager), we manipulate
the FS to check that some files exist, in order to fail gracefully before
the job is executed. In our case, we need to configure the FS dynamically:
it can be HDFS, S3 on AWS, or S3 on MinIO. We want the FS configuration to
be specific to the job, and different from the cluster one (different
access key, different endpoint, etc.).
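
For context, the job-specific configuration we build looks roughly like the
sketch below. This is a minimal illustration, not our actual code: the keys
are the standard s3.* options of the Flink S3 filesystem plugins, and the
values are placeholders.

  // Job-specific S3 settings, distinct from the cluster-wide ones
  Configuration configuration = new Configuration();
  configuration.setString("s3.access-key", "<job-access-key>");
  configuration.setString("s3.secret-key", "<job-secret-key>");
  configuration.setString("s3.endpoint", "http://minio:9000"); // MinIO endpoint
  configuration.setString("s3.path.style.access", "true");     // needed for MinIO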

Here is an extract of the code we are using to do so:

  private void validateFileSystemAccess(Configuration configuration)
      throws IOException {
    // Create a plugin manager from the configuration
    PluginManager pluginManager =
        PluginUtils.createPluginManagerFromRootFolder(configuration);

    // Init the FileSystem from the configuration
    FileSystem.initialize(configuration, pluginManager);

    // Validate the FileSystem: an exception is thrown if the FS
    // configuration is wrong
    Path archiverPath = new Path(this.archiverPath);
    archiverPath.getFileSystem().exists(new Path("/"));
  }

After starting that specific kind of job, we noticed that:

   1. checkpointing no longer works for this job; it throws a credential
   error.
   2. the job manager can no longer upload the artifacts needed by the
   history server for any already-running job (not only this specific kind
   of job).

If we do not deploy that kind of job, the upload of artifacts and the
checkpointing work as expected on the cluster.

We think that this issue comes from FileSystem.initialize(), which
overrides the configuration for all FileSystems in the process. Because of
this, the next call to FileSystem.get() returns the FileSystem we
configured in validateFileSystemAccess() instead of the cluster-configured
one.
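
A tiny sketch of what we suspect happens (hypothetical; clusterConfiguration,
jobConfiguration and pluginManager are placeholder names, and we have not
fully isolated this yet):

  // The cluster initializes the process-wide FileSystem config at startup
  FileSystem.initialize(clusterConfiguration, pluginManager);
  // Our validation step re-initializes it with job-specific settings
  FileSystem.initialize(jobConfiguration, pluginManager);
  // Any FS obtained afterwards (checkpointing, history server uploads, ...)
  // would now be backed by jobConfiguration's credentials
  FileSystem fs = FileSystem.get(new Path("s3://bucket/path").toUri());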

Questions

Could our hypothesis be correct? If so, how could we provide a specific
configuration for the FileSystem without impacting the whole cluster?
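
One workaround we are considering is to bypass Flink's FileSystem
abstraction for the validation step and talk to Hadoop's S3A filesystem
directly with a job-local Hadoop Configuration, so the process-wide Flink
configuration is never touched. A rough sketch (assuming the usual
hadoop-aws / fs.s3a.* semantics; bucket and credentials are placeholders):

  org.apache.hadoop.conf.Configuration hadoopConf =
      new org.apache.hadoop.conf.Configuration();
  hadoopConf.set("fs.s3a.access.key", "<job-access-key>");
  hadoopConf.set("fs.s3a.secret.key", "<job-secret-key>");
  hadoopConf.set("fs.s3a.endpoint", "http://minio:9000");
  // newInstance() avoids Hadoop's shared FileSystem cache
  try (org.apache.hadoop.fs.FileSystem fs =
      org.apache.hadoop.fs.FileSystem.newInstance(
          java.net.URI.create("s3a://bucket/"), hadoopConf)) {
    fs.exists(new org.apache.hadoop.fs.Path("/"));
  }

Would something like this be a sane approach, or is there a supported way
to scope a FileSystem configuration to a single job?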

Regards,
Gil
