gyfora commented on code in PR #216:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/216#discussion_r874060431


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -131,15 +136,72 @@ private Optional<String> observeTriggeredSavepointProgress(
 
         LOG.info("Savepoint status updated with latest completed savepoint info");
         currentSavepointInfo.updateLastSavepoint(savepointFetchResult.getSavepoint());
+        updateSavepointHistory(
+                currentSavepointInfo, savepointFetchResult.getSavepoint(), deployedConfig, true);
         return Optional.empty();
     }
 
+    @VisibleForTesting
+    void updateSavepointHistory(
+            SavepointInfo currentSavepointInfo,
+            Savepoint newSavepoint,
+            Configuration deployedConfig,
+            boolean cleanup) {
+        List<Savepoint> savepointHistory = currentSavepointInfo.getSavepointHistory();
+        if (savepointHistory == null) {
+            currentSavepointInfo.setSavepointHistory(savepointHistory = new ArrayList<>());
+        }
+        if (!savepointHistory.isEmpty()) {
+            Savepoint recentSp = savepointHistory.get(savepointHistory.size() - 1);
+            if (recentSp.getLocation().equals(newSavepoint.getLocation())) {
+                return;
+            }
+        }
+        savepointHistory.add(newSavepoint);
+
+        if (!cleanup) {
+            return;
+        }
+
+        // maintain history
+        int maxCount = configManager.getOperatorConfiguration().getSavepointHistoryMaxCount();
+        while (savepointHistory.size() > maxCount) {
+            // remove oldest entries
+            disposeSavepointQuietly(savepointHistory.remove(0), deployedConfig);
+        }
+
+        Duration maxAge = configManager.getOperatorConfiguration().getSavepointHistoryMaxAge();
+        long maxTms = System.currentTimeMillis() - maxAge.toMillis();
+        Iterator<Savepoint> it = savepointHistory.iterator();
+        while (it.hasNext()) {
+            Savepoint sp = it.next();
+            if (sp.getTimeStamp() < maxTms && sp != newSavepoint) {
+                it.remove();
+                disposeSavepointQuietly(sp, deployedConfig);
+            }
+        }
+    }
+
+    private void disposeSavepointQuietly(Savepoint sp, Configuration conf) {
+        try {
+            LOG.info("Disposing savepoint {}", sp);
+            flinkService.disposeSavepoint(sp.getLocation(), conf);
+        } catch (Exception e) {
+            // savepoint dispose error should not affect the deployment
+            LOG.error("Exception while disposing savepoint {}", sp.getLocation(), e);
+        }
+    }
+
     private void observeLatestSavepoint(
             SavepointInfo savepointInfo, String jobID, Configuration deployedConfig) {
         try {
             flinkService
                     .getLastCheckpoint(JobID.fromHexString(jobID), deployedConfig)
-                    .ifPresent(savepointInfo::updateLastSavepoint);
+                    .ifPresent(
+                            sp -> {
+                                savepointInfo.updateLastSavepoint(sp);
+                                updateSavepointHistory(savepointInfo, sp, deployedConfig, false);

Review Comment:
   You could cover this quite nicely with a FlinkVersion-parameterized test.
   The TestingFlinkService behaves according to the Flink version (keeping the
   Flink deployment around in Finished state after cancel for 1.15, and removing
   it for earlier versions).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to