azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig URL: https://github.com/apache/flink/pull/8459#discussion_r287733029
########## File path: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java ########## @@ -376,39 +391,57 @@ public StateTtlConfig build() { private static final long serialVersionUID = 1373998465131443873L; } - final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class); + private final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class); - public void activate(Strategies strategy) { + private void activate(Strategies strategy) { activate(strategy, EMPTY_STRATEGY); } - public void activate(Strategies strategy, CleanupStrategy config) { + private void activate(Strategies strategy, CleanupStrategy config) { strategies.put(strategy, config); } public boolean inFullSnapshot() { return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT); } + public boolean isCleanupInBackground() { + return isCleanupInBackground; + } + + private void setCleanupInBackground(boolean cleanupInBackground) { + isCleanupInBackground = cleanupInBackground; + } + @Nullable public IncrementalCleanupStrategy getIncrementalCleanupStrategy() { - return (IncrementalCleanupStrategy) strategies.get(Strategies.INCREMENTAL_CLEANUP); + if (isCleanupInBackground()) { + return (IncrementalCleanupStrategy) strategies.getOrDefault(Strategies.INCREMENTAL_CLEANUP, DEFAULT_INCREMENTAL_CLEANUP_STRATEGY); + } else { + return (IncrementalCleanupStrategy) strategies.get(Strategies.INCREMENTAL_CLEANUP); + } } public boolean inRocksdbCompactFilter() { - return strategies.containsKey(Strategies.ROCKSDB_COMPACTION_FILTER); + return getRocksdbCompactFilterCleanupStrategy() != null; } @Nullable public RocksdbCompactFilterCleanupStrategy getRocksdbCompactFilterCleanupStrategy() { - return (RocksdbCompactFilterCleanupStrategy) strategies.get(Strategies.ROCKSDB_COMPACTION_FILTER); + if (isCleanupInBackground()) { + return (RocksdbCompactFilterCleanupStrategy) strategies.getOrDefault(Strategies.ROCKSDB_COMPACTION_FILTER, DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY); + } else { + return (RocksdbCompactFilterCleanupStrategy) strategies.get(Strategies.ROCKSDB_COMPACTION_FILTER); + } } } /** Configuration of cleanup strategy while taking the full snapshot. */ public static class IncrementalCleanupStrategy implements CleanupStrategies.CleanupStrategy { private static final long serialVersionUID = 3109278696501988780L; + public static final IncrementalCleanupStrategy DEFAULT_INCREMENTAL_CLEANUP_STRATEGY = new IncrementalCleanupStrategy(10, true); Review comment: it can be at least package private. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services