[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265239#comment-16265239
 ] 

ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152961281
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
    @@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
                        exclusiveBuffers.add(buffer);
                }
     
    -           Buffer takeExclusiveBuffer() {
    -                   return exclusiveBuffers.poll();
    -           }
    -
                void addFloatingBuffer(Buffer buffer) {
                        floatingBuffers.add(buffer);
                }
     
    -           Buffer takeFloatingBuffer() {
    -                   return floatingBuffers.poll();
    +           /**
    +            * Add the exclusive buffer into the queue, and recycle one 
floating buffer if the
    +            * number of available buffers in queue is more than required 
amount.
    +            *
    +            * @param buffer The exclusive buffer of this channel.
    +            * @return Whether to recycle one floating buffer.
    +            */
    +           boolean maintainTargetSize(Buffer buffer) {
    --- End diff --
    
    Sorry about the forth-and-back here, but thinking about the bug that you 
fixed with the latest commit, it would be dangerous if we ever called 
`maintainTargetSize()` without adding an exclusive buffer. What do you think 
about my second suggestion instead, i.e. having a
    ```
                /**
                 * Adds an exclusive buffer (back) into the queue and recycles 
one floating buffer if the
                 * number of available buffers in queue is more than the 
required amount.
                 *
                 * @param buffer
                 *              the exclusive buffer to add
                 * @param numRequiredBuffers
                 *              the number of required buffers
                 *
                 * @return how many buffers were added to the queue
                 */
                int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
                        exclusiveBuffers.add(buffer);
                        if (getAvailableBufferSize() > numRequiredBuffers) {
                                Buffer floatingBuffer = floatingBuffers.poll();
                                floatingBuffer.recycle();
                                return 0;
                        } else {
                                return 1;
                        }
                }
    ```
    (please note the changed return type which I think is more obvious than a 
`boolean`)


> Implement Netty receiver incoming pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7406
>                 URL: https://issues.apache.org/jira/browse/FLINK-7406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to