> if we want to use two sets of credentials, for example to access two
different AWS buckets, that would not be feasible at the moment?
That is correct.
> As it seems that this limitation is quite an important one, is there
a place where we can find this documented?
I don't think it is explicitly documented; likely because we assume that
users configure the filesystem through the flink-conf.yaml (and only
document it as such), which inherently prevents that.
> Would it mean that in order to achieve this, we would have to set up
two clusters and publishing to a temporary medium? For example, using
two clusters one configured to access CephFS, one for AWS S3 then
publish that to Kafka (or use Kafka Connect)?
That would be one approach, yes.
> We are requesting that [it should be possible to configure
credentials per job]
It is exceedingly unlikely for this to be implemented in the foreseeable
future.
There are some workarounds though.
For S3 in particular you could capitalize on the fact that we have 2
filesystem plugins (s3-fs-hadoop and s3-fs-presto), which you could use
at the same time so long as you use different schemes ( s3a (hadoop) /
s3p (presto) ) for the different buckets.
You could also generalize this by taking an existing filesystem plugin
from Flink and adjusting the contained FileSystemFactory to use a
different scheme and config keys. It's a bit annyoing, but it should
work (now and in the future).
Essentially you'd pretend that there are N completely different
filesystems, but they are actually all the same implementation just with
different configurations.
On 10/03/2022 13:30, Gil De Grove wrote:
Hello Chesnay,
Thanks for the reply.
I wonder something based on your reply, if we want to use two sets of
credentials, for example to access two different AWS buckets, that
would not be feasible at the moment?
One example I have in mind would be to separate the credentials for
accessing data vs storing metadata for a given cluster.
Another use case would be to create a Flink job that consume data
stored on AWS S3 and/or on MinIO.
Would it mean that in order to achieve this, we would have to set up
two clusters and publishing to a temporary medium? For example, using
two clusters one configured to access CephFS, one for AWS S3 then
publish that to Kafka (or use Kafka Connect)?
We are requesting that, as we would like to use the Hybrid source with
a FileSystem and a Kafka consumer, and this limitation would probably
make us rethink the architecture.
As it seems that this limitation is quite an important one, is there a
place where we can find this documented? Maybe a FLIP? Or an entry in
the Flink Documentation?
Thanks again for your help,
Gil
On Thu, 10 Mar 2022 at 10:57, Chesnay Schepler <ches...@apache.org> wrote:
The FileSystem class is essentially one big singleton, with only 1
instance of each FileSystem implementation being loaded, shared
across all jobs.
For that reason we do not support job-specific FileSystem
configurations.
Note that we generally also don't really support configuring the
FileSystems at runtime. The entire codebase assumes that the
initialization happens when the process is started.
You'll need to run that job in a separate cluster.
Overall, this sounds like something that should run externally;
assert some precondition, then configure Flink appropriately, then
run the job.
On 08/03/2022 08:47, Gil De Grove wrote:
Hello everyone,
First of all, sorry for cross posting, I asked on SO, but David
Anderson suggested me to reach out to the community via the
mailing list. The link to the SO question is the following:
https://stackoverflow.com/questions/71381266/using-another-filesystem-configuration-while-creating-a-job
I'll post the answer on SO as soon as I have one :)
I post here the content of the question, so if anyone can help,
please let me know;
Summary
We are currently facing an issue with the FileSystem abstraction
in Flink. We have a job that can dynamically connect to an S3
source (meaning it's defined at runtime). We discovered a bug in
our code, and it could be due to a wrong assumption on the way
the FileSystem works.
Bug explanation
During the initialization of the job, (so in the job manager) we
manipulate the FS to check that some files exist in order to fail
gracefully before the job is executed. In our case, we need to
set dynamically the FS. It can be either HDFS, S3 on AWS or S3 on
MinIO. We want the FS configuration to be specific for the job,
and different from the cluster one (different access key,
different endpoint, etc.).
Here is an extract of the code we are using to do so:
|private void validateFileSystemAccess(Configuration
configuration) throws IOException { // Create a plugin manager
from the configuration PluginManager pluginManager =
PluginUtils.createPluginManagerFromRootFolder(configuration); //
Init the FileSystem from the configuration
FileSystem.initialize(configuration, pluginManager); // Validate
the FileSystem: an exception is thrown if FS configuration is
wrong Path archiverPath = new Path(this.archiverPath);
archiverPath.getFileSystem().exists(new Path("/")); } |
After starting that specific kind of job, we notice that:
1. the checkpointing does not work for this job, it throws a
credential error.
2. the job manager cannot upload the artifacts needed by the
history server for all jobs already running of all kind (not
only this specific kind of job).
If we do not deploy that kind of job, the upload of artifacts and
the checkpointing work as expected on the cluster.
We think that this issue might come from the
|FileSystem.initialize()| that overrides the configuration for
all the FileSystems. We think that because of this, the next call
to |FileSystem.get()| returns the FileSystem we configured in
|validateFileSystemAccess| instead of the cluster configured one.
Questions
Could our hypothesis be correct? If so, how could we provide a
specific configuration for the FileSystem without impacting the
whole cluster?
Regards,
Gil