[ https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579681#comment-17579681 ]
Konstantin Knauf commented on FLINK-19589:
------------------------------------------

In terms of the scope of this ticket, I think it would be good to solve this a) for all filesystems (at least Hadoop & Presto S3, Azure, HDFS?) and b) so that different configurations can be applied to each source/sink operator (not only once per job). The configuration in flink-conf.yaml would act as a default for connectors and would be used by the runtime itself (HA, checkpointing). The implementation can happen in multiple iterations, but it should follow a common strategy; otherwise, I fear we'll build multiple island solutions that don't fit together well.

[~jmahonin] Do you think your approach could be extended to cover all filesystems? What alternatives are there? And does your approach support different configurations within the same job?


> Support per-connector FileSystem configuration
> ----------------------------------------------
>
>                 Key: FLINK-19589
>                 URL: https://issues.apache.org/jira/browse/FLINK-19589
>             Project: Flink
>          Issue Type: Improvement
>          Components: FileSystems
>    Affects Versions: 1.12.0
>            Reporter: Padarn Wilson
>            Assignee: Josh Mahonin
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: FLINK-19589.patch
>
>
> Currently, options for file systems can only be configured globally. However, in many cases, users would like to configure them in a more fine-grained way, per connector. Either we allow a properties map on our connectors, similar to the Kafka or Kinesis properties, or something like the following:
> Management of two properties related to S3 object management:
>  - [Lifecycle configuration|https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html]
>  - [Object tagging|https://docs.aws.amazon.com/AmazonS3/latest/dev/object-tagging.htm]
> Being able to control these is useful for people who manage jobs using S3 for checkpointing or job output, but who need job-level control of the tagging/lifecycle configuration for auditing or cost control (for example, deleting old state from S3).
> Ideally, it would be possible to control this on each object being written by Flink, or at least at a job level.
> _Note_: Some related existing properties can already be set through the hadoop module using system properties: see for example
> {code:java}
> fs.s3a.acl.default{code}
> which sets the default ACL on written objects.
> *Solutions*:
> 1) Modify the hadoop module:
> The above-linked module could be updated with a new property (and a similar one for lifecycle), fs.s3a.tags.default, which could be a comma-separated list of tags to set. For example:
> {code:java}
> fs.s3a.tags.default = "jobname:JOBNAME,owner:OWNER"{code}
> This seems like a natural place to put the logic (and it keeps the change outside of Flink, if we decide to go this way). However, it does not allow a sink and a checkpoint to have different values for these.
> 2) Expose withTagging from the module:
> The hadoop module used by Flink's existing filesystem already exposes put-request-level tagging (see [this|https://github.com/aws/aws-sdk-java/blob/c06822732612d7208927d2a678073098522085c3/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/PutObjectRequest.java#L292]). This could be used in the Flink filesystem plugin to expose these options.
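> For illustration, a minimal sketch of that SDK capability (AWS SDK v1, which the hadoop module bundles); the bucket, key, file, and tag values are placeholder assumptions, and this is not existing Flink code:
> {code:java}
> import java.io.File;
> import java.util.Arrays;
> import com.amazonaws.services.s3.AmazonS3;
> import com.amazonaws.services.s3.AmazonS3ClientBuilder;
> import com.amazonaws.services.s3.model.ObjectTagging;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.model.Tag;
>
> public class TaggedPutExample {
>     public static void main(String[] args) {
>         AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
>         // Tags are attached per put request, so a checkpoint path and a
>         // sink path could in principle carry different tag sets.
>         PutObjectRequest request =
>             new PutObjectRequest("my-bucket", "checkpoints/part-0", new File("part-0"))
>                 .withTagging(new ObjectTagging(Arrays.asList(
>                     new Tag("jobname", "JOBNAME"),
>                     new Tag("owner", "OWNER"))));
>         s3.putObject(request);
>     }
> }
> {code}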
> A possible approach could be to somehow incorporate it into the file path, e.g.,
> {code:java}
> path = "TAGS:s3://bucket/path"{code}
> Or possibly as an option that can be applied to the checkpoint and sink configurations, e.g.,
> {code:java}
> env.getCheckpointingConfig().setS3Tags(TAGS) {code}
> and similar for a file sink.
> _Note_: The lifecycle can also be managed using the module: see [here|https://docs.aws.amazon.com/AmazonS3/latest/dev/manage-lifecycle-using-java.html].
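> Similarly, a minimal sketch of managing a lifecycle rule through the same SDK, following the linked AWS guide; the bucket name, rule id, prefix, and retention period are placeholder assumptions:
> {code:java}
> import java.util.Arrays;
> import com.amazonaws.services.s3.AmazonS3;
> import com.amazonaws.services.s3.AmazonS3ClientBuilder;
> import com.amazonaws.services.s3.model.BucketLifecycleConfiguration;
> import com.amazonaws.services.s3.model.lifecycle.LifecycleFilter;
> import com.amazonaws.services.s3.model.lifecycle.LifecyclePrefixPredicate;
>
> public class LifecycleExample {
>     public static void main(String[] args) {
>         AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
>         // Expire objects under the checkpoint prefix after 30 days,
>         // e.g. to clean up old state automatically.
>         BucketLifecycleConfiguration.Rule rule = new BucketLifecycleConfiguration.Rule()
>             .withId("expire-old-checkpoints")
>             .withFilter(new LifecycleFilter(new LifecyclePrefixPredicate("checkpoints/")))
>             .withExpirationInDays(30)
>             .withStatus(BucketLifecycleConfiguration.ENABLED);
>         s3.setBucketLifecycleConfiguration(
>             "my-bucket", new BucketLifecycleConfiguration().withRules(Arrays.asList(rule)));
>     }
> }
> {code}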