gyfora commented on code in PR #860: URL: https://github.com/apache/flink-kubernetes-operator/pull/860#discussion_r1721033240
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java: ########## @@ -219,64 +240,205 @@ private void observeTriggeredCheckpoint(FlinkResourceContext<CR> ctx, String job } /** Clean up and dispose savepoints according to the configured max size/age. */ + private void cleanupSavepointHistory(FlinkResourceContext<CR> ctx) { + Set<FlinkStateSnapshot> snapshots = Collections.emptySet(); + if (FlinkStateSnapshotUtils.isSnapshotResourceEnabled( + ctx.getOperatorConfig(), ctx.getObserveConfig())) { + snapshots = ctx.getJosdkContext().getSecondaryResources(FlinkStateSnapshot.class); + if (snapshots == null) { + snapshots = Set.of(); + } + } + + cleanupSavepointHistoryLegacy(ctx, snapshots); + + if (CollectionUtil.isNullOrEmpty(snapshots)) { + return; + } + if (ctx.getObserveConfig().get(OPERATOR_SAVEPOINT_CLEANUP_ENABLED)) { + var savepointsToDelete = + getFlinkStateSnapshotsToCleanUp( + snapshots, ctx.getObserveConfig(), ctx.getOperatorConfig(), SAVEPOINT); + var checkpointsToDelete = + getFlinkStateSnapshotsToCleanUp( + snapshots, ctx.getObserveConfig(), ctx.getOperatorConfig(), CHECKPOINT); + Stream.concat(savepointsToDelete.stream(), checkpointsToDelete.stream()) + .forEach( + snapshot -> + ctx.getKubernetesClient() + .resource(snapshot) + .withTimeoutInMillis(0L) + .delete()); + } + } + + /** + * Returns a list of FlinkStateSnapshot resources that should be cleaned up based on age/count + * policies. 
+ * + * @param snapshots list of all snapshots + * @param observeConfig observe config + * @param operatorConfig operator config + * @param snapshotType checkpoint or savepoint + * @return set of FlinkStateSnapshot resources to delete + */ @VisibleForTesting - void cleanupSavepointHistory(FlinkResourceContext<CR> ctx, SavepointInfo currentSavepointInfo) { + Set<FlinkStateSnapshot> getFlinkStateSnapshotsToCleanUp( + Collection<FlinkStateSnapshot> snapshots, + Configuration observeConfig, + FlinkOperatorConfiguration operatorConfig, + SnapshotType snapshotType) { + var snapshotList = + snapshots.stream() + .filter( + s -> + CLEAN_UP_SNAPSHOT_TRIGGER_TYPES.contains( + FlinkStateSnapshotUtils.getSnapshotTriggerType(s))) + .filter(s -> (s.getSpec().isSavepoint() == (snapshotType == SAVEPOINT))) + .sorted(Comparator.comparing(EXTRACT_SNAPSHOT_TIME)) + .collect(Collectors.toList()); + + var lastCompleteSnapshot = + snapshotList.stream() + .filter(s -> COMPLETED.equals(s.getStatus().getState())) + .max(Comparator.comparing(EXTRACT_SNAPSHOT_TIME)) + .orElse(null); + + var maxCount = getMaxCountForSnapshotType(observeConfig, operatorConfig, snapshotType); + var maxTms = getMinAgeForSnapshotType(observeConfig, operatorConfig, snapshotType); + var result = new HashSet<FlinkStateSnapshot>(); + + if (snapshotList.size() < 2) { + return result; + } + + for (var snapshot : snapshotList) { + if (snapshot.equals(lastCompleteSnapshot)) { + continue; + } + + var ts = EXTRACT_SNAPSHOT_TIME.apply(snapshot).toEpochMilli(); + if (snapshotList.size() - result.size() > maxCount || ts < maxTms) { + result.add(snapshot); + } + } - var observeConfig = ctx.getObserveConfig(); - var flinkService = ctx.getFlinkService(); - boolean savepointCleanupEnabled = - observeConfig.getBoolean( - KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED); + return result; + } - // maintain history - List<Savepoint> savepointHistory = currentSavepointInfo.getSavepointHistory(); - if 
(savepointHistory.size() < 2) { + /** + * Cleans up the savepoint history of a Flink resource from the old, deprecated + * savepoint-history. Secondary resources of FlinkStateSnapshot are used to determine count of + * completed savepoints to be able to properly clean old savepoints based on the count policy. + * + * @param ctx flink resource context + * @param allSecondarySnapshotResources all snapshot resources linked to this Flink resource + */ + @VisibleForTesting + void cleanupSavepointHistoryLegacy( + FlinkResourceContext<CR> ctx, Set<FlinkStateSnapshot> allSecondarySnapshotResources) { + var maxTms = + getMinAgeForSnapshotType( + ctx.getObserveConfig(), ctx.getOperatorConfig(), SAVEPOINT); + var maxCount = + getMaxCountForSnapshotType( + ctx.getObserveConfig(), ctx.getOperatorConfig(), SAVEPOINT); + + var completedSavepointCrs = + allSecondarySnapshotResources.stream() + .filter( + s -> + s.getStatus() != null + && COMPLETED.equals(s.getStatus().getState())) + .filter(s -> s.getSpec().isSavepoint()) + .count(); + maxCount = Math.max(0, maxCount - completedSavepointCrs); Review Comment: Shouldn't we always keep at least 1? ########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java: ########## @@ -219,64 +240,205 @@ private void observeTriggeredCheckpoint(FlinkResourceContext<CR> ctx, String job } /** Clean up and dispose savepoints according to the configured max size/age. 
*/ + private void cleanupSavepointHistory(FlinkResourceContext<CR> ctx) { + Set<FlinkStateSnapshot> snapshots = Collections.emptySet(); + if (FlinkStateSnapshotUtils.isSnapshotResourceEnabled( + ctx.getOperatorConfig(), ctx.getObserveConfig())) { + snapshots = ctx.getJosdkContext().getSecondaryResources(FlinkStateSnapshot.class); + if (snapshots == null) { + snapshots = Set.of(); + } + } + + cleanupSavepointHistoryLegacy(ctx, snapshots); + + if (CollectionUtil.isNullOrEmpty(snapshots)) { + return; + } + if (ctx.getObserveConfig().get(OPERATOR_SAVEPOINT_CLEANUP_ENABLED)) { + var savepointsToDelete = + getFlinkStateSnapshotsToCleanUp( + snapshots, ctx.getObserveConfig(), ctx.getOperatorConfig(), SAVEPOINT); + var checkpointsToDelete = + getFlinkStateSnapshotsToCleanUp( + snapshots, ctx.getObserveConfig(), ctx.getOperatorConfig(), CHECKPOINT); + Stream.concat(savepointsToDelete.stream(), checkpointsToDelete.stream()) + .forEach( + snapshot -> + ctx.getKubernetesClient() + .resource(snapshot) + .withTimeoutInMillis(0L) + .delete()); + } + } + + /** + * Returns a list of FlinkStateSnapshot resources that should be cleaned up based on age/count + * policies. 
+ * * @param snapshots list of all snapshots * @param observeConfig observe config * @param operatorConfig operator config * @param snapshotType checkpoint or savepoint * @return set of FlinkStateSnapshot resources to delete */ @VisibleForTesting - void cleanupSavepointHistory(FlinkResourceContext<CR> ctx, SavepointInfo currentSavepointInfo) { + Set<FlinkStateSnapshot> getFlinkStateSnapshotsToCleanUp( + Collection<FlinkStateSnapshot> snapshots, + Configuration observeConfig, + FlinkOperatorConfiguration operatorConfig, + SnapshotType snapshotType) { + var snapshotList = + snapshots.stream() + .filter( + s -> + CLEAN_UP_SNAPSHOT_TRIGGER_TYPES.contains( + FlinkStateSnapshotUtils.getSnapshotTriggerType(s))) + .filter(s -> (s.getSpec().isSavepoint() == (snapshotType == SAVEPOINT))) + .sorted(Comparator.comparing(EXTRACT_SNAPSHOT_TIME)) + .collect(Collectors.toList()); + + var lastCompleteSnapshot = + snapshotList.stream() + .filter(s -> COMPLETED.equals(s.getStatus().getState())) + .max(Comparator.comparing(EXTRACT_SNAPSHOT_TIME)) + .orElse(null); + + var maxCount = getMaxCountForSnapshotType(observeConfig, operatorConfig, snapshotType); + var maxTms = getMinAgeForSnapshotType(observeConfig, operatorConfig, snapshotType); + var result = new HashSet<FlinkStateSnapshot>(); + + if (snapshotList.size() < 2) { + return result; + } + + for (var snapshot : snapshotList) { + if (snapshot.equals(lastCompleteSnapshot)) { + continue; + } + + var ts = EXTRACT_SNAPSHOT_TIME.apply(snapshot).toEpochMilli(); + if (snapshotList.size() - result.size() > maxCount || ts < maxTms) { + result.add(snapshot); + } Review Comment: We should probably short-circuit this loop once only one snapshot would remain; otherwise we may delete all of them if they are all too old. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org