gaoyunhaii commented on a change in pull request #15192:
URL: https://github.com/apache/flink/pull/15192#discussion_r594938336



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -70,15 +81,18 @@
     /** Number of data buffers (excluding events) written for each 
subpartition. */
     private final int[] numDataBuffers;
 
-    /** A piece of unmanaged memory for data writing. */
-    private final MemorySegment writeBuffer;
+    /** Buffers cut from the network buffer pool for data writing. */
+    private final List<MemorySegment> writeBuffers = new ArrayList<>();

Review comment:
       @GuardedBy("lock") ?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -212,31 +269,46 @@ private void flushCurrentSortBuffer() throws IOException {
         if (currentSortBuffer.hasRemaining()) {
             fileWriter.startNewRegion();
 
+            List<BufferWithChannel> toWrite = new ArrayList<>();
+            Queue<MemorySegment> segments = getWriteBuffers();
+
             while (currentSortBuffer.hasRemaining()) {
-                BufferWithChannel bufferWithChannel =
-                        currentSortBuffer.copyIntoSegment(writeBuffer);
-                Buffer buffer = bufferWithChannel.getBuffer();
-                int subpartitionIndex = bufferWithChannel.getChannelIndex();
+                if (segments.isEmpty()) {
+                    fileWriter.writeBuffers(toWrite);
+                    toWrite.clear();
+                    segments = getWriteBuffers();
+                }
 
-                writeCompressedBufferIfPossible(buffer, subpartitionIndex);
+                BufferWithChannel bufferWithChannel =
+                        
currentSortBuffer.copyIntoSegment(checkNotNull(segments.poll()));
+                toWrite.add(compressBufferIfPossible(bufferWithChannel));
             }
+
+            fileWriter.writeBuffers(toWrite);
         }
 
         currentSortBuffer.release();
     }
 
-    private void writeCompressedBufferIfPossible(Buffer buffer, int 
targetSubpartition)
-            throws IOException {
-        updateStatistics(buffer, targetSubpartition);
+    private Queue<MemorySegment> getWriteBuffers() {
+        synchronized (lock) {
+            checkState(!writeBuffers.isEmpty(), "Task has been canceled.");
+            return new ArrayDeque<>(writeBuffers);
+        }
+    }
 
-        try {
-            if (canBeCompressed(buffer)) {
-                buffer = bufferCompressor.compressToIntermediateBuffer(buffer);
-            }
-            fileWriter.writeBuffer(buffer, targetSubpartition);
-        } finally {
-            buffer.recycleBuffer();
+    private BufferWithChannel compressBufferIfPossible(BufferWithChannel 
bufferWithChannel) {
+        Buffer buffer = bufferWithChannel.getBuffer();
+        int channelIndex = bufferWithChannel.getChannelIndex();
+
+        updateStatistics(buffer, channelIndex);

Review comment:
       After the modification, the `updateStatistics` call here seems a bit 
inconsistent with the name of this method (`compressBufferIfPossible`). We 
might move the statistics update out of it.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -289,8 +374,20 @@ public void finish() throws IOException {
         super.finish();
     }
 
+    private void releaseWriteBuffers() {
+        synchronized (lock) {
+            if (bufferPool != null) {
+                for (MemorySegment segment : writeBuffers) {
+                    bufferPool.recycle(segment);
+                }
+                writeBuffers.clear();
+            }
+        }
+    }
+
     @Override
     public void close() {
+        releaseWriteBuffers();
         releaseCurrentSortBuffer();

Review comment:
       Although not fully related to this PR, the sort buffer currently seems 
to be treated differently from the write buffers with respect to the lock. We 
might add a comment noting that there will always be a close() call from the 
main thread, or we might change currentSortBuffer to be volatile.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to