Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154318950 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -280,94 +280,120 @@ public String toString() { // ------------------------------------------------------------------------ /** - * Enqueue this input channel in the pipeline for sending unannounced credits to producer. + * Enqueue this input channel in the pipeline for notifying the producer of unannounced credit. */ void notifyCreditAvailable() { - //TODO in next PR + checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue."); + + // We should skip the notification if this channel is already released. + if (!isReleased.get()) { + partitionRequestClient.notifyCreditAvailable(this); + } } /** - * Exclusive buffer is recycled to this input channel directly and it may trigger notify - * credit to producer. + * Exclusive buffer is recycled to this input channel directly and it may trigger return extra + * floating buffer and notify increased credit to the producer. * * @param segment The exclusive segment of this channel. */ @Override public void recycle(MemorySegment segment) { - synchronized (availableBuffers) { - // Important: the isReleased check should be inside the synchronized block. - // that way the segment can also be returned to global pool after added into - // the available queue during releasing all resources. + int numAddedBuffers; + + synchronized (bufferQueue) { + // Important: check the isReleased state inside synchronized block, so there is no + // race condition when recycle and releaseAllResources running in parallel. 
if (isReleased.get()) { try { - inputGate.returnExclusiveSegments(Arrays.asList(segment)); + inputGate.returnExclusiveSegments(Collections.singletonList(segment)); return; } catch (Throwable t) { ExceptionUtils.rethrow(t); } } - availableBuffers.add(new Buffer(segment, this)); + numAddedBuffers = bufferQueue.addExclusiveBuffer(new Buffer(segment, this), numRequiredBuffers); } - if (unannouncedCredit.getAndAdd(1) == 0) { + if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(1) == 0) { notifyCreditAvailable(); } } public int getNumberOfAvailableBuffers() { - synchronized (availableBuffers) { - return availableBuffers.size(); + synchronized (bufferQueue) { + return bufferQueue.getAvailableBufferSize(); } } + @VisibleForTesting + public int getNumberOfRequiredBuffers() { --- End diff -- `getNumberOfRequiredBuffers()` could be package-private; if it is made package-private, the `@VisibleForTesting` annotation should be removed as well.
---