I've seen enough demand for a streaming broadcast join in the community to justify a FLIP -- I think it's a good idea, and look forward to the discussion.

David

On Fri, Feb 2, 2024 at 6:55 AM Feng Jin <jinfeng1...@gmail.com> wrote:

> +1 for a FLIP on this topic.
>
> Best,
> Feng
>
> On Fri, Feb 2, 2024 at 10:26 PM Martijn Visser <martijnvis...@apache.org> wrote:
>
>> Hi,
>>
>> I would definitely expect a FLIP on this topic before moving to
>> implementation.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Fri, Feb 2, 2024 at 12:47 PM Xuyang <xyzhong...@163.com> wrote:
>>
>>> Hi, Prabhjot.
>>>
>>> IIUC, the main reasons why the community has so far supported join
>>> hints only in batch mode are as follows:
>>> 1. In batch mode, multiple join algorithms were implemented quite
>>> early on, and
>>> 2. Stream processing represents a long-running scenario, and it is quite
>>> difficult to determine whether a small table will become a large table
>>> after a long period of operation.
>>>
>>> However, as you mentioned, join hints do indeed have their significance
>>> in streaming. If you want to support the implementation of "join hints +
>>> broadcast join" in streaming, the changes I can currently think of include:
>>> 1. In the optimizer, changing the exchange on the small table side to
>>> broadcast instead of hash (InputProperty#BROADCAST).
>>> 2. Unknown changes required at the table runtime level.
>>>
>>> You can also discuss it within the community through JIRA, FLIP, or the
>>> dev mailing list.
>>>
>>> --
>>> Best!
>>> Xuyang
>>>
>>> At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" <user@flink.apache.org> wrote:
>>>
>>> Hi Feng,
>>>
>>> Thanks for your prompt response.
>>> If we were to solve this in Flink, my higher level viewpoint is:
>>>
>>> 1. First, implement a Broadcast join in Flink Streaming SQL that works
>>> across the Table API (e.g. via a `left.join(right, <predicate>,
>>> join_type="broadcast")`)
>>> 2. Then, support a Broadcast hint that would utilize this new join based
>>> on the hint type
>>>
>>> What do you think about this ?
>>> Would you have some pointers on how/where to start on the first part ?
>>> (I'm thinking we'd have to extend the Broadcast state pattern for this
>>> purpose)
>>>
>>> Thanks,
>>> Prabhjot
>>>
>>> On Thu, Feb 1, 2024 at 11:40 AM Feng Jin <jinfeng1...@gmail.com> wrote:
>>>
>>>> Hi Prabhjot
>>>>
>>>> I think this is a reasonable scenario. If there is a large table and a
>>>> very small table in a regular join, then without broadcasting, the join
>>>> can easily cause data skew.
>>>> We have encountered similar problems too. Currently, we can only
>>>> make multiple copies of the small table using UNION ALL and append
>>>> random values to alleviate the data skew.
>>>>
>>>> Best,
>>>> Feng
>>>>
>>>> On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <user@flink.apache.org> wrote:
>>>>
>>>>> Hello folks,
>>>>>
>>>>> We have a use case where we have a few stream-stream joins, requiring
>>>>> us to join a very large table with a much smaller table, essentially
>>>>> enriching the large table with a permutation on the smaller table
>>>>> (Consider deriving all orders/sessions for a new location). Given the
>>>>> nature of the dataset, if we use a typical join that uses Hash
>>>>> distribution to co-locate the records for each join key, we end up with
>>>>> a very skewed join (a few task slots getting all of the work, as against
>>>>> a good distribution).
>>>>>
>>>>> We’ve internally implemented a Salting based solution where we salt
>>>>> the smaller table and join it with the larger table. While this works in
>>>>> the POC stage, we’d like to leverage Flink as much as possible to do such
>>>>> a join.
>>>>>
>>>>> By the nature of the problem, a broadcast join seems theoretically
>>>>> helpful.
>>>>> We’ve done an exploration on query hints supported in Flink,
>>>>> starting with this FLIP
>>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job>
>>>>> and this FLIP
>>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>.
>>>>>
>>>>> Currently, the Optimizer doesn't consider the Broadcast hint in the
>>>>> `Exchange` step of the join, when creating the physical plan (Possibly
>>>>> because the hint would require the stream-stream join to also support
>>>>> Broadcast join with SQL)
>>>>>
>>>>> Notice that the Query AST (Abstract Syntax Tree) has the broadcast
>>>>> hint parsed from the query:
>>>>>
>>>>> ```sql
>>>>> ...
>>>>> ...
>>>>> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])
>>>>> ...
>>>>> ```
>>>>>
>>>>> However, the Flink optimizer ignores the hint and still represents the
>>>>> join as a regular `hash` join in the `Exchange` step:
>>>>>
>>>>> ```sql
>>>>> ...
>>>>> ...
>>>>> :- Exchange(distribution=[hash[shop_id, join_key]])
>>>>> ...
>>>>> ```
>>>>>
>>>>> In Flink `StreamExecExchange`, the translation happens only via the
>>>>> `HASH` distribution type
>>>>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java#L106-L127>,
>>>>> unlike in the Flink `BatchExecExchange`, where the translation can
>>>>> happen via a multitude of options
>>>>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java#L145-L194>
>>>>> (`HASH/BROADCAST`).
>>>>>
>>>>> Quoting this Flink mailing list discussion
>>>>> <https://lists.apache.org/thread/ovyltrhztw7locn301f0wqfvlykw6l9z>
>>>>> for the FLIP that implemented the Broadcast join hint for batch sql:
>>>>>
>>>>> > But currently, only in batch the optimizer has different Join
>>>>> > strategies for Join and there is no choice of join strategies in the
>>>>> > stream. The join hints listed in the current flip should be ignored
>>>>> > (maybe can be warned) in streaming mode. When in the future the stream
>>>>> > mode has the choice of join strategies, I think that's a good time to
>>>>> > discuss that the join hint can affect the streaming SQL.
>>>>>
>>>>> What do you folks think about the possibility of a Broadcast join for
>>>>> Streaming Sql along with its corresponding Broadcast hint, that lets the
>>>>> user choose the kind of distribution they’d want with the dataset ?
>>>>>
>>>>> Happy to learn more about this and hopefully implement it, if it
>>>>> doesn’t sound like a terrible idea.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Prabhjot
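
For readers following the thread, here is a minimal sketch of the salting workaround mentioned by both Prabhjot and Feng: the small table is replicated once per salt value (the UNION ALL copies) and each row of the large table gets a random salt, so the join key becomes (key, salt). This is plain Python, not Flink code. `partition_for`, `NUM_PARTITIONS`, `NUM_SALTS`, and the sample rows are all hypothetical stand-ins, and the toy partition function only imitates a hash exchange.

```python
import random
from collections import Counter, defaultdict

NUM_PARTITIONS = 4  # hypothetical parallelism of the join operator
NUM_SALTS = 8       # hypothetical number of copies of the small table


def partition_for(key, salt=0):
    """Toy stand-in for a hash exchange on (key, salt); not Flink's hashing."""
    return (key * 31 + salt) % NUM_PARTITIONS


def plain_join(large, small):
    """Regular hash join: rows are co-located by join key only."""
    lookup = defaultdict(list)
    for key, value in small:
        lookup[key].append(value)
    load = Counter()  # rows of the large side received per partition
    out = []
    for key, payload in large:
        load[partition_for(key)] += 1
        out.extend((key, payload, v) for v in lookup[key])
    return out, load


def salted_join(large, small, rng):
    """Salted join: replicate the small side per salt, join on (key, salt)."""
    lookup = defaultdict(list)
    for key, value in small:
        for salt in range(NUM_SALTS):  # the "union all" of NUM_SALTS copies
            lookup[(key, salt)].append(value)
    load = Counter()
    out = []
    for key, payload in large:
        salt = rng.randrange(NUM_SALTS)  # random salt on the large side
        load[partition_for(key, salt)] += 1
        out.extend((key, payload, v) for v in lookup[(key, salt)])
    return out, load


# Skewed input (made up for illustration): key 1 is hot, key 2 is rare.
large_rows = [(1, f"order-{i}") for i in range(1000)] + [(2, "order-x")]
small_rows = [(1, "shop-a"), (2, "shop-b")]

plain_out, plain_load = plain_join(large_rows, small_rows)
salted_out, salted_load = salted_join(large_rows, small_rows, random.Random(0))

print("rows per partition, hash on key:        ", dict(plain_load))
print("rows per partition, hash on (key, salt):", dict(salted_load))
```

With a single hot key, the plain join routes all 1000 hot rows to one partition, while the salted join spreads them across partitions and produces the same join result, at the cost of holding `NUM_SALTS` copies of the small table and picking a salt count up front. A broadcast exchange on the small side would remove the need to pick a salt count, which is the motivation for the proposal in this thread.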