[ https://issues.apache.org/jira/browse/FLINK-23457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403171#comment-17403171 ]
Anton Kalashnikov commented on FLINK-23457: ------------------------------------------- [~louzhengyu]. You can take a look at the changes in BufferWrittingResultPartition in commit(545b6126cc42c5c9bce8783931b04a3bec571aa6). It is implemented for unicast(https://issues.apache.org/jira/browse/FLINK-23454). As you can see before we send something to a subpartition we change the bufferBuilder size according to information from the subpartition(BufferWritingResultPartition#addToSubpartition). In this task, it needs to implement the same idea only for broadcast. The main problem is to choose the right size of bufferBuilder because in the case of unicast we have one buffer for one subpartition, but in the case of the broadcast, we have one buffer for several subpartitions and each subpartition can have a different desirable buffer size. Our current idea for the implementation of this task is simple: just picking the smallest size among all subpartitions. > Sending the buffer of the right size for broadcast > -------------------------------------------------- > > Key: FLINK-23457 > URL: https://issues.apache.org/jira/browse/FLINK-23457 > Project: Flink > Issue Type: Sub-task > Reporter: Anton Kalashnikov > Priority: Major > > It is not enough to know just the number of available buffers (credits) for > the downstream because the size of these buffers can be different. So we are > proposing to resolve this problem in the following way: If the downstream > buffer size is changed then the upstream should send the buffer of the size > not greater than the new one regardless of how big the current buffer on the > upstream. (pollBuffer should receive parameters like bufferSize and return > buffer not greater than it) > Downstream will be able to support any buffer size < max buffer size, so it > should be just good enough to request BufferBuilder with new size after > getting announcement, and leaving existing BufferBuilder/BufferConsumers > unchanged. In other words code in {{PipelinedSubpartition(View)}} doesn’t > need to be changed (apart of forwarding new buffer size to the > {{BufferWritingResultPartition}}). All buffer size adjustments can be > implemented exclusively in {{BufferWritingResultPartition}}. > If different downstream subtasks have different throughput and hence > different desired buffer sizes, then a single upstream subtask has to support > having two different subpartitions with different buffer sizes. -- This message was sent by Atlassian Jira (v8.3.4#803005)