Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141902956

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -70,6 +79,21 @@
         */
        private int expectedSequenceNumber = 0;

    +   /** The initial number of exclusive buffers assigned to this channel. */
    +   private int initialCredit;
    +
    +   /** The current available buffers including both exclusive buffers and requested floating buffers. */
    +   private final ArrayDeque<Buffer> availableBuffers = new ArrayDeque<>();
    +
    +   /** The number of available buffers that have not been announced to the producer yet. */
    +   private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
    +
    +   /** The number of unsent buffers in the producer's sub partition. */
    +   private final AtomicInteger senderBacklog = new AtomicInteger(0);
    +
    +   /** The tag indicates whether this channel is waiting for additional floating buffers from the buffer pool. */
    +   private final AtomicBoolean isWaitingForFloatingBuffers = new AtomicBoolean(false);
    --- End diff --

Now seeing this in action: do we really need an `AtomicBoolean`, or is a `volatile boolean` enough? All uses except the one in `notifyBufferDestroyed()` (where the value is only read for a safety check) are already inside a `synchronized (availableBuffers)` block. In that case, you could also annotate the field with `@GuardedBy("availableBuffers")` for documentation.
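
To illustrate the suggestion, here is a minimal sketch, not the actual `RemoteInputChannel` code. The class `ChannelSketch` and its methods are hypothetical, and the local `GuardedBy` annotation is only a stand-in for `javax.annotation.concurrent.GuardedBy` so that the snippet compiles without extra dependencies. It assumes every write to the flag happens while holding the `availableBuffers` lock and that the only unlocked access is a read used as a safety check.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayDeque;

/** Local stand-in for javax.annotation.concurrent.GuardedBy (assumption, documentation only). */
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.FIELD, ElementType.METHOD})
@interface GuardedBy {
    String value();
}

/** Hypothetical, heavily simplified channel used only to show the locking pattern. */
class ChannelSketch {

    private final ArrayDeque<Object> availableBuffers = new ArrayDeque<>();

    /**
     * All writes happen under synchronized (availableBuffers); volatile is kept
     * only so that the unlocked safety-check read sees a recent value.
     */
    @GuardedBy("availableBuffers")
    private volatile boolean isWaitingForFloatingBuffers = false;

    /** Returns true if the caller should now register for floating buffers. */
    boolean requestFloatingBuffers(int needed) {
        synchronized (availableBuffers) {
            if (availableBuffers.size() >= needed || isWaitingForFloatingBuffers) {
                return false;
            }
            // Check-then-set is safe here because the lock is held.
            isWaitingForFloatingBuffers = true;
            return true;
        }
    }

    /** Adds a buffer and clears the waiting flag, both under the same lock. */
    void onBufferAvailable(Object buffer) {
        synchronized (availableBuffers) {
            availableBuffers.add(buffer);
            isWaitingForFloatingBuffers = false;
        }
    }

    /** Unlocked read, comparable to a safety check outside the synchronized block. */
    boolean isWaitingUnsynchronized() {
        return isWaitingForFloatingBuffers;
    }
}
```

Because the check-then-set in `requestFloatingBuffers` runs while the lock is held, it does not need an atomic `compareAndSet`; the `volatile` modifier only matters for the one read that happens outside the lock.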
---