Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141906648 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -374,6 +376,64 @@ public void testReleaseExclusiveBuffers() throws Exception { verify(inputGate, times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class)); } + /** + * Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request + * floating buffers once receiving the producer's backlog. + */ + @Test + public void testRequestFloatingBuffersOnBuffer() throws Exception { + // Setup + final BufferPool bufferPool = mock(BufferPool.class); + when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer()); + + final SingleInputGate inputGate = mock(SingleInputGate.class); + when(inputGate.getBufferPool()).thenReturn(bufferPool); + + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + + // Receive the producer's backlog + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10); + // Need to request 10 floating buffers from buffer pool + verify(bufferPool, times(10)).requestBuffer(); + + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 8); + // No need to request extra floating buffers from pool because + // there are already 10 available buffers in queue now + verify(bufferPool, times(10)).requestBuffer(); + + inputChannel.onBuffer(TestBufferFactory.createBuffer(), 2, 11); + // Need to request another floating buffer from pool + verify(bufferPool, times(11)).requestBuffer(); + } + + /** + * Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request + * floating buffers once receiving the producer's backlog. And it requests from pool only once + * and registers as listener if there are currently no available buffers in the pool. + */ + @Test + public void testWaitForFloatingBuffersOnBuffer() throws Exception { + // Setup + final BufferPool bufferPool = mock(BufferPool.class); + when(bufferPool.requestBuffer()).thenReturn(null); + when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true); --- End diff -- How about using the real `NetworkBufferPool#createBufferPool()` here with a limited set of buffers? Then you could start retrieving buffers as in the test method above and continue with verifying the expected behaviour in case the buffer limit was reached (no need for two test methods, I guess). I'd prefer this over a mock so that you can also verify the interaction with the real methods such as `addBufferListener()`.
---