mateczagany commented on code in PR #821:
URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1691974263
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -716,6 +695,48 @@ public CheckpointFetchResult fetchCheckpointInfo(
         }
     }
 
+    @Override
+    public CheckpointStatsResult fetchCheckpointStats(
+            String jobId, Long checkpointId, Configuration conf) {
+        try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
+            var checkpointStatusHeaders = CheckpointStatisticDetailsHeaders.getInstance();
+            var parameters = checkpointStatusHeaders.getUnresolvedMessageParameters();
+            parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
+
+            // This was needed because the parameter is protected
+            var checkpointIdPathParameter =
+                    (CheckpointIdPathParameter) Iterables.getLast(parameters.getPathParameters());
+            checkpointIdPathParameter.resolve(checkpointId);
+
+            var response =
+                    clusterClient.sendRequest(
+                            checkpointStatusHeaders, parameters, EmptyRequestBody.getInstance());
+
+            var stats = response.get();
+            if (stats == null) {
+                throw new IllegalStateException("Checkpoint ID %d for job %s does not exist!");
+            } else if (stats instanceof CheckpointStatistics.CompletedCheckpointStatistics) {
+                return CheckpointStatsResult.completed(
+                        ((CheckpointStatistics.CompletedCheckpointStatistics) stats)
+                                .getExternalPath());
+            } else if (stats instanceof CheckpointStatistics.FailedCheckpointStatistics) {
+                return CheckpointStatsResult.error(
+                        ((CheckpointStatistics.FailedCheckpointStatistics) stats)
+                                .getFailureMessage());
+            } else if (stats instanceof CheckpointStatistics.PendingCheckpointStatistics) {
+                return CheckpointStatsResult.pending();
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Unknown checkpoint statistics result class: %s",
+                                stats.getClass().getSimpleName()));
+            }
+        } catch (Exception e) {
+            LOG.error("Exception while fetching checkpoint statistics", e);

Review Comment:
   This commit makes us ignore errors when fetching checkpoint stats, since by that point we have already determined that the checkpoint was successful. In that case we just set an empty path: https://github.com/apache/flink-kubernetes-operator/pull/821/commits/637b3053b9644b3d6cadd1fb6a05e1f5aab75fa6
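
   For readers who do not want to open the commit, the fallback described in the review comment (ignore a failed stats lookup and continue with an empty path) could look roughly like the sketch below. It is not the code from the linked commit: `CheckpointPathFallback`, `resolvePathOrEmpty`, and the `Callable` standing in for the REST call are hypothetical names chosen for illustration, and the real operator logs through its own logger rather than `System.err`.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration only; none of these names come from the operator code base.
final class CheckpointPathFallback {

    /**
     * Returns the checkpoint path produced by the fetcher, or an empty string
     * if the lookup fails or yields no path. The caller is assumed to have
     * already established that the checkpoint itself succeeded, so a failed
     * stats lookup only costs us the path, not the overall result.
     */
    static String resolvePathOrEmpty(Callable<String> statsFetcher) {
        try {
            String path = statsFetcher.call();
            return path == null ? "" : path;
        } catch (Exception e) {
            // Log and continue instead of propagating the failure.
            System.err.println("Could not fetch checkpoint stats, using empty path: " + e);
            return "";
        }
    }

    public static void main(String[] args) {
        // Successful lookup: the path is passed through unchanged.
        System.out.println("[" + resolvePathOrEmpty(() -> "s3://bucket/chk-42") + "]");

        // Failed lookup: the exception is swallowed and an empty path is used.
        System.out.println("[" + resolvePathOrEmpty(() -> {
            throw new IllegalStateException("REST call failed");
        }) + "]");
    }
}
```

   The trade-off in this shape is that downstream code has to treat an empty path as "unknown" rather than as a failed checkpoint.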