rkhachatryan commented on code in PR #26663:
URL: https://github.com/apache/flink/pull/26663#discussion_r2155584543
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1123,6 +1124,29 @@ public boolean hasSufficientResources() {
                 .isPresent();
     }
 
+    private JobAllocationsInformation getJobAllocationsInformationFromGraphAndState(
+            @Nullable final ExecutionGraph previousExecutionGraph) {
+
+        final CompletedCheckpoint latestCompletedCheckpoint =
+                Optional.ofNullable(requestCheckpointStats())
+                        .map(CheckpointStatsSnapshot::getCounts)
+                        .map(CheckpointStatsCounts::getNumberOfCompletedCheckpoints)
+                        // If checkpointing is disabled, completedCheckpointStore calls will
+                        // throw UnsupportedOperationException hence we verify that checkpoint is
+                        // present
+                        // before trying to access it.
+                        .filter(count -> count > 0)
+                        .map(count -> completedCheckpointStore.getLatestCheckpoint())

Review Comment:
   Reading the latest checkpoint from the stats snapshot is not ideal because it doesn't always return the latest snapshot: under contention, a stale version is returned, which might not contain the most recent checkpoint, or any checkpoints at all.
   That might still be acceptable: it would result in allocation according to some previous checkpoint, or no checkpoint, and the condition is rare.
   
   However, I don't understand why we need this check at all. Can't we use something like `previousExecutionGraph.getCheckpointCoordinatorConfiguration().isCheckpointingEnabled()` (modulo a null check)?
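
   A rough sketch of the suggested check, not a drop-in patch. `Graph`, `CoordinatorConfig`, and `Store` are hypothetical stand-ins for `ExecutionGraph`, `CheckpointCoordinatorConfiguration`, and `CompletedCheckpointStore`, reduced to the methods used here so the snippet compiles without Flink on the classpath; a checkpoint is modeled as a plain `String`. It also assumes the configuration getter may return null, which is worth verifying against the real interface.

```java
import java.util.Optional;

final class CheckpointLookupSketch {

    /** Hypothetical stand-in for CheckpointCoordinatorConfiguration. */
    interface CoordinatorConfig {
        boolean isCheckpointingEnabled();
    }

    /** Hypothetical stand-in for ExecutionGraph; the configuration is assumed nullable. */
    interface Graph {
        CoordinatorConfig getCheckpointCoordinatorConfiguration();
    }

    /** Hypothetical stand-in for CompletedCheckpointStore; may throw if checkpointing is off. */
    interface Store {
        String getLatestCheckpoint();
    }

    /**
     * Touches the store only when the previous graph exists and its configuration
     * reports checkpointing as enabled. The stats snapshot is not consulted.
     */
    static Optional<String> latestCheckpoint(Graph previousGraph, Store store) {
        return Optional.ofNullable(previousGraph)
                // Optional.map yields an empty Optional if the getter returns null.
                .map(Graph::getCheckpointCoordinatorConfiguration)
                .filter(CoordinatorConfig::isCheckpointingEnabled)
                // A null result (no completed checkpoint yet) also becomes empty.
                .map(config -> store.getLatestCheckpoint());
    }
}
```

   With this shape, a null graph, a null configuration, or disabled checkpointing all short-circuit before `getLatestCheckpoint()` is called, so the `UnsupportedOperationException` path mentioned in the code comment is never reached.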