Kafka-streams: setting internal topics cleanup policy to delete doesn't work

2018-09-01 Thread Amir masud zarebidaki
Hi Guys!

I use the Kafka Streams reduce function, and it creates some state-store
changelog internal topics (like
app-KSTREAM-REDUCE-STATE-STORE-02-changelog).

I wanted to set *retention bytes* and change the *cleanup policy* to *delete*
to prevent the storage from filling up. I set the following configs in my
Kafka Streams code:

Properties props = new Properties();

// The "topic." prefix applies these settings to the internal topics Streams creates
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.RETENTION_BYTES_CONFIG,
        Constants.INTERNAL_TOPICS_RETENTION_BYTES);
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.CLEANUP_POLICY_CONFIG,
        TopicConfig.CLEANUP_POLICY_DELETE);

KafkaStreams streams = new KafkaStreams(builder.build(), props);

However, when a new internal topic is generated, only the *retention* config
is applied to it; the *cleanup policy* remains *compact*.

Is there a step I'm missing? (Or is it not possible to set the internal
topics' cleanup policy to delete?)

I use Kafka version 1.0.0 and kafka-streams version 1.0.0.
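For reference, one way to check what actually got applied is to describe the topic's config overrides with the broker CLI. A sketch, assuming a 1.x installation with ZooKeeper reachable at localhost:2181 (address and topic name are placeholders for this setup):

```shell
# Show the per-topic config overrides on the generated changelog topic;
# cleanup.policy and retention.bytes should appear here if they were applied
bin/kafka-configs.sh --zookeeper localhost:2181 \
  --entity-type topics \
  --entity-name app-KSTREAM-REDUCE-STATE-STORE-02-changelog \
  --describe
```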

Thanks in advance 🙏


resetting consumer group offset to earliest and to-latest not working

2018-09-01 Thread Joseph M'BIMBI-BENE
Hello everyone,

Hopefully this is the appropriate mailing list for my message.
When I try to reset the offset of a consumer group, I get output telling me
that the offset has indeed been reset to earliest or latest, but checking
right afterwards, the offset is still at its previous position. After
restarting the consumers in the group, they continue to consume messages
from where they left off, even after issuing the command to reset the
partition offsets to the latest offset.

I am using Kafka 1.1.1 for Scala 2.11, and I will attach a screenshot of the
terminal if that could help you help me.

Thank you in advance. Best regards


Re: resetting consumer group offset to earliest and to-latest not working

2018-09-01 Thread Patrik Kleindl
Hello
Did you add --execute to the command?
Which command did you use?
Best regards
Patrik
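
For completeness: without --execute the tool runs in dry-run mode and only prints the offsets it *would* set, which matches the behaviour described. A typical invocation looks roughly like this (group, topic, and broker address are placeholders, and the consumer group must be inactive for the reset to succeed):

```shell
# Dry run: prints the target offsets but does not commit them
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic my-topic --reset-offsets --to-earliest

# Commit the reset for real
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic my-topic --reset-offsets --to-earliest --execute
```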

> On 01.09.2018 at 14:54, Joseph M'BIMBI-BENE wrote:


Re: resetting consumer group offset to earliest and to-latest not working

2018-09-01 Thread Joseph M'BIMBI-BENE
Oh, thank you for pointing that out; in the screenshot I sent I did indeed
forget that parameter. It was an attempt to reproduce an error encountered
on another system.

On that system I can see that the invocation does include the --execute
flag. I will send you logs, screenshots, etc. from the original system when
I'm back at work.

Thank you

On Sat, 1 Sep 2018 at 15:02, Patrik Kleindl  wrote:

> Hello
> Did you add --execute to the command?
> Which command did you use?
> Best regards
> Patrik


Re: Kafka-streams: setting internal topics cleanup policy to delete doesn't work

2018-09-01 Thread Guozhang Wang
Hello Amir,

The issue you described looks like an old bug that was resolved in 1.1.0
(as part of the fix for https://jira.apache.org/jira/browse/KAFKA-6150).
Could you try out that version?

You do not need to upgrade the brokers in order to use a newer Streams
library version.
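
For a Maven build, picking up the fix should only require bumping the client dependency, roughly like this (standard artifact coordinates; version as suggested above):

```xml
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>1.1.0</version>
</dependency>
```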

Guozhang

On Sat, Sep 1, 2018 at 1:14 AM, Amir masud zarebidaki 
wrote:




-- 
-- Guozhang


Re: Kafka-streams: setting internal topics cleanup policy to delete doesn't work

2018-09-01 Thread Amir masud zarebidaki
Thanks, Guozhang 🙏
Upgrading the Kafka Streams version to 1.1.0 fixed the issue 👍

On Sat, Sep 1, 2018 at 10:39 PM Guozhang Wang  wrote:
