Kafka-streams: setting internal topics cleanup policy to delete doesn't work
Hi Guys!

I use the Kafka Streams reduce function, and it creates an internal state store changelog topic (like app-KSTREAM-REDUCE-STATE-STORE-02-changelog).

I wanted to set *retention bytes* and change the *cleanup policy* to *delete* to keep the storage from filling up. I set the following configs in my Kafka Streams code:

    Properties props = new Properties();

    props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.RETENTION_BYTES_CONFIG, Constants.INTERNAL_TOPICS_RETENTION_BYTES);
    props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);

    KafkaStreams streams = new KafkaStreams(builder.build(), props);

However, when a new internal topic is created, only the *retention* config is applied to it, and the *cleanup policy* remains *compact*.

Is there a step I am missing? (Or is it not possible to set the cleanup policy of internal topics to delete?)

I use Kafka version 1.0.0 and kafka-streams version 1.0.0.

Thanks in advance 🙏
resetting consumer group offset to earliest and to-latest not working
Hello everyone,

Hopefully this is the appropriate mailing list for my message.

When I try to reset the offsets of a consumer group, the command output tells me that the offsets have indeed been reset to earliest or latest. But when I check right afterwards, the offsets are still at their previous positions. After restarting the consumers in the group, they keep consuming messages from the old position, even though I issued the command to reset the partition offsets to the latest offset.

I am using Kafka 1.1.1 for Scala 2.11, and I will attach a screenshot of the terminal in case it helps.

Thank you in advance.
Best regards
Re: resetting consumer group offset to earliest and to-latest not working
Hello,

Did you add --execute to the command?
Which command did you use?

Best regards
Patrik

> On 01.09.2018 at 14:54, Joseph M'BIMBI-BENE wrote:
>
> Hello everyone,
>
> Hopefully this is the appropriate mailing list for my message.
> When i am trying to reset the offset of some consumer group, i get some echo
> telling me that the offset has indeed been reset to earliest or latest, but
> checking right after, the offset is still at its previous position, and
> restarting the consumers on the group, they indeed continue to consume
> message even after issuing the command to reset the offsets of the partitions
> to the latest offset.
>
> I am Using Kafka 1.1.1 for scala 2.11, and i will put a screenshot of the
> terminal if that could help you help me.
>
> Thank you in advance. Best regards
Re: resetting consumer group offset to earliest and to-latest not working
Oh, thank you for pointing that out. In the screenshot I sent, I did indeed forget that parameter. It was an attempt to reproduce an error encountered on another system. On that system, I can see that the source code includes the --execute option.

I will send you logs, screenshots, etc. from the original system when I'm back at work.

Thank you

On Sat, 1 Sep 2018 at 15:02, Patrik Kleindl wrote:

> Hello
> Did you add --execute to the command?
> Which command did you use?
> Best regards
> Patrik
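For reference, here is a minimal sketch of the reset workflow Patrik is asking about, assuming the stock kafka-consumer-groups.sh tool from a Kafka 1.1.x distribution. The group name, topic name, broker address and script directory are placeholders, not values from this thread. As far as I know, in the 1.x tool a reset without --execute is only a dry run that prints the planned offsets, and the tool expects the group to have no active members, so the consumers should be stopped first. The script only prints the commands, so they can be reviewed before being run.

```shell
#!/bin/sh
# Placeholder values (assumptions): adjust to your environment.
KAFKA_BIN="${KAFKA_BIN:-bin}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# Print a reset command for one group and topic.
#   $1 = consumer group    $2 = topic
#   $3 = --to-earliest or --to-latest
#   $4 = --dry-run (preview only) or --execute (commit the new offsets)
reset_offsets_cmd() {
  echo "$KAFKA_BIN/kafka-consumer-groups.sh" \
    --bootstrap-server "$BOOTSTRAP" \
    --group "$1" --topic "$2" \
    --reset-offsets "$3" "$4"
}

# 1. Preview the offsets the tool would set (changes nothing).
reset_offsets_cmd my-group my-topic --to-earliest --dry-run

# 2. Apply the reset; stop the consumers in the group beforehand.
reset_offsets_cmd my-group my-topic --to-earliest --execute

# 3. Check the committed offsets afterwards.
echo "$KAFKA_BIN/kafka-consumer-groups.sh" \
  --bootstrap-server "$BOOTSTRAP" --group my-group --describe
```

To run one of the printed commands, copy it into a terminal or pipe the line to sh, for example `reset_offsets_cmd my-group my-topic --to-earliest --execute | sh`.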
Re: Kafka-streams: setting internal topics cleanup policy to delete doesn't work
Hello Amir,

The issue you described looks like an old bug that has been resolved since 1.1.0 (as part of the fix in https://jira.apache.org/jira/browse/KAFKA-6150). Could you try out that version?

You do not need to upgrade the brokers in order to use newer Streams library versions.

Guozhang

On Sat, Sep 1, 2018 at 1:14 AM, Amir masud zarebidaki wrote:

> Hi Guys!
>
> I use kafka streams reduce function and it creates some state store change
> log kafka internal topic ( like
> app-KSTREAM-REDUCE-STATE-STORE-02-changelog ).
>
> I wanted to set *retention bytes* and change *cleanup policy* to *delete*
> to prevent the storage being full. I set following configs in kafka
> streams code:
>
> Properties props = new Properties();
>
> props.put(StreamsConfig.TOPIC_PREFIX +
> TopicConfig.RETENTION_BYTES_CONFIG,
> Constants.INTERNAL_TOPICS_RETENTION_BYTES);
> props.put(StreamsConfig.TOPIC_PREFIX +
> TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
>
> KafkaStreams streams = new KafkaStreams(builder.build(), props);
>
> However, when a new topic is generated only the *retention* config is
> applied to the newly generated internal topic and *cleanup policy* remains
> *compact*.
>
> Is there any missing step to do so ? ( or Isn't it possible to set
> internal topics cleanup policy to delete ?)
>
> I use kafka version 1.0.0 and kafka-streams version 1.0.0
>
> Thanks in advance 🙏

--
-- Guozhang
Re: Kafka-streams: setting internal topics cleanup policy to delete doesn't work
Thanks Guozhang 🙏

Upgrading the Kafka Streams version to 1.1.0 fixed the issue 👍

On Sat, Sep 1, 2018 at 10:39 PM Guozhang Wang wrote:

> Hello Amir,
>
> The issue you described seems like an old bug that is resolved since 1.1.0
> (as part of the fix in https://jira.apache.org/jira/browse/KAFKA-6150).
> Could you try out that version?
>
> You do not need to upgrade broker in order to use newer Streams library
> versions.
>
> Guozhang
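To confirm which cleanup policy an internal changelog topic actually ended up with, the topic-level overrides can be inspected with the kafka-configs.sh tool. The sketch below assumes a Kafka 1.x distribution, where this tool talks to ZooKeeper; the ZooKeeper address and script directory are placeholders, and the topic name is the example from the original post. It only prints the command, so it can be reviewed before being run.

```shell
#!/bin/sh
# Placeholder values (assumptions): adjust to your environment.
KAFKA_BIN="${KAFKA_BIN:-bin}"
ZOOKEEPER="${ZOOKEEPER:-localhost:2181}"

# Print a command that lists the config overrides of one topic,
# such as cleanup.policy and retention.bytes.
#   $1 = topic name
describe_topic_config_cmd() {
  echo "$KAFKA_BIN/kafka-configs.sh" \
    --zookeeper "$ZOOKEEPER" \
    --entity-type topics --entity-name "$1" \
    --describe
}

# Changelog topic name taken from the example in this thread.
describe_topic_config_cmd app-KSTREAM-REDUCE-STATE-STORE-02-changelog
```

Because the prefixed settings are applied when Streams creates the internal topic, a changelog topic that already exists with cleanup.policy=compact would presumably keep that value until it is altered or recreated.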