Hi again,

On another note, does it really make much sense to limit by time this
way? Wouldn't the 50 MB size limit alone be sufficient? In use cases
like ours, restarting/rebuilding the state opened almost 30K extra
files on each broker (it seems the repartitioning runs much faster than
the remaining transformations, so the repartition topic becomes quite a
big "buffer").
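For reference, this is roughly what I would have hoped to configure
instead of patching (a sketch only; I am assuming here that the
"topic."-prefixed overrides built by `StreamsConfig.topicPrefix()` would
be applied to repartition topics, which did not appear to be the case in
my version):

```java
import java.util.Properties;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RepartitionRetentionOverride {

    // Build Streams properties that try to give internal topics
    // infinite retention via the "topic." config prefix.
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        // Expands to "topic.retention.ms" -> "-1" (infinite retention);
        // whether this actually reaches the repartition topic config
        // depends on the Streams version in use.
        props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), "-1");
        return props;
    }
}
```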
It looked like it was not possible to configure this (which is why I
had to patch the file directly); maybe it would be desirable to make it
configurable for use cases like this one, if it is not considered a
main use case?

Once my app caught up (it took about 8 hours), the number of open files
decreased again, and it looks like the cleanup is doing its job.

Kind regards
Niklas

On Wed, Oct 10, 2018 at 4:38 PM Niklas Lönn <niklas.l...@gmail.com> wrote:

> Thanks Guozhang,
>
> Thanks for a very good answer! I now understand: the idea is that the
> client cleans up after itself, and that way there is a minimal amount
> of garbage in the repartition topic.
>
> We actually figured out we had another max-open-files limit that we
> did hit, and after adjusting that limit we managed to start our
> application without crashing the brokers.
>
> However, I think I discovered a bug in the repartitioning setup. Let
> me first try to explain our setup: we have a compacted topic
> containing mostly short-lived values, where tombstones are normally
> created within a few hours but can be delayed by as much as a month.
>
> I suspect the repartition segments honor the timestamps of the
> records, and when resetting the application we process records that
> are quite old, therefore creating many, many segments and a lot of
> open files as a result.
>
> When running my application I noticed these messages:
>
> Fetch offset 213792 is out of range for partition
> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7, resetting offset
> Fetch offset 110227 is out of range for partition
> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2, resetting offset
> Resetting offset for partition
> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7 to offset 233302.
> Resetting offset for partition
> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2 to offset 119914.
>
> This effectively made my application skip messages, and I verified
> (by patching RepartitionTopicConfig.java) that it is due to the
> undefined retention.ms, which leaves the default retention on the
> records, meaning that my application was competing with the log
> cleaner.
>
> By adding this line I got rid of these messages:
>
> tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // Infinite
>
> My understanding is that this should be safe, as the cleanup is
> handled by the client invoking the admin API?
>
> Kind regards
> Niklas
>
>
> On Tue, Oct 9, 2018 at 8:47 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi Niklas,
>>
>> The default value of segment.ms was set to 10 min as part of this
>> project (introduced in Kafka 1.1.0):
>>
>> https://jira.apache.org/jira/browse/KAFKA-6150
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API
>>
>> In KIP-204 (KAFKA-6150), we added an admin request to delete records
>> immediately upon committing offsets, to make repartition topics
>> really "transient", and along with it we set the default segment.ms
>> to 10 min. The rationale is that to make record purging effective,
>> we need a smaller segment size, so that we can delete those files
>> once the purged offset is larger than the segment's last offset.
>>
>> Which Kafka version are you using currently? Did you observe that
>> data purging did not happen (otherwise segment files should be
>> garbage collected quickly)? Or is your traffic very small, or do you
>> commit infrequently, resulting in ineffective purging?
>>
>> Guozhang
>>
>> On Tue, Oct 9, 2018 at 4:07 AM, Niklas Lönn <niklas.l...@gmail.com>
>> wrote:
>>
>> > Hi,
>> >
>> > Recently we experienced a problem when resetting a streams
>> > application that does quite a lot of operations, based on 2
>> > compacted source topics with 20 partitions.
>> >
>> > We crashed the entire broker cluster with a TooManyOpenFiles
>> > exception (we already have a multi-million limit).
>> >
>> > When inspecting the internal topics' configuration, I noticed that
>> > the repartition topics have a default config of:
>> >
>> > Configs: segment.bytes=52428800, segment.index.bytes=52428800,
>> > cleanup.policy=delete, segment.ms=600000
>> >
>> > My source topic is a compacted topic used as a KTable, and
>> > assuming I have data for every 10-min segment, I would quickly get
>> > 144 segments per partition per day.
>> >
>> > Since this repartition topic is not even compacted, I can't
>> > understand the reasoning behind having a default of a 10-min
>> > segment.ms and 50 MB segment.bytes.
>> >
>> > Is there any best practice regarding this? Potentially we could
>> > crash the cluster every time we need to reset an application.
>> >
>> > And does it make sense that it would keep so many open files at
>> > the same time in the first place? Could it be a bug in the file
>> > management of the Kafka broker?
>> >
>> > Kind regards
>> > Niklas
>> >
>>
>>
>> --
>> -- Guozhang
>>
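To make the segment arithmetic from the original report concrete (a
sketch; the 20 partitions and the 10-min segment.ms are from the thread
above, while the one-month backlog of old records is my own assumption):

```java
public class SegmentCount {

    // With time-based rolling, each partition rolls one segment per
    // segment.ms of record time.
    public static long segmentsPerPartitionPerDay(long segmentMs) {
        return (24L * 60 * 60 * 1000) / segmentMs;
    }

    public static void main(String[] args) {
        long perPartitionPerDay = segmentsPerPartitionPerDay(600_000L); // 144
        long partitions = 20;
        long backlogDays = 30; // assumed: ~a month of old records replayed

        // Each segment keeps the log file plus index files on disk, so
        // replaying a month of history can create a very large number
        // of segments before cleanup catches up.
        System.out.println(perPartitionPerDay * partitions * backlogDays);
    }
}
```

With these assumed numbers that is 86,400 segments, which is in the same
ballpark as the ~30K extra open files per broker reported above once the
segments are spread across the cluster.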