RocMarshal commented on code in PR #25218: URL: https://github.com/apache/flink/pull/25218#discussion_r1759680801
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java: ##########
@@ -48,14 +50,30 @@ public Collection<SlotAssignment> assignSlots(
             allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
         }
-        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+        Iterator<? extends SlotInfo> iterator =
+                selectSlotsInMinimalTaskExecutors(freeSlots, allGroups, Collections.emptyList())
+                        .iterator();
Review Comment:
After an offline discussion: if the job runs in application mode and every TM has the same number of slots, the suggestion in the comment is a good way to optimize this. However, the following situations have to be considered for compatibility:
1. Deployment mode: application and session;
2. In session mode, some user configurations may cause TMs to have different numbers of slots.

- For application mode: sorting the TMs in reverse (descending) order of their slot count and then using their slots is a good choice, because it uses the minimum number of TMs from the job's perspective.
- For session mode: sorting the TMs in ascending order of their slot count and then using their slots is a good choice, because it keeps the number of TMs in use minimal from the `ResourceManager`/`session-cluster` perspective while multiple jobs are running.

It was also mentioned that, although this approach may not achieve the maximum benefit in application mode when TMs have different numbers of slots, it still gives quite good results when every TM has the same number of slots. So, sorting the TMs in reverse order of their slot count and selecting slots from them is a good choice.
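The two slot-ordering strategies discussed above can be sketched roughly as follows. This is a simplified, hypothetical illustration and not the PR's `selectSlotsInMinimalTaskExecutors`: `MinimalTaskExecutorSelection`, `FreeSlot` and `selectSlots` are invented names, and slots are plain string ids instead of Flink's `SlotInfo`/`TaskManagerLocation`. The sketch groups free slots by TaskManager, orders the TaskManagers by free-slot count (descending for the application-mode case, ascending for the session-mode case), and drains one TaskManager before moving to the next until the demand is met.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: pick free slots so that as few TaskManagers as possible are used. */
final class MinimalTaskExecutorSelection {

    /** Hypothetical stand-in for a free slot (not Flink's SlotInfo): slot id and owning TM id. */
    record FreeSlot(String slotId, String taskManagerId) {}

    /**
     * Selects {@code required} slots, draining one TaskManager before moving to the next.
     *
     * @param descending true: TMs with the most free slots first (the application-mode case);
     *     false: TMs with the fewest free slots first (the session-mode case)
     */
    static List<FreeSlot> selectSlots(List<FreeSlot> freeSlots, int required, boolean descending) {
        // Group free slots by TaskManager; LinkedHashMap keeps first-seen order.
        Map<String, List<FreeSlot>> slotsPerTm = new LinkedHashMap<>();
        for (FreeSlot slot : freeSlots) {
            slotsPerTm.computeIfAbsent(slot.taskManagerId(), tm -> new ArrayList<>()).add(slot);
        }

        // Order TaskManagers by free-slot count; List.sort is stable, so ties keep first-seen order.
        Comparator<List<FreeSlot>> bySlotCount = Comparator.comparingInt(List::size);
        List<List<FreeSlot>> orderedTms = new ArrayList<>(slotsPerTm.values());
        orderedTms.sort(descending ? bySlotCount.reversed() : bySlotCount);

        // Take slots TaskManager by TaskManager until the demand is met.
        List<FreeSlot> selected = new ArrayList<>();
        for (List<FreeSlot> tmSlots : orderedTms) {
            for (FreeSlot slot : tmSlots) {
                if (selected.size() == required) {
                    return selected;
                }
                selected.add(slot);
            }
        }
        if (selected.size() < required) {
            throw new IllegalStateException(
                    "Not enough free slots: " + selected.size() + " < " + required);
        }
        return selected;
    }
}
```

With six free slots spread 1/3/2 over `tm-a`/`tm-b`/`tm-c` and a demand of three slots, the descending order takes all three slots from `tm-b`, while the ascending order takes `a1`, `c1` and `c2` and leaves the largest TaskManager untouched. TaskManagers with equal slot counts keep the order in which they first appear in the input.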
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java: ##########
@@ -18,18 +18,123 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
-/** Interface for assigning slots to slot sharing groups. */
+import static java.util.function.Function.identity;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+
+/** The Interface for assigning slots to slot sharing groups. */
 @Internal
 public interface SlotAssigner {
 
+    /**
+     * The helper class to represent the allocation score on the specified group and allocated slot.
+     */
+    class AllocationScore implements Comparable<AllocationScore> {
+
+        private final String groupId;
+        private final AllocationID allocationId;
+        private final long score;
+
+        public AllocationScore(String groupId, AllocationID allocationId, long score) {
+            this.groupId = groupId;
+            this.allocationId = allocationId;
+            this.score = score;
+        }
+
+        public String getGroupId() {
+            return groupId;
+        }
+
+        public AllocationID getAllocationId() {
+            return allocationId;
+        }
+
+        public long getScore() {
+            return score;
+        }
+
+        @Override
+        public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
Review Comment:
Thanks for the comment. I'd like to remove the redundant reference (the `StateLocalitySlotAssigner.` qualifier) and keep `AllocationScore` in `SlotAssigner`, because it is used in all implementations of `SlotAssigner`. Please let me know your opinion.
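A minimal sketch of the proposed shape, assuming `AllocationScore` stays a nested class of the interface and is ordered by score. `SlotAssignerSketch` and `highestScoreFirst` are hypothetical names, the allocation id is simplified to a `String` instead of `AllocationID`, and the tie-breaking order inside `compareTo` is an assumption because the PR's method body is not shown in this diff. The point illustrated is that `compareTo` takes the unqualified `AllocationScore`, so the shared helper no longer refers to a specific implementation such as `StateLocalitySlotAssigner`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for SlotAssigner, reduced to the shared AllocationScore helper. */
interface SlotAssignerSketch {

    /** Score of assigning the slot {@code allocationId} to the group {@code groupId}. */
    final class AllocationScore implements Comparable<AllocationScore> {

        private final String groupId;
        private final String allocationId; // simplified: String instead of AllocationID
        private final long score;

        public AllocationScore(String groupId, String allocationId, long score) {
            this.groupId = groupId;
            this.allocationId = allocationId;
            this.score = score;
        }

        public String getGroupId() {
            return groupId;
        }

        public String getAllocationId() {
            return allocationId;
        }

        public long getScore() {
            return score;
        }

        // The parameter type is the unqualified nested class, so the shared helper does not
        // reference any specific implementation such as StateLocalitySlotAssigner.
        @Override
        public int compareTo(AllocationScore other) {
            // Assumed ordering: by score, then by ids, so entries with equal scores
            // but different ids still have a defined order.
            int result = Long.compare(score, other.score);
            if (result != 0) {
                return result;
            }
            result = allocationId.compareTo(other.allocationId);
            if (result != 0) {
                return result;
            }
            return groupId.compareTo(other.groupId);
        }
    }

    /** Hypothetical helper showing how any implementation could consume the scores. */
    static List<AllocationScore> highestScoreFirst(List<AllocationScore> scores) {
        List<AllocationScore> sorted = new ArrayList<>(scores);
        sorted.sort(Collections.reverseOrder());
        return sorted;
    }
}
```

Because the class only depends on its own type, every `SlotAssigner` implementation can sort or queue these scores through the interface without importing another implementation.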
-- This is an automated message from the Apache Git Service.