AHeise commented on a change in pull request #13499:
URL: https://github.com/apache/flink/pull/13499#discussion_r496478254



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -322,23 +350,71 @@ private MemorySegment requestMemorySegment() {
                return requestMemorySegment(UNKNOWN_CHANNEL);
        }
 
-       @Nullable
-       private MemorySegment requestMemorySegmentFromGlobal() {
-               assert Thread.holdsLock(availableMemorySegments);
+       private boolean requestMemorySegmentFromGlobal() {
+               if (numberOfRequestedMemorySegments >= currentPoolSize) {
+                       return false;
+               }
+
+               MemorySegment segment = networkBufferPool.requestMemorySegment();
+               if (segment != null) {
+                       availableMemorySegments.add(segment);
+                       numberOfRequestedMemorySegments++;
+                       return true;
+               }
+               return false;
+       }
 
-               if (isDestroyed) {
-                       throw new IllegalStateException("Buffer pool is destroyed.");
+       /**
+        * Tries to obtain a buffer from global pool as soon as one pool is available. Note that multiple
+        * {@link LocalBufferPool}s might wait on the future of the global pool, hence this method double-check if a new
+        * buffer is really needed at the time it becomes available.
+        */
+       private void eagerlyRequestMemorySegmentFromGlobal() {
+               if (eagerlyRequesting) {
+                       return;
                }
+               eagerlyRequesting = true;
+               networkBufferPool.getAvailableFuture().thenRun(() -> {
+                       eagerlyRequesting = false;
+                       if (availabilityHelper.isAvailable()) {
+                               // there is currently no benefit for this pool to obtain buffer from global; give other pools precedent
+                               return;
+                       }
+                       CompletableFuture<?> toNotify = null;
+                       synchronized (availableMemorySegments) {
+                               if (numberOfRequestedMemorySegments >= currentPoolSize) {

Review comment:
       I had various helper methods in between and ended up inlining them all because, imho, they didn't make the code much easier to read. But I'll try your suggestion; the semantics are simple enough that you do not need to look into the implementation.
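
For illustration only, here is a minimal sketch of how the inlined limit check from the diff could be pulled into a named helper. The reviewer's exact suggestion is not quoted in this message, so the helper name `isRequestedSizeReached` and the simplified `PoolSketch` class are assumptions, not Flink's actual code; a `byte[]` stands in for a `MemorySegment` and for the call to the global pool.

```java
import java.util.ArrayDeque;

/** Simplified stand-in for a local buffer pool; NOT Flink's LocalBufferPool. */
class PoolSketch {
    private final ArrayDeque<byte[]> availableMemorySegments = new ArrayDeque<>();
    private final int currentPoolSize;
    private int numberOfRequestedMemorySegments;

    PoolSketch(int currentPoolSize) {
        this.currentPoolSize = currentPoolSize;
    }

    /** Hypothetical helper: true once this pool already holds its full quota. */
    private boolean isRequestedSizeReached() {
        return numberOfRequestedMemorySegments >= currentPoolSize;
    }

    /** Follows the shape of requestMemorySegmentFromGlobal() in the diff. */
    boolean requestMemorySegmentFromGlobal() {
        synchronized (availableMemorySegments) {
            if (isRequestedSizeReached()) {
                return false;
            }
            // Assumption: allocating a byte[] replaces the request to the global pool.
            availableMemorySegments.add(new byte[32]);
            numberOfRequestedMemorySegments++;
            return true;
        }
    }

    int numberOfAvailableSegments() {
        synchronized (availableMemorySegments) {
            return availableMemorySegments.size();
        }
    }
}
```

With a name like this, the call site reads as a statement of intent, and the comparison itself only needs to be read once.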




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

