Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152852133
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
                        exclusiveBuffers.add(buffer);
                }
     
    -           Buffer takeExclusiveBuffer() {
    -                   return exclusiveBuffers.poll();
    -           }
    -
                void addFloatingBuffer(Buffer buffer) {
                        floatingBuffers.add(buffer);
                }
     
    -           Buffer takeFloatingBuffer() {
    -                   return floatingBuffers.poll();
    +           /**
    +            * Add the exclusive buffer into the queue, and recycle one floating buffer if the
    +            * number of available buffers in queue is more than required amount.
    +            *
    +            * @param buffer The exclusive buffer of this channel.
    +            * @return Whether to recycle one floating buffer.
    +            */
    +           boolean maintainTargetSize(Buffer buffer) {
    +                   exclusiveBuffers.add(buffer);
    +
    +                   if (getAvailableBufferSize() > numRequiredBuffers) {
    +                           Buffer floatingBuffer = floatingBuffers.poll();
    +                           floatingBuffer.recycle();
    +                           return true;
    +                   } else {
    +                           return false;
    +                   }
                }
     
    -           int getFloatingBufferSize() {
    -                   return floatingBuffers.size();
    +           /**
    +            * Take the floating buffer first if possible.
    +            */
    +           @Nullable
    +           Buffer takeBuffer() {
    --- End diff --
    
    Please explain when the result may be `null`.
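
For instance, assuming the method polls the floating queue first and falls back to the exclusive queue (the body is not visible in this hunk, so that is a guess), the Javadoc could state that `null` is returned only when both queues are empty. A minimal, self-contained sketch with hypothetical names, using `String` as a stand-in for `Buffer` and `java.util.ArrayDeque` for both queues:

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for the channel's buffer queue; String replaces Buffer. */
class AvailableBufferQueueSketch {
    // Assumption: both queues behave like ArrayDeque, whose poll() returns null when empty.
    private final ArrayDeque<String> floatingBuffers = new ArrayDeque<>();
    private final ArrayDeque<String> exclusiveBuffers = new ArrayDeque<>();

    void addExclusiveBuffer(String buffer) {
        exclusiveBuffers.add(buffer);
    }

    void addFloatingBuffer(String buffer) {
        floatingBuffers.add(buffer);
    }

    /**
     * Takes a floating buffer first if possible, otherwise an exclusive buffer.
     *
     * @return an available buffer, or {@code null} if neither a floating nor an
     *         exclusive buffer is currently queued
     */
    String takeBuffer() {
        // poll() yields null on an empty queue, so fall back to the exclusive queue.
        String floating = floatingBuffers.poll();
        return floating != null ? floating : exclusiveBuffers.poll();
    }
}
```

With the contract written down like this, callers know they have to handle `null` when no buffer is available.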

