curcur commented on a change in pull request #13614: URL: https://github.com/apache/flink/pull/13614#discussion_r505183102
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java ########## @@ -211,46 +225,62 @@ protected void releaseInternal() { } } - private BufferBuilder getSubpartitionBufferBuilder(int targetSubpartition) throws IOException { - final BufferBuilder bufferBuilder = subpartitionBufferBuilders[targetSubpartition]; - if (bufferBuilder != null) { - return bufferBuilder; - } + private BufferBuilder getNewEmptySubpartitionBufferBuilderForNewRecord(int targetSubpartition) throws IOException { + final BufferBuilder bufferBuilder = requestNewSubpartitionBufferBuilder(targetSubpartition); + subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumerFromBeginning(), 0); - return getNewSubpartitionBufferBuilder(targetSubpartition); + return bufferBuilder; } - private BufferBuilder getNewSubpartitionBufferBuilder(int targetSubpartition) throws IOException { + private BufferBuilder getNewEmptySubpartitionBufferBuilderForRecordContinuation( + final ByteBuffer remainingRecordBytes, + final int targetSubpartition) throws IOException { + final BufferBuilder bufferBuilder = requestNewSubpartitionBufferBuilder(targetSubpartition); + final int partialRecordBytes = bufferBuilder.appendAndCommit(remainingRecordBytes); + subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumerFromBeginning(), partialRecordBytes); + + return bufferBuilder; + } + + private BufferBuilder requestNewSubpartitionBufferBuilder(int targetSubpartition) throws IOException { checkInProduceState(); ensureUnicastMode(); - final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition); - subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer()); subpartitionBufferBuilders[targetSubpartition] = bufferBuilder; + return bufferBuilder; } - private BufferBuilder getBroadcastBufferBuilder() throws IOException { - if (broadcastBufferBuilder != null) { - return broadcastBufferBuilder; 
+ private BufferBuilder getNewEmptyBroadcastBufferBuilderForNewRecord() throws IOException { + final BufferBuilder bufferBuilder = requestNewBroadcastBufferBuilder(); + try (final BufferConsumer consumer = bufferBuilder.createBufferConsumerFromBeginning()) { + for (ResultSubpartition subpartition : subpartitions) { + subpartition.add(consumer.copy(), 0); + } } - return getNewBroadcastBufferBuilder(); + return bufferBuilder; } - private BufferBuilder getNewBroadcastBufferBuilder() throws IOException { + private BufferBuilder getNewEmptyBroadcastBufferBuilderForRecordContinuation( + final ByteBuffer remainingRecordBytes) throws IOException { + final BufferBuilder bufferBuilder = requestNewBroadcastBufferBuilder(); + final int partialRecordBytes = bufferBuilder.appendAndCommit(remainingRecordBytes); + try (final BufferConsumer consumer = bufferBuilder.createBufferConsumerFromBeginning()) { + for (ResultSubpartition subpartition : subpartitions) { + subpartition.add(consumer.copy(), partialRecordBytes); Review comment: I think that's roughly the same (from the complexity point of view). The only difference is that it is easier to differentiate the case where remainingRecordLength == buffer.size(), but I don't think that is a big difference. I agree the original code is a bit duplicated. If that's the concern, I've rewritten the code (literally just a rewrite), to see whether it looks much better now. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org