Hi all,

>From a user point of view, I think it makes sense to go for
DISTRIBUTED BY with how Timo explained it. +1 for his proposal

Best regards,

Martijn


On Thu, Nov 2, 2023 at 11:00 AM Timo Walther <twal...@apache.org> wrote:
>
> Hi Jing,
>
> I agree this is confusing. THe Spark API calls it bucketBy in the
> programmatic API. But anyway, we should discuss the SQL semantics here.
> It's like a "WHERE" is called "filter" in the programmatic world. Or a
> "SELECT" is called "projection" in code.
>
> And looking at all the Hive tutorials[1], distributed by should be more
> consistent. By using the "INTO n BUCKETS", we still include the
> bucketing terminology in the syntax for better understanding.
>
> If there are no other objections to this topic, I would still prefer to
> go with DISTRIBUTED BY.
>
> Regards,
> Timo
>
> [1]
> https://www.linkedin.com/pulse/hive-order-sort-distribute-cluster-mohammad-younus-jameel/
>
>
>
> On 01.11.23 11:55, Jing Ge wrote:
> > Hi Timo,
> >
> > Gotcha, let's use passive verbs. I am actually thinking about "BUCKETED BY
> > 6" or "BUCKETED INTO 6".
> >
> > Not really used in SQL, but afaiu Spark uses the concept[1].
> >
> > [1]
> > https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html
> >
> >
> > Best regards,
> > Jing
> >
> > On Mon, Oct 30, 2023 at 5:25 PM Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi Jing,
> >>
> >>   > Have you considered using BUCKET BY directly?
> >>
> >> Which vendor uses this syntax? Most vendors that I checked call this
> >> concept "distribution".
> >>
> >> In any case, the "BY" is optional, so certain DDL statements would
> >> declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
> >> we should use the passive voice.
> >>
> >>   > Did you mean users can use their own algorithm? How to do it?
> >>
> >> "own algorithm" only refers to deciding between a list of partitioning
> >> strategies (namely hash and range partitioning) if the connector offers
> >> more than one.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 30.10.23 12:39, Jing Ge wrote:
> >>> Hi Timo,
> >>>
> >>> The FLIP looks great! Thanks for bringing it to our attention! In order
> >> to
> >>> make sure we are on the same page, I would ask some questions:
> >>>
> >>> 1. DISTRIBUTED BY reminds me DISTRIBUTE BY from Hive like Benchao
> >> mentioned
> >>> which is used to distribute rows amond reducers, i.e. focusing on the
> >>> shuffle during the computation. The FLIP is focusing more on storage, if
> >> I
> >>> am not mistaken. Have you considered using BUCKET BY directly?
> >>>
> >>> 2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name
> >> STRING)
> >>> DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
> >>>
> >>>      - For advanced users, the algorithm can be defined explicitly.
> >>>      - Currently, either HASH() or RANGE().
> >>>
> >>> "
> >>> Did you mean users can use their own algorithm? How to do it?
> >>>
> >>> Best regards,
> >>> Jing
> >>>
> >>> On Mon, Oct 30, 2023 at 11:13 AM Timo Walther <twal...@apache.org>
> >> wrote:
> >>>
> >>>> Let me reply to the feedback from Yunfan:
> >>>>
> >>>>    > Distribute by in DML is also supported by Hive
> >>>>
> >>>> I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
> >>>> discussion is about DDL. For DDL, we have more freedom as every vendor
> >>>> has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
> >>>> connector to the connector implementation, not the engine. However, for
> >>>> DML we need to watch out for standard compliance and introduce changes
> >>>> with high caution.
> >>>>
> >>>> How a LookupTableSource interprets the DISTRIBUTED BY is
> >>>> connector-dependent in my opinion. In general this FLIP is a sink
> >>>> ability, but we could have a follow FLIP that helps in distributing load
> >>>> of lookup joins.
> >>>>
> >>>>    > to avoid data skew problem
> >>>>
> >>>> I understand the use case and that it is important to solve it
> >>>> eventually. Maybe a solution might be to introduce helper Polymorphic
> >>>> Table Functions [1] in the future instead of new syntax.
> >>>>
> >>>> [1]
> >>>>
> >>>>
> >> https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf
> >>>>
> >>>>
> >>>> Let me reply to the feedback from Benchao:
> >>>>
> >>>>    > Do you think it's useful to add some extensibility for the hash
> >>>> strategy
> >>>>
> >>>> The hash strategy is fully determined by the connector, not the Flink
> >>>> SQL engine. We are not using Flink's hash strategy in any way. If the
> >>>> hash strategy for the regular Flink file system connector should be
> >>>> changed, this should be expressed via config option. Otherwise we should
> >>>> offer a dedicated `hive-filesystem` or `spark-filesystem` connector.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 30.10.23 10:44, Timo Walther wrote:
> >>>>> Hi Jark,
> >>>>>
> >>>>> my intention was to avoid too complex syntax in the first version. In
> >>>>> the past years, we could enable use cases also without this clause, so
> >>>>> we should be careful with overloading it with too functionality in the
> >>>>> first version. We can still iterate on it later, the interfaces are
> >>>>> flexible enough to support more in the future.
> >>>>>
> >>>>> I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
> >>>>> the bucket number optional.
> >>>>>
> >>>>> I updated the FLIP accordingly. Now the SupportsBucketing interface
> >>>>> declares more methods that help in validation and proving helpful error
> >>>>> messages to users.
> >>>>>
> >>>>> Let me know what you think.
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>> On 27.10.23 10:20, Jark Wu wrote:
> >>>>>> Hi Timo,
> >>>>>>
> >>>>>> Thanks for starting this discussion. I really like it!
> >>>>>> The FLIP is already in good shape, I only have some minor comments.
> >>>>>>
> >>>>>> 1. Could we also support HASH and RANGE distribution kind on the DDL
> >>>>>> syntax?
> >>>>>> I noticed that HASH and UNKNOWN are introduced in the Java API, but
> >>>>>> not in
> >>>>>> the syntax.
> >>>>>>
> >>>>>> 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER
> >>>> TABLE?
> >>>>>> Some storage engines support automatically determining the bucket
> >> number
> >>>>>> based on
> >>>>>> the cluster resources and data size of the table. For example,
> >>>>>> StarRocks[1]
> >>>>>> and Paimon[2].
> >>>>>>
> >>>>>> Best,
> >>>>>> Jark
> >>>>>>
> >>>>>> [1]:
> >>>>>>
> >>>>
> >> https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
> >>>>>> [2]:
> >>>>>>
> >>>>
> >> https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket
> >>>>>>
> >>>>>> On Thu, 26 Oct 2023 at 18:26, Jingsong Li <jingsongl...@gmail.com>
> >>>> wrote:
> >>>>>>
> >>>>>>> Very thanks Timo for starting this discussion.
> >>>>>>>
> >>>>>>> Big +1 for this.
> >>>>>>>
> >>>>>>> The design looks good to me!
> >>>>>>>
> >>>>>>> We can add some documentation for connector developers. For example:
> >>>>>>> for sink, If there needs some keyby, please finish the keyby by the
> >>>>>>> connector itself. SupportsBucketing is just a marker interface.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jingsong
> >>>>>>>
> >>>>>>> On Thu, Oct 26, 2023 at 5:00 PM Timo Walther <twal...@apache.org>
> >>>> wrote:
> >>>>>>>>
> >>>>>>>> Hi everyone,
> >>>>>>>>
> >>>>>>>> I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
> >>>>>>>> clause [1].
> >>>>>>>>
> >>>>>>>> Many SQL vendors expose the concepts of Partitioning, Bucketing, and
> >>>>>>>> Clustering. This FLIP continues the work of previous FLIPs and would
> >>>>>>>> like to introduce the concept of "Bucketing" to Flink.
> >>>>>>>>
> >>>>>>>> This is a pure connector characteristic and helps both Apache Kafka
> >>>> and
> >>>>>>>> Apache Paimon connectors in avoiding a complex WITH clause by
> >>>> providing
> >>>>>>>> improved syntax.
> >>>>>>>>
> >>>>>>>> Here is an example:
> >>>>>>>>
> >>>>>>>> CREATE TABLE MyTable
> >>>>>>>>       (
> >>>>>>>>         uid BIGINT,
> >>>>>>>>         name STRING
> >>>>>>>>       )
> >>>>>>>>       DISTRIBUTED BY (uid) INTO 6 BUCKETS
> >>>>>>>>       WITH (
> >>>>>>>>         'connector' = 'kafka'
> >>>>>>>>       )
> >>>>>>>>
> >>>>>>>> The full syntax specification can be found in the document. The
> >> clause
> >>>>>>>> should be optional and fully backwards compatible.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>> [1]
> >>>>>>>>
> >>>>>>>
> >>>>
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>

Reply via email to