gyfora commented on code in PR #860:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/860#discussion_r1721033240


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java:
##########
@@ -219,64 +240,205 @@ private void 
observeTriggeredCheckpoint(FlinkResourceContext<CR> ctx, String job
     }
 
     /** Clean up and dispose savepoints according to the configured max 
size/age. */
+    private void cleanupSavepointHistory(FlinkResourceContext<CR> ctx) {
+        Set<FlinkStateSnapshot> snapshots = Collections.emptySet();
+        if (FlinkStateSnapshotUtils.isSnapshotResourceEnabled(
+                ctx.getOperatorConfig(), ctx.getObserveConfig())) {
+            snapshots = 
ctx.getJosdkContext().getSecondaryResources(FlinkStateSnapshot.class);
+            if (snapshots == null) {
+                snapshots = Set.of();
+            }
+        }
+
+        cleanupSavepointHistoryLegacy(ctx, snapshots);
+
+        if (CollectionUtil.isNullOrEmpty(snapshots)) {
+            return;
+        }
+        if (ctx.getObserveConfig().get(OPERATOR_SAVEPOINT_CLEANUP_ENABLED)) {
+            var savepointsToDelete =
+                    getFlinkStateSnapshotsToCleanUp(
+                            snapshots, ctx.getObserveConfig(), 
ctx.getOperatorConfig(), SAVEPOINT);
+            var checkpointsToDelete =
+                    getFlinkStateSnapshotsToCleanUp(
+                            snapshots, ctx.getObserveConfig(), 
ctx.getOperatorConfig(), CHECKPOINT);
+            Stream.concat(savepointsToDelete.stream(), 
checkpointsToDelete.stream())
+                    .forEach(
+                            snapshot ->
+                                    ctx.getKubernetesClient()
+                                            .resource(snapshot)
+                                            .withTimeoutInMillis(0L)
+                                            .delete());
+        }
+    }
+
+    /**
+     * Returns a list of FlinkStateSnapshot resources that should be cleaned 
up based on age/count
+     * policies.
+     *
+     * @param snapshots list of all snapshots
+     * @param observeConfig observe config
+     * @param operatorConfig operator config
+     * @param snapshotType checkpoint or savepoint
+     * @return set of FlinkStateSnapshot resources to delete
+     */
     @VisibleForTesting
-    void cleanupSavepointHistory(FlinkResourceContext<CR> ctx, SavepointInfo 
currentSavepointInfo) {
+    Set<FlinkStateSnapshot> getFlinkStateSnapshotsToCleanUp(
+            Collection<FlinkStateSnapshot> snapshots,
+            Configuration observeConfig,
+            FlinkOperatorConfiguration operatorConfig,
+            SnapshotType snapshotType) {
+        var snapshotList =
+                snapshots.stream()
+                        .filter(
+                                s ->
+                                        
CLEAN_UP_SNAPSHOT_TRIGGER_TYPES.contains(
+                                                
FlinkStateSnapshotUtils.getSnapshotTriggerType(s)))
+                        .filter(s -> (s.getSpec().isSavepoint() == 
(snapshotType == SAVEPOINT)))
+                        .sorted(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
+                        .collect(Collectors.toList());
+
+        var lastCompleteSnapshot =
+                snapshotList.stream()
+                        .filter(s -> 
COMPLETED.equals(s.getStatus().getState()))
+                        .max(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
+                        .orElse(null);
+
+        var maxCount = getMaxCountForSnapshotType(observeConfig, 
operatorConfig, snapshotType);
+        var maxTms = getMinAgeForSnapshotType(observeConfig, operatorConfig, 
snapshotType);
+        var result = new HashSet<FlinkStateSnapshot>();
+
+        if (snapshotList.size() < 2) {
+            return result;
+        }
+
+        for (var snapshot : snapshotList) {
+            if (snapshot.equals(lastCompleteSnapshot)) {
+                continue;
+            }
+
+            var ts = EXTRACT_SNAPSHOT_TIME.apply(snapshot).toEpochMilli();
+            if (snapshotList.size() - result.size() > maxCount || ts < maxTms) 
{
+                result.add(snapshot);
+            }
+        }
 
-        var observeConfig = ctx.getObserveConfig();
-        var flinkService = ctx.getFlinkService();
-        boolean savepointCleanupEnabled =
-                observeConfig.getBoolean(
-                        
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED);
+        return result;
+    }
 
-        // maintain history
-        List<Savepoint> savepointHistory = 
currentSavepointInfo.getSavepointHistory();
-        if (savepointHistory.size() < 2) {
+    /**
+     * Cleans up the savepoint history of a Flink resource from the old, 
deprecated
+     * savepoint-history. Secondary resources of FlinkStateSnapshot are used 
to determine count of
+     * completed savepoints to be able to properly clean old savepoints based 
on the count policy.
+     *
+     * @param ctx flink resource context
+     * @param allSecondarySnapshotResources all snapshot resources linked to 
this Flink resource
+     */
+    @VisibleForTesting
+    void cleanupSavepointHistoryLegacy(
+            FlinkResourceContext<CR> ctx, Set<FlinkStateSnapshot> 
allSecondarySnapshotResources) {
+        var maxTms =
+                getMinAgeForSnapshotType(
+                        ctx.getObserveConfig(), ctx.getOperatorConfig(), 
SAVEPOINT);
+        var maxCount =
+                getMaxCountForSnapshotType(
+                        ctx.getObserveConfig(), ctx.getOperatorConfig(), 
SAVEPOINT);
+
+        var completedSavepointCrs =
+                allSecondarySnapshotResources.stream()
+                        .filter(
+                                s ->
+                                        s.getStatus() != null
+                                                && 
COMPLETED.equals(s.getStatus().getState()))
+                        .filter(s -> s.getSpec().isSavepoint())
+                        .count();
+        maxCount = Math.max(0, maxCount - completedSavepointCrs);

Review Comment:
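   Shouldn't we always keep at least one savepoint here? With `Math.max(0, ...)`, `maxCount` can drop to 0, so the legacy history could be cleaned down to nothing.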



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java:
##########
@@ -219,64 +240,205 @@ private void 
observeTriggeredCheckpoint(FlinkResourceContext<CR> ctx, String job
     }
 
     /** Clean up and dispose savepoints according to the configured max 
size/age. */
+    private void cleanupSavepointHistory(FlinkResourceContext<CR> ctx) {
+        Set<FlinkStateSnapshot> snapshots = Collections.emptySet();
+        if (FlinkStateSnapshotUtils.isSnapshotResourceEnabled(
+                ctx.getOperatorConfig(), ctx.getObserveConfig())) {
+            snapshots = 
ctx.getJosdkContext().getSecondaryResources(FlinkStateSnapshot.class);
+            if (snapshots == null) {
+                snapshots = Set.of();
+            }
+        }
+
+        cleanupSavepointHistoryLegacy(ctx, snapshots);
+
+        if (CollectionUtil.isNullOrEmpty(snapshots)) {
+            return;
+        }
+        if (ctx.getObserveConfig().get(OPERATOR_SAVEPOINT_CLEANUP_ENABLED)) {
+            var savepointsToDelete =
+                    getFlinkStateSnapshotsToCleanUp(
+                            snapshots, ctx.getObserveConfig(), 
ctx.getOperatorConfig(), SAVEPOINT);
+            var checkpointsToDelete =
+                    getFlinkStateSnapshotsToCleanUp(
+                            snapshots, ctx.getObserveConfig(), 
ctx.getOperatorConfig(), CHECKPOINT);
+            Stream.concat(savepointsToDelete.stream(), 
checkpointsToDelete.stream())
+                    .forEach(
+                            snapshot ->
+                                    ctx.getKubernetesClient()
+                                            .resource(snapshot)
+                                            .withTimeoutInMillis(0L)
+                                            .delete());
+        }
+    }
+
+    /**
+     * Returns a list of FlinkStateSnapshot resources that should be cleaned 
up based on age/count
+     * policies.
+     *
+     * @param snapshots list of all snapshots
+     * @param observeConfig observe config
+     * @param operatorConfig operator config
+     * @param snapshotType checkpoint or savepoint
+     * @return set of FlinkStateSnapshot resources to delete
+     */
     @VisibleForTesting
-    void cleanupSavepointHistory(FlinkResourceContext<CR> ctx, SavepointInfo 
currentSavepointInfo) {
+    Set<FlinkStateSnapshot> getFlinkStateSnapshotsToCleanUp(
+            Collection<FlinkStateSnapshot> snapshots,
+            Configuration observeConfig,
+            FlinkOperatorConfiguration operatorConfig,
+            SnapshotType snapshotType) {
+        var snapshotList =
+                snapshots.stream()
+                        .filter(
+                                s ->
+                                        
CLEAN_UP_SNAPSHOT_TRIGGER_TYPES.contains(
+                                                
FlinkStateSnapshotUtils.getSnapshotTriggerType(s)))
+                        .filter(s -> (s.getSpec().isSavepoint() == 
(snapshotType == SAVEPOINT)))
+                        .sorted(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
+                        .collect(Collectors.toList());
+
+        var lastCompleteSnapshot =
+                snapshotList.stream()
+                        .filter(s -> 
COMPLETED.equals(s.getStatus().getState()))
+                        .max(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
+                        .orElse(null);
+
+        var maxCount = getMaxCountForSnapshotType(observeConfig, 
operatorConfig, snapshotType);
+        var maxTms = getMinAgeForSnapshotType(observeConfig, operatorConfig, 
snapshotType);
+        var result = new HashSet<FlinkStateSnapshot>();
+
+        if (snapshotList.size() < 2) {
+            return result;
+        }
+
+        for (var snapshot : snapshotList) {
+            if (snapshot.equals(lastCompleteSnapshot)) {
+                continue;
+            }
+
+            var ts = EXTRACT_SNAPSHOT_TIME.apply(snapshot).toEpochMilli();
+            if (snapshotList.size() - result.size() > maxCount || ts < maxTms) 
{
+                result.add(snapshot);
+            }

Review Comment:
   We should probably short-circuit this loop once only one snapshot would remain; otherwise, if they are all too old, we may delete every one of them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to