+1 for a FLIP to clarify the idea. Please be careful about which type of state you choose here: the documentation [1] notes that broadcast state is not stored in the RocksDB state backend; it is kept in memory at runtime.
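To make the memory trade-off concrete, here is a framework-free Python sketch of what a broadcast join does at runtime. It uses no Flink APIs. The function `broadcast_join`, its parameters, and the sample `orders`/`shops` rows are hypothetical. Every parallel subtask keeps a full in-memory copy of the small side, much as broadcast state gives every task the same contents. Large-side records can then be spread round-robin, because any subtask can complete the join. The sketch assumes the small side is static. A streaming broadcast join would also have to handle updates and retractions on the small side, which this sketch ignores.

```python
from collections import defaultdict
from itertools import cycle


def broadcast_join(large_rows, small_rows, key, parallelism):
    """Simulate a broadcast inner join over `parallelism` hypothetical subtasks."""
    index = defaultdict(list)
    for row in small_rows:
        index[row[key]].append(row)
    # Every subtask gets its own full copy of the small side, so its memory
    # footprint is multiplied by the parallelism.
    replicas = [{k: list(v) for k, v in index.items()} for _ in range(parallelism)]
    outputs = [[] for _ in range(parallelism)]
    targets = cycle(range(parallelism))
    for row in large_rows:
        # Round-robin routing: the join key plays no part, so a hot key
        # cannot pile up on a single subtask.
        subtask = next(targets)
        for match in replicas[subtask].get(row[key], []):
            outputs[subtask].append({**row, **match})
    return outputs


orders = [{"shop_id": 1, "order_id": i} for i in range(8)]  # every row shares one key
shops = [{"shop_id": 1, "city": "Berlin"}, {"shop_id": 2, "city": "Paris"}]
parts = broadcast_join(orders, shops, key="shop_id", parallelism=4)
print([len(p) for p in parts])  # → [2, 2, 2, 2]
```

Because no record is routed by the join key, a single hot key no longer lands on one subtask. The cost is that the small side's memory use grows with the parallelism, which is why the in-memory nature of broadcast state matters for this design.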
Best,
Shengkai

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations

On Fri, Feb 2, 2024 at 11:57 PM David Anderson <dander...@apache.org> wrote:

> I've seen enough demand for a streaming broadcast join in the community to
> justify a FLIP -- I think it's a good idea, and look forward to the
> discussion.
>
> David
>
> On Fri, Feb 2, 2024 at 6:55 AM Feng Jin <jinfeng1...@gmail.com> wrote:
>
>> +1 for a FLIP on this topic.
>>
>> Best,
>> Feng
>>
>> On Fri, Feb 2, 2024 at 10:26 PM Martijn Visser <martijnvis...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> I would definitely expect a FLIP on this topic before moving to
>>> implementation.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Fri, Feb 2, 2024 at 12:47 PM Xuyang <xyzhong...@163.com> wrote:
>>>
>>>> Hi, Prabhjot.
>>>>
>>>> IIUC, the main reasons why the community previously considered supporting
>>>> join hints only in batch mode are as follows:
>>>> 1. In batch mode, multiple join algorithms were implemented quite
>>>> early on.
>>>> 2. Stream processing is a long-running scenario, and it is quite
>>>> difficult to determine whether a small table will grow into a large
>>>> table after running for a long time.
>>>>
>>>> However, as you mentioned, join hints do have their significance
>>>> in streaming. If you want to implement "join hints + broadcast join" in
>>>> streaming, the changes I can currently think of include:
>>>> 1. In the optimizer, changing the exchange on the small-table side to
>>>> broadcast instead of hash (InputProperty#BROADCAST).
>>>> 2. Changes at the table runtime level that are not yet clear.
>>>>
>>>> You can also discuss it with the community through JIRA, a FLIP, or the
>>>> dev mailing list.
>>>>
>>>>
>>>> --
>>>> Best!
>>>> Xuyang
>>>>
>>>>
>>>> At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" <user@flink.apache.org> wrote:
>>>>
>>>> Hi Feng,
>>>>
>>>> Thanks for your prompt response.
>>>> If we were to solve this in Flink, my higher-level view is:
>>>>
>>>> 1. First, implement a broadcast join in Flink streaming SQL that also
>>>> works in the Table API (e.g. via `left.join(right, <predicate>,
>>>> join_type="broadcast")`).
>>>> 2. Then, support a BROADCAST hint that uses this new join based on the
>>>> hint type.
>>>>
>>>> What do you think about this?
>>>> Do you have some pointers on how/where to start on the first part?
>>>> (I'm thinking we'd have to extend the Broadcast State Pattern for this
>>>> purpose.)
>>>>
>>>> Thanks,
>>>> Prabhjot
>>>>
>>>> On Thu, Feb 1, 2024 at 11:40 AM Feng Jin <jinfeng1...@gmail.com> wrote:
>>>>
>>>>> Hi Prabhjot,
>>>>>
>>>>> I think this is a reasonable scenario. If a regular join between a large
>>>>> table and a very small table is performed without broadcasting, it can
>>>>> easily cause data skew.
>>>>> We have encountered similar problems too. Currently, the only option is
>>>>> to make multiple copies of the small table using UNION ALL and append
>>>>> random values to alleviate the data skew.
>>>>>
>>>>> Best,
>>>>> Feng
>>>>>
>>>>> On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
>>>>> user@flink.apache.org> wrote:
>>>>>
>>>>>> Hello folks,
>>>>>>
>>>>>> We have a use case where we have a few stream-stream joins, requiring
>>>>>> us to join a very large table with a much smaller table, essentially
>>>>>> enriching the large table with a permutation on the smaller table
>>>>>> (consider deriving all orders/sessions for a new location).
>>>>>> Given the nature of the dataset, if we use a typical join that uses
>>>>>> hash distribution to co-locate the records for each join key, we end up
>>>>>> with a very skewed join (a few task slots getting all of the work
>>>>>> instead of an even distribution).
>>>>>>
>>>>>> We've internally implemented a salting-based solution where we salt
>>>>>> the smaller table and join it with the larger table. While this works
>>>>>> at the POC stage, we'd like to leverage Flink as much as possible for
>>>>>> such a join.
>>>>>>
>>>>>> By the nature of the problem, a broadcast join seems theoretically
>>>>>> helpful. We've explored the query hints supported in Flink,
>>>>>> starting with this FLIP
>>>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job>
>>>>>> and this FLIP
>>>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>.
>>>>>>
>>>>>> Currently, the optimizer doesn't consider the BROADCAST hint in the
>>>>>> `Exchange` step of the join when creating the physical plan (possibly
>>>>>> because the hint would require the stream-stream join to also support
>>>>>> a broadcast join in SQL).
>>>>>>
>>>>>> Notice that the query AST (Abstract Syntax Tree) has the broadcast
>>>>>> hint parsed from the query:
>>>>>>
>>>>>> ```sql
>>>>>> ...
>>>>>> ...
>>>>>> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])
>>>>>> ...
>>>>>> ```
>>>>>>
>>>>>> However, the Flink optimizer ignores the hint and still represents
>>>>>> the join as a regular `hash` join in the `Exchange` step:
>>>>>>
>>>>>> ```sql
>>>>>> ...
>>>>>> ...
>>>>>> :- Exchange(distribution=[hash[shop_id, join_key]])
>>>>>> ...
>>>>>> ```
>>>>>>
>>>>>> In Flink's `StreamExecExchange`, the translation happens only via the
>>>>>> `HASH` distribution type
>>>>>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java#L106-L127>,
>>>>>> unlike Flink's `BatchExecExchange`, where the translation can happen via
>>>>>> multiple options
>>>>>> <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java#L145-L194>
>>>>>> (`HASH`/`BROADCAST`).
>>>>>>
>>>>>> Quoting this Flink mailing list discussion
>>>>>> <https://lists.apache.org/thread/ovyltrhztw7locn301f0wqfvlykw6l9z>
>>>>>> for the FLIP that implemented the broadcast join hint for batch SQL:
>>>>>>
>>>>>> > But currently, only in batch the optimizer has different Join strategies for Join and
>>>>>> > there is no choice of join strategies in the stream. The join hints listed in the current
>>>>>> > flip should be ignored (maybe can be warned) in streaming mode. When in the
>>>>>> > future the stream mode has the choice of join strategies, I think that's a good time
>>>>>> > to discuss that the join hint can affect the streaming SQL.
>>>>>>
>>>>>> What do you folks think about the possibility of a broadcast join for
>>>>>> streaming SQL, along with a corresponding BROADCAST hint that lets the
>>>>>> user choose the kind of distribution they want for the dataset?
>>>>>>
>>>>>> Happy to learn more about this and hopefully implement it, if it
>>>>>> doesn't sound like a terrible idea.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Prabhjot
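The salting workaround described by Feng and Prabhjot can also be sketched in plain Python. This is an illustration under stated assumptions, not Flink code. `toy_partition` is a hypothetical stand-in for a hash partitioner, and Flink's real key-group hashing differs. `salted_join`, `buckets`, and the sample rows are also made up for the example. Each large-side row gets a random salt. Each small-side row is replicated once per salt value, which is what a UNION ALL of N copies achieves in SQL. The effective join key becomes (key, salt).

```python
import random
from collections import Counter, defaultdict


def toy_partition(key, salt, parallelism):
    """Hypothetical stand-in for a hash partitioner (not Flink's key-group hashing)."""
    return (key * 31 + salt) % parallelism


def subtask_load(routing_keys, parallelism):
    """Count records per subtask for a list of (key, salt) routing keys."""
    counts = Counter(toy_partition(k, s, parallelism) for k, s in routing_keys)
    return [counts.get(i, 0) for i in range(parallelism)]


def salted_join(large_rows, small_rows, key, buckets, seed=0):
    """Inner join on `key`, salting it so one hot key is spread over `buckets` salts."""
    rng = random.Random(seed)
    # Large side: attach one random salt value to every record.
    large = [(row, rng.randrange(buckets)) for row in large_rows]
    # Small side: one copy of every row per salt value, i.e. the
    # UNION ALL of `buckets` copies of the small table.
    small_index = defaultdict(list)
    for row in small_rows:
        for salt in range(buckets):
            small_index[(row[key], salt)].append(row)
    # Join on (key, salt) instead of the key alone.
    joined = [
        {**row, **match}
        for row, salt in large
        for match in small_index.get((row[key], salt), [])
    ]
    return joined, [(row[key], salt) for row, salt in large]


orders = [{"shop_id": 1, "order_id": i} for i in range(1000)]  # one hot key
shops = [{"shop_id": 1, "city": "Berlin"}]

# Without salting, every order is routed by shop_id alone (salt fixed at 0).
unsalted = subtask_load([(o["shop_id"], 0) for o in orders], parallelism=4)
print(unsalted)  # → [0, 0, 0, 1000]

joined, routing = salted_join(orders, shops, "shop_id", buckets=4)
print(len(joined))  # → 1000
print(subtask_load(routing, parallelism=4))  # roughly 250 records per subtask
```

The join result is unchanged, but the hot key now spreads across subtasks. The price is that the small side grows by a factor of `buckets`, and every update to a small-side row must be emitted once per salt. A broadcast join would instead replicate the small side once per subtask without touching the join key.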