Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157691096 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -52,6 +54,10 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private volatile int buffersInBacklog; --- End diff -- The way of `ArrayDeque#size()` for `getBuffersInBacklog()` may be not feasible because we do not know how many events in the `ArrayDeque` and they should not be considered as backlog length. For the new API, we may need to modify the `ResultSubpartitionView#getNextBuffer` to return `BufferAndBacklog` wrapping structure instead of `Buffer`, and do we also need to extend the `BufferAndAvailability` to add backlog in it? By this way, it can get benefits for `PipelinedSubpartition` to reduce 'volatile`, but for `SpillableSubpartition`, the `volatile` may still be needed? Because the `getNextBuffer` and `decreaseBacklog` are in different parts for `SpillableSubpartitionView/SpilledSubpartitionView`.
---