[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265295#comment-16265295 ]
ASF GitHub Bot commented on FLINK-7406: --------------------------------------- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152963546 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -330,64 +332,120 @@ public void testRequestAndReturnFloatingBuffer() throws Exception { // Prepare the exclusive and floating buffers to verify recycle logic later Buffer exclusiveBuffer = inputChannel.requestBuffer(); assertNotNull(exclusiveBuffer); - Buffer floatingBuffer1 = bufferPool.requestBuffer(); - assertNotNull(floatingBuffer1); - Buffer floatingBuffer2 = bufferPool.requestBuffer(); - assertNotNull(floatingBuffer2); + + final int numRecycleFloatingBuffers = 4; + final ArrayDeque<Buffer> floatingBufferQueue = new ArrayDeque<>(numRecycleFloatingBuffers); + for (int i = 0; i < numRecycleFloatingBuffers; i++) { + Buffer floatingBuffer = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer); + floatingBufferQueue.add(floatingBuffer); + } // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the floating buffers to maintain (backlog + initialCredit) available buffers - verify(bufferPool, times(11)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers. + // One exclusive buffer is taken before, so we should request 13 floating buffers. 
+ verify(bufferPool, times(13)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 10 buffers available in the channel", + 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", + 10, inputChannel.getNumberOfRequiredBuffers()); // Increase the backlog to exceed the number of available floating buffers inputChannel.onSenderBacklog(10); // The channel does not get enough floating buffer and register as buffer listener - verify(bufferPool, times(13)).requestBuffer(); + verify(bufferPool, times(15)).requestBuffer(); verify(bufferPool, times(1)).addBufferListener(inputChannel); - assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 12 buffers required in the channel", 12, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertEquals("There should be 11 buffers available in the channel", + 11, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 12 buffers required in the channel", + 12, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", + 0, bufferPool.getNumberOfAvailableMemorySegments()); // Continue increasing the backlog - inputChannel.onSenderBacklog(11); + inputChannel.onSenderBacklog(12); // The channel is already in the status of waiting for buffers and will not request any more - verify(bufferPool, times(13)).requestBuffer(); + verify(bufferPool, times(15)).requestBuffer(); verify(bufferPool, 
times(1)).addBufferListener(inputChannel); - assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertEquals("There should be 11 buffers available in the channel", + 11, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 14 buffers required in the channel", + 14, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", + 0, bufferPool.getNumberOfAvailableMemorySegments()); - // Recycle the floating buffer and assign it to the buffer listener - floatingBuffer1.recycle(); + // Recycle one floating buffer + floatingBufferQueue.poll().recycle(); - // The channel is still waiting for one more floating buffer - assertEquals("There should be 12 buffers available in the channel", 12, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + // Assign the floating buffer to the listener and the channel is still waiting for more floating buffers + assertEquals("There should be 12 buffers available in the channel", + 12, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 14 buffers required in the channel", + 14, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", + 0, bufferPool.getNumberOfAvailableMemorySegments()); - // Recycle one more floating buffer again - floatingBuffer2.recycle(); + // Recycle one more floating buffer + floatingBufferQueue.poll().recycle(); - // The channel already 
gets all the required buffers - assertEquals("There should be 13 buffers available in the channel", 13, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 13 buffers required in the channel", 13, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + // Assign the floating buffer to the listener and the channel is still waiting for more floating buffers + assertEquals("There should be 13 buffers available in the channel", + 13, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 14 buffers required in the channel", + 14, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", + 0, bufferPool.getNumberOfAvailableMemorySegments()); - // Decrease the backlog and recycle one exclusive buffer + // Decrease the backlog inputChannel.onSenderBacklog(10); + + // The number of available buffers is already more than required buffers, but the channel is still + // in the status of waiting for buffers on local pool side + assertEquals("There should be 13 buffers available in the channel", + 13, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 12 buffers required in the channel", + 12, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", + 0, bufferPool.getNumberOfAvailableMemorySegments()); + + // Recycle one more floating buffer + floatingBufferQueue.poll().recycle(); + + // Assign the floating buffer to the listener, but the channel will return this buffer immediately to the + // local pool because it has already enough available buffers, and will not waiting for buffers any more --- End diff -- `it already has` and `will not wait` > Implement Netty receiver incoming pipeline for credit-based > ----------------------------------------------------------- > > Key: FLINK-7406 > 
URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network > Reporter: zhijiang > Assignee: zhijiang > Fix For: 1.5.0 > > > This is part of the work on credit-based network flow control. > Currently, {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} messages from the producer. It requests a buffer from {{BufferPool}} > to hold the message. If no buffer is available, the message is staged temporarily and > {{autoread}} for the channel is set to false. > In credit-based mode, {{PartitionRequestClientHandler}} can always get a > buffer from {{RemoteInputChannel}} for reading messages from the producer. > The related work items are: > * Add the producer's backlog to the {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffers from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates the backlog for > {{RemoteInputChannel}}, which may trigger requests for floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)