xintongsong commented on a change in pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r297472482
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java ########## @@ -347,20 +365,65 @@ private MultiTaskSlot( CompletableFuture<? extends SlotContext> slotContextFuture, @Nullable SlotRequestId allocatedSlotRequestId) { super(slotRequestId, groupId); + Preconditions.checkNotNull(slotContextFuture); this.parent = parent; - this.slotContextFuture = Preconditions.checkNotNull(slotContextFuture); this.allocatedSlotRequestId = allocatedSlotRequestId; this.children = new HashMap<>(16); this.releasingChildren = false; - slotContextFuture.whenComplete( - (SlotContext ignored, Throwable throwable) -> { - if (throwable != null) { - release(throwable); + this.requestedResources = ResourceProfile.EMPTY; + + this.slotContextFuture = slotContextFuture.handle((SlotContext slotContext, Throwable throwable) -> { + if (throwable != null) { + // If the underlying resource request fail, currently we fails all the requests to + // simplify the logic. 
+ release(throwable); + throw new CompletionException(throwable); + } + + if (parent == null) { + ResourceProfile allocated = ResourceProfile.EMPTY; + List<TaskSlot> childrenToEvict = new ArrayList<>(); + + for (TaskSlot slot : children.values()) { + ResourceProfile allocatedIfInclude = allocated.merge(slot.getRequestedResources()); + + if (slotContext.getResourceProfile().isMatching(allocatedIfInclude)) { + allocated = allocatedIfInclude; + } else { + childrenToEvict.add(slot); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Not all requests are fulfilled due to over-allocated, number of requests is {}, " + + "number of evicted requests is {}, underlying allocated is {}, fulfilled is {}, " + + "evicted requests is {},", + children.size(), + childrenToEvict.size(), + slotContext.getResourceProfile(), + allocated, + childrenToEvict); } - }); + + if (childrenToEvict.size() == children.size()) { + // This only happens when we request to RM using the resource profile of a task + // who is belonging to a CoLocationGroup. Similar to dealing with the fail of Review comment: It's not clear to me why this only happens for CoLocationGroup. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services