Hi,

Just to clarify: you cannot dynamically switch the join strategy while a job is running. What Hequn suggested was a util method, Util.joinDynamically(ds1, ds2), that chooses the join strategy when the program is generated (before it is submitted for execution).

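To make the plan-time idea concrete, here is a minimal sketch in plain Java. Everything in it is hypothetical: JoinStrategyChooser, the Distribution enum, the size estimate and the broadcast limit are illustration only and are not Flink API. In a real job, the two BiFunction arguments would presumably build the broadcast pipeline (for example around a BroadcastProcessFunction) and the keyed pipeline (for example around a CoProcessFunction).

```java
import java.util.function.BiFunction;

// Hypothetical helper for illustration only; none of these names are Flink API.
// The decision is taken once, while the program is assembled, not while it runs.
final class JoinStrategyChooser {

    // The two data distribution strategies named in this thread.
    enum Distribution { BROADCAST_FORWARD, PARTITIONING }

    // Assumption: the caller has a rough size estimate for the right input
    // and a limit below which broadcasting it is considered acceptable.
    static Distribution choose(long rightSizeEstimate, long broadcastLimit) {
        return rightSizeEstimate <= broadcastLimit
                ? Distribution.BROADCAST_FORWARD
                : Distribution.PARTITIONING;
    }

    // S stands in for a stream type and R for the joined result type.
    // Each BiFunction builds one complete join pipeline; only one is invoked.
    static <S, R> R joinDynamically(
            S ds1, S ds2, long rightSizeEstimate, long broadcastLimit,
            BiFunction<S, S, R> broadcastJoin,
            BiFunction<S, S, R> partitionedJoin) {
        Distribution d = choose(rightSizeEstimate, broadcastLimit);
        return d == Distribution.BROADCAST_FORWARD
                ? broadcastJoin.apply(ds1, ds2)
                : partitionedJoin.apply(ds1, ds2);
    }

    public static void main(String[] args) {
        // Strings stand in for streams so the sketch runs without Flink.
        String plan = joinDynamically("orders", "rates", 1_000L, 10_000L,
                (l, r) -> "broadcast(" + r + ") joined with " + l,
                (l, r) -> "partition(" + l + ", " + r + ")");
        System.out.println(plan);
    }
}
```

Once the job is submitted, the chosen branch is fixed: the other pipeline is never built, which is why this counts as choosing at program generation time rather than switching at runtime.
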
The problem is that distributed joins are composed of a data distribution strategy (Broadcast-Forward, Partitioning) and a local execution strategy (Hybrid Hash, Symmetric Hash, Nested Loops, Sort Merge, ...). Switching the local strategy is sometimes possible, but changing the data distribution strategy is much more involved because you would need global coordination and would have to redistribute the data.

Best, Fabian

On Thu, Aug 15, 2019 at 09:31, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

> I see, I am gonna try this.
> Thanks Hequn
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Thu, Aug 15, 2019 at 4:01 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Felipe,
>>
>> If I understand correctly, you also have to decide whether to broadcast
>> the datastream from the right side before performing the function?
>>
>> One option is to add a Util method that joins dynamically, e.g.,
>> Util.joinDynamically(ds1, ds2). In the util method, you can implement your
>> own strategy logic and decide whether to broadcast or use CoProcessFunction.
>>
>> Best, Hequn
>>
>> On Wed, Aug 14, 2019 at 3:07 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi Hequn,
>>>
>>> I am implementing the broadcast and the regular join. As you said, I need
>>> different functions. My question is more about whether I can have an operator
>>> that decides between broadcast and regular join dynamically. I suppose I
>>> will have to extend the generic TwoInputStreamOperator in Flink. Do you
>>> have any suggestions?
>>>
>>> Thanks
>>>
>>> On Wed, 14 Aug 2019, 03:59 Hequn Cheng, <chenghe...@gmail.com> wrote:
>>>
>>>> Hi Felipe,
>>>>
>>>> > I want to implement a join operator which can use different
>>>> > strategies for joining tuples.
>>>> Not all kinds of join strategies can be applied to streaming jobs.
>>>> Take sort-merge join as an example: it is impossible to sort unbounded data.
>>>> However, you can perform a window join and use the sort-merge strategy to
>>>> join the data within a window. Even so, I'm not sure it is worth doing,
>>>> considering the performance.
>>>>
>>>> > Therefore, I am not sure if I will need to implement my own operator
>>>> > to do this or if it is still possible to do with CoProcessFunction.
>>>> You can't implement broadcast join with CoProcessFunction. But you can
>>>> implement it with BroadcastProcessFunction or
>>>> KeyedBroadcastProcessFunction; more details here [1].
>>>>
>>>> Furthermore, you can take a look at the implementation of both window
>>>> join and non-window join in Table API & SQL [2]. The code can be found
>>>> here [3].
>>>> Hope this helps.
>>>>
>>>> Best, Hequn
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#joins
>>>> [3] https://github.com/apache/flink/tree/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join
>>>>
>>>> On Tue, Aug 13, 2019 at 11:30 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I want to implement a join operator which can use different strategies
>>>>> for joining tuples. I saw that with CoProcessFunction I am able to
>>>>> implement low-level joins [1]. However, I do not know how to decide between
>>>>> different algorithms to join my tuples.
>>>>>
>>>>> On the other hand, to do a broadcast join I will need to use the
>>>>> broadcast operator [2], which yields a BroadcastStream. Therefore, I am not
>>>>> sure if I will need to implement my own operator to do this or if it is
>>>>> still possible to do with CoProcessFunction.
>>>>>
>>>>> Does anyone have some clues for this matter?
>>>>> Thanks
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html#low-level-joins
>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html