Hi, I think you could be hitting KAFKA-17635 <https://issues.apache.org/jira/browse/KAFKA-17635>, which has been fixed in Kafka Streams 3.7.2. It was released this week; is it possible to upgrade and try it out?
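If you do give 3.7.2 a try, it can be worth confirming which jars actually end up on the classpath of the deployed app. A rough, unofficial sketch (AppInfoParser ships with kafka-clients, and the code-source path of the streams jar normally carries its version; the class name here is just a placeholder):

import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.streams.KafkaStreams;

public class KafkaVersionCheck {
    public static void main(String[] args) {
        // Version of the kafka-clients jar on the classpath (the same value the
        // clients print in their "Kafka version: ..." startup log line).
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());

        // Location of the kafka-streams jar that was actually loaded; the file
        // name normally carries the version, e.g. kafka-streams-3.7.2.jar.
        System.out.println("kafka-streams jar: "
                + KafkaStreams.class.getProtectionDomain().getCodeSource().getLocation());
    }
}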
-Bill

On Tue, Dec 17, 2024 at 4:10 AM TheKaztek <thekaz...@gmail.com> wrote:

> Hi, we have a worrying problem in the project where we use Kafka Streams.
>
> We had an incident where, during heavy load on our app (dozens of millions of records in a 15-minute span on an input topic of the stream), we decided to add additional instances of the app (5 -> 10), and some of the already-running instances were restarted (there are 16 partitions on the input topic, 5 brokers in the cluster).
>
> During the restart/rebalance we saw a massive drop in consumer group lag on one of the repartition topics (within 1 minute after the rebalance it went down by 40 million records, from around 50 million). This is visible in the broker logs, where we can see delete-records requests on that topic (effectively moving the logStartOffset forward) issued by a stream thread. That is of course a normal operation for a repartition topic, but in this case the offset was advanced by a concerning number of messages, for example 3.5 million records forward on one of the partitions (there are 16 partitions on that repartition topic). We are worried that those messages were effectively lost somehow and are trying to figure out why.
>
> I checked whether the segment had been deleted by retention policies, but it had not: there was only a single segment for this partition at that moment, and retention could not have kicked in anyway given these settings:
>
> retention.bytes=-1
> retention.ms=1 week
>
> Other settings for the repartition topic: replicas=3, min.insync.replicas=1, unclean.leader.election.enable=false, segment.bytes=1GB
>
> We created this repartition topic manually due to security policies at the company.
>
> We use the Kafka Streams client library in version 3.7.1.
> The Kafka cluster is pretty old, version 3.4.0.
>
> A simplified topology is as follows:
>
> Input topic        Input topic 2        Input topic 3
>      |                   |                   |
>      |                   |                   |
>    JOIN <————————————— KTable                |
>      |                                       |
> .selectKey()                                 |
>      |                                       |
>    JOIN <——————————————————————————————————— KTable
>      |
> .process() ———————> changelog 1
>      |
> .process() ———————> changelog 2
>      |
> .process() ———————> changelog 1
>      |
> .process() ———————> changelog 3
>      |
>      |
> Output topic
>
> The problem occurs at the repartition topic created by the .selectKey() operation.
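Just to make sure I follow the topology: the repartition topic in question is the one feeding the second join after the key change. In DSL terms the shape would be roughly the sketch below; all topic names, value types, and joiners are placeholders rather than your actual code, and the four .process() steps with their changelogs are elided:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class TopologySketch {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Placeholder topic names and types.
        KStream<String, String> input  = builder.stream("input-topic");
        KTable<String, String>  table1 = builder.table("input-topic-2");
        KTable<String, String>  table2 = builder.table("input-topic-3");

        KStream<String, String> joined = input
                // first KTable join
                .join(table1, (left, right) -> left + "|" + right)
                // the key change marks the stream for repartitioning; the
                // downstream join is what materializes a -repartition topic
                // (the xxx-repartition one in your logs)
                .selectKey((key, value) -> value)
                // second KTable join, reading from the repartition topic
                .join(table2, (left, right) -> left + "|" + right);

        // the stateful .process() steps are omitted in this sketch
        joined.to("output-topic");

        Topology topology = builder.build();
        // describe() prints where the repartition topic sits in the topology
        System.out.println(topology.describe());
    }
}

Printing topology.describe() on your side would also confirm exactly which repartition topic sits between the selectKey() and the second join.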
> Here is a part of the client log showing the offset change for one of the partitions during the startup/rebalance of the instances:
>
> 11:18:55.359 [Consumer clientId=xxx, groupId=xxx] Resetting offset for partition xxx-repartition-11 to position FetchPosition{offset=386328485, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)], epoch=133}}
>
> 11:18:55.359 [Consumer clientId=xxx, groupId=xxx] Resetting offset for partition xxx-repartition-11 to position FetchPosition{offset=386328485, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)], epoch=133}}
>
> 11:18:47.090 [Consumer clientId=xxx, groupId=xxx] Fetch position FetchPosition{offset=382808207, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)], epoch=133}} is out of range for partition xxx-repartition-11, resetting offset
>
> 11:18:47.090 [Consumer clientId=xxx, groupId=xxx] Fetch position FetchPosition{offset=382808207, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)], epoch=133}} is out of range for partition xxx-repartition-11, resetting offset
>
> 11:17:36.834 Setting offset for partition xxx-repartition-11 to the committed offset FetchPosition{offset=382808207, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)], epoch=133}}
>
> And some relevant broker log:
>
> 11:19:44.323 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to 386368143 due to client delete records request
> 11:19:14.124 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to 386334471 due to leader offset increment
> 11:19:14.124 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to 386334471 due to leader offset increment
> 11:19:14.121 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to 386334471 due to leader offset increment
> 11:19:14.121 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to 386334471 due to leader offset increment
> 11:19:14.109 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to 386334471 due to client delete records request
> 11:19:14.109 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to 386334471 due to client delete records request
> 11:15:43.277 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to 383149649 due to leader offset increment
> 11:15:43.277 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to 383149649 due to leader offset increment
> 11:15:43.263 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to 383149649 due to client delete records request
> 11:15:13.253 [UnifiedLog partition=xxx-repartition-11, dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to 383116552 due to leader offset increment
>
> What might be the reason for such an enormous offset change? Looking at the client logs, it seems to be some kind of corrupted processing, as the consumer clearly cannot find its committed offset.
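One thing that might help narrow it down: you can ask the brokers directly how far the log start offset of that partition has moved relative to the end offset, independently of the consumer-lag metric. A rough sketch with the admin client (bootstrap address, topic name, and class name are placeholders):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class RepartitionOffsetCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // placeholder bootstrap servers
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");

        try (Admin admin = Admin.create(props)) {
            // partition 11 of the repartition topic from the logs (name is a placeholder)
            TopicPartition tp = new TopicPartition("xxx-repartition", 11);

            Map<TopicPartition, ListOffsetsResultInfo> earliest =
                    admin.listOffsets(Map.of(tp, OffsetSpec.earliest())).all().get();
            Map<TopicPartition, ListOffsetsResultInfo> latest =
                    admin.listOffsets(Map.of(tp, OffsetSpec.latest())).all().get();

            // earliest reflects the logStartOffset advanced by delete-records requests
            long logStartOffset = earliest.get(tp).offset();
            long endOffset = latest.get(tp).offset();

            System.out.printf("%s: logStartOffset=%d, endOffset=%d, remaining=%d%n",
                    tp, logStartOffset, endOffset, endOffset - logStartOffset);
        }
    }
}

That at least separates what the brokers actually deleted from what the lag metric reports.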