Hi all,

From a user point of view, I think it makes sense to go for DISTRIBUTED BY
as Timo explained it. +1 for his proposal.

Best regards,

Martijn

On Thu, Nov 2, 2023 at 11:00 AM Timo Walther <twal...@apache.org> wrote:
>
> Hi Jing,
>
> I agree this is confusing. The Spark API calls it bucketBy in the
> programmatic API. But anyway, we should discuss the SQL semantics here.
> It's like a "WHERE" being called "filter" in the programmatic world, or a
> "SELECT" being called "projection" in code.
>
> And looking at all the Hive tutorials [1], DISTRIBUTED BY should be more
> consistent. By using the "INTO n BUCKETS" part, we still include the
> bucketing terminology in the syntax for better understanding.
>
> If there are no other objections to this topic, I would still prefer to
> go with DISTRIBUTED BY.
>
> Regards,
> Timo
>
> [1]
> https://www.linkedin.com/pulse/hive-order-sort-distribute-cluster-mohammad-younus-jameel/
>
>
> On 01.11.23 11:55, Jing Ge wrote:
> > Hi Timo,
> >
> > Gotcha, let's use passive verbs. I am actually thinking about "BUCKETED BY
> > 6" or "BUCKETED INTO 6".
> >
> > Not really used in SQL, but AFAIU Spark uses the concept [1].
> >
> > [1]
> > https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html
> >
> > Best regards,
> > Jing
> >
> > On Mon, Oct 30, 2023 at 5:25 PM Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi Jing,
> >>
> >> > Have you considered using BUCKET BY directly?
> >>
> >> Which vendor uses this syntax? Most vendors that I checked call this
> >> concept "distribution".
> >>
> >> In any case, the "BY" is optional, so certain DDL statements would
> >> declare it like "BUCKET INTO 6 BUCKETS"? And following PARTITIONED,
> >> we should use the passive voice.
> >>
> >> > Did you mean users can use their own algorithm? How to do it?
> >>
> >> "Own algorithm" only refers to deciding between a list of partitioning
> >> strategies (namely hash and range partitioning) if the connector offers
> >> more than one.
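[Editor's note: the naming variants weighed in the exchange above can be sketched side by side. This is illustrative only; only DISTRIBUTED BY made it into the proposal, and the authoritative grammar lives in the FLIP page.]

```sql
-- Proposed: passive voice, consistent with PARTITIONED BY
CREATE TABLE MyTable (uid BIGINT, name STRING)
DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS;

-- Jing's alternative, modeled on Spark's bucketBy (not adopted):
-- CREATE TABLE MyTable (uid BIGINT, name STRING)
-- BUCKETED INTO 6 BUCKETS;

-- With an optional "BY", an active "BUCKET" keyword would read
-- awkwardly, as Timo notes (not adopted):
-- CREATE TABLE MyTable (uid BIGINT, name STRING)
-- BUCKET INTO 6 BUCKETS;
```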
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 30.10.23 12:39, Jing Ge wrote:
> >>> Hi Timo,
> >>>
> >>> The FLIP looks great! Thanks for bringing it to our attention! In order to
> >>> make sure we are on the same page, I would ask some questions:
> >>>
> >>> 1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive, which, as Benchao
> >>> mentioned, is used to distribute rows among reducers, i.e. it focuses on
> >>> the shuffle during the computation. The FLIP is focusing more on storage,
> >>> if I am not mistaken. Have you considered using BUCKET BY directly?
> >>>
> >>> 2. According to the FLIP:
> >>>
> >>> "CREATE TABLE MyTable (uid BIGINT, name STRING)
> >>> DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
> >>>
> >>> - For advanced users, the algorithm can be defined explicitly.
> >>> - Currently, either HASH() or RANGE()."
> >>>
> >>> Did you mean users can use their own algorithm? How to do it?
> >>>
> >>> Best regards,
> >>> Jing
> >>>
> >>> On Mon, Oct 30, 2023 at 11:13 AM Timo Walther <twal...@apache.org> wrote:
> >>>
> >>>> Let me reply to the feedback from Yunfan:
> >>>>
> >>>> > Distribute by in DML is also supported by Hive
> >>>>
> >>>> I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
> >>>> discussion is about DDL. For DDL, we have more freedom, as every vendor
> >>>> has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
> >>>> coupled to the connector implementation, not the engine. However, for
> >>>> DML we need to watch out for standard compliance and introduce changes
> >>>> with high caution.
> >>>>
> >>>> How a LookupTableSource interprets the DISTRIBUTED BY is
> >>>> connector-dependent in my opinion. In general this FLIP is a sink
> >>>> ability, but we could have a follow-up FLIP that helps in distributing
> >>>> the load of lookup joins.
> >>>>
> >>>> > to avoid data skew problem
> >>>>
> >>>> I understand the use case and that it is important to solve it
> >>>> eventually. A solution might be to introduce helper Polymorphic
> >>>> Table Functions [1] in the future instead of new syntax.
> >>>>
> >>>> [1]
> >>>> https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf
> >>>>
> >>>>
> >>>> Let me reply to the feedback from Benchao:
> >>>>
> >>>> > Do you think it's useful to add some extensibility for the hash
> >>>> strategy
> >>>>
> >>>> The hash strategy is fully determined by the connector, not the Flink
> >>>> SQL engine. We are not using Flink's hash strategy in any way. If the
> >>>> hash strategy for the regular Flink file system connector should be
> >>>> changed, this should be expressed via a config option. Otherwise we
> >>>> should offer a dedicated `hive-filesystem` or `spark-filesystem`
> >>>> connector.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 30.10.23 10:44, Timo Walther wrote:
> >>>>> Hi Jark,
> >>>>>
> >>>>> My intention was to avoid overly complex syntax in the first version. In
> >>>>> the past years, we could enable use cases also without this clause, so
> >>>>> we should be careful with overloading it with too much functionality in
> >>>>> the first version. We can still iterate on it later; the interfaces are
> >>>>> flexible enough to support more in the future.
> >>>>>
> >>>>> I agree that an explicit HASH and RANGE doesn't harm. Also making
> >>>>> the bucket number optional.
> >>>>>
> >>>>> I updated the FLIP accordingly. Now the SupportsBucketing interface
> >>>>> declares more methods that help in validation and in providing helpful
> >>>>> error messages to users.
> >>>>>
> >>>>> Let me know what you think.
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>> On 27.10.23 10:20, Jark Wu wrote:
> >>>>>> Hi Timo,
> >>>>>>
> >>>>>> Thanks for starting this discussion. I really like it!
> >>>>>> The FLIP is already in good shape, I only have some minor comments.
> >>>>>>
> >>>>>> 1.
> >>>>>> Could we also support the HASH and RANGE distribution kinds in the DDL
> >>>>>> syntax? I noticed that HASH and UNKNOWN are introduced in the Java API,
> >>>>>> but not in the syntax.
> >>>>>>
> >>>>>> 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER
> >>>>>> TABLE? Some storage engines support automatically determining the
> >>>>>> bucket number based on the cluster resources and the data size of the
> >>>>>> table. For example, StarRocks [1] and Paimon [2].
> >>>>>>
> >>>>>> Best,
> >>>>>> Jark
> >>>>>>
> >>>>>> [1]:
> >>>>>> https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
> >>>>>> [2]:
> >>>>>> https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket
> >>>>>>
> >>>>>> On Thu, 26 Oct 2023 at 18:26, Jingsong Li <jingsongl...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Many thanks, Timo, for starting this discussion.
> >>>>>>>
> >>>>>>> Big +1 for this.
> >>>>>>>
> >>>>>>> The design looks good to me!
> >>>>>>>
> >>>>>>> We can add some documentation for connector developers. For example:
> >>>>>>> for sinks, if a keyBy is needed, the connector itself should perform
> >>>>>>> the keyBy. SupportsBucketing is just a marker interface.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jingsong
> >>>>>>>
> >>>>>>> On Thu, Oct 26, 2023 at 5:00 PM Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>> Hi everyone,
> >>>>>>>>
> >>>>>>>> I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
> >>>>>>>> clause [1].
> >>>>>>>>
> >>>>>>>> Many SQL vendors expose the concepts of Partitioning, Bucketing, and
> >>>>>>>> Clustering. This FLIP continues the work of previous FLIPs and would
> >>>>>>>> like to introduce the concept of "Bucketing" to Flink.
> >>>>>>>>
> >>>>>>>> This is a pure connector characteristic and helps both Apache Kafka
> >>>>>>>> and Apache Paimon connectors in avoiding a complex WITH clause by
> >>>>>>>> providing improved syntax.
> >>>>>>>>
> >>>>>>>> Here is an example:
> >>>>>>>>
> >>>>>>>> CREATE TABLE MyTable
> >>>>>>>>   (
> >>>>>>>>     uid BIGINT,
> >>>>>>>>     name STRING
> >>>>>>>>   )
> >>>>>>>>   DISTRIBUTED BY (uid) INTO 6 BUCKETS
> >>>>>>>>   WITH (
> >>>>>>>>     'connector' = 'kafka'
> >>>>>>>>   )
> >>>>>>>>
> >>>>>>>> The full syntax specification can be found in the document. The
> >>>>>>>> clause should be optional and fully backwards compatible.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>> [1]
> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
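[Editor's note: pulling the thread's conclusions together, the clause ends up supporting roughly the following shapes. This is a sketch based only on the discussion above, not the normative grammar; see the FLIP page for that.]

```sql
-- Connector picks the algorithm; only the bucket key is given:
CREATE TABLE T1 (uid BIGINT, name STRING)
DISTRIBUTED BY (uid) INTO 6 BUCKETS
WITH ('connector' = 'kafka');

-- Explicit algorithm for advanced users, as Jark requested:
CREATE TABLE T2 (uid BIGINT, name STRING)
DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
WITH ('connector' = 'kafka');

CREATE TABLE T3 (uid BIGINT, name STRING)
DISTRIBUTED BY RANGE(uid) INTO 6 BUCKETS
WITH ('connector' = 'kafka');

-- Bucket count omitted, letting the connector decide
-- (cf. StarRocks and Paimon dynamic bucketing):
CREATE TABLE T4 (uid BIGINT, name STRING)
DISTRIBUTED BY (uid)
WITH ('connector' = 'kafka');
```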