[ 
https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579681#comment-17579681
 ] 

Konstantin Knauf commented on FLINK-19589:
------------------------------------------

In terms of the scope of this ticket, I think it would be good to solve this 

a) for all filesystems (at least Hadoop & Presto S3, Azure, HDFS?)
b) so that different configurations can be applied to each source/sink operator 
(not only per job). The configuration in the flink-conf.yaml would act as 
a default for connectors and would be used by the runtime itself (HA, 
Checkpointing). 
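
As a rough sketch of what (b) could mean in practice, assuming options are 
plain string key/value pairs: the global configuration supplies defaults, and 
each source/sink operator may override individual keys. The class 
LayeredFsOptions and its resolve method are hypothetical names used only for 
illustration; they are not part of Flink.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not a Flink class): layers per-operator options over global defaults. */
final class LayeredFsOptions {
    private final Map<String, String> globalDefaults;

    LayeredFsOptions(Map<String, String> globalDefaults) {
        // Defensive copy so later changes to the caller's map do not leak in.
        this.globalDefaults = new HashMap<>(globalDefaults);
    }

    /** Returns the effective options: global defaults first, operator overrides on top. */
    Map<String, String> resolve(Map<String, String> operatorOverrides) {
        Map<String, String> effective = new HashMap<>(globalDefaults);
        effective.putAll(operatorOverrides);
        return effective;
    }
}
```

The runtime itself (HA, Checkpointing) would call resolve with an empty map 
and therefore see only the global defaults.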

The implementation can happen in multiple iterations, but should follow a 
common strategy; otherwise I fear we'll build multiple island solutions that 
don't really go together well. 

[~jmahonin] Do you think your approach could be extended to cover all 
filesystems? What alternatives are there?
[~jmahonin] Does your approach support different configurations within the same 
Job?

> Support per-connector FileSystem configuration
> ----------------------------------------------
>
>                 Key: FLINK-19589
>                 URL: https://issues.apache.org/jira/browse/FLINK-19589
>             Project: Flink
>          Issue Type: Improvement
>          Components: FileSystems
>    Affects Versions: 1.12.0
>            Reporter: Padarn Wilson
>            Assignee: Josh Mahonin
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: FLINK-19589.patch
>
>
> Currently, options for file systems can only be configured globally. However, 
> in many cases, users would like more fine-grained configuration.
> Either we allow a properties map on our connectors, similar to the Kafka or 
> Kinesis properties,
> or something like:
> Management of two properties related to S3 object management:
>  - [Lifecycle configuration 
> |https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html]
>  - [Object 
> tagging|https://docs.aws.amazon.com/AmazonS3/latest/dev/object-tagging.htm]
> Being able to control these is useful for people who want to manage jobs 
> using S3 for checkpointing or job output, but need to control per job level 
> configuration of the tagging/lifecycle for the purposes of auditing or cost 
> control (for example deleting old state from S3)
> Ideally, it would be possible to control this on each object being written by 
> Flink, or at least at a job level.
> _Note_*:* Some related existing properties can be set using the hadoop module 
> using system properties: see for example 
> {code:java}
> fs.s3a.acl.default{code}
> which sets the default ACL on written objects.
> *Solutions*:
> 1) Modify hadoop module:
> The above-linked module could be updated in order to have a new property (and 
> similar for lifecycle)
>  fs.s3a.tags.default
>  which could be a comma separated list of tags to set. For example
> {code:java}
> fs.s3a.tags.default = "jobname:JOBNAME,owner:OWNER"{code}
> This seems like a natural place to put this logic (and is outside of Flink if 
> we decide to go this way). However, it does not allow for a sink and checkpoint 
> to have different values for these.
> 2) Expose withTagging from module
> The hadoop module used by Flink's existing filesystem has already exposed put 
> request level tagging (see 
> [this|https://github.com/aws/aws-sdk-java/blob/c06822732612d7208927d2a678073098522085c3/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/PutObjectRequest.java#L292]).
>  This could be used in the Flink filesystem plugin to expose these options. A 
> possible approach could be to somehow incorporate it into the file path, e.g.,
> {code:java}
> path = "TAGS:s3://bucket/path"{code}
>  Or possibly as an option that can be applied to the checkpoint and sink 
> configurations, e.g.,
> {code:java}
> env.getCheckpointConfig().setS3Tags(TAGS) {code}
> and similar for a file sink.
> _Note_: The lifecycle can also be managed using the module: see 
> [here|https://docs.aws.amazon.com/AmazonS3/latest/dev/manage-lifecycle-using-java.html].
>  
>  
>  
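
For reference, the comma-separated tag list proposed in solution 1 of the 
description (e.g. "jobname:JOBNAME,owner:OWNER") could be parsed roughly as 
follows. This is only a sketch: TagListParser is a hypothetical name, and the 
"key:value" entry format is taken from the example in the description, not 
from any existing Hadoop or Flink option.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for a "key:value,key:value" tag list; not an existing Hadoop/Flink class. */
final class TagListParser {
    private TagListParser() {}

    /** Parses the list into an insertion-ordered map; a null or blank value yields an empty map. */
    static Map<String, String> parse(String value) {
        Map<String, String> tags = new LinkedHashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return tags;
        }
        for (String entry : value.split(",")) {
            // Split on the first ':' only, so tag values may themselves contain ':'.
            int sep = entry.indexOf(':');
            String key = sep < 0 ? "" : entry.substring(0, sep).trim();
            if (key.isEmpty()) {
                throw new IllegalArgumentException("Expected key:value but got '" + entry + "'");
            }
            tags.put(key, entry.substring(sep + 1).trim());
        }
        return tags;
    }
}
```

The resulting map would then have to be translated into whatever tagging 
structure the underlying file system client expects.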



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
