pnowojski commented on a change in pull request #13499: URL: https://github.com/apache/flink/pull/13499#discussion_r496565333
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ########## @@ -322,23 +350,71 @@ private MemorySegment requestMemorySegment() { return requestMemorySegment(UNKNOWN_CHANNEL); } - @Nullable - private MemorySegment requestMemorySegmentFromGlobal() { - assert Thread.holdsLock(availableMemorySegments); + private boolean requestMemorySegmentFromGlobal() { + if (numberOfRequestedMemorySegments >= currentPoolSize) { + return false; + } + + MemorySegment segment = networkBufferPool.requestMemorySegment(); + if (segment != null) { + availableMemorySegments.add(segment); + numberOfRequestedMemorySegments++; + return true; + } + return false; + } Review comment: Maybe change it to a `while (numberOfRequestedMemorySegments < currentPoolSize)` loop (as a follow-up commit)? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ########## @@ -322,23 +350,71 @@ private MemorySegment requestMemorySegment() { return requestMemorySegment(UNKNOWN_CHANNEL); } - @Nullable - private MemorySegment requestMemorySegmentFromGlobal() { - assert Thread.holdsLock(availableMemorySegments); + private boolean requestMemorySegmentFromGlobal() { + if (numberOfRequestedMemorySegments >= currentPoolSize) { + return false; + } + + MemorySegment segment = networkBufferPool.requestMemorySegment(); + if (segment != null) { + availableMemorySegments.add(segment); + numberOfRequestedMemorySegments++; + return true; + } + return false; + } - if (isDestroyed) { - throw new IllegalStateException("Buffer pool is destroyed."); + /** + * Tries to obtain a buffer from the global pool as soon as one is available. Note that multiple + * {@link LocalBufferPool}s might wait on the future of the global pool, hence this method double-checks if a new + * buffer is really needed at the time it becomes available. 
+ */ + private void eagerlyRequestMemorySegmentFromGlobal() { + if (eagerlyRequesting) { + return; } + eagerlyRequesting = true; + networkBufferPool.getAvailableFuture().thenRun(() -> { + eagerlyRequesting = false; + if (availabilityHelper.isAvailable()) { + // there is currently no benefit for this pool to obtain a buffer from global; give other pools precedence + return; + } + CompletableFuture<?> toNotify = null; + synchronized (availableMemorySegments) { + if (numberOfRequestedMemorySegments >= currentPoolSize) { + return; + } + + // fetch a segment from global pool + if (requestMemorySegmentFromGlobal()) { + toNotify = availabilityHelper.getUnavailableToResetAvailable(); + } else { + // segment probably taken by other pool, so retry later + eagerlyRequestMemorySegmentFromGlobal(); + } + } + mayNotifyAvailable(toNotify); + }); + } + private boolean checkAvailability() { + if (!availableMemorySegments.isEmpty()) { + return unavailableSubpartitionsCount == 0; + } if (numberOfRequestedMemorySegments < currentPoolSize) { - final MemorySegment segment = networkBufferPool.requestMemorySegment(); - if (segment != null) { - numberOfRequestedMemorySegments++; - return segment; + if (requestMemorySegmentFromGlobal()) { + return unavailableSubpartitionsCount == 0; Review comment: I don't think this is a big issue. 
Just wanted to make sure it's not a regression (that would be an issue) ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ########## @@ -292,27 +313,34 @@ private MemorySegment requestMemorySegmentBlocking(int targetChannel) throws Int @Nullable private MemorySegment requestMemorySegment(int targetChannel) { - MemorySegment segment = null; + MemorySegment segment; synchronized (availableMemorySegments) { - returnExcessMemorySegments(); - - if (availableMemorySegments.isEmpty()) { - segment = requestMemorySegmentFromGlobal(); + if (isDestroyed) { + throw new IllegalStateException("Buffer pool is destroyed."); } - // segment may have been released by buffer pool owner - if (segment == null) { - segment = availableMemorySegments.poll(); + + // target channel over quota; do not return a segment + if (targetChannel != UNKNOWN_CHANNEL && subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel) { + return null; } + + segment = availableMemorySegments.poll(); + if (segment == null) { - availabilityHelper.resetUnavailable(); + return null; } - if (segment != null && targetChannel != UNKNOWN_CHANNEL) { - if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) { + if (targetChannel != UNKNOWN_CHANNEL) { + if (++subpartitionBuffersCount[targetChannel] == maxBuffersPerChannel) { Review comment: Yes, it would be better to move it to a separate commit. I agree that `[0, maxBuffersPerChannel]` makes more sense. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org