gaoyunhaii commented on a change in pull request #15192: URL: https://github.com/apache/flink/pull/15192#discussion_r594938336
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java ########## @@ -70,15 +81,18 @@ /** Number of data buffers (excluding events) written for each subpartition. */ private final int[] numDataBuffers; - /** A piece of unmanaged memory for data writing. */ - private final MemorySegment writeBuffer; + /** Buffers cut from the network buffer pool for data writing. */ + private final List<MemorySegment> writeBuffers = new ArrayList<>(); Review comment: @GuardedBy("lock") ? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java ########## @@ -212,31 +269,46 @@ private void flushCurrentSortBuffer() throws IOException { if (currentSortBuffer.hasRemaining()) { fileWriter.startNewRegion(); + List<BufferWithChannel> toWrite = new ArrayList<>(); + Queue<MemorySegment> segments = getWriteBuffers(); + while (currentSortBuffer.hasRemaining()) { - BufferWithChannel bufferWithChannel = - currentSortBuffer.copyIntoSegment(writeBuffer); - Buffer buffer = bufferWithChannel.getBuffer(); - int subpartitionIndex = bufferWithChannel.getChannelIndex(); + if (segments.isEmpty()) { + fileWriter.writeBuffers(toWrite); + toWrite.clear(); + segments = getWriteBuffers(); + } - writeCompressedBufferIfPossible(buffer, subpartitionIndex); + BufferWithChannel bufferWithChannel = + currentSortBuffer.copyIntoSegment(checkNotNull(segments.poll())); + toWrite.add(compressBufferIfPossible(bufferWithChannel)); } + + fileWriter.writeBuffers(toWrite); } currentSortBuffer.release(); } - private void writeCompressedBufferIfPossible(Buffer buffer, int targetSubpartition) - throws IOException { - updateStatistics(buffer, targetSubpartition); + private Queue<MemorySegment> getWriteBuffers() { + synchronized (lock) { + checkState(!writeBuffers.isEmpty(), "Task has been canceled."); + return new ArrayDeque<>(writeBuffers); + } + } - try { - if (canBeCompressed(buffer)) { - 
buffer = bufferCompressor.compressToIntermediateBuffer(buffer); - } - fileWriter.writeBuffer(buffer, targetSubpartition); - } finally { - buffer.recycleBuffer(); + private BufferWithChannel compressBufferIfPossible(BufferWithChannel bufferWithChannel) { + Buffer buffer = bufferWithChannel.getBuffer(); + int channelIndex = bufferWithChannel.getChannelIndex(); + + updateStatistics(buffer, channelIndex); Review comment: After the modification, the `updateStatistics` call here seems a bit inconsistent with the name of this method (`compressBufferIfPossible`). We might move the statistics update out of it. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java ########## @@ -289,8 +374,20 @@ public void finish() throws IOException { super.finish(); } + private void releaseWriteBuffers() { + synchronized (lock) { + if (bufferPool != null) { + for (MemorySegment segment : writeBuffers) { + bufferPool.recycle(segment); + } + writeBuffers.clear(); + } + } + } + @Override public void close() { + releaseWriteBuffers(); releaseCurrentSortBuffer(); Review comment: Although not fully related to this PR, currently the sort buffer seems to be treated differently from the write buffers with respect to the lock. We might add a comment stating that there would always be a close() call from the main thread, or we might change currentSortBuffer to be volatile. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org