[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276918#comment-16276918 ]
ASF GitHub Bot commented on FLINK-7416: --------------------------------------- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154675125 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -280,94 +280,120 @@ public String toString() { // ------------------------------------------------------------------------ /** - * Enqueue this input channel in the pipeline for sending unannounced credits to producer. + * Enqueue this input channel in the pipeline for notifying the producer of unannounced credit. */ void notifyCreditAvailable() { - //TODO in next PR + checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue."); + + // We should skip the notification if this channel is already released. + if (!isReleased.get()) { + partitionRequestClient.notifyCreditAvailable(this); + } } /** - * Exclusive buffer is recycled to this input channel directly and it may trigger notify - * credit to producer. + * Exclusive buffer is recycled to this input channel directly and it may trigger return extra + * floating buffer and notify increased credit to the producer. * * @param segment The exclusive segment of this channel. */ @Override public void recycle(MemorySegment segment) { - synchronized (availableBuffers) { - // Important: the isReleased check should be inside the synchronized block. - // that way the segment can also be returned to global pool after added into - // the available queue during releasing all resources. + int numAddedBuffers; + + synchronized (bufferQueue) { + // Important: check the isReleased state inside synchronized block, so there is no + // race condition when recycle and releaseAllResources running in parallel. 
if (isReleased.get()) { try { - inputGate.returnExclusiveSegments(Arrays.asList(segment)); + inputGate.returnExclusiveSegments(Collections.singletonList(segment)); return; } catch (Throwable t) { ExceptionUtils.rethrow(t); } } - availableBuffers.add(new Buffer(segment, this)); + numAddedBuffers = bufferQueue.addExclusiveBuffer(new Buffer(segment, this), numRequiredBuffers); } - if (unannouncedCredit.getAndAdd(1) == 0) { + if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(1) == 0) { notifyCreditAvailable(); } } public int getNumberOfAvailableBuffers() { - synchronized (availableBuffers) { - return availableBuffers.size(); + synchronized (bufferQueue) { + return bufferQueue.getAvailableBufferSize(); } } + @VisibleForTesting + public int getNumberOfRequiredBuffers() { --- End diff -- ok, let's keep this public just like `getNumberOfAvailableBuffers()` - I guess, you could even remove the `@VisibleForTesting` since it's just a getter > Implement Netty receiver outgoing pipeline for credit-based > ----------------------------------------------------------- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network > Reporter: zhijiang > Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each send. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)