Hi Jingsong,

I would like to understand this FLIP (?) a bit better, but I am missing some
background, I believe. So, some basic questions:
1) Does the PARTITIONED BY clause only have an effect for sink tables,
defining how data should be partitioned in the sink system, or does it also
make a difference for source tables? My understanding is that it also makes
a difference for source tables (e.g. if the source system supports partition
pruning). I suppose, for source tables, Flink does not check/enforce this,
but trusts that the partitioning information is correct?

2) I suppose it is up to the connector implementation whether/how to
interpret the partition information. How will this work?

3) For Kafka, I suppose, the most common partitioning strategy is by key.
FLIP-107 contains a proposal on how to define the key (which fields of the
schema should become part of the key) when writing to Kafka via Flink SQL.
How does this relate to the PARTITIONED BY clause?

Thanks,

Konstantin

On Mon, Aug 24, 2020 at 10:54 AM Jingsong Li <[email protected]> wrote:

> Hi all,
>
> ## Motivation
>
> FLIP-63 [1] introduced initial support for the PARTITIONED BY clause, to
> an extent that lets us support Hive's partitioning.
> But this partition definition is completely specific to Hive / file
> systems. With the continuous development of the system, there are new
> requirements:
>
> - FLIP-107 [2] requirements: A common requirement is to create a custom
> partitioning of the data. We should have a way to specify/compute the
> target partition/shard for Kinesis/Pravega/Pulsar. In those cases it would
> be the only way to control partitioning.
>
> - Apache Iceberg partitioning [3] requirements: Iceberg produces partition
> values by taking a column value and optionally transforming it. Iceberg is
> responsible for converting event_time into event_date, and keeps track of
> the relationship.
>
> So I think it is better to introduce partitioning strategies to Flink.
> The partitioning strategies are similar to partitioning in traditional
> databases like Oracle [4].
>
> ## Proposed Partitioning DDL
>
> Hash Partitioning Tables:
>
> CREATE TABLE kafka_table (
>   id STRING,
>   name STRING,
>   `date` DATE,
>   ...
> ) PARTITIONED BY (HASH(id, name))
>
> Explicit Partitioning Tables (introduced in FLIP-63):
>
> CREATE TABLE fs_table (
>   name STRING,
>   `date` DATE,
>   ...
> ) PARTITIONED BY (`date`)
>
> (Can we remove the brackets when there is only a single-level partition? =>
> "PARTITIONED BY HASH(id, name)" and "PARTITIONED BY `date`")
>
> Composite Partitioning Tables:
>
> CREATE TABLE fs_table (
>   name STRING,
>   `date` DATE,
>   ...
> ) PARTITIONED BY (year(`date`), month(`date`), day(`date`))
>
> Composite Explicit Partitioning Tables (introduced in FLIP-63):
>
> CREATE TABLE fs_table (
>   name STRING,
>   `date` DATE,
>   y STRING,
>   m STRING,
>   d STRING,
>   ...
> ) PARTITIONED BY (y, m, d)
>
> ## Rejected Alternatives
>
> Composite Partitioning Tables DDL like Oracle:
>
> CREATE TABLE invoices (
>   invoice_no NUMBER NOT NULL,
>   invoice_date DATE NOT NULL,
>   comments VARCHAR2(500))
> PARTITION BY RANGE (invoice_date)
> SUBPARTITION BY HASH (invoice_no)
> SUBPARTITIONS 8 (
>   PARTITION invoices_q1 VALUES LESS THAN (TO_DATE('01/04/2001', 'DD/MM/YYYY')),
>   PARTITION invoices_q2 VALUES LESS THAN (TO_DATE('01/07/2001', 'DD/MM/YYYY')),
>   PARTITION invoices_q3 VALUES LESS THAN (TO_DATE('01/09/2001', 'DD/MM/YYYY')),
>   PARTITION invoices_q4 VALUES LESS THAN (TO_DATE('01/01/2002', 'DD/MM/YYYY')));
>
> - First, multi-level partitioning is a common thing in big data systems.
> - Second, the "SUBPARTITIONS" syntax is not only more complex, but also
> completely different from big data systems such as Hive. Big data systems
> need to specify less partition information than traditional ones, so it is
> more natural to write all partitions in one bracket.
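To make the hash-partitioning proposal concrete: a sink connector for a
sharded system could route each row by hashing the declared partition
columns. The sketch below is only an illustration of that idea under assumed
names (`targetShard`, a simple hash-modulo scheme); it is not Flink API and
not part of the proposal itself.

```java
import java.util.Objects;

public class HashPartitionSketch {

    // Map the columns declared in PARTITIONED BY (HASH(id, name)) to a
    // shard index in [0, numShards). floorMod keeps the result non-negative
    // even when the combined hash is negative.
    static int targetShard(int numShards, Object... partitionColumns) {
        return Math.floorMod(Objects.hash(partitionColumns), numShards);
    }

    public static void main(String[] args) {
        // The same (id, name) pair always routes to the same shard, which is
        // the property that makes the partitioning useful for sinks.
        int shard = targetShard(8, "id-42", "alice");
        System.out.println("shard = " + shard);
    }
}
```

A real connector would of course delegate to the target system's own
partitioner (e.g. Kafka's key-based partitioning) rather than a fixed modulo.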
>
> ## Other Interface Changes
>
> It can be imagined that this change will involve many Catalog / Table
> related interfaces, and it is necessary to replace the previous
> `List<String> partitionKeys` with partitioning strategies.
>
> What do you think?
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> [3] http://iceberg.apache.org/partitioning/
> [4] https://oracle-base.com/articles/8i/partitioned-tables-and-indexes
>
> Best,
> Jingsong

--
Konstantin Knauf

https://twitter.com/snntrable
https://github.com/knaufk
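As a rough sketch of what replacing `List<String> partitionKeys` with
partitioning strategies could look like: a small strategy interface that each
partitioning flavor implements. All interface and class names below are
assumptions for illustration, not the API the FLIP actually proposes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical abstraction over "how a table is partitioned"; the catalog
// would hold one of these instead of a bare List<String> of key names.
interface PartitionStrategy {
    // Columns the strategy reads to compute a partition.
    List<String> referencedColumns();
}

// Explicit partitioning as in FLIP-63: PARTITIONED BY (y, m, d)
class ExplicitPartitioning implements PartitionStrategy {
    private final List<String> columns;
    ExplicitPartitioning(String... columns) { this.columns = Arrays.asList(columns); }
    public List<String> referencedColumns() { return columns; }
}

// Hash partitioning from the proposal: PARTITIONED BY (HASH(id, name))
class HashPartitioning implements PartitionStrategy {
    private final List<String> columns;
    HashPartitioning(String... columns) { this.columns = Arrays.asList(columns); }
    public List<String> referencedColumns() { return columns; }
}

public class PartitionStrategySketch {
    public static void main(String[] args) {
        PartitionStrategy strategy = new HashPartitioning("id", "name");
        System.out.println(strategy.referencedColumns()); // prints [id, name]
    }
}
```

Transform-based strategies (Iceberg-style `year(date)` etc.) would slot in as
further implementations of the same interface.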
