rkhachatryan commented on code in PR #26663:
URL: https://github.com/apache/flink/pull/26663#discussion_r2155588460


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1123,6 +1124,29 @@ public boolean hasSufficientResources() {
                 .isPresent();
     }
 
+    private JobAllocationsInformation 
getJobAllocationsInformationFromGraphAndState(
+            @Nullable final ExecutionGraph previousExecutionGraph) {
+
+        final CompletedCheckpoint latestCompletedCheckpoint =
+                Optional.ofNullable(requestCheckpointStats())
+                        .map(CheckpointStatsSnapshot::getCounts)
+                        
.map(CheckpointStatsCounts::getNumberOfCompletedCheckpoints)
+                        // If checkpointing is disabled, 
completedCheckpointStore calls will
+                        // throw UnsupportedOperationException hence we verify 
that checkpoint is
+                        // present
+                        // before trying to access it.
+                        .filter(count -> count > 0)
+                        .map(count -> 
completedCheckpointStore.getLatestCheckpoint())

Review Comment:
   Also, when checking `DeactivatedCheckpointCompletedCheckpointStore`, I don't 
see it overriding `getLatestCheckpoint`. 
   So I still don't understand how can we get an exception here (unless it was 
some testing implementation or some other method call).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1123,6 +1124,29 @@ public boolean hasSufficientResources() {
                 .isPresent();
     }
 
+    private JobAllocationsInformation 
getJobAllocationsInformationFromGraphAndState(
+            @Nullable final ExecutionGraph previousExecutionGraph) {
+
+        final CompletedCheckpoint latestCompletedCheckpoint =
+                Optional.ofNullable(requestCheckpointStats())
+                        .map(CheckpointStatsSnapshot::getCounts)
+                        
.map(CheckpointStatsCounts::getNumberOfCompletedCheckpoints)
+                        // If checkpointing is disabled, 
completedCheckpointStore calls will
+                        // throw UnsupportedOperationException hence we verify 
that checkpoint is
+                        // present
+                        // before trying to access it.
+                        .filter(count -> count > 0)
+                        .map(count -> 
completedCheckpointStore.getLatestCheckpoint())

Review Comment:
   Reading latest checkpoint from stats snapshot is not ideal because it 
doesn't always return the latest snapshot.
   In case of contention the stale version is returned might might not contain 
the checkpoint.
   That might still be fine (it would result in allocation according to some 
previous checkpoint or no checkpoints, and the condition is rare).
   
   However, I don't understand why do we need this check. 
   Can't we use something like 
`previousExecutionGraph.getCheckpointCoordinatorConfiguration().isCheckpointingEnabled()`?
 (modulo null check)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -196,14 +196,16 @@ public Collection<AllocationScore> calculateScore(
     private static long estimateSize(
             KeyGroupRange newRange, VertexAllocationInformation allocation) {
         KeyGroupRange oldRange = allocation.getKeyGroupRange();
+        int numberOfKeyGroups = 
oldRange.getIntersection(newRange).getNumberOfKeyGroups();
         if (allocation.stateSizeInBytes * oldRange.getNumberOfKeyGroups() == 
0) {
-            return 0L;
+            // As we want to maintain same allocation for local recovery, we 
should give positive
+            // score to allocations with the same key group range even when we 
have no state.
+            return numberOfKeyGroups > 0 ? 1 : 0;

Review Comment:
   Ideally, this should be a separate commit because it fixes a separate 
problem.
   
   (and `testSlotsPreservationWithNoStateSameParallelism` I guess)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to