rkhachatryan commented on code in PR #26663:
URL: https://github.com/apache/flink/pull/26663#discussion_r2155588460

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1123,6 +1124,29 @@ public boolean hasSufficientResources() {
                 .isPresent();
     }
 
+    private JobAllocationsInformation getJobAllocationsInformationFromGraphAndState(
+            @Nullable final ExecutionGraph previousExecutionGraph) {
+
+        final CompletedCheckpoint latestCompletedCheckpoint =
+                Optional.ofNullable(requestCheckpointStats())
+                        .map(CheckpointStatsSnapshot::getCounts)
+                        .map(CheckpointStatsCounts::getNumberOfCompletedCheckpoints)
+                        // If checkpointing is disabled, completedCheckpointStore calls will
+                        // throw UnsupportedOperationException hence we verify that checkpoint is
+                        // present
+                        // before trying to access it.
+                        .filter(count -> count > 0)
+                        .map(count -> completedCheckpointStore.getLatestCheckpoint())

Review Comment:
   Also, when checking `DeactivatedCheckpointCompletedCheckpointStore`, I don't see it overriding `getLatestCheckpoint`.
   So I still don't understand how we can get an exception here (unless it was some testing implementation or some other method call).


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1123,6 +1124,29 @@ public boolean hasSufficientResources() {
                 .isPresent();
     }
 
+    private JobAllocationsInformation getJobAllocationsInformationFromGraphAndState(
+            @Nullable final ExecutionGraph previousExecutionGraph) {
+
+        final CompletedCheckpoint latestCompletedCheckpoint =
+                Optional.ofNullable(requestCheckpointStats())
+                        .map(CheckpointStatsSnapshot::getCounts)
+                        .map(CheckpointStatsCounts::getNumberOfCompletedCheckpoints)
+                        // If checkpointing is disabled, completedCheckpointStore calls will
+                        // throw UnsupportedOperationException hence we verify that checkpoint is
+                        // present
+                        // before trying to access it.
+                        .filter(count -> count > 0)
+                        .map(count -> completedCheckpointStore.getLatestCheckpoint())

Review Comment:
   Reading the latest checkpoint from the stats snapshot is not ideal because it doesn't always return the latest snapshot. In case of contention, a stale version is returned, which might not contain the checkpoint.
   That might still be fine (it would result in allocation according to some previous checkpoint or no checkpoints, and the condition is rare).

   However, I don't understand why we need this check. Can't we use something like `previousExecutionGraph.getCheckpointCoordinatorConfiguration().isCheckpointingEnabled()`? (modulo null check)


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -196,14 +196,16 @@ public Collection<AllocationScore> calculateScore(
     private static long estimateSize(
             KeyGroupRange newRange, VertexAllocationInformation allocation) {
         KeyGroupRange oldRange = allocation.getKeyGroupRange();
+        int numberOfKeyGroups = oldRange.getIntersection(newRange).getNumberOfKeyGroups();
         if (allocation.stateSizeInBytes * oldRange.getNumberOfKeyGroups() == 0) {
-            return 0L;
+            // As we want to maintain same allocation for local recovery, we should give positive
+            // score to allocations with the same key group range even when we have no state.
+            return numberOfKeyGroups > 0 ? 1 : 0;

Review Comment:
   Ideally, this should be a separate commit because it fixes a separate problem
   (together with `testSlotsPreservationWithNoStateSameParallelism`, I guess).
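
To make the `isCheckpointingEnabled()` suggestion for `AdaptiveScheduler.java` concrete, here is a rough sketch of what the null-safe check could look like. It is only an illustration: `Graph` and `CoordinatorConfig` are hypothetical stand-ins for `ExecutionGraph` and `CheckpointCoordinatorConfiguration` so that the snippet compiles on its own, and it assumes that both the previous graph and its checkpoint coordinator configuration may be null.

```java
import java.util.Optional;

/** Sketch only: the nested interfaces are simplified stand-ins, not the real Flink classes. */
final class CheckpointingGuardSketch {

    /** Hypothetical stand-in for CheckpointCoordinatorConfiguration. */
    interface CoordinatorConfig {
        boolean isCheckpointingEnabled();
    }

    /** Hypothetical stand-in for ExecutionGraph; the configuration is assumed to be nullable. */
    interface Graph {
        CoordinatorConfig getCheckpointCoordinatorConfiguration();
    }

    /** Returns true only if a previous graph exists, has a configuration, and it enables checkpointing. */
    static boolean isCheckpointingEnabled(Graph previousExecutionGraph) {
        return Optional.ofNullable(previousExecutionGraph)
                // Optional.map turns a null configuration into an empty Optional.
                .map(Graph::getCheckpointCoordinatorConfiguration)
                .map(CoordinatorConfig::isCheckpointingEnabled)
                .orElse(false);
    }

    public static void main(String[] args) {
        Graph noConfig = () -> null;
        Graph disabled = () -> () -> false;
        Graph enabled = () -> () -> true;

        System.out.println(isCheckpointingEnabled(null));
        System.out.println(isCheckpointingEnabled(noConfig));
        System.out.println(isCheckpointingEnabled(disabled));
        System.out.println(isCheckpointingEnabled(enabled));
    }
}
```

With a guard like this, `completedCheckpointStore.getLatestCheckpoint()` would only be called when checkpointing is enabled, and the decision would no longer depend on a possibly stale stats snapshot. Whether that is sufficient still depends on the open question of where the exception actually comes from.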