Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152961281
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
    @@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
                        exclusiveBuffers.add(buffer);
                }
     
    -           Buffer takeExclusiveBuffer() {
    -                   return exclusiveBuffers.poll();
    -           }
    -
                void addFloatingBuffer(Buffer buffer) {
                        floatingBuffers.add(buffer);
                }
     
    -           Buffer takeFloatingBuffer() {
    -                   return floatingBuffers.poll();
    +           /**
    +            * Add the exclusive buffer into the queue, and recycle one 
floating buffer if the
    +            * number of available buffers in queue is more than required 
amount.
    +            *
    +            * @param buffer The exclusive buffer of this channel.
    +            * @return Whether to recycle one floating buffer.
    +            */
    +           boolean maintainTargetSize(Buffer buffer) {
    --- End diff --
    
    Sorry about the forth-and-back here, but thinking about the bug that you 
fixed with the latest commit, it would be dangerous if we ever called 
`maintainTargetSize()` without adding an exclusive buffer. What do you think 
about my second suggestion instead, i.e. having a
    ```
                /**
                 * Adds an exclusive buffer (back) into the queue and recycles 
one floating buffer if the
                 * number of available buffers in queue is more than the 
required amount.
                 *
                 * @param buffer
                 *              the exclusive buffer to add
                 * @param numRequiredBuffers
                 *              the number of required buffers
                 *
                 * @return how many buffers were added to the queue
                 */
                int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
                        exclusiveBuffers.add(buffer);
                        if (getAvailableBufferSize() > numRequiredBuffers) {
                                Buffer floatingBuffer = floatingBuffers.poll();
                                floatingBuffer.recycle();
                                return 0;
                        } else {
                                return 1;
                        }
                }
    ```
    (please note the changed return type which I think is more obvious than a 
`boolean`)


---

Reply via email to