zhijiangW commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r285002711
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ########## @@ -121,24 +117,29 @@ public RemoteInputChannel( ConnectionManager connectionManager, int initialBackOff, int maxBackoff, - InputChannelMetrics metrics) { + InputChannelMetrics metrics, + @Nonnull MemorySegmentProvider memorySegmentProvider) { - super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter(), metrics.getNumBuffersInRemoteCounter()); + super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, + metrics.getNumBytesInRemoteCounter(), metrics.getNumBuffersInRemoteCounter()); this.connectionId = checkNotNull(connectionId); this.connectionManager = checkNotNull(connectionManager); + this.memorySegmentProvider = memorySegmentProvider; } /** * Assigns exclusive buffers to this input channel, and this method should be called only once * after this input channel is created. */ - void assignExclusiveSegments(List<MemorySegment> segments) { + void assignExclusiveSegments() throws IOException { + Collection<MemorySegment> segments = memorySegmentProvider.requestMemorySegments(); Review comment: The `requestMemorySegments()` call should be moved after the `this.initialCredit == 0` check below. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services