Hello folks,

We have a use case with a few stream-stream joins that require us to join a
very large table with a much smaller one, essentially enriching the large
table with a permutation of the smaller table (consider deriving all
orders/sessions for a new location). Given the nature of the dataset, a
typical join that uses hash distribution to co-locate the records for each
join key ends up very skewed: a few task slots get all of the work instead
of it being evenly distributed.
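To make the skew concrete, here is a toy, Flink-independent sketch. It assumes a hypothetical stream where one join key (`shop_1`) dominates, and uses `zlib.crc32` as a stand-in for Flink's key-group hashing; the point is only that hash distribution sends every record with the same key to the same subtask.

```python
import zlib
from collections import Counter


def hash_partition(key: str, parallelism: int) -> int:
    """Route a key to exactly one subtask by hashing it.

    Stand-in for Flink's key-group hashing (assumption: the concrete hash
    function does not matter, only that equal keys land on the same subtask).
    """
    # zlib.crc32 is deterministic across runs, unlike Python's salted str hash().
    return zlib.crc32(key.encode("utf-8")) % parallelism


def load_per_subtask(keys, parallelism):
    """Count how many records each subtask receives under hash distribution."""
    counts = Counter(hash_partition(k, parallelism) for k in keys)
    return [counts.get(i, 0) for i in range(parallelism)]


# Hypothetical large-table stream: one "hot" join key dominates.
records = ["shop_1"] * 9_000 + [f"shop_{i}" for i in range(2, 1_002)]
loads = load_per_subtask(records, parallelism=4)
print(loads)  # the subtask owning "shop_1" gets at least 9,000 of 10,000 records
```

Adding parallelism does not help here: the hot key still maps to a single subtask.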


We’ve internally implemented a salting-based solution where we salt the
smaller table and join it with the larger table. While this works at the
POC stage, we’d like to leverage Flink itself as much as possible for such
a join.
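For readers unfamiliar with salting, here is a minimal sketch of one common variant. It is not necessarily the exact scheme we use internally. Each large-side record gets one random salt in `[0, NUM_SALTS)`, each small-side record is replicated once per salt, and the join runs on the composite `(key, salt)`. `NUM_SALTS` and the helper names are hypothetical.

```python
import random
from collections import defaultdict

NUM_SALTS = 4  # hypothetical fan-out factor for a hot key


def salt_large(records, num_salts, rng):
    """(key, payload) -> ((key, salt), payload), one random salt per record."""
    return [((key, rng.randrange(num_salts)), payload) for key, payload in records]


def explode_small(records, num_salts):
    """Replicate each small-side record once per salt so every salted key finds it."""
    return [((key, salt), payload) for key, payload in records for salt in range(num_salts)]


def hash_join(left, right):
    """Plain equi-join on the (key, salt) composite key; emits (key, left, right)."""
    index = defaultdict(list)
    for composite_key, payload in right:
        index[composite_key].append(payload)
    return [(k[0], lv, rv) for k, lv in left for rv in index.get(k, [])]


rng = random.Random(42)
large = [("shop_1", f"order_{i}") for i in range(1_000)]  # a single hot key
small = [("shop_1", "location_A"), ("shop_1", "location_B")]
joined = hash_join(salt_large(large, NUM_SALTS, rng), explode_small(small, NUM_SALTS))
print(len(joined))  # 2000: each order matches each location exactly once
```

The hot key is now spread over up to `NUM_SALTS` distinct composite keys, at the cost of duplicating the small side `NUM_SALTS` times and maintaining the salting logic in user code.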


By the nature of the problem, a broadcast join seems theoretically helpful.
We’ve explored the query hints supported in Flink, starting with this
FLIP
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job>
and this FLIP
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>.
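To illustrate why a broadcast join would avoid the skew, here is a hedged, Flink-independent simulation of what a streaming broadcast inner join could look like. The class and function names are hypothetical. The small side is sent to every parallel instance, the large side is distributed round-robin regardless of key, and each instance joins locally. This is conceptually close to the DataStream broadcast state pattern. Because each large record lands on exactly one instance, every matching pair is emitted exactly once.

```python
from collections import defaultdict
from itertools import cycle


class BroadcastJoinSubtask:
    """One parallel instance of a hypothetical streaming broadcast inner join."""

    def __init__(self):
        self.small_state = defaultdict(list)  # full copy of the broadcast (small) side
        self.large_state = defaultdict(list)  # only the large records routed here
        self.output = []

    def on_small(self, key, value):
        self.small_state[key].append(value)
        # Join against large records that already arrived on this instance.
        self.output += [(key, lv, value) for lv in self.large_state[key]]

    def on_large(self, key, value):
        self.large_state[key].append(value)
        self.output += [(key, value, sv) for sv in self.small_state[key]]


def run(large, small, parallelism):
    subtasks = [BroadcastJoinSubtask() for _ in range(parallelism)]
    for key, value in small:       # BROADCAST: every instance sees every record
        for task in subtasks:
            task.on_small(key, value)
    targets = cycle(subtasks)      # large side: round-robin, independent of the key
    for key, value in large:
        next(targets).on_large(key, value)
    return subtasks


tasks = run(large=[("shop_1", f"order_{i}") for i in range(1_000)],
            small=[("shop_1", "location_A"), ("shop_1", "location_B")],
            parallelism=4)
print([len(t.output) for t in tasks])  # [500, 500, 500, 500]
```

The trade-off is memory: every parallel instance holds a full copy of the broadcast side's state, so this only pays off when that side really is small.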


Currently, the optimizer doesn't consider the BROADCAST hint in the
`Exchange` step of the join when creating the physical plan (possibly
because honoring the hint would require the streaming join to also support
a broadcast join strategy in SQL).


Notice that the query AST (Abstract Syntax Tree) contains the broadcast hint
parsed from the query:


```text
...
...
joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])
...
```


However, the Flink optimizer ignores the hint and still represents the join
as a regular `hash` join in the `Exchange` step:


```text
...
...
:- Exchange(distribution=[hash[shop_id, join_key]])
...
```


In Flink's `StreamExecExchange`, the translation happens only via the `HASH`
distribution type
<https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java#L106-L127>,
unlike in `BatchExecExchange`, where the translation can happen via several
options
<https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java#L145-L194>,
such as `HASH` and `BROADCAST`.
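As a rough mental model of the difference between the two distribution types at an exchange, here is a hedged sketch of channel selection. The `Distribution` enum and `target_channels` function are illustrative only and are not Flink's planner or runtime API; in the runtime these roughly correspond to partitioners such as `KeyGroupStreamPartitioner` and `BroadcastPartitioner`.

```python
import zlib
from enum import Enum


class Distribution(Enum):
    # Names mirror the distribution types discussed above; illustrative only.
    HASH = "hash"
    BROADCAST = "broadcast"


def target_channels(distribution, key, num_channels):
    """Return the downstream channel indices that should receive a record."""
    if distribution is Distribution.HASH:
        # Exactly one channel, chosen by key: co-locates equal keys (and their skew).
        return [zlib.crc32(key.encode("utf-8")) % num_channels]
    if distribution is Distribution.BROADCAST:
        # Every channel gets a copy, so the other join input need not be keyed.
        return list(range(num_channels))
    raise ValueError(f"{distribution} is not supported")


print(target_channels(Distribution.BROADCAST, "shop_1", 4))  # [0, 1, 2, 3]
```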



Quoting this Flink mailing list discussion
<https://lists.apache.org/thread/ovyltrhztw7locn301f0wqfvlykw6l9z> on the
FLIP that introduced the broadcast join hint for batch SQL:


> But currently, only in batch the optimizer has different Join strategies for
> Join and there is no choice of join strategies in the stream. The join hints
> listed in the current flip should be ignored (maybe can be warned) in
> streaming mode. When in the future the stream mode has the choice of join
> strategies, I think that's a good time to discuss that the join hint can
> affect the streaming SQL.


What do you folks think about supporting a broadcast join for streaming SQL,
along with a corresponding BROADCAST hint that lets users choose the kind of
distribution they want for their dataset?

Happy to learn more about this and hopefully implement it, if it doesn’t
sound like a terrible idea.


Thanks,

Prabhjot
