Hi Xiaogang,
Many thanks for also considering the iteration case! :) These points are
really important for iteration. Overall, we are implementing a new iteration
library on top of the Stream API. As a library, most of its implementation does
not need to touch the Runtime layer, but it does have some new requirements on
the API, such as being able to broadcast the progress events. To be more
specific, these events carry the sender's index, and the downstream operators
need to align the events received from all upstream operators. This works very
much like watermarks, so these events do not need to be contained in
checkpoints.
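To make the alignment we have in mind a bit more concrete, below is a minimal,
purely illustrative sketch (the class and method names are invented for this
example; nothing here is an existing Flink API) of how a downstream operator
could align the progress events coming from all of its input channels, much
like watermark alignment:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Illustration only: tracks which upstream channels have reported a given
    // progress epoch and fires once all of them have, similar to how
    // watermarks are aligned across input channels.
    public class ProgressEventAligner {

        private final int numberOfInputChannels;
        private final Map<Long, Set<Integer>> seenSendersPerEpoch = new HashMap<>();

        public ProgressEventAligner(int numberOfInputChannels) {
            this.numberOfInputChannels = numberOfInputChannels;
        }

        /** Called when a progress event (carrying the sender's index) arrives. */
        public void onProgressEvent(long epoch, int senderIndex) {
            Set<Integer> seen =
                seenSendersPerEpoch.computeIfAbsent(epoch, e -> new HashSet<>());
            seen.add(senderIndex);
            if (seen.size() == numberOfInputChannels) {
                seenSendersPerEpoch.remove(epoch);
                onEpochAligned(epoch);
            }
        }

        /** Hook invoked once every upstream channel has reported the epoch. */
        protected void onEpochAligned(long epoch) {
            // e.g. re-broadcast the progress event to the downstream operators
        }
    }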
Some other points are also under implementation. However, since parts of the
design are still under internal discussion, we may not be able to start a new
discussion thread on iteration immediately. Besides, to have a complete design
we also need to settle the problems that may impose new requirements on the
Runtime, such as broadcasting events. Therefore, I think we could still settle
the broadcasting problem first in this thread? Based on what we have learned in
the discussion, I now think we might be able to decouple the event-broadcasting
requirement from a more generalized multicasting mechanism. :)
Best,
Yun
------------------------------------------------------------------
From:SHI Xiaogang <[email protected]>
Send Time:2019 Aug. 27 (Tue.) 09:16
To:dev <[email protected]>; Yun Gao <[email protected]>
Cc:Piotr Nowojski <[email protected]>
Subject:Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
Hi, Yun Gao
The discussion seems to be moving in a different direction, changing from
supporting multicasting to implementing new iteration libraries on data streams.
Regarding the broadcast events in iterations, many details of the new iteration
libraries are still unclear:
1. How is the iteration progress determined and notified? Are the iterations
synchronous or asynchronous? As far as I know, progress tracking for
asynchronous iterations is very difficult.
2. Are async I/O operators allowed in the iterations? If so, how are the
broadcast events checkpointed and restored? How are broadcast events
distributed when the degree of parallelism changes?
3. Do the emitted broadcast events carry the sender's index? Will they be
aligned in a similar way to checkpoint barriers in downstream operators?
4. In the case of synchronous iterations, do we need something similar to
barrier buffers to guarantee the correctness of iterations?
5. Will checkpointing be enabled in iterations? If checkpointing is enabled,
how will checkpoint barriers interact with broadcast events?
I think a detailed design document for iterations will help us understand
these problems, hence improving the discussion.
I also suggest a new thread for the discussion on iterations.
This thread should focus on multicasting and discuss those problems related to
multicasting, including how data is delivered and states are partitioned.
Regards,
Xiaogang
Yun Gao <[email protected]> 于2019年8月26日周一 下午11:35写道:
Hi,
Many thanks for all the points raised!
@Piotr Regarding using another edge to broadcast the event, I think it may not
be able to address the iteration case. The primary problem is that with two
edges we cannot ensure the order of records. However, in the iteration case the
broadcasted event is used to mark the progress of the iteration and works like
a watermark, so its position relative to the normal records cannot change.
And @Piotr, @Xiaogang, regarding the requirements on state, I think they vary
across the different options. The first option is to allow Operator<T> to
broadcast a separate event and to have a separate process method for this
event. In detail, we may add a new type of StreamElement called Event and allow
Operator<T> to broadcastEmit an Event. On the receiver side, we could then add
a new `processEvent` method to the (Keyed)ProcessFunction. Similar to the
broadcast side of KeyedBroadcastProcessFunction, in this new method users
cannot access keyed state for a specific key, but they can register a function
that is applied to all elements in the keyed state. This option requires
modifying the runtime to support the new type of StreamElement, but it does not
affect the semantics of state and thus places no requirements on state.
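Just to illustrate what the user-facing side of this first option might look
like, here is a sketch (all names below are placeholders invented for this
example; none of them exist in Flink today):

    import org.apache.flink.util.Collector;

    // Sketch only: Event and processEvent are placeholder names for option 1.
    public interface EventAwareProcessFunction<T, OUT> {

        /** Normal records keep flowing through the existing element path. */
        void processElement(T value, Collector<OUT> out) throws Exception;

        /**
         * New hook for broadcasted events. In the keyed case no single key is
         * in scope here; instead the user could register a function that is
         * applied to all keys, similar to applyToKeyedState on the broadcast
         * side of KeyedBroadcastProcessFunction.
         */
        void processEvent(Event event, Collector<OUT> out) throws Exception;

        /** Placeholder for the proposed new StreamElement subtype. */
        class Event implements java.io.Serializable {
            public final int senderIndex;  // subtask index of the emitter
            public final long progress;    // e.g. the iteration round marked

            public Event(int senderIndex, long progress) {
                this.senderIndex = senderIndex;
                this.progress = progress;
            }
        }
    }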
The second option is to allow Operator<T> to broadcastEmit T, and on the
receiver side users process the broadcast element with the existing process
method. This option is consistent with OperatorState, but for keyed state we
may send a record to tasks that do not contain the corresponding keyed state,
so it would require some changes to the State.
The third option is to support generic multicast. For keyed state it also meets
the problem of inconsistency between the network partitioner and the
keyed-state partitioner, and if we want to rely on it to implement non-keyed
joins, it also meets the problem of not being able to control the partitioning
of operator state. Therefore, it would also require some changes to the State.
Then, among the proposed scenarios, the iteration case in fact requires exactly
the ability to broadcast a different event type. In the iteration, the fields
of the progress event differ from those of normal records: it does not contain
an actual value but instead carries fields that let the downstream operators
align the events and track the progress. Therefore, broadcasting a different
event type can solve the iteration case without any requirements on state.
Besides, allowing an operator to broadcast a separate event may also facilitate
other use cases; for example, users may notify downstream operators to change
their logic when some pattern is matched. Such a notification may differ from
the normal records, and users would not need to manually unify them under a
wrapper type if operators could broadcast a separate event. However, it indeed
cannot address the non-keyed join scenarios.
Since allowing the broadcast of a separate event seems able to serve as
standalone functionality, and it does not require changes to state, I am
wondering whether we could split the work into multiple steps and support
broadcasting events first? In parallel, we could continue working on the other
options to support more scenarios such as non-keyed joins, which seem to
require more thought.
Best,
Yun
------------------------------------------------------------------
From:Piotr Nowojski <[email protected]>
Send Time:2019 Aug. 26 (Mon.) 18:59
To:dev <[email protected]>
Cc:Yun Gao <[email protected]>
Subject:Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
Hi,
Xiaogang, those things worry me the most.
1. Regarding the broadcasting, doesn't the BroadcastState [1] cover our
issues? Can't we construct a job graph where one operator has two outputs, one
keyed and one broadcasted, which are wired back together into a
KeyedBroadcastProcessFunction or BroadcastProcessFunction (sketched below)?
2. Multicast on keyed streams might be done by iterating over all of the keys.
However, I have a feeling that might not be the feature which distributed
cross/theta joins would want, since they would probably need a guarantee of
having only a single key per operator instance.
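For reference, the wiring from point 1 looks roughly like this with today's
DataStream API (the types and job shape here are made up purely for
illustration):

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class BroadcastStateWiringExample {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> records = env.fromElements("a", "b", "c");
            DataStream<String> control = env.fromElements("switch-logic");

            // Descriptor of the broadcast state held by every parallel instance.
            MapStateDescriptor<String, String> controlStateDescriptor =
                new MapStateDescriptor<>(
                    "control",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO);

            BroadcastStream<String> broadcastControl =
                control.broadcast(controlStateDescriptor);

            records
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                })
                .connect(broadcastControl)
                .process(new KeyedBroadcastProcessFunction<String, String, String, String>() {
                    @Override
                    public void processElement(
                            String value, ReadOnlyContext ctx, Collector<String> out) {
                        // regular, keyed records
                    }

                    @Override
                    public void processBroadcastElement(
                            String value, Context ctx, Collector<String> out) {
                        // broadcasted records, seen by every parallel instance
                    }
                });

            env.execute("broadcast-state-wiring");
        }
    }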
Kurt, by broadcast optimisation do you mean [2]?
I’m not sure if we should split the discussion yet. Most of the changes
required by either multicast or broadcast will be in the API/state layers.
Runtime changes for broadcast would be almost none (just exposing existing
features) and for multicast they shouldn't be huge either. However maybe we
should consider those two things together at the API level, so that we do not
make wrong decisions when just looking at the simpler/more narrow broadcast
support?
Piotrek
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
[2] https://github.com/apache/flink/pull/7713
On 26 Aug 2019, at 09:35, Kurt Young <[email protected]> wrote:
From SQL's perspective, distributed cross join is a valid feature but not a
very urgent one. Actually this discussion reminds me of another useful feature
(sorry for the distraction): when doing broadcast in batch shuffle mode, we can
make each producer write only one copy of the output data instead of one per
consumer. Broadcast join is much more useful, and this is a very important
optimization. Not sure if we have already considered this.
Best,
Kurt
On Mon, Aug 26, 2019 at 12:16 PM Guowei Ma <[email protected]> wrote:
Thanks Yun for bringing up this discussion, and many thanks for all the deep
thoughts!
For now, I think this discussion contains two scenarios: one is for iteration
library support and the other is for SQL join support. I think both scenarios
are useful, but they seem to have different best-suited solutions. To make the
discussion clearer, I would suggest splitting it into two threads.
And I agree with Piotr that it is very tricky for a keyed stream to receive a
"broadcast element". So we may add some new interfaces which could broadcast or
process a special "broadcast event". In that way the "broadcast event" would
not be sent through the normal record path.
Best,
Guowei
SHI Xiaogang <[email protected]> 于2019年8月26日周一 上午9:27写道:
Hi all,
I also think that multicasting is a necessity in Flink, but more details need
to be considered.
Currently the network is tightly coupled with state in Flink to achieve
automatic scaling. We can only access keyed states in keyed streams and
operator states in all streams.
In the concrete example of theta-joins implemented with multicasting, the
following questions exist:
- In which type of state will the data be stored? Do we need another type of
state which is coupled with multicast streams?
- How do we ensure consistency between the network and states when jobs scale
out or scale in?
Regards,
Xiaogang
Xingcan Cui <[email protected]> 于2019年8月25日周日 上午10:03写道:
Hi all,
Sorry for joining this thread late. Basically, I think enabling the multicast
pattern could be the right direction, but more detailed implementation policies
need to be discussed.
Two years ago, I filed an issue [1] about the multicast API. However, for
various reasons, it was laid aside. Later, when I tried to cherry-pick the
change for experimental use, I found the return type of the `selectChannels()`
method had changed from `int[]` to `int`, which means the old implementation no
longer works.
From my side, multicast has always been used for theta-joins. As far as I know,
it's an essential requirement for some sophisticated join algorithms. Until
now, Flink's non-equi joins can still only be executed single-threaded. If we'd
like to make improvements here, we should first take some measures to support
the multicast pattern.
Best,
Xingcan
[1] https://issues.apache.org/jira/browse/FLINK-6936
On Aug 24, 2019, at 5:54 AM, Zhu Zhu <[email protected]> wrote:
Hi Piotr,
Thanks for the explanation.
Agreed that broadcastEmit(record) is a better choice for broadcasting for the
iterations.
As broadcasting for the iterations is the first motivation, let's support it
first.
Thanks,
Zhu Zhu
Yun Gao <[email protected]> 于2019年8月23日周五 下午11:56写道:
Hi Piotr,
Many thanks for the suggestions!
I totally agree that we could focus on the broadcast scenarios and expose the
broadcastEmit method first, considering the semantics and performance.
For the keyed stream, I also agree that broadcasting keyed records to all the
tasks may be confusing given the semantics of the keyed partitioner. However,
in the iteration case, supporting broadcast over a keyed partitioner should be
required, since users may create any subgraph for the iteration body, including
operators with keys. I think a possible solution to this issue is to introduce
another data type for 'broadcastEmit'. For example, an operator Operator<T> may
broadcast-emit another type E instead of T, and the transmitted E would bypass
the partitioner and the setting of the keyed context. This leads to the design
of introducing a customized operator event (option 1 in the document). The cost
of this approach is that we need to introduce a new type of StreamElement and a
new interface for this type, but it should be suitable for both keyed and
non-keyed partitioners.
Best,
Yun
------------------------------------------------------------------
From:Piotr Nowojski <[email protected]>
Send Time:2019 Aug. 23 (Fri.) 22:29
To:Zhu Zhu <[email protected]>
Cc:dev <[email protected]>; Yun Gao <[email protected]>
Subject:Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
Hi,
If the primary motivation is broadcasting (for the iterations) and we have no
immediate need for multicast (cross join), I would prefer to first expose
broadcast via the DataStream API and only later, once we finally need it,
support multicast. As I wrote, multicast would be more challenging to
implement, with a more complicated runtime and API. And re-using multicast just
to support broadcast doesn't make much sense:
1. It's a bit obfuscated. It's easier to understand collectBroadcast(record) or
broadcastEmit(record) compared to some multicast channel selector that just
happens to return all of the channels.
2. There are performance benefits to explicitly calling
`RecordWriter#broadcastEmit`.
On a different note, what would be the semantics of such a broadcast emit on a
KeyedStream? Would it be supported? Or would we limit support to non-keyed
streams only?
Piotrek
On 23 Aug 2019, at 12:48, Zhu Zhu <[email protected]> wrote:
Thanks Piotr,
Users asked for this feature some time ago when they were migrating batch jobs
to Flink (Blink).
It's not very urgent, as they have found workarounds (like partitioning the
data set across different job vertices), so it's fine not to make it a top
priority.
Anyway, as a commonly known scenario, I think users can benefit from cross join
sooner or later.
Thanks,
Zhu Zhu
Piotr Nowojski <[email protected] <mailto:[email protected]>>
于2019年8月23日周五 下午6:19写道:
Hi,
Thanks for the answers :) OK, I understand the full picture now. +1 from my
side on solving this issue somehow. But before we start discussing how to solve
it, one last control question:
I guess this multicast is intended to be used in the Blink planner, right?
Assuming that we implement the multicast support now, when would it be used by
Blink? I would like to avoid a scenario where we implement an unused feature
and keep maintaining it for a long period of time.
Piotrek
PS: try to include motivating examples, including concrete ones, in the
proposals/design docs, for example in the very first paragraph, especially if
it's a commonly known feature like cross join :)
On 23 Aug 2019, at 11:38, Yun Gao <[email protected]> wrote:
Hi Piotr,
Thanks a lot for sharing the thoughts!
For the iteration, I agree that multicasting is not necessary. Exposing the
broadcast interface to the Output of the operators in some way should also
solve this issue, and I think having the broadcast method would be even more
convenient for the iteration.
Also thanks Zhu Zhu for the cross join case!
Best,
Yun
------------------------------------------------------------------
From:Zhu Zhu <[email protected] <mailto:[email protected]>>
Send Time:2019 Aug. 23 (Fri.) 17:25
To:dev <[email protected] <mailto:[email protected]>>
Cc:Yun Gao <[email protected] <mailto:[email protected]>>
Subject:Re: [DISCUSS] Enhance Support for Multicast Communication
Pattern
Hi Piotr,
Yes, you are right, it's a distributed cross join requirement.
Broadcast join can help with cross join cases, but users cannot use it if the
data set to join is too large to fit into one subtask.
Sorry for leaving some details out.
Thanks,
Zhu Zhu
Piotr Nowojski <[email protected] <mailto:[email protected]>>
于2019年8月23日周五 下午4:57写道:
Hi Yun and Zhu Zhu,
Thanks for the more detailed example Zhu Zhu.
As far as I understand, for the iterations example we do not need
multicasting. Regarding the join example, I don't fully understand it. The
example that Zhu Zhu presented has the drawback of sending both tables to
multiple nodes. What's the benefit of using a broadcast join over a hash join
in such a case? As far as I know, the biggest benefit of using a broadcast join
instead of a hash join is that we can avoid sending the larger table over the
network, because we can perform the join locally. In this example we are
sending both of the tables to multiple nodes, which should defeat the purpose.
Is it about implementing cross joins or near-cross joins in a distributed
fashion?
if we introduce a new MulticastRecordWriter
That's one of the solutions. It might have the drawback of a three-class
virtualisation problem (we have RecordWriter and BroadcastRecordWriter
already). With up to two implementations, the JVM is able to devirtualise the
calls.
Previously I was also thinking about just providing two different
ChannelSelector interfaces: one with `int[]` and a `SingleChannelSelector` with
a plain `int`, and based on that RecordWriter could perform some magic (worst
case scenario, `instanceof` checks).
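To make that alternative a bit more tangible, a rough sketch of the
two-interface idea could look like the following (the names are invented just
for illustration; today's ChannelSelector only returns a single int):

    // Illustration only: the single-channel (common) case and the multicast
    // case get separate interfaces, so the hot path stays array-free.
    interface SingleChannelSelector<T> {
        /** Route the record to exactly one output channel. */
        int selectChannel(T record, int numberOfChannels);
    }

    interface MultiChannelSelector<T> {
        /** Route the record to an arbitrary subset of output channels. */
        int[] selectChannels(T record, int numberOfChannels);
    }

    // A RecordWriter could then branch once per record, e.g.:
    //   if (selector instanceof SingleChannelSelector) { /* fast path */ }
    //   else { /* iterate over selectChannels(...) */ }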
Another solution might be to change the `ChannelSelector` interface into an
iterator.
But let's discuss the details after we agree on implementing this.
Piotrek
On 23 Aug 2019, at 10:20, Yun Gao <[email protected]> wrote:
Hi Piotr,
Thanks a lot for the suggestions!
The core motivation of this discussion is to implement a new iteration library
on the DataStream API, and it requires inserting special records into the
stream to notify the progress of the iteration. The mechanism of such records
is very similar to the current Watermark, and we face the problem of sending
normal records according to the partitioner (Rebalance, etc.) while also being
able to broadcast the inserted progress records to all the connected tasks. I
have read the notes in the Google doc and I totally agree that exposing the
broadcast interface in RecordWriter in some way can solve this issue.
Regarding `int[] ChannelSelector#selectChannels()`, I'm wondering whether, if
we introduced a new MulticastRecordWriter and left the current RecordWriter
untouched, we could avoid the performance degradation? With such a
modification, the normal RecordWriter would not need to iterate over the array
returned by the ChannelSelector, and the only difference would be returning an
array instead of an integer, and accessing the first element of the returned
array instead of reading the integer directly.
Best,
Yun
------------------------------------------------------------------
From:Piotr Nowojski <[email protected]>
Send Time:2019 Aug. 23 (Fri.) 15:20
To:dev <[email protected]>
Cc:Yun Gao <[email protected]>
Subject:Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
Hi,
Yun:
Thanks for proposing the idea. I have checked the document and left a couple
of questions there, but it might be better to answer them here.
What is the exact motivation and what problems do you want to solve?
We have dropped multicast support from the network stack [1] for two
reasons:
1. Performance
2. Code simplicity
The proposal to re-introduce `int[] ChannelSelector#selectChannels()` would
revert those changes. At that time we were thinking about ways to keep the
multicast support at the network level while keeping the performance and
simplicity for non-multicast cases, and there are ways to achieve that.
However, they would add extra complexity to Flink, which would be better to
avoid.
On the other hand, supporting the dual pattern of standard partitioning or
broadcasting is easy to do, as LatencyMarkers do exactly that. It would just be
a matter of exposing this to the user in some way. So before we go any further,
can you describe your use cases/motivation? Isn't a mix of standard
partitioning and broadcasting enough? Do we need multicasting?
Zhu:
Could you rephrase your example? I didn’t quite understand it.
Piotrek
[1] https://issues.apache.org/jira/browse/FLINK-10662
On 23 Aug 2019, at 09:17, Zhu Zhu <[email protected]> wrote:
Thanks Yun for starting this discussion.
I think multicasting can be very helpful in certain cases.
I have received requirements from users who want to do a broadcast join, while
the data set to broadcast is too large to fit in one task. Thus the requirement
turned out to be supporting the cartesian product of 2 data sets (one of which
can be an infinite stream).
For example, A (parallelism=2) broadcast-joins B (parallelism=2) in JobVertex C.
The idea is to have 4 C subtasks deal with the different combinations of A/B
partitions, i.e. C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2).
This requires one record to be sent to multiple downstream subtasks, but not to
all subtasks.
With the current interface this is not supported, as one record can only be
sent to one subtask, or to all subtasks of a JobVertex.
So the user had to split the broadcast data set manually across several
different JobVertices, which is hard to maintain and extend.
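To make the required fan-out concrete, the channel mapping for this example
could be computed as below (this is only an illustration of the needed
multicast, not an existing Flink interface; C's subtask index is assumed to be
a * parallelismB + b):

    import java.util.Arrays;

    final class CrossJoinChannels {

        /** Channels for a record from A's subtask 'a': (a, 0) ... (a, pB-1). */
        static int[] channelsForA(int a, int parallelismB) {
            int[] channels = new int[parallelismB];
            for (int b = 0; b < parallelismB; b++) {
                channels[b] = a * parallelismB + b;
            }
            return channels;
        }

        /** Channels for a record from B's subtask 'b': (0, b) ... (pA-1, b). */
        static int[] channelsForB(int b, int parallelismA, int parallelismB) {
            int[] channels = new int[parallelismA];
            for (int a = 0; a < parallelismA; a++) {
                channels[a] = a * parallelismB + b;
            }
            return channels;
        }

        public static void main(String[] args) {
            // A1 (index 0) goes to C subtasks {0, 1}, i.e. C1(A1,B1) and C2(A1,B2).
            System.out.println(Arrays.toString(channelsForA(0, 2)));
            // B2 (index 1) goes to C subtasks {1, 3}, i.e. C2(A1,B2) and C4(A2,B2).
            System.out.println(Arrays.toString(channelsForB(1, 2, 2)));
        }
    }

Each record thus goes to several, but not all, downstream subtasks, which is
exactly what the current one-channel-or-broadcast interface cannot express.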
Thanks,
Zhu Zhu
Yun Gao <[email protected] <mailto:
[email protected] <mailto:[email protected]
于2019年8月22日周四 下午8:42写道:
Hi everyone,
In some scenarios we have met the requirement that some operators want to send
records to their downstream operators with a multicast communication pattern.
In detail, for some records the operators want to send them according to the
partitioner (for example, Rebalance), while for some other records the
operators want to send them to all the connected operators and tasks. Such a
communication pattern could be viewed as a kind of multicast: it does not
broadcast every record, but some records will indeed be sent to multiple
downstream operators.
However, we found that this kind of communication pattern seemingly cannot be
implemented correctly with a customized partitioner if the operators have
multiple consumers with different parallelism. To solve the above problem, we
propose to enhance the support for this kind of irregular communication
pattern. We think there may be two options:
1. Support a kind of customized operator event, which shares much similarity
with Watermark, and these events can be broadcasted to the downstream operators
separately.
2. Let the channel selector support multicast, and also add a separate
RecordWriter implementation to avoid impacting the performance of the channel
selectors that do not need multicast.
The problem and options are detailed in
https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
We are also wondering if there are other ways to implement this requirement,
with or without changing the Runtime. Many thanks for any feedback!
Best,
Yun