Hi, we have a worrying problem in a project where we use Kafka Streams.
We had an incident where, during heavy load on our app (tens of millions of records within a 15-minute span on an input topic of the stream), we decided to add additional instances of the app (5 -> 10), and some of the already-running instances were restarted. There are 16 partitions on the input topic and 5 brokers in the cluster.

During the restart/rebalance we saw a massive drop in consumer group lag on one of the repartition topics: within 1 minute after the rebalance it went down by 40 million records, from around 50 million. This is visible in the broker logs, where we can see delete-records requests on that topic (effectively moving the logStartOffset forward) issued by a stream thread. That is of course a normal operation for a repartition topic, but in this case the offset was moved forward by a concerning number of messages, for example 3.5 million records on one of the partitions (there are 16 partitions on that repartition topic). We are worried that those messages were effectively lost somehow and are trying to figure out the reason.

I checked whether the segment had been deleted by the retention policy, but it had not: there was only a single segment for this partition at that moment, and retention could not have been executed anyway given the settings retention.bytes=-1 and retention.ms=1 week. Other settings for the repartition topic: replicas=3, min.insync.replicas=1, unclean.leader.election.enable=false, segment.bytes=1GB. We created this repartition topic manually due to security policies at the company.

We use the Kafka Streams client library in version 3.7.1. The Kafka cluster is fairly old, version 3.4.0.

A simplified topology is as follows:

Input topic           Input topic 2           Input topic 3
     |                      |                       |
     |                      |                       |
    JOIN <---------------- KTable                   |
     |                                              |
.selectKey()                                        |
     |                                              |
    JOIN <---------------------------------------- KTable
     |
.process() --------> changelog 1
     |
.process() --------> changelog 2
     |
.process() --------> changelog 1
     |
.process() --------> changelog 3
     |
Output topic

The problem occurs at the repartition topic created for the .selectKey() operation.

Here is a part of the client log showing the offset change for one of the partitions during the startup/rebalance of the instances:

11:18:55.359 [Consumer clientId=xxx, groupId=xxx] Resetting offset for partition xxx-repartition-11 to position FetchPosition{offset=386328485, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)], epoch=133}}
11:18:47.090 [Consumer clientId=xxx, groupId=xxx] Fetch position FetchPosition{offset=382808207, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)], epoch=133}} is out of range for partition xxx-repartition-11, resetting offset
11:17:36.834 Setting offset for partition xxx-repartition-11 to the committed offset FetchPosition{offset=382808207, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)], epoch=133}}
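To compare what the consumer group has committed with the broker-side log start offset for the affected partition, something like the following AdminClient sketch can be used (bootstrap server, group id, topic name and partition number are placeholders, not our real values):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class RepartitionOffsetCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9093"); // placeholder broker

        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("xxx-repartition", 11); // placeholder topic/partition

            // Earliest offset as reported by the broker, i.e. the logStartOffset
            // that the delete-records requests keep moving forward.
            long logStartOffset = admin
                    .listOffsets(Map.of(tp, OffsetSpec.earliest()))
                    .partitionResult(tp)
                    .get()
                    .offset();

            // Offset committed by the Streams consumer group for the same partition
            // (may be null if nothing has been committed).
            OffsetAndMetadata committed = admin
                    .listConsumerGroupOffsets("xxx") // placeholder group id
                    .partitionsToOffsetAndMetadata()
                    .get()
                    .get(tp);
            long committedOffset = committed == null ? -1L : committed.offset();

            System.out.printf("logStartOffset=%d committed=%d gap=%d%n",
                    logStartOffset, committedOffset, logStartOffset - committedOffset);
        }
    }
}

If the committed offset is below the log start offset, the consumer hits exactly the "out of range ... resetting offset" situation shown above.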
And some relevant broker log:

11:19:44.323 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to 386368143 due to client delete records request
11:19:14.124 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to 386334471 due to leader offset increment
11:19:14.121 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to 386334471 due to leader offset increment
11:19:14.109 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to 386334471 due to client delete records request
11:15:43.277 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to 383149649 due to leader offset increment
11:15:43.277 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to 383149649 due to leader offset increment
11:15:43.263 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to 383149649 due to client delete records request
11:15:13.253 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to 383116552 due to leader offset increment

What might be the reason for such an enormous offset change? Looking at the client logs, it seems like some kind of corrupted processing, as the consumer clearly cannot find its committed offset.
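For completeness, here is a minimal sketch of the simplified topology in the Streams DSL. Topic names, joiners and the key mapping are placeholders, and the stateful process() steps (each backed by its own changelog) are elided:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class TopologySketch {

    public static void build(StreamsBuilder builder) {
        KStream<String, String> input = builder.stream("input-topic");
        KTable<String, String> table2 = builder.table("input-topic-2");
        KTable<String, String> table3 = builder.table("input-topic-3");

        input
                // first join against the table built from input topic 2
                .join(table2, (left, right) -> left + "|" + right)
                // selectKey() changes the key, so the join below has to go
                // through a repartition topic (pre-created manually in our case)
                .selectKey((key, value) -> value.substring(0, 8))
                // second join against the table built from input topic 3
                .join(table3, (left, right) -> left + "|" + right)
                // in the real topology several stateful process() steps follow
                // here, each backed by its own changelog topic; elided for brevity
                .to("output-topic");
    }
}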