gaoyunhaii commented on a change in pull request #15259: URL: https://github.com/apache/flink/pull/15259#discussion_r602998872
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java ########## @@ -94,11 +94,17 @@ */ private final SortMergeResultPartitionReadScheduler readScheduler; - /** Number of guaranteed network buffers can be used by {@link #currentSortBuffer}. */ + /** + * Number of guaranteed network buffers can be used by {@link #unicastSortBuffer} and {@link + * #broadcastSortBuffer}. + */ private int numBuffersForSort; - /** Current {@link SortBuffer} to append records to. */ - private SortBuffer currentSortBuffer; + /** {@link SortBuffer} for records sent by {@link #broadcastRecord(ByteBuffer)}. */ + private SortBuffer broadcastSortBuffer; + + /** {@link SortBuffer} for records sent by {@link #emitRecord(ByteBuffer, int)}. */ + private SortBuffer unicastSortBuffer; Review comment: It seems that in the following logic the two sort-buffers have no difference? Thus we may not need to use two separate buffers; we could still use only one `currentBuffer`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java ########## @@ -389,8 +418,9 @@ private void releaseWriteBuffers() { public void close() { releaseWriteBuffers(); // the close method will be always called by the task thread, so there is need to make - // the currentSortBuffer filed volatile and visible to the cancel thread intermediately - releaseCurrentSortBuffer(); + // the sort buffer filed volatile and visible to the cancel thread intermediately Review comment: filed -> field ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java ########## @@ -86,6 +86,13 @@ /** Current subpartition to write buffers to. */ private int currentSubpartition = -1; + /** + * Broadcast region is an optimization for the broadcast partition which writes the same data to + * all subpartitions. 
For a broadcast region, data is only written once and the indexes of all + subpartitions point to the same offset in the data file. */ + private boolean isBroadCastRegion; Review comment: Might be unified as `isBroadcastRegion`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java ########## @@ -205,32 +215,70 @@ public void writeBuffers(List<BufferWithChannel> bufferWithChannels) throws IOEx return; } - long expectedBytes = 0; + long expectedBytes; ByteBuffer[] bufferWithHeaders = new ByteBuffer[2 * bufferWithChannels.size()]; + if (isBroadCastRegion) { + expectedBytes = writeBroadcastBuffers(bufferWithChannels, bufferWithHeaders); + } else { + expectedBytes = writeUnicastBuffers(bufferWithChannels, bufferWithHeaders); + } + + BufferReaderWriterUtil.writeBuffers(dataFileChannel, expectedBytes, bufferWithHeaders); + } + + private long writeUnicastBuffers( + List<BufferWithChannel> bufferWithChannels, ByteBuffer[] bufferWithHeaders) { + long expectedBytes = 0; for (int i = 0; i < bufferWithChannels.size(); i++) { - BufferWithChannel bufferWithChannel = bufferWithChannels.get(i); - Buffer buffer = bufferWithChannel.getBuffer(); - int subpartitionIndex = bufferWithChannel.getChannelIndex(); - if (subpartitionIndex != currentSubpartition) { + int subpartition = bufferWithChannels.get(i).getChannelIndex(); + if (subpartition != currentSubpartition) { checkState( - subpartitionBuffers[subpartitionIndex] == 0, + subpartitionBuffers[subpartition] == 0, "Must write data of the same channel together."); - subpartitionOffsets[subpartitionIndex] = totalBytesWritten; - currentSubpartition = subpartitionIndex; + subpartitionOffsets[subpartition] = totalBytesWritten; + currentSubpartition = subpartition; } - ByteBuffer header = BufferReaderWriterUtil.allocatedHeaderBuffer(); - BufferReaderWriterUtil.setByteChannelBufferHeader(buffer, header); - bufferWithHeaders[2 * i] = header; - bufferWithHeaders[2 * i + 1] 
= buffer.getNioBufferReadable(); + Buffer buffer = bufferWithChannels.get(i).getBuffer(); + int numBytes = setBufferWithHeader(buffer, bufferWithHeaders, 2 * i); + expectedBytes += numBytes; + totalBytesWritten += numBytes; Review comment: Might we move the `totalBytesWritten` update outside the two methods? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java ########## @@ -205,32 +215,70 @@ public void writeBuffers(List<BufferWithChannel> bufferWithChannels) throws IOEx return; } - long expectedBytes = 0; + long expectedBytes; ByteBuffer[] bufferWithHeaders = new ByteBuffer[2 * bufferWithChannels.size()]; + if (isBroadCastRegion) { + expectedBytes = writeBroadcastBuffers(bufferWithChannels, bufferWithHeaders); Review comment: These two methods might need to be renamed since they do not do the write. Perhaps change to something like `collectBroadcastBuffers` and `collectUnicastBuffers`? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java ########## @@ -205,32 +215,70 @@ public void writeBuffers(List<BufferWithChannel> bufferWithChannels) throws IOEx return; } - long expectedBytes = 0; + long expectedBytes; ByteBuffer[] bufferWithHeaders = new ByteBuffer[2 * bufferWithChannels.size()]; + if (isBroadCastRegion) { + expectedBytes = writeBroadcastBuffers(bufferWithChannels, bufferWithHeaders); + } else { + expectedBytes = writeUnicastBuffers(bufferWithChannels, bufferWithHeaders); + } + + BufferReaderWriterUtil.writeBuffers(dataFileChannel, expectedBytes, bufferWithHeaders); + } + + private long writeUnicastBuffers( + List<BufferWithChannel> bufferWithChannels, ByteBuffer[] bufferWithHeaders) { + long expectedBytes = 0; for (int i = 0; i < bufferWithChannels.size(); i++) { - BufferWithChannel bufferWithChannel = bufferWithChannels.get(i); - Buffer buffer = bufferWithChannel.getBuffer(); - int subpartitionIndex = 
bufferWithChannel.getChannelIndex(); - if (subpartitionIndex != currentSubpartition) { + int subpartition = bufferWithChannels.get(i).getChannelIndex(); + if (subpartition != currentSubpartition) { checkState( - subpartitionBuffers[subpartitionIndex] == 0, + subpartitionBuffers[subpartition] == 0, "Must write data of the same channel together."); - subpartitionOffsets[subpartitionIndex] = totalBytesWritten; - currentSubpartition = subpartitionIndex; + subpartitionOffsets[subpartition] = totalBytesWritten; + currentSubpartition = subpartition; } - ByteBuffer header = BufferReaderWriterUtil.allocatedHeaderBuffer(); - BufferReaderWriterUtil.setByteChannelBufferHeader(buffer, header); - bufferWithHeaders[2 * i] = header; - bufferWithHeaders[2 * i + 1] = buffer.getNioBufferReadable(); + Buffer buffer = bufferWithChannels.get(i).getBuffer(); + int numBytes = setBufferWithHeader(buffer, bufferWithHeaders, 2 * i); + expectedBytes += numBytes; + totalBytesWritten += numBytes; + ++subpartitionBuffers[subpartition]; + } + return expectedBytes; + } + + private long writeBroadcastBuffers( + List<BufferWithChannel> bufferWithChannels, ByteBuffer[] bufferWithHeaders) { + if (subpartitionBuffers[0] == 0) { Review comment: Maybe we could add some comment here to explain that we only need to set the offset on the first call? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org