[ 
https://issues.apache.org/jira/browse/FLINK-23457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403171#comment-17403171
 ] 

Anton Kalashnikov commented on FLINK-23457:
-------------------------------------------

[~louzhengyu], you can take a look at the changes to 
BufferWritingResultPartition in commit 
545b6126cc42c5c9bce8783931b04a3bec571aa6, which implements this for unicast 
(https://issues.apache.org/jira/browse/FLINK-23454). As you can see there, 
before we send anything to a subpartition we adjust the bufferBuilder size 
according to the information from that subpartition 
(BufferWritingResultPartition#addToSubpartition). 
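
To make the unicast idea concrete, here is a rough sketch. It is not the code 
from the commit: the class UnicastSizingSketch and its methods are 
hypothetical stand-ins, and 32768 in the usage note is just an example 
maximum. It only shows the bookkeeping: remember the latest desired size per 
subpartition and use it when sizing the next buffer for that subpartition.

```java
import java.util.Arrays;

// Hypothetical stand-in for illustration; not a Flink class.
final class UnicastSizingSketch {
    private final int maxBufferSize;
    // Latest desired buffer size announced for each subpartition.
    private final int[] desiredSizes;

    UnicastSizingSketch(int numSubpartitions, int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
        this.desiredSizes = new int[numSubpartitions];
        // Until a subpartition announces something, assume the maximum size.
        Arrays.fill(desiredSizes, maxBufferSize);
    }

    // Called when a subpartition reports a new desired size (assumed hook).
    void onNewBufferSize(int subpartition, int newSize) {
        // Clamp to [1, maxBufferSize] so a bad value cannot break sizing.
        desiredSizes[subpartition] = Math.max(1, Math.min(newSize, maxBufferSize));
    }

    // Size to apply to the next buffer written to this subpartition.
    int sizeForNextBuffer(int subpartition) {
        return desiredSizes[subpartition];
    }
}
```

With two subpartitions and a maximum of 32768, calling onNewBufferSize(1, 8192) 
changes only the size used for subpartition 1; subpartition 0 keeps using the 
maximum.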

This task needs to implement the same idea for broadcast. The main problem is 
choosing the right bufferBuilder size: with unicast we have one buffer per 
subpartition, but with broadcast one buffer is shared by several 
subpartitions, and each subpartition can have a different desired buffer 
size. Our current idea for the implementation is simple: pick the smallest 
size among all subpartitions.
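
A minimal sketch of that selection, assuming the desired sizes of all 
subpartitions are available as an int array. The class BroadcastSizingSketch 
and the method chooseBroadcastBufferSize are hypothetical names for 
illustration and do not exist in Flink.

```java
// Hypothetical helper for illustration; not a Flink class.
final class BroadcastSizingSketch {
    private BroadcastSizingSketch() {}

    // Returns the smallest desired size among all subpartitions, capped at
    // maxBufferSize. With no subpartitions it falls back to maxBufferSize.
    static int chooseBroadcastBufferSize(int[] desiredSizes, int maxBufferSize) {
        int size = maxBufferSize;
        for (int desired : desiredSizes) {
            size = Math.min(size, desired);
        }
        return size;
    }
}
```

For desired sizes {32768, 8192, 16384} and a maximum of 32768, this picks 
8192, so the shared buffer fits what every subpartition asked for. The 
trade-off is that a single slow subpartition shrinks the buffer for all of 
them.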

> Sending the buffer of the right size for broadcast
> --------------------------------------------------
>
>                 Key: FLINK-23457
>                 URL: https://issues.apache.org/jira/browse/FLINK-23457
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Anton Kalashnikov
>            Priority: Major
>
> It is not enough to know just the number of available buffers (credits) for 
> the downstream because the size of these buffers can be different. So we are 
> proposing to resolve this problem in the following way: if the downstream 
> buffer size changes, the upstream should send buffers no larger than the 
> new size, regardless of how big the current buffer on the upstream is 
> (pollBuffer should receive a parameter such as bufferSize and return a 
> buffer no larger than it).
> Downstream will be able to support any buffer size < max buffer size, so it 
> should be good enough to request a BufferBuilder with the new size after 
> getting the announcement, leaving existing BufferBuilder/BufferConsumers 
> unchanged. In other words, the code in {{PipelinedSubpartition(View)}} doesn’t 
> need to change (apart from forwarding the new buffer size to the 
> {{BufferWritingResultPartition}}). All buffer size adjustments can be 
> implemented exclusively in {{BufferWritingResultPartition}}.
> If different downstream subtasks have different throughput and hence 
> different desired buffer sizes, then a single upstream subtask has to support 
> having two different subpartitions with different buffer sizes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
