+1 a FLIP for this topic.

Best,
Feng

On Fri, Feb 2, 2024 at 10:26 PM Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi,
>
> I would definitely expect a FLIP on this topic before moving to
> implementation.
>
> Best regards,
>
> Martijn
>
> On Fri, Feb 2, 2024 at 12:47 PM Xuyang <xyzhong...@163.com> wrote:
>
>> Hi, Prabhjot.
>>
>> IIUC, the main reasons why the community has not previously considered
>> supporting join hints only in batch mode are as follows:
>> 1. In batch mode, multiple join type algorithms were implemented quite
>> early on, and
>> 2. Stream processing represents a long-running scenario, and it is quite
>> difficult to determine whether a small table will become a large table
>> after a long period of operation.
>>
>> However, as you mentioned, join hints do indeed have their significance
>> in streaming. If you want to support the implementation of "join hints +
>> broadcast join" in streaming, the changes I can currently think of include:
>> 1. At optimizer, changing the exchange on the small table side to
>> broadcast instead of hash (InputProperty#BROADCAST).
>> 2. Unknown changes required at the table runtime level.
>>
>> You can also discuss it within the community through JIRA, FLIP, or the
>> dev mailing list.
>>
>>
>> --
>>     Best!
>>     Xuyang
>>
>>
>> At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" <user@flink.apache.org>
>> wrote:
>>
>> Hi Feng,
>>
>> Thanks for your prompt response.
>> If we were to solve this in Flink, my higher level viewpoint is:
>>
>> 1. First to implement Broadcast join in Flink Streaming SQL, that works
>> across Table api (e.g. via a `left.join(right, <predicate>,
>> join_type="broadcast")
>> 2. Then, support a Broadcast hint that would utilize this new join based
>> on the hint type
>>
>> What do you think about this ?
>> Would you have some pointers on how/where to start on the first part ?
>> (I'm thinking we'd have to extend the Broadcast state pattern for this
>> purpose)
>>
>> Thanks,
>> Prabhjot
>>
>> On Thu, Feb 1, 2024 at 11:40 AM Feng Jin <jinfeng1...@gmail.com> wrote:
>>
>>> Hi Prabhjot
>>>
>>> I think this is a reasonable scenario. If there is a large table and a
>>> very small table for regular join, without broadcasting the regular join,
>>> it can easily cause data skew.
>>> We have also encountered similar problems too. Currently, we can only
>>> copy multiple copies of the small table using the union all and append
>>> random values to alleviate data skewness.
>>>
>>>
>>> Best,
>>> Feng
>>>
>>> On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
>>> user@flink.apache.org> wrote:
>>>
>>>> Hello folks,
>>>>
>>>>
>>>> We have a use case where we have a few stream-stream joins, requiring
>>>> us to join a very large table with a much smaller table, essentially
>>>> enriching the large table with a permutation on the smaller table (Consider
>>>> deriving all orders/sessions for a new location). Given the nature of the
>>>> dataset, if we use a typical join that uses Hash distribution to co-locate
>>>> the records for each join key, we end up with a very skewed join (a few
>>>> task slots getting all of the work, as against a good distribution).
>>>>
>>>>
>>>> We’ve internally implemented a Salting based solution where we salt the
>>>> smaller table and join it with the larger table. While this works in the
>>>> POC stage, we’d like to leverage flink as much as possible to do such a
>>>> join.
>>>>
>>>>
>>>> By the nature of the problem, a broadcast join seems theoretically
>>>> helpful. We’ve done an exploration on query hints supported in Flink,
>>>> starting with this FLIP
>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job>
>>>> and this FLIP
>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>
>>>> .
>>>>
>>>>
>>>> Currently, the Optimizer doesn't consider the Broadcast hint in the
>>>> `Exchange` step of the join, when creating the physical plan (Possibly
>>>> because the hint would require the stream-stream join to also support
>>>> Broadcast join with SQL)
>>>>
>>>>
>>>> Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint
>>>> parsed from the query:
>>>>
>>>>
>>>> ```sql
>>>>
>>>> ...
>>>>
>>>> ...
>>>>
>>>> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0]
>>>> options:[gpla]]]])
>>>>
>>>> ...
>>>>
>>>> ```
>>>>
>>>>
>>>> However, the Flink optimizer ignores the hint and still represents the
>>>> join as a regular `hash` join in the `Exchange` step:
>>>>
>>>>
>>>> ```sql
>>>>
>>>> ...
>>>>
>>>> ...
>>>>
>>>> :- Exchange(distribution=[hash[shop_id, join_key]])
>>>>
>>>> ...
>>>>
>>>> ```
>>>>
>>>>
>>>> In Flink `StreamExecExchange`, the translation happens only via the
>>>> `HASH` distribution type
>>>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java#L106-L127>.
>>>> unlike in the Flink `BatchExecExchange`, the translation can happen via a
>>>> multitude of options
>>>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java#L145-L194>
>>>> (`HASH/BROADCAST`).
>>>>
>>>>
>>>>
>>>> Quoting this Flink mailing list discussion
>>>> <https://lists.apache.org/thread/ovyltrhztw7locn301f0wqfvlykw6l9z> for
>>>> the FLIP that implemented the Broadcast join hint for batch sql:
>>>>
>>>>
>>>> > But currently, only in batch the optimizer has different Join
>>>> strategies for Join and
>>>>
>>>> > there is no choice of join strategies in the stream. The join hints
>>>> listed in the current
>>>>
>>>> > flip should be ignored (maybe can be warned) in streaming mode. When
>>>> in the
>>>>
>>>> > future the stream mode has the choice of join strategies, I think
>>>> that's a good time > to discuss that the join hint can affect the streaming
>>>> SQL.
>>>>
>>>>
>>>> What do you folks think about the possibility of a Broadcast join for
>>>> Streaming Sql along with its corresponding Broadcast hint, that lets the
>>>> user choose the kind of distribution they’d want with the dataset ?
>>>>
>>>> Happy to learn more about this and hopefully implement it, if it
>>>> doesn’t sound like a terrible idea.
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Prabhjot
>>>>
>>>>
>>>>
>>>>

Reply via email to