Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! To make sure we are on the same page, I would like to ask a few questions:
1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive, which Benchao mentioned. That clause is used to distribute rows among reducers, i.e. it focuses on the shuffle during the computation, whereas the FLIP focuses more on storage, if I am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP:

"CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
- For advanced users, the algorithm can be defined explicitly.
- Currently, either HASH() or RANGE()."

Did you mean users can use their own algorithm? How would they do that?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther <twal...@apache.org> wrote:

> Let me reply to the feedback from Yunfan:
>
> > Distribute by in DML is also supported by Hive
>
> I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This discussion is about DDL. For DDL, we have more freedom, as every vendor has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly coupled to the connector implementation, not the engine. For DML, however, we need to watch out for standard compliance and introduce changes with great caution.
>
> How a LookupTableSource interprets the DISTRIBUTED BY is connector-dependent in my opinion. In general, this FLIP is a sink ability, but we could have a follow-up FLIP that helps in distributing the load of lookup joins.
>
> > to avoid data skew problem
>
> I understand the use case and that it is important to solve it eventually. A solution might be to introduce helper Polymorphic Table Functions [1] in the future instead of new syntax.
>
> [1] https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf
>
> Let me reply to the feedback from Benchao:
>
> > Do you think it's useful to add some extensibility for the hash strategy
>
> The hash strategy is fully determined by the connector, not the Flink SQL engine. We are not using Flink's hash strategy in any way. If the hash strategy for the regular Flink file system connector should be changed, this should be expressed via a config option. Otherwise we should offer a dedicated `hive-filesystem` or `spark-filesystem` connector.
>
> Regards,
> Timo
>
>
> On 30.10.23 10:44, Timo Walther wrote:
> > Hi Jark,
> >
> > My intention was to avoid overly complex syntax in the first version. In the past years, we were able to enable use cases even without this clause, so we should be careful with overloading it with too much functionality in the first version. We can still iterate on it later; the interfaces are flexible enough to support more in the future.
> >
> > I agree that an explicit HASH and RANGE doesn't harm, and neither does making the bucket number optional.
> >
> > I updated the FLIP accordingly. The SupportsBucketing interface now declares more methods that help with validation and with providing helpful error messages to users.
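> >
> > Just to illustrate what the updated syntax is meant to cover, here is a rough, non-authoritative sketch of a few example statements (table names are arbitrary; the exact grammar and allowed combinations are spelled out in the FLIP):
> >
> >   -- explicit algorithm and bucket count
> >   CREATE TABLE T1 (uid BIGINT, name STRING)
> >   DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS;
> >
> >   -- RANGE as the explicit algorithm
> >   CREATE TABLE T2 (uid BIGINT, name STRING)
> >   DISTRIBUTED BY RANGE(uid) INTO 6 BUCKETS;
> >
> >   -- algorithm and bucket count left to the connector
> >   CREATE TABLE T3 (uid BIGINT, name STRING)
> >   DISTRIBUTED BY (uid);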
> >
> > Let me know what you think.
> >
> > Regards,
> > Timo
> >
> >
> > On 27.10.23 10:20, Jark Wu wrote:
> >> Hi Timo,
> >>
> >> Thanks for starting this discussion. I really like it! The FLIP is already in good shape; I only have some minor comments.
> >>
> >> 1. Could we also support the HASH and RANGE distribution kinds in the DDL syntax? I noticed that HASH and UNKNOWN are introduced in the Java API, but not in the syntax.
> >>
> >> 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE? Some storage engines support automatically determining the bucket number based on the cluster resources and the data size of the table, for example StarRocks [1] and Paimon [2].
> >>
> >> Best,
> >> Jark
> >>
> >> [1]: https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
> >> [2]: https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket
> >>
> >> On Thu, 26 Oct 2023 at 18:26, Jingsong Li <jingsongl...@gmail.com> wrote:
> >>
> >>> Many thanks to Timo for starting this discussion.
> >>>
> >>> Big +1 for this.
> >>>
> >>> The design looks good to me!
> >>>
> >>> We can add some documentation for connector developers. For example: for a sink, if a keyBy is needed, the connector itself should perform it; SupportsBucketing is just a marker interface.
> >>>
> >>> Best,
> >>> Jingsong
> >>>
> >>> On Thu, Oct 26, 2023 at 5:00 PM Timo Walther <twal...@apache.org> wrote:
> >>>>
> >>>> Hi everyone,
> >>>>
> >>>> I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY clause [1].
> >>>>
> >>>> Many SQL vendors expose the concepts of Partitioning, Bucketing, and Clustering. This FLIP continues the work of previous FLIPs and would like to introduce the concept of "Bucketing" to Flink.
> >>>>
> >>>> This is a pure connector characteristic and helps both the Apache Kafka and Apache Paimon connectors in avoiding a complex WITH clause by providing improved syntax.
> >>>>
> >>>> Here is an example:
> >>>>
> >>>> CREATE TABLE MyTable
> >>>> (
> >>>>   uid BIGINT,
> >>>>   name STRING
> >>>> )
> >>>> DISTRIBUTED BY (uid) INTO 6 BUCKETS
> >>>> WITH (
> >>>>   'connector' = 'kafka'
> >>>> )
> >>>>
> >>>> The full syntax specification can be found in the document. The clause should be optional and fully backwards compatible.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause