Björn,

A couple of answers:

>> So, a streams internal topic for aggregation will be of cleanup.policy =
>> compact.

Yes (for non-windowed aggregation). However, in your case you are using a
windowed aggregation, and there the policy is "compact,delete". For a
windowed aggregation the key space is not bounded, so without a retention
time the changelog topic would grow without bound (the written key is a
combined key of the message key plus the window start time). Thus, windows
are maintained for 1 day (by default) until those records are deleted from
the changelog topic (even if no tombstone message is written).

>> Which means that while doing aggregation, ignoring tombstone records will
>> cause havoc, right?

If your key space is unbounded, yes. For a bounded key space, the size of
the topic is limited by the number of unique keys -- thus, even if no
tombstones are written, the topic will not grow.

>> Which means we have to remove the last line so that tombstone records are
>> passed through and topic compaction can kick in?

Yes.
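For illustration, a minimal sketch of that kind of change. The actual
topology is in the gist linked below; the topic names here, and the
assumption that the last line is a null-value filter, are hypothetical:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class TombstonePassThroughSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical stream of aggregation results.
        KStream<String, String> results = builder.stream("agg-results");

        results
            // .filterNot((key, value) -> value == null)  // a line like this
            // swallows tombstones; removing it lets null-valued records reach
            // the sink topic, so compaction can drop the corresponding keys.
            .to("output-topic");
    }
}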
>> And tombstone records will be deleted after "delete.retention.ms", right?

Yes.

>> Which defaults to 24 hours - meaning that the internal topic should only
>> contain data for 24 hours + window size? Is this somehow right?

Not exactly. Note that KS does not delete windows when the window end time
passes (windows don't close in KS). This is required to handle late-arriving
data. Thus, you can have multiple windows per key, and KS also applies a
retention time that controls how long windows are maintained before they are
dropped (by default, also 24h). You can configure the window retention time
via `.until()`.
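For example, a minimal sketch using the Kafka 1.0 API (the window size,
retention, and topic name are made-up values):

import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowRetentionSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        KTable<Windowed<String>, Long> counts = input
            .groupByKey()
            .windowedBy(TimeWindows
                .of(TimeUnit.HOURS.toMillis(1))     // 1-hour windows
                .until(TimeUnit.DAYS.toMillis(2)))  // keep windows for 2 days
                                                    // instead of the 24h default
            .count();
    }
}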
>> Again, thank you very much for taking the time to answer these questions, I
>> am feeling a bit stupid here right now :(

No need to. It's complex...


-Matthias

On 4/7/18 1:38 AM, Björn Häuser wrote:
> Hello Matthias,
>
> thank you very much for your patience.
>
> I am still trying to understand the complete picture here.
>
> So, a streams internal topic for aggregation will be of cleanup.policy =
> compact.
>
> Which means that while doing aggregation, ignoring tombstone records will
> cause havoc, right?
>
> This is more or less our streams setup:
>
> https://gist.github.com/bjoernhaeuser/be1ffb06e5e970a50a5ffb309ded4b6b
>
> Which means we have to remove the last line so that tombstone records are
> passed through and topic compaction can kick in?
>
> And tombstone records will be deleted after "delete.retention.ms", right?
> Which defaults to 24 hours - meaning that the internal topic should only
> contain data for 24 hours + window size? Is this somehow right?
>
> Again, thank you very much for taking the time to answer these questions, I
> am feeling a bit stupid here right now :(
>
> Björn
>
>> On 7. Apr 2018, at 00:45, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> Björn,
>>
>> broker configs are defaults that can be overwritten when a topic is
>> created. And this happens when Kafka Streams creates internal topics.
>> Thus, you need to change the settings Kafka Streams applies when
>> creating topics.
>>
>> Also note: if cleanup.policy = compact, the `log.retention.X` settings
>> do not apply. Those settings only apply if cleanup.policy is set to
>> "delete".
>>
>> The size of a compacted topic depends on the number of unique keys --
>> there will be one message per key -- if a newer message with the same
>> key is written, older messages with this key can be garbage collected.
>> If a message is never updated (and not explicitly deleted with a
>> tombstone record, i.e., a record with a null value), the record will
>> never be deleted from a compacted topic.
>>
>>
>> -Matthias
>>
>> On 4/6/18 2:10 PM, Björn Häuser wrote:
>>> Hello Guozhang,
>>>
>>> thanks.
>>>
>>> So after reading much more docs I still do not have the complete picture.
>>>
>>> These are our relevant settings from the Kafka broker configuration:
>>>
>>> log.cleanup.policy=delete
>>> # set log.retention.bytes to 15 gb
>>> log.retention.bytes=16106127360
>>> # set log.retention.hours to 30 days
>>> log.retention.hours=720
>>>
>>> Though one of the internal Kafka Streams topics (with cleanup.policy =
>>> compact) grew to ~40gb today.
>>>
>>> What am I missing? I thought these settings should take care that a
>>> given topic is never >15 gb, right?
>>>
>>> Thanks
>>> Björn
>>>
>>>> On 29. Mar 2018, at 00:05, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> You can set the topic-level configs via StreamsConfig#topicPrefix(String);
>>>> please see the following web docs (search for KIP-173):
>>>>
>>>> https://kafka.apache.org/documentation/streams/upgrade-guide#streams_api_changes_100
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>>
>>>> On Wed, Mar 28, 2018 at 3:23 AM, Björn Häuser <bjoernhaeu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello everyone,
>>>>>
>>>>> we are running a Kafka Streams application which does time window
>>>>> aggregates (using Kafka 1.0.0).
>>>>>
>>>>> Unfortunately, one of the changelog topics is now growing quite a bit
>>>>> in size, maxing out the brokers. I did not find any settings in the
>>>>> Kafka Streams properties to configure retention, so I went ahead and
>>>>> set its retention.bytes to 15gb. Unfortunately, this does not seem to
>>>>> apply, and the topic size is still around 140gb.
>>>>>
>>>>> Is this intended? I could not find any documentation about setting the
>>>>> retention size for the internal topics.
>>>>>
>>>>> Thanks
>>>>> Björn
>>>>
>>>>
>>>> --
>>>> -- Guozhang
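As a concrete illustration of Guozhang's KIP-173 pointer in the quoted
thread above, a minimal sketch of overriding topic-level configs for the
internal topics that Kafka Streams creates. The application id, bootstrap
servers, and retention value are placeholders; note that Streams applies
these only when it creates the topics, so already-existing internal topics
have to be altered with the kafka-configs tool:

import java.util.Properties;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class InternalTopicConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // topicPrefix() prepends "topic." so the property is applied to the
        // internal topics the application creates. Remember: retention.bytes
        // is a per-partition limit, and for cleanup.policy=compact topics
        // the retention settings do not apply at all (see above).
        props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_BYTES_CONFIG),
                  "16106127360"); // ~15 GB, as in the thread
    }
}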