Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4559

@NicoK, thanks for the suggestions. I understand your point about wrapping the buffer and the backlog together in a new structure returned by `getNextBuffer()`, and it really makes sense for `PipelinedSubpartition`. But for `SpillableSubpartition`, once it begins writing buffers to disk, we cannot get the total backlog from it. We can only keep a precise backlog by decrementing it by one in `getNextBuffer()` and incrementing it by one in `add(Buffer)`. So I think we can put `decreaseStatistics` under the lock, which would cover all the subpartitions.
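
A minimal sketch of the counting approach described above, not Flink's actual code. The class `BacklogSubpartition`, the `String` stand-in for `Buffer`, and the `getBacklog()` accessor are all hypothetical and used only for illustration. The point is that the counter changes under the same lock as the add and poll operations, so it stays precise no matter where the buffers are physically stored.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for a subpartition; NOT Flink's real class.
class BacklogSubpartition {
    // In-memory queue used here for simplicity; a spillable variant would
    // keep its buffers on disk, but the counter logic would be the same.
    private final ArrayDeque<String> buffers = new ArrayDeque<>();

    // Number of buffers added but not yet consumed; guarded by `buffers`.
    private int backlog;

    // Producer side: enqueue a buffer and increase the backlog by one.
    void add(String buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            backlog++;
        }
    }

    // Consumer side: dequeue a buffer and decrease the backlog by one,
    // both under the same lock so the two can never drift apart.
    String getNextBuffer() {
        synchronized (buffers) {
            String next = buffers.poll();
            if (next != null) {
                backlog--;
            }
            return next;
        }
    }

    // Read the current backlog under the lock.
    int getBacklog() {
        synchronized (buffers) {
            return backlog;
        }
    }
}
```

With this shape, the reader does not need to ask the storage layer how many buffers remain; it only reads the counter.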
---