> if we want to use two sets of credentials, for example to access two different AWS buckets, that would not be feasible at the moment?

That is correct.

> As it seems that this limitation is quite an important one, is there a place where we can find this documented?

I don't think it is explicitly documented; likely because we assume that users configure the filesystem through the flink-conf.yaml (and only document it as such), which inherently prevents that.

> Would it mean that in order to achieve this, we would have to set up two clusters and publishing to a temporary medium? For example, using two clusters one configured to access CephFS, one for AWS S3 then publish that to Kafka (or use Kafka Connect)?

That would be one approach, yes.

> We are requesting that [it should be possible to configure credentials per job]

It is exceedingly unlikely for this to be implemented in the foreseeable future.


There are some workarounds though.
For S3 in particular you could capitalize on the fact that we have 2 filesystem plugins (s3-fs-hadoop and s3-fs-presto), which you could use at the same time so long as you use different schemes ( s3a (hadoop) / s3p (presto) ) for the different buckets. You could also generalize this by taking an existing filesystem plugin from Flink and adjusting the contained FileSystemFactory to use a different scheme and config keys. It's a bit annyoing, but it should work (now and in the future). Essentially you'd pretend that there are N completely different filesystems, but they are actually all the same implementation just with different configurations.

On 10/03/2022 13:30, Gil De Grove wrote:
Hello Chesnay,

Thanks for the reply.

I wonder something based on your reply, if we want to use two sets of credentials, for example to access two different AWS buckets, that would not be feasible at the moment? One example I have in mind would be to separate the credentials for accessing data vs storing metadata for a given cluster. Another use case would be to create a Flink job that consume data stored on AWS S3 and/or on MinIO. Would it mean that in order to achieve this, we would have to set up two clusters and publishing to a temporary medium? For example, using two clusters one configured to access CephFS, one for AWS S3 then publish that to Kafka (or use Kafka Connect)?

We are requesting that, as we would like to use the Hybrid source with a FileSystem and a Kafka consumer, and this limitation would probably make us rethink the architecture. As it seems that this limitation is quite an important one, is there a place where we can find this documented? Maybe a FLIP? Or an entry in the Flink Documentation?

Thanks again for your help,
Gil


On Thu, 10 Mar 2022 at 10:57, Chesnay Schepler <ches...@apache.org> wrote:

    The FileSystem class is essentially one big singleton, with only 1
    instance of each FileSystem implementation being loaded, shared
    across all jobs.
    For that reason we do not support job-specific FileSystem
    configurations.
    Note that we generally also don't really support configuring the
    FileSystems at runtime. The entire codebase assumes that the
    initialization happens when the process is started.

    You'll need to run that job in a separate cluster.

    Overall, this sounds like something that should run externally;
    assert some precondition, then configure Flink appropriately, then
    run the job.

    On 08/03/2022 08:47, Gil De Grove wrote:
    Hello everyone,

    First of all, sorry for cross posting, I asked on SO, but David
    Anderson suggested me to reach out to the community via the
    mailing list. The link to the SO question is the following:
    
https://stackoverflow.com/questions/71381266/using-another-filesystem-configuration-while-creating-a-job

    I'll post the answer on SO as soon as I have one :)

    I post here the content of the question, so if anyone can help,
    please let me know;


          Summary

    We are currently facing an issue with the FileSystem abstraction
    in Flink. We have a job that can dynamically connect to an S3
    source (meaning it's defined at runtime). We discovered a bug in
    our code, and it could be due to a wrong assumption on the way
    the FileSystem works.


          Bug explanation

    During the initialization of the job, (so in the job manager) we
    manipulate the FS to check that some files exist in order to fail
    gracefully before the job is executed. In our case, we need to
    set dynamically the FS. It can be either HDFS, S3 on AWS or S3 on
    MinIO. We want the FS configuration to be specific for the job,
    and different from the cluster one (different access key,
    different endpoint, etc.).

    Here is an extract of the code we are using to do so:

    |private void validateFileSystemAccess(Configuration
    configuration) throws IOException { // Create a plugin manager
    from the configuration PluginManager pluginManager =
    PluginUtils.createPluginManagerFromRootFolder(configuration); //
    Init the FileSystem from the configuration
    FileSystem.initialize(configuration, pluginManager); // Validate
    the FileSystem: an exception is thrown if FS configuration is
    wrong Path archiverPath = new Path(this.archiverPath);
    archiverPath.getFileSystem().exists(new Path("/")); } |

    After starting that specific kind of job, we notice that:

     1. the checkpointing does not work for this job, it throws a
        credential error.
     2. the job manager cannot upload the artifacts needed by the
        history server for all jobs already running of all kind (not
        only this specific kind of job).

    If we do not deploy that kind of job, the upload of artifacts and
    the checkpointing work as expected on the cluster.

    We think that this issue might come from the
    |FileSystem.initialize()| that overrides the configuration for
    all the FileSystems. We think that because of this, the next call
    to |FileSystem.get()| returns the FileSystem we configured in
    |validateFileSystemAccess| instead of the cluster configured one.


          Questions

    Could our hypothesis be correct? If so, how could we provide a
    specific configuration for the FileSystem without impacting the
    whole cluster?

    Regards,
    Gil


Reply via email to