Hi, Prabhjot.
IIUC, the main reasons why the community has so far supported join hints only in batch mode are as follows:

1. In batch mode, multiple join algorithms were implemented quite early on.
2. Stream processing is a long-running scenario, and it is difficult to determine whether a small table will grow into a large table after a long period of operation.

However, as you mentioned, join hints do have their significance in streaming. If you want to support "join hints + broadcast join" in streaming, the changes I can currently think of include:

1. In the optimizer, changing the exchange on the small table side to broadcast instead of hash (InputProperty#BROADCAST).
2. Changes at the table runtime level, which are still unknown to me.

You can also discuss it within the community through JIRA, a FLIP, or the dev mailing list.

--
Best!
Xuyang

At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" <user@flink.apache.org> wrote:

Hi Feng,

Thanks for your prompt response.
If we were to solve this in Flink, my higher-level viewpoint is:

1. First, implement a Broadcast join in Flink Streaming SQL that works across the Table API (e.g. via a `left.join(right, <predicate>, join_type="broadcast")`).
2. Then, support a Broadcast hint that would utilize this new join based on the hint type.

What do you think about this? Would you have some pointers on how/where to start on the first part? (I'm thinking we'd have to extend the Broadcast state pattern for this purpose.)

Thanks,
Prabhjot

On Thu, Feb 1, 2024 at 11:40 AM Feng Jin <jinfeng1...@gmail.com> wrote:

Hi Prabhjot,

I think this is a reasonable scenario. If a large table and a very small table are combined in a regular join without broadcasting, data skew can easily occur. We have encountered similar problems. Currently, the only workaround we have is to make multiple copies of the small table using UNION ALL and append random values to alleviate the data skew.
Best,
Feng

On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <user@flink.apache.org> wrote:

Hello folks,

We have a use case with a few stream-stream joins that require us to join a very large table with a much smaller table, essentially enriching the large table with a permutation on the smaller table (consider deriving all orders/sessions for a new location).

Given the nature of the dataset, if we use a typical join that uses hash distribution to co-locate the records for each join key, we end up with a very skewed join (a few task slots get all of the work, instead of the work being evenly distributed).

We’ve internally implemented a salting-based solution where we salt the smaller table and join it with the larger table. While this works in the POC stage, we’d like to leverage Flink as much as possible to do such a join. By the nature of the problem, a broadcast join seems theoretically helpful.

We’ve explored the query hints supported in Flink, starting with this FLIP and this FLIP.

Currently, the optimizer doesn't consider the Broadcast hint in the `Exchange` step of the join when creating the physical plan (possibly because the hint would require the stream-stream join to also support Broadcast join with SQL).

Notice that the query AST (Abstract Syntax Tree) has the broadcast hint parsed from the query:

```sql
...
...
joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])
...
```

However, the Flink optimizer ignores the hint and still represents the join as a regular `hash` join in the `Exchange` step:

```sql
...
...
:- Exchange(distribution=[hash[shop_id, join_key]])
...
```

In Flink's `StreamExecExchange`, the translation happens only via the `HASH` distribution type, whereas in `BatchExecExchange` the translation can happen via a multitude of options (`HASH/BROADCAST`).

Quoting this Flink mailing list discussion for the FLIP that implemented the Broadcast join hint for batch SQL:

> But currently, only in batch the optimizer has different Join strategies for Join and
> there is no choice of join strategies in the stream. The join hints listed in the current
> flip should be ignored (maybe can be warned) in streaming mode. When in the
> future the stream mode has the choice of join strategies, I think that's a good time
> to discuss that the join hint can affect the streaming SQL.

What do you folks think about the possibility of a Broadcast join for streaming SQL, along with its corresponding Broadcast hint, that lets the user choose the kind of distribution they’d want with the dataset?

Happy to learn more about this and hopefully implement it, if it doesn’t sound like a terrible idea.

Thanks,
Prabhjot
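
For readers following the thread, here is a rough, self-contained sketch of why the three strategies discussed above (hash exchange, salting, broadcast) spread work so differently. It is a toy simulation in plain Python, not Flink code: the `partition` function, the parallelism of 8, the number of salts, and the synthetic key distribution (one hot key holding 90% of the rows) are all assumptions made up for illustration, and the partitioner is not Flink's key-group assignment.

```python
import random
from collections import Counter

PARALLELISM = 8   # hypothetical number of join subtasks
NUM_SALTS = 8     # hypothetical number of salted copies of the small side


def partition(key_parts, parallelism=PARALLELISM):
    """Toy hash partitioner over a tuple of ints (not Flink's key-group logic)."""
    h = 0
    for part in key_parts:
        h = h * 31 + part
    return h % parallelism


def make_large_side():
    """Join keys of 100,000 large-table records; key 0 is hot (90% of rows)."""
    hot = [0] * 90_000
    cold = [1 + (i % 100) for i in range(10_000)]
    return hot + cold


def hash_load(keys):
    """Records per subtask when rows are routed by join key alone."""
    return Counter(partition((k,)) for k in keys)


def salted_load(keys, seed=0):
    """Records per subtask when each large-side row gets a random salt.

    The small side would need one copy per salt value (NUM_SALTS copies)
    so that every (key, salt) pair still finds its match.
    """
    rng = random.Random(seed)
    return Counter(partition((k, rng.randrange(NUM_SALTS))) for k in keys)


def broadcast_load(keys):
    """Records per subtask when the large side is spread round-robin.

    The small side would be copied to every subtask (PARALLELISM copies),
    so no co-location by key is needed.
    """
    return Counter(i % PARALLELISM for i, _ in enumerate(keys))


def report():
    """Size of the busiest subtask under each strategy."""
    keys = make_large_side()
    return {
        "hash": max(hash_load(keys).values()),
        "salted": max(salted_load(keys).values()),
        "broadcast": max(broadcast_load(keys).values()),
    }


if __name__ == "__main__":
    for strategy, busiest in report().items():
        print(f"{strategy:>9}: busiest subtask handles {busiest} records")
```

In this toy setup the hash exchange sends every row of the hot key to a single subtask, while salting and broadcasting both even out the large side at the cost of replicating the small side. The sketch says nothing about state size or about what happens when the "small" table grows over time, which is the streaming-specific concern raised in the reply at the top of the thread.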