[
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Davor Poldrugo updated KAFKA-4273:
----------------------------------
Description:
Hi!
I'm using the Streams DSL (0.10.0.1), which - as far as I know - can only use RocksDB for local state; it's not configurable.
In my use case the data has a TTL / retention period of 48 hours. After that, the data can be discarded.
I join two topics, "messages" and "prices", using a windowed inner join.
The two intermediate Kafka topics for this join are named:
* messages-prices-join-this-changelog
* messages-prices-join-other-changelog
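For context, a minimal sketch of this kind of windowed inner join against the 0.10.0.1 DSL is below. The "messages" and "prices" topic names and the 48-hour window come from this description; the application id, the String serdes/value types, the output topic and the joiner itself are placeholders, and the JoinWindows API changed in later Streams releases.
{code:java}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class MessagesPricesJoin {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "messages-prices-app"); // placeholder app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder brokers

        KStreamBuilder builder = new KStreamBuilder();

        // String serdes/values are placeholders for the real message and price types.
        KStream<String, String> messages = builder.stream(Serdes.String(), Serdes.String(), "messages");
        KStream<String, String> prices   = builder.stream(Serdes.String(), Serdes.String(), "prices");

        // Windowed inner join; a window name like "messages-prices-join" is what yields the
        // "messages-prices-join-this-changelog" / "...-other-changelog" topics mentioned above.
        KStream<String, String> joined = messages.join(
                prices,
                (message, price) -> message + " @ " + price,
                JoinWindows.of("messages-prices-join").within(TimeUnit.HOURS.toMillis(48)),
                Serdes.String(), Serdes.String(), Serdes.String());

        joined.to(Serdes.String(), Serdes.String(), "messages-with-prices");

        new KafkaStreams(builder, props).start();
    }
}
{code}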
Since these topics are created as compacted by Kafka Streams, and I don't want to keep the data forever, I have altered them to not use compaction. Right now my RocksDB state stores grow indefinitely, and I don't have any option to define a TTL or to periodically clean out the older data.
A "hack" that I use to keep my disk usage low - I have schedulled a job to
periodically stop Kafka Streams instances - one at the time. This triggers a
rebalance, and partitions migrate to other instances. When the instance is
started again, there's another rebalance, and sometimes this instance starts
processing partitions that wasn't processing before the stop - which leads to
deletion of the RocksDB state store for those partitions
(state.cleanup.delay.ms). In the next rebalance the local store is recreated
with a restore consumer - which reads data from - as previously mentioned - a
non compacted topic. And this effectively leads to a "hacked TTL support" in
Kafka Streams DSL.
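For reference, the state.cleanup.delay.ms setting mentioned above is just a StreamsConfig property; a minimal sketch (the application id, bootstrap servers, and the 10-minute value here are made up):
{code:java}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.StreamsConfig;

public class CleanupDelayProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "messages-prices-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        // How long local state of tasks no longer assigned to this instance is kept
        // before the cleanup thread deletes it from the state directory.
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, TimeUnit.MINUTES.toMillis(10));
        return props;
    }
}
{code}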
Questions:
* Do you think it would be reasonable to add support in the DSL API to define a TTL for the local store? (A purely hypothetical sketch of what this could look like follows below.)
* This opens another question: there are use cases which don't need the intermediate topics to be created as "compact". Could this also be added to the DSL API? Maybe only this could be added, and the same flag could also be used for the RocksDB TTL. Of course, in this case another config would be mandatory - the retention period / TTL for the intermediate topics and the state stores. I saw there is a new cleanup.policy - compact_and_delete - added with KAFKA-4015.
* This also leads to another question: maybe some intermediate topics / state stores need different TTLs, so it's not quite as simple as that. But after KAFKA-3870, it will be easier.
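To make the first question more concrete, here is a purely hypothetical sketch - nothing like this exists in the 0.10.0.1 DSL, and the retention(...) method is invented for illustration only. It just shows the kind of knob being asked for: a per-join / per-store TTL that would ideally drive both the local RocksDB store and the backing changelog topic's retention.ms / cleanup.policy.
{code:java}
// Purely hypothetical - no such API exists today; retention(...) is invented here
// only to illustrate the proposal, reusing the placeholder streams from the sketch above.
KStream<String, String> joined = messages.join(
        prices,
        (message, price) -> message + " @ " + price,
        JoinWindows.of("messages-prices-join")
                   .within(TimeUnit.HOURS.toMillis(48))
                   .retention(TimeUnit.HOURS.toMillis(48)),   // hypothetical TTL / retention knob
        Serdes.String(), Serdes.String(), Serdes.String());
{code}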
RocksDB supports TTL:
* https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
* https://github.com/facebook/rocksdb/wiki/Time-to-Live
* https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
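For illustration, a minimal standalone sketch of the TtlDB API linked above (plain rocksdbjni, not something Kafka Streams exposes; the path, key/value, and the try-with-resources style - which assumes a reasonably recent rocksdbjni - are placeholders):
{code:java}
import java.util.concurrent.TimeUnit;

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TtlDB;

public class TtlDbExample {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        int ttlSeconds = (int) TimeUnit.HOURS.toSeconds(48);
        try (Options options = new Options().setCreateIfMissing(true);
             // Entries older than the TTL become eligible for removal during compaction,
             // so expiry is best-effort rather than an exact deadline.
             TtlDB db = TtlDB.open(options, "/tmp/ttl-store", ttlSeconds, false)) {
            db.put("key".getBytes(), "value".getBytes());
        }
    }
}
{code}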
A somewhat similar issue: KAFKA-4212
> Streams DSL - Add TTL / retention period support for intermediate topics and
> state stores
> -----------------------------------------------------------------------------------------
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.1
> Reporter: Davor Poldrugo
> Assignee: Guozhang Wang
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)