Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152963546
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
    @@ -330,64 +332,120 @@ public void testRequestAndReturnFloatingBuffer() 
throws Exception {
                        // Prepare the exclusive and floating buffers to verify 
recycle logic later
                        Buffer exclusiveBuffer = inputChannel.requestBuffer();
                        assertNotNull(exclusiveBuffer);
    -                   Buffer floatingBuffer1 = bufferPool.requestBuffer();
    -                   assertNotNull(floatingBuffer1);
    -                   Buffer floatingBuffer2 = bufferPool.requestBuffer();
    -                   assertNotNull(floatingBuffer2);
    +
    +                   final int numRecycleFloatingBuffers = 4;
    +                   final ArrayDeque<Buffer> floatingBufferQueue = new 
ArrayDeque<>(numRecycleFloatingBuffers);
    +                   for (int i = 0; i < numRecycleFloatingBuffers; i++) {
    +                           Buffer floatingBuffer = 
bufferPool.requestBuffer();
    +                           assertNotNull(floatingBuffer);
    +                           floatingBufferQueue.add(floatingBuffer);
    +                   }
     
                        // Receive the producer's backlog less than the number 
of available floating buffers
                        inputChannel.onSenderBacklog(8);
     
    -                   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
    -                   verify(bufferPool, times(11)).requestBuffer();
    +                   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers.
    +                   // One exclusive buffer is taken before, so we should 
request 13 floating buffers.
    +                   verify(bufferPool, times(13)).requestBuffer();
                        verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
    -                   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
    -                   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
    +                   assertEquals("There should be 10 buffers available in 
the channel",
    +                           10, inputChannel.getNumberOfAvailableBuffers());
    +                   assertEquals("There should be 10 buffers required in 
the channel",
    +                           10, inputChannel.getNumberOfRequiredBuffers());
     
                        // Increase the backlog to exceed the number of 
available floating buffers
                        inputChannel.onSenderBacklog(10);
     
                        // The channel does not get enough floating buffer and 
register as buffer listener
    -                   verify(bufferPool, times(13)).requestBuffer();
    +                   verify(bufferPool, times(15)).requestBuffer();
                        verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
    -                   assertEquals("There should be 11 buffers available in 
the channel", 11, inputChannel.getNumberOfAvailableBuffers());
    -                   assertEquals("There should be 12 buffers required in 
the channel", 12, inputChannel.getNumberOfRequiredBuffers());
    -                   assertEquals("There should be 0 buffer available in 
local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
    +                   assertEquals("There should be 11 buffers available in 
the channel",
    +                           11, inputChannel.getNumberOfAvailableBuffers());
    +                   assertEquals("There should be 12 buffers required in 
the channel",
    +                           12, inputChannel.getNumberOfRequiredBuffers());
    +                   assertEquals("There should be 0 buffer available in 
local pool",
    +                           0, 
bufferPool.getNumberOfAvailableMemorySegments());
     
                        // Continue increasing the backlog
    -                   inputChannel.onSenderBacklog(11);
    +                   inputChannel.onSenderBacklog(12);
     
                        // The channel is already in the status of waiting for 
buffers and will not request any more
    -                   verify(bufferPool, times(13)).requestBuffer();
    +                   verify(bufferPool, times(15)).requestBuffer();
                        verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
    -                   assertEquals("There should be 11 buffers available in 
the channel", 11, inputChannel.getNumberOfAvailableBuffers());
    -                   assertEquals("There should be 13 buffers required in 
the channel", 13, inputChannel.getNumberOfRequiredBuffers());
    -                   assertEquals("There should be 0 buffer available in 
local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
    +                   assertEquals("There should be 11 buffers available in 
the channel",
    +                           11, inputChannel.getNumberOfAvailableBuffers());
    +                   assertEquals("There should be 14 buffers required in 
the channel",
    +                           14, inputChannel.getNumberOfRequiredBuffers());
    +                   assertEquals("There should be 0 buffer available in 
local pool",
    +                           0, 
bufferPool.getNumberOfAvailableMemorySegments());
     
    -                   // Recycle the floating buffer and assign it to the 
buffer listener
    -                   floatingBuffer1.recycle();
    +                   // Recycle one floating buffer
    +                   floatingBufferQueue.poll().recycle();
     
    -                   // The channel is still waiting for one more floating 
buffer
    -                   assertEquals("There should be 12 buffers available in 
the channel", 12, inputChannel.getNumberOfAvailableBuffers());
    -                   assertEquals("There should be 13 buffers required in 
the channel", 13, inputChannel.getNumberOfRequiredBuffers());
    -                   assertEquals("There should be 0 buffer available in 
local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
    +                   // Assign the floating buffer to the listener and the 
channel is still waiting for more floating buffers
    +                   assertEquals("There should be 12 buffers available in 
the channel",
    +                           12, inputChannel.getNumberOfAvailableBuffers());
    +                   assertEquals("There should be 14 buffers required in 
the channel",
    +                           14, inputChannel.getNumberOfRequiredBuffers());
    +                   assertEquals("There should be 0 buffer available in 
local pool",
    +                           0, 
bufferPool.getNumberOfAvailableMemorySegments());
     
    -                   // Recycle one more floating buffer again
    -                   floatingBuffer2.recycle();
    +                   // Recycle one more floating buffer
    +                   floatingBufferQueue.poll().recycle();
     
    -                   // The channel already gets all the required buffers
    -                   assertEquals("There should be 13 buffers available in 
the channel", 13, inputChannel.getNumberOfAvailableBuffers());
    -                   assertEquals("There should be 13 buffers required in 
the channel", 13, inputChannel.getNumberOfRequiredBuffers());
    -                   assertEquals("There should be 0 buffer available in 
local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
    +                   // Assign the floating buffer to the listener and the 
channel is still waiting for more floating buffers
    +                   assertEquals("There should be 13 buffers available in 
the channel",
    +                           13, inputChannel.getNumberOfAvailableBuffers());
    +                   assertEquals("There should be 14 buffers required in 
the channel",
    +                           14, inputChannel.getNumberOfRequiredBuffers());
    +                   assertEquals("There should be 0 buffer available in 
local pool",
    +                           0, 
bufferPool.getNumberOfAvailableMemorySegments());
     
    -                   // Decrease the backlog and recycle one exclusive buffer
    +                   // Decrease the backlog
                        inputChannel.onSenderBacklog(10);
    +
    +                   // The number of available buffers is already more than 
required buffers, but the channel is still
    +                   // in the status of waiting for buffers on local pool 
side
    +                   assertEquals("There should be 13 buffers available in 
the channel",
    +                           13, inputChannel.getNumberOfAvailableBuffers());
    +                   assertEquals("There should be 12 buffers required in 
the channel",
    +                           12, inputChannel.getNumberOfRequiredBuffers());
    +                   assertEquals("There should be 0 buffer available in 
local pool",
    +                           0, 
bufferPool.getNumberOfAvailableMemorySegments());
    +
    +                   // Recycle one more floating buffer
    +                   floatingBufferQueue.poll().recycle();
    +
    +                   // Assign the floating buffer to the listener, but the 
channel will return this buffer immediately to the
    +                   // local pool because it has already enough available 
buffers, and will not waiting for buffers any more
    --- End diff --
    
    `it already has` and `will not wait`


---

Reply via email to