Updating Kafka Streams would be enough.

-Bill
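Since the fix is on the client side, a quick way to confirm which kafka-clients / kafka-streams version actually ends up on the classpath after the dependency bump is to log it at startup. A minimal sketch (the class name is made up for illustration; the manifest lookup may return null if the jar does not carry that metadata):

import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.streams.KafkaStreams;

public class VersionCheck {
    public static void main(String[] args) {
        // Version of the kafka-clients jar on the classpath.
        System.out.println("kafka-clients: " + AppInfoParser.getVersion());
        // Version of the kafka-streams jar, read from its manifest
        // (may be null if the jar lacks Implementation-Version metadata).
        System.out.println("kafka-streams: "
                + KafkaStreams.class.getPackage().getImplementationVersion());
    }
}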
On Wed, Dec 18, 2024 at 6:50 AM TheKaztek <thekaz...@gmail.com> wrote:

> Would updating the Kafka Streams client library be enough? Or should the
> cluster be updated too?
>
> On Tue, Dec 17, 2024 at 19:28 Bill Bejeck <b...@confluent.io.invalid> wrote:
>
> > Hi,
> >
> > I think you could be hitting KAFKA-17635
> > <https://issues.apache.org/jira/browse/KAFKA-17635>, which has been fixed
> > in Kafka Streams 3.7.2.
> > It was released this week; is it possible to upgrade and try it out?
> >
> > -Bill
> >
> > On Tue, Dec 17, 2024 at 4:10 AM TheKaztek <thekaz...@gmail.com> wrote:
> >
> > > Hi, we have a worrying problem in a project where we use Kafka Streams.
> > >
> > > We had an incident where, during heavy load on our app (dozens of
> > > millions of records in a 15-minute span on an input topic of the
> > > stream), we decided to add additional instances of the app (5 -> 10),
> > > and some of the already-running instances were restarted (there are 16
> > > partitions on the input topic and 5 brokers in the cluster).
> > >
> > > During the restart/rebalance we saw a massive drop in consumer group
> > > lag on one of the repartition topics (within 1 minute after the
> > > rebalance it went down by 40 million records, from around 50 million).
> > > This is visible in the broker logs, where we can see requests from a
> > > stream thread to delete records on that topic (effectively moving
> > > logStartOffset forward). That is of course a normal operation for a
> > > repartition topic, but in this case the offset was advanced by a
> > > concerning number of messages, for example 3.5 million records on one
> > > of the partitions (there are 16 partitions on that repartition topic).
> > > We are worried that those messages were effectively lost somehow and
> > > are trying to figure out why.
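For reference, the delete-records requests mentioned above correspond to the Admin API's deleteRecords call, which Kafka Streams uses internally to purge already-committed records from repartition topics. A minimal sketch of that call, with placeholder broker, topic, and offset values (this is not the Streams-internal code itself):

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class RepartitionPurgeSketch {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("xxx-repartition", 11); // placeholder topic
            // Delete everything below a (hypothetical) offset. This only advances
            // logStartOffset on the brokers; it does not change committed consumer offsets.
            DeleteRecordsResult result = admin.deleteRecords(
                    Map.of(tp, RecordsToDelete.beforeOffset(386_000_000L)));
            long newLogStart = result.lowWatermarks().get(tp).get().lowWatermark();
            System.out.println("logStartOffset of " + tp + " is now " + newLogStart);
        }
    }
}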
> > > I checked whether the segment had been deleted by retention policies,
> > > but it had not: there was only a single segment for this partition at
> > > that moment, and retention could not have been triggered anyway given
> > > these settings:
> > >
> > > retention.bytes=-1
> > > retention.ms=1 week
> > >
> > > Other settings for the repartition topic: replicas=3,
> > > min.insync.replicas=1, unclean.leader.election.enable=false,
> > > segment.bytes=1GB
> > >
> > > We created this repartition topic manually due to security policies at
> > > the company.
> > >
> > > We use the Kafka Streams client library in version 3.7.1.
> > > The Kafka cluster is pretty old, version 3.4.0.
> > >
> > > A simplified topology is as follows:
> > >
> > > Input topic        Input topic 2        Input topic 3
> > >      |                   |                    |
> > >    JOIN <————————————— KTable                 |
> > >      |                                        |
> > > .selectKey()                                  |
> > >      |                                        |
> > >    JOIN <———————————————————————————————— KTable
> > >      |
> > > .process() ———————> changelog 1
> > >      |
> > > .process() ———————> changelog 2
> > >      |
> > > .process() ———————> changelog 1
> > >      |
> > > .process() ———————> changelog 3
> > >      |
> > > Output topic
> > >
> > > The problem occurs at the repartition topic created by the .selectKey()
> > > operation.
> > >
> > > Here is a part of the client log showing the offset change for one of
> > > the partitions during startup/rebalance of the instances:
> > >
> > > 11:18:55.359 [Consumer clientId=xxx, groupId=xxx] Resetting offset for
> > > partition xxx-repartition-11 to position FetchPosition{offset=386328485,
> > > offsetEpoch=Optional.empty,
> > > currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
> > > epoch=133}}
> > >
> > > 11:18:55.359 [Consumer clientId=xxx, groupId=xxx] Resetting offset for
> > > partition xxx-repartition-11 to position FetchPosition{offset=386328485,
> > > offsetEpoch=Optional.empty,
> > > currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
> > > epoch=133}}
> > >
> > > 11:18:47.090 [Consumer clientId=xxx, groupId=xxx] Fetch position
> > > FetchPosition{offset=382808207, offsetEpoch=Optional.empty,
> > > currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
> > > epoch=133}} is out of range for partition xxx-repartition-11, resetting
> > > offset
> > >
> > > 11:18:47.090 [Consumer clientId=xxx, groupId=xxx] Fetch position
> > > FetchPosition{offset=382808207, offsetEpoch=Optional.empty,
> > > currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
> > > epoch=133}} is out of range for partition xxx-repartition-11, resetting
> > > offset
> > >
> > > 11:17:36.834 Setting offset for partition xxx-repartition-11 to the
> > > committed offset FetchPosition{offset=382808207,
> > > offsetEpoch=Optional.empty,
> > > currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
> > > epoch=133}}
> > >
> > > And some relevant broker log:
> > >
> > > 11:19:44.323 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
> > > 386368143 due to client delete records request
> > > 11:19:14.124 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to
> > > 386334471 due to leader offset increment
> > >
> > > 11:19:14.124 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to
> > > 386334471 due to leader offset increment
> > >
> > > 11:19:14.121 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to
> > > 386334471 due to leader offset increment
> > >
> > > 11:19:14.121 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to
> > > 386334471 due to leader offset increment
> > >
> > > 11:19:14.109 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
> > > 386334471 due to client delete records request
> > >
> > > 11:19:14.109 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
> > > 386334471 due to client delete records request
> > >
> > > 11:15:43.277 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to
> > > 383149649 due to leader offset increment
> > >
> > > 11:15:43.277 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to
> > > 383149649 due to leader offset increment
> > >
> > > 11:15:43.263 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
> > > 383149649 due to client delete records request
> > >
> > > 11:15:13.253 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to
> > > 383116552 due to leader offset increment
> > >
> > > What might be the reason for such an enormous offset change? Looking at
> > > the client logs, it seems to be some kind of corrupted processing, as
> > > the consumer clearly cannot find its committed offset.
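One rough way to check whether the group's committed offset has fallen below the partition's log start offset, which is what produces the "out of range ... resetting offset" lines in the client log above, is to compare the two via the Admin API. An illustrative sketch with placeholder group, topic, and broker values:

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetGapCheck {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String groupId = "xxx";                                        // placeholder group id
        TopicPartition tp = new TopicPartition("xxx-repartition", 11); // placeholder topic

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        try (Admin admin = Admin.create(props)) {
            // Committed offset of the consumer group for this partition.
            OffsetAndMetadata committed = admin.listConsumerGroupOffsets(groupId)
                    .partitionsToOffsetAndMetadata().get().get(tp);
            // Earliest (logStartOffset) and latest (logEndOffset) offsets on the broker.
            ListOffsetsResultInfo earliest =
                    admin.listOffsets(Map.of(tp, OffsetSpec.earliest())).partitionResult(tp).get();
            ListOffsetsResultInfo latest =
                    admin.listOffsets(Map.of(tp, OffsetSpec.latest())).partitionResult(tp).get();

            System.out.printf("committed=%s logStartOffset=%d logEndOffset=%d%n",
                    committed == null ? "none" : committed.offset(),
                    earliest.offset(), latest.offset());
            // If the committed offset is below logStartOffset, the next fetch is out of
            // range and the consumer resets its position, skipping whatever was purged.
        }
    }
}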