+1 a FLIP to clarify the idea.

Please be careful to choose which type of state you use here. The doc[1]
says the broadcast state doesn't support RocksDB backend here.

Best,
Shengkai

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations

David Anderson <dander...@apache.org> 于2024年2月2日周五 23:57写道:

> I've seen enough demand for a streaming broadcast join in the community to
> justify a FLIP -- I think it's a good idea, and look forward to the
> discussion.
>
> David
>
> On Fri, Feb 2, 2024 at 6:55 AM Feng Jin <jinfeng1...@gmail.com> wrote:
>
>> +1 a FLIP for this topic.
>>
>>
>> Best,
>> Feng
>>
>> On Fri, Feb 2, 2024 at 10:26 PM Martijn Visser <martijnvis...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> I would definitely expect a FLIP on this topic before moving to
>>> implementation.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Fri, Feb 2, 2024 at 12:47 PM Xuyang <xyzhong...@163.com> wrote:
>>>
>>>> Hi, Prabhjot.
>>>>
>>>> IIUC, the main reasons why the community has not previously considered
>>>> supporting join hints only in batch mode are as follows:
>>>> 1. In batch mode, multiple join type algorithms were implemented quite
>>>> early on, and
>>>> 2. Stream processing represents a long-running scenario, and it is
>>>> quite difficult to determine whether a small table will become a large
>>>> table after a long period of operation.
>>>>
>>>> However, as you mentioned, join hints do indeed have their significance
>>>> in streaming. If you want to support the implementation of "join hints +
>>>> broadcast join" in streaming, the changes I can currently think of include:
>>>> 1. At optimizer, changing the exchange on the small table side to
>>>> broadcast instead of hash (InputProperty#BROADCAST).
>>>> 2. Unknown changes required at the table runtime level.
>>>>
>>>> You can also discuss it within the community through JIRA, FLIP, or the
>>>> dev mailing list.
>>>>
>>>>
>>>> --
>>>>     Best!
>>>>     Xuyang
>>>>
>>>>
>>>> At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" <
>>>> user@flink.apache.org> wrote:
>>>>
>>>> Hi Feng,
>>>>
>>>> Thanks for your prompt response.
>>>> If we were to solve this in Flink, my higher level viewpoint is:
>>>>
>>>> 1. First to implement Broadcast join in Flink Streaming SQL, that works
>>>> across Table api (e.g. via a `left.join(right, <predicate>,
>>>> join_type="broadcast")
>>>> 2. Then, support a Broadcast hint that would utilize this new join
>>>> based on the hint type
>>>>
>>>> What do you think about this ?
>>>> Would you have some pointers on how/where to start on the first part ?
>>>> (I'm thinking we'd have to extend the Broadcast state pattern for this
>>>> purpose)
>>>>
>>>> Thanks,
>>>> Prabhjot
>>>>
>>>> On Thu, Feb 1, 2024 at 11:40 AM Feng Jin <jinfeng1...@gmail.com> wrote:
>>>>
>>>>> Hi Prabhjot
>>>>>
>>>>> I think this is a reasonable scenario. If there is a large table and a
>>>>> very small table for regular join, without broadcasting the regular join,
>>>>> it can easily cause data skew.
>>>>> We have also encountered similar problems too. Currently, we can only
>>>>> copy multiple copies of the small table using the union all and append
>>>>> random values to alleviate data skewness.
>>>>>
>>>>>
>>>>> Best,
>>>>> Feng
>>>>>
>>>>> On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
>>>>> user@flink.apache.org> wrote:
>>>>>
>>>>>> Hello folks,
>>>>>>
>>>>>>
>>>>>> We have a use case where we have a few stream-stream joins, requiring
>>>>>> us to join a very large table with a much smaller table, essentially
>>>>>> enriching the large table with a permutation on the smaller table 
>>>>>> (Consider
>>>>>> deriving all orders/sessions for a new location). Given the nature of the
>>>>>> dataset, if we use a typical join that uses Hash distribution to 
>>>>>> co-locate
>>>>>> the records for each join key, we end up with a very skewed join (a few
>>>>>> task slots getting all of the work, as against a good distribution).
>>>>>>
>>>>>>
>>>>>> We’ve internally implemented a Salting based solution where we salt
>>>>>> the smaller table and join it with the larger table. While this works in
>>>>>> the POC stage, we’d like to leverage flink as much as possible to do 
>>>>>> such a
>>>>>> join.
>>>>>>
>>>>>>
>>>>>> By the nature of the problem, a broadcast join seems theoretically
>>>>>> helpful. We’ve done an exploration on query hints supported in Flink,
>>>>>> starting with this FLIP
>>>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job>
>>>>>> and this FLIP
>>>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>
>>>>>> .
>>>>>>
>>>>>>
>>>>>> Currently, the Optimizer doesn't consider the Broadcast hint in the
>>>>>> `Exchange` step of the join, when creating the physical plan (Possibly
>>>>>> because the hint would require the stream-stream join to also support
>>>>>> Broadcast join with SQL)
>>>>>>
>>>>>>
>>>>>> Notice that the Query AST (Abstract Syntax Tree) has the broadcast
>>>>>> hint parsed from the query:
>>>>>>
>>>>>>
>>>>>> ```sql
>>>>>>
>>>>>> ...
>>>>>>
>>>>>> ...
>>>>>>
>>>>>> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0]
>>>>>> options:[gpla]]]])
>>>>>>
>>>>>> ...
>>>>>>
>>>>>> ```
>>>>>>
>>>>>>
>>>>>> However, the Flink optimizer ignores the hint and still represents
>>>>>> the join as a regular `hash` join in the `Exchange` step:
>>>>>>
>>>>>>
>>>>>> ```sql
>>>>>>
>>>>>> ...
>>>>>>
>>>>>> ...
>>>>>>
>>>>>> :- Exchange(distribution=[hash[shop_id, join_key]])
>>>>>>
>>>>>> ...
>>>>>>
>>>>>> ```
>>>>>>
>>>>>>
>>>>>> In Flink `StreamExecExchange`, the translation happens only via the
>>>>>> `HASH` distribution type
>>>>>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java#L106-L127>.
>>>>>> unlike in the Flink `BatchExecExchange`, the translation can happen via a
>>>>>> multitude of options
>>>>>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java#L145-L194>
>>>>>> (`HASH/BROADCAST`).
>>>>>>
>>>>>>
>>>>>>
>>>>>> Quoting this Flink mailing list discussion
>>>>>> <https://lists.apache.org/thread/ovyltrhztw7locn301f0wqfvlykw6l9z>
>>>>>> for the FLIP that implemented the Broadcast join hint for batch sql:
>>>>>>
>>>>>>
>>>>>> > But currently, only in batch the optimizer has different Join
>>>>>> strategies for Join and
>>>>>>
>>>>>> > there is no choice of join strategies in the stream. The join hints
>>>>>> listed in the current
>>>>>>
>>>>>> > flip should be ignored (maybe can be warned) in streaming mode.
>>>>>> When in the
>>>>>>
>>>>>> > future the stream mode has the choice of join strategies, I think
>>>>>> that's a good time > to discuss that the join hint can affect the 
>>>>>> streaming
>>>>>> SQL.
>>>>>>
>>>>>>
>>>>>> What do you folks think about the possibility of a Broadcast join for
>>>>>> Streaming Sql along with its corresponding Broadcast hint, that lets the
>>>>>> user choose the kind of distribution they’d want with the dataset ?
>>>>>>
>>>>>> Happy to learn more about this and hopefully implement it, if it
>>>>>> doesn’t sound like a terrible idea.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Prabhjot
>>>>>>
>>>>>>
>>>>>>
>>>>>>

Reply via email to