Hi,
As per Eno's suggestion, I have pre-created the internal changelog topics
with an increased max.message.bytes config to handle large messages that
grow over time.

As Matthias has pointed out, we cannot use the retention.ms setting to
delete older message data after a given time. Is there another way to purge
older messages from my changelog topic?

Remember that my changelog topic holds
key = list of objects
and the list grows over time.

I would like these entries to be deleted from time to time: once I have
consumed the objects, the key/value pair can be removed. If I later get a
new object for the same key, that is a new message, and the old data is of
no use to the streaming application.
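Since the changelog is a compacted topic, the usual way to drop a key is to write a tombstone (a record with a null value) for that key; compaction then removes the key entirely. A minimal, Kafka-free sketch of those semantics (the keys and values here are made up for illustration):

```python
# Simulates log-compaction semantics: the broker keeps only the
# latest value per key, and a None value (a "tombstone") marks
# the key for deletion on the next compaction pass.

def compact(log):
    """Return the compacted view of a list of (key, value) records."""
    latest = {}
    for key, value in log:
        latest[key] = value  # later records shadow earlier ones
    # Tombstoned keys (value None) are dropped entirely.
    return {k: v for k, v in latest.items() if v is not None}

log = [
    ("user-1", ["a"]),
    ("user-1", ["a", "b"]),  # the list per key grows over time
    ("user-2", ["x"]),
    ("user-1", None),        # tombstone: purge user-1 once consumed
]
print(compact(log))  # {'user-2': ['x']}
```

So if the application can emit a null value for a key once its objects are consumed, the broker will eventually reclaim that key's space without any retention setting.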

So how can I achieve this?
Would retention.bytes help here?

Is there a way to set something like an expiry time at the message level,
so that some Kafka thread would purge those messages?
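One config-level option would be switching the changelog's cleanup policy to compact,delete, so that segments older than retention.ms also become eligible for deletion (this is the policy the windowed KTable changelogs mentioned below use). A sketch, with an assumed topic name — the real internal name would need to be found first:

```shell
# Topic name below is hypothetical; list the real internal topics first:
#   bin/kafka-topics.sh --zookeeper localhost:2181 --list
# "compact,delete" (available since Kafka 0.10.1) compacts by key
# AND deletes segments older than retention.ms (here: 7 days).
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-app-my-store-changelog \
  --add-config 'cleanup.policy=[compact,delete],retention.ms=604800000'
```

Whether Kafka Streams tolerates that policy on a non-windowed KTable changelog is something I am not sure about, so treat this as an idea to verify rather than a recommendation.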

Thanks
Sachin



On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

>
> My two cents:
>
> Changelog topics are compacted topics, thus they do not have a
> retention time (there is an exception for windowed KTable changelog
> topics, which are compacted and do have a retention time though).
>
> However, I do not understand how changing retention time should fix
> the issue. If your list of values grows and exceeds max.message.bytes,
> you will need to increase this parameter (or shrink your value).
>
> Besides this, Eno's answer is the way to go. In order to figure out
> internal topic names, you can use KafkaStreams#toString().
>
>
> - -Matthias
>
>
>
> On 11/8/16 11:14 AM, Eno Thereska wrote:
> > Hi Sachin,
> >
> > One option right now would be to precreate all internal topics in
> > Kafka, and only after that start the Kafka Streams application.
> > This would require you knowing the internal name of the topics (in
> > this case you probably already know it, but I agree that in general
> > this is a bit cumbersome).
> >
> > Eno
> >
> >> On 8 Nov 2016, at 18:10, Sachin Mittal <sjmit...@gmail.com>
> >> wrote:
> >>
> >> Per-message payload size. The basic question is: how can I control
> >> the internal changelog topics' parameters so as to avoid these
> >> errors?
> >>
> >>
> >> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna <krishna...@gmail.com>
> >> wrote:
> >>
> >>> Are you talking about total messages (and therefore total size) or
> >>> per-message payload size?
> >>>
> >>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal
> >>> <sjmit...@gmail.com> wrote:
> >>>
> >>>> Message size itself increases over time.
> >>>>
> >>>> A message is something like key=[list of objects].
> >>>>
> >>>> This grows with time, and at some point Kafka is not
> >>>> able to add any message to its topic because the message size
> >>>> is greater than max.message.bytes. Since this is an internal
> >>>> topic based on a table, I don't know how I can control this
> >>>> topic.
> >>>>
> >>>> If I can set a retention.ms for this topic, then I can
> >>>> purge old messages, thereby ensuring that the message size
> >>>> stays within the limit.
> >>>>
> >>>> Thanks Sachin
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska
> >>>> <eno.there...@gmail.com> wrote:
> >>>>
> >>>>> Hi Sachin,
> >>>>>
> >>>>> Could you clarify what you mean by "message size
> >>>>> increases"? Are messages going to the changelog topic
> >>>>> increasing in size? Or is the changelog topic getting full?
> >>>>>
> >>>>> Thanks Eno
> >>>>>
> >>>>>> On 8 Nov 2016, at 16:49, Sachin Mittal
> >>>>>> <sjmit...@gmail.com> wrote:
> >>>>>>
> >>>>>> Hi, We are using aggregation by key on a kstream to
> >>>>>> create a ktable. As I read from
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
> >>>>>> it creates an internal changelog topic.
> >>>>>>
> >>>>>> However, as the streaming application runs over time,
> >>>>>> the message size increases and it starts throwing a
> >>>>>> max.message.bytes exception.
> >>>>>>
> >>>>>> Is there a way to control the retention.ms time for
> >>>>>> internal changelog topics so that messages are purged
> >>>>>> before they exceed this size?
> >>>>>>
> >>>>>> If not, is there a way to control or avoid such an error?
> >>>>>>
> >>>>>> Thanks Sachin
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>>
> >>> -- Radha Krishna, Proddaturi 253-234-5657
> >>>
> >
>
