gyfora commented on code in PR #216: URL: https://github.com/apache/flink-kubernetes-operator/pull/216#discussion_r874060431
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java: ########## @@ -131,15 +136,72 @@ private Optional<String> observeTriggeredSavepointProgress( LOG.info("Savepoint status updated with latest completed savepoint info"); currentSavepointInfo.updateLastSavepoint(savepointFetchResult.getSavepoint()); + updateSavepointHistory( + currentSavepointInfo, savepointFetchResult.getSavepoint(), deployedConfig, true); return Optional.empty(); } + @VisibleForTesting + void updateSavepointHistory( + SavepointInfo currentSavepointInfo, + Savepoint newSavepoint, + Configuration deployedConfig, + boolean cleanup) { + List<Savepoint> savepointHistory = currentSavepointInfo.getSavepointHistory(); + if (savepointHistory == null) { + currentSavepointInfo.setSavepointHistory(savepointHistory = new ArrayList<>()); + } + if (!savepointHistory.isEmpty()) { + Savepoint recentSp = savepointHistory.get(savepointHistory.size() - 1); + if (recentSp.getLocation().equals(newSavepoint.getLocation())) { + return; + } + } + savepointHistory.add(newSavepoint); + + if (!cleanup) { + return; + } + + // maintain history + int maxCount = configManager.getOperatorConfiguration().getSavepointHistoryMaxCount(); + while (savepointHistory.size() > maxCount) { + // remove oldest entries + disposeSavepointQuietly(savepointHistory.remove(0), deployedConfig); + } + + Duration maxAge = configManager.getOperatorConfiguration().getSavepointHistoryMaxAge(); + long maxTms = System.currentTimeMillis() - maxAge.toMillis(); + Iterator<Savepoint> it = savepointHistory.iterator(); + while (it.hasNext()) { + Savepoint sp = it.next(); + if (sp.getTimeStamp() < maxTms && sp != newSavepoint) { + it.remove(); + disposeSavepointQuietly(sp, deployedConfig); + } + } + } + + private void disposeSavepointQuietly(Savepoint sp, Configuration conf) { + try { + LOG.info("Disposing savepoint {}", sp); + flinkService.disposeSavepoint(sp.getLocation(), conf); + } catch 
(Exception e) { + // savepoint dispose error should not affect the deployment + LOG.error("Exception while disposing savepoint {}", sp.getLocation(), e); + } + } + private void observeLatestSavepoint( SavepointInfo savepointInfo, String jobID, Configuration deployedConfig) { try { flinkService .getLastCheckpoint(JobID.fromHexString(jobID), deployedConfig) - .ifPresent(savepointInfo::updateLastSavepoint); + .ifPresent( + sp -> { + savepointInfo.updateLastSavepoint(sp); + updateSavepointHistory(savepointInfo, sp, deployedConfig, false); Review Comment: You could cover this quite nicely with a FlinkVersion-parameterized test. The TestingFlinkService behaves according to the Flink version (keeping the Flink deployment around in Finished state after cancel for 1.15 and removing it for earlier versions) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org