dmvk commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1119751778
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java: ##########
@@ -23,14 +23,27 @@ import java.util.Map;
 
 /**
- * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)}, describing
- * the parallelism each vertex could be scheduled with.
- *
- * <p>{@link SlotAllocator} implementations may encode additional information to be used in {@link
- * SlotAllocator#tryReserveResources(VertexParallelism)}.
+ * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)} among with
+ * {@link org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment
+ * slotAssignments}, describing the parallelism each vertex could be scheduled with.
  */
-public interface VertexParallelism {
-    Map<JobVertexID, Integer> getMaxParallelismForVertices();
+public class VertexParallelism {

Review Comment:
   If this is not an interface, does this method signature in SlotAllocator still make sense?
   ```
   Optional<? extends VertexParallelism> determineParallelism(
           JobInformation jobInformation, Collection<? extends SlotInfo> slots);
   ```
   This has also been removed from the contract:
   ```
    * <p>{@link SlotAllocator} implementations may encode additional information to be used in {@link
    * SlotAllocator#tryReserveResources(VertexParallelism)}.
   ```

########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java: ##########
@@ -64,7 +64,7 @@ CompletedCheckpoint addCheckpointAndSubsumeOldestOne(
      * Returns the latest {@link CompletedCheckpoint} instance or <code>null</code> if none was
      * added.
      */
-    default CompletedCheckpoint getLatestCheckpoint() throws Exception {

Review Comment:
   OT: This should be in a separate commit once the history is fixed up.

########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ##########
@@ -1401,7 +1403,9 @@ public void testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvail
                 adaptiveScheduler.tryToAssignSlots(
                         CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
                                 new StateTrackingMockExecutionGraph(),
-                                new CreatingExecutionGraphTest.TestingVertexParallelism()));
+                                new JobSchedulingPlan(
+                                        new VertexParallelism(Collections.emptyMap()),

Review Comment:
   nit, would it make sense to introduce `VertexParallelism.empty()`? I've seen this construct in a few other places.
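(For illustration only, not part of the PR: a minimal sketch of what the suggested factory could look like, assuming `VertexParallelism` stays a concrete class wrapping a `Map<JobVertexID, Integer>`; the name `empty()` is only the reviewer's proposal.)

```java
// Hypothetical convenience factory on the concrete VertexParallelism class,
// letting call sites write VertexParallelism.empty() instead of
// new VertexParallelism(Collections.emptyMap()).
public static VertexParallelism empty() {
    return new VertexParallelism(Collections.emptyMap());
}
```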
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java: ##########
@@ -318,7 +312,7 @@ public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(
         }
 
         @Override
-        public void goToWaitingForResources() {
+        public void goToWaitingForResources(ExecutionGraph executionGraph) {

Review Comment:
   nit
   ```suggestion
           public void goToWaitingForResources(@Nullable ExecutionGraph executionGraph) {
   ```

########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java: ##########
@@ -54,14 +58,35 @@ Optional<? extends VertexParallelism> determineParallelism(
             JobInformation jobInformation, Collection<? extends SlotInfo> slots);
 
+    /**
+     * Same as {@link #determineParallelism(JobInformation, Collection)} but additionally determine
+     * assignment of slots to execution slot sharing groups.
+     */
+    default Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> slots,
+            @Nullable ExecutionGraph previousExecutionGraph) {
+        return determineParallelismAndCalculateAssignment(
+                jobInformation,
+                slots,
+                JobAllocationsInformation.fromGraph(
+                        previousExecutionGraph,
+                        StateSizeEstimates.fromGraph(previousExecutionGraph)));
+    }

Review Comment:
   Should we have this method? We could also simplify the `JobAllocationsInformation` construction to `JobAllocationsInformation.fromGraph(previousExecutionGraph)`, which would make this default even less necessary. I was looking at the mock implementation and realized that it would be hard to mock correctly.
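(Illustration only, not code from the PR: one way to read the `JobAllocationsInformation.fromGraph(previousExecutionGraph)` simplification suggested above is a hypothetical single-argument overload that folds the `StateSizeEstimates` construction in, delegating to the two-argument `fromGraph` shown in the diff.)

```java
// Hypothetical single-argument overload on JobAllocationsInformation hinted at
// in the review comment; it would build the StateSizeEstimates internally and
// delegate to the existing two-argument fromGraph.
public static JobAllocationsInformation fromGraph(@Nullable ExecutionGraph graph) {
    return fromGraph(graph, StateSizeEstimates.fromGraph(graph));
}
```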
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java: ##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+    private static class AllocationScore implements Comparable<AllocationScore> {
+
+        private final String groupId;
+        private final AllocationID allocationId;
+
+        public AllocationScore(String groupId, AllocationID allocationId, long score) {
+            this.groupId = groupId;
+            this.allocationId = allocationId;
+            this.score = score;
+        }
+
+        private final long score;
+
+        public String getGroupId() {
+            return groupId;
+        }
+
+        public AllocationID getAllocationId() {
+            return allocationId;
+        }
+
+        public long getScore() {
+            return score;
+        }
+
+        @Override
+        public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+            int result = Long.compare(score, other.score);
+            if (result != 0) {
+                return result;
+            }
+            result = other.allocationId.compareTo(allocationId);
+            if (result != 0) {
+                return result;
+            }
+            return other.groupId.compareTo(groupId);
+        }
+    }
+
+    @Override
+    public Collection<SlotAssignment> assignSlots(
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> freeSlots,
+            VertexParallelism vertexParallelism,
+            JobAllocationsInformation previousAllocations) {
+        checkState(
+                freeSlots.size() >= jobInformation.getSlotSharingGroups().size(),
+                "Not enough slots to allocate all the slot sharing groups (have: %s, need: %s)",
+                freeSlots.size(),
+                jobInformation.getSlotSharingGroups().size());
+
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
+            allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
+        }
+        final Map<JobVertexID, Integer> parallelism = getParallelism(allGroups);
+        final PriorityQueue<AllocationScore> scores =
+                calculateScores(jobInformation, previousAllocations, allGroups, parallelism);
+
+        Map<String, ExecutionSlotSharingGroup> groupsById =
+                allGroups.stream().collect(toMap(ExecutionSlotSharingGroup::getId, identity()));
+        Map<AllocationID, SlotInfo> slotsById =
+                freeSlots.stream().collect(toMap(SlotInfo::getAllocationId, identity()));

Review Comment:
   nit: It would be great to stick with marking variables as final if they're not going to be reassigned.
   1) To keep things consistent; if the method is a mix of both, it feels weird.
   2) To improve readability; it gives the reader a strong hint that there is nothing magical happening with the variable after the assignment.
   ```suggestion
           final Map<String, ExecutionSlotSharingGroup> groupsById =
                   allGroups.stream().collect(toMap(ExecutionSlotSharingGroup::getId, identity()));
           final Map<AllocationID, SlotInfo> slotsById =
                   freeSlots.stream().collect(toMap(SlotInfo::getAllocationId, identity()));
   ```
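(Side note for readers of the diff above: the scoring logic is cut off at the review marker, but the class javadoc and the `KeyGroupRange`/`KeyGroupRangeAssignment` imports suggest the score counts key groups that a slot's previous allocation already holds locally. Below is a standalone sketch of the underlying key-group math using Flink's public `KeyGroupRangeAssignment` helper; `KeyGroupLocalityDemo` and the parallelism numbers are made up for illustration and are not part of the PR.)

```java
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Standalone illustration: how key groups are split across subtasks for a given
// max parallelism. Presumably the overlap between a subtask's new range and the
// range it owned under a previous allocation is what "local key groups" refers to.
public class KeyGroupLocalityDemo {
    public static void main(String[] args) {
        int maxParallelism = 128; // made-up numbers, for illustration only
        int parallelism = 4;
        for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
            KeyGroupRange range =
                    KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                            maxParallelism, parallelism, subtaskIndex);
            System.out.println("subtask " + subtaskIndex + " -> key groups " + range);
        }
    }
}
```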
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java: ##########
@@ -122,7 +123,7 @@ public ArchivedExecutionGraph getArchivedExecutionGraph(
         }
 
         @Override
-        public void goToWaitingForResources() {
+        public void goToWaitingForResources(ExecutionGraph executionGraph) {

Review Comment:
   nit
   ```suggestion
           public void goToWaitingForResources(@Nullable ExecutionGraph executionGraph) {
   ```

########## flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java: ##########
@@ -67,12 +68,14 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests local recovery by restarting Flink processes. */
 @ExtendWith(TestLoggerExtension.class)
 class LocalRecoveryITCase {

Review Comment:
   I think I'm missing something here; how does this test local recovery with the AdaptiveScheduler?