KarmaGYZ commented on a change in pull request #11615:
URL: https://github.com/apache/flink/pull/11615#discussion_r415189229



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -650,14 +658,48 @@ private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, Resource
        @Nullable
        private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
                for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
-                       if (pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) {
+                       if (isPendingSlotExactlyMatchingResourceProfile(pendingTaskManagerSlot, resourceProfile)) {
                                return pendingTaskManagerSlot;
                        }
                }
 
                return null;
        }
 
+       private boolean isPendingSlotExactlyMatchingResourceProfile(PendingTaskManagerSlot pendingTaskManagerSlot, ResourceProfile resourceProfile) {
+               return pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile);
+       }
+
+       private boolean isMaxSlotNumExceededAfterRegistration(SlotReport initialSlotReport) {
+               final int numReportedNewSlots = initialSlotReport.getNumSlotStatus();
+               final int numRegisteredSlots =  getNumberRegisteredSlots();
+               final int numPendingSlots = getNumberPendingTaskManagerSlots();
+
+               // check if the total number exceed before matching pending slot.
+               if (numRegisteredSlots + numPendingSlots + numReportedNewSlots <= maxSlotNum) {
+                       return false;
+               }
+
+               // check how many exceed slots could be consumed by pending slot.
+               final int totalSlotNum = numRegisteredSlots + numPendingSlots + getNumNonPendingReportedNewSlots(initialSlotReport);
+               return totalSlotNum > maxSlotNum;
+       }
+
+       private int getNumNonPendingReportedNewSlots(SlotReport slotReport) {
+               final Set<TaskManagerSlotId> matchingPendingSlots = new HashSet<>();
+
+               for (SlotStatus slotStatus : slotReport) {
+                       for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
+                               if (!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId()) &&

Review comment:
       This function counts how many `pendingTaskManagerSlot`s can be 
fulfilled by the slots in the `SlotReport`. If we did not record which 
`pendingTaskManagerSlot`s have already been matched, every slot in the 
`SlotReport` with the same resource profile could match the same single 
`pendingTaskManagerSlot`, which would be incorrect.
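
To illustrate the point, here is a minimal, standalone sketch rather than the PR's actual code. It assumes resource profiles can be modelled as plain strings and pending slot IDs as integers. The class `PendingSlotMatchingSketch` and both method names are hypothetical. The remainder of `getNumNonPendingReportedNewSlots` is cut off in the quoted diff, so the return value used here (reported slots minus matched pending slots) is an assumption.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: profiles are strings, pending slot IDs are integers.
final class PendingSlotMatchingSketch {

    // Each pending slot may absorb at most one reported slot,
    // so we remember which pending slots are already taken.
    static int countNonPendingReportedSlots(List<String> reportedProfiles, Map<Integer, String> pendingSlots) {
        final Set<Integer> matchedPendingSlotIds = new HashSet<>();
        for (String reported : reportedProfiles) {
            for (Map.Entry<Integer, String> pending : pendingSlots.entrySet()) {
                if (!matchedPendingSlotIds.contains(pending.getKey())
                        && pending.getValue().equals(reported)) {
                    matchedPendingSlotIds.add(pending.getKey());
                    break; // this reported slot is accounted for
                }
            }
        }
        return reportedProfiles.size() - matchedPendingSlotIds.size();
    }

    // Same loop without the bookkeeping: one pending slot can be
    // matched by every reported slot with an equal profile.
    static int countWithoutTracking(List<String> reportedProfiles, Map<Integer, String> pendingSlots) {
        int matched = 0;
        for (String reported : reportedProfiles) {
            for (String pendingProfile : pendingSlots.values()) {
                if (pendingProfile.equals(reported)) {
                    matched++;
                    break;
                }
            }
        }
        return reportedProfiles.size() - matched;
    }

    public static void main(String[] args) {
        List<String> reported = Arrays.asList("small", "small", "small");
        Map<Integer, String> pending = new LinkedHashMap<>();
        pending.put(1, "small");

        System.out.println("with tracking: " + countNonPendingReportedSlots(reported, pending));
        System.out.println("without tracking: " + countWithoutTracking(reported, pending));
    }
}
```

With three reported slots and one pending slot of the same profile, the tracked version counts two slots as genuinely new, while the untracked version counts zero, which would understate the total checked against `maxSlotNum`.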




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

