TanYuxin-tyx commented on code in PR #23851:
URL: https://github.com/apache/flink/pull/23851#discussion_r1472451043


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -37,8 +37,16 @@ public class TieredStorageConfiguration {
 
     private static final String DEFAULT_REMOTE_STORAGE_BASE_PATH = null;
 
+    private static final boolean DEFAULT_MEMORY_DECOUPLING_ENABLED = false;

Review Comment:
   By default, the memory decoupling switch is false. When do we enable it?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -39,6 +40,10 @@
  */
 public class NettyShuffleUtils {
 
+    // Temporarily declare the default value here, it would be moved to the
+    // configuration class later.

Review Comment:
   We should use `/** */` instead here.
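For illustration, the line comment could become a Javadoc comment like this (a sketch only; the class is a stand-in and the constant name is hypothetical, not the one in the PR):

```java
/** Sketch of the Javadoc style suggested above; the constant name is made up. */
class NettyShuffleUtilsSketch {

    /**
     * Temporarily declare the default value here; it will be moved to the
     * configuration class later.
     */
    static final int DEFAULT_HYBRID_SHUFFLE_BUFFERS = 2;
}
```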



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -335,19 +338,22 @@ private MemorySegment requestBufferBlockingFromPool() {
     }
 
     /** @return a memory segment from the internal buffer queue. */
-    private MemorySegment requestBufferBlockingFromQueue() {
+    @Nullable
+    private MemorySegment requestBufferFromQueue() {
         CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>();
         scheduleCheckRequestBufferFuture(
                 requestBufferFuture, INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS);
 
+        hardBackpressureTimerGauge.markStart();

Review Comment:
   Marking the start and end inside the loop can affect the real value of the `hardBackpressureTimerGauge` (e.g., `previousMaxSingleMeasurement` in `TimerGauge`).
   
   The `markStart` can be moved to just before the `while` loop starts, and the `markEnd` to where the loop ends.
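To illustrate why the placement matters, here is a minimal sketch with a simplified stand-in for `TimerGauge` (the real Flink class has a richer API; the field and method names below are assumptions): marking once around the whole retry loop records one long backpressure span instead of many short per-iteration spans.

```java
import java.util.concurrent.TimeUnit;

/** Simplified stand-in for Flink's TimerGauge; tracks the longest single span. */
class SimpleTimerGauge {
    private long startNanos;
    private long previousMaxSingleMeasurement; // analogous to TimerGauge's max tracking

    void markStart() {
        startNanos = System.nanoTime();
    }

    void markEnd() {
        previousMaxSingleMeasurement =
                Math.max(previousMaxSingleMeasurement, System.nanoTime() - startNanos);
    }

    long getMaxSingleMeasurementMillis() {
        return TimeUnit.NANOSECONDS.toMillis(previousMaxSingleMeasurement);
    }
}

class BackpressureTimingSketch {
    /** Marks once around the whole loop, so the gauge sees the full blocked span. */
    static long timeWholeLoop(SimpleTimerGauge gauge, int iterations) {
        gauge.markStart();
        for (int i = 0; i < iterations; i++) {
            sleepQuietly(10); // stands in for one bounded wait on the buffer queue
        }
        gauge.markEnd();
        return gauge.getMaxSingleMeasurementMillis();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

If `markStart`/`markEnd` instead wrapped each iteration, the gauge would report roughly one iteration's wait as its maximum, understating the real backpressure duration.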



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java:
##########
@@ -132,16 +134,44 @@ private boolean shouldContinueRequest(BufferPool bufferPool) {
         }
     }
 
-    /** Requests exclusive buffers from the provider. */
-    void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
-        checkArgument(numExclusiveBuffers >= 0, "Num exclusive buffers must be non-negative.");
-        if (numExclusiveBuffers == 0) {
-            return;
+    private void resizeBufferQueue() {
+        SingleInputGate inputGate = inputChannel.inputGate;
+        int currentSize = inputGate.getBufferPool().getNumBuffers();
+        int numRemoteChannels =
+                inputGate.getNumberOfInputChannels() - inputGate.getNumberOfLocalInputChannels();
+        if (numRemoteChannels == 0) {
+            numExclusiveBuffers = 0;
+        } else if (currentSize > 1 && currentSize != bufferPoolSize) {
+            int targetExclusivePerChannel =
+                    Math.min(initialCredit, (currentSize - 1) / numRemoteChannels);

Review Comment:
   Why do we use `currentSize - 1` here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortBuffer.java:
##########
@@ -184,6 +184,27 @@ public boolean append(ByteBuffer source, int targetSubpartition, Buffer.DataType
         return false;
     }
 
+    /**
+     * Try to release some unused memory segments.
+     *
+     * <p>Note that this class is not thread safe, so please make sure to call {@link
+     * #append(ByteBuffer source, int targetSubpartition, Buffer.DataType dataType)} and this method
+     * with lock acquired.
+     *
+     * @param numFreeSegments the number of segments to be released.
+     * @return true if released successfully, otherwise false.
+     */
+    public boolean returnFreeSegments(int numFreeSegments) {
+        if (numFreeSegments < numGuaranteedBuffers - segments.size()) {
+            for (int i = 0; i < numFreeSegments; i++) {
+                bufferRecycler.recycle(freeSegments.poll());

Review Comment:
   I have a concern here. Directly polling a buffer and recycling it may not be safe.
   
   We should ensure that the remaining buffers are enough when reading buffers from the sort buffer. Only when the remaining buffers are enough for reading can we recycle buffers safely. (We can record the initial total number of buffers and decide whether to recycle according to the remaining buffers.)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java:
##########
@@ -208,14 +238,21 @@ private int tryRequestBuffers() {
      */
     @Override
     public void recycle(MemorySegment segment) {
+
         @Nullable Buffer releasedFloatingBuffer = null;
         synchronized (bufferQueue) {
+            resizeBufferQueue();
             try {
+                BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
                 // Similar to notifyBufferAvailable(), make sure that we never add a buffer
                 // after channel released all buffers via releaseAllResources().
                 if (inputChannel.isReleased()) {
-                    globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment));
+                    bufferPool.recycle(segment);
                     return;
+                } else if (inputChannel instanceof RecoveredInputChannel

Review Comment:
   As we discussed, for the recovery process the buffers should be recycled instead of added to the exclusive buffers, because the number of exclusive buffers is calculated from the number of remote channels. If we wrongly add these buffers to a local recovered input channel, the buffers may be used up.
   
   I'm not sure whether we need to describe the reason for the `inputChannel instanceof RecoveredInputChannel` condition in more detail.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java:
##########
@@ -275,11 +274,6 @@ protected int getNumberOfQueuedBuffers() {
     }
 
     public Buffer requestBufferBlocking() throws InterruptedException, IOException {
-        // not in setup to avoid assigning buffers unnecessarily if there is no state

Review Comment:
   For the recovered input channel, do we still need this change if we set `networkBuffersPerChannel` to 0?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java:
##########
@@ -132,16 +134,44 @@ private boolean shouldContinueRequest(BufferPool bufferPool) {
         }
     }
 
-    /** Requests exclusive buffers from the provider. */
-    void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
-        checkArgument(numExclusiveBuffers >= 0, "Num exclusive buffers must be non-negative.");
-        if (numExclusiveBuffers == 0) {
-            return;
+    private void resizeBufferQueue() {
+        SingleInputGate inputGate = inputChannel.inputGate;
+        int currentSize = inputGate.getBufferPool().getNumBuffers();
+        int numRemoteChannels =
+                inputGate.getNumberOfInputChannels() - inputGate.getNumberOfLocalInputChannels();
+        if (numRemoteChannels == 0) {
+            numExclusiveBuffers = 0;
+        } else if (currentSize > 1 && currentSize != bufferPoolSize) {
+            int targetExclusivePerChannel =

Review Comment:
   Is `targetExclusivePerChannel` unused?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -244,18 +246,8 @@ public int getMaxNonReclaimableBuffers(Object owner) {
     public boolean ensureCapacity(int numAdditionalBuffers) {
         checkIsInitialized();
 
-        final int numRequestedByGuaranteedReclaimableOwners =

Review Comment:
   Is this a bug fix? If so, we'd better separate it into a hotfix commit.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java:
##########
@@ -600,73 +605,133 @@ private void redistributeBuffers() {
         }
 
         // All buffers, which are not among the required ones
-        final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+        int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
 
         if (numAvailableMemorySegment == 0) {
             // in this case, we need to redistribute buffers so that every pool gets its minimum
             for (LocalBufferPool bufferPool : resizableBufferPools) {
-                bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
+                bufferPool.setNumBuffers(bufferPool.getMinNumberOfMemorySegments());
             }
             return;
         }
 
-        /*
-         * With buffer pools being potentially limited, let's distribute the available memory
-         * segments based on the capacity of each buffer pool, i.e. the maximum number of segments
-         * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
-         * it may be less. Based on this and the sum of all these values (totalCapacity), we build
-         * a ratio that we use to distribute the buffers.
-         */
-
-        long totalCapacity = 0; // long to avoid int overflow

Review Comment:
   The whole redistribution process is fairly complex and hard to understand. We'd better add a detailed description here to explain that process. In addition, we should add more tests to cover the corner cases.
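For context, the deleted comment describes a capacity-ratio distribution. A standalone illustration of that idea (simplified; this is not the new algorithm in the PR, and the method and class names are made up): each pool receives a share of the available segments proportional to its remaining capacity, capped at that capacity.

```java
/** Sketch of the capacity-ratio redistribution described in the deleted comment. */
class RatioRedistributionSketch {
    /**
     * Distributes {@code numAvailable} extra segments over pools proportionally to
     * each pool's remaining capacity (maxTotal - required), capping at that capacity.
     */
    static int[] distribute(int numAvailable, int[] remainingCapacity) {
        long totalCapacity = 0; // long to avoid int overflow, as in the original code
        for (int c : remainingCapacity) {
            totalCapacity += c;
        }
        int[] extra = new int[remainingCapacity.length];
        if (totalCapacity == 0) {
            return extra; // nothing can absorb extra segments
        }
        for (int i = 0; i < remainingCapacity.length; i++) {
            extra[i] = (int) Math.min(
                    remainingCapacity[i],
                    numAvailable * (long) remainingCapacity[i] / totalCapacity);
        }
        return extra;
    }
}
```

For example, 10 spare segments over pools with remaining capacities 10 and 30 yields shares of 2 and 7 (integer division leaves a remainder, which the real code must also hand out somewhere; that corner case is exactly the kind worth testing).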



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -79,7 +79,9 @@ public TieredStorageProducerClient(
         this.currentSubpartitionTierAgent = new TierProducerAgent[numSubpartitions];
 
         Arrays.fill(currentSubpartitionSegmentId, -1);
+    }
 

Review Comment:
   Is `setUp` unused now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
