GitHub user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152234362 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -287,17 +290,22 @@ void notifyCreditAvailable() { } /** - * Exclusive buffer is recycled to this input channel directly and it may trigger notify - * credit to producer. + * Exclusive buffer is recycled to this input channel directly and it may trigger return extra + * floating buffer and notify increased credit to the producer. * * @param segment The exclusive segment of this channel. */ @Override public void recycle(MemorySegment segment) { - synchronized (availableBuffers) { - // Important: the isReleased check should be inside the synchronized block. - // that way the segment can also be returned to global pool after added into - // the available queue during releasing all resources. + boolean floatingBufferRecycled = false; + + synchronized (bufferQueue) { + final int numRequiredBuffers = senderBacklog + initialCredit; + checkState(bufferQueue.getAvailableBufferSize() <= numRequiredBuffers, + "The number of available buffers " + bufferQueue.getAvailableBufferSize() + " should not exceed " + numRequiredBuffers); + + // Important: check the isReleased state inside synchronized block, so there is no + // race condition when recycle and releaseAllResources running in parallel. if (isReleased.get()) { try { inputGate.returnExclusiveSegments(Arrays.asList(segment)); --- End diff -- Now that I see it, this call should rather be `inputGate.returnExclusiveSegments(Collections.singletonList(segment));` instead of using `Arrays.asList(segment)`.
---