[ 
https://issues.apache.org/jira/browse/FLINK-19595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-19595:
-----------------------------------
      Labels: auto-deprioritized-major auto-deprioritized-minor  (was: 
auto-deprioritized-major stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Flink SQL support S3 select
> ---------------------------
>
>                 Key: FLINK-19595
>                 URL: https://issues.apache.org/jira/browse/FLINK-19595
>             Project: Flink
>          Issue Type: Improvement
>          Components: FileSystems, Table SQL / Ecosystem
>            Reporter: liuxiaolong
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>         Attachments: image-2020-11-02-18-08-11-461.png, 
> image-2020-11-02-18-18-14-961.png
>
>
> h4. Summary
> Flink reads data stored in Tencent COS through S3AInputStream.java, which 
> calls the getObject function of AmazonS3Client.java. 
> Tencent COS now supports pushdown for the CSV and Parquet file formats.
> In these cases, reading data with getObject wastes a lot of bandwidth.
> So I think Flink SQL should support S3 Select to reduce the wasted 
> bandwidth.
>  
> h4. Design
> 1. In HiveMapredSplitReader.java, we use int[] selectedFields to construct 
> the S3 SELECT SQL. We also created a new class named S3SelectCsvReader, which 
> uses the AmazonS3Client.selectObjectContent function to read the CSV file 
> line by line.
> !image-2020-11-02-18-08-11-461.png|width=535,height=967!
>  
> !image-2020-11-02-18-18-14-961.png|width=629,height=284!
>  
> 2.  Flink Demo Table:
> 1) Table schema
> Flink SQL> desc cos.test_s3a;
>  root
> |– name: STRING (col1)|
> |– age: INT           (col2)|
> |– dt: STRING      (col3,it's a partition column)|
>  
> 2) Conversion relationship (FLINK SQL Convert To S3 SELECT SQL)
> Flink SQL                                                 S3 SELECT SQL
> select name from cos.test_s3a;                        =>  SELECT s._1, null FROM S3Object s
> select age from cos.test_s3a;                         =>  SELECT null, s._2 FROM S3Object s
> select dt, name, age from cos.test_s3a;               =>  SELECT s._1, s._2 FROM S3Object s
> select dt from cos.test_s3a;                          =>  SELECT null, null FROM S3Object s
> select * from cos.test_s3a;                           =>  SELECT s._1, s._2 FROM S3Object s
> select name from cos.test_s3a where dt='2020-07-15';  =>  SELECT s._1, null FROM S3Object s
>  
> 3) Patch Commit
> https://github.com/Coderlxl/flink/commit/b211f4830a7301bf9283a6d37209000b176913ad
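
The conversion table in the quoted description suggests a simple rule: every physical column of the CSV file keeps its position in the S3 SELECT projection, appearing as s._N when selected and as null otherwise, while the partition column (dt) never reaches the projection because it is not stored in the file. A minimal sketch of that rule follows. It is not taken from the linked patch; the class name S3SelectSqlSketch, the method buildS3SelectSql, and the parameter physicalColumnCount are hypothetical, and it assumes selectedFields holds 0-based indexes into the table schema with partition columns listed after the physical ones.

```java
import java.util.StringJoiner;

// Hypothetical illustration only; not the code from the linked patch.
final class S3SelectSqlSketch {

    /**
     * Builds an S3 SELECT statement from the selected field indexes.
     *
     * @param selectedFields      0-based indexes into the table schema (assumption)
     * @param physicalColumnCount number of columns stored in the CSV file,
     *                            i.e. the schema without partition columns (assumption)
     */
    static String buildS3SelectSql(int[] selectedFields, int physicalColumnCount) {
        boolean[] wanted = new boolean[physicalColumnCount];
        for (int field : selectedFields) {
            if (field < 0) {
                throw new IllegalArgumentException("negative field index: " + field);
            }
            // Indexes at or beyond physicalColumnCount are treated as partition
            // columns: their values come from the path, not from the file.
            if (field < physicalColumnCount) {
                wanted[field] = true;
            }
        }

        // Keep one projection slot per physical column so positions stay stable.
        StringJoiner projection = new StringJoiner(", ");
        for (int i = 0; i < physicalColumnCount; i++) {
            projection.add(wanted[i] ? "s._" + (i + 1) : "null");
        }
        return "SELECT " + projection + " FROM S3Object s";
    }

    public static void main(String[] args) {
        // Demo table from the issue: name (0), age (1), dt (2, partition column).
        System.out.println(buildS3SelectSql(new int[] {0}, 2));       // select name
        System.out.println(buildS3SelectSql(new int[] {2, 0, 1}, 2)); // select dt, name, age
        System.out.println(buildS3SelectSql(new int[] {2}, 2));       // select dt
    }
}
```

Keeping a null placeholder for unselected columns means the reader can still split each returned line into a fixed number of fields, which matches the rows shown in the table. Positional references such as s._1 apply when the CSV header is not used for column names.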



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
