GitHub user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141291264 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -122,6 +113,28 @@ public void testReceiveEmptyBuffer() throws Exception { } /** + * Verifies that {@link RemoteInputChannel#onSenderBacklog(int)} is called when a + * {@link BufferResponse} is received. + */ + @Test + public void testReceiveBacklog() throws Exception { + final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class); + when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID()); + when(inputChannel.requestBuffer()).thenReturn(TestBufferFactory.createBuffer()); + + final int backlog = 10; + final BufferResponse receivedBuffer = createBufferResponse( + TestBufferFactory.createBuffer(), 0, inputChannel.getInputChannelId(), backlog); + + final CreditBasedClientHandler client = new CreditBasedClientHandler(); + client.addInputChannel(inputChannel); + + client.channelRead(mock(ChannelHandlerContext.class), receivedBuffer); + + verify(inputChannel, times(1)).onSenderBacklog(backlog); --- End diff -- Please also add this check to the `testReceiveEmptyBuffer()` method above.
---