pnowojski commented on a change in pull request #13499: URL: https://github.com/apache/flink/pull/13499#discussion_r495970800
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java ########## @@ -48,8 +48,6 @@ * minimum number of network buffers in this pool * @param maxUsedBuffers * maximum number of network buffers this pool offers - * @param bufferPoolOwner Review comment: Have you checked that `BufferPoolOwner` is not part of our shuffle service API? Maybe some 3rd-party shuffle services are using it? @zhijiangW seemed to be fine with removing it in the ticket, so I guess that's not an issue (he was involved in the pluggable shuffle service story). ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ########## @@ -518,6 +518,11 @@ private void readRecoveredChannelState() throws IOException, InterruptedExceptio "Cannot restore state to a non-checkpointable partition type: " + writer); } } + + if (!recordWriter.isAvailable()) { + MailboxDefaultAction.Suspension suspendedDefaultAction = mailboxProcessor.suspendDefaultAction(); + getInputOutputJointFuture(InputStatus.NOTHING_AVAILABLE).thenRun(suspendedDefaultAction::resume); + } Review comment: What's the purpose of this change? Is it actually working? What if `getInputOutputJointFuture` returns a completed future, but it becomes unavailable during input recovery? Also, it's missing test coverage.
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ########## @@ -322,23 +350,71 @@ private MemorySegment requestMemorySegment() { return requestMemorySegment(UNKNOWN_CHANNEL); } - @Nullable - private MemorySegment requestMemorySegmentFromGlobal() { - assert Thread.holdsLock(availableMemorySegments); + private boolean requestMemorySegmentFromGlobal() { + if (numberOfRequestedMemorySegments >= currentPoolSize) { + return false; + } + + MemorySegment segment = networkBufferPool.requestMemorySegment(); + if (segment != null) { + availableMemorySegments.add(segment); + numberOfRequestedMemorySegments++; + return true; + } + return false; + } - if (isDestroyed) { - throw new IllegalStateException("Buffer pool is destroyed."); + /** + * Tries to obtain a buffer from global pool as soon as one pool is available. Note that multiple + * {@link LocalBufferPool}s might wait on the future of the global pool, hence this method double-check if a new + * buffer is really needed at the time it becomes available. + */ + private void eagerlyRequestMemorySegmentFromGlobal() { + if (eagerlyRequesting) { + return; } + eagerlyRequesting = true; + networkBufferPool.getAvailableFuture().thenRun(() -> { + eagerlyRequesting = false; + if (availabilityHelper.isAvailable()) { + // there is currently no benefit for this pool to obtain buffer from global; give other pools precedent + return; + } Review comment: Wouldn't doing this check under the lock be more consistent? As it is, can't it yield false results? All modifications to the `availabilityHelper` happen under the `availableMemorySegments` lock, so after moving the check there it should be perfectly accurate.
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ########## @@ -322,23 +350,71 @@ private MemorySegment requestMemorySegment() { return requestMemorySegment(UNKNOWN_CHANNEL); } - @Nullable - private MemorySegment requestMemorySegmentFromGlobal() { - assert Thread.holdsLock(availableMemorySegments); + private boolean requestMemorySegmentFromGlobal() { + if (numberOfRequestedMemorySegments >= currentPoolSize) { + return false; + } + + MemorySegment segment = networkBufferPool.requestMemorySegment(); + if (segment != null) { + availableMemorySegments.add(segment); + numberOfRequestedMemorySegments++; + return true; + } + return false; + } - if (isDestroyed) { - throw new IllegalStateException("Buffer pool is destroyed."); + /** + * Tries to obtain a buffer from global pool as soon as one pool is available. Note that multiple + * {@link LocalBufferPool}s might wait on the future of the global pool, hence this method double-check if a new + * buffer is really needed at the time it becomes available. + */ + private void eagerlyRequestMemorySegmentFromGlobal() { + if (eagerlyRequesting) { + return; } + eagerlyRequesting = true; + networkBufferPool.getAvailableFuture().thenRun(() -> { + eagerlyRequesting = false; + if (availabilityHelper.isAvailable()) { + // there is currently no benefit for this pool to obtain buffer from global; give other pools precedent + return; + } + CompletableFuture<?> toNotify = null; + synchronized (availableMemorySegments) { + if (numberOfRequestedMemorySegments >= currentPoolSize) { Review comment: nit: for (future) consistency and self documenting code, maybe extract this condition to a private method `isRequestedSizeReached()`? 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ########## @@ -322,23 +350,71 @@ private MemorySegment requestMemorySegment() { return requestMemorySegment(UNKNOWN_CHANNEL); } - @Nullable - private MemorySegment requestMemorySegmentFromGlobal() { - assert Thread.holdsLock(availableMemorySegments); + private boolean requestMemorySegmentFromGlobal() { + if (numberOfRequestedMemorySegments >= currentPoolSize) { + return false; + } + + MemorySegment segment = networkBufferPool.requestMemorySegment(); + if (segment != null) { + availableMemorySegments.add(segment); + numberOfRequestedMemorySegments++; + return true; + } + return false; + } - if (isDestroyed) { - throw new IllegalStateException("Buffer pool is destroyed."); + /** + * Tries to obtain a buffer from global pool as soon as one pool is available. Note that multiple + * {@link LocalBufferPool}s might wait on the future of the global pool, hence this method double-check if a new + * buffer is really needed at the time it becomes available. + */ + private void eagerlyRequestMemorySegmentFromGlobal() { Review comment: I guess it's not really "eager" after all? To me, "eagerly" would mean something like requesting them upon construction. Here you mean requesting the buffers first, before making the `LocalBufferPool` available?
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ########## @@ -92,16 +102,21 @@ private final int maxBuffersPerChannel; + @GuardedBy("availableMemorySegments") private final int[] subpartitionBuffersCount; private final BufferRecycler[] subpartitionBufferRecyclers; + @GuardedBy("availableMemorySegments") private int unavailableSubpartitionsCount = 0; private boolean isDestroyed; + @GuardedBy("availableMemorySegments") private final AvailabilityHelper availabilityHelper = new AvailabilityHelper(); + private volatile boolean eagerlyRequesting; Review comment: does it need to be `volatile` if we already have `@GuardedBy("availableMemorySegments")`? Adding another point of synchronisation makes it more difficult to reason about the concurrency model. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ########## @@ -190,6 +205,12 @@ subpartitionBufferRecyclers[i] = new SubpartitionBufferRecycler(i, this); } this.maxBuffersPerChannel = maxBuffersPerChannel; + + if (checkAvailability()) { + availabilityHelper.resetAvailable(); + } + + assert hasConsistentAvailability(); Review comment: why `assert` and not `checkState()`? 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ########## @@ -292,27 +313,34 @@ private MemorySegment requestMemorySegmentBlocking(int targetChannel) throws Int @Nullable private MemorySegment requestMemorySegment(int targetChannel) { - MemorySegment segment = null; + MemorySegment segment; synchronized (availableMemorySegments) { - returnExcessMemorySegments(); - - if (availableMemorySegments.isEmpty()) { - segment = requestMemorySegmentFromGlobal(); + if (isDestroyed) { + throw new IllegalStateException("Buffer pool is destroyed."); } - // segment may have been released by buffer pool owner - if (segment == null) { - segment = availableMemorySegments.poll(); + + // target channel over quota; do not return a segment + if (targetChannel != UNKNOWN_CHANNEL && subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel) { + return null; } + + segment = availableMemorySegments.poll(); + if (segment == null) { - availabilityHelper.resetUnavailable(); + return null; } - if (segment != null && targetChannel != UNKNOWN_CHANNEL) { - if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) { + if (targetChannel != UNKNOWN_CHANNEL) { + if (++subpartitionBuffersCount[targetChannel] == maxBuffersPerChannel) { Review comment: `subpartitionBuffersCount[targetChannel]++` vs `++subpartitionBuffersCount[targetChannel]`: isn't this changing the semantics a bit?
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ########## @@ -322,23 +350,71 @@ private MemorySegment requestMemorySegment() { return requestMemorySegment(UNKNOWN_CHANNEL); } - @Nullable - private MemorySegment requestMemorySegmentFromGlobal() { - assert Thread.holdsLock(availableMemorySegments); + private boolean requestMemorySegmentFromGlobal() { + if (numberOfRequestedMemorySegments >= currentPoolSize) { + return false; + } + + MemorySegment segment = networkBufferPool.requestMemorySegment(); + if (segment != null) { + availableMemorySegments.add(segment); + numberOfRequestedMemorySegments++; + return true; + } + return false; + } - if (isDestroyed) { - throw new IllegalStateException("Buffer pool is destroyed."); + /** + * Tries to obtain a buffer from global pool as soon as one pool is available. Note that multiple + * {@link LocalBufferPool}s might wait on the future of the global pool, hence this method double-check if a new + * buffer is really needed at the time it becomes available. 
+ */ + private void eagerlyRequestMemorySegmentFromGlobal() { + if (eagerlyRequesting) { + return; } + eagerlyRequesting = true; + networkBufferPool.getAvailableFuture().thenRun(() -> { + eagerlyRequesting = false; + if (availabilityHelper.isAvailable()) { + // there is currently no benefit for this pool to obtain buffer from global; give other pools precedent + return; + } + CompletableFuture<?> toNotify = null; + synchronized (availableMemorySegments) { + if (numberOfRequestedMemorySegments >= currentPoolSize) { + return; + } + + // fetch a segment from global pool + if (requestMemorySegmentFromGlobal()) { + toNotify = availabilityHelper.getUnavailableToResetAvailable(); + } else { + // segment probably taken by other pool, so retry later + eagerlyRequestMemorySegmentFromGlobal(); + } + } + mayNotifyAvailable(toNotify); + }); + } + private boolean checkAvailability() { + if (!availableMemorySegments.isEmpty()) { + return unavailableSubpartitionsCount == 0; + } if (numberOfRequestedMemorySegments < currentPoolSize) { - final MemorySegment segment = networkBufferPool.requestMemorySegment(); - if (segment != null) { - numberOfRequestedMemorySegments++; - return segment; + if (requestMemorySegmentFromGlobal()) { + return unavailableSubpartitionsCount == 0; Review comment: Does this mean we can keep requesting segments despite reaching the per-subpartition limit? Is this pre-existing behaviour? (If so, we could leave it as a future improvement.) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org