Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152255026
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -82,17 +84,19 @@
        /** The initial number of exclusive buffers assigned to this channel. */
        private int initialCredit;
     
    -   /** The current available buffers including both exclusive buffers and requested floating buffers. */
    -   private final ArrayDeque<Buffer> availableBuffers = new ArrayDeque<>();
    +   /** The available buffer queue wraps both exclusive and requested floating buffers. */
    +   private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue();
     
        /** The number of available buffers that have not been announced to the producer yet. */
        private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
     
        /** The number of unsent buffers in the producer's sub partition. */
    -   private final AtomicInteger senderBacklog = new AtomicInteger(0);
    +   @GuardedBy("bufferQueue")
    +   private int senderBacklog;
    --- End diff --
    
    Skimming over the code, we only ever need `numRequiredBuffers = 
senderBacklog + initialCredit`. What do you think about storing this value 
instead of the backlog itself? The backlog could still be derived if desired 
(`numRequiredBuffers - initialCredit`), and we would avoid adding these two 
numbers over and over again, since the stored value only needs to be updated in 
`onSenderBacklog()`.
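
    To illustrate the idea, here is a minimal sketch, not the actual 
`RemoteInputChannel`. The class name `RequiredBuffersSketch`, the getters, and 
the `ArrayDeque` standing in for `AvailableBufferQueue` are all hypothetical; 
only `initialCredit`, `senderBacklog`, `numRequiredBuffers`, `bufferQueue` and 
`onSenderBacklog()` come from the diff and the comment above.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for the channel; NOT the real RemoteInputChannel. */
public class RequiredBuffersSketch {

    /** Stand-in for AvailableBufferQueue (assumption); also used as the lock. */
    private final ArrayDeque<Object> bufferQueue = new ArrayDeque<>();

    /** The initial number of exclusive buffers assigned to this channel. */
    private final int initialCredit;

    /** Cached value of senderBacklog + initialCredit; guarded by bufferQueue. */
    private int numRequiredBuffers;

    public RequiredBuffersSketch(int initialCredit) {
        this.initialCredit = initialCredit;
        // The backlog starts at 0, so only the initial credit is required.
        this.numRequiredBuffers = initialCredit;
    }

    /** The only place where the sum is recomputed. */
    public void onSenderBacklog(int backlog) {
        synchronized (bufferQueue) {
            numRequiredBuffers = backlog + initialCredit;
        }
    }

    /** Readers use the cached sum directly instead of adding each time. */
    public int getNumRequiredBuffers() {
        synchronized (bufferQueue) {
            return numRequiredBuffers;
        }
    }

    /** The backlog itself can still be derived if it is ever needed. */
    public int getSenderBacklog() {
        synchronized (bufferQueue) {
            return numRequiredBuffers - initialCredit;
        }
    }
}
```

    With `initialCredit = 2`, calling `onSenderBacklog(5)` leaves 
`getNumRequiredBuffers()` at 7 and `getSenderBacklog()` at 5.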

