Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152512614
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---
    @@ -301,81 +306,388 @@ public void testProducerFailedException() throws Exception {
        }
     
        /**
    -    * Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is
    -    * recycled to available buffers directly and it triggers notify of announced credit.
    +    * Tests to verify that the input channel requests floating buffers from buffer pool
    +    * in order to maintain backlog + initialCredit buffers available once receiving the
    +    * sender's backlog, and registers as listener if no floating buffers available.
         */
        @Test
    -   public void testRecycleExclusiveBufferBeforeReleased() throws Exception {
    -           final SingleInputGate inputGate = mock(SingleInputGate.class);
    -           final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate));
    +   public void testRequestFloatingBufferOnSenderBacklog() throws Exception {
    +           // Setup
    +           final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP);
    +           final SingleInputGate inputGate = createSingleInputGate();
    +           final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
    +           try {
    +                   final int numFloatingBuffers = 10;
    +                   final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
    +                   inputGate.setBufferPool(bufferPool);
    +
    +                   // Assign exclusive segments to the channel
    +                   final int numExclusiveBuffers = 2;
    +                   inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
    +                   inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
    +
    +                   assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel",
    +                           numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers());
     
    -           // Recycle exclusive segment
    -           inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel));
    +                   // Receive the producer's backlog
    +                   inputChannel.onSenderBacklog(8);
     
    -           assertEquals("There should be one buffer available after recycle.",
    -                   1, inputChannel.getNumberOfAvailableBuffers());
    -           verify(inputChannel, times(1)).notifyCreditAvailable();
    +                   // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers
    +                   verify(bufferPool, times(8)).requestBuffer();
    +                   verify(bufferPool, times(0)).addBufferListener(inputChannel);
    +                   assertEquals("There should be 10 buffers available in the channel",
    +                           10, inputChannel.getNumberOfAvailableBuffers());
     
    -           inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel));
    +                   inputChannel.onSenderBacklog(11);
     
    -           assertEquals("There should be two buffers available after recycle.",
    -                   2, inputChannel.getNumberOfAvailableBuffers());
    -           // It should be called only once when increased from zero.
    -           verify(inputChannel, times(1)).notifyCreditAvailable();
    +                   // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result
    +                   verify(bufferPool, times(11)).requestBuffer();
    +                   verify(bufferPool, times(1)).addBufferListener(inputChannel);
    +                   assertEquals("There should be 12 buffers available in the channel",
    +                           12, inputChannel.getNumberOfAvailableBuffers());
    +
    +                   inputChannel.onSenderBacklog(12);
    +
    +                   // Already in the status of waiting for buffers and will not request any more
    +                   verify(bufferPool, times(11)).requestBuffer();
    +                   verify(bufferPool, times(1)).addBufferListener(inputChannel);
    +
    --- End diff --
    
    Actually, I tried to cover the two pieces of logic in two separate tests, 
`testRequestFloatingBufferOnSenderBacklog` and 
`testFairDistributionFloatingBuffers`.
    
    For `testRequestFloatingBufferOnSenderBacklog`, the goal is only to verify 
the request logic on the input channel side. The key point is that the input 
channel will not request buffers again if it is already registered as a 
listener in the pool.
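
    The request logic described above can be illustrated with a minimal, 
self-contained sketch. `SketchBufferPool` and `SketchChannel` are hypothetical 
stand-ins, not Flink's `BufferPool` or `RemoteInputChannel`; they only count 
calls and buffers, assuming the channel requests floating buffers until it 
holds `backlog + initialCredit` and stops once it has registered as a listener.

```java
/** Hypothetical, simplified pool: tracks free floating buffers and call counts. */
final class SketchBufferPool {
    private int freeBuffers;
    int requestCalls;            // every requestBuffer() call, successful or not
    int listenerRegistrations;   // every addBufferListener() call

    SketchBufferPool(int numFloatingBuffers) {
        this.freeBuffers = numFloatingBuffers;
    }

    /** Returns true if a floating buffer could be handed out. */
    boolean requestBuffer() {
        requestCalls++;
        if (freeBuffers == 0) {
            return false;
        }
        freeBuffers--;
        return true;
    }

    void addBufferListener(SketchChannel channel) {
        listenerRegistrations++;
    }
}

/** Hypothetical, simplified channel: exclusive buffers act as the initial credit. */
final class SketchChannel {
    private final SketchBufferPool pool;
    private final int initialCredit;
    private int availableBuffers;
    private boolean waitingForFloatingBuffers;

    SketchChannel(SketchBufferPool pool, int numExclusiveBuffers) {
        this.pool = pool;
        this.initialCredit = numExclusiveBuffers;
        this.availableBuffers = numExclusiveBuffers;
    }

    void onSenderBacklog(int backlog) {
        int required = backlog + initialCredit;
        // Assumption: once registered as a listener, the channel does not request again.
        while (availableBuffers < required && !waitingForFloatingBuffers) {
            if (pool.requestBuffer()) {
                availableBuffers++;
            } else {
                pool.addBufferListener(this);
                waitingForFloatingBuffers = true;
            }
        }
    }

    int getNumberOfAvailableBuffers() {
        return availableBuffers;
    }
}
```

    With a pool of 10 floating buffers and 2 exclusive buffers, backlogs of 8, 
11 and 12 lead to 8, 11 and 11 cumulative `requestBuffer()` calls and a single 
listener registration in this sketch, which matches the counts asserted in the 
diff.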
    
    For `testFairDistributionFloatingBuffers`, the goal is only to verify that 
the input channel listeners get buffers fairly during buffer `recycle()` on the 
`bufferPool` side. I think it can cover the comment you mentioned: "Can you also 
verify the behaviour when the buffers become available?" I will check that 
further later.
    
    I agree that we are missing tests to verify that we stick to 
`senderBacklog + initialCredit` in different processes. I will add them 
later.

