Follow-up: I think we figured out what was happening. Setting the broker config log.message.timestamp.type=LogAppendTime (instead of the default value CreateTime) stopped the messages disappearing.
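In case it is useful to others: the change was a single line in the broker configuration (server.properties), i.e.

    # default is CreateTime; with LogAppendTime the broker stamps each message
    # with the time at which it was appended to the log
    log.message.timestamp.type=LogAppendTime

I believe the same override could instead be applied to just the repartitioning topic (message.timestamp.type appears as a per-topic property in the topic config quoted below), e.g. with something like kafka-configs.sh --zookeeper <zk> --alter --entity-type topics --entity-name <repartition topic> --add-config message.timestamp.type=LogAppendTime, but we have only tried the broker-wide setting.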
The messages in the Streams app's input topic are older than the default 24-hour retention period (on the input topic itself we have set unlimited retention). Am I right in thinking that with message.timestamp.type=CreateTime, the input message timestamps are carried over to the messages in the repartitioning topic? And furthermore, that the time-based retention of messages in the repartitioning topic is applied to those message timestamps? If that is the case, then messages older than 24 hours become eligible for deletion as soon as they are copied over into the repartitioning topic, and we have a race between the log cleaner and the consumer in the second stage of the Streams app. :-(

Is log.message.timestamp.type=LogAppendTime the best way of avoiding this problem?

Thanks,
Martin
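P.S. For anyone trying to reproduce this: the topology described in my original mail (quoted below) is essentially a group-by followed by a count, along the lines of the following sketch. This is not our actual code: the topic names and the key selector are made up, and I'm assuming the 0.11-era Streams DSL.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class RepartitionRaceSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // application.id is the prefix of the repartitioning topic name:
            // <application.id>-<store name>-repartition
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamsapp_site_stats_sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> events =
                    builder.stream(Serdes.String(), Serdes.String(), "log-events-input");

            // Grouping by a derived key makes Streams route every record through a
            // repartitioning topic (here <application.id>-localstore_log_event_counts-repartition);
            // it is that topic whose segments were being deleted.
            KTable<String, Long> counts = events
                    .groupBy((key, value) -> value.split(",")[0],  // made-up key selector
                             Serdes.String(), Serdes.String())
                    .count("localstore_log_event_counts");

            counts.to(Serdes.String(), Serdes.Long(), "log-event-counts-output");

            new KafkaStreams(builder, props).start();
        }
    }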
> On 29 Jan 2018, at 15:44, Martin Kleppmann <mar...@kleppmann.com> wrote:
>
> Hi all,
>
> We are debugging an issue with a Kafka Streams application that is
> producing incorrect output. The application is a simple group-by on a key,
> and then count. As expected, the application creates a repartitioning
> topic for the group-by stage. The problem appears to be that messages are
> getting lost in the repartitioning topic.
>
> Looking at the Kafka broker logs, it appears that the log segments for the
> repartitioning topic are getting marked for deletion very aggressively
> (within ~2 seconds of being created), so fast that some segments are
> deleted before the count stage of the Kafka Streams application has had a
> chance to consume the messages.
>
> I have checked the configuration and I cannot see a reason why the log
> segments should be getting deleted so quickly. The following line reports
> the creation of the repartitioning topic:
>
> [2018-01-29 15:31:39,992] INFO Created log for partition [streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition,0] in /kafka-data with properties {compression.type -> producer, message.format.version -> 0.11.0-IV2, file.delete.delay.ms -> 100000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> 1073741824, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 3600000, segment.bytes -> 1073741824, retention.ms -> 86400000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
>
> As you can see, retention is set to 24 hours, retention size 1 GB, segment
> rolling time to 1 hour, segment size 1 GB. For test purposes we are
> running the Streams app on a fixed input of 7,000 messages, with a total
> size of only about 5.5 MB, so we shouldn't be getting anywhere near the
> segment or retention limits. The input topic has only one partition.
>
> Just two seconds after the topic is created, the broker reports that it is
> rolling log segments and scheduling old log segments for deletion:
>
> [2018-01-29 15:31:41,923] INFO Rolled new log segment for 'streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0' in 1 ms. (kafka.log.Log)
> [2018-01-29 15:31:41,924] INFO Scheduling log segment 0 for log streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0 for deletion. (kafka.log.Log)
> [2018-01-29 15:31:41,945] INFO Cleared earliest 0 entries from epoch cache based on passed offset 6582 leaving 1 in EpochFile for partition streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0 (kafka.server.epoch.LeaderEpochFileCache)
> [2018-01-29 15:31:42,923] INFO Rolled new log segment for 'streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0' in 2 ms. (kafka.log.Log)
> [2018-01-29 15:31:42,924] INFO Scheduling log segment 6582 for log streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0 for deletion. (kafka.log.Log)
>
> 100 seconds later (consistent with the setting for file.delete.delay.ms),
> the files are actually deleted:
>
> [2018-01-29 15:33:21,923] INFO Deleting segment 0 from log streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0. (kafka.log.Log)
> [2018-01-29 15:33:21,929] INFO Deleting index /kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
> [2018-01-29 15:33:21,929] INFO Deleting index /kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000000000.timeindex.deleted (kafka.log.TimeIndex)
> [2018-01-29 15:33:22,925] INFO Deleting segment 6582 from log streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0. (kafka.log.Log)
> [2018-01-29 15:33:22,926] INFO Deleting index /kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000006582.index.deleted (kafka.log.OffsetIndex)
> [2018-01-29 15:33:22,927] INFO Deleting index /kafka-data/streamsapp_site_stats-v290118-debug_1517239899448-localstore_log_event_counts-repartition-0/00000000000000006582.timeindex.deleted (kafka.log.TimeIndex)
>
> Does anyone know what might be causing the messages in the repartitioning
> topic to be deleted so aggressively?
>
> Thanks,
> Martin