Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152255026

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -82,17 +84,19 @@
     	/** The initial number of exclusive buffers assigned to this channel. */
     	private int initialCredit;
     
    -	/** The current available buffers including both exclusive buffers and requested floating buffers. */
    -	private final ArrayDeque<Buffer> availableBuffers = new ArrayDeque<>();
    +	/** The available buffer queue wraps both exclusive and requested floating buffers. */
    +	private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue();
     
     	/** The number of available buffers that have not been announced to the producer yet. */
     	private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
     
     	/** The number of unsent buffers in the producer's sub partition. */
    -	private final AtomicInteger senderBacklog = new AtomicInteger(0);
    +	@GuardedBy("bufferQueue")
    +	private int senderBacklog;
    --- End diff --

    Skipping over the code, we only ever need `numRequiredBuffers = senderBacklog + initialCredit` - what do you think about storing this value instead of the backlog itself? It could still be retrieved if desired, but we spare adding these two numbers over and over again and only need to update this in `onSenderBacklog()`.
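
A minimal sketch of the suggestion, to make the trade-off concrete. This is not the actual `RemoteInputChannel` code: the class `RequiredBufferTracker`, the `lock` object (standing in for the `bufferQueue` monitor), and the two getters are hypothetical names for illustration, and `initialCredit` is assumed to be fixed at construction time, which may not hold in the real class.

```java
/**
 * Hypothetical, simplified stand-in for the credit bookkeeping discussed above.
 * Only initialCredit, senderBacklog, numRequiredBuffers and onSenderBacklog()
 * come from the review comment; everything else is an assumption.
 */
class RequiredBufferTracker {

    /** Stand-in for the bufferQueue monitor that guards the field in the diff. */
    private final Object lock = new Object();

    /** The initial number of exclusive buffers (assumed constant in this sketch). */
    private final int initialCredit;

    /** Cached value of senderBacklog + initialCredit; guarded by lock. */
    private int numRequiredBuffers;

    RequiredBufferTracker(int initialCredit) {
        this.initialCredit = initialCredit;
        // The backlog starts at 0, so only the exclusive buffers are required.
        this.numRequiredBuffers = initialCredit;
    }

    /** The only place where the cached sum is updated. */
    void onSenderBacklog(int backlog) {
        synchronized (lock) {
            numRequiredBuffers = backlog + initialCredit;
        }
    }

    /** Readers use the cached sum directly instead of adding on every access. */
    int getNumRequiredBuffers() {
        synchronized (lock) {
            return numRequiredBuffers;
        }
    }

    /** The raw backlog can still be derived if it is ever needed. */
    int getSenderBacklog() {
        synchronized (lock) {
            return numRequiredBuffers - initialCredit;
        }
    }
}
```

With this layout the addition happens once per backlog update rather than at every read site. The cost is that the cached value has to be refreshed whenever `initialCredit` changes, if it can change after construction.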
---