Hello Chesnay,

Thanks for the reply.

Your reply raises a question for me: if we want to use two sets of
credentials, for example to access two different AWS buckets, is that
not feasible at the moment?
One example I have in mind would be to separate the credentials for
accessing data vs storing metadata for a given cluster.
Another use case would be a Flink job that consumes data stored on
AWS S3 and/or on MinIO.
Would that mean that, to achieve this, we have to set up two clusters
and publish to an intermediate medium? For example, two clusters, one
configured to access CephFS and one for AWS S3, each publishing to
Kafka (or using Kafka Connect)?
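
To check that I understand the limitation, here is a toy model of how I
picture it. It is plain Java and deliberately not Flink code: ToyFs,
ToyFsRegistry and the key strings are made up for illustration, and the
real FileSystem class is certainly more involved.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only: this is NOT Flink's FileSystem class. */
public class SharedFsDemo {

    /** Hypothetical stand-in for a configured file system client. */
    public static final class ToyFs {
        public final String accessKey;

        ToyFs(String accessKey) {
            this.accessKey = accessKey;
        }
    }

    /** Process-wide state: one configuration, one cached instance per scheme. */
    public static final class ToyFsRegistry {
        private static final Map<String, ToyFs> CACHE = new HashMap<>();
        private static String accessKey = "";

        /** Replaces the configuration for every caller in the process. */
        public static synchronized void initialize(String newAccessKey) {
            accessKey = newAccessKey;
            CACHE.clear();
        }

        /** Returns the shared instance for a scheme, creating it on first use. */
        public static synchronized ToyFs get(String scheme) {
            return CACHE.computeIfAbsent(scheme, s -> new ToyFs(accessKey));
        }
    }

    public static void main(String[] args) {
        // Cluster start-up: the one and only intended initialization.
        ToyFsRegistry.initialize("cluster-key");
        System.out.println(ToyFsRegistry.get("s3").accessKey); // cluster-key

        // A job re-initializes with its own credentials...
        ToyFsRegistry.initialize("job-key");

        // ...and every later lookup, from any job, now sees the job's key.
        System.out.println(ToyFsRegistry.get("s3").accessKey); // job-key
    }
}
```

If this picture is roughly right, there is no place in such a design to
hold a second set of credentials for the same scheme, which is why I am
asking about the two-cluster setup.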

We are asking because we would like to use the Hybrid source with a
FileSystem source and a Kafka consumer, and this limitation would
probably make us rethink the architecture.
Since this limitation seems to be an important one, is it documented
somewhere? Maybe in a FLIP, or in an entry in the Flink documentation?

Thanks again for your help,
Gil


On Thu, 10 Mar 2022 at 10:57, Chesnay Schepler <ches...@apache.org> wrote:

> The FileSystem class is essentially one big singleton, with only 1
> instance of each FileSystem implementation being loaded, shared across all
> jobs.
> For that reason we do not support job-specific FileSystem configurations.
> Note that we generally also don't really support configuring the
> FileSystems at runtime. The entire codebase assumes that the initialization
> happens when the process is started.
>
> You'll need to run that job in a separate cluster.
>
> Overall, this sounds like something that should run externally; assert
> some precondition, then configure Flink appropriately, then run the job.
>
> On 08/03/2022 08:47, Gil De Grove wrote:
>
> Hello everyone,
>
> First of all, sorry for cross-posting. I asked on SO, but David Anderson
> suggested that I reach out to the community via the mailing list. The
> link to the SO question is the following:
> https://stackoverflow.com/questions/71381266/using-another-filesystem-configuration-while-creating-a-job
>
> I'll post the answer on SO as soon as I have one :)
>
> I am posting the content of the question here; if anyone can help,
> please let me know.
>
> Summary
>
> We are currently facing an issue with the FileSystem abstraction in Flink.
> We have a job that can dynamically connect to an S3 source (meaning it's
> defined at runtime). We discovered a bug in our code, and it could be due
> to a wrong assumption about the way the FileSystem works.
>
> Bug explanation
>
> During the initialization of the job (so in the job manager), we use
> the FS to check that some files exist, so that we can fail gracefully
> before the job is executed. In our case, we need to set the FS
> dynamically. It can be either HDFS, S3 on AWS or S3 on MinIO. We
> want the FS configuration to be specific to the job, and different from
> the cluster one (different access key, different endpoint, etc.).
>
> Here is an extract of the code we are using to do so:
>
>   private void validateFileSystemAccess(Configuration configuration)
>       throws IOException {
>     // Create a plugin manager from the configuration
>     PluginManager pluginManager =
>         PluginUtils.createPluginManagerFromRootFolder(configuration);
>
>     // Init the FileSystem from the configuration
>     FileSystem.initialize(configuration, pluginManager);
>
>     // Validate the FileSystem: an exception is thrown if the FS
>     // configuration is wrong
>     Path archiverPath = new Path(this.archiverPath);
>     archiverPath.getFileSystem().exists(new Path("/"));
>   }
>
> After starting that specific kind of job, we notice that:
>
>    1. checkpointing does not work for this job; it throws a
>    credential error.
>    2. the job manager cannot upload the artifacts needed by the history
>    server for any job already running, of any kind (not only this
>    specific kind of job).
>
> If we do not deploy that kind of job, the upload of artifacts and the
> checkpointing work as expected on the cluster.
>
> We think that this issue might come from FileSystem.initialize(), which
> overrides the configuration for all the FileSystems. We think that,
> because of this, the next call to FileSystem.get() returns the
> FileSystem we configured in validateFileSystemAccess instead of the
> cluster-configured one.
>
> Questions
>
> Could our hypothesis be correct? If so, how could we provide a specific
> configuration for the FileSystem without impacting the whole cluster?
>
> Regards,
> Gil
>
>
>
