Thanks Guozhang, that's a very good answer! I now understand: the idea is that the client cleans up after itself, so that only a minimal amount of garbage accumulates in the repartition topic.
We actually figured out that we were indeed hitting another max-open-files limit, and after adjusting that limit we managed to start our application without crashing the brokers.

However, I think I discovered a bug in the repartitioning setup. Let me first try to explain our setup: we have a compacted topic containing mostly short-lived values, where tombstones are normally created within a few hours but can be delayed by as much as a month. I suspect the repartition segments honor the timestamps of the records, so when resetting the application we process records that are quite old, creating very many segments and, as a result, a lot of open files.

When running my application I noticed these messages:

Fetch offset 213792 is out of range for partition app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7, resetting offset
Fetch offset 110227 is out of range for partition app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2, resetting offset
Resetting offset for partition app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7 to offset 233302.
Resetting offset for partition app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2 to offset 119914.

This effectively made my application skip messages, and by patching RepartitionTopicConfig.java I verified that it is due to the undefined retention.ms: the records get the default retention, meaning my application was competing with the log cleaner. By adding this line I got rid of the messages:

tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // Infinite

My understanding is that this should be safe, since cleanup is handled by the client invoking the admin API?
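For readers following along, here is a minimal sketch of the kind of override map the patch above touches. The map name tempTopicDefaultOverrides mirrors the one in RepartitionTopicConfig.java, but this class is a simplified stand-in for illustration, not the exact upstream source:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the default-config overrides applied to Streams repartition
// topics. Constant names follow org.apache.kafka.common.config.TopicConfig;
// the class itself is a simplified stand-in for RepartitionTopicConfig.
public class RepartitionTopicDefaults {
    static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
    static final String SEGMENT_MS_CONFIG = "segment.ms";
    static final String RETENTION_MS_CONFIG = "retention.ms";

    static Map<String, String> defaultOverrides() {
        Map<String, String> tempTopicDefaultOverrides = new HashMap<>();
        tempTopicDefaultOverrides.put(CLEANUP_POLICY_CONFIG, "delete");
        tempTopicDefaultOverrides.put(SEGMENT_MS_CONFIG, "600000"); // 10 min, per KAFKA-6150
        // The fix discussed in this thread: disable time-based retention so
        // the log cleaner never races the application; purging then happens
        // only via the explicit admin delete-records request (KIP-204).
        tempTopicDefaultOverrides.put(RETENTION_MS_CONFIG, "-1"); // Infinite
        return tempTopicDefaultOverrides;
    }
}
```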
Kind regards
Niklas

On Tue, Oct 9, 2018 at 8:47 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Niklas,
>
> The default value of segment.ms was set to 10 min as part of this project
> (introduced in Kafka 1.1.0):
>
> https://jira.apache.org/jira/browse/KAFKA-6150
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API
>
> In KIP-204 (KAFKA-6150), we added an admin request to delete records
> immediately upon committing offsets, to make repartition topics truly
> "transient", and along with it we set the default segment.ms to 10 min.
> The rationale is that for record purging to be effective, we need smaller
> segments, so that we can delete a segment file once the purged offset is
> larger than that segment's last offset.
>
> Which Kafka version are you using currently? Did you observe that data
> purging did not happen (otherwise segment files should be garbage
> collected quickly)? Or is your traffic very small, or do you commit
> infrequently, so that purging was ineffective?
>
> Guozhang
>
> On Tue, Oct 9, 2018 at 4:07 AM, Niklas Lönn <niklas.l...@gmail.com> wrote:
>
> > Hi,
> >
> > Recently we experienced a problem when resetting a streams application
> > that does quite a lot of operations based on 2 compacted source topics
> > with 20 partitions.
> >
> > We crashed the entire broker cluster with a TooManyOpenFiles exception
> > (we already have a multi-million limit).
> >
> > When inspecting the internal topics' configuration I noticed that the
> > repartition topics have a default config of:
> >
> > Configs: segment.bytes=52428800, segment.index.bytes=52428800,
> > cleanup.policy=delete, segment.ms=600000
> >
> > My source topic is a compacted topic used as a KTable, and if we assume
> > there is data for every 10-min segment, we would quickly get 144
> > segments per partition per day.
> >
> > Since this repartition topic is not even compacted, I can't understand
> > the reasoning behind a default of 10 min segment.ms and 50 MB
> > segment.bytes.
> >
> > Is there any best practice regarding this? Potentially we could crash
> > the cluster every time we need to reset an application.
> >
> > And does it make sense that it keeps so many files open at the same
> > time in the first place? Could it be a bug in the file management of
> > the Kafka broker?
> >
> > Kind regards
> > Niklas
>
>
> --
> -- Guozhang
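To make the segment arithmetic in the quoted message concrete, here is a back-of-the-envelope sketch. The 30-day replay window and the count of two files per segment (one .log plus one .index) are assumptions for illustration, taken from the tombstone-delay and open-files discussion above:

```java
// Back-of-the-envelope estimate of how many segment files a time-based
// roll policy creates when a Streams reset replays old, timestamped data
// into a repartition topic with segment.ms=600000.
public class SegmentEstimate {
    public static void main(String[] args) {
        long segmentMs = 600_000L;                 // default segment.ms for repartition topics
        long msPerDay = 24L * 60 * 60 * 1000;
        long segmentsPerPartitionPerDay = msPerDay / segmentMs; // 1,440 min / 10 min = 144

        int partitions = 20;    // from the thread
        int daysReplayed = 30;  // assumption: tombstones delayed up to a month
        int filesPerSegment = 2; // assumption: one .log plus one .index per segment

        long openFiles = segmentsPerPartitionPerDay * partitions * daysReplayed * filesPerSegment;
        System.out.println("Segments per partition per day: " + segmentsPerPartitionPerDay);
        System.out.println("Approx. files for one repartition topic after replay: " + openFiles);
    }
}
```

Even a single repartition topic reaches roughly 172,800 files under these assumptions, so an application with several repartition topics can plausibly exhaust a large open-files limit during a reset.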