mateczagany commented on code in PR #860:
URL: https://github.com/apache/flink-kubernetes-operator/pull/860#discussion_r1721835512


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java:
##########
@@ -219,64 +240,205 @@ private void 
observeTriggeredCheckpoint(FlinkResourceContext<CR> ctx, String job
     }
 
     /** Clean up and dispose savepoints according to the configured max 
size/age. */
+    private void cleanupSavepointHistory(FlinkResourceContext<CR> ctx) {
+        Set<FlinkStateSnapshot> snapshots = Collections.emptySet();
+        if (FlinkStateSnapshotUtils.isSnapshotResourceEnabled(
+                ctx.getOperatorConfig(), ctx.getObserveConfig())) {
+            snapshots = 
ctx.getJosdkContext().getSecondaryResources(FlinkStateSnapshot.class);
+            if (snapshots == null) {
+                snapshots = Set.of();
+            }
+        }
+
+        cleanupSavepointHistoryLegacy(ctx, snapshots);
+
+        if (CollectionUtil.isNullOrEmpty(snapshots)) {
+            return;
+        }
+        if (ctx.getObserveConfig().get(OPERATOR_SAVEPOINT_CLEANUP_ENABLED)) {
+            var savepointsToDelete =
+                    getFlinkStateSnapshotsToCleanUp(
+                            snapshots, ctx.getObserveConfig(), 
ctx.getOperatorConfig(), SAVEPOINT);
+            var checkpointsToDelete =
+                    getFlinkStateSnapshotsToCleanUp(
+                            snapshots, ctx.getObserveConfig(), 
ctx.getOperatorConfig(), CHECKPOINT);
+            Stream.concat(savepointsToDelete.stream(), 
checkpointsToDelete.stream())
+                    .forEach(
+                            snapshot ->
+                                    ctx.getKubernetesClient()
+                                            .resource(snapshot)
+                                            .withTimeoutInMillis(0L)
+                                            .delete());
+        }
+    }
+
+    /**
+     * Returns a list of FlinkStateSnapshot resources that should be cleaned 
up based on age/count
+     * policies.
+     *
+     * @param snapshots list of all snapshots
+     * @param observeConfig observe config
+     * @param operatorConfig operator config
+     * @param snapshotType checkpoint or savepoint
+     * @return set of FlinkStateSnapshot resources to delete
+     */
     @VisibleForTesting
-    void cleanupSavepointHistory(FlinkResourceContext<CR> ctx, SavepointInfo 
currentSavepointInfo) {
+    Set<FlinkStateSnapshot> getFlinkStateSnapshotsToCleanUp(
+            Collection<FlinkStateSnapshot> snapshots,
+            Configuration observeConfig,
+            FlinkOperatorConfiguration operatorConfig,
+            SnapshotType snapshotType) {
+        var snapshotList =
+                snapshots.stream()
+                        .filter(
+                                s ->
+                                        
CLEAN_UP_SNAPSHOT_TRIGGER_TYPES.contains(
+                                                
FlinkStateSnapshotUtils.getSnapshotTriggerType(s)))
+                        .filter(s -> (s.getSpec().isSavepoint() == 
(snapshotType == SAVEPOINT)))
+                        .sorted(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
+                        .collect(Collectors.toList());
+
+        var lastCompleteSnapshot =
+                snapshotList.stream()
+                        .filter(s -> 
COMPLETED.equals(s.getStatus().getState()))
+                        .max(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
+                        .orElse(null);
+
+        var maxCount = getMaxCountForSnapshotType(observeConfig, 
operatorConfig, snapshotType);
+        var maxTms = getMinAgeForSnapshotType(observeConfig, operatorConfig, 
snapshotType);
+        var result = new HashSet<FlinkStateSnapshot>();
+
+        if (snapshotList.size() < 2) {
+            return result;
+        }
+
+        for (var snapshot : snapshotList) {
+            if (snapshot.equals(lastCompleteSnapshot)) {
+                continue;
+            }
+
+            var ts = EXTRACT_SNAPSHOT_TIME.apply(snapshot).toEpochMilli();
+            if (snapshotList.size() - result.size() > maxCount || ts < maxTms) 
{
+                result.add(snapshot);
+            }
+        }
 
-        var observeConfig = ctx.getObserveConfig();
-        var flinkService = ctx.getFlinkService();
-        boolean savepointCleanupEnabled =
-                observeConfig.getBoolean(
-                        
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED);
+        return result;
+    }
 
-        // maintain history
-        List<Savepoint> savepointHistory = 
currentSavepointInfo.getSavepointHistory();
-        if (savepointHistory.size() < 2) {
+    /**
+     * Cleans up the savepoint history of a Flink resource from the old, 
deprecated
+     * savepoint-history. Secondary resources of FlinkStateSnapshot are used 
to determine count of
+     * completed savepoints to be able to properly clean old savepoints based 
on the count policy.
+     *
+     * @param ctx flink resource context
+     * @param allSecondarySnapshotResources all snapshot resources linked to 
this Flink resource
+     */
+    @VisibleForTesting
+    void cleanupSavepointHistoryLegacy(
+            FlinkResourceContext<CR> ctx, Set<FlinkStateSnapshot> 
allSecondarySnapshotResources) {
+        var maxTms =
+                getMinAgeForSnapshotType(
+                        ctx.getObserveConfig(), ctx.getOperatorConfig(), 
SAVEPOINT);
+        var maxCount =
+                getMaxCountForSnapshotType(
+                        ctx.getObserveConfig(), ctx.getOperatorConfig(), 
SAVEPOINT);
+
+        var completedSavepointCrs =
+                allSecondarySnapshotResources.stream()
+                        .filter(
+                                s ->
+                                        s.getStatus() != null
+                                                && 
COMPLETED.equals(s.getStatus().getState()))
+                        .filter(s -> s.getSpec().isSavepoint())
+                        .count();
+        maxCount = Math.max(0, maxCount - completedSavepointCrs);

Review Comment:
   I have added another `Math.max` call to `getMaxCountForSnapshotType`. Here we can 
use 0 as the lower bound, because the list of legacy savepoints should be clearable 
entirely once there are enough completed FlinkStateSnapshot resources.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to