AHeise commented on a change in pull request #13499:
URL: https://github.com/apache/flink/pull/13499#discussion_r496483325



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -292,27 +313,34 @@ private MemorySegment requestMemorySegmentBlocking(int targetChannel) throws Int
 
        @Nullable
        private MemorySegment requestMemorySegment(int targetChannel) {
-               MemorySegment segment = null;
+               MemorySegment segment;
                synchronized (availableMemorySegments) {
-                       returnExcessMemorySegments();
-
-                       if (availableMemorySegments.isEmpty()) {
-                               segment = requestMemorySegmentFromGlobal();
+                       if (isDestroyed) {
+                               throw new IllegalStateException("Buffer pool is destroyed.");
                        }
-                       // segment may have been released by buffer pool owner
-                       if (segment == null) {
-                               segment = availableMemorySegments.poll();
+
+                       // target channel over quota; do not return a segment
+                       if (targetChannel != UNKNOWN_CHANNEL && subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel) {
+                               return null;
                        }
+
+                       segment = availableMemorySegments.poll();
+
                        if (segment == null) {
-                               availabilityHelper.resetUnavailable();
+                               return null;
                        }
 
-                       if (segment != null && targetChannel != UNKNOWN_CHANNEL) {
-                               if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) {
+                       if (targetChannel != UNKNOWN_CHANNEL) {
+                               if (++subpartitionBuffersCount[targetChannel] == maxBuffersPerChannel) {

Review comment:
       Yes, I should probably pull it out. IMHO it was wrong before. To me, 
valid `subpartitionBuffersCount` values lie in `[0, maxBuffersPerChannel]`, but 
the range used to be `[0, maxBuffersPerChannel + 1]`. In the end, 
`maxBuffersPerChannel` is rather arbitrary, so the change shouldn't have any 
impact on production applications, but for me it's also important to have 
clear semantics.
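
The off-by-one can be reproduced outside of Flink. The following is only a minimal standalone sketch, not the actual `LocalBufferPool` code: the class, the method names, and the constant value `3` are hypothetical, and it assumes the caller stops requesting buffers for a channel once that channel is flagged unavailable.

```java
public class ChannelQuotaSketch {
    // Hypothetical stand-in for maxBuffersPerChannel; the value 3 is arbitrary.
    static final int MAX_BUFFERS_PER_CHANNEL = 3;

    /** Old check: post-increment compares the value *before* incrementing. */
    static int maxCountWithPostIncrement() {
        int count = 0;
        boolean channelUnavailable = false;
        // Assumption: requests stop once the channel is flagged unavailable.
        while (!channelUnavailable) {
            if (count++ == MAX_BUFFERS_PER_CHANNEL) {
                channelUnavailable = true;
            }
        }
        return count;
    }

    /** New check: quota guard first, then pre-increment compares the value *after* incrementing. */
    static int maxCountWithPreIncrement() {
        int count = 0;
        boolean channelUnavailable = false;
        while (true) {
            // Mirrors the early "return null" when the channel is over quota.
            if (count >= MAX_BUFFERS_PER_CHANNEL) {
                break;
            }
            if (++count == MAX_BUFFERS_PER_CHANNEL) {
                channelUnavailable = true;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("post-increment max count: " + maxCountWithPostIncrement());
        System.out.println("pre-increment max count:  " + maxCountWithPreIncrement());
    }
}
```

Under these assumptions the post-increment variant stops at `MAX_BUFFERS_PER_CHANNEL + 1`, while the guarded pre-increment variant never exceeds `MAX_BUFFERS_PER_CHANNEL`.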




