pnowojski commented on a change in pull request #13499:
URL: https://github.com/apache/flink/pull/13499#discussion_r495970800



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
##########
@@ -48,8 +48,6 @@
         *              minimum number of network buffers in this pool
         * @param maxUsedBuffers
         *              maximum number of network buffers this pool offers
-        * @param bufferPoolOwner

Review comment:
       Have you checked whether `BufferPoolOwner` is part of our shuffle 
service API? Could there be some 3rd party shuffle services using it? 
   
   @zhijiangW seemed to be fine with removing it in the ticket, so I guess 
that's not an issue (he was involved in the pluggable shuffle service story).

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -518,6 +518,11 @@ private void readRecoveredChannelState() throws 
IOException, InterruptedExceptio
                                                        "Cannot restore state 
to a non-checkpointable partition type: " + writer);
                                }
                        }
+
+                       if (!recordWriter.isAvailable()) {
+                               MailboxDefaultAction.Suspension 
suspendedDefaultAction = mailboxProcessor.suspendDefaultAction();
+                               
getInputOutputJointFuture(InputStatus.NOTHING_AVAILABLE).thenRun(suspendedDefaultAction::resume);
+                       }

Review comment:
       What's the purpose of this change? Is it actually working? What if 
`getInputOutputJointFuture` returns a completed future, but it becomes 
unavailable during the input recovery? 
   
   Also, it's missing test coverage.
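
   For reference on the completed-future case: `CompletableFuture.thenRun` runs its action right away, in the calling thread, when the future is already complete. The sketch below only illustrates that JDK behaviour; `CompletedFutureSketch` and `resumesImmediately` are hypothetical names, not Flink code, and it says nothing about whether the change above is correct.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of Flink.
final class CompletedFutureSketch {

    /** Returns true if the callback already ran by the time thenRun() returned. */
    static boolean resumesImmediately(CompletableFuture<?> availability) {
        AtomicBoolean resumed = new AtomicBoolean(false);
        // Stand-in for thenRun(suspendedDefaultAction::resume).
        availability.thenRun(() -> resumed.set(true));
        return resumed.get();
    }
}
```

   With an already completed future the resume callback fires before the caller continues, so a later change in availability would not be observed by it.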
   
   

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -322,23 +350,71 @@ private MemorySegment requestMemorySegment() {
                return requestMemorySegment(UNKNOWN_CHANNEL);
        }
 
-       @Nullable
-       private MemorySegment requestMemorySegmentFromGlobal() {
-               assert Thread.holdsLock(availableMemorySegments);
+       private boolean requestMemorySegmentFromGlobal() {
+               if (numberOfRequestedMemorySegments >= currentPoolSize) {
+                       return false;
+               }
+
+               MemorySegment segment = 
networkBufferPool.requestMemorySegment();
+               if (segment != null) {
+                       availableMemorySegments.add(segment);
+                       numberOfRequestedMemorySegments++;
+                       return true;
+               }
+               return false;
+       }
 
-               if (isDestroyed) {
-                       throw new IllegalStateException("Buffer pool is 
destroyed.");
+       /**
+        * Tries to obtain a buffer from global pool as soon as one pool is 
available. Note that multiple
+        * {@link LocalBufferPool}s might wait on the future of the global 
pool, hence this method double-check if a new
+        * buffer is really needed at the time it becomes available.
+        */
+       private void eagerlyRequestMemorySegmentFromGlobal() {
+               if (eagerlyRequesting) {
+                       return;
                }
+               eagerlyRequesting = true;
+               networkBufferPool.getAvailableFuture().thenRun(() -> {
+                       eagerlyRequesting = false;
+                       if (availabilityHelper.isAvailable()) {
+                               // there is currently no benefit for this pool 
to obtain buffer from global; give other pools precedent
+                               return;
+                       }

Review comment:
       Wouldn't doing this check under the lock be more consistent? As it is, 
couldn't it yield false results? All modifications to `availabilityHelper` 
happen under the `availableMemorySegments` lock, so after moving the check 
there it should be perfectly accurate. 
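
   A minimal sketch of the suggested shape, assuming a simplified pool: `GuardedAvailabilitySketch`, its `lock` object, and the boolean `available` flag are hypothetical stand-ins for `LocalBufferPool`, `availableMemorySegments`, and `availabilityHelper`.

```java
// Hypothetical, simplified model of the pool; not the real LocalBufferPool.
final class GuardedAvailabilitySketch {
    private final Object lock = new Object();

    // All three fields are only touched while holding "lock".
    private boolean available;
    private int requested;
    private final int poolSize;

    GuardedAvailabilitySketch(int poolSize) {
        this.poolSize = poolSize;
    }

    void setAvailable(boolean value) {
        synchronized (lock) {
            available = value;
        }
    }

    /** Callback body: every read of guarded state happens inside the lock. */
    boolean onGlobalBufferAvailable() {
        synchronized (lock) {
            if (available) {
                return false; // nothing to gain; leave the buffer for other pools
            }
            if (requested >= poolSize) {
                return false; // pool already holds its full quota
            }
            requested++;
            available = true;
            return true;
        }
    }
}
```

   Because the availability check and the update sit in the same critical section, no writer can flip the flag between them.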

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -322,23 +350,71 @@ private MemorySegment requestMemorySegment() {
                return requestMemorySegment(UNKNOWN_CHANNEL);
        }
 
-       @Nullable
-       private MemorySegment requestMemorySegmentFromGlobal() {
-               assert Thread.holdsLock(availableMemorySegments);
+       private boolean requestMemorySegmentFromGlobal() {
+               if (numberOfRequestedMemorySegments >= currentPoolSize) {
+                       return false;
+               }
+
+               MemorySegment segment = 
networkBufferPool.requestMemorySegment();
+               if (segment != null) {
+                       availableMemorySegments.add(segment);
+                       numberOfRequestedMemorySegments++;
+                       return true;
+               }
+               return false;
+       }
 
-               if (isDestroyed) {
-                       throw new IllegalStateException("Buffer pool is 
destroyed.");
+       /**
+        * Tries to obtain a buffer from global pool as soon as one pool is 
available. Note that multiple
+        * {@link LocalBufferPool}s might wait on the future of the global 
pool, hence this method double-check if a new
+        * buffer is really needed at the time it becomes available.
+        */
+       private void eagerlyRequestMemorySegmentFromGlobal() {
+               if (eagerlyRequesting) {
+                       return;
                }
+               eagerlyRequesting = true;
+               networkBufferPool.getAvailableFuture().thenRun(() -> {
+                       eagerlyRequesting = false;
+                       if (availabilityHelper.isAvailable()) {
+                               // there is currently no benefit for this pool 
to obtain buffer from global; give other pools precedent
+                               return;
+                       }
+                       CompletableFuture<?> toNotify = null;
+                       synchronized (availableMemorySegments) {
+                               if (numberOfRequestedMemorySegments >= 
currentPoolSize) {

Review comment:
       nit: for (future) consistency and self-documenting code, maybe extract 
this condition into a private method `isRequestedSizeReached()`?  
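
   Roughly like the sketch below. The field names are taken from the diff; the enclosing `PoolSizeSketch` class and `tryRequest()` are hypothetical and only there to make the example compile on its own.

```java
// Hypothetical container class; only the helper itself is the suggestion.
final class PoolSizeSketch {
    private int numberOfRequestedMemorySegments;
    private final int currentPoolSize;

    PoolSizeSketch(int requested, int poolSize) {
        this.numberOfRequestedMemorySegments = requested;
        this.currentPoolSize = poolSize;
    }

    /** The extracted condition, named after what it means. */
    private boolean isRequestedSizeReached() {
        return numberOfRequestedMemorySegments >= currentPoolSize;
    }

    /** Example call site: bail out early when the quota is used up. */
    boolean tryRequest() {
        if (isRequestedSizeReached()) {
            return false;
        }
        numberOfRequestedMemorySegments++;
        return true;
    }
}
```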

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -322,23 +350,71 @@ private MemorySegment requestMemorySegment() {
                return requestMemorySegment(UNKNOWN_CHANNEL);
        }
 
-       @Nullable
-       private MemorySegment requestMemorySegmentFromGlobal() {
-               assert Thread.holdsLock(availableMemorySegments);
+       private boolean requestMemorySegmentFromGlobal() {
+               if (numberOfRequestedMemorySegments >= currentPoolSize) {
+                       return false;
+               }
+
+               MemorySegment segment = 
networkBufferPool.requestMemorySegment();
+               if (segment != null) {
+                       availableMemorySegments.add(segment);
+                       numberOfRequestedMemorySegments++;
+                       return true;
+               }
+               return false;
+       }
 
-               if (isDestroyed) {
-                       throw new IllegalStateException("Buffer pool is 
destroyed.");
+       /**
+        * Tries to obtain a buffer from global pool as soon as one pool is 
available. Note that multiple
+        * {@link LocalBufferPool}s might wait on the future of the global 
pool, hence this method double-check if a new
+        * buffer is really needed at the time it becomes available.
+        */
+       private void eagerlyRequestMemorySegmentFromGlobal() {

Review comment:
       I guess it's not "eagerly" after all? To me, "eagerly" would mean 
something like requesting them upon construction.
   
   Here you mean: request the buffers first, before making `LocalBufferPool` 
available? 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -92,16 +102,21 @@
 
        private final int maxBuffersPerChannel;
 
+       @GuardedBy("availableMemorySegments")
        private final int[] subpartitionBuffersCount;
 
        private final BufferRecycler[] subpartitionBufferRecyclers;
 
+       @GuardedBy("availableMemorySegments")
        private int unavailableSubpartitionsCount = 0;
 
        private boolean isDestroyed;
 
+       @GuardedBy("availableMemorySegments")
        private final AvailabilityHelper availabilityHelper = new 
AvailabilityHelper();
 
+       private volatile boolean eagerlyRequesting;

Review comment:
       does it need to be `volatile` if we already have 
`@GuardedBy("availableMemorySegments")`? Adding another point of 
synchronisation makes it more difficult to reason about the concurrency model.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -190,6 +205,12 @@
                        subpartitionBufferRecyclers[i] = new 
SubpartitionBufferRecycler(i, this);
                }
                this.maxBuffersPerChannel = maxBuffersPerChannel;
+
+               if (checkAvailability()) {
+                       availabilityHelper.resetAvailable();
+               }
+
+               assert hasConsistentAvailability();

Review comment:
       why `assert` and not `checkState()`?
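
   The difference in short: a Java `assert` is only evaluated when the JVM runs with assertions enabled (`-ea`), while a `checkState()`-style check always runs. A minimal sketch, using a local stand-in for `checkState()` so the example is self-contained; `CheckStateSketch` and `throwsOnInconsistency` are hypothetical names.

```java
// Hypothetical example class; checkState here is a local stand-in.
final class CheckStateSketch {

    /** Always evaluated, independent of the -ea JVM flag. */
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    /** Returns true if the consistency check rejected the given state. */
    static boolean throwsOnInconsistency(boolean consistent) {
        try {
            checkState(consistent, "Inconsistent availability");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

   The trade-off is that `checkState()` pays for the consistency check on every construction in production, whereas `assert` usually only pays for it in tests.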

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -292,27 +313,34 @@ private MemorySegment requestMemorySegmentBlocking(int 
targetChannel) throws Int
 
        @Nullable
        private MemorySegment requestMemorySegment(int targetChannel) {
-               MemorySegment segment = null;
+               MemorySegment segment;
                synchronized (availableMemorySegments) {
-                       returnExcessMemorySegments();
-
-                       if (availableMemorySegments.isEmpty()) {
-                               segment = requestMemorySegmentFromGlobal();
+                       if (isDestroyed) {
+                               throw new IllegalStateException("Buffer pool is 
destroyed.");
                        }
-                       // segment may have been released by buffer pool owner
-                       if (segment == null) {
-                               segment = availableMemorySegments.poll();
+
+                       // target channel over quota; do not return a segment
+                       if (targetChannel != UNKNOWN_CHANNEL && 
subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel) {
+                               return null;
                        }
+
+                       segment = availableMemorySegments.poll();
+
                        if (segment == null) {
-                               availabilityHelper.resetUnavailable();
+                               return null;
                        }
 
-                       if (segment != null && targetChannel != 
UNKNOWN_CHANNEL) {
-                               if (subpartitionBuffersCount[targetChannel]++ 
== maxBuffersPerChannel) {
+                       if (targetChannel != UNKNOWN_CHANNEL) {
+                               if (++subpartitionBuffersCount[targetChannel] 
== maxBuffersPerChannel) {

Review comment:
       `subpartitionBuffersCount[targetChannel]++` vs 
`++subpartitionBuffersCount[targetChannel]`, isn't it changing the semantics a 
bit?
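
   To spell out the difference: the post-increment form compares the old count with the limit, so it fires on the request after the limit was reached; the pre-increment form compares the new count, so it fires on the request that reaches the limit. A small sketch with hypothetical helper names:

```java
// Hypothetical helpers that isolate the two comparison forms.
final class IncrementSketch {

    /** Old form: compares the value before incrementing. */
    static boolean postIncrementHitsLimit(int[] counts, int channel, int max) {
        return counts[channel]++ == max;
    }

    /** New form: compares the value after incrementing. */
    static boolean preIncrementHitsLimit(int[] counts, int channel, int max) {
        return ++counts[channel] == max;
    }
}
```

   With a count of 9 and a limit of 10, the old form returns false and the new form returns true; both leave the count at 10.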

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -322,23 +350,71 @@ private MemorySegment requestMemorySegment() {
                return requestMemorySegment(UNKNOWN_CHANNEL);
        }
 
-       @Nullable
-       private MemorySegment requestMemorySegmentFromGlobal() {
-               assert Thread.holdsLock(availableMemorySegments);
+       private boolean requestMemorySegmentFromGlobal() {
+               if (numberOfRequestedMemorySegments >= currentPoolSize) {
+                       return false;
+               }
+
+               MemorySegment segment = 
networkBufferPool.requestMemorySegment();
+               if (segment != null) {
+                       availableMemorySegments.add(segment);
+                       numberOfRequestedMemorySegments++;
+                       return true;
+               }
+               return false;
+       }
 
-               if (isDestroyed) {
-                       throw new IllegalStateException("Buffer pool is 
destroyed.");
+       /**
+        * Tries to obtain a buffer from global pool as soon as one pool is 
available. Note that multiple
+        * {@link LocalBufferPool}s might wait on the future of the global 
pool, hence this method double-check if a new
+        * buffer is really needed at the time it becomes available.
+        */
+       private void eagerlyRequestMemorySegmentFromGlobal() {
+               if (eagerlyRequesting) {
+                       return;
                }
+               eagerlyRequesting = true;
+               networkBufferPool.getAvailableFuture().thenRun(() -> {
+                       eagerlyRequesting = false;
+                       if (availabilityHelper.isAvailable()) {
+                               // there is currently no benefit for this pool 
to obtain buffer from global; give other pools precedent
+                               return;
+                       }
+                       CompletableFuture<?> toNotify = null;
+                       synchronized (availableMemorySegments) {
+                               if (numberOfRequestedMemorySegments >= 
currentPoolSize) {
+                                       return;
+                               }
+
+                               // fetch a segment from global pool
+                               if (requestMemorySegmentFromGlobal()) {
+                                       toNotify = 
availabilityHelper.getUnavailableToResetAvailable();
+                               } else {
+                                       // segment probably taken by other 
pool, so retry later
+                                       eagerlyRequestMemorySegmentFromGlobal();
+                               }
+                       }
+                       mayNotifyAvailable(toNotify);
+               });
+       }
 
+       private boolean checkAvailability() {
+               if (!availableMemorySegments.isEmpty()) {
+                       return unavailableSubpartitionsCount == 0;
+               }
                if (numberOfRequestedMemorySegments < currentPoolSize) {
-                       final MemorySegment segment = 
networkBufferPool.requestMemorySegment();
-                       if (segment != null) {
-                               numberOfRequestedMemorySegments++;
-                               return segment;
+                       if (requestMemorySegmentFromGlobal()) {
+                               return unavailableSubpartitionsCount == 0;

Review comment:
       Does this mean we can keep requesting segments despite reaching the 
per-subpartition limit? Is it pre-existing behaviour? (If so, we could leave it 
as a future improvement.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

