KarmaGYZ commented on a change in pull request #11615: URL: https://github.com/apache/flink/pull/11615#discussion_r415189229
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ########## @@ -650,14 +658,48 @@ private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, Resource @Nullable private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) { for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) { - if (pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) { + if (isPendingSlotExactlyMatchingResourceProfile(pendingTaskManagerSlot, resourceProfile)) { return pendingTaskManagerSlot; } } return null; } + private boolean isPendingSlotExactlyMatchingResourceProfile(PendingTaskManagerSlot pendingTaskManagerSlot, ResourceProfile resourceProfile) { + return pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile); + } + + private boolean isMaxSlotNumExceededAfterRegistration(SlotReport initialSlotReport) { + final int numReportedNewSlots = initialSlotReport.getNumSlotStatus(); + final int numRegisteredSlots = getNumberRegisteredSlots(); + final int numPendingSlots = getNumberPendingTaskManagerSlots(); + + // check if the total number exceed before matching pending slot. + if (numRegisteredSlots + numPendingSlots + numReportedNewSlots <= maxSlotNum) { + return false; + } + + // check how many exceed slots could be consumed by pending slot. 
+ final int totalSlotNum = numRegisteredSlots + numPendingSlots + getNumNonPendingReportedNewSlots(initialSlotReport); + return totalSlotNum > maxSlotNum; + } + + private int getNumNonPendingReportedNewSlots(SlotReport slotReport) { + final Set<TaskManagerSlotId> matchingPendingSlots = new HashSet<>(); + + for (SlotStatus slotStatus : slotReport) { + for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) { + if (!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId()) && Review comment: This function checks how many `pendingTaskManagerSlot`s could be fulfilled by the slots in the `SlotReport`. If we do not record which `pendingTaskManagerSlot`s have already been matched, every slot in the `SlotReport` could match the same single `pendingTaskManagerSlot`, which is not correct. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org