Thanks Yun for starting this discussion.
I think multicasting can be very helpful in certain cases.

I have received requests from users who want to do a broadcast join where
the data set to broadcast is too large to fit in one task.
The requirement thus turned out to be support for the Cartesian product of
2 data sets (one of which can be an infinite stream).
For example, A (parallelism=2) broadcast joins B (parallelism=2) in
JobVertex C.
The idea is to have 4 C subtasks, each handling a different combination of
A/B partitions: C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2).
This requires one record to be sent to multiple downstream subtasks, but
not to all of them.
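The routing described above can be sketched in Java. This is a minimal illustration assuming 0-based subtask indices and a hypothetical layout where C subtask `i * parallelismB + j` handles the pair (A_i, B_j); `CartesianRouting`, `targetsForA` and `targetsForB` are made-up names for this example and are not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: routes records of two inputs A and B to the subtasks of C
 * so that every (A partition, B partition) pair meets in exactly one C subtask.
 * Indices are 0-based; C subtask (i * parallelismB + j) handles (A_i, B_j).
 */
public class CartesianRouting {

    /** C subtasks that must receive a record from A's partition aIndex. */
    static List<Integer> targetsForA(int aIndex, int parallelismB) {
        List<Integer> targets = new ArrayList<>();
        for (int j = 0; j < parallelismB; j++) {
            targets.add(aIndex * parallelismB + j);
        }
        return targets;
    }

    /** C subtasks that must receive a record from B's partition bIndex. */
    static List<Integer> targetsForB(int bIndex, int parallelismA, int parallelismB) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < parallelismA; i++) {
            targets.add(i * parallelismB + bIndex);
        }
        return targets;
    }

    public static void main(String[] args) {
        int pA = 2, pB = 2; // C then needs pA * pB = 4 subtasks
        for (int i = 0; i < pA; i++) {
            System.out.println("A" + (i + 1) + " -> C subtasks " + targetsForA(i, pB));
        }
        for (int j = 0; j < pB; j++) {
            System.out.println("B" + (j + 1) + " -> C subtasks " + targetsForB(j, pA, pB));
        }
    }
}
```

Running `main` prints `A1 -> C subtasks [0, 1]`, `A2 -> C subtasks [2, 3]`, `B1 -> C subtasks [0, 2]` and `B2 -> C subtasks [1, 3]`, i.e. C1..C4 from the example. Each record goes to 2 of the 4 subtasks, which is neither a single channel nor a full broadcast.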

With the current interface this is not supported, as one record can only be
sent to one subtask or to all subtasks of a JobVertex.
As a workaround, the user had to split the broadcast data set manually across
several different JobVertices, which is hard to maintain and extend.

Thanks,
Zhu Zhu

Yun Gao <yungao...@aliyun.com.invalid> wrote on Thu, Aug 22, 2019 at 8:42 PM:

> Hi everyone,
>       In some scenarios we have met a requirement that some operators want to
> send records to their downstream operators with a multicast communication
> pattern. In detail, the operators want to send some records according to
> the partitioner (for example, Rebalance), and other records to all the
> connected operators and tasks. Such a communication pattern could be viewed
> as a kind of multicast: it does not broadcast every record, but some records
> will indeed be sent to multiple downstream operators.
>
> However, we found that this kind of communication pattern seems impossible
> to implement correctly with a customized partitioner if the operators have
> multiple consumers with different parallelism. To solve this problem, we
> propose to enhance the support for this kind of irregular communication
> pattern. We think there are two possible options:
>
>      1. Support a kind of customized operator event, which shares much
> similarity with Watermark; these events can be broadcast to the
> downstream operators separately.
>      2. Let the channel selector support multicast, and also add a
> separate RecordWriter implementation to avoid impacting the performance of
> channel selectors that do not need multicast.
>
> The problem and options are detailed in
> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
>
> We are also wondering if there are other methods to implement this
> requirement with or without changing the Runtime. Many thanks for any
> feedback!
>
>
> Best,
> Yun
>
>
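The second option in the quoted proposal, applied to the mixed rebalance/broadcast pattern it describes, could look roughly like the following Java sketch. `MulticastChannelSelector`, `RebalanceOrBroadcastSelector`, `Event` and `emit` are hypothetical names for illustration only; they are not Flink's actual `ChannelSelector` or `RecordWriter` API. The selector returns an array of channel indices instead of a single one, and the writer loop writes the record once per selected channel.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical record type: a payload plus a flag saying whether it must reach every channel. */
final class Event {
    final String payload;
    final boolean broadcast;

    Event(String payload, boolean broadcast) {
        this.payload = payload;
        this.broadcast = broadcast;
    }
}

/** Hypothetical multicast variant of a channel selector: may return several channel indices. */
interface MulticastChannelSelector<T> {
    void setup(int numberOfChannels);

    int[] selectChannels(T record);
}

/** Rebalances ordinary records round-robin and sends flagged records to all channels. */
final class RebalanceOrBroadcastSelector implements MulticastChannelSelector<Event> {
    private int numberOfChannels;
    private int nextChannel = -1;
    private int[] allChannels;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
        this.allChannels = new int[numberOfChannels];
        for (int i = 0; i < numberOfChannels; i++) {
            allChannels[i] = i;
        }
    }

    @Override
    public int[] selectChannels(Event record) {
        if (record.broadcast) {
            return allChannels; // shared array; callers must not modify it
        }
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return new int[] {nextChannel};
    }
}

public class MulticastSketch {
    /** Stand-in for a multicast-aware RecordWriter: writes the record once per selected channel. */
    static void emit(Event record, MulticastChannelSelector<Event> selector, List<List<String>> channels) {
        for (int channel : selector.selectChannels(record)) {
            channels.get(channel).add(record.payload);
        }
    }

    public static void main(String[] args) {
        int parallelism = 3;
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            channels.add(new ArrayList<>());
        }
        MulticastChannelSelector<Event> selector = new RebalanceOrBroadcastSelector();
        selector.setup(parallelism);

        emit(new Event("r1", false), selector, channels);
        emit(new Event("r2", false), selector, channels);
        emit(new Event("ctrl", true), selector, channels);
        emit(new Event("r3", false), selector, channels);

        for (int i = 0; i < parallelism; i++) {
            System.out.println("channel " + i + ": " + channels.get(i));
        }
    }
}
```

With three channels, `main` leaves `[r1, ctrl]`, `[r2, ctrl]` and `[ctrl, r3]` in channels 0, 1 and 2. This sketch allocates a one-element array per rebalanced record, which is the kind of overhead that keeping multicast in a separate RecordWriter would spare selectors that never need it.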
