XComp commented on code in PR #25218:
URL: https://github.com/apache/flink/pull/25218#discussion_r2084647835


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##########
@@ -22,56 +22,126 @@
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups 
equally. */
+import static java.util.function.Function.identity;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
+
+/**
+ * Simple {@link SlotAssigner} that treats all slots and slot sharing groups 
equally. Specifically,
+ * when the cluster is deployed in application mode and the {@link
+ * 
org.apache.flink.configuration.JobManagerOptions#SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED}
+ * is enabled, execution slot sharing groups are preferentially assigned to 
the minimal number of
+ * task managers.
+ */
 public class DefaultSlotAssigner implements SlotAssigner {
 
+    static final String APPLICATION_MODE_EXECUTION_TARGET = "embedded";
+
+    private final @Nullable String executionTarget;
+    private final boolean minimalTaskManagerPreferred;
+
+    DefaultSlotAssigner(@Nullable String executionTarget, boolean 
minimalTaskManagerPreferred) {
+        this.executionTarget = executionTarget;
+        this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
+    }
+
     @Override
     public Collection<SlotAssignment> assignSlots(
             JobInformation jobInformation,
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations) {
-        List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        checkMinimumRequiredSlots(jobInformation, freeSlots);
+
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
         for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
             
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, 
slotSharingGroup));
         }
 
-        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+        final Collection<? extends SlotInfo> pickedSlots =
+                pickSlotsIfNeeded(allGroups.size(), freeSlots);
+
+        Iterator<? extends SlotInfo> iterator = pickedSlots.iterator();
         Collection<SlotAssignment> assignments = new ArrayList<>();
         for (ExecutionSlotSharingGroup group : allGroups) {
             assignments.add(new SlotAssignment(iterator.next(), group));
         }
         return assignments;
     }
 
-    static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
-            VertexParallelism vertexParallelism, SlotSharingGroup 
slotSharingGroup) {
-        final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
-        slotSharingGroup
-                .getJobVertexIds()
-                .forEach(
-                        jobVertexId -> {
-                            int parallelism = 
vertexParallelism.getParallelism(jobVertexId);
-                            for (int subtaskIdx = 0; subtaskIdx < parallelism; 
subtaskIdx++) {
-                                sharedSlotToVertexAssignment
-                                        .computeIfAbsent(subtaskIdx, ignored 
-> new HashSet<>())
-                                        .add(new 
ExecutionVertexID(jobVertexId, subtaskIdx));
-                            }
-                        });
-        return sharedSlotToVertexAssignment.values().stream()
-                .map(ExecutionSlotSharingGroup::new)
-                .collect(Collectors.toList());
+    private Collection<? extends SlotInfo> pickSlotsIfNeeded(
+            int requestExecutionSlotSharingGroups, Collection<? extends 
SlotInfo> freeSlots) {
+        Collection<? extends SlotInfo> pickedSlots = freeSlots;
+        // To avoid the sort-work loading.
+        if (APPLICATION_MODE_EXECUTION_TARGET.equalsIgnoreCase(executionTarget)
+                && minimalTaskManagerPreferred
+                && freeSlots.size() > requestExecutionSlotSharingGroups) {

Review Comment:
   ```suggestion
           if 
(APPLICATION_MODE_EXECUTION_TARGET.equalsIgnoreCase(executionTarget)
                   && minimalTaskManagerPreferred
                   // To avoid the sort-work loading.
                   && freeSlots.size() > requestExecutionSlotSharingGroups) {
   ```
   nit: the final condition is the actual target for the comment



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##########
@@ -93,14 +107,11 @@ public Optional<VertexParallelism> determineParallelism(
             JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
 
         final Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> 
slotSharingGroupMetaInfo =
-                SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
+                getSlotSharingGroupMetaInfos(jobInformation);
 
-        final int minimumRequiredSlots =
-                slotSharingGroupMetaInfo.values().stream()
-                        .map(SlotSharingGroupMetaInfo::getMaxLowerBound)
-                        .reduce(0, Integer::sum);
+        final int minimumRequiredSlots = 
getMinimumRequiredSlots(slotSharingGroupMetaInfo);
 
-        if (minimumRequiredSlots > freeSlots.size()) {
+        if (getMinimumRequiredSlots(slotSharingGroupMetaInfo) > 
freeSlots.size()) {

Review Comment:
   ```suggestion
           if (minimumRequiredSlots > freeSlots.size()) {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##########
@@ -22,56 +22,126 @@
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups 
equally. */
+import static java.util.function.Function.identity;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
+
+/**
+ * Simple {@link SlotAssigner} that treats all slots and slot sharing groups 
equally. Specifically,
+ * when the cluster is deployed in application mode and the {@link
+ * 
org.apache.flink.configuration.JobManagerOptions#SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED}
+ * is enabled, execution slot sharing groups are preferentially assigned to 
the minimal number of
+ * task managers.
+ */
 public class DefaultSlotAssigner implements SlotAssigner {
 
+    static final String APPLICATION_MODE_EXECUTION_TARGET = "embedded";

Review Comment:
   ```suggestion
       @VisibleForTesting
       static final String APPLICATION_MODE_EXECUTION_TARGET = "embedded";
   ```
   nit



##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -677,6 +679,31 @@ public InlineElement getDescription() {
                                             
code(SchedulerType.AdaptiveBatch.name()))
                                     .build());
 
+    @Experimental
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Boolean> 
SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED =
+            key("jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers")
+                    .booleanType()
+                    .defaultValue(true)

Review Comment:
   ```suggestion
                       .defaultValue(false)
   ```
   Shouldn't we have the feature disabled by default to have a more stable 
upgrade? Users can enable it if they need to explicitly.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java:
##########
@@ -41,9 +42,21 @@ public TestSlotInfo(ResourceProfile resourceProfile) {
         this(new AllocationID(), resourceProfile);
     }
 
+    public TestSlotInfo(TaskManagerLocation tml) {
+        this(new AllocationID(), ResourceProfile.ANY, tml);
+    }
+
     public TestSlotInfo(AllocationID allocationId, ResourceProfile 
resourceProfile) {
+        this(allocationId, resourceProfile, new LocalTaskManagerLocation());
+    }
+
+    public TestSlotInfo(

Review Comment:
   ```suggestion
       private TestSlotInfo(
   ```
   nit



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java:
##########
@@ -62,6 +62,8 @@ class SlotSharingSlotAllocatorTest {
     private static final IsSlotAvailableAndFreeFunction 
TEST_IS_SLOT_FREE_FUNCTION =
             ignored -> true;
     private static final boolean DISABLE_LOCAL_RECOVERY = false;
+    private static final String NULL_EXECUTION_TARGET = null;
+    private static final boolean MINIMAL_TASK_MANAGER_PREFERRED = false;

Review Comment:
   ```suggestion
       private static final boolean MINIMAL_TASK_MANAGER_PREFERRED_DISABLED = 
false;
   ```
   nit: the naming is confusing. I would expect it to be enabled based on the 
name.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.APPLICATION_MODE_EXECUTION_TARGET;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation.VertexInformation;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultSlotAssigner}. */
+@ExtendWith(ParameterizedTestExtension.class)
+class DefaultSlotAssignerTest {
+
+    private static final TaskManagerLocation tml1 = new 
LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml1 = new TestSlotInfo(tml1);
+    private static final SlotInfo slot2OfTml1 = new TestSlotInfo(tml1);
+    private static final SlotInfo slot3OfTml1 = new TestSlotInfo(tml1);
+
+    private static final TaskManagerLocation tml2 = new 
LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml2 = new TestSlotInfo(tml2);
+    private static final SlotInfo slot2OfTml2 = new TestSlotInfo(tml2);
+    private static final SlotInfo slot3OfTml2 = new TestSlotInfo(tml2);
+
+    private static final TaskManagerLocation tml3 = new 
LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml3 = new TestSlotInfo(tml3);
+    private static final SlotInfo slot2OfTml3 = new TestSlotInfo(tml3);
+
+    private static final List<SlotInfo> allSlots =
+            Arrays.asList(
+                    slot1OfTml1,
+                    slot2OfTml1,
+                    slot3OfTml1,
+                    slot1OfTml2,
+                    slot2OfTml2,
+                    slot3OfTml2,
+                    slot1OfTml3,
+                    slot2OfTml3);
+
+    private static final JobVertex jobVertex = new 
JobVertex("testingJobVertex");
+    private static final SlotSharingGroup slotSharingGroup = new 
SlotSharingGroup();
+
+    @Parameter int parallelism;
+
+    @Parameter(value = 1)
+    Collection<? extends SlotInfo> freeSlots;
+
+    @Parameter(value = 2)
+    List<TaskManagerLocation> minimalTaskExecutors;
+
+    @TestTemplate
+    void testAssignSlots() {
+        final SlotAssigner slotAssigner =
+                new DefaultSlotAssigner(APPLICATION_MODE_EXECUTION_TARGET, 
true);
+        final VertexInformation vertexInfo =
+                new TestVertexInformation(jobVertex.getID(), parallelism, 
slotSharingGroup);
+        final VertexParallelism vertexParallel =
+                new VertexParallelism(
+                        singletonMap(vertexInfo.getJobVertexID(), 
vertexInfo.getParallelism()));
+        final JobInformation jobInformation = new 
TestJobInformation(singletonList(vertexInfo));
+        final Set<TaskManagerLocation> keptTaskExecutors =
+                slotAssigner.assignSlots(jobInformation, freeSlots, 
vertexParallel, null).stream()
+                        .map(assignment -> 
assignment.getSlotInfo().getTaskManagerLocation())

Review Comment:
   I'm not a big fan of filtering based on the `TaskManagerLocation` here: We 
could achieve the same by checking what slots are in the end picked for the 
slot assignment. Because that's what the `SlotAssigner` actually does.



##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -677,6 +679,31 @@ public InlineElement getDescription() {
                                             
code(SchedulerType.AdaptiveBatch.name()))
                                     .build());
 
+    @Experimental
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Boolean> 
SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED =
+            key("jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "This parameter defines whether 
the adaptive scheduler prioritizes "
+                                                    + "using the minimum 
number of %s when scheduling tasks.",
+                                            code("TaskManagers"))
+                                    .linebreak()
+                                    .text(
+                                            "Note, this parameter is 
introduced exclusively in Flink 1.20 LTS release line and suitable for 
non-enabling %s. "

Review Comment:
   ```suggestion
                                               "Note, this parameter is 
introduced exclusively in Flink 1.20 LTS release line and suitable if %s is not 
enabled. "
   ```
   nit



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.APPLICATION_MODE_EXECUTION_TARGET;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation.VertexInformation;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultSlotAssigner}. */
+@ExtendWith(ParameterizedTestExtension.class)
+class DefaultSlotAssignerTest {
+
+    private static final TaskManagerLocation tml1 = new 
LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml1 = new TestSlotInfo(tml1);
+    private static final SlotInfo slot2OfTml1 = new TestSlotInfo(tml1);
+    private static final SlotInfo slot3OfTml1 = new TestSlotInfo(tml1);
+
+    private static final TaskManagerLocation tml2 = new 
LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml2 = new TestSlotInfo(tml2);
+    private static final SlotInfo slot2OfTml2 = new TestSlotInfo(tml2);
+    private static final SlotInfo slot3OfTml2 = new TestSlotInfo(tml2);
+
+    private static final TaskManagerLocation tml3 = new 
LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml3 = new TestSlotInfo(tml3);
+    private static final SlotInfo slot2OfTml3 = new TestSlotInfo(tml3);
+
+    private static final List<SlotInfo> allSlots =
+            Arrays.asList(
+                    slot1OfTml1,
+                    slot2OfTml1,
+                    slot3OfTml1,
+                    slot1OfTml2,
+                    slot2OfTml2,
+                    slot3OfTml2,
+                    slot1OfTml3,
+                    slot2OfTml3);
+
+    private static final JobVertex jobVertex = new 
JobVertex("testingJobVertex");
+    private static final SlotSharingGroup slotSharingGroup = new 
SlotSharingGroup();
+
+    @Parameter int parallelism;
+
+    @Parameter(value = 1)
+    Collection<? extends SlotInfo> freeSlots;
+
+    @Parameter(value = 2)
+    List<TaskManagerLocation> minimalTaskExecutors;

Review Comment:
   ```suggestion
       List<TaskManagerLocation> expectedTaskManagerLocations;
   ```
   nit



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.APPLICATION_MODE_EXECUTION_TARGET;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation.VertexInformation;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DefaultSlotAssigner}. */
+@ExtendWith(ParameterizedTestExtension.class)
+class DefaultSlotAssignerTest {
+
+    private static final TaskManagerLocation tml1 = new 
LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml1 = new TestSlotInfo(tml1);
+    private static final SlotInfo slot2OfTml1 = new TestSlotInfo(tml1);
+    private static final SlotInfo slot3OfTml1 = new TestSlotInfo(tml1);
+
+    private static final TaskManagerLocation tml2 = new 
LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml2 = new TestSlotInfo(tml2);
+    private static final SlotInfo slot2OfTml2 = new TestSlotInfo(tml2);
+    private static final SlotInfo slot3OfTml2 = new TestSlotInfo(tml2);
+
+    private static final TaskManagerLocation tml3 = new 
LocalTaskManagerLocation();
+    private static final SlotInfo slot1OfTml3 = new TestSlotInfo(tml3);
+    private static final SlotInfo slot2OfTml3 = new TestSlotInfo(tml3);
+
+    private static final List<SlotInfo> allSlots =
+            Arrays.asList(
+                    slot1OfTml1,
+                    slot2OfTml1,
+                    slot3OfTml1,
+                    slot1OfTml2,
+                    slot2OfTml2,
+                    slot3OfTml2,
+                    slot1OfTml3,
+                    slot2OfTml3);
+
+    private static final JobVertex jobVertex = new 
JobVertex("testingJobVertex");
+    private static final SlotSharingGroup slotSharingGroup = new 
SlotSharingGroup();
+
+    @Parameter int parallelism;
+
+    @Parameter(value = 1)
+    Collection<? extends SlotInfo> freeSlots;
+
+    @Parameter(value = 2)
+    List<TaskManagerLocation> minimalTaskExecutors;
+
+    @TestTemplate
+    void testAssignSlots() {
+        final SlotAssigner slotAssigner =
+                new DefaultSlotAssigner(APPLICATION_MODE_EXECUTION_TARGET, 
true);
+        final VertexInformation vertexInfo =
+                new TestVertexInformation(jobVertex.getID(), parallelism, 
slotSharingGroup);
+        final VertexParallelism vertexParallel =
+                new VertexParallelism(
+                        singletonMap(vertexInfo.getJobVertexID(), 
vertexInfo.getParallelism()));
+        final JobInformation jobInformation = new 
TestJobInformation(singletonList(vertexInfo));
+        final Set<TaskManagerLocation> keptTaskExecutors =
+                slotAssigner.assignSlots(jobInformation, freeSlots, 
vertexParallel, null).stream()
+                        .map(assignment -> 
assignment.getSlotInfo().getTaskManagerLocation())
+                        .collect(Collectors.toSet());
+        
assertThat(minimalTaskExecutors).containsExactlyInAnyOrderElementsOf(keptTaskExecutors);
+    }
+
+    @Parameters(name = "parallelism={0}, freeSlots={1}, 
minimalTaskExecutors={2}")

Review Comment:
   ```suggestion
       @Parameters(name = "parallelism={0}, freeSlots={1}, 
expectedTaskManagerLocations={2}")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to