[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16186038#comment-16186038 ]
ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141901569

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -209,6 +276,95 @@ public String toString() {
         }

         // ------------------------------------------------------------------------
    +    // Credit-based
    +    // ------------------------------------------------------------------------
    +
    +    /**
    +     * Enqueue this input channel in the pipeline for sending unannounced credits to producer.
    +     */
    +    void notifyCreditAvailable() {
    +        //TODO in next PR
    +    }
    +
    +    /**
    +     * Exclusive buffer is recycled to this input channel directly and it may trigger notify
    +     * credit to producer.
    +     *
    +     * @param segment The exclusive segment of this channel.
    +     */
    +    @Override
    +    public void recycle(MemorySegment segment) {
    +        synchronized (availableBuffers) {
    +            // Important: the isReleased check should be inside the synchronized block.
    +            // that way the segment can also be returned to global pool after added into
    +            // the available queue during releasing all resources.
    +            if (isReleased.get()) {
    +                try {
    +                    inputGate.returnExclusiveSegments(Arrays.asList(segment));
    +                    return;
    +                } catch (Throwable t) {
    +                    ExceptionUtils.rethrow(t);
    +                }
    +            }
    +            availableBuffers.add(new Buffer(segment, this));
    +        }
    +
    +        if (unannouncedCredit.getAndAdd(1) == 0) {
    +            notifyCreditAvailable();
    +        }
    +    }
    +
    +    public int getNumberOfAvailableBuffers() {
    +        synchronized (availableBuffers) {
    +            return availableBuffers.size();
    +        }
    +    }
    +
    +    /**
    +     * The Buffer pool notifies this channel of an available floating buffer. If the channel is released or
    +     * currently does not need extra buffers, the buffer should be recycled to the buffer pool. Otherwise,
    +     * the buffer will be added into the <tt>availableBuffers</tt> queue and the unannounced credit is
    +     * increased by one.
    +     *
    +     * @param buffer Buffer that becomes available in buffer pool.
    +     * @return True when this channel is waiting for more floating buffers, otherwise false.
    +     */
    +    @Override
    +    public boolean notifyBufferAvailable(Buffer buffer) {
    +        checkState(isWaitingForFloatingBuffers.get(), "This channel should be waiting for floating buffers.");
    +
    +        synchronized (availableBuffers) {
    +            // Important: the isReleased check should be inside the synchronized block.
    +            if (isReleased.get() || availableBuffers.size() >= senderBacklog.get()) {
    +                isWaitingForFloatingBuffers.set(false);
    +                buffer.recycle();
    +
    +                return false;
    +            }
    +
    +            availableBuffers.add(buffer);
    +
    +            if (unannouncedCredit.getAndAdd(1) == 0) {
    +                notifyCreditAvailable();
    +            }
    --- End diff --

    can we do this outside the `synchronized` block?


> Implement Netty receiver incoming pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7406
>                 URL: https://issues.apache.org/jira/browse/FLINK-7406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}}
> for holding the message. If not got, the message is staged temporarily and
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
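
For readers following the review question, here is a minimal sketch of what moving the credit notification out of the `synchronized` block could look like. It is an illustration under assumptions, not the code from the pull request: `CreditChannelSketch`, the plain `Object` buffers, the fixed `senderBacklog`, the `notifications` counter, and the boolean "accepted" return value are all hypothetical stand-ins, and the rejection branch omits the buffer recycling that the diff performs. The idea is that the queue update stays under the lock, while the atomic counter update and the callback run after the lock is released, mirroring how `recycle` in the same diff is already written.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the channel in the diff; not Flink's RemoteInputChannel. */
class CreditChannelSketch {
    private final ArrayDeque<Object> availableBuffers = new ArrayDeque<>();
    private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
    // Assumption: counts callback invocations so the behaviour can be observed.
    final AtomicInteger notifications = new AtomicInteger(0);
    // Assumption: a fixed backlog; the diff reads it from an atomic field.
    private final int senderBacklog;
    private volatile boolean released = false;

    CreditChannelSketch(int senderBacklog) {
        this.senderBacklog = senderBacklog;
    }

    /** Placeholder for the "enqueue channel for credit announcement" step. */
    void notifyCreditAvailable() {
        notifications.incrementAndGet();
    }

    /** Returns true if the buffer was accepted (a simplification for this sketch). */
    boolean notifyBufferAvailable(Object buffer) {
        synchronized (availableBuffers) {
            // The release and capacity checks stay inside the lock.
            if (released || availableBuffers.size() >= senderBacklog) {
                return false;
            }
            availableBuffers.add(buffer);
        }

        // Outside the lock: the counter is atomic, so only the caller that
        // moves it from 0 to 1 triggers the notification.
        if (unannouncedCredit.getAndAdd(1) == 0) {
            notifyCreditAvailable();
        }
        return true;
    }

    /** Hands out the accumulated credit and resets it, as an announcer might. */
    int getAndResetUnannouncedCredit() {
        return unannouncedCredit.getAndSet(0);
    }
}
```

Keeping the callback outside the lock shortens the critical section and avoids invoking foreign code while holding the monitor; whether that is safe in the real class depends on what `notifyCreditAvailable()` ends up doing in the follow-up PR.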