Hi Fabian, thanks for jumping into this thread. Do you think it is possible to extend a generic join operator in order to make it a little more dynamic? I was thinking that, after I process a checkpoint, I could change the join strategy.
And if so, do you have a toy example of this?

Thanks,
Felipe

--
Felipe Gutierrez
skype: felipe.o.gutierrez
https://felipeogutierrez.blogspot.com

On Thu, Aug 15, 2019 at 9:42 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> Just to clarify: you cannot dynamically switch the join strategy while a
> job is running.
> What Hequn suggested was to have a util method Util.joinDynamically(ds1,
> ds2) that chooses the join strategy when the program is generated (before
> it is submitted for execution).
>
> The problem is that distributed joins are composed of a data distribution
> strategy (Broadcast-Forward, Partitioning) and a local execution strategy
> (Hybrid Hash, Symmetric Hash, Nested Loops, Sort Merge, ...).
> Switching the local strategy is sometimes possible, but changing the data
> distribution strategy is much more involved because you'd need global
> coordination and would have to redistribute the data.
>
> Best,
> Fabian
>
> On Thu, 15 Aug 2019 at 09:31, Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> I see, I am gonna try this.
>> Thanks, Hequn
>>
>> On Thu, Aug 15, 2019 at 4:01 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi Felipe,
>>>
>>> If I understand correctly, you also have to decide whether to broadcast
>>> the datastream from the right side before performing the function?
>>>
>>> One option is to add a Util method to join dynamically, e.g.,
>>> Util.joinDynamically(ds1, ds2). In the util method, you can implement your
>>> own strategy logic and decide whether to broadcast or use CoProcessFunction.
>>>
>>> Best, Hequn
>>>
>>> On Wed, Aug 14, 2019 at 3:07 PM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> Hi Hequn,
>>>>
>>>> I am implementing the broadcast and the regular join.
>>>> As you said, I need different functions. My question is more about
>>>> whether I can have an operator that decides between broadcast and
>>>> regular join dynamically. I suppose I will have to extend the generic
>>>> TwoInputStreamOperator in Flink. Do you have any suggestions?
>>>>
>>>> Thanks
>>>>
>>>> On Wed, 14 Aug 2019, 03:59 Hequn Cheng, <chenghe...@gmail.com> wrote:
>>>>
>>>>> Hi Felipe,
>>>>>
>>>>> > I want to implement a join operator which can use different
>>>>> > strategies for joining tuples.
>>>>>
>>>>> Not all kinds of join strategies can be applied to streaming jobs.
>>>>> Take sort-merge join as an example: it's impossible to sort unbounded
>>>>> data. However, you can perform a window join and use the sort-merge
>>>>> strategy to join the data within a window. Even so, I'm not sure it's
>>>>> worth doing, considering the performance.
>>>>>
>>>>> > Therefore, I am not sure if I will need to implement my own operator
>>>>> > to do this or if it is still possible to do with CoProcessFunction.
>>>>>
>>>>> You can't implement a broadcast join with CoProcessFunction, but you can
>>>>> implement it with BroadcastProcessFunction or
>>>>> KeyedBroadcastProcessFunction; more details here [1].
>>>>>
>>>>> Furthermore, you can take a look at the implementation of both window
>>>>> join and non-window join in Table API & SQL [2]. The code can be found
>>>>> here [3].
>>>>> Hope this helps.
>>>>>
>>>>> Best, Hequn
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#joins
>>>>> [3] https://github.com/apache/flink/tree/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join
>>>>>
>>>>> On Tue, Aug 13, 2019 at 11:30 PM Felipe Gutierrez <
>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I want to implement a join operator which can use different
>>>>>> strategies for joining tuples. I saw that with CoProcessFunction I am
>>>>>> able to implement low-level joins [1]. However, I do not know how to
>>>>>> decide between different algorithms to join my tuples.
>>>>>>
>>>>>> On the other hand, to do a broadcast join I will need to use the
>>>>>> broadcast operator [2], which yields a BroadcastStream. Therefore, I am
>>>>>> not sure if I will need to implement my own operator to do this or if
>>>>>> it is still possible to do with CoProcessFunction.
>>>>>>
>>>>>> Does anyone have any clues about this?
>>>>>> Thanks
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html#low-level-joins
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
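A toy example of the approach Hequn and Fabian describe (choose the strategy once, when the program is generated, not while the job runs) could look like the plain-Python model below. It is a sketch under stated assumptions, not Flink code: `join_dynamically`, `choose_distribution`, `Distribution`, the `right_size_estimate` input and the `broadcast_threshold` value are hypothetical names made up for illustration. It keeps Fabian's two parts separate. The data distribution (broadcast-forward vs. hash partitioning on the join key) is picked up front from an assumed size estimate of the right side. Every parallel instance then runs the same local strategy: a symmetric hash join that buffers both inputs by key and probes the opposite buffer on each arrival, similar in spirit to a keyed CoProcessFunction join that keeps both sides in state.

```python
from collections import defaultdict
from enum import Enum


class Distribution(Enum):
    BROADCAST_FORWARD = "broadcast-forward"  # replicate right side, keep left side in place
    PARTITION_HASH = "partition-hash"        # hash-partition both sides on the join key


def choose_distribution(right_size_estimate, broadcast_threshold=10_000):
    """Hypothetical plan-time heuristic: broadcast the right side only if it looks small."""
    if right_size_estimate <= broadcast_threshold:
        return Distribution.BROADCAST_FORWARD
    return Distribution.PARTITION_HASH


def partition(records, parallelism, key):
    """Send each record to the instance that owns hash(key), roughly what keyBy does."""
    parts = [[] for _ in range(parallelism)]
    for rec in records:
        parts[hash(key(rec)) % parallelism].append(rec)
    return parts


def symmetric_hash_join(events, key):
    """Local strategy: buffer both inputs by key, probe the other buffer on each arrival."""
    left_state, right_state = defaultdict(list), defaultdict(list)
    out = []
    for side, rec in events:
        k = key(rec)
        if side == "L":
            left_state[k].append(rec)
            out.extend((rec, rrec) for rrec in right_state[k])
        else:
            right_state[k].append(rec)
            out.extend((lrec, rec) for lrec in left_state[k])
    return out


def join_dynamically(left, right, key, parallelism, right_size_estimate,
                     broadcast_threshold=10_000):
    """Pick the distribution once, then run the same local join on every instance."""
    distribution = choose_distribution(right_size_estimate, broadcast_threshold)
    results = []
    if distribution is Distribution.BROADCAST_FORWARD:
        # Round-robin slices stand in for wherever the left records already are (forward).
        left_parts = [left[i::parallelism] for i in range(parallelism)]
        for part in left_parts:
            # Every instance receives a full copy of the right side.
            events = [("R", rrec) for rrec in right] + [("L", lrec) for lrec in part]
            results.extend(symmetric_hash_join(events, key))
    else:
        left_parts = partition(left, parallelism, key)
        right_parts = partition(right, parallelism, key)
        for lpart, rpart in zip(left_parts, right_parts):
            events = [("L", lrec) for lrec in lpart] + [("R", rrec) for rrec in rpart]
            results.extend(symmetric_hash_join(events, key))
    return distribution, results


if __name__ == "__main__":
    orders = [(1, "book"), (2, "pen"), (1, "lamp")]
    users = [(1, "alice"), (2, "bob"), (3, "carol")]
    by_user = lambda rec: rec[0]
    for estimate in (3, 1_000_000):
        strategy, matches = join_dynamically(orders, users, by_user, 2, estimate)
        print(strategy.value, sorted(matches))
```

Both branches return the same matches; only the placement of records differs. In a Flink job, the broadcast branch would roughly correspond to connecting the left stream with a BroadcastStream and processing it with a BroadcastProcessFunction, and the partitioned branch to keyBy on both inputs plus a CoProcessFunction. Switching from one branch to the other after a checkpoint would mean moving the buffered per-key state to different instances, which is the global coordination and redistribution Fabian mentions.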