[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-4273.
------------------------------------
    Fix Version/s: 2.1.0
       Resolution: Fixed

> Streams DSL - Add TTL / retention period support for intermediate topics and
> state stores
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4273
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4273
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.1
>            Reporter: Davor Poldrugo
>            Priority: Major
>             Fix For: 2.1.0
>
>
> Hi!
> I'm using the Streams DSL (0.10.0.1), which, as far as I know, can only use
> RocksDB for local state; this is not configurable.
> In my use case my data has a TTL / retention period of 48 hours. After that,
> the data can be discarded.
> I join two topics, "messages" and "prices", using a windowed inner join.
> The two intermediate Kafka topics for this join are named:
> * messages-prices-join-this-changelog
> * messages-prices-join-other-changelog
> Since Kafka Streams creates these topics as compacted, and I don't want to
> keep data forever, I have altered them to not use compaction. Right now my
> RocksDB state stores grow indefinitely, and I don't have any option to define
> a TTL or to periodically clean up older data.
> A "hack" that I use to keep my disk usage low: I have scheduled a job that
> periodically stops the Kafka Streams instances, one at a time. This triggers
> a rebalance, and partitions migrate to other instances. When the instance is
> started again, there's another rebalance, and sometimes this instance starts
> processing partitions that it wasn't processing before the stop. This leads
> to deletion of the RocksDB state stores for the partitions it no longer owns
> (after state.cleanup.delay.ms). In a later rebalance the local store is
> recreated by a restore consumer, which reads from the non-compacted topic
> mentioned above. This effectively gives me "hacked TTL support" in the
> Kafka Streams DSL.
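The reporter does not say which retention they configured after disabling compaction; assuming the two join changelogs were switched to time-based deletion matching the 48-hour TTL, the relevant settings are the standard topic configs `cleanup.policy` and `retention.ms`. The minimal Java sketch below only computes those values. The class `ChangelogRetention` and its `deleteAfter` method are hypothetical helpers, not a Kafka or Kafka Streams API, and actually applying the configs to the topics is out of scope.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Kafka or Kafka Streams API): builds the topic-level
// configs that switch an intermediate topic from compaction to time-based deletion.
public class ChangelogRetention {

    // "cleanup.policy" and "retention.ms" are standard Kafka topic config names.
    public static Map<String, String> deleteAfter(Duration retention) {
        Map<String, String> configs = new HashMap<>();
        configs.put("cleanup.policy", "delete");                          // no compaction
        configs.put("retention.ms", Long.toString(retention.toMillis())); // time-based expiry
        return configs;
    }

    public static void main(String[] args) {
        Duration ttl = Duration.ofHours(48); // assumed: the reporter's 48-hour TTL
        List<String> topics = List.of(
                "messages-prices-join-this-changelog",
                "messages-prices-join-other-changelog");
        for (String topic : topics) {
            Map<String, String> configs = deleteAfter(ttl);
            // 48 h = 48 * 3600 * 1000 ms = 172800000 ms
            System.out.println(topic + ": cleanup.policy=" + configs.get("cleanup.policy")
                    + ", retention.ms=" + configs.get("retention.ms"));
        }
    }
}
```

Note that this only bounds the topics; as the description says, the local RocksDB stores still grow until they are rebuilt from these topics.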
> Questions:
> * Do you think it would be reasonable to add support in the DSL API to define
> a TTL for local stores?
> * This opens another question: there are use cases which don't need the
> intermediate topics to be created as "compact". Could this also be added to
> the DSL API? Maybe only this could be added, and the same flag could also be
> used for the RocksDB TTL. Of course, in this case another config would be
> mandatory: the retention period or TTL for the intermediate topics and the
> state stores. I saw there is a new combined compact-and-delete
> cleanup.policy, added with KAFKA-4015.
> * This also leads to another question: maybe some intermediate topics /
> state stores need different TTLs, so it's not as simple as that. But after
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
> * https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
> * https://github.com/facebook/rocksdb/wiki/Time-to-Live
> * https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somewhat similar issue: KAFKA-4212

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
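The first question asks for a TTL on local state stores, in the spirit of the RocksDB TTL support linked in the description. The intended semantics can be sketched in plain Java: each entry carries its write timestamp, reads ignore entries older than the TTL, and an explicit purge removes them, which is roughly what the restart workaround achieves indirectly. `TtlStore` and its methods are hypothetical and are neither a Kafka Streams nor a RocksDB API; timestamps are passed in explicitly (epoch millis) to keep the example deterministic.

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only (not a Kafka Streams or RocksDB API):
// a key-value store whose entries expire once they are older than a fixed TTL.
public class TtlStore<K, V> {

    // A stored value plus the timestamp (epoch millis) at which it was written.
    private static final class Timestamped<T> {
        final T value;
        final long timestampMs;

        Timestamped(T value, long timestampMs) {
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    private final long ttlMs;
    private final Map<K, Timestamped<V>> entries = new LinkedHashMap<>();

    public TtlStore(Duration ttl) {
        this.ttlMs = ttl.toMillis();
    }

    // Writing a key again replaces its value and resets its age.
    public void put(K key, V value, long timestampMs) {
        entries.put(key, new Timestamped<>(value, timestampMs));
    }

    // Returns null if the key is absent or its entry is older than the TTL at nowMs.
    public V get(K key, long nowMs) {
        Timestamped<V> e = entries.get(key);
        if (e == null || nowMs - e.timestampMs > ttlMs) {
            return null;
        }
        return e.value;
    }

    // Physically removes all expired entries and returns how many were dropped.
    public int purgeExpired(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<K, Timestamped<V>>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue().timestampMs > ttlMs) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        TtlStore<String, String> store = new TtlStore<>(Duration.ofHours(48));
        long hour = Duration.ofHours(1).toMillis();
        store.put("msg-1", "price=10", 0L);
        store.put("msg-2", "price=12", 30 * hour);
        long now = 50 * hour;                         // msg-1 is 50 h old, msg-2 is 20 h old
        System.out.println(store.get("msg-1", now));  // null
        System.out.println(store.get("msg-2", now));  // price=12
        System.out.println(store.purgeExpired(now));  // 1
        System.out.println(store.size());             // 1
    }
}
```

Filtering on read and purging on demand is just one design choice for this sketch; a RocksDB-backed store would more naturally delegate expiry to RocksDB's own TTL support instead of scanning entries itself.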