Hello folks,
We have a use case with a few stream-stream joins in which we need to join a very large table with a much smaller one, essentially enriching the large table with a permutation of the smaller table (consider deriving all orders/sessions for a new location). Given the nature of the dataset, a typical join that uses hash distribution to co-locate the records for each join key ends up very skewed: a few task slots get all of the work instead of the load being evenly distributed.

We’ve internally implemented a salting-based solution, where we salt the smaller table and join it with the larger table. While this works at the POC stage, we’d like to leverage Flink as much as possible for such a join. By the nature of the problem, a broadcast join seems theoretically helpful.

We’ve explored the query hints supported in Flink, starting with this FLIP <https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job> and this FLIP <https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>.

Currently, the optimizer doesn't consider the broadcast hint in the `Exchange` step of the join when creating the physical plan (possibly because honoring the hint would require the stream-stream join to also support broadcast joins in SQL).

Notice that the query AST (Abstract Syntax Tree) has the broadcast hint parsed from the query:

```sql
...
...
joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])
...
```

However, the Flink optimizer ignores the hint and still represents the join as a regular `hash` join in the `Exchange` step:

```sql
...
...
:- Exchange(distribution=[hash[shop_id, join_key]])
...
```

In Flink's `StreamExecExchange`, the translation happens only via the `HASH` distribution type <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java#L106-L127>, whereas in `BatchExecExchange` the translation can happen via multiple distribution types, including `HASH` and `BROADCAST` <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java#L145-L194>.

Quoting this Flink mailing list discussion <https://lists.apache.org/thread/ovyltrhztw7locn301f0wqfvlykw6l9z> for the FLIP that implemented the broadcast join hint for batch SQL:

> But currently, only in batch the optimizer has different Join strategies for Join and
> there is no choice of join strategies in the stream. The join hints listed in the current
> flip should be ignored (maybe can be warned) in streaming mode. When in the
> future the stream mode has the choice of join strategies, I think that's a good time
> to discuss that the join hint can affect the streaming SQL.

What do you folks think about the possibility of a broadcast join for streaming SQL, along with a corresponding broadcast hint that lets the user choose the kind of distribution they’d want for the dataset?

Happy to learn more about this and hopefully implement it, if it doesn’t sound like a terrible idea.

Thanks,
Prabhjot
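
P.S. For readers unfamiliar with salting, here is a minimal sketch of the general idea in plain Python. It is illustrative only and is not the implementation referred to above: the function names, the `NUM_SALTS` value, and the in-memory lists are all assumptions made for the example. The small side is replicated once per salt value, each large-side row gets one random salt, and the join runs on `(key, salt)` so that a hot key is spread over several sub-keys.

```python
import random
from collections import defaultdict

NUM_SALTS = 4  # hypothetical fan-out; in practice this would be tuned to the skew


def salt_small_side(small_rows, num_salts=NUM_SALTS):
    """Replicate every small-table row once per salt value."""
    return [((key, salt), value)
            for key, value in small_rows
            for salt in range(num_salts)]


def salt_large_side(large_rows, num_salts=NUM_SALTS, rng=random):
    """Attach one random salt to each large-table row."""
    return [((key, rng.randrange(num_salts)), value)
            for key, value in large_rows]


def salted_join(large_rows, small_rows, num_salts=NUM_SALTS, rng=random):
    """Inner join on (key, salt); returns (key, large_value, small_value) tuples."""
    # Index the replicated small side by its salted key.
    index = defaultdict(list)
    for salted_key, small_value in salt_small_side(small_rows, num_salts):
        index[salted_key].append(small_value)

    joined = []
    for (key, salt), large_value in salt_large_side(large_rows, num_salts, rng):
        # Every salt value exists on the small side, so no match is lost.
        for small_value in index.get((key, salt), []):
            joined.append((key, large_value, small_value))
    return joined
```

The result is the same as an unsalted inner join, at the cost of holding `NUM_SALTS` copies of the small side. A broadcast join would avoid both the replication factor and the need to pick a salt count by hand.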