mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1691974263


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -716,6 +695,48 @@ public CheckpointFetchResult fetchCheckpointInfo(
         }
     }
 
+    @Override
+    public CheckpointStatsResult fetchCheckpointStats(
+            String jobId, Long checkpointId, Configuration conf) {
+        try (RestClusterClient<String> clusterClient = getClusterClient(conf)) 
{
+            var checkpointStatusHeaders = 
CheckpointStatisticDetailsHeaders.getInstance();
+            var parameters = 
checkpointStatusHeaders.getUnresolvedMessageParameters();
+            parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
+
+            // This was needed because the parameter is protected
+            var checkpointIdPathParameter =
+                    (CheckpointIdPathParameter) 
Iterables.getLast(parameters.getPathParameters());
+            checkpointIdPathParameter.resolve(checkpointId);
+
+            var response =
+                    clusterClient.sendRequest(
+                            checkpointStatusHeaders, parameters, 
EmptyRequestBody.getInstance());
+
+            var stats = response.get();
+            if (stats == null) {
+                throw new IllegalStateException("Checkpoint ID %d for job %s 
does not exist!");
+            } else if (stats instanceof 
CheckpointStatistics.CompletedCheckpointStatistics) {
+                return CheckpointStatsResult.completed(
+                        ((CheckpointStatistics.CompletedCheckpointStatistics) 
stats)
+                                .getExternalPath());
+            } else if (stats instanceof 
CheckpointStatistics.FailedCheckpointStatistics) {
+                return CheckpointStatsResult.error(
+                        ((CheckpointStatistics.FailedCheckpointStatistics) 
stats)
+                                .getFailureMessage());
+            } else if (stats instanceof 
CheckpointStatistics.PendingCheckpointStatistics) {
+                return CheckpointStatsResult.pending();
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Unknown checkpoint statistics result class: 
%s",
+                                stats.getClass().getSimpleName()));
+            }
+        } catch (Exception e) {
+            LOG.error("Exception while fetching checkpoint statistics", e);

Review Comment:
   This commit made it so we ignore errors when fetching checkpoint stats, 
since at that point we have determined that the checkpoint was successful 
anyways. We just set an empty path: 
https://github.com/apache/flink-kubernetes-operator/pull/821/commits/637b3053b9644b3d6cadd1fb6a05e1f5aab75fa6



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to