mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688472333
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ########## @@ -716,6 +695,48 @@ public CheckpointFetchResult fetchCheckpointInfo( } } + @Override + public CheckpointStatsResult fetchCheckpointStats( + String jobId, Long checkpointId, Configuration conf) { + try (RestClusterClient<String> clusterClient = getClusterClient(conf)) { + var checkpointStatusHeaders = CheckpointStatisticDetailsHeaders.getInstance(); + var parameters = checkpointStatusHeaders.getUnresolvedMessageParameters(); + parameters.jobPathParameter.resolve(JobID.fromHexString(jobId)); + + // This was needed because the parameter is protected + var checkpointIdPathParameter = + (CheckpointIdPathParameter) Iterables.getLast(parameters.getPathParameters()); + checkpointIdPathParameter.resolve(checkpointId); + + var response = + clusterClient.sendRequest( + checkpointStatusHeaders, parameters, EmptyRequestBody.getInstance()); + + var stats = response.get(); + if (stats == null) { + throw new IllegalStateException("Checkpoint ID %d for job %s does not exist!"); + } else if (stats instanceof CheckpointStatistics.CompletedCheckpointStatistics) { + return CheckpointStatsResult.completed( + ((CheckpointStatistics.CompletedCheckpointStatistics) stats) + .getExternalPath()); + } else if (stats instanceof CheckpointStatistics.FailedCheckpointStatistics) { + return CheckpointStatsResult.error( + ((CheckpointStatistics.FailedCheckpointStatistics) stats) + .getFailureMessage()); + } else if (stats instanceof CheckpointStatistics.PendingCheckpointStatistics) { + return CheckpointStatsResult.pending(); + } else { + throw new IllegalArgumentException( + String.format( + "Unknown checkpoint statistics result class: %s", + stats.getClass().getSimpleName())); + } + } catch (Exception e) { + LOG.error("Exception while fetching checkpoint statistics", e); Review Comment: We should handle cases where the checkpoint statistics are no longer stored on 
the web server, and we get the following error:

```
Could not find checkpointing statistics for checkpoint 243.
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org