gyfora commented on code in PR #216: URL: https://github.com/apache/flink-kubernetes-operator/pull/216#discussion_r874058450
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java: ########## @@ -131,15 +136,72 @@ private Optional<String> observeTriggeredSavepointProgress( LOG.info("Savepoint status updated with latest completed savepoint info"); currentSavepointInfo.updateLastSavepoint(savepointFetchResult.getSavepoint()); + updateSavepointHistory( + currentSavepointInfo, savepointFetchResult.getSavepoint(), deployedConfig, true); return Optional.empty(); } + @VisibleForTesting + void updateSavepointHistory( + SavepointInfo currentSavepointInfo, + Savepoint newSavepoint, + Configuration deployedConfig, + boolean cleanup) { + List<Savepoint> savepointHistory = currentSavepointInfo.getSavepointHistory(); + if (savepointHistory == null) { + currentSavepointInfo.setSavepointHistory(savepointHistory = new ArrayList<>()); + } + if (!savepointHistory.isEmpty()) { + Savepoint recentSp = savepointHistory.get(savepointHistory.size() - 1); + if (recentSp.getLocation().equals(newSavepoint.getLocation())) { + return; + } + } + savepointHistory.add(newSavepoint); + + if (!cleanup) { + return; + } + + // maintain history + int maxCount = configManager.getOperatorConfiguration().getSavepointHistoryMaxCount(); + while (savepointHistory.size() > maxCount) { + // remove oldest entries + disposeSavepointQuietly(savepointHistory.remove(0), deployedConfig); + } + + Duration maxAge = configManager.getOperatorConfiguration().getSavepointHistoryMaxAge(); + long maxTms = System.currentTimeMillis() - maxAge.toMillis(); + Iterator<Savepoint> it = savepointHistory.iterator(); + while (it.hasNext()) { + Savepoint sp = it.next(); + if (sp.getTimeStamp() < maxTms && sp != newSavepoint) { + it.remove(); + disposeSavepointQuietly(sp, deployedConfig); + } + } + } + + private void disposeSavepointQuietly(Savepoint sp, Configuration conf) { + try { + LOG.info("Disposing savepoint {}", sp); + flinkService.disposeSavepoint(sp.getLocation(), conf); + } catch 
(Exception e) { + // savepoint dispose error should nota affect the deployment + LOG.error("Exception while disposing savepoint {}", sp.getLocation(), e); + } + } + private void observeLatestSavepoint( SavepointInfo savepointInfo, String jobID, Configuration deployedConfig) { try { flinkService .getLastCheckpoint(JobID.fromHexString(jobID), deployedConfig) - .ifPresent(savepointInfo::updateLastSavepoint); + .ifPresent( + sp -> { + savepointInfo.updateLastSavepoint(sp); + updateSavepointHistory(savepointInfo, sp, deployedConfig, false); Review Comment: I'm afraid this will only work with Flink 1.15, so on its own it's not enough. You also need to add this to `FlinkService#cancelJob`, where the savepoint would be recorded regardless of the Flink version. There is no absolute guarantee that it will be recorded either way, but having it in both places covers the bulk of the cases :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org