Hi Prabhjot,

I think this is a reasonable scenario. When a regular join combines a large
table with a very small one and the small side cannot be broadcast, the hash
distribution on the join key can easily cause data skew.
We have encountered similar problems. Currently, our only workaround is to
replicate the small table several times with UNION ALL and append random
values to the join key to alleviate the skew.
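A minimal sketch of that salting workaround, written as plain Java rather than Flink SQL so it runs standalone. It assumes the small table has a unique join key, and everything in it (`SALT_BUCKETS`, the record types, and `partitionOf`, which only stands in for a hash exchange) is hypothetical, not a Flink API. The small side is replicated once per salt value, as the UNION ALL copies would be, and each large-side row picks a random salt. A single hot key is then spread over up to `SALT_BUCKETS` subtasks while every row still finds its match.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class SaltedJoinSketch {
    // Hypothetical number of salt buckets, i.e. copies of the small table.
    static final int SALT_BUCKETS = 4;

    record Order(String shopId, long orderId) {}   // large-side row
    record Shop(String shopId, String location) {} // small-side row (shopId assumed unique)
    record SaltedKey(String shopId, int salt) {}

    // Simulated hash exchange: which parallel subtask receives a (key, salt) pair.
    static int partitionOf(String key, int salt, int parallelism) {
        return Math.floorMod(key.hashCode() * 31 + salt, parallelism);
    }

    // Equivalent of UNION ALL over SALT_BUCKETS copies, copy i tagged with salt i.
    static Map<SaltedKey, Shop> replicateSmallSide(List<Shop> shops) {
        Map<SaltedKey, Shop> copies = new HashMap<>();
        for (Shop s : shops) {
            for (int salt = 0; salt < SALT_BUCKETS; salt++) {
                copies.put(new SaltedKey(s.shopId(), salt), s);
            }
        }
        return copies;
    }

    // Joins on (shopId, salt) with a random salt per order; returns rows received per subtask.
    static int[] saltedJoin(List<Order> orders, List<Shop> shops, int parallelism,
                            Random rnd, List<String> joined) {
        Map<SaltedKey, Shop> small = replicateSmallSide(shops);
        int[] load = new int[parallelism];
        for (Order o : orders) {
            int salt = rnd.nextInt(SALT_BUCKETS);
            load[partitionOf(o.shopId(), salt, parallelism)]++;
            Shop s = small.get(new SaltedKey(o.shopId(), salt));
            if (s != null) {
                joined.add(o.orderId() + "@" + s.location());
            }
        }
        return load;
    }

    // Unsalted baseline: every order of a key lands on the same subtask.
    static int[] unsaltedLoad(List<Order> orders, int parallelism) {
        int[] load = new int[parallelism];
        for (Order o : orders) {
            load[partitionOf(o.shopId(), 0, parallelism)]++;
        }
        return load;
    }

    public static void main(String[] args) {
        List<Order> orders = new ArrayList<>();
        for (long i = 0; i < 1000; i++) {
            orders.add(new Order("hot-shop", i)); // one very hot join key
        }
        List<Shop> shops = List.of(new Shop("hot-shop", "new-location"));
        List<String> joined = new ArrayList<>();
        int[] salted = saltedJoin(orders, shops, 4, new Random(42), joined);
        System.out.println("unsalted load: " + Arrays.toString(unsaltedLoad(orders, 4)));
        System.out.println("salted load:   " + Arrays.toString(salted));
        System.out.println("joined rows:   " + joined.size());
    }
}
```

The cost of this approach is that the small side's state grows by a factor of `SALT_BUCKETS`, which is only acceptable while that side really is small.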


Best,
Feng

On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
user@flink.apache.org> wrote:

> Hello folks,
>
>
> We have a use case with a few stream-stream joins that require us to join
> a very large table with a much smaller table, essentially enriching the
> large table with a permutation on the smaller table (consider deriving all
> orders/sessions for a new location). Given the nature of the dataset, if
> we use a typical join that uses hash distribution to co-locate the records
> for each join key, we end up with a very skewed join (a few task slots get
> all of the work instead of an even distribution).
>
>
> We’ve internally implemented a salting-based solution where we salt the
> smaller table and join it with the larger table. While this works at the
> POC stage, we’d like to leverage Flink as much as possible for such a
> join.
>
>
> By the nature of the problem, a broadcast join seems theoretically
> helpful. We’ve explored the query hints supported in Flink, starting with
> this FLIP
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job>
> and this FLIP
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>.
>
>
> Currently, the optimizer doesn't consider the BROADCAST hint in the
> `Exchange` step of the join when creating the physical plan (possibly
> because honoring the hint would require the stream-stream join to also
> support a broadcast join in SQL).
>
>
> Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint
> parsed from the query:
>
>
> ```sql
> ...
> ...
> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])
> ...
> ```
>
>
> However, the Flink optimizer ignores the hint and still represents the
> join as a regular `hash` join in the `Exchange` step:
>
>
> ```sql
> ...
> ...
> :- Exchange(distribution=[hash[shop_id, join_key]])
> ...
> ```
>
>
> In Flink’s `StreamExecExchange`, the translation happens only via the
> `HASH` distribution type
> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java#L106-L127>,
> unlike Flink’s `BatchExecExchange`, where the translation can happen via
> several options
> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java#L145-L194>
> (`HASH`/`BROADCAST`).
>
>
>
> Quoting this Flink mailing list discussion
> <https://lists.apache.org/thread/ovyltrhztw7locn301f0wqfvlykw6l9z> about
> the FLIP that implemented the BROADCAST join hint for batch SQL:
>
>
> > But currently, only in batch the optimizer has different Join strategies
> > for Join and there is no choice of join strategies in the stream. The
> > join hints listed in the current flip should be ignored (maybe can be
> > warned) in streaming mode. When in the future the stream mode has the
> > choice of join strategies, I think that's a good time to discuss that
> > the join hint can affect the streaming SQL.
>
>
> What do you folks think about supporting a broadcast join for streaming
> SQL, along with a corresponding BROADCAST hint that lets users choose the
> distribution they want for a dataset?
>
> Happy to learn more about this and hopefully implement it, if it doesn’t
> sound like a terrible idea.
>
>
> Thanks,
>
> Prabhjot
>
>
>
>
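For reference, here is a toy simulation of the two `Exchange` distributions discussed above. It is plain Java, not Flink’s `StreamExecExchange` or its partitioners, and the class and method names are hypothetical. It assumes an inner equi-join, ignores event ordering and state retention, and assumes (as one possible plan) that the large side is spread round-robin once the small side is broadcast. Because each large-side row is stored on exactly one subtask while every subtask holds the full small side, each matching pair is still produced exactly once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BroadcastVsHashSketch {
    record Row(String key, String payload) {}

    // HASH distribution: one target subtask per record, chosen by join key.
    static List<Integer> hashTargets(Row r, int parallelism) {
        return List.of(Math.floorMod(r.key().hashCode(), parallelism));
    }

    // BROADCAST distribution: every subtask receives a copy of the record.
    static List<Integer> broadcastTargets(int parallelism) {
        List<Integer> all = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            all.add(i);
        }
        return all;
    }

    // Simulates an inner equi-join on parallel subtasks and returns the joined row count.
    // largeLoad[i] receives the number of large-side rows stored on subtask i.
    static int simulateJoin(List<Row> large, List<Row> small, int parallelism,
                            boolean broadcastSmall, int[] largeLoad) {
        List<List<Row>> largeState = new ArrayList<>();
        List<List<Row>> smallState = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            largeState.add(new ArrayList<>());
            smallState.add(new ArrayList<>());
        }
        // Small side: hashed by key, or copied to every subtask.
        for (Row s : small) {
            List<Integer> targets = broadcastSmall ? broadcastTargets(parallelism) : hashTargets(s, parallelism);
            for (int t : targets) {
                smallState.get(t).add(s);
            }
        }
        // Large side: hashed by key, or round-robin when the small side is broadcast.
        for (int i = 0; i < large.size(); i++) {
            Row l = large.get(i);
            int t = broadcastSmall ? i % parallelism : hashTargets(l, parallelism).get(0);
            largeState.get(t).add(l);
            largeLoad[t]++;
        }
        // Each subtask joins only its local state.
        int joined = 0;
        for (int t = 0; t < parallelism; t++) {
            for (Row l : largeState.get(t)) {
                for (Row s : smallState.get(t)) {
                    if (l.key().equals(s.key())) {
                        joined++;
                    }
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Row> large = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            large.add(new Row("hot", "order-" + i)); // one very hot join key
        }
        List<Row> small = List.of(new Row("hot", "loc-A"), new Row("cold", "loc-B"));
        int[] hashLoad = new int[4];
        int[] bcastLoad = new int[4];
        int hashRows = simulateJoin(large, small, 4, false, hashLoad);
        int bcastRows = simulateJoin(large, small, 4, true, bcastLoad);
        System.out.println("HASH:      rows=" + hashRows + " load=" + Arrays.toString(hashLoad));
        System.out.println("BROADCAST: rows=" + bcastRows + " load=" + Arrays.toString(bcastLoad));
    }
}
```

With a single hot key, `HASH` puts all 1,000 large-side rows on one subtask, while `BROADCAST` leaves 250 on each of the 4 subtasks and yields the same 1,000 joined rows. The argument only covers inner joins; an outer join preserving the small side would emit its unmatched rows once per subtask.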
