Hi Guys!

I use the Kafka Streams reduce function, and it creates an internal Kafka
changelog topic for its state store (for example,
app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog).

I want to set *retention bytes* and change the *cleanup policy* to *delete*
on this topic so that it does not fill up the disk.  I set the following
configs in my Kafka Streams code:

Properties props = new Properties();

props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.RETENTION_BYTES_CONFIG,
          Constants.INTERNAL_TOPICS_RETENTION_BYTES);
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.CLEANUP_POLICY_CONFIG,
          TopicConfig.CLEANUP_POLICY_DELETE);

KafkaStreams streams = new KafkaStreams(builder.build(), props);
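
To make the property keys explicit, here is a minimal standalone sketch of
what I understand those two props.put calls to resolve to. It assumes the
constants hold the literal values "topic.", "retention.bytes",
"cleanup.policy" and "delete"; the class name TopicPrefixKeys and the
retention value are made up for illustration, and it does not touch Kafka
at all.

```java
import java.util.Properties;
import java.util.TreeSet;

public class TopicPrefixKeys {
    // Assumed literal values of the Kafka constants used in my code above.
    static final String TOPIC_PREFIX = "topic.";                    // StreamsConfig.TOPIC_PREFIX
    static final String RETENTION_BYTES_CONFIG = "retention.bytes"; // TopicConfig.RETENTION_BYTES_CONFIG
    static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";   // TopicConfig.CLEANUP_POLICY_CONFIG
    static final String CLEANUP_POLICY_DELETE = "delete";           // TopicConfig.CLEANUP_POLICY_DELETE

    // Builds the same two prefixed entries that are passed to KafkaStreams.
    static Properties build(long retentionBytes) {
        Properties props = new Properties();
        props.setProperty(TOPIC_PREFIX + RETENTION_BYTES_CONFIG, String.valueOf(retentionBytes));
        props.setProperty(TOPIC_PREFIX + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical retention value (1 GiB), only for illustration.
        Properties props = build(1073741824L);
        // Print the keys in sorted order so the output is stable.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(key + " = " + props.getProperty(key));
        }
    }
}
```

So both settings go in under the same "topic." prefix, which is why I
expected them to be treated the same way.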

However, when a new internal topic is created, only the *retention bytes*
config is applied to it; the *cleanup policy* remains *compact*.

Am I missing a step?  (Or is it not possible to set the cleanup policy of
internal topics to delete?)

I use Kafka version 1.0.0 and kafka-streams version 1.0.0.

Thanks in advance 🙏
