Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152969860 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -330,64 +332,120 @@ public void testRequestAndReturnFloatingBuffer() throws Exception { // Prepare the exclusive and floating buffers to verify recycle logic later Buffer exclusiveBuffer = inputChannel.requestBuffer(); assertNotNull(exclusiveBuffer); - Buffer floatingBuffer1 = bufferPool.requestBuffer(); - assertNotNull(floatingBuffer1); - Buffer floatingBuffer2 = bufferPool.requestBuffer(); - assertNotNull(floatingBuffer2); + + final int numRecycleFloatingBuffers = 4; + final ArrayDeque<Buffer> floatingBufferQueue = new ArrayDeque<>(numRecycleFloatingBuffers); + for (int i = 0; i < numRecycleFloatingBuffers; i++) { + Buffer floatingBuffer = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer); + floatingBufferQueue.add(floatingBuffer); + } // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the floating buffers to maintain (backlog + initialCredit) available buffers - verify(bufferPool, times(11)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers. + // One exclusive buffer is taken before, so we should request 13 floating buffers. 
+ verify(bufferPool, times(13)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 10 buffers available in the channel", + 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", + 10, inputChannel.getNumberOfRequiredBuffers()); // Increase the backlog to exceed the number of available floating buffers inputChannel.onSenderBacklog(10); // The channel does not get enough floating buffer and register as buffer listener - verify(bufferPool, times(13)).requestBuffer(); + verify(bufferPool, times(15)).requestBuffer(); verify(bufferPool, times(1)).addBufferListener(inputChannel); - assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 12 buffers required in the channel", 12, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertEquals("There should be 11 buffers available in the channel", + 11, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 12 buffers required in the channel", + 12, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", + 0, bufferPool.getNumberOfAvailableMemorySegments()); // Continue increasing the backlog - inputChannel.onSenderBacklog(11); + inputChannel.onSenderBacklog(12); // The channel is already in the status of waiting for buffers and will not request any more - verify(bufferPool, times(13)).requestBuffer(); + verify(bufferPool, times(15)).requestBuffer(); verify(bufferPool, 
times(1)).addBufferListener(inputChannel); - assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertEquals("There should be 11 buffers available in the channel", + 11, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 14 buffers required in the channel", + 14, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", + 0, bufferPool.getNumberOfAvailableMemorySegments()); - // Recycle the floating buffer and assign it to the buffer listener - floatingBuffer1.recycle(); + // Recycle one floating buffer + floatingBufferQueue.poll().recycle(); - // The channel is still waiting for one more floating buffer - assertEquals("There should be 12 buffers available in the channel", 12, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + // Assign the floating buffer to the listener and the channel is still waiting for more floating buffers + assertEquals("There should be 12 buffers available in the channel", --- End diff -- We should also add ``` verify(bufferPool, times(15)).requestBuffer(); verify(bufferPool, times(1)).addBufferListener(inputChannel); ``` here, and in every block of checks below, to ensure that the behaviour is as expected.
---