Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141901569 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -209,6 +276,95 @@ public String toString() { } // ------------------------------------------------------------------------ + // Credit-based + // ------------------------------------------------------------------------ + + /** + * Enqueue this input channel in the pipeline for sending unannounced credits to producer. + */ + void notifyCreditAvailable() { + //TODO in next PR + } + + /** + * Exclusive buffer is recycled to this input channel directly and it may trigger notify + * credit to producer. + * + * @param segment The exclusive segment of this channel. + */ + @Override + public void recycle(MemorySegment segment) { + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + // that way the segment can also be returned to global pool after added into + // the available queue during releasing all resources. + if (isReleased.get()) { + try { + inputGate.returnExclusiveSegments(Arrays.asList(segment)); + return; + } catch (Throwable t) { + ExceptionUtils.rethrow(t); + } + } + availableBuffers.add(new Buffer(segment, this)); + } + + if (unannouncedCredit.getAndAdd(1) == 0) { + notifyCreditAvailable(); + } + } + + public int getNumberOfAvailableBuffers() { + synchronized (availableBuffers) { + return availableBuffers.size(); + } + } + + /** + * The Buffer pool notifies this channel of an available floating buffer. If the channel is released or + * currently does not need extra buffers, the buffer should be recycled to the buffer pool. Otherwise, + * the buffer will be added into the <tt>availableBuffers</tt> queue and the unannounced credit is + * increased by one. + * + * @param buffer Buffer that becomes available in buffer pool. + * @return True when this channel is waiting for more floating buffers, otherwise false. + */ + @Override + public boolean notifyBufferAvailable(Buffer buffer) { + checkState(isWaitingForFloatingBuffers.get(), "This channel should be waiting for floating buffers."); + + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + if (isReleased.get() || availableBuffers.size() >= senderBacklog.get()) { + isWaitingForFloatingBuffers.set(false); + buffer.recycle(); + + return false; + } + + availableBuffers.add(buffer); + + if (unannouncedCredit.getAndAdd(1) == 0) { + notifyCreditAvailable(); + } --- End diff -- can we do this outside the `synchronized` block?
---