Hi all,

Sorry for joining this thread late. Basically, I think enabling the multicast pattern could be the right direction, but the detailed implementation policies need more discussion.
Two years ago, I filed an issue [1] about the multicast API, but for various reasons it was set aside. Later, when I tried to cherry-pick the change for experimental use, I found that the return type of the `selectChannels()` method had changed from `int[]` to `int`, which means the old implementation no longer works. On my side, multicast has always been used for theta-joins, and as far as I know it's an essential requirement for some sophisticated join algorithms. To this day, non-equi joins in Flink can only be executed single-threaded. If we'd like to improve on this, we should first take measures to support the multicast pattern.

Best,
Xingcan

[1] https://issues.apache.org/jira/browse/FLINK-6936

> On Aug 24, 2019, at 5:54 AM, Zhu Zhu <reed...@gmail.com> wrote:
>
> Hi Piotr,
>
> Thanks for the explanation.
> Agreed that broadcastEmit(record) is the better choice for broadcasting in the iterations.
> As broadcasting for the iterations is the first motivation, let's support it first.
>
> Thanks,
> Zhu Zhu
>
> Yun Gao <yungao...@aliyun.com.invalid> wrote on Friday, Aug 23, 2019 at 11:56 PM:
>
>> Hi Piotr,
>>
>> Many thanks for the suggestions!
>>
>> I totally agree that we could first focus on the broadcast scenarios and expose the broadcastEmit method first, considering the semantics and performance.
>>
>> For the keyed stream, I also agree that broadcasting keyed records to all the tasks may be confusing, considering the semantics of the keyed partitioner. However, in the iteration case, supporting broadcast over a keyed partitioner should be required, since users may create any subgraph for the iteration body, including operators with keys. I think a possible solution to this issue is to introduce another data type for 'broadcastEmit': for example, an operator Operator<T> may broadcast-emit another type E instead of T, and the transmitted E would bypass the partitioner and the setting of the keyed context.
>> This should lead to the design of introducing a customized operator event (option 1 in the document). The cost of this method is that we need to introduce a new type of StreamElement and a new interface for it, but it should be suitable for both keyed and non-keyed partitioners.
>>
>> Best,
>> Yun
>>
>> ------------------------------------------------------------------
>> From: Piotr Nowojski <pi...@ververica.com>
>> Send Time: 2019 Aug. 23 (Fri.) 22:29
>> To: Zhu Zhu <reed...@gmail.com>
>> Cc: dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>
>> Subject: Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
>>
>> Hi,
>>
>> If the primary motivation is broadcasting (for the iterations) and we have no immediate need for multicast (cross join), I would prefer to first expose broadcast via the DataStream API and only later, once we finally need it, support multicast. As I wrote, multicast would be more challenging to implement, with a more complicated runtime and API. And re-using multicast just to support broadcast doesn't make much sense:
>>
>> 1. It's a bit obfuscated. It's easier to understand collectBroadcast(record) or broadcastEmit(record) compared to some multicast channel selector that just happens to return all of the channels.
>> 2. There are performance benefits to explicitly calling `RecordWriter#broadcastEmit`.
>>
>> On a different note, what would be the semantics of such a broadcast emit on a KeyedStream? Would it be supported? Or would we limit support to non-keyed streams only?
>>
>> Piotrek
>>
>>> On 23 Aug 2019, at 12:48, Zhu Zhu <reed...@gmail.com> wrote:
>>>
>>> Thanks Piotr,
>>>
>>> Users asked for this feature some time ago when they were migrating batch jobs to Flink (Blink).
>>> It's not very urgent, as they have taken some workarounds to solve it (like partitioning the data set to different job vertices).
>>> So it's fine to not make it top priority.
>>>
>>> Anyway, as a commonly known scenario, I think users can benefit from cross join sooner or later.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> Piotr Nowojski <pi...@ververica.com> wrote on Friday, Aug 23, 2019 at 6:19 PM:
>>>
>>> Hi,
>>>
>>> Thanks for the answers :) OK, I understand the full picture now. +1 from my side on solving this issue somehow. But before we start discussing how to solve it, one last control question:
>>>
>>> I guess this multicast is intended to be used in the Blink planner, right? Assuming that we implement multicast support now, when would it be used by Blink? I would like to avoid a scenario where we implement an unused feature and keep maintaining it for a long period of time.
>>>
>>> Piotrek
>>>
>>> PS: try to include motivating examples, including concrete ones, in the proposals/design docs, for example in the very first paragraph. Especially if it's a commonly known feature like cross join :)
>>>
>>>> On 23 Aug 2019, at 11:38, Yun Gao <yungao...@aliyun.com.INVALID> wrote:
>>>>
>>>> Hi Piotr,
>>>>
>>>> Thanks a lot for sharing the thoughts!
>>>>
>>>> For the iteration, I agree that multicasting is not necessary. Exposing the broadcast interface to the Output of the operators in some way should also solve this issue, and I think it would be even more convenient to have the broadcast method for the iteration.
>>>>
>>>> Also thanks Zhu Zhu for the cross join case!
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>> ------------------------------------------------------------------
>>>> From: Zhu Zhu <reed...@gmail.com>
>>>> Send Time: 2019 Aug. 23 (Fri.) 17:25
>>>> To: dev <dev@flink.apache.org>
>>>> Cc: Yun Gao <yungao...@aliyun.com>
>>>> Subject: Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
>>>>
>>>> Hi Piotr,
>>>>
>>>> Yes, you are right, it's a distributed cross join requirement.
>>>> Broadcast join can help with cross join cases, but users cannot use it if the data set to join is too large to fit into one subtask.
>>>>
>>>> Sorry for leaving some details out.
>>>>
>>>> Thanks,
>>>> Zhu Zhu
>>>>
>>>> Piotr Nowojski <pi...@ververica.com> wrote on Friday, Aug 23, 2019 at 4:57 PM:
>>>>
>>>> Hi Yun and Zhu Zhu,
>>>>
>>>> Thanks for the more detailed example, Zhu Zhu.
>>>>
>>>> As far as I understand, for the iterations example we do not need multicasting. Regarding the join example, I don't fully understand it. The example that Zhu Zhu presented has the drawback of sending both tables to multiple nodes. What's the benefit of using a broadcast join over a hash join in such a case? As far as I know, the biggest benefit of a broadcast join over a hash join is that we can avoid sending the larger table over the network, because we can perform the join locally. In this example we are sending both tables to multiple nodes, which should defeat the purpose.
>>>>
>>>> Is it about implementing cross joins or near-cross joins in a distributed fashion?
>>>>
>>>>> if we introduce a new MulticastRecordWriter
>>>>
>>>> That's one of the solutions. It might suffer from a three-class devirtualisation problem (we already have RecordWriter and BroadcastRecordWriter): with up to two implementations, the JVM is able to devirtualise the calls.
>>>>
>>>> Previously I was also thinking about just providing two different ChannelSelector interfaces: one with `int[]`, and a `SingleChannelSelector` with a plain `int`; based on that, RecordWriter could perform some magic (worst-case scenario, `instanceof` checks).
>>>>
>>>> Another solution might be to change the `ChannelSelector` interface into an iterator.
>>>>
>>>> But let's discuss the details after we agree on implementing this.
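To make the two-interface idea above concrete, here is a minimal sketch. The type and method names are hypothetical illustrations, not the actual Flink `ChannelSelector` API; the point is that the common single-channel path keeps a plain `int` return, while multicast selectors return `int[]`, and broadcast falls out as the degenerate multicast case that selects every channel.

```java
import java.util.Arrays;

// Hypothetical sketch of the "two selector interfaces" idea discussed above.
// These are illustrative types, not the actual Flink ChannelSelector API.
public class SelectorSketch {

    // Common case: each record goes to exactly one channel, so the hot
    // path can stay monomorphic and return a plain int.
    interface SingleChannelSelector<T> {
        int selectChannel(T record, int numberOfChannels);
    }

    // Multicast case: a record may be routed to several channels at once.
    interface MulticastChannelSelector<T> {
        int[] selectChannels(T record, int numberOfChannels);
    }

    // Broadcast is just the degenerate multicast that selects every channel.
    static class BroadcastSelector<T> implements MulticastChannelSelector<T> {
        @Override
        public int[] selectChannels(T record, int numberOfChannels) {
            int[] all = new int[numberOfChannels];
            for (int i = 0; i < numberOfChannels; i++) {
                all[i] = i;
            }
            return all;
        }
    }

    public static void main(String[] args) {
        MulticastChannelSelector<String> selector = new BroadcastSelector<>();
        System.out.println(Arrays.toString(selector.selectChannels("record", 4))); // [0, 1, 2, 3]
    }
}
```

Splitting the interfaces this way avoids the megamorphic call site Piotr mentions: a RecordWriter wired to a `SingleChannelSelector` never pays for the array allocation and iteration.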
>>>>
>>>> Piotrek
>>>>
>>>>> On 23 Aug 2019, at 10:20, Yun Gao <yungao...@aliyun.com> wrote:
>>>>>
>>>>> Hi Piotr,
>>>>>
>>>>> Thanks a lot for the suggestions!
>>>>>
>>>>> The core motivation of this discussion is to implement a new iteration library on the DataStream, and it requires inserting special records into the stream to notify the progress of the iteration. The mechanism of such records is very similar to the current Watermark, and we face the problem of sending normal records according to the partitioner (Rebalance, etc.) while also being able to broadcast the inserted progress records to all the connected tasks. I have read the notes in the Google doc, and I totally agree that exposing the broadcast interface in RecordWriter in some way can solve this issue.
>>>>>
>>>>> Regarding `int[] ChannelSelector#selectChannels()`, I'm wondering whether we could avoid the performance degradation by introducing a new MulticastRecordWriter and leaving the current RecordWriter untouched. With such a modification the normal RecordWriter would not need to iterate over the array returned by the ChannelSelector; the only differences would be returning an array instead of an integer, and accessing the first element of the returned array instead of reading the integer directly.
>>>>>
>>>>> Best,
>>>>> Yun
>>>>>
>>>>> ------------------------------------------------------------------
>>>>> From: Piotr Nowojski <pi...@ververica.com>
>>>>> Send Time: 2019 Aug. 23 (Fri.) 15:20
>>>>> To: dev <dev@flink.apache.org>
>>>>> Cc: Yun Gao <yungao...@aliyun.com>
>>>>> Subject: Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
>>>>>
>>>>> Hi,
>>>>>
>>>>> Yun:
>>>>>
>>>>> Thanks for proposing the idea. I have checked the document and left a couple of questions there, but it might be better to answer them here.
>>>>>
>>>>> What is the exact motivation, and what problems do you want to solve? We dropped multicast support from the network stack [1] for two reasons:
>>>>> 1. Performance
>>>>> 2. Code simplicity
>>>>>
>>>>> The proposal to re-introduce `int[] ChannelSelector#selectChannels()` would revert those changes. At that time we were thinking about ways to keep multicast support at the network level while keeping the performance and simplicity for the non-multicast cases, and there are ways to achieve that. However, they would add extra complexity to Flink, which would be better to avoid.
>>>>>
>>>>> On the other hand, supporting the dual pattern of standard partitioning or broadcasting is easy to do, as LatencyMarkers do exactly that. It would just be a matter of exposing this to the user in some way. So before we go any further, can you describe your use cases/motivation? Isn't a mix of standard partitioning and broadcasting enough? Do we need multicasting?
>>>>>
>>>>> Zhu:
>>>>>
>>>>> Could you rephrase your example? I didn't quite understand it.
>>>>>
>>>>> Piotrek
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-10662
>>>>>
>>>>> On 23 Aug 2019, at 09:17, Zhu Zhu <reed...@gmail.com> wrote:
>>>>>
>>>>> Thanks Yun for starting this discussion.
>>>>> I think multicasting can be very helpful in certain cases.
>>>>>
>>>>> I have received requirements from users who want to do a broadcast join while the data set to broadcast is too large to fit in one task.
>>>>> Thus the requirement turned out to be supporting the cartesian product of 2 data sets (one of which can be an infinite stream).
>>>>> For example, A (parallelism=2) does a broadcast join with B (parallelism=2) in JobVertex C.
>>>>> The idea is to have 4 C subtasks deal with different combinations of the A/B partitions, like C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2).
>>>>> This requires one record to be sent to multiple downstream subtasks, but not to all subtasks.
>>>>>
>>>>> With the current interface this is not supported, as one record can only be sent to one subtask, or to all subtasks of a JobVertex.
>>>>> And the users had to split the broadcast data set manually into several different JobVertices, which is hard to maintain and extend.
>>>>>
>>>>> Thanks,
>>>>> Zhu Zhu
>>>>>
>>>>> Yun Gao <yungao...@aliyun.com.invalid> wrote on Thursday, Aug 22, 2019 at 8:42 PM:
>>>>>
>>>>> Hi everyone,
>>>>>
>>>>> In some scenarios we have met a requirement that some operators want to send records to their downstream operators with a multicast communication pattern. In detail, for some records the operators want to send them according to the partitioner (for example, Rebalance), and for some other records the operators want to send them to all the connected operators and tasks. Such a communication pattern can be viewed as a kind of multicast: it does not broadcast every record, but some records will indeed be sent to multiple downstream operators.
>>>>>
>>>>> However, we found that this kind of communication pattern apparently cannot be implemented correctly if the operators have multiple consumers with different parallelism, using a customized partitioner. To solve the above problem, we propose to enhance the support for this kind of irregular communication pattern. We think there may be two options:
>>>>>
>>>>> 1. Support a kind of customized operator event, which shares much similarity with Watermark; these events can be broadcast to the downstream operators separately.
>>>>> 2. Let the channel selector support multicast, and also add a separate RecordWriter implementation to avoid impacting the performance of channel selectors that do not need multicast.
>>>>>
>>>>> The problem and options are detailed in
>>>>> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
>>>>>
>>>>> We are also wondering if there are other methods to implement this requirement, with or without changing the runtime. Many thanks for any feedback!
>>>>>
>>>>> Best,
>>>>> Yun