Yes, glad to see that there is already a PR for this optimization.

Best,
Kurt
On Mon, Aug 26, 2019 at 6:59 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> Xiaogang, those things worry me the most.
>
> 1. Regarding the broadcasting, doesn't the BroadcastState [1] cover our issues? Can we not construct a job graph where one operator has two outputs, one keyed and the other broadcast, which are wired together back into a KeyedBroadcastProcessFunction or BroadcastProcessFunction?
>
> 2. Multicast on keyed streams might be done by iterating over all of the keys. However, I have a feeling that this might not be the feature that distributed cross/theta joins would want, since they would probably need a guarantee of having only a single key per operator instance.
>
> Kurt, by broadcast optimisation do you mean [2]?
>
> I'm not sure if we should split the discussion yet. Most of the changes required by either multicast or broadcast will be in the API/state layers. Runtime changes for broadcast would be almost none (just exposing existing features), and for multicast they shouldn't be huge either. However, maybe we should consider those two things together at the API level, so that we do not make wrong decisions when looking only at the simpler/narrower broadcast support?
>
> Piotrek
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
> [2] https://github.com/apache/flink/pull/7713
>
>> On 26 Aug 2019, at 09:35, Kurt Young <ykt...@gmail.com> wrote:
>>
>> From SQL's perspective, distributed cross join is a valid feature but not a very urgent one. Actually, this discussion reminds me of another useful feature (sorry for the distraction):
>>
>> when doing a broadcast in batch shuffle mode, we can make each producer write only one copy of the output data, instead of one copy for every consumer. Broadcast join is much more useful, and this is a very important optimization. Not sure if we have already considered this.
>>
>> Best,
>> Kurt
>>
>>> On Mon, Aug 26, 2019 at 12:16 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>> Thanks Yun for bringing up this discussion, and many thanks for all the deep thoughts!
>>>
>>> For now, I think this discussion covers two scenarios: one is iteration library support and the other is SQL join support. I think both scenarios are useful, but they seem to have different best-suited solutions. To make the discussion clearer, I would suggest splitting it into two threads.
>>>
>>> I also agree with Piotr that it is very tricky for a keyed stream to receive a "broadcast element". So we may add some new interfaces which could broadcast or process some special "broadcast event". In that way, the "broadcast event" will not be sent along the normal processing path.
>>>
>>> Best,
>>> Guowei
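For reference, the BroadcastState wiring Piotr asks about in his point 1 can be sketched roughly as below, against the DataStream API he links in [1]. A minimal sketch: Event, Rule, and Result (and their accessor methods) are hypothetical placeholder types, not anything from Flink itself.

    // Broadcast one stream into broadcast state, connect it to a keyed
    // stream, and process both sides in a KeyedBroadcastProcessFunction.
    MapStateDescriptor<String, Rule> ruleDescriptor = new MapStateDescriptor<>(
            "rules", Types.STRING, TypeInformation.of(Rule.class));

    BroadcastStream<Rule> ruleBroadcast = ruleStream.broadcast(ruleDescriptor);

    DataStream<Result> result = eventStream
            .keyBy(Event::getKey)
            .connect(ruleBroadcast)
            .process(new KeyedBroadcastProcessFunction<String, Event, Rule, Result>() {
                @Override
                public void processElement(Event event, ReadOnlyContext ctx,
                        Collector<Result> out) throws Exception {
                    // The keyed side gets read-only access to the broadcast state.
                    Rule rule = ctx.getBroadcastState(ruleDescriptor).get(event.getRuleName());
                    if (rule != null) {
                        out.collect(rule.apply(event));
                    }
                }

                @Override
                public void processBroadcastElement(Rule rule, Context ctx,
                        Collector<Result> out) throws Exception {
                    // Every parallel instance receives the element and stores it.
                    ctx.getBroadcastState(ruleDescriptor).put(rule.getName(), rule);
                }
            });

This covers the pure broadcast case; it does not by itself give the selective multicast discussed further down in the thread.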
>>>> On Mon, Aug 26, 2019 at 9:27 AM SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>>>>
>>>> Hi all,
>>>>
>>>> I also think that multicasting is a necessity in Flink, but more details need to be considered.
>>>>
>>>> Currently, the network is tightly coupled with state in Flink to achieve automatic scaling. We can only access keyed state in keyed streams and operator state in all streams.
>>>> In the concrete example of theta-joins implemented with multicasting, the following questions exist:
>>>>
>>>> - In which type of state will the data be stored? Do we need another type of state that is coupled with multicast streams?
>>>> - How do we ensure consistency between the network and the state when jobs scale out or scale in?
>>>>
>>>> Regards,
>>>> Xiaogang
>>>>
>>>>> On Sun, Aug 25, 2019 at 10:03 AM Xingcan Cui <xingc...@gmail.com> wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> Sorry for joining this thread late. Basically, I think enabling the multicast pattern could be the right direction, but the detailed implementation policies need more discussion.
>>>>>
>>>>> Two years ago, I filed an issue [1] about the multicast API. However, for various reasons, it was laid aside. Later, when I tried to cherry-pick the change for experimental use, I found that the return type of the `selectChannels()` method had changed from `int[]` to `int`, which makes the old implementation not work anymore.
>>>>>
>>>>> From my side, multicast has always been needed for theta-joins. As far as I know, it's an essential requirement for some sophisticated join algorithms. Until now, Flink's non-equi joins can still only be executed single-threaded. If we'd like to improve on this, we should first take measures to support the multicast pattern.
>>>>>
>>>>> Best,
>>>>> Xingcan
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-6936
>>>>>
>>>>>> On Aug 24, 2019, at 5:54 AM, Zhu Zhu <reed...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Piotr,
>>>>>>
>>>>>> Thanks for the explanation.
>>>>>> Agreed that broadcastEmit(record) is the better choice for broadcasting in iterations.
>>>>>> As broadcasting for iterations is the primary motivation, let's support it first.
>>>>>>
>>>>>> Thanks,
>>>>>> Zhu Zhu
>>>>>>
>>>>>>> On Fri, Aug 23, 2019 at 11:56 PM Yun Gao <yungao...@aliyun.com.invalid> wrote:
>>>>>>>
>>>>>>> Hi Piotr,
>>>>>>>
>>>>>>> Many thanks for the suggestions!
>>>>>>>
>>>>>>> I totally agree that we could first focus on the broadcast scenarios and expose the broadcastEmit method first, considering the semantics and performance.
>>>>>>>
>>>>>>> For the keyed stream, I also agree that broadcasting keyed records to all tasks may be confusing, given the semantics of the keyed partitioner. However, in the iteration case, supporting broadcast over a keyed partitioner is required, since users may create any subgraph for the iteration body, including operators with keys. I think a possible solution is to introduce another data type for 'broadcastEmit': an operator Operator<T> may broadcast-emit records of another type E instead of T, and the transmitted E would bypass the partitioner and the keyed-context setting. This leads the design towards introducing a customized operator event (option 1 in the document). The cost of this method is that we need to introduce a new type of StreamElement and a new interface for it, but it should be suitable for both keyed and non-keyed partitioners.
>>>>>>>
>>>>>>> Best,
>>>>>>> Yun
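Yun's "separate data type E" idea could look roughly like the following at the operator level. This is purely a hypothetical sketch to make the proposal concrete; neither BroadcastOutput nor broadcastEmitEvent exists in Flink today.

    // Hypothetical interface: a side channel on the operator's Output that
    // delivers events of type E to every downstream task, bypassing both
    // the configured partitioner and the keyed-context handling that normal
    // records of type T go through.
    public interface BroadcastOutput<E> {

        // Deliver the event to all connected downstream channels. The event
        // travels like a Watermark: it is never keyed and never routed by
        // the channel selector.
        void broadcastEmitEvent(E event);
    }

Because E is distinct from the record type T, a keyed downstream operator could receive such events without the contradiction of a "keyed record sent to all key groups".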
>>>>>>>
>>>>>>> ------------------------------------------------------------------
>>>>>>> From: Piotr Nowojski <pi...@ververica.com>
>>>>>>> Send Time: 2019 Aug. 23 (Fri.) 22:29
>>>>>>> To: Zhu Zhu <reed...@gmail.com>
>>>>>>> Cc: dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>
>>>>>>> Subject: Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> If the primary motivation is broadcasting (for the iterations) and we have no immediate need for multicast (cross join), I would prefer to first expose broadcast via the DataStream API and only later, once we actually need it, support multicast. As I wrote, multicast would be more challenging to implement, with a more complicated runtime and API. And re-using multicast just to support broadcast doesn't make much sense:
>>>>>>>
>>>>>>> 1. It's a bit obfuscated. collectBroadcast(record) or broadcastEmit(record) is easier to understand than some multicast channel selector that just happens to return all of the channels.
>>>>>>> 2. There are performance benefits to explicitly calling `RecordWriter#broadcastEmit`.
>>>>>>>
>>>>>>> On a different note, what would be the semantics of such a broadcast emit on a KeyedStream? Would it be supported? Or would we limit support to non-keyed streams only?
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>>> On 23 Aug 2019, at 12:48, Zhu Zhu <reed...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Thanks Piotr,
>>>>>>>>
>>>>>>>> Users asked for this feature some time ago when they were migrating batch jobs to Flink (Blink).
>>>>>>>> It's not very urgent, as they have found workarounds (like partitioning the data set across different job vertices), so it's fine not to make it a top priority.
>>>>>>>>
>>>>>>>> Anyway, as a commonly known scenario, I think users can benefit from cross join sooner or later.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Zhu Zhu
>>>>>>>>
>>>>>>>> On Fri, Aug 23, 2019 at 6:19 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks for the answers :) OK, I understand the full picture now. +1 from my side on solving this issue somehow. But before we start discussing how to solve it, one last control question:
>>>>>>>>
>>>>>>>> I guess this multicast is intended to be used in the Blink planner, right? Assuming that we implement multicast support now, when would it be used by Blink? I would like to avoid a scenario where we implement an unused feature and keep maintaining it for a long period of time.
>>>>>>>>
>>>>>>>> Piotrek
>>>>>>>>
>>>>>>>> PS: try to include motivating examples, including concrete ones, in the proposals/design docs, for example in the very first paragraph. Especially if it's a commonly known feature like cross join :)
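To illustrate the performance point about `RecordWriter#broadcastEmit` above: the difference between the two paths can be sketched as follows. This is a simplified illustration, not Flink's actual RecordWriter internals; the helper methods (serialize, serializeAndWrite, writeBytes) are made up for the sketch.

    // Multicast-style: a selector returning all channels makes the writer
    // treat a broadcast as N independent emits, one per channel.
    void emitViaSelector(T record, int[] selectedChannels) throws IOException {
        for (int channel : selectedChannels) {
            serializeAndWrite(record, channel);   // per-channel work
        }
    }

    // Explicit broadcast: serialize the record once, then only copy the
    // resulting bytes to each channel.
    void broadcastEmit(T record) throws IOException {
        byte[] serialized = serialize(record);
        for (int channel = 0; channel < numberOfChannels; channel++) {
            writeBytes(serialized, channel);
        }
    }

The explicit method also keeps the single-channel hot path free of any array handling, which is the other half of the performance argument.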
>>>>>>>>> On 23 Aug 2019, at 11:38, Yun Gao <yungao...@aliyun.com.INVALID> wrote:
>>>>>>>>>
>>>>>>>>> Hi Piotr,
>>>>>>>>>
>>>>>>>>> Thanks a lot for sharing the thoughts!
>>>>>>>>>
>>>>>>>>> For the iteration, I agree that multicasting is not necessary. Exposing the broadcast interface on the Output of the operators in some way should also solve this issue, and I think it should be even more convenient to have the broadcast method for the iteration.
>>>>>>>>>
>>>>>>>>> Also thanks Zhu Zhu for the cross join case!
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Yun
>>>>>>>>>
>>>>>>>>> ------------------------------------------------------------------
>>>>>>>>> From: Zhu Zhu <reed...@gmail.com>
>>>>>>>>> Send Time: 2019 Aug. 23 (Fri.) 17:25
>>>>>>>>> To: dev <dev@flink.apache.org>
>>>>>>>>> Cc: Yun Gao <yungao...@aliyun.com>
>>>>>>>>> Subject: Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
>>>>>>>>>
>>>>>>>>> Hi Piotr,
>>>>>>>>>
>>>>>>>>> Yes, you are right, it's a distributed cross join requirement.
>>>>>>>>> Broadcast join can help with cross join cases, but users cannot use it if the data set to join is too large to fit into one subtask.
>>>>>>>>>
>>>>>>>>> Sorry for leaving some details out.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Zhu Zhu
>>>>>>>>>
>>>>>>>>> On Fri, Aug 23, 2019 at 4:57 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Yun and Zhu Zhu,
>>>>>>>>>
>>>>>>>>> Thanks for the more detailed example, Zhu Zhu.
>>>>>>>>>
>>>>>>>>> As far as I understand, for the iterations example we do not need multicasting. Regarding the join example, I don't fully understand it. The example that Zhu Zhu presented has the drawback of sending both tables to multiple nodes. What's the benefit of using a broadcast join over a hash join in such a case? As far as I know, the biggest benefit of using a broadcast join instead of a hash join is that we can avoid sending the larger table over the network, because we can perform the join locally. In this example we are sending both of the tables to multiple nodes, which should defeat the purpose.
>>>>>>>>>
>>>>>>>>> Is it about implementing cross joins or near-cross joins in a distributed fashion?
>>>>>>>>>
>>>>>>>>>> if we introduce a new MulticastRecordWriter
>>>>>>>>>
>>>>>>>>> That's one of the solutions. It might have the drawback of a three-class virtualisation problem (we already have RecordWriter and BroadcastRecordWriter). With up to two implementations, the JVM is able to devirtualise the calls.
>>>>>>>>>
>>>>>>>>> Previously I was also thinking about just providing two different ChannelSelector interfaces: one with `int[]`, and a `SingleChannelSelector` with a plain `int`. Based on that, RecordWriter could perform some magic (worst case scenario, `instanceof` checks).
>>>>>>>>>
>>>>>>>>> Another solution might be to change the `ChannelSelector` interface into an iterator.
>>>>>>>>>
>>>>>>>>> But let's discuss the details after we agree on implementing this.
>>>>>>>>>
>>>>>>>>> Piotrek
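The "two selector interfaces" idea Piotr sketches above could look like this. The interface names and shapes are illustrative only, not existing Flink classes.

    // Hot path: exactly one target channel per record. Keeping at most two
    // implementations per call site lets the JVM devirtualise and inline it.
    public interface SingleChannelSelector<T> {
        int selectChannel(T record, int numberOfChannels);
    }

    // Multicast-capable path, used only by writers that actually need it,
    // so the common single-channel case never pays for array allocation
    // and iteration.
    public interface MultiChannelSelector<T> {
        int[] selectChannels(T record, int numberOfChannels);
    }

A separate MulticastRecordWriter specialised on the second interface, as Yun suggests below, would then leave the single-channel path untouched.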
>>>>>>>>>> On 23 Aug 2019, at 10:20, Yun Gao <yungao...@aliyun.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Piotr,
>>>>>>>>>>
>>>>>>>>>> Thanks a lot for the suggestions!
>>>>>>>>>>
>>>>>>>>>> The core motivation of this discussion is to implement a new iteration library on the DataStream API, which requires inserting special records into the stream to track the progress of the iteration. The mechanism of such records is very similar to the current Watermark, and we face the problem of sending normal records according to the partitioner (Rebalance, etc.) while also being able to broadcast the inserted progress records to all connected tasks. I have read the notes in the Google doc, and I totally agree that exposing the broadcast interface of the RecordWriter in some way is able to solve this issue.
>>>>>>>>>>
>>>>>>>>>> Regarding `int[] ChannelSelector#selectChannels()`: I'm wondering, if we introduce a new MulticastRecordWriter and leave the current RecordWriter untouched, could we avoid the performance degradation? With such a modification, the normal RecordWriter would not need to iterate over the array returned by the ChannelSelector; the only differences would be returning an array instead of an integer, and accessing the first element of the returned array instead of reading the integer directly.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Yun
>>>>>>>>>>
>>>>>>>>>> ------------------------------------------------------------------
>>>>>>>>>> From: Piotr Nowojski <pi...@ververica.com>
>>>>>>>>>> Send Time: 2019 Aug. 23 (Fri.) 15:20
>>>>>>>>>> To: dev <dev@flink.apache.org>
>>>>>>>>>> Cc: Yun Gao <yungao...@aliyun.com>
>>>>>>>>>> Subject: Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Yun:
>>>>>>>>>>
>>>>>>>>>> Thanks for proposing the idea. I have checked the document and left a couple of questions there, but it might be better to answer them here.
>>>>>>>>>>
>>>>>>>>>> What is the exact motivation and what problems do you want to solve? We dropped multicast support from the network stack [1] for two reasons:
>>>>>>>>>>
>>>>>>>>>> 1. Performance
>>>>>>>>>> 2. Code simplicity
>>>>>>>>>>
>>>>>>>>>> The proposal to re-introduce `int[] ChannelSelector#selectChannels()` would revert those changes. At that time we were thinking about ways to keep multicast support at the network level while keeping the performance and simplicity for the non-multicast cases, and there are ways to achieve that. However, they would add extra complexity to Flink, which it would be better to avoid.
>>>>>>>>>>
>>>>>>>>>> On the other hand, supporting the dual pattern of standard partitioning or broadcasting is easy to do, as LatencyMarkers are doing exactly that. It would just be a matter of exposing this to the user in some way. So before we go any further, can you describe your use cases/motivation? Isn't a mix of standard partitioning and broadcasting enough? Do we need multicasting?
>>>>>>>>>>
>>>>>>>>>> Zhu:
>>>>>>>>>>
>>>>>>>>>> Could you rephrase your example? I didn't quite understand it.
>>>>>>>>>>
>>>>>>>>>> Piotrek
>>>>>>>>>>
>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-10662
>>>>>>>>>>
>>>>>>>>>> On 23 Aug 2019, at 09:17, Zhu Zhu <reed...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks Yun for starting this discussion.
>>>>>>>>>> I think multicasting can be very helpful in certain cases.
>>>>>>>>>>
>>>>>>>>>> I have received requirements from users who want to do a broadcast join where the data set to broadcast is too large to fit in one task. The requirement thus turned out to be support for the cartesian product of two data sets (one of which can be an infinite stream).
>>>>>>>>>> For example, A (parallelism=2) broadcast-joins B (parallelism=2) in JobVertex C.
>>>>>>>>>> The idea is to have 4 subtasks of C deal with the different combinations of A/B partitions: C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2).
>>>>>>>>>> This requires one record to be sent to multiple downstream subtasks, but not to all subtasks.
>>>>>>>>>>
>>>>>>>>>> With the current interface this is not supported, as one record can only be sent to one subtask, or to all subtasks of a JobVertex.
>>>>>>>>>> So the user had to split the broadcast data set manually into several different JobVertices, which is hard to maintain and extend.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Zhu Zhu
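Zhu Zhu's routing can be made concrete with a small sketch. Assuming the C subtask that handles partitions (i, j) is channel i * pB + j, a record from A's subtask i must reach pB channels, and a record from B's subtask j must reach pA channels. This is exactly the kind of `int[]`-returning selection that no longer exists in the current interface; the method names here are hypothetical.

    // For A(parallelism=pA) x B(parallelism=pB), JobVertex C has pA * pB
    // subtasks, and subtask C(i, j) is channel i * pB + j.
    // A record from A's subtask i goes to every C(i, j):
    int[] channelsForA(int aSubtaskIndex, int pB) {
        int[] channels = new int[pB];
        for (int j = 0; j < pB; j++) {
            channels[j] = aSubtaskIndex * pB + j;
        }
        return channels;
    }

    // A record from B's subtask j goes to every C(i, j):
    int[] channelsForB(int bSubtaskIndex, int pA, int pB) {
        int[] channels = new int[pA];
        for (int i = 0; i < pA; i++) {
            channels[i] = i * pB + bSubtaskIndex;
        }
        return channels;
    }

With pA = pB = 2 this reproduces the example exactly: A1 reaches {C1, C2}, A2 reaches {C3, C4}, B1 reaches {C1, C3}, and B2 reaches {C2, C4}.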
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 22, 2019 at 8:42 PM Yun Gao <yungao...@aliyun.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>
>>>>>>>>>> In some scenarios we have met the requirement that some operators want to send records to their downstream operators with a multicast communication pattern. In detail, for some records the operators want to send them according to the partitioner (for example, Rebalance), and for some other records the operators want to send them to all the connected operators and tasks. Such a communication pattern could be viewed as a kind of multicast: it does not broadcast every record, but some records will indeed be sent to multiple downstream operators.
>>>>>>>>>>
>>>>>>>>>> However, we found that this kind of communication pattern seemingly cannot be implemented correctly if the operators have multiple consumers with different parallelism, using a customized partitioner. To solve the above problem, we propose to enhance the support for such a kind of irregular communication pattern. We think there may be two options:
>>>>>>>>>>
>>>>>>>>>> 1. Support a kind of customized operator event, which shares much similarity with Watermark; these events could be broadcast to the downstream operators separately.
>>>>>>>>>> 2. Let the channel selector support multicast, and also add a separate RecordWriter implementation to avoid impacting the performance of channel selectors that do not need multicast.
>>>>>>>>>>
>>>>>>>>>> The problem and options are detailed in
>>>>>>>>>> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
>>>>>>>>>>
>>>>>>>>>> We are also wondering if there are other methods to implement this requirement, with or without changing the runtime. Many thanks for any feedback!
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Yun
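To make option 1 a bit more concrete: it would amount to adding a new member to the StreamElement family (alongside StreamRecord, Watermark, and LatencyMarker), plus a handler on the operator side. Everything below is a hypothetical sketch, not existing Flink API; in particular, the serializer and network stack would need to learn about the new element type.

    // A custom event that travels through the network like a Watermark:
    // always broadcast to all downstream channels, never keyed or routed
    // by the channel selector.
    public final class IterationProgressEvent extends StreamElement {

        private final long epoch;

        public IterationProgressEvent(long epoch) {
            this.epoch = epoch;
        }

        public long getEpoch() {
            return epoch;
        }
    }

    // Operators would then receive it through a callback analogous to
    // processWatermark, e.g.:
    //     void processIterationProgressEvent(IterationProgressEvent event);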