Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152857580 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception { } /** - * Tests to verify that the input channel requests floating buffers from buffer pool - * in order to maintain backlog + initialCredit buffers available once receiving the - * sender's backlog, and registers as listener if no floating buffers available. + * Tests to verify that the input channel requests floating buffers from buffer pool for + * maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog. + * + * <p>Verifies the logic of recycling floating buffer back into the input channel and the logic + * of returning extra floating buffer into the buffer pool during recycling exclusive buffer. */ @Test - public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + public void testRequestAndReturnFloatingBuffer() throws Exception { // Setup - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP); + final int numExclusiveBuffers = 2; + final int numFloatingBuffers = 12; + final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); try { - final int numFloatingBuffers = 10; final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); - - // Assign exclusive segments to the channel - final int numExclusiveBuffers = 2; - inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); - assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", - numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); + // Prepare the exclusive and floating buffers to verify recycle logic later + Buffer exclusiveBuffer = inputChannel.requestBuffer(); + assertNotNull(exclusiveBuffer); + Buffer floatingBuffer1 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer1); + Buffer floatingBuffer2 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer2); - // Receive the producer's backlog + // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers - verify(bufferPool, times(8)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers + verify(bufferPool, times(11)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", - 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); - inputChannel.onSenderBacklog(11); + // Increase the backlog to exceed the number of available floating buffers + inputChannel.onSenderBacklog(10); - // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result - verify(bufferPool, times(11)).requestBuffer(); + // The channel does not get enough floating buffer and register as buffer listener + verify(bufferPool, times(13)).requestBuffer(); verify(bufferPool, times(1)).addBufferListener(inputChannel); - assertEquals("There should be 12 buffers available in the channel", - 12, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 12 buffers required in the channel", 12, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); - inputChannel.onSenderBacklog(12); + // Continue increasing the backlog + inputChannel.onSenderBacklog(11); - // Already in the status of waiting for buffers and will not request any more - verify(bufferPool, times(11)).requestBuffer(); + // The channel is already in the status of waiting for buffers and will not request any more + verify(bufferPool, times(13)).requestBuffer(); verify(bufferPool, times(1)).addBufferListener(inputChannel); + assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + + // Recycle the floating buffer and assign it to the buffer listener + floatingBuffer1.recycle(); + + // The channel is still waiting for one more floating buffer + assertEquals("There should be 12 buffers available in the channel", 12, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + + // Recycle one more floating buffer again + floatingBuffer2.recycle(); + + // The channel already gets all the required buffers + assertEquals("There should be 13 buffers available in the channel", 13, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + --- End diff -- Whenever `inputChannel.getNumberOfAvailableBuffers()` reaches `inputChannel.getNumberOfRequiredBuffers()`, we should not listen to / add new buffers. We should verify this behaviour and it could result from three things: 1. enough floating buffers become available 2. enough exclusive buffers become available 3. the sender backlog decreases We can verify that we do not listen to new buffers anymore indirectly by recycling yet another floating buffer and verifying that `inputChannel.getNumberOfAvailableBuffers()` does not change. Can you add these two additional checks?
---