GitHub user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157544965 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -161,6 +172,29 @@ public boolean isReleased() { return isReleased; } + @Override + public int getBuffersInBacklog() { + return buffersInBacklog; + } + + @Override + public void decreaseBuffersInBacklog(Buffer buffer) { + assert Thread.holdsLock(buffers); + + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog--; + } + } + + @Override + public void increaseBuffersInBacklog(Buffer buffer) { + assert Thread.holdsLock(buffers); + + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog++; + } + } --- End diff -- Please check the access level of these methods: the latter two (`decreaseBuffersInBacklog` and `increaseBuffersInBacklog`) could be private.
---