Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152856498 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception { } /** - * Tests to verify that the input channel requests floating buffers from buffer pool - * in order to maintain backlog + initialCredit buffers available once receiving the - * sender's backlog, and registers as listener if no floating buffers available. + * Tests to verify that the input channel requests floating buffers from buffer pool for + * maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog. + * + * <p>Verifies the logic of recycling floating buffer back into the input channel and the logic + * of returning extra floating buffer into the buffer pool during recycling exclusive buffer. */ @Test - public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + public void testRequestAndReturnFloatingBuffer() throws Exception { // Setup - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP); + final int numExclusiveBuffers = 2; + final int numFloatingBuffers = 12; + final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); try { - final int numFloatingBuffers = 10; final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); - - // Assign exclusive segments to the channel - final int numExclusiveBuffers = 2; - inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); - assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", - numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); + // Prepare the exclusive and floating buffers to verify recycle logic later + Buffer exclusiveBuffer = inputChannel.requestBuffer(); + assertNotNull(exclusiveBuffer); + Buffer floatingBuffer1 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer1); + Buffer floatingBuffer2 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer2); - // Receive the producer's backlog + // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers - verify(bufferPool, times(8)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers + verify(bufferPool, times(11)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", - 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); - inputChannel.onSenderBacklog(11); + // Increase the backlog to exceed the number of available floating buffers + inputChannel.onSenderBacklog(10); - // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result - verify(bufferPool, times(11)).requestBuffer(); + // The channel does not get enough floating buffer and register as buffer listener + verify(bufferPool, times(13)).requestBuffer(); verify(bufferPool, times(1)).addBufferListener(inputChannel); - assertEquals("There should be 12 buffers available in the channel", - 12, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 12 buffers required in the channel", 12, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); - inputChannel.onSenderBacklog(12); + // Continue increasing the backlog + inputChannel.onSenderBacklog(11); - // Already in the status of waiting for buffers and will not request any more - verify(bufferPool, times(11)).requestBuffer(); + // The channel is already in the status of waiting for buffers and will not request any more + verify(bufferPool, times(13)).requestBuffer(); verify(bufferPool, times(1)).addBufferListener(inputChannel); + assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + + // Recycle the floating buffer and assign it to the buffer listener + floatingBuffer1.recycle(); + + // The channel is still waiting for one more floating buffer + assertEquals("There should be 12 buffers available in the channel", 12, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + + // Recycle one more floating buffer again + floatingBuffer2.recycle(); + + // The channel already gets all the required buffers + assertEquals("There should be 13 buffers available in the channel", 13, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + + // Decrease the backlog and recycle one exclusive buffer --- End diff -- please make this two separate tests, i.e. check the invariants after decreasing the backlog size (nothing should change here yet) and once again after recycling an exclusive buffer (possibly integrated with the tests I requested above)
---