Hi Piotr,

The case is about a broadcast join:
A--\
     +--(join)--> C
B--/

Usually we can broadcast A (the result that JobVertex A produces) to all
subtasks of C.
But in this case A is too large to fit in one subtask of C,
so we have to partition A into (A_0, A_1, A_2, ..., A_m-1).
The throughput of B is also too high for one subtask to handle, so we
partition B into (B_0, B_1, B_2, ..., B_n-1).

Now if we want to join A and B, the basic idea is to set the parallelism of C
to m*n and let subtask C_kn+l handle the join of (A_k, B_l).
To achieve this:
each record in partition A_k should be sent to *n* downstream subtasks:
{C_kn, C_kn+1, C_kn+2, ..., C_kn+n-1}
each record in partition B_l should be sent to *m* downstream
subtasks: {C_l, C_n+l, C_2n+l, ..., C_(m-1)n+l}

This differs from the current options, where a record goes either to a
single subtask (single-cast) or to all subtasks (broadcast).
That's why I think multicast can help in this case.
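
To make the index mapping concrete, here is a minimal sketch in Java. The
class and method names (CartesianMulticast, channelsForA, channelsForB) are
hypothetical and are not part of Flink's API; the code only computes which
subtask indices of C a record from A_k or B_l would have to reach, assuming
0-based indices and a parallelism of m*n for C.

```java
import java.util.Arrays;

// Hypothetical helper, NOT a Flink API: computes the target subtask indices
// of C for the m x n layout, where C_(k*n + l) joins (A_k, B_l).
final class CartesianMulticast {

    // A record from partition A_k goes to n subtasks: C_(k*n) .. C_(k*n + n - 1).
    static int[] channelsForA(int k, int m, int n) {
        check(k, m, m, n);
        int[] targets = new int[n];
        for (int l = 0; l < n; l++) {
            targets[l] = k * n + l;
        }
        return targets;
    }

    // A record from partition B_l goes to m subtasks: C_l, C_(n + l), .., C_((m-1)*n + l).
    static int[] channelsForB(int l, int m, int n) {
        check(l, n, m, n);
        int[] targets = new int[m];
        for (int k = 0; k < m; k++) {
            targets[k] = k * n + l;
        }
        return targets;
    }

    // Rejects non-positive partition counts and out-of-range partition indices.
    private static void check(int index, int bound, int m, int n) {
        if (m <= 0 || n <= 0 || index < 0 || index >= bound) {
            throw new IllegalArgumentException(
                "index=" + index + ", m=" + m + ", n=" + n);
        }
    }

    public static void main(String[] args) {
        int m = 2, n = 3; // assumed example sizes: 2 partitions of A, 3 of B
        System.out.println(Arrays.toString(channelsForA(1, m, n)));
        System.out.println(Arrays.toString(channelsForB(2, m, n)));
    }
}
```

With m = n = 2 this gives A_0 -> {0, 1}, A_1 -> {2, 3}, B_0 -> {0, 2} and
B_1 -> {1, 3}, which is the 0-based form of the C1..C4 example in my earlier
mail. Neither target set is a single subtask or all subtasks.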

Thanks,
Zhu Zhu

Piotr Nowojski <pi...@ververica.com> wrote on Fri, Aug 23, 2019 at 3:20 PM:

> Hi,
>
> Yun:
>
> Thanks for proposing the idea. I have checked the document and left a couple
> of questions there, but it might be better to answer them here.
>
> What is the exact motivation and what problems do you want to solve? We
> have dropped multicast support from the network stack [1] for two reasons:
> 1. Performance
> 2. Code simplicity
>
> The proposal to reintroduce `int[] ChannelSelector#selectChannels()`
> would revert those changes. At that time we were thinking about how
> to keep multicast support at the network level while preserving the
> performance and simplicity of the non-multicast cases, and there are ways to
> achieve that. However, they would add extra complexity to Flink, which
> would be better to avoid.
>
> On the other hand, supporting a dual pattern (standard partitioning or
> broadcasting) is easy to do, as LatencyMarkers are doing exactly that. It
> would be just a matter of exposing this to the user in some way. So before
> we go any further, can you describe your use cases/motivation? Isn’t a mix of
> standard partitioning and broadcasting enough? Do we need multicasting?
>
> Zhu:
>
> Could you rephrase your example? I didn’t quite understand it.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-10662
>
> > On 23 Aug 2019, at 09:17, Zhu Zhu <reed...@gmail.com> wrote:
> >
> > Thanks Yun for starting this discussion.
> > I think multicasting can be very helpful in certain cases.
> >
> > I have received requirements from users who want to do a broadcast
> > join where the data set to broadcast is too large to fit in one task.
> > The requirement thus turned out to be supporting the cartesian product of 2
> > data sets (one of which can be an infinite stream).
> > For example, A(parallelism=2) broadcast join B(parallelism=2) in JobVertex C.
> > The idea is to have 4 C subtasks to deal with different combinations of A/B
> > partitions, like C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2).
> > This requires one record to be sent to multiple downstream subtasks, but
> > not to all subtasks.
> >
> > With the current interface this is not supported, as one record can only be
> > sent to one subtask, or to all subtasks of a JobVertex.
> > So the user had to manually split the broadcast data set across several
> > different JobVertices, which is hard to maintain and extend.
> >
> > Thanks,
> > Zhu Zhu
> >
> > Yun Gao <yungao...@aliyun.com.invalid> wrote on Thu, Aug 22, 2019 at 8:42 PM:
> >
> >> Hi everyone,
> >>      In some scenarios we have a requirement that some operators want to
> >> send records to their downstream operators with a multicast communication
> >> pattern. In detail, for some records, the operators want to send them
> >> according to the partitioner (for example, Rebalance), and for some other
> >> records, the operators want to send them to all the connected operators and
> >> tasks. Such a communication pattern could be viewed as a kind of multicast:
> >> it does not broadcast every record, but some records will indeed be sent to
> >> multiple downstream operators.
> >>
> >> However, we found that this kind of communication pattern cannot be
> >> implemented correctly with a customized partitioner if the operators have
> >> multiple consumers with different parallelism. To solve the above
> >> problem, we propose to enhance the support for this kind of irregular
> >> communication pattern. We think there may be two options:
> >>
> >>     1. Support a kind of customized operator event, which shares much
> >> similarity with Watermark, and these events can be broadcast to the
> >> downstream operators separately.
> >>     2. Let the channel selector support multicast, and also add a
> >> separate RecordWriter implementation to avoid impacting the performance of
> >> channel selectors that do not need multicast.
> >>
> >> The problem and options are detailed in
> >> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
> >>
> >> We are also wondering if there are other methods to implement this
> >> requirement with or without changing Runtime. Many thanks for any feedback!
> >>
> >>
> >> Best,
> >> Yun
> >>
> >>
>
>
