Hi Feng,

Thanks for your prompt response. If we were to solve this in Flink, my high-level view is:

1. First, implement a Broadcast join in Flink Streaming SQL that works
   across the Table API (e.g. via
   `left.join(right, <predicate>, join_type="broadcast")`).
2. Then, support a Broadcast hint that would utilize this new join based on
   the hint type.

What do you think about this? Would you have some pointers on how/where to
start on the first part? (I'm thinking we'd have to extend the Broadcast
state pattern for this purpose.)

Thanks,
Prabhjot

On Thu, Feb 1, 2024 at 11:40 AM Feng Jin <jinfeng1...@gmail.com> wrote:

> Hi Prabhjot
>
> I think this is a reasonable scenario. If there is a large table and a
> very small table for regular join, without broadcasting the regular join,
> it can easily cause data skew.
> We have also encountered similar problems too. Currently, we can only copy
> multiple copies of the small table using the union all and append random
> values to alleviate data skewness.
>
>
> Best,
> Feng
>
> On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
> user@flink.apache.org> wrote:
>
>> Hello folks,
>>
>>
>> We have a use case where we have a few stream-stream joins, requiring us
>> to join a very large table with a much smaller table, essentially enriching
>> the large table with a permutation on the smaller table (Consider deriving
>> all orders/sessions for a new location). Given the nature of the dataset,
>> if we use a typical join that uses Hash distribution to co-locate the
>> records for each join key, we end up with a very skewed join (a few task
>> slots getting all of the work, as against a good distribution).
>>
>>
>> We’ve internally implemented a Salting based solution where we salt the
>> smaller table and join it with the larger table. While this works in the
>> POC stage, we’d like to leverage flink as much as possible to do such a
>> join.
>>
>>
>> By the nature of the problem, a broadcast join seems theoretically
>> helpful.
>> We’ve done an exploration on query hints supported in Flink,
>> starting with this FLIP
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job>
>> and this FLIP
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>.
>>
>>
>> Currently, the Optimizer doesn't consider the Broadcast hint in the
>> `Exchange` step of the join, when creating the physical plan (Possibly
>> because the hint would require the stream-stream join to also support
>> Broadcast join with SQL)
>>
>>
>> Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint
>> parsed from the query:
>>
>>
>> ```sql
>> ...
>> ...
>> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])
>> ...
>> ```
>>
>>
>> However, the Flink optimizer ignores the hint and still represents the
>> join as a regular `hash` join in the `Exchange` step:
>>
>>
>> ```sql
>> ...
>> ...
>> :- Exchange(distribution=[hash[shop_id, join_key]])
>> ...
>> ```
>>
>>
>> In Flink `StreamExecExchange`, the translation happens only via the
>> `HASH` distribution type
>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java#L106-L127>,
>> unlike in the Flink `BatchExecExchange`, where the translation can happen
>> via a multitude of options
>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java#L145-L194>
>> (`HASH/BROADCAST`).
>>
>>
>> Quoting this Flink mailing list discussion
>> <https://lists.apache.org/thread/ovyltrhztw7locn301f0wqfvlykw6l9z> for
>> the FLIP that implemented the Broadcast join hint for batch sql:
>>
>>
>> > But currently, only in batch the optimizer has different Join
>> > strategies for Join and there is no choice of join strategies in the
>> > stream. The join hints listed in the current flip should be ignored
>> > (maybe can be warned) in streaming mode. When in the future the stream
>> > mode has the choice of join strategies, I think that's a good time
>> > to discuss that the join hint can affect the streaming SQL.
>>
>>
>> What do you folks think about the possibility of a Broadcast join for
>> Streaming Sql along with its corresponding Broadcast hint, that lets the
>> user choose the kind of distribution they’d want with the dataset ?
>>
>> Happy to learn more about this and hopefully implement it, if it doesn’t
>> sound like a terrible idea.
>>
>>
>> Thanks,
>>
>> Prabhjot
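
P.S. For anyone skimming the thread, here is a rough, Flink-free sketch of the salting workaround discussed above, to make the skew problem concrete. It is only an illustration under assumptions: `slot_for`, `salt_small_table`, `salt_large_key`, `NUM_SLOTS` and `NUM_SALTS` are made-up names, and CRC32 is a stand-in for hash distribution, not Flink's actual key-group assignment.

```python
import zlib
from collections import Counter

NUM_SLOTS = 8   # hypothetical parallelism of the join operator
NUM_SALTS = 8   # hypothetical number of copies made of each small-table row


def slot_for(key: str, num_slots: int = NUM_SLOTS) -> int:
    """Stand-in for hash distribution: a deterministic hash of the join key."""
    return zlib.crc32(key.encode("utf-8")) % num_slots


def salt_small_table(rows, num_salts: int = NUM_SALTS):
    """Replicate every small-table row once per salt value."""
    return {f"{key}#{salt}": value for key, value in rows for salt in range(num_salts)}


def salt_large_key(key: str, seq: int, num_salts: int = NUM_SALTS) -> str:
    """Salt a large-table key. Round-robin on a sequence number is used here
    to keep the sketch deterministic; a random salt is the more usual choice."""
    return f"{key}#{seq % num_salts}"


# One hot join key: every large-table record belongs to the same shop.
large_keys = ["shop_1"] * 10_000
small_rows = [("shop_1", "new-location")]

# Plain hash distribution: all records for the hot key land on one slot.
plain = Counter(slot_for(k) for k in large_keys)

# Salted distribution: the same records are spread over several slots,
# and each salted key still finds its match in the replicated small table.
salted_small = salt_small_table(small_rows)
salted_keys = [salt_large_key(k, i) for i, k in enumerate(large_keys)]
salted = Counter(slot_for(k) for k in salted_keys)
joined = [(k, salted_small[k]) for k in salted_keys]

print("slots used without salting:", len(plain))
print("slots used with salting:   ", len(salted))
```

A broadcast join would avoid the replication step entirely: the small side would be sent to every slot, and the large side would not need to be hash-partitioned on the join key at all.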