Hey Chesnay,

Thanks for the thorough answer, much appreciated.
Sorry for the "requesting []..." wording; something got lost in translation
and slipped past my proofreading. The correct verb should have been
"asking" :). It was not a request to the community at all, sorry again for that.

The solution of implementing an `n scheme` factory that would be configured
at deploy time for our jobs seems worth investigating; it crossed our
minds when we discovered the limitation.
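
To make the idea concrete, here is a minimal sketch of the configuration side
only, not a definitive implementation. It uses no Flink classes: the class
name, the schemes `s3aws` / `s3minio`, and the prefixed keys are all
hypothetical names chosen for illustration. The assumption is that each
adjusted factory would read only the keys under its own scheme prefix, so N
schemes can share one implementation with N different configurations.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, NOT part of Flink: it only illustrates how one shared
// cluster configuration could carry a separate key set per custom scheme.
final class SchemeScopedConfigSketch {

    /** Returns the entries stored under "<scheme>." with that prefix stripped. */
    static Map<String, String> configFor(String scheme, Map<String, String> clusterConfig) {
        String prefix = scheme + ".";
        Map<String, String> scoped = new TreeMap<>();
        for (Map.Entry<String, String> entry : clusterConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix)) {
                scoped.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return scoped;
    }

    public static void main(String[] args) {
        // Assumed layout: one key set per scheme, all defined at deploy time.
        Map<String, String> clusterConfig = new TreeMap<>();
        clusterConfig.put("s3aws.access-key", "aws-key");
        clusterConfig.put("s3aws.endpoint", "https://s3.example.com");
        clusterConfig.put("s3minio.access-key", "minio-key");
        clusterConfig.put("s3minio.endpoint", "http://minio.example.com:9000");

        // Each scheme sees only its own settings.
        System.out.println("s3aws   -> " + configFor("s3aws", clusterConfig));
        System.out.println("s3minio -> " + configFor("s3minio", clusterConfig));
    }
}
```

In an adjusted plugin, the factory registered for a given scheme would apply
this kind of filtering when it is configured at process start, which keeps
everything inside the "initialize once" model described earlier in the thread.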

Thanks again for all your help, really appreciate it.

Regards,
Gil



On Thu, 10 Mar 2022 at 17:05, Chesnay Schepler <ches...@apache.org> wrote:

> > if we want to use two sets of credentials, for example to access two
> different AWS buckets, that would not be feasible at the moment?
>
> That is correct.
>
> > As it seems that this limitation is quite an important one, is there a
> place where we can find this documented?
>
> I don't think it is explicitly documented; likely because we assume that
> users configure the filesystem through the flink-conf.yaml (and only
> document it as such), which inherently prevents that.
>
> > Would it mean that in order to achieve this, we would have to set up two
> clusters and publish to a temporary medium? For example, using two
> clusters one configured to access CephFS, one for AWS S3 then publish that
> to Kafka (or use Kafka Connect)?
>
> That would be one approach, yes.
>
> > We are requesting that [it should be possible to configure credentials
> per job]
>
> It is exceedingly unlikely for this to be implemented in the foreseeable
> future.
>
>
> There are some workarounds though.
> For S3 in particular you could capitalize on the fact that we have 2
> filesystem plugins (s3-fs-hadoop and s3-fs-presto), which you could use at
> the same time so long as you use different schemes ( s3a (hadoop) / s3p
> (presto) ) for the different buckets.
> You could also generalize this by taking an existing filesystem plugin
> from Flink and adjusting the contained FileSystemFactory to use a different
> scheme and config keys. It's a bit annoying, but it should work (now and in
> the future).
> Essentially you'd pretend that there are N completely different
> filesystems, but they are actually all the same implementation just with
> different configurations.
>
> On 10/03/2022 13:30, Gil De Grove wrote:
>
> Hello Chesnay,
>
> Thanks for the reply.
>
> Based on your reply, I wonder: if we want to use two sets of
> credentials, for example to access two different AWS buckets, that would
> not be feasible at the moment?
> One example I have in mind would be to separate the credentials for
> accessing data vs storing metadata for a given cluster.
> Another use case would be to create a Flink job that consumes data stored
> on AWS S3 and/or on MinIO.
> Would it mean that in order to achieve this, we would have to set up two
> clusters and publish to a temporary medium? For example, using two
> clusters one configured to access CephFS, one for AWS S3 then publish that
> to Kafka (or use Kafka Connect)?
>
> We are requesting that, as we would like to use the Hybrid source with a
> FileSystem and a Kafka consumer, and this limitation would probably make us
> rethink the architecture.
> As it seems that this limitation is quite an important one, is there a
> place where we can find this documented? Maybe a FLIP? Or an entry in the
> Flink Documentation?
>
> Thanks again for your help,
> Gil
>
>
> On Thu, 10 Mar 2022 at 10:57, Chesnay Schepler <ches...@apache.org> wrote:
>
>> The FileSystem class is essentially one big singleton, with only 1
>> instance of each FileSystem implementation being loaded, shared across all
>> jobs.
>> For that reason we do not support job-specific FileSystem configurations.
>> Note that we generally also don't really support configuring the
>> FileSystems at runtime. The entire codebase assumes that the initialization
>> happens when the process is started.
>>
>> You'll need to run that job in a separate cluster.
>>
>> Overall, this sounds like something that should run externally; assert
>> some precondition, then configure Flink appropriately, then run the job.
>>
>> On 08/03/2022 08:47, Gil De Grove wrote:
>>
>> Hello everyone,
>>
>> First of all, sorry for cross posting, I asked on SO, but David Anderson
>> suggested that I reach out to the community via the mailing list. The link
>> to the SO question is the following:
>> https://stackoverflow.com/questions/71381266/using-another-filesystem-configuration-while-creating-a-job
>>
>> I'll post the answer on SO as soon as I have one :)
>>
>> I am posting the content of the question here, so if anyone can help,
>> please let me know:
>>
>> Summary
>>
>> We are currently facing an issue with the FileSystem abstraction in
>> Flink. We have a job that can dynamically connect to an S3 source (meaning
>> it's defined at runtime). We discovered a bug in our code, and it could be
>> due to a wrong assumption on the way the FileSystem works.
>>
>> Bug explanation
>>
>> During the initialization of the job, (so in the job manager) we
>> manipulate the FS to check that some files exist in order to fail
>> gracefully before the job is executed. In our case, we need to set
>> the FS dynamically. It can be either HDFS, S3 on AWS or S3 on MinIO. We
>> want the FS configuration to be specific for the job, and different from
>> the cluster one (different access key, different endpoint, etc.).
>>
>> Here is an extract of the code we are using to do so:
>>
>>   private void validateFileSystemAccess(Configuration configuration) throws IOException {
>>     // Create a plugin manager from the configuration
>>     PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
>>
>>     // Init the FileSystem from the configuration
>>     FileSystem.initialize(configuration, pluginManager);
>>
>>     // Validate the FileSystem: an exception is thrown if FS configuration is wrong
>>     Path archiverPath = new Path(this.archiverPath);
>>     archiverPath.getFileSystem().exists(new Path("/"));
>>   }
>>
>> After starting that specific kind of job, we notice that:
>>
>>    1. the checkpointing does not work for this job, it throws a
>>    credential error.
>>    2. the job manager cannot upload the artifacts needed by the history
>>    server for any job already running, of any kind (not only this
>>    specific kind of job).
>>
>> If we do not deploy that kind of job, the upload of artifacts and the
>> checkpointing work as expected on the cluster.
>>
>> We think that this issue might come from the FileSystem.initialize()
>> that overrides the configuration for all the FileSystems. We think that
>> because of this, the next call to FileSystem.get() returns the
>> FileSystem we configured in validateFileSystemAccess instead of the
>> cluster configured one.
>>
>> Questions
>>
>> Could our hypothesis be correct? If so, how could we provide a specific
>> configuration for the FileSystem without impacting the whole cluster?
>>
>> Regards,
>> Gil
>>
>>
>>
>
