Thanks Guozhang 🙏 upgrading kafka stream version to 1.1.0 fixed the issue 👍
On Sat, Sep 1, 2018 at 10:39 PM Guozhang Wang <wangg...@gmail.com> wrote: > Hello Amir, > > The issue you described seems like an old bug that is resolved since 1.1.0 > (as part of the fix in https://jira.apache.org/jira/browse/KAFKA-6150). > Could you try out that version? > > You do not need to upgrade broker in order to use newer Streams library > versions. > > Guozhang > > On Sat, Sep 1, 2018 at 1:14 AM, Amir masud zarebidaki < > zare.ma...@gmail.com> > wrote: > > > Hi Guys! > > > > I use kafka streams reduce function and it creates some state store > change > > log kafka internal topic ( like > > app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog ). > > > > I wanted to set *retention bytes* and change *cleanup policy* to *delete* > > to prevent the storage being full. I set following configs in kafka > > streams code: > > > > Properties props = new Properties(); > > > > props.put(StreamsConfig.TOPIC_PREFIX + > > TopicConfig.RETENTION_BYTES_CONFIG, > > Constants.INTERNAL_TOPICS_RETENTION_BYTES); > > props.put(StreamsConfig.TOPIC_PREFIX + > > TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); > > > > KafkaStreams streams = new KafkaStreams(builder.build(), props); > > > > However, when a new topic is generated only the *retention* config is > > applied to the newly generated internal topic and *cleanup policy* > remains > > *compact*. > > > > Is there any missing step to do so ? ( or Isn't it possible to set > > internal topics cleanup policy to delete ?) > > > > I use kafka version 1.0.0 and kafka-streams version 1.0.0 > > > > Thanks in advance 🙏 > > > > > > -- > -- Guozhang >