Hi,

I'm running a Kafka Streams app (v2.1.0) with a windowed aggregation. After
24 hours of running, local disk usage grew from 5 GB to 20 GB and keeps
increasing. From what I found online, once I introduced `windowedBy`, old
data should be removed automatically.

My topology looks like this:

stream.selectKey(/* key selector A */)
      .groupByKey(...)
      .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
      .reduce((value1, value2) -> value2)
      .suppress(...)
      .toStream()
      .selectKey(/* key selector B */).mapValues(...).filter(...)
      .groupByKey().reduce(...).toStream().to(...);
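To make the windowing step concrete, here is a minimal stdlib-only sketch of how I understand a 60-minute tumbling window (`TimeWindows.of(Duration.ofMinutes(60))` with no `advanceBy`) to assign timestamps. It assumes windows are aligned to the epoch and half-open, and that with `grace(Duration.ZERO)` a window counts as closed once stream time reaches its end. `TumblingWindows` and its methods are hypothetical helpers, not Kafka Streams API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not Kafka Streams API): models a 60-minute tumbling
// window, ASSUMING windows are aligned to the epoch and half-open [start, end).
class TumblingWindows {
    static final long SIZE_MS = Duration.ofMinutes(60).toMillis();

    // Start of the window containing a non-negative epoch-millis timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % SIZE_MS);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + SIZE_MS;
    }

    // ASSUMPTION: a window is closed once stream time >= window end + grace,
    // so with a zero grace period it closes exactly at its end.
    static boolean isClosed(long windowEndMs, long streamTimeMs, long graceMs) {
        return streamTimeMs >= windowEndMs + graceMs;
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2019-09-18T12:34:56Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(windowStart(ts))); // 2019-09-18T12:00:00Z
        System.out.println(Instant.ofEpochMilli(windowEnd(ts)));   // 2019-09-18T13:00:00Z
    }
}
```

Under this model the 60-minute window size only decides which window a record lands in; it says nothing by itself about how long the store keeps old windows on disk.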

One thing I don't understand: this topology creates two internal
repartition topics, repartition-03 and repartition-14, one for each
groupByKey operation. Looking at disk usage, all machines running
repartition-03 tasks have high disk usage and never seem to remove old
data, while machines running repartition-14 tasks always have low disk
usage.

When I logged in to the machines, I found different state store paths on
the two kinds of machines:

/tmp/kafka-streams/test-group/2_40/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000014

/tmp/kafka-streams/test-group/1_4/KSTREAM-REDUCE-STATE-STORE-0000000003/KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000

Why do they have different paths? 2_40 is one of the repartition-14 tasks,
and its path contains rocksdb, while the other path doesn't. Meanwhile,
task 1_4 keeps several folders like
KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000, each with a different
suffix.
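A small sketch for reading those folder names, assuming (not verified against the 2.1.0 source) that the numeric suffix is the segment's start time in epoch milliseconds. `SegmentDirs` and its methods are hypothetical helpers, not part of Kafka Streams.

```java
import java.time.Instant;

// Hypothetical helper: decodes the numeric suffix of a store folder name such as
// KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000.
// ASSUMPTION: the suffix is the segment start time in epoch milliseconds.
class SegmentDirs {
    // Parses the digits after the last '.' in the folder name.
    static long suffixMs(String dirName) {
        int dot = dirName.lastIndexOf('.');
        if (dot < 0) {
            throw new IllegalArgumentException("no segment suffix: " + dirName);
        }
        return Long.parseLong(dirName.substring(dot + 1));
    }

    // Interprets the suffix as an instant (under the assumption above).
    static Instant segmentStart(String dirName) {
        return Instant.ofEpochMilli(suffixMs(dirName));
    }

    public static void main(String[] args) {
        String dir = "KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000";
        System.out.println(segmentStart(dir)); // 2019-09-18T12:00:00Z
    }
}
```

If that assumption holds, the folder above corresponds to a segment starting at 2019-09-18T12:00:00Z, and comparing the suffixes of the other folders would show how far back the kept segments reach.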

I thought that once I introduced the windowedBy function, RocksDB would
remove old data when a window expires. And why do the tasks for these two
internal repartition topics have different paths and retention behavior?
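To frame the retention question, here is a simplified model of segment-based expiry, not Kafka's actual implementation: whole segments are deleted once they lie entirely before stream time minus the retention period. The 24-hour retention and 12-hour segment interval in the example are hypothetical values, and `SegmentRetention` is an illustrative class.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model (ASSUMPTION, not Kafka Streams code): a segmented store keeps
// a segment only if it overlaps [streamTime - retention, streamTime].
class SegmentRetention {
    // Segment id covering a timestamp: segment s spans [s*interval, (s+1)*interval).
    static long segmentId(long timestampMs, long segmentIntervalMs) {
        return timestampMs / segmentIntervalMs;
    }

    // Oldest segment id still kept at the given stream time.
    static long oldestLiveSegment(long streamTimeMs, long retentionMs, long segmentIntervalMs) {
        return segmentId(Math.max(0L, streamTimeMs - retentionMs), segmentIntervalMs);
    }

    // A segment is expired if it ends at or before streamTime - retention.
    static boolean isExpired(long segId, long streamTimeMs, long retentionMs, long segmentIntervalMs) {
        return segId < oldestLiveSegment(streamTimeMs, retentionMs, segmentIntervalMs);
    }

    public static void main(String[] args) {
        long retention = Duration.ofHours(24).toMillis();   // hypothetical value
        long interval = Duration.ofHours(12).toMillis();    // hypothetical value
        long streamTime = Instant.parse("2019-09-19T13:00:00Z").toEpochMilli();
        System.out.println(oldestLiveSegment(streamTime, retention, interval)); // 36315
    }
}
```

Under this model, how much data stays on disk depends on the retention period and segment interval, not on the 60-minute window size.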

Any help is highly appreciated! Thanks!
