KarmaGYZ commented on a change in pull request #11615:
URL: https://github.com/apache/flink/pull/11615#discussion_r411338006



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -646,14 +658,52 @@ private TaskManagerSlot 
createAndRegisterTaskManagerSlot(SlotID slotId, Resource
        @Nullable
        private PendingTaskManagerSlot 
findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
                for (PendingTaskManagerSlot pendingTaskManagerSlot : 
pendingSlots.values()) {
-                       if 
(pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) {
+                       if 
(pendingSlotExactlyMatchingResourceProfile(pendingTaskManagerSlot, 
resourceProfile)) {
                                return pendingTaskManagerSlot;
                        }
                }
 
                return null;
        }
 
+       private boolean 
pendingSlotExactlyMatchingResourceProfile(PendingTaskManagerSlot 
pendingTaskManagerSlot, ResourceProfile resourceProfile) {
+               return 
pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile);
+       }
+
+       private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport 
slotReport) {
+               final int numReportedNewSlots = slotReport.getNumSlotStatus();
+               final int numRegisteredSlots =  getNumberRegisteredSlots();
+               final int numPendingSlots = getNumberPendingTaskManagerSlots();
+
+               // check if the total number exceed before matching pending 
slot.
+               if (numRegisteredSlots + numPendingSlots + numReportedNewSlots 
<= maxSlotNum) {
+                       return false;
+               }
+
+               // check how many exceed slots could be consumed by pending 
slot.
+               final int totalSlotNum = numRegisteredSlots + numPendingSlots + 
getNumNonPendingReportedNewSlots(slotReport, numReportedNewSlots);
+               if (totalSlotNum > maxSlotNum) {
+                       return true;
+               }
+
+               return false;
+       }
+
+       private int getNumNonPendingReportedNewSlots(SlotReport slotReport, int 
numReportedNewSlots) {

Review comment:
       Yes, I pass this param to avoid calling `slotReport.getNumSlotStatus()` 
multiple times.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to