dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1119751778


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java:
##########
@@ -23,14 +23,27 @@
 import java.util.Map;
 
 /**
- * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)}, describing
- * the parallelism each vertex could be scheduled with.
- *
- * <p>{@link SlotAllocator} implementations may encode additional information to be used in {@link
- * SlotAllocator#tryReserveResources(VertexParallelism)}.
+ * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)} among with
+ * {@link org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment
+ * slotAssignments}, describing the parallelism each vertex could be scheduled with.
  */
-public interface VertexParallelism {
-    Map<JobVertexID, Integer> getMaxParallelismForVertices();
+public class VertexParallelism {

Review Comment:
   If this is not an interface, does this method signature in SlotAllocator still make sense?
   
   ```
       Optional<? extends VertexParallelism> determineParallelism(
               JobInformation jobInformation, Collection<? extends SlotInfo> slots);
   ```
   
   This has also been removed from the contract:
   
   ```
    * <p>{@link SlotAllocator} implementations may encode additional information to be used in {@link
    * SlotAllocator#tryReserveResources(VertexParallelism)}.
    ```
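   
   Just to sketch what I mean (not a concrete proposal): once `VertexParallelism` is a plain class with no allocator-specific subtypes, the wildcard probably no longer buys us anything, so the signature could be narrowed to something like:
   
   ```
       Optional<VertexParallelism> determineParallelism(
               JobInformation jobInformation, Collection<? extends SlotInfo> slots);
   ```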



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java:
##########
@@ -64,7 +64,7 @@ CompletedCheckpoint addCheckpointAndSubsumeOldestOne(
     * Returns the latest {@link CompletedCheckpoint} instance or <code>null</code> if none was
      * added.
      */
-    default CompletedCheckpoint getLatestCheckpoint() throws Exception {

Review Comment:
   OT: This should be in a separate commit once the history is fixed up.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##########
@@ -1401,7 +1403,9 @@ public void testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvail
                 adaptiveScheduler.tryToAssignSlots(
                         CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
                                 new StateTrackingMockExecutionGraph(),
-                                new CreatingExecutionGraphTest.TestingVertexParallelism()));
+                                new JobSchedulingPlan(
+                                new JobSchedulingPlan(
+                                        new VertexParallelism(Collections.emptyMap()),

Review Comment:
   nit, would it make sense to introduce `VertexParallelism.empty()`? I've seen this construct in more places.
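   
   Rough sketch of the factory I have in mind (assuming `VertexParallelism` keeps the map-based constructor used above):
   
   ```
       /** Convenience factory for the common "no vertices" case, mostly useful in tests. */
       public static VertexParallelism empty() {
           return new VertexParallelism(Collections.emptyMap());
       }
   ```
   
   The call site above would then shrink to `new JobSchedulingPlan(VertexParallelism.empty(), ...)`.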



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java:
##########
@@ -318,7 +312,7 @@ public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(
         }
 
         @Override
-        public void goToWaitingForResources() {
+        public void goToWaitingForResources(ExecutionGraph executionGraph) {

Review Comment:
   nit
   
   ```suggestion
           public void goToWaitingForResources(@Nullable ExecutionGraph executionGraph) {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java:
##########
@@ -54,14 +58,35 @@ public interface SlotAllocator {
     Optional<? extends VertexParallelism> determineParallelism(
             JobInformation jobInformation, Collection<? extends SlotInfo> slots);
 
+    /**
+     * Same as {@link #determineParallelism(JobInformation, Collection)} but additionally determine
+     * assignment of slots to execution slot sharing groups.
+     */
+    default Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> slots,
+            @Nullable ExecutionGraph previousExecutionGraph) {
+        return determineParallelismAndCalculateAssignment(
+                jobInformation,
+                slots,
+                JobAllocationsInformation.fromGraph(
+                        previousExecutionGraph,
+                        StateSizeEstimates.fromGraph(previousExecutionGraph)));
+    }

Review Comment:
   Should we have this method? We could also simplify the JobAllocationsInformation construction to `JobAllocationsInformation.fromGraph(previousExecutionGraph)`, which would reduce the need for this overload even further.
   
   I was looking at the mock implementation and realized that this method would be hard to mock correctly.
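   
   Roughly what I'm picturing (just a sketch; the single-argument `JobAllocationsInformation.fromGraph(ExecutionGraph)` overload doesn't exist yet, and the local variable names are made up):
   
   ```
       // Caller constructs the allocations information itself instead of going
       // through the extra default method:
       Optional<JobSchedulingPlan> schedulingPlan =
               slotAllocator.determineParallelismAndCalculateAssignment(
                       jobInformation,
                       freeSlots,
                       JobAllocationsInformation.fromGraph(previousExecutionGraph));
   ```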



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+    private static class AllocationScore implements Comparable<AllocationScore> {
+
+        private final String groupId;
+        private final AllocationID allocationId;
+
+        public AllocationScore(String groupId, AllocationID allocationId, long score) {
+            this.groupId = groupId;
+            this.allocationId = allocationId;
+            this.score = score;
+        }
+
+        private final long score;
+
+        public String getGroupId() {
+            return groupId;
+        }
+
+        public AllocationID getAllocationId() {
+            return allocationId;
+        }
+
+        public long getScore() {
+            return score;
+        }
+
+        @Override
+        public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+            int result = Long.compare(score, other.score);
+            if (result != 0) {
+                return result;
+            }
+            result = other.allocationId.compareTo(allocationId);
+            if (result != 0) {
+                return result;
+            }
+            return other.groupId.compareTo(groupId);
+        }
+    }
+
+    @Override
+    public Collection<SlotAssignment> assignSlots(
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> freeSlots,
+            VertexParallelism vertexParallelism,
+            JobAllocationsInformation previousAllocations) {
+        checkState(
+                freeSlots.size() >= jobInformation.getSlotSharingGroups().size(),
+                "Not enough slots to allocate all the slot sharing groups (have: %s, need: %s)",
+                freeSlots.size(),
+                jobInformation.getSlotSharingGroups().size());
+
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
+            allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
+        }
+        final Map<JobVertexID, Integer> parallelism = getParallelism(allGroups);
+        final PriorityQueue<AllocationScore> scores =
+                calculateScores(jobInformation, previousAllocations, allGroups, parallelism);
+
+        Map<String, ExecutionSlotSharingGroup> groupsById =
+                allGroups.stream().collect(toMap(ExecutionSlotSharingGroup::getId, identity()));
+        Map<AllocationID, SlotInfo> slotsById =
+                freeSlots.stream().collect(toMap(SlotInfo::getAllocationId, identity()));

Review Comment:
   nit: It would be great to stick with marking variables as final if they're not going to be reassigned.
   
   1) To keep things consistent; a method that mixes both feels weird.
   2) To improve readability; it gives the reader a strong hint that nothing magical happens to the variable after the assignment.
   
   ```suggestion
           final Map<String, ExecutionSlotSharingGroup> groupsById =
                   allGroups.stream().collect(toMap(ExecutionSlotSharingGroup::getId, identity()));
           final Map<AllocationID, SlotInfo> slotsById =
                   freeSlots.stream().collect(toMap(SlotInfo::getAllocationId, identity()));
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java:
##########
@@ -122,7 +123,7 @@ public ArchivedExecutionGraph getArchivedExecutionGraph(
         }
 
         @Override
-        public void goToWaitingForResources() {
+        public void goToWaitingForResources(ExecutionGraph executionGraph) {

Review Comment:
   nit
   
   ```suggestion
           public void goToWaitingForResources(@Nullable ExecutionGraph executionGraph) {
   ```



##########
flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java:
##########
@@ -67,12 +68,14 @@
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests local recovery by restarting Flink processes. */
 @ExtendWith(TestLoggerExtension.class)
 class LocalRecoveryITCase {

Review Comment:
   I think I'm missing something here; how does this test local recovery with the AdaptiveScheduler?


