Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154319502 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -108,17 +117,75 @@ public void testReceiveEmptyBuffer() throws Exception { final Buffer emptyBuffer = TestBufferFactory.createBuffer(); emptyBuffer.setSize(0); + final int backlog = 2; final BufferResponse receivedBuffer = createBufferResponse( - emptyBuffer, 0, inputChannel.getInputChannelId()); + emptyBuffer, 0, inputChannel.getInputChannelId(), backlog); - final PartitionRequestClientHandler client = new PartitionRequestClientHandler(); + final CreditBasedClientHandler client = new CreditBasedClientHandler(); client.addInputChannel(inputChannel); // Read the empty buffer client.channelRead(mock(ChannelHandlerContext.class), receivedBuffer); // This should not throw an exception verify(inputChannel, never()).onError(any(Throwable.class)); + verify(inputChannel, times(1)).onEmptyBuffer(0, backlog); + } + + /** + * Verifies that {@link RemoteInputChannel#onBuffer(Buffer, int, int)} is called when a + * {@link BufferResponse} is received. 
+ */ + @Test + public void testReceiveBuffer() throws Exception { + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32); + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + inputGate.setInputChannel(inputChannel.getPartitionId().getPartitionId(), inputChannel); + try { + final BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8); + inputGate.setBufferPool(bufferPool); + inputGate.assignExclusiveSegments(networkBufferPool, 2); + + final CreditBasedClientHandler handler = new CreditBasedClientHandler(); + handler.addInputChannel(inputChannel); + + final int backlog = 2; + final BufferResponse bufferResponse = createBufferResponse( + TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), backlog); + handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse); + + verify(inputChannel, times(1)).onBuffer(any(Buffer.class), anyInt(), anyInt()); + verify(inputChannel, times(1)).onSenderBacklog(backlog); --- End diff -- If you used `RemoteInputChannel#getNumberOfRequiredBuffers()` here to check the backlog's value instead, we could make the `onSenderBacklog()` method package-private again and remove the `@VisibleForTesting` annotation.
---