Hi, I would definitely expect a FLIP on this topic before moving to implementation.

Best regards,

Martijn

On Fri, Feb 2, 2024 at 12:47 PM Xuyang <xyzhong...@163.com> wrote:

> Hi, Prabhjot.
>
> IIUC, the main reasons why the community has so far supported join hints
> only in batch mode are as follows:
> 1. In batch mode, multiple join type algorithms were implemented quite
> early on, and
> 2. Stream processing represents a long-running scenario, and it is quite
> difficult to determine whether a small table will become a large table
> after a long period of operation.
>
> However, as you mentioned, join hints do indeed have their significance in
> streaming. If you want to support the implementation of "join hints +
> broadcast join" in streaming, the changes I can currently think of include:
> 1. In the optimizer, changing the exchange on the small table side to
> broadcast instead of hash (InputProperty#BROADCAST).
> 2. Unknown changes required at the table runtime level.
>
> You can also discuss it within the community through JIRA, FLIP, or the
> dev mailing list.
>
> --
> Best!
> Xuyang
>
> At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" <user@flink.apache.org>
> wrote:
>
> Hi Feng,
>
> Thanks for your prompt response.
> If we were to solve this in Flink, my higher-level viewpoint is:
>
> 1. First, implement Broadcast join in Flink Streaming SQL in a way that
> works across the Table API (e.g. via a
> `left.join(right, <predicate>, join_type="broadcast")`)
> 2. Then, support a Broadcast hint that would utilize this new join based
> on the hint type
>
> What do you think about this?
> Would you have some pointers on how/where to start on the first part?
> (I'm thinking we'd have to extend the Broadcast state pattern for this
> purpose)
>
> Thanks,
> Prabhjot
>
> On Thu, Feb 1, 2024 at 11:40 AM Feng Jin <jinfeng1...@gmail.com> wrote:
>
>> Hi Prabhjot
>>
>> I think this is a reasonable scenario.
>> If there is a large table and a very small table in a regular join, then
>> without broadcasting, the regular join can easily cause data skew.
>> We have encountered similar problems too. Currently, we can only
>> create multiple copies of the small table using UNION ALL and append
>> random values to alleviate the data skew.
>>
>> Best,
>> Feng
>>
>> On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
>> user@flink.apache.org> wrote:
>>
>>> Hello folks,
>>>
>>> We have a use case where we have a few stream-stream joins, requiring us
>>> to join a very large table with a much smaller table, essentially enriching
>>> the large table with a permutation on the smaller table (consider deriving
>>> all orders/sessions for a new location). Given the nature of the dataset,
>>> if we use a typical join that uses Hash distribution to co-locate the
>>> records for each join key, we end up with a very skewed join (a few task
>>> slots getting all of the work, as against a good distribution).
>>>
>>> We’ve internally implemented a salting-based solution where we salt the
>>> smaller table and join it with the larger table. While this works in the
>>> POC stage, we’d like to leverage Flink as much as possible to do such a
>>> join.
>>>
>>> By the nature of the problem, a broadcast join seems theoretically
>>> helpful. We’ve done an exploration on query hints supported in Flink,
>>> starting with this FLIP
>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job>
>>> and this FLIP
>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>.
>>>
>>> Currently, the Optimizer doesn't consider the Broadcast hint in the
>>> `Exchange` step of the join when creating the physical plan (possibly
>>> because the hint would require the stream-stream join to also support
>>> Broadcast join with SQL).
>>>
>>> Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint
>>> parsed from the query:
>>>
>>> ```sql
>>> ...
>>> ...
>>> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])
>>> ...
>>> ```
>>>
>>> However, the Flink optimizer ignores the hint and still represents the
>>> join as a regular `hash` join in the `Exchange` step:
>>>
>>> ```sql
>>> ...
>>> ...
>>> :- Exchange(distribution=[hash[shop_id, join_key]])
>>> ...
>>> ```
>>>
>>> In Flink `StreamExecExchange`, the translation happens only via the
>>> `HASH` distribution type
>>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java#L106-L127>,
>>> unlike in the Flink `BatchExecExchange`, where the translation can happen
>>> via a multitude of options
>>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java#L145-L194>
>>> (`HASH/BROADCAST`).
>>>
>>> Quoting this Flink mailing list discussion
>>> <https://lists.apache.org/thread/ovyltrhztw7locn301f0wqfvlykw6l9z> for
>>> the FLIP that implemented the Broadcast join hint for batch SQL:
>>>
>>> > But currently, only in batch the optimizer has different Join
>>> > strategies for Join and there is no choice of join strategies in the
>>> > stream. The join hints listed in the current flip should be ignored
>>> > (maybe can be warned) in streaming mode.
>>> > When in the future the stream mode has the choice of join strategies,
>>> > I think that's a good time to discuss that the join hint can affect
>>> > the streaming SQL.
>>>
>>> What do you folks think about the possibility of a Broadcast join for
>>> Streaming SQL along with its corresponding Broadcast hint, which lets the
>>> user choose the kind of distribution they’d want with the dataset?
>>>
>>> Happy to learn more about this and hopefully implement it, if it doesn’t
>>> sound like a terrible idea.
>>>
>>> Thanks,
>>>
>>> Prabhjot
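
The salting workaround mentioned in this thread (replicating the small table once per salt value and attaching a random salt to each row of the large table) can be sketched outside Flink as follows. This is a minimal plain-Python illustration, not Flink code: the function names, `NUM_SALTS`, and the in-memory lists are all assumptions made for the example.

```python
import random
from collections import Counter

NUM_SALTS = 4  # hypothetical salt count; in practice tuned to the join parallelism


def salt_small_table(small_rows, num_salts=NUM_SALTS):
    """Replicate each (key, value) row of the small table once per salt value."""
    return [(key, salt, value)
            for key, value in small_rows
            for salt in range(num_salts)]


def salted_join(large_rows, small_rows, num_salts=NUM_SALTS, seed=0):
    """Inner-join large_rows with small_rows on (key, salt) instead of key alone.

    Returns the joined rows and a Counter of how many large-table rows were
    routed to each (key, salt) pair.
    """
    rng = random.Random(seed)

    # Index the replicated small table by the composite (key, salt) join key.
    index = {}
    for key, salt, value in salt_small_table(small_rows, num_salts):
        index.setdefault((key, salt), []).append(value)

    joined = []
    load = Counter()
    for key, value in large_rows:
        salt = rng.randrange(num_salts)  # a random salt spreads a hot key
        load[(key, salt)] += 1
        for small_value in index.get((key, salt), []):
            joined.append((key, value, small_value))
    return joined, load


if __name__ == "__main__":
    # One hot key on the large side, one matching row on the small side.
    large = [("shop_1", order_id) for order_id in range(1000)]
    small = [("shop_1", "Berlin")]
    joined, load = salted_join(large, small)
    print(len(joined), dict(load))
```

With a single hot key, the large-table rows are spread over `NUM_SALTS` composite keys rather than one, at the cost of keeping `NUM_SALTS` copies of every small-table row. The join result is the same as joining on the key alone.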