luca-p-castelli commented on code in PR #948: URL: https://github.com/apache/flink-kubernetes-operator/pull/948#discussion_r1968237377
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java: ########## @@ -441,21 +443,33 @@ private long getMaxCountForSnapshotType( } private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId) { - var status = ctx.getResource().getStatus(); var jobStatus = status.getJobStatus(); - ctx.getFlinkService() - .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig()) - .ifPresentOrElse( - snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()), - () -> { - if (ReconciliationUtils.isJobCancelled(status)) { - // For cancelled jobs the observed savepoint is always definite, - // so if empty we know the job doesn't have any - // checkpoints/savepoints - jobStatus.setUpgradeSavepointPath(null); - } - }); + try { + ctx.getFlinkService() + .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig()) + .ifPresentOrElse( + snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()), + () -> { + if (ReconciliationUtils.isJobCancelled(status)) { + // For cancelled jobs the observed savepoint is always definite, + // so if empty we know the job doesn't have any + // checkpoints/savepoints + jobStatus.setUpgradeSavepointPath(null); + } + }); + } catch (Exception e) { + if (ExceptionUtils.findThrowable(e, RestClientException.class) + .map(ex -> ex.getMessage().contains("Checkpointing has not been enabled")) Review Comment: Good point. I'll add logic to check for null. 
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java: ########## @@ -441,21 +443,33 @@ private long getMaxCountForSnapshotType( } private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId) { - var status = ctx.getResource().getStatus(); var jobStatus = status.getJobStatus(); - ctx.getFlinkService() - .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig()) - .ifPresentOrElse( - snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()), - () -> { - if (ReconciliationUtils.isJobCancelled(status)) { - // For cancelled jobs the observed savepoint is always definite, - // so if empty we know the job doesn't have any - // checkpoints/savepoints - jobStatus.setUpgradeSavepointPath(null); - } - }); + try { + ctx.getFlinkService() + .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig()) + .ifPresentOrElse( + snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()), + () -> { + if (ReconciliationUtils.isJobCancelled(status)) { + // For cancelled jobs the observed savepoint is always definite, + // so if empty we know the job doesn't have any + // checkpoints/savepoints + jobStatus.setUpgradeSavepointPath(null); + } + }); + } catch (Exception e) { + if (ExceptionUtils.findThrowable(e, RestClientException.class) + .map(ex -> ex.getMessage().contains("Checkpointing has not been enabled")) Review Comment: Good point. I'm not sure. I'll add logic to check for null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org