Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141896488 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -412,23 +468,35 @@ public void onBuffer(Buffer buffer, int sequenceNumber) { } } } + + if (success && backlog > 0) { + onSenderBacklog(backlog); + } + } finally { if (!success) { buffer.recycle(); } } } - public void onEmptyBuffer(int sequenceNumber) { + public void onEmptyBuffer(int sequenceNumber, int backlog) { + boolean success = false; + synchronized (receivedBuffers) { if (!isReleased.get()) { if (expectedSequenceNumber == sequenceNumber) { expectedSequenceNumber++; + success = true; } else { onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber)); } } } + + if (success && backlog > 0) { --- End diff -- same here
---