Hi Piotr,

Thanks for your replies and professional suggestions!

My initial thought is just what you described in your first suggestion. The 
current RecordWriter emits a StreamRecord to one subpartition via the 
ChannelSelector, or broadcasts events/watermarks directly to all subpartitions.
If the ChannelSelector implementation is BroadcastPartitioner, then we can 
create a specialized BroadcastRecordWriter to handle 'emit', 
'broadcastEmit', 'broadcastEvent', etc.
To keep this from being tricky, I want to turn RecordWriter into an abstract, 
pluggable base class, then implement BroadcastRecordWriter and 
NonBroadcastRecordWriter separately as subclasses. That means we split the 
RecordWriter implementations by ChannelSelector, and we may also remove the 
current StreamRecordWriter to unify the RecordWriter for both stream and batch 
mode.
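A minimal sketch of that split, under heavy simplification: subpartitions are plain lists, and `ChannelSelector`, its `isBroadcast()` flag, `RoundRobinSelector`, `RecordWriters.create` and the writer classes are hypothetical stand-ins that only borrow Flink's names, not its real signatures. The idea shown is that the writer type is chosen once from the selector instead of being checked per record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that borrow Flink's names; not Flink's actual API.
interface ChannelSelector<T> {
    int selectChannel(T record, int numChannels);

    // Hypothetical flag used by the factory below to pick a writer.
    default boolean isBroadcast() { return false; }
}

class RoundRobinSelector<T> implements ChannelSelector<T> {
    private int next = -1;

    @Override
    public int selectChannel(T record, int numChannels) {
        next = (next + 1) % numChannels;
        return next;
    }
}

class BroadcastPartitioner<T> implements ChannelSelector<T> {
    @Override
    public int selectChannel(T record, int numChannels) {
        throw new UnsupportedOperationException("broadcast has no single target channel");
    }

    @Override
    public boolean isBroadcast() { return true; }
}

abstract class RecordWriter<T> {
    protected final int numChannels;
    // channels.get(i) collects what was written to subpartition i
    protected final List<List<T>> channels = new ArrayList<>();

    RecordWriter(int numChannels) {
        this.numChannels = numChannels;
        for (int i = 0; i < numChannels; i++) channels.add(new ArrayList<>());
    }

    abstract void emit(T record);

    void broadcastEmit(T record) {
        for (List<T> channel : channels) channel.add(record);
    }

    List<T> channel(int index) { return channels.get(index); }
}

class NonBroadcastRecordWriter<T> extends RecordWriter<T> {
    private final ChannelSelector<T> selector;

    NonBroadcastRecordWriter(int numChannels, ChannelSelector<T> selector) {
        super(numChannels);
        this.selector = selector;
    }

    @Override
    void emit(T record) {
        channels.get(selector.selectChannel(record, numChannels)).add(record);
    }
}

class BroadcastRecordWriter<T> extends RecordWriter<T> {
    BroadcastRecordWriter(int numChannels) { super(numChannels); }

    // With a broadcast partitioner, every emit goes to all subpartitions.
    @Override
    void emit(T record) { broadcastEmit(record); }
}

final class RecordWriters {
    private RecordWriters() {}

    // The writer type is decided once, from the ChannelSelector.
    static <T> RecordWriter<T> create(int numChannels, ChannelSelector<T> selector) {
        return selector.isBroadcast()
                ? new BroadcastRecordWriter<T>(numChannels)
                : new NonBroadcastRecordWriter<T>(numChannels, selector);
    }
}
```

For example, `RecordWriters.create(3, new BroadcastPartitioner<String>())` returns a `BroadcastRecordWriter`, and a single `emit` then lands in all three subpartitions.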

Regarding the concrete implementation, I think one RecordSerializer can work for 
both BroadcastRecordWriter and NonBroadcastRecordWriter, but the precondition 
is that the RecordSerializer holds no internal state. So we have to remove the 
BufferBuilder field from SpanningRecordSerializer and pass the BufferBuilder in 
from RecordWriter via the addRecord/continueWritingWithNextBufferBuilder 
methods. BroadcastRecordWriter only needs to maintain one BufferBuilder for all 
subpartitions, while NonBroadcastRecordWriter would need to maintain one 
BufferBuilder per subpartition.
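A rough sketch of the "serializer without a BufferBuilder" idea, assuming a simplified 4-byte length-prefixed encoding: the serializer keeps only the bytes of the record currently being written, and the caller passes the target in through `addRecord` / `continueWritingWithNextBufferBuilder`. The classes, the method signatures and the `SerializationResult` enum are hypothetical models, not Flink's actual `SpanningRecordSerializer` API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified BufferBuilder: a fixed-size region that is only appended to.
class BufferBuilder {
    private final byte[] memory;
    private int writerIndex;

    BufferBuilder(int capacity) { memory = new byte[capacity]; }

    // Copies as many bytes from source as still fit; returns how many were copied.
    int append(ByteBuffer source) {
        int n = Math.min(source.remaining(), memory.length - writerIndex);
        source.get(memory, writerIndex, n);
        writerIndex += n;
        return n;
    }

    boolean isFull() { return writerIndex == memory.length; }
    int size() { return writerIndex; }
}

enum SerializationResult { FULL_RECORD, PARTIAL_RECORD_BUFFER_FULL }

// Holds only the serialized bytes of the current record and no BufferBuilder,
// so the caller decides which builder receives the data.
class RecordSerializer {
    private ByteBuffer pending = ByteBuffer.allocate(0);

    SerializationResult addRecord(String record, BufferBuilder target) {
        byte[] payload = record.getBytes(StandardCharsets.UTF_8);
        pending = ByteBuffer.allocate(4 + payload.length); // 4-byte length header
        pending.putInt(payload.length).put(payload);
        pending.flip();
        return continueWritingWithNextBufferBuilder(target);
    }

    SerializationResult continueWritingWithNextBufferBuilder(BufferBuilder target) {
        target.append(pending);
        return pending.hasRemaining()
                ? SerializationResult.PARTIAL_RECORD_BUFFER_FULL
                : SerializationResult.FULL_RECORD;
    }
}

// Non-broadcast writer: one shared serializer, one open BufferBuilder per subpartition.
class NonBroadcastWriter {
    private final RecordSerializer serializer = new RecordSerializer();
    private final BufferBuilder[] builders;
    private final int capacity;
    int finishedBuffers;

    NonBroadcastWriter(int numSubpartitions, int capacity) {
        this.capacity = capacity;
        builders = new BufferBuilder[numSubpartitions];
        for (int i = 0; i < numSubpartitions; i++) builders[i] = new BufferBuilder(capacity);
    }

    void emit(String record, int subpartition) {
        SerializationResult result = serializer.addRecord(record, builders[subpartition]);
        while (result == SerializationResult.PARTIAL_RECORD_BUFFER_FULL) {
            finishedBuffers++; // a real writer would hand the full buffer to the subpartition here
            builders[subpartition] = new BufferBuilder(capacity);
            result = serializer.continueWritingWithNextBufferBuilder(builders[subpartition]);
        }
    }

    BufferBuilder current(int subpartition) { return builders[subpartition]; }
}
```

A BroadcastRecordWriter in this model would keep a single `BufferBuilder` field instead of the per-subpartition array, and would call the same serializer.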

Another question is whether this improvement also suits 
broadcastEmit(watermark) in NonBroadcastRecordWriter, as in your suggestions 
2 and 3. I suspect it may decrease buffer utilization when switching between 
broadcast and non-broadcast modes, and it may also be trickier to implement. 
I am still thinking about it.

Maybe we can implement the improvement for BroadcastPartitioner as a first step 
and make sure one RecordSerializer serves all subpartitions. That reduces the 
memory overhead of the RecordSerializers and the serialization time in 
broadcast scenarios.
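To make the time saving concrete, the following hypothetical sketch only counts serializer invocations (`CountingSerializer` and `BroadcastSerializationCost` are illustrative names, and records are plain strings): with one serializer per subpartition a broadcast record is serialized once per subpartition, while a shared serializer serializes it once and every subpartition keeps a reference to the same bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical serializer that only counts how often it is invoked.
class CountingSerializer {
    int invocations;

    byte[] serialize(String record) {
        invocations++;
        return record.getBytes(StandardCharsets.UTF_8);
    }
}

final class BroadcastSerializationCost {
    private BroadcastSerializationCost() {}

    // Current layout: one serializer per subpartition, so each broadcast
    // record is serialized once per subpartition. Returns the total count.
    static int perSubpartition(List<String> records, int numSubpartitions) {
        List<CountingSerializer> serializers = new ArrayList<>();
        for (int i = 0; i < numSubpartitions; i++) serializers.add(new CountingSerializer());
        for (String record : records) {
            for (CountingSerializer s : serializers) s.serialize(record);
        }
        int total = 0;
        for (CountingSerializer s : serializers) total += s.invocations;
        return total;
    }

    // Proposed layout: one shared serializer; every subpartition stores a
    // reference to the same serialized byte array.
    static List<List<byte[]>> shared(List<String> records, int numSubpartitions,
                                     CountingSerializer serializer) {
        List<List<byte[]>> subpartitions = new ArrayList<>();
        for (int i = 0; i < numSubpartitions; i++) subpartitions.add(new ArrayList<>());
        for (String record : records) {
            byte[] bytes = serializer.serialize(record);
            for (List<byte[]> subpartition : subpartitions) subpartition.add(bytes);
        }
        return subpartitions;
    }
}
```

For three records and four subpartitions, `perSubpartition` performs 12 serializations, while `shared` performs 3 and also keeps only one copy of each record's bytes.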

Best,

Zhijiang


------------------------------------------------------------------
From: Piotr Nowojski <pi...@data-artisans.com>
Sent: Tuesday, July 17, 2018 23:31
To: dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) 
<wangzhijiang...@aliyun.com>
Subject: Re: [DISCUSS] Improve broadcast serialization

Hi

Generally speaking this would be a nice optimisation, however it might be 
tricky to implement. The thing to keep in mind is that the current interface 
allows interleaving broadcasting and normal sending; because of that, at any 
given time some serialisers can hold more data than others. For example, when 
we have two output channels and loop over the following writes:

Write sth to 1. Channel
Broadcast to all channels
Write sth to 1. Channel
Broadcast to all channels
Write sth to 1. Channel
Broadcast to all channels
(…)

Thus the buffers of different channels can fill up at different rates.
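A small simulation of the write loop above, assuming every record serializes to 10 bytes and every channel uses 32-byte buffers (both numbers are arbitrary, and `FillRateDemo` is an illustrative name). It shows why a serialized broadcast record cannot simply be shared: the same record lands at a different offset in each channel's current buffer.

```java
// Hypothetical model: records span buffers, so each channel's bytes simply accumulate.
final class FillRateDemo {
    static final int RECORD_BYTES = 10;
    static final int BUFFER_CAPACITY = 32;

    private FillRateDemo() {}

    // Replays "write sth to channel 1, broadcast to all" and returns bytes per channel.
    static long[] replay(int iterations, int numChannels) {
        long[] bytes = new long[numChannels];
        for (int i = 0; i < iterations; i++) {
            bytes[0] += RECORD_BYTES;                 // write sth to channel 1 (index 0)
            for (int c = 0; c < numChannels; c++) {
                bytes[c] += RECORD_BYTES;             // broadcast to all channels
            }
        }
        return bytes;
    }

    static long finishedBuffers(long bytes) { return bytes / BUFFER_CAPACITY; }

    static long offsetInOpenBuffer(long bytes) { return bytes % BUFFER_CAPACITY; }

    public static void main(String[] args) {
        long[] bytes = replay(6, 2);
        for (int c = 0; c < bytes.length; c++) {
            System.out.printf("channel %d: %d bytes, finished buffers: %d, next offset: %d%n",
                    c + 1, bytes[c], finishedBuffers(bytes[c]), offsetInOpenBuffer(bytes[c]));
        }
    }
}
```

After six iterations, channel 1 holds 120 bytes (3 finished buffers, next write at offset 24) and channel 2 holds 60 bytes (1 finished buffer, next write at offset 28), so the next broadcast record would start at a different position in each channel.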

> In theory every record can be serialized only once and referenced for all the 
> subpartitions in broadcast mode.

The problem here is that after a record is serialised, the only unit that can 
be referenced afterwards is a “Buffer”. That leaves us with a couple of 
options:

1. Create a specialised BroadcastRecordWriter that supports ONLY broadcasting, 
guaranteeing that all channels always receive the same data. Here you could 
serialise records only once, into one BufferBuilder that could be shared and 
referenced by multiple BufferConsumers from different channels. Any non-broadcast 
write would have to fail.

2. Similar to the above, but specialised for MOSTLY broadcasting. Operate as 
in 1. for broadcasts, but for any non-broadcast write: finish the current 
broadcasting BufferBuilder, flush all data on all channels, serialise the 
single record to the single channel using a newly created BufferBuilder and 
immediately finish/flush it as well, so that any subsequent broadcasts work 
again as in 1.

3. Similar to 2., but lazily switch between broadcasting and non-broadcasting 
modes. It would have two modes of operation that could be switched back and 
forth: the non-broadcast mode as currently implemented, and the optimised 
broadcast mode:

Broadcast to all channels
Broadcast to all channels
Broadcast to all channels
Broadcast to all channels
Write sth to X Channel // this flushes all channels and clears/finishes the previous BufferBuilder
Write sth to Y Channel
Write sth to Y Channel
Write sth to Y Channel
Write sth to X Channel 
Broadcast to all channels // this flushes all channels and clears/finishes the previous BufferBuilders
Broadcast to all channels
Broadcast to all channels
(…)
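Option 1 could look roughly like the following hypothetical model: `SharedBuffer` plays the role of a BufferBuilder whose memory is referenced by several reference-counted `BufferConsumer`s, one per subpartition, and any non-broadcast write is rejected. All class names, the recycle-on-zero logic and the rule that a record must fit into one buffer are simplifying assumptions for illustration, not Flink's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a BufferBuilder whose memory is shared by several
// reference-counted consumers; not Flink's actual classes.
class SharedBuffer {
    final byte[] memory;
    int writerIndex;
    private int refCount;
    private boolean recycled;

    SharedBuffer(int capacity) { memory = new byte[capacity]; }

    BufferConsumer createConsumer() {
        refCount++;
        return new BufferConsumer(this);
    }

    void release() {
        if (--refCount == 0) recycled = true; // memory could return to the pool here
    }

    boolean isRecycled() { return recycled; }
}

class BufferConsumer {
    private final SharedBuffer buffer;
    private int readerIndex;

    BufferConsumer(SharedBuffer buffer) { this.buffer = buffer; }

    // Returns the bytes written since the last call (copied only for inspection).
    byte[] build() {
        byte[] view = Arrays.copyOfRange(buffer.memory, readerIndex, buffer.writerIndex);
        readerIndex = buffer.writerIndex;
        return view;
    }

    SharedBuffer buffer() { return buffer; }

    void close() { buffer.release(); }
}

// Option 1: each record is serialised once into the shared buffer, and every
// subpartition holds its own consumer over that same memory.
class BroadcastOnlyRecordWriter {
    private final int capacity;
    private SharedBuffer current;
    final List<List<BufferConsumer>> subpartitions = new ArrayList<>();
    int serializations;

    BroadcastOnlyRecordWriter(int numSubpartitions, int capacity) {
        this.capacity = capacity;
        for (int i = 0; i < numSubpartitions; i++) subpartitions.add(new ArrayList<>());
    }

    void broadcastEmit(String record) {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > capacity) {
            // Simplification: this sketch does not span records across buffers.
            throw new IllegalArgumentException("record larger than a buffer");
        }
        serializations++;
        if (current == null || capacity - current.writerIndex < bytes.length) {
            current = new SharedBuffer(capacity);
            for (List<BufferConsumer> subpartition : subpartitions) {
                subpartition.add(current.createConsumer());
            }
        }
        System.arraycopy(bytes, 0, current.memory, current.writerIndex, bytes.length);
        current.writerIndex += bytes.length;
    }

    // Any non-broadcast write has to fail in this variant.
    void emit(String record, int subpartition) {
        throw new UnsupportedOperationException("only broadcasting is supported");
    }
}
```

With three subpartitions, two broadcast records are serialized twice in total rather than six times, and the shared memory is recycled only after all three subpartitions have closed their consumers.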

However, in both 2. and 3. there would be a very big penalty for mixing 
broadcasts with normal writes.
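The size of that penalty can be estimated with a hypothetical model of option 3: records are represented only by their byte size and are assumed to fit into one buffer, switching modes finishes every open buffer regardless of how full it is, and `utilization()` reports the share of finished buffer capacity that actually holds data. `ModeSwitchingWriter` and its methods are illustrative names, and the model counts the shared broadcast buffer as a single buffer of memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of option 3 (lazy switching between broadcast and
// non-broadcast modes); not Flink code.
class ModeSwitchingWriter {
    enum Mode { BROADCAST, NON_BROADCAST }

    private final int capacity;
    private Mode mode = Mode.BROADCAST;
    private int sharedFill = -1;        // fill of the open shared buffer, -1 if none
    private final int[] channelFill;    // fill of each channel's open buffer, -1 if none
    final List<Integer> finishedFills = new ArrayList<>(); // bytes used per finished buffer

    ModeSwitchingWriter(int numChannels, int capacity) {
        this.capacity = capacity;
        channelFill = new int[numChannels];
        Arrays.fill(channelFill, -1);
    }

    void broadcast(int recordBytes) {
        switchTo(Mode.BROADCAST);
        sharedFill = append(sharedFill, recordBytes);
    }

    void write(int channel, int recordBytes) {
        switchTo(Mode.NON_BROADCAST);
        channelFill[channel] = append(channelFill[channel], recordBytes);
    }

    // Finishes the open buffer if the record does not fit, then appends.
    private int append(int fill, int recordBytes) {
        if (fill >= 0 && fill + recordBytes > capacity) {
            finishedFills.add(fill);
            fill = -1;
        }
        return Math.max(fill, 0) + recordBytes;
    }

    // Switching modes finishes and flushes every open buffer, however empty.
    private void switchTo(Mode target) {
        if (mode != target) {
            finishAll();
            mode = target;
        }
    }

    void finishAll() {
        if (sharedFill >= 0) {
            finishedFills.add(sharedFill);
            sharedFill = -1;
        }
        for (int c = 0; c < channelFill.length; c++) {
            if (channelFill[c] >= 0) {
                finishedFills.add(channelFill[c]);
                channelFill[c] = -1;
            }
        }
    }

    // Fraction of the finished buffers' capacity that holds data.
    double utilization() {
        if (finishedFills.isEmpty()) return 0.0;
        long used = 0;
        for (int fill : finishedFills) used += fill;
        return (double) used / ((long) finishedFills.size() * capacity);
    }
}
```

With 100-byte buffers and the same 80 bytes of 10-byte records, alternating one broadcast with one write to channel 0 finishes 8 buffers at 10% utilization, while four broadcasts followed by four writes finish 2 buffers at 40%.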

Piotrek

> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999) 
> <wangzhijiang...@aliyun.com.INVALID> wrote:
> 
> Hi all,
> 
> In the current implementation, a RecordSerializer is created separately for 
> each subpartition in RecordWriter, which means the number of serializers 
> equals the number of subpartitions.
> For the broadcast partitioner, every record is serialized once per 
> subpartition, which may hurt performance to some extent.
> In theory every record can be serialized only once and referenced for all the 
> subpartitions in broadcast mode.
> 
> To do so, I propose the following changes:
> 1. Create and maintain only one serializer in RecordWriter, which serializes 
> the record for all the subpartitions. This makes sense for any partitioner, 
> and it also decreases the memory overhead, because every serializer 
> maintains some separate byte buffers internally.
> 2. Maybe we can make RecordWriter an abstract base class for the other 
> partitioner modes and implement a BroadcastRecordWriter for 
> BroadcastPartitioner. This new implementation would add buffer references 
> based on the number of subpartitions before adding the buffer into the 
> subpartition queues.
> 3. Maybe we can remove StreamRecordWriter by migrating its flusher into 
> RecordWriter, so that a uniform RecordWriter can be used for both stream and 
> batch. The above BroadcastRecordWriter can also be uniform for both stream 
> and batch.
> 
> I am not sure whether this improvement has been proposed before. What do you 
> think of it?
> If needed, I can create JIRAs to contribute it, and may need a committer to 
> cooperate with me.
> 
> Best,
> 
> Zhijiang
