Hi Feng,

Thanks for your prompt response.
If we were to solve this in Flink, my high-level view is:

1. First, implement a Broadcast join in Flink Streaming SQL that also works
across the Table API (e.g. via something like `left.join(right, <predicate>,
join_type="broadcast")`)
2. Then, support a Broadcast hint that would utilize this new join based on
the hint type

What do you think about this?
Would you have some pointers on how/where to start on the first part? (I'm
thinking we'd have to extend the Broadcast state pattern for this purpose)
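
To make the proposal concrete, here is a minimal plain-Python sketch (not
Flink code) contrasting the two distribution strategies. Everything in it is
an illustrative assumption: the helper names `hash_partition`,
`hash_join_load` and `broadcast_join`, the parallelism of 4, and the synthetic
data with one hot join key. It only simulates which subtask each row of the
large side lands on.

```python
import zlib

PARALLELISM = 4  # assumed number of parallel subtasks


def hash_partition(key, parallelism=PARALLELISM):
    """Deterministic stand-in for a hash exchange on the join key."""
    return zlib.crc32(key.encode("utf-8")) % parallelism


def hash_join_load(large_rows, parallelism=PARALLELISM):
    """Rows per subtask when the large side is shuffled by join key."""
    load = [0] * parallelism
    for key, _payload in large_rows:
        load[hash_partition(key, parallelism)] += 1
    return load


def broadcast_join(large_rows, small_rows, parallelism=PARALLELISM):
    """Replicate the small side everywhere; spread the large side round-robin."""
    small_by_key = {}
    for key, value in small_rows:
        small_by_key.setdefault(key, []).append(value)
    # Every subtask holds its own full copy of the small side.
    replicas = [dict(small_by_key) for _ in range(parallelism)]

    load = [0] * parallelism
    joined = []
    for i, (key, payload) in enumerate(large_rows):
        subtask = i % parallelism  # no key-based shuffle of the large side
        load[subtask] += 1
        for value in replicas[subtask].get(key, []):
            joined.append((key, payload, value))
    return joined, load


# Synthetic large side: 1000 rows, 900 of them on a single hot key.
large = [
    ("shop_hot" if i % 10 else f"shop_{i // 10}", f"order_{i}")
    for i in range(1000)
]
# Synthetic small side: two matches for the hot key, one for a cold key.
small = [
    ("shop_hot", "location_A"),
    ("shop_hot", "location_B"),
    ("shop_0", "location_C"),
]

hash_load = hash_join_load(large)
joined, broadcast_load = broadcast_join(large, small)

print("hash exchange load per subtask:     ", hash_load)
print("broadcast exchange load per subtask:", broadcast_load)
print("joined rows:", len(joined))
```

With the hot key, the hash exchange sends at least 900 of the 1000 rows to a
single subtask, while the broadcast variant gives each subtask 250 rows at the
cost of keeping a full copy of the small side on every subtask. A real
streaming implementation would also have to handle updates to the small side,
which is where the Broadcast state pattern would presumably come in.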

Thanks,
Prabhjot

On Thu, Feb 1, 2024 at 11:40 AM Feng Jin <jinfeng1...@gmail.com> wrote:

> Hi Prabhjot
>
> I think this is a reasonable scenario. If a regular join has a large table
> on one side and a very small table on the other, then without broadcasting
> the join can easily cause data skew.
> We have encountered similar problems. Currently, the only workaround we
> have is to make multiple copies of the small table using UNION ALL and
> append random values to alleviate the data skew.
>
>
> Best,
> Feng
>
> On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
> user@flink.apache.org> wrote:
>
>> Hello folks,
>>
>>
>> We have a use case where we have a few stream-stream joins, requiring us
>> to join a very large table with a much smaller table, essentially enriching
>> the large table with a permutation on the smaller table (Consider deriving
>> all orders/sessions for a new location). Given the nature of the dataset,
>> if we use a typical join that uses Hash distribution to co-locate the
>> records for each join key, we end up with a very skewed join (a few task
>> slots getting all of the work, rather than an even distribution).
>>
>>
>> We’ve internally implemented a salting-based solution where we salt the
>> smaller table and join it with the larger table. While this works in the
>> POC stage, we’d like to leverage Flink as much as possible to do such a
>> join.
>>
>>
>> By the nature of the problem, a broadcast join seems theoretically
>> helpful. We’ve done an exploration on query hints supported in Flink,
>> starting with this FLIP
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job>
>> and this FLIP
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>
>> .
>>
>>
>> Currently, the optimizer doesn't consider the Broadcast hint in the
>> `Exchange` step of the join when creating the physical plan (possibly
>> because the hint would require the stream-stream join to also support
>> Broadcast join with SQL).
>>
>>
>> Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint
>> parsed from the query:
>>
>>
>> ```
>> ...
>> ...
>> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])
>> ...
>> ```
>>
>>
>> However, the Flink optimizer ignores the hint and still represents the
>> join as a regular `hash` join in the `Exchange` step:
>>
>>
>> ```
>> ...
>> ...
>> :- Exchange(distribution=[hash[shop_id, join_key]])
>> ...
>> ```
>>
>>
>> In Flink's `StreamExecExchange`, the translation happens only via the
>> `HASH` distribution type
>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java#L106-L127>,
>> whereas in `BatchExecExchange` the translation can happen via several
>> distribution types
>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java#L145-L194>
>> (including `HASH` and `BROADCAST`).
>>
>>
>>
>> Quoting this Flink mailing list discussion
>> <https://lists.apache.org/thread/ovyltrhztw7locn301f0wqfvlykw6l9z> for
>> the FLIP that implemented the Broadcast join hint for batch SQL:
>>
>>
>> > But currently, only in batch the optimizer has different Join
>> > strategies for Join and there is no choice of join strategies in the
>> > stream. The join hints listed in the current flip should be ignored
>> > (maybe can be warned) in streaming mode. When in the future the stream
>> > mode has the choice of join strategies, I think that's a good time to
>> > discuss that the join hint can affect the streaming SQL.
>>
>>
>> What do you folks think about the possibility of a Broadcast join for
>> Streaming SQL, along with a corresponding Broadcast hint that lets the
>> user choose the kind of distribution they’d want for the dataset?
>>
>> Happy to learn more about this and hopefully implement it, if it doesn’t
>> sound like a terrible idea.
>>
>>
>> Thanks,
>>
>> Prabhjot
>>
>>
>>
>>
