Hi, Prabhjot. 

IIUC, the main reasons why the community has so far supported join hints only 
in batch mode are as follows: 
1. In batch mode, multiple join algorithms were implemented quite early on, 
and 
2. Stream processing is a long-running scenario, and it is quite difficult to 
determine whether a small table will become a large table after a long period 
of operation. 
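
To make the second point concrete, here is a back-of-the-envelope sketch (plain Python, not Flink code) of how many small-table rows the join subtasks hold in total under each distribution. The function name and the parallelism of 128 are made up for illustration. The only assumption is that a broadcast join keeps a full copy of the small side on every subtask, while a hash join keeps each row on exactly one.

```python
PARALLELISM = 128  # hypothetical job parallelism


def small_side_rows_held(small_rows, parallelism, strategy):
    """Total rows of the small table kept across all join subtasks."""
    if strategy == "hash":
        # Each row is routed to exactly one subtask.
        return small_rows
    if strategy == "broadcast":
        # Every subtask keeps its own full copy.
        return small_rows * parallelism
    raise ValueError(f"unknown strategy: {strategy}")


# The same table, before and after it has grown over a long-running job.
for rows in (10_000, 10_000_000):
    held_hash = small_side_rows_held(rows, PARALLELISM, "hash")
    held_broadcast = small_side_rows_held(rows, PARALLELISM, "broadcast")
    print(f"{rows:>10} rows -> hash: {held_hash}, broadcast: {held_broadcast}")
```

The broadcast cost scales with both the table size and the parallelism, so a table that was cheap to broadcast at startup may stop being cheap later.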


However, as you mentioned, join hints do indeed have their significance in 
streaming. If you want to support the implementation of "join hints + broadcast 
join" in streaming, the changes I can currently think of include: 
1. In the optimizer, changing the exchange on the small table side to broadcast 
instead of hash (InputProperty#BROADCAST). 
2. Further changes at the table runtime level, which I have not yet worked out. 
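
As a rough illustration of what the first change means for data distribution, the toy simulation below (plain Python, not Flink code) compares a join where both sides are hash-partitioned on the key with one where the small side is broadcast. All names, the parallelism of 4, and the sample data are hypothetical. It ignores state, updates, and retractions, which is where the runtime changes in point 2 would come in.

```python
import zlib
from collections import defaultdict

PARALLELISM = 4  # hypothetical number of parallel join subtasks


def subtask_for(key):
    """Deterministic stand-in for hash partitioning on the join key."""
    return zlib.crc32(str(key).encode()) % PARALLELISM


def local_join(large_part, small_part):
    """Join the rows that were routed to a single subtask."""
    lookup = defaultdict(list)
    for shop_id, location in small_part:
        lookup[shop_id].append(location)
    return [(order_id, shop_id, location)
            for order_id, shop_id in large_part
            for location in lookup.get(shop_id, [])]


def run(large_parts, small_parts):
    """Run the local join on every subtask; return sorted rows and per-subtask load."""
    rows, load = [], []
    for task in range(PARALLELISM):
        large_part = large_parts.get(task, [])
        load.append(len(large_part))
        rows.extend(local_join(large_part, small_parts.get(task, [])))
    return sorted(rows), load


def hash_join(large, small):
    """Both sides are hash-partitioned on shop_id."""
    large_parts, small_parts = defaultdict(list), defaultdict(list)
    for order_id, shop_id in large:
        large_parts[subtask_for(shop_id)].append((order_id, shop_id))
    for shop_id, location in small:
        small_parts[subtask_for(shop_id)].append((shop_id, location))
    return run(large_parts, small_parts)


def broadcast_join(large, small):
    """Small side is copied to every subtask; large side is spread round-robin."""
    large_parts = defaultdict(list)
    for i, row in enumerate(large):
        large_parts[i % PARALLELISM].append(row)
    small_parts = {task: list(small) for task in range(PARALLELISM)}
    return run(large_parts, small_parts)


# Made-up data: 5 shops, 1000 orders, 920 of which belong to shop-0.
small = [(f"shop-{i}", f"loc-{i}") for i in range(5)]
large = [(n, "shop-0" if n % 10 else f"shop-{(n // 10) % 5}") for n in range(1000)]

hash_rows, hash_load = hash_join(large, small)
broadcast_rows, broadcast_load = broadcast_join(large, small)
print("hash load:     ", hash_load)
print("broadcast load:", broadcast_load)
```

On this toy data both strategies produce the same joined rows, but with broadcast every subtask processes 250 of the 1000 large-table rows, while with hash at least 920 of them land on a single subtask.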


You can also discuss it within the community through JIRA, FLIP, or the dev 
mailing list.




--

    Best!
    Xuyang




At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" <user@flink.apache.org> 
wrote:

Hi Feng,


Thanks for your prompt response.
If we were to solve this in Flink, my higher level viewpoint is:


1. First, implement a Broadcast join in Flink Streaming SQL that also works 
across the Table API (e.g. via `left.join(right, <predicate>, join_type="broadcast")`)
2. Then, support a Broadcast hint that would utilize this new join based on the 
hint type


What do you think about this?
Would you have some pointers on how/where to start on the first part? (I'm 
thinking we'd have to extend the Broadcast state pattern for this purpose.)


Thanks,
Prabhjot


On Thu, Feb 1, 2024 at 11:40 AM Feng Jin <jinfeng1...@gmail.com> wrote:

Hi Prabhjot


I think this is a reasonable scenario. If a large table is joined with a very 
small table using a regular join, then without broadcasting the small table it 
can easily cause data skew.
We have encountered similar problems too. Currently, we can only make 
multiple copies of the small table using UNION ALL and append random values 
to alleviate the data skew.




Best,
Feng



On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user 
<user@flink.apache.org> wrote:


Hello folks,




We have a use case where we have a few stream-stream joins, requiring us to 
join a very large table with a much smaller table, essentially enriching the 
large table with a permutation on the smaller table (Consider deriving all 
orders/sessions for a new location). Given the nature of the dataset, if we use 
a typical join that uses Hash distribution to co-locate the records for each 
join key, we end up with a very skewed join (a few task slots getting all of 
the work, as against a good distribution).




We’ve internally implemented a salting-based solution where we salt the smaller 
table and join it with the larger table. While this works in the POC stage, 
we’d like to leverage Flink as much as possible to do such a join.
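
For readers unfamiliar with salting, here is a minimal sketch of the idea in plain Python (not Flink code, and not the implementation described above). The small table is replicated once per salt value, each large-table row is tagged with one salt, and the join key becomes `(shop_id, salt)`. The names, the parallelism of 4, the 8 salt buckets, and the sample data are all hypothetical, and the salt is derived from the order id rather than drawn at random so that the sketch is reproducible.

```python
import zlib
from collections import Counter

PARALLELISM = 4   # hypothetical number of join subtasks
SALT_BUCKETS = 8  # hypothetical number of salt values per join key


def subtask_for(key):
    """Deterministic stand-in for hash partitioning on the join key."""
    return zlib.crc32(repr(key).encode()) % PARALLELISM


# Made-up data: 5 shops, 1000 orders, 920 of which belong to shop-0.
small = [(f"shop-{i}", f"loc-{i}") for i in range(5)]
large = [(n, "shop-0" if n % 10 else f"shop-{(n // 10) % 5}") for n in range(1000)]

# Plain join: every shop-0 order is routed to the same subtask.
plain_lookup = dict(small)
plain_joined = [(order_id, shop_id, plain_lookup[shop_id])
                for order_id, shop_id in large]
plain_load = Counter(subtask_for(shop_id) for _, shop_id in large)

# Salted join: replicate each small row once per salt value ...
salted_small = {(shop_id, salt): location
                for shop_id, location in small
                for salt in range(SALT_BUCKETS)}
# ... and tag every large row with one salt value.
salted_large = [(order_id, shop_id, order_id % SALT_BUCKETS)
                for order_id, shop_id in large]
salted_joined = [(order_id, shop_id, salted_small[(shop_id, salt)])
                 for order_id, shop_id, salt in salted_large]
salted_load = Counter(subtask_for((shop_id, salt))
                      for _, shop_id, salt in salted_large)

print("plain load: ", sorted(plain_load.items()))
print("salted load:", sorted(salted_load.items()))
```

The salted join returns the same rows as the plain one, but the hot key is split into `SALT_BUCKETS` distinct keys that the hash distribution can place on different subtasks. The price is that the small table grows by a factor of `SALT_BUCKETS`, and how evenly the load spreads still depends on where those keys happen to hash.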




By the nature of the problem, a broadcast join seems theoretically helpful. 
We’ve explored the query hints supported in Flink, starting with this 
FLIP and this FLIP. 




Currently, the Optimizer doesn't consider the Broadcast hint in the `Exchange` 
step of the join, when creating the physical plan (Possibly because the hint 
would require the stream-stream join to also support Broadcast join with SQL)




Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint parsed 
from the query:




```sql

...

...

joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])

...

```




However, the Flink optimizer ignores the hint and still represents the join as 
a regular `hash` join in the `Exchange` step:




```sql

...

...

:- Exchange(distribution=[hash[shop_id, join_key]])

...

```




In Flink `StreamExecExchange`, the translation happens only via the `HASH` 
distribution type, unlike in the Flink `BatchExecExchange`, where the 
translation can happen via a multitude of options (`HASH/BROADCAST`).





Quoting this Flink mailing list discussion for the FLIP that implemented the 
Broadcast join hint for batch SQL:




> But currently, only in batch the optimizer has different Join strategies for 
> Join and there is no choice of join strategies in the stream. The join hints 
> listed in the current flip should be ignored (maybe can be warned) in 
> streaming mode. When in the future the stream mode has the choice of join 
> strategies, I think that's a good time to discuss that the join hint can 
> affect the streaming SQL.




What do you folks think about the possibility of a Broadcast join for Streaming 
SQL, along with a corresponding Broadcast hint that lets the user choose the 
kind of distribution they’d want for the dataset?

Happy to learn more about this and hopefully implement it, if it doesn’t sound 
like a terrible idea.




Thanks,

Prabhjot


