[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16217171#comment-16217171
 ] 

ASF GitHub Bot commented on FLINK-7416:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4533#discussion_r146614543
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
    @@ -64,23 +75,36 @@
        @Test(timeout = 60000)
        @SuppressWarnings("unchecked")
        public void testReleaseInputChannelDuringDecode() throws Exception {
    -           // Mocks an input channel in a state as it was released during 
a decode.
    -           final BufferProvider bufferProvider = 
mock(BufferProvider.class);
    -           when(bufferProvider.requestBuffer()).thenReturn(null);
    -           when(bufferProvider.isDestroyed()).thenReturn(true);
    -           
when(bufferProvider.addBufferListener(any(BufferListener.class))).thenReturn(false);
    -
    -           final RemoteInputChannel inputChannel = 
mock(RemoteInputChannel.class);
    -           when(inputChannel.getInputChannelId()).thenReturn(new 
InputChannelID());
    -           
when(inputChannel.getBufferProvider()).thenReturn(bufferProvider);
    -
    -           final BufferResponse receivedBuffer = createBufferResponse(
    -                   TestBufferFactory.createBuffer(), 0, 
inputChannel.getInputChannelId(), 2);
    -
    -           final PartitionRequestClientHandler client = new 
PartitionRequestClientHandler();
    -           client.addInputChannel(inputChannel);
    -
    -           client.channelRead(mock(ChannelHandlerContext.class), 
receivedBuffer);
    +           final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(10, 32, MemoryType.HEAP);
    --- End diff --
    
    Thanks for those changes!
    
    One more question, maybe it would be better to split refactoring of the old 
tests to separate commit? If this is too much work you can ignore this one, but 
it would make a cleaner history (and it would give you one more contribution ;) 
)


> Implement Netty receiver outgoing pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7416
>                 URL: https://issues.apache.org/jira/browse/FLINK-7416
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to