[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16186042#comment-16186042 ]

ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141906648
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---
    @@ -374,6 +376,64 @@ public void testReleaseExclusiveBuffers() throws Exception {
                verify(inputGate, times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class));
        }
     
    +   /**
    +    * Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request
    +    * floating buffers once receiving the producer's backlog.
    +    */
    +   @Test
    +   public void testRequestFloatingBuffersOnBuffer() throws Exception {
    +           // Setup
    +           final BufferPool bufferPool = mock(BufferPool.class);
    +           when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
    +
    +           final SingleInputGate inputGate = mock(SingleInputGate.class);
    +           when(inputGate.getBufferPool()).thenReturn(bufferPool);
    +
    +           final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
    +
    +           // Receive the producer's backlog
    +           inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10);
    +           // Need to request 10 floating buffers from buffer pool
    +           verify(bufferPool, times(10)).requestBuffer();
    +
    +           inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 8);
    +           // No need to request extra floating buffers from pool because
    +           // there are already 10 available buffers in queue now
    +           verify(bufferPool, times(10)).requestBuffer();
    +
    +           inputChannel.onBuffer(TestBufferFactory.createBuffer(), 2, 11);
    +           // Need to request another floating buffer from pool
    +           verify(bufferPool, times(11)).requestBuffer();
    +   }
    +
    +   /**
    +    * Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request
    +    * floating buffers once receiving the producer's backlog. And it requests from pool only once
    +    * and registers as listener if there are currently no available buffers in the pool.
    +    */
    +   @Test
    +   public void testWaitForFloatingBuffersOnBuffer() throws Exception {
    +           // Setup
    +           final BufferPool bufferPool = mock(BufferPool.class);
    +           when(bufferPool.requestBuffer()).thenReturn(null);
    +           when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true);
    --- End diff --
    
    How about using the real `NetworkBufferPool#createBufferPool()` here with a 
limited set of buffers? Then you could start retrieving buffers as in the test 
method above and continue with verifying the expected behaviour in case the 
buffer limit was reached (no need for two test methods, I guess). I'd prefer 
this over a mock so that you can also verify the interaction with the real 
methods such as `addBufferListener()`.
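
    To make the suggestion concrete, here is a minimal, self-contained sketch of what a single test against a bounded pool could exercise. `ToyBufferPool` and `ToyChannel` are hypothetical stand-ins written only for this illustration; they are not Flink's `NetworkBufferPool` or `RemoteInputChannel`, and the request logic simply mirrors the counts asserted in the test method above (backlogs of 10, 8 and 11).

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical bounded pool; a stand-in, not Flink's NetworkBufferPool. */
class ToyBufferPool {
    private int free;               // buffers still available in the pool
    int requestCalls = 0;           // how often requestBuffer() was invoked
    int listenerRegistrations = 0;  // how often a listener was registered
    private final Deque<ToyChannel> listeners = new ArrayDeque<>();

    ToyBufferPool(int capacity) {
        this.free = capacity;
    }

    /** Returns true if a buffer was handed out, false if the pool is exhausted. */
    boolean requestBuffer() {
        requestCalls++;
        if (free == 0) {
            return false;
        }
        free--;
        return true;
    }

    /** Registers a channel that wants to be told when a buffer is recycled. */
    boolean addBufferListener(ToyChannel listener) {
        listenerRegistrations++;
        listeners.add(listener);
        return true;
    }

    /** A buffer comes back: pass it to a waiting listener first, else keep it. */
    void recycle() {
        ToyChannel waiting = listeners.poll();
        if (waiting != null) {
            waiting.onBufferAvailable();
        } else {
            free++;
        }
    }
}

/** Hypothetical channel; a stand-in, not Flink's RemoteInputChannel. */
class ToyChannel {
    private final ToyBufferPool pool;
    int available = 0;                // floating buffers currently held
    private int required = 0;         // last backlog announced by the producer
    private boolean waiting = false;  // true while registered as a listener

    ToyChannel(ToyBufferPool pool) {
        this.pool = pool;
    }

    /** Called when a received buffer announces the producer's backlog. */
    void onBacklog(int backlog) {
        required = backlog;
        requestFloatingBuffers();
    }

    /** Called by the pool when a recycled buffer is passed to this listener. */
    void onBufferAvailable() {
        waiting = false;
        available++;
        requestFloatingBuffers();
    }

    private void requestFloatingBuffers() {
        // Stop once enough buffers are held or once we are parked as a listener.
        while (available < required && !waiting) {
            if (pool.requestBuffer()) {
                available++;
            } else {
                waiting = pool.addBufferListener(this);
            }
        }
    }
}
```

    With a pool of 11 buffers in this toy model, backlogs of 10, 8 and 11 lead to 10, 10 and 11 `requestBuffer()` calls in total. A following backlog of 13 causes one failed request and exactly one listener registration, and further backlog updates trigger no more requests until a buffer is recycled.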


> Implement Netty receiver incoming pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7406
>                 URL: https://issues.apache.org/jira/browse/FLINK-7406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control.
> Currently, {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} messages from the producer. It requests a buffer from the 
> {{BufferPool}} to hold each message. If no buffer is available, the message is 
> staged temporarily and {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a 
> buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related work items are:
> * Add the producer's backlog to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for 
> {{RemoteInputChannel}}, which may trigger requests for floating buffers from 
> {{BufferPool}}
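
As a rough illustration of the first work item above, the sketch below shows a response message that carries the producer's backlog next to its payload. `ToyBufferResponse` and its field layout (sequence number, backlog, payload length, payload) are assumptions made for this example only; they do not describe Flink's actual `BufferResponse` wire format.

```java
import java.nio.ByteBuffer;

/** Hypothetical message; the layout is illustrative, not Flink's wire format. */
final class ToyBufferResponse {
    final int sequenceNumber;
    final int backlog;      // number of buffers still queued at the producer
    final byte[] payload;

    ToyBufferResponse(int sequenceNumber, int backlog, byte[] payload) {
        this.sequenceNumber = sequenceNumber;
        this.backlog = backlog;
        this.payload = payload;
    }

    /** Serializes as: sequence number, backlog, payload length, payload bytes. */
    ByteBuffer write() {
        ByteBuffer out = ByteBuffer.allocate(3 * Integer.BYTES + payload.length);
        out.putInt(sequenceNumber);
        out.putInt(backlog);
        out.putInt(payload.length);
        out.put(payload);
        out.flip(); // switch from writing to reading
        return out;
    }

    /** Reads the fields back in the same order they were written. */
    static ToyBufferResponse read(ByteBuffer in) {
        int sequenceNumber = in.getInt();
        int backlog = in.getInt();
        byte[] payload = new byte[in.getInt()];
        in.get(payload);
        return new ToyBufferResponse(sequenceNumber, backlog, payload);
    }
}
```

On the receiving side, a handler could read `backlog` from each decoded message and pass it to the input channel, which then decides whether it needs more floating buffers.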



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
