Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    @NicoK, thanks for the suggestions.
    
    I understand your point about wrapping the buffer and the backlog together 
in a new structure returned by `getNextBuffer()`, and it really makes sense for 
`PipelinedSubpartition`. But once `SpillableSubpartition` begins to write 
buffers to disk, we can no longer get the total backlog that way. We can only 
get the precise backlog by decrementing a counter by one in `getNextBuffer()` 
and incrementing it by one in `add(Buffer)`.
    
    So I think we can put `decreaseStatistics` under the lock, which would 
work for all the subpartition types.
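
    To make the idea concrete, here is a minimal sketch of the counter 
approach. It is not the actual Flink code: the class name 
`BacklogTrackingSubpartition`, the `spill()` method, the `byte[]` buffers and 
the in-memory "disk" queue are all hypothetical stand-ins, used only to show a 
counter that is updated under the same lock as the buffer operations.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for a subpartition; not Flink's real class. */
class BacklogTrackingSubpartition {
    private final Object lock = new Object();
    // Buffers still held in memory.
    private final ArrayDeque<byte[]> memory = new ArrayDeque<>();
    // Simulated disk: once spilling starts, buffers end up here.
    private final ArrayDeque<byte[]> disk = new ArrayDeque<>();
    private boolean spilling;
    // Explicit counter, independent of where the buffers are stored.
    private int backlog;

    void add(byte[] buffer) {
        synchronized (lock) {
            if (spilling) {
                disk.add(buffer);
            } else {
                memory.add(buffer);
            }
            backlog++; // increase by one per added buffer
        }
    }

    /** Moves all in-memory buffers to the simulated disk. */
    void spill() {
        synchronized (lock) {
            spilling = true;
            while (!memory.isEmpty()) {
                disk.add(memory.poll());
            }
            // The backlog is unchanged: no buffer was consumed.
        }
    }

    byte[] getNextBuffer() {
        synchronized (lock) {
            byte[] next = spilling ? disk.poll() : memory.poll();
            if (next != null) {
                backlog--; // decrease under the same lock as the read
            }
            return next;
        }
    }

    int getBacklog() {
        synchronized (lock) {
            return backlog;
        }
    }
}
```

    Because the counter is only touched inside the lock, it stays consistent 
with the buffer operations whether the buffers are in memory or spilled.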

