Hi Piotr,

The case is about a broadcast join:

A--\
    +--(join)--> C
B--/

Usually we can broadcast A (the result that JobVertex A produces) to all
subtasks of C. But in this case A is too large to fit in one subtask of C,
so we have to partition A into (A_0, A_1, A_2, ..., A_m-1).
The throughput of B is also too high for one subtask to handle, so we
partition B into (B_0, B_1, B_2, ..., B_n-1).

Now if we want to join A and B, the basic idea is to set the parallelism of
C to m*n, and subtask C_kn+l should handle the join of (A_k, B_l).
To achieve this,
each record in partition A_k should be sent to *n* downstream subtasks:
{C_kn, C_kn+1, C_kn+2, ..., C_kn+n-1}
each record in partition B_l should be sent to *m* downstream subtasks:
{C_l, C_n+l, C_2n+l, ..., C_(m-1)n+l}

This is different from the current single-cast or broadcast patterns.
That's why I think multicast can help with this case.

Thanks,
Zhu Zhu

Piotr Nowojski <pi...@ververica.com> wrote on Fri, Aug 23, 2019 at 3:20 PM:

> Hi,
>
> Yun:
>
> Thanks for proposing the idea. I have checked the document and left a couple
> of questions there, but it might be better to answer them here.
>
> What is the exact motivation and what problems do you want to solve? We
> have dropped multicast support from the network stack [1] for two reasons:
> 1. Performance
> 2. Code simplicity
>
> The proposal to reintroduce `int[] ChannelSelector#selectChannels()`
> would revert those changes. At that time we were thinking about a way
> to keep the multicast support on the network level, while keeping the
> performance and simplicity for non-multicast cases, and there are ways to
> achieve that. However, they would add extra complexity to Flink, which it
> would be better to avoid.
>
> On the other hand, supporting a dual pattern (standard partitioning or
> broadcasting) is easy to do, as LatencyMarkers are doing exactly that. It
> would be just a matter of exposing this to the user in some way. So before
> we go any further, can you describe your use cases/motivation?
> Isn't a mix of standard partitioning and broadcasting enough? Do we need
> multicasting?
>
> Zhu:
>
> Could you rephrase your example? I didn't quite understand it.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-10662
>
> > On 23 Aug 2019, at 09:17, Zhu Zhu <reed...@gmail.com> wrote:
> >
> > Thanks Yun for starting this discussion.
> > I think multicasting can be very helpful in certain cases.
> >
> > I have received requirements from users who want to do a broadcast
> > join where the data set to broadcast is too large to fit in one task.
> > The requirement thus turned out to be supporting the Cartesian product
> > of 2 data sets (one of which can be an infinite stream).
> > For example, A (parallelism=2) broadcast join B (parallelism=2) in
> > JobVertex C.
> > The idea is to have 4 C subtasks to deal with different combinations of
> > A/B partitions, like C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2).
> > This requires one record to be sent to multiple downstream subtasks, but
> > not to all subtasks.
> >
> > With the current interface this is not supported, as one record can only
> > be sent to one subtask, or to all subtasks of a JobVertex.
> > And the user had to split the broadcast data set manually into several
> > different JobVertices, which is hard to maintain and extend.
> >
> > Thanks,
> > Zhu Zhu
> >
> > Yun Gao <yungao...@aliyun.com.invalid> wrote on Thu, Aug 22, 2019 at 8:42 PM:
> >
> >> Hi everyone,
> >> In some scenarios we met a requirement that some operators want to
> >> send records to their downstream operators with a multicast
> >> communication pattern. In detail, for some records, the operators want
> >> to send them according to the partitioner (for example, Rebalance), and
> >> for some other records, the operators want to send them to all the
> >> connected operators and tasks.
> >> Such a communication pattern could be viewed as a kind of multicast:
> >> it does not broadcast every record, but some records will indeed be
> >> sent to multiple downstream operators.
> >>
> >> However, we found that this kind of communication pattern seemingly
> >> cannot be implemented correctly with a customized partitioner if the
> >> operators have multiple consumers with different parallelism. To solve
> >> the above problem, we propose to enhance the support for this kind of
> >> irregular communication pattern. We think there may be two options:
> >>
> >> 1. Support a kind of customized operator event, which shares much
> >> similarity with Watermark, and these events can be broadcast to the
> >> downstream operators separately.
> >> 2. Let the channel selector support multicast, and also add a
> >> separate RecordWriter implementation to avoid impacting the
> >> performance of the channel selector that does not need multicast.
> >>
> >> The problem and options are detailed in
> >> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
> >>
> >> We are also wondering if there are other methods to implement this
> >> requirement with or without changing Runtime. Many thanks for any
> >> feedback!
> >>
> >> Best,
> >> Yun
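
P.S. To make the routing rule at the top of this mail concrete (subtask
C_kn+l joins A_k with B_l, i.e. channel index k*n + l), here is a rough
sketch in plain Java. It is only an illustration under my own assumptions:
the class MulticastJoinChannels and its methods are made-up names, not Flink
APIs, and it only computes target channel indices without touching the
network stack.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration only; not part of Flink. */
final class MulticastJoinChannels {

    /** Channels of C that receive every record of A_k: {C_kn, ..., C_kn+n-1}. */
    static int[] targetsForA(int k, int m, int n) {
        checkIndex(k, m);
        int[] channels = new int[n];
        for (int l = 0; l < n; l++) {
            channels[l] = k * n + l;
        }
        return channels;
    }

    /** Channels of C that receive every record of B_l: {C_l, C_n+l, ..., C_(m-1)n+l}. */
    static int[] targetsForB(int l, int m, int n) {
        checkIndex(l, n);
        int[] channels = new int[m];
        for (int k = 0; k < m; k++) {
            channels[k] = k * n + l;
        }
        return channels;
    }

    // Rejects partition indices outside [0, bound).
    private static void checkIndex(int index, int bound) {
        if (index < 0 || index >= bound) {
            throw new IllegalArgumentException(
                    "partition index " + index + " out of range [0, " + bound + ")");
        }
    }

    public static void main(String[] args) {
        int m = 2; // number of A partitions (assumed for the example)
        int n = 3; // number of B partitions (assumed for the example)
        System.out.println(Arrays.toString(targetsForA(1, m, n))); // [3, 4, 5]
        System.out.println(Arrays.toString(targetsForB(1, m, n))); // [1, 4]
    }
}
```

With m=2 and n=3, C has 6 subtasks; A_1 goes to 3 of them and B_1 to 2 of
them, so neither the single-channel nor the broadcast pattern fits.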