Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r143230905
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -306,10 +311,27 @@ public void recycle(MemorySegment segment) {
                                        ExceptionUtils.rethrow(t);
                                }
                        }
    +
    +                   // Recycle the extra floating buffers in order not to stack up 2*initialCredit
    +                   // buffers once current backlog is 0
    +                   if (senderBacklog.get() == 0 && availableBuffers.size() >= initialCredit) {
    +                           final int size = availableBuffers.size();
    +                           for (int i = 0; i < size; i++) {
    +                                   final Buffer buffer = availableBuffers.poll();
    --- End diff --
    
    I wonder whether it would be simpler to keep two separate queues of buffers: one for exclusive buffers and one for floating buffers. Then you would at least not have to iterate through the whole list here. When retrieving buffers, you'd always take the floating buffers first, so that we don't get into this situation too often. If that makes it simpler, we could also extract this logic into a separate `AvailableBuffersQueue` class or similar, which would probably make it easier to test as well.
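
    To make the suggestion concrete, here is a rough sketch of what such a class might look like. It is only an illustration under assumptions: the type parameter `B` stands in for `Buffer`, all method names are made up, and `releaseFloatingIfIdle` assumes that the truncated loop above is meant to hand the floating buffers back. The real class would also need whatever synchronization `RemoteInputChannel` requires.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the suggested AvailableBuffersQueue; not existing Flink code.
 * B stands in for Flink's Buffer so that the example compiles on its own.
 */
final class AvailableBuffersQueue<B> {

    /** Buffers permanently assigned to this channel. */
    private final ArrayDeque<B> exclusiveBuffers = new ArrayDeque<>();

    /** Buffers borrowed from the shared pool. */
    private final ArrayDeque<B> floatingBuffers = new ArrayDeque<>();

    private final int initialCredit;

    AvailableBuffersQueue(int initialCredit) {
        this.initialCredit = initialCredit;
    }

    void addExclusive(B buffer) {
        exclusiveBuffers.add(buffer);
    }

    void addFloating(B buffer) {
        floatingBuffers.add(buffer);
    }

    /** Takes a floating buffer first, then an exclusive one; returns null if both are empty. */
    B poll() {
        B buffer = floatingBuffers.poll();
        return buffer != null ? buffer : exclusiveBuffers.poll();
    }

    int size() {
        return exclusiveBuffers.size() + floatingBuffers.size();
    }

    /**
     * Mirrors the condition in the diff: if the sender backlog is 0 and at least
     * initialCredit buffers are available, removes all floating buffers and returns
     * them so the caller can recycle them. No iteration over exclusive buffers is needed.
     */
    List<B> releaseFloatingIfIdle(int senderBacklog) {
        List<B> released = new ArrayList<>();
        if (senderBacklog == 0 && size() >= initialCredit) {
            released.addAll(floatingBuffers);
            floatingBuffers.clear();
        }
        return released;
    }
}
```

    With this split, `recycle()` would only need to call something like `releaseFloatingIfIdle(senderBacklog.get())` and recycle the returned buffers, and the class could be unit-tested without setting up a channel.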

