xintongsong commented on a change in pull request #15668:
URL: https://github.com/apache/flink/pull/15668#discussion_r617359286



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
##########
@@ -98,38 +93,66 @@ public ResourceAllocationResult tryFulfillRequirements(
         return resultBuilder.build();
     }
 
-    private static Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> 
getRegisteredResources(
-            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+    private static List<InternalResourceInfo> getRegisteredResources(
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
+            ResourceAllocationResult.Builder resultBuilder) {
         return 
taskManagerResourceInfoProvider.getRegisteredTaskManagers().stream()
-                .collect(
-                        Collectors.toMap(
-                                TaskManagerInfo::getInstanceId,
-                                taskManager ->
-                                        Tuple2.of(
-                                                
taskManager.getAvailableResource(),
-                                                
taskManager.getDefaultSlotResourceProfile())));
+                .map(
+                        taskManager ->
+                                new InternalResourceInfo(
+                                        taskManager.getInstanceId(),
+                                        
taskManager.getDefaultSlotResourceProfile(),
+                                        taskManager.getAvailableResource(),
+                                        resultBuilder))
+                .collect(Collectors.toList());
     }
 
-    private static Map<PendingTaskManagerId, ResourceProfile> 
getPendingResources(
-            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+    private static List<InternalResourceInfo> getPendingResources(
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
+            ResourceAllocationResult.Builder resultBuilder) {
         return 
taskManagerResourceInfoProvider.getPendingTaskManagers().stream()
-                .collect(
-                        Collectors.toMap(
-                                PendingTaskManager::getPendingTaskManagerId,
-                                PendingTaskManager::getTotalResourceProfile));
+                .map(
+                        pendingTaskManager ->
+                                new InternalResourceInfo(
+                                        
pendingTaskManager.getPendingTaskManagerId(),
+                                        
pendingTaskManager.getDefaultSlotResourceProfile(),
+                                        
pendingTaskManager.getTotalResourceProfile(),
+                                        resultBuilder))
+                .collect(Collectors.toList());
+    }
+
+    private static int tryFulfilledRequirementWithResource(
+            List<InternalResourceInfo> internalResource,
+            int numUnfulfilled,
+            ResourceProfile requiredResource,
+            JobID jobId) {
+        final Iterator<InternalResourceInfo> internalResourceInfoItr = 
internalResource.iterator();
+        while (numUnfulfilled > 0 && internalResourceInfoItr.hasNext()) {
+            final InternalResourceInfo currentTaskManager = 
internalResourceInfoItr.next();
+            while (numUnfulfilled > 0
+                    && currentTaskManager.tryAllocateSlotForJob(jobId, 
requiredResource)) {
+                numUnfulfilled--;
+            }
+            if 
(currentTaskManager.availableProfile.equals(ResourceProfile.ZERO)) {
+                internalResourceInfoItr.remove();
+            }
+        }
+        return numUnfulfilled;
     }
 
     private static ResourceCounter 
tryFulfillRequirementsForJobWithRegisteredResources(

Review comment:
       This can also be deduplicated, if we break matching existing pending 
resources and allocating new pending resources into two loops.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
##########
@@ -139,119 +162,108 @@ private static ResourceCounter 
tryFulfillRequirementsForJobWithRegisteredResourc
         return outstandingRequirements;
     }
 
-    private static int tryFindSlotsForRequirement(
-            JobID jobId,
-            ResourceRequirement resourceRequirement,
-            Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> 
registeredResources,
-            ResourceAllocationResult.Builder resultBuilder) {
-        final ResourceProfile requiredResource = 
resourceRequirement.getResourceProfile();
-
-        int numUnfulfilled = resourceRequirement.getNumberOfRequiredSlots();
-        while (numUnfulfilled > 0) {
-            final Optional<InstanceID> matchedTaskManager =
-                    findMatchingTaskManager(requiredResource, 
registeredResources);
-
-            if (!matchedTaskManager.isPresent()) {
-                // exit loop early; we won't find a matching slot for this 
requirement
-                break;
-            }
-
-            final ResourceProfile effectiveProfile =
-                    getEffectiveResourceProfile(
-                            requiredResource, 
registeredResources.get(matchedTaskManager.get()).f1);
-            resultBuilder.addAllocationOnRegisteredResource(
-                    jobId, matchedTaskManager.get(), effectiveProfile);
-            deductionRegisteredResource(
-                    registeredResources, matchedTaskManager.get(), 
effectiveProfile);
-            numUnfulfilled--;
-        }
-        return numUnfulfilled;
-    }
-
-    private static Optional<InstanceID> findMatchingTaskManager(
-            ResourceProfile requirement,
-            Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> 
registeredResources) {
-        return registeredResources.entrySet().stream()
-                .filter(
-                        taskManager ->
-                                canFulfillRequirement(
-                                        getEffectiveResourceProfile(
-                                                requirement, 
taskManager.getValue().f1),
-                                        taskManager.getValue().f0))
-                .findAny()
-                .map(Map.Entry::getKey);
-    }
-
     private static boolean canFulfillRequirement(
             ResourceProfile requirement, ResourceProfile resourceProfile) {
         return resourceProfile.allFieldsNoLessThan(requirement);
     }
 
-    private static void deductionRegisteredResource(
-            Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> 
registeredResources,
-            InstanceID instanceId,
-            ResourceProfile resourceProfile) {
-        registeredResources.compute(
-                instanceId,
-                (id, tuple2) -> {
-                    Preconditions.checkNotNull(tuple2);
-                    if 
(tuple2.f0.subtract(resourceProfile).equals(ResourceProfile.ZERO)) {
-                        return null;
-                    } else {
-                        return Tuple2.of(tuple2.f0.subtract(resourceProfile), 
tuple2.f1);
-                    }
-                });
-    }
-
-    private static Optional<PendingTaskManagerId> findPendingManagerToFulfill(
-            ResourceProfile resourceProfile,
-            Map<PendingTaskManagerId, ResourceProfile> availableResources) {
-        return availableResources.entrySet().stream()
-                .filter(entry -> 
entry.getValue().allFieldsNoLessThan(resourceProfile))
-                .findAny()
-                .map(Map.Entry::getKey);
-    }
-
     private void tryFulfillRequirementsForJobWithPendingResources(
             JobID jobId,
             ResourceCounter unfulfilledRequirements,
-            Map<PendingTaskManagerId, ResourceProfile> availableResources,
+            List<InternalResourceInfo> availableResources,
             ResourceAllocationResult.Builder resultBuilder) {
         for (Map.Entry<ResourceProfile, Integer> missingResource :
                 unfulfilledRequirements.getResourcesWithCount()) {
+            int numUnfulfilled =
+                    tryFulfilledRequirementWithResource(
+                            availableResources,
+                            missingResource.getValue(),
+                            missingResource.getKey(),
+                            jobId);
+
+            if (numUnfulfilled == 0) {
+                // All the requirements fulfilled.
+                continue;
+            }
+
             // for this strategy, all pending resources should have the same 
default slot resource
             final ResourceProfile effectiveProfile =
                     getEffectiveResourceProfile(
                             missingResource.getKey(), 
defaultSlotResourceProfile);
-            for (int i = 0; i < missingResource.getValue(); i++) {
-                Optional<PendingTaskManagerId> matchedPendingTaskManager =
-                        findPendingManagerToFulfill(effectiveProfile, 
availableResources);
-                if (matchedPendingTaskManager.isPresent()) {
-                    availableResources.compute(
-                            matchedPendingTaskManager.get(),
-                            ((pendingTaskManagerId, resourceProfile) ->
-                                    Preconditions.checkNotNull(resourceProfile)
-                                            .subtract(effectiveProfile)));
+
+            if (!totalResourceProfile.allFieldsNoLessThan(effectiveProfile)) {
+                // Can not fulfill this resource type will the default worker.
+                resultBuilder.addUnfulfillableJob(jobId);
+                continue;
+            }
+
+            while (numUnfulfilled > 0) {
+                // Circularly add new pending task manager
+                final PendingTaskManager newPendingTaskManager =
+                        new PendingTaskManager(totalResourceProfile, 
numSlotsPerWorker);
+                
resultBuilder.addPendingTaskManagerAllocate(newPendingTaskManager);
+                ResourceProfile remainResource = totalResourceProfile;
+                while (numUnfulfilled > 0
+                        && canFulfillRequirement(effectiveProfile, 
remainResource)) {
+                    numUnfulfilled--;
                     resultBuilder.addAllocationOnPendingResource(
-                            jobId, matchedPendingTaskManager.get(), 
effectiveProfile);
-                } else {
-                    if 
(totalResourceProfile.allFieldsNoLessThan(effectiveProfile)) {
-                        // Add new pending task manager
-                        final PendingTaskManager pendingTaskManager =
-                                new PendingTaskManager(totalResourceProfile, 
numSlotsPerWorker);
-                        
resultBuilder.addPendingTaskManagerAllocate(pendingTaskManager);
-                        resultBuilder.addAllocationOnPendingResource(
-                                jobId,
-                                pendingTaskManager.getPendingTaskManagerId(),
-                                effectiveProfile);
-                        availableResources.put(
-                                pendingTaskManager.getPendingTaskManagerId(),
-                                
totalResourceProfile.subtract(effectiveProfile));
-                    } else {
-                        resultBuilder.addUnfulfillableJob(jobId);
-                        break;
-                    }
+                            jobId,
+                            newPendingTaskManager.getPendingTaskManagerId(),
+                            effectiveProfile);
+                    remainResource = remainResource.subtract(effectiveProfile);
                 }
+                if (!remainResource.equals(ResourceProfile.ZERO)) {
+                    availableResources.add(
+                            new InternalResourceInfo(
+                                    
newPendingTaskManager.getPendingTaskManagerId(),
+                                    defaultSlotResourceProfile,
+                                    remainResource,
+                                    resultBuilder));
+                }
+            }
+        }
+    }
+
+    private static class InternalResourceInfo {
+        private final ResourceProfile defaultSlotProfile;
+        private final BiConsumer<JobID, ResourceProfile> allocationConsumer;
+        private ResourceProfile availableProfile;
+
+        InternalResourceInfo(
+                InstanceID instanceId,
+                ResourceProfile defaultSlotProfile,
+                ResourceProfile availableProfile,
+                ResourceAllocationResult.Builder resultBuilder) {
+            this.defaultSlotProfile = defaultSlotProfile;
+            this.availableProfile = availableProfile;
+            this.allocationConsumer =
+                    (jobId, slotProfile) ->
+                            resultBuilder.addAllocationOnRegisteredResource(
+                                    jobId, instanceId, slotProfile);
+        }
+
+        InternalResourceInfo(
+                PendingTaskManagerId pendingTaskManagerId,
+                ResourceProfile defaultSlotProfile,
+                ResourceProfile availableProfile,
+                ResourceAllocationResult.Builder resultBuilder) {
+            this.defaultSlotProfile = defaultSlotProfile;
+            this.availableProfile = availableProfile;
+            this.allocationConsumer =
+                    (jobId, slotProfile) ->
+                            resultBuilder.addAllocationOnPendingResource(
+                                    jobId, pendingTaskManagerId, slotProfile);
+        }

Review comment:
       It might not be necessary for `InternalResourceInfo` to understand the 
differences between registered and pending resources.
   
   We could have the constructor like the following,
   ```
   InternalResourceInfo(
                   ResourceProfile defaultSlotProfile,
                   ResourceProfile availableProfile,
                   BiConsumer<JobID, ResourceProfile> resourceAllocationAction) 
{
           // xxx
   }
   ```
   and decide the resource allocation action in `getRegisteredResources` and 
`getPendingResources`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to