Would updating the kafka streams client library be enough ? Or should the
cluster be updated to ?

wt., 17 gru 2024 o 19:28 Bill Bejeck <b...@confluent.io.invalid> napisał(a):

> Hi,
>
> I think you could be hitting KAFKA-17635
> <https://issues.apache.org/jira/browse/KAFKA-17635> which has been fixed
> in
> Kafka Streams v 3.7.2 .
> It's been released this week, is it possible to upgrade and try it out?
>
> -Bill
>
> On Tue, Dec 17, 2024 at 4:10 AM TheKaztek <thekaz...@gmail.com> wrote:
>
> > Hi, we have a worrying problem in the project where we use kafka streams.
> >
> >
> >
> > We had an incident where during heavy load on our app (dozens of millions
> > of records in 15 minutes span on an input topic of the stream) we decided
> > to add additional instances of the app (5 -> 10) and some of the
> > already-running instances were restarted (there are 16 partitions on an
> > input topic, 5 brokers in a cluster).
> >
> >
> >
> > During the restart/rebalance we saw a massive drop in consumer group lag
> on
> > one of repartition topics (in 1 minute after rebalance it went down by 40
> > million records from around 50 million). This fact is visible in broker
> > logs, where we can see requests to delete records on that topic (so
> > effectively moving forward logStartOffset) by a stream thread. This is a
> > normal operation of course for a repartition topic, but in this case the
> > offset was forwarded by a concerning number of messages, for example 3.5
> > million records forward on one of the partitions (there are 16 partitions
> > on that repartition topic). We are worried that those messages were
> > effectively lost somehow and are trying to figure out the reason for
> that.
> >
> >
> >
> > I checked if segment had not been deleted by retention policies, but it
> was
> > not as there was only a single segment for this partition at that moment
> > and retention could not be executed anyway due to given settings:
> >
> > retention.bytes=-1
> >
> > retention.ms=1 week
> >
> >
> >
> > Other settings for the repartition topic: replicas = 3,
> min.insync.replicas
> > = 1, unclean.leader.election.enable = false, segment.bytes=1GB
> >
> > We created this repartition topic manually due to security policies at
> the
> > company
> >
> >
> >
> > We use kafka streams client library in version 3.7.1
> >
> > Kafka cluster is pretty old, version 3.4.0
> >
> >
> >
> > A simplified topology is as follows:
> >
> >
> >
> > Input topic                                           Input topic
> > 2                                  Input topic 3
> >
> >         |
> >  |                                                     |
> >
> >         |
> >  |                                                     |
> >
> >         |
> >    |                                                     |
> >
> >     JOIN  <————————————  KTable
> >     |
> >
> >
> > |
> >                      |
> >
> >
> > |
> >                      |
> >
> > .selectKey()
> >                       |
> >
> >
> > |
> >                      |
> >
> >
> > |
> >            |
> >
> >     JOIN <———————————————————————————-  KTable
> >
> >         |
> >
> >         |
> >
> >     .process() ———————> changelog 1
> >
> >         |
> >
> >     .process()  ———————> changelog 2
> >
> >         |
> >
> >     .process()  ———————> changelog 1
> >
> >         |
> >
> >      .process()  ———————> changelog 3
> >
> >         |
> >
> >         |
> >
> > Output topic
> >
> >
> >
> > The problem occurs at the repartition topic at the .selectKey() operation
> >
> >
> >
> > Here is a part of the client log showing the offset change for one of the
> > partitions during startup/rebalance of instances:
> >
> >
> >
> > 11:18:55.359 [Consumer clientId=xxx, groupId=xxx] Resetting offset for
> > partition xxx-repartition-11 to position FetchPosition{offset=386328485,
> > offsetEpoch=Optional.empty,
> > currentLeader=LeaderAndEpoch(xxx=0ptional[xxx:9093 (id: 5 rack: xxx)],
> > epoch=133}}
> >
> >
> >
> > 11:18:55.359 [Consumer clientId=xxx, groupId=xxx] Resetting offset for
> > partition xxx-repartition-11 to position FetchPosition{offset=386328485,
> > offsetEpoch=0ptional.empty, currentLeader=LeaderAndEpoch{leader=0ptional
> > [xxx:9093 (id: 5 rack: xxx)], epoch=133}}
> >
> >
> >
> > 11:18:47.090 [Consumer clientId=xxx, groupId=xxx] Fetch position
> > FetchPosition{offset=382808207, offsetEpoch=0ptional.empty,
> > currentLeader=LeaderAndEpoch{leader=0ptional[xxx:9093 (id: 5 rack: xxx)],
> > epoch=133}} is out of range for partition xxx-repartition-11, resetting
> > offset
> >
> >
> >
> > 11:18:47.090 [Consumer clientId=xxx, groupId=xxx] Fetch position
> > FetchPosition{offset=382808207, offsetEpoch=0ptional.empty,
> > currentLeader=LeaderAndEpoch{leader=Optional [xxx:9093 (id: 5 rack:
> xxx)],
> > epoch=133}} is out of range for partition xxx-repartition-11, resetting
> > offset
> >
> >
> >
> > 11:17:36.834 Setting offset for partition xxx-repartition-11 to the
> > committed offset FetchPosition{offset=382808207,
> > offsetEpoch=Optional.empty, current
> > leader=LeaderAndEpoch{leader=0ptional[xxx:9093 (id: 5 rack: xxx)],
> > epoch=133}}
> >
> >
> >
> > And some relevant broker log:
> >
> >
> >
> > 11:19:44.323 [UnifiedLog partition=xxx-repartition-11,
> > dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
> > 386368143 due to client delete records request
> >
> > 11:19:14.124 [UnifiedLog partition=xxx-repartition-11,
> > dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to
> > 386334471 due to leader offset increment
> >
> > 11:19:14.124 [UnifiedLog partition=xxx-repartition-11,
> > dir=/var/Lib/kafka/data-0/kafka-log2] Incremented log start offset to
> > 386334471 due to leader offset increment
> >
> > 11:19:14.121 [UnifiedLog partition=xxx-repartition-11,
> > dir=/var/Lib/kafka/data-0/kafka-log1] Incremented log start offset to
> > 386334471 due to leader offset increment
> >
> > 11:19:14.121 [UnifiedLog partition=xxx-repartition-11,
> > dir=/var/Lib/kafka/data-0/kafka-log1] Incremented log start offset to
> > 386334471 due to leader offset increment
> >
> > 11:19:14.109 [UnifiedLog partition=xxx-repartition-11,
> > dir=/var/Lib/kafka/data-0/kafka-log5] Incremented log start offset to
> > 386334471 due to client delete records request
> >
> > 11:19:14.109 [UnifiedLog partition=xxx-repartition-11,
> > dir=/var/Lib/kafka/data-0/kafka-log5] Incremented log start offset to
> > 386334471 due to client delete records request
> >
> > 11:15:43.277 [UnifiedLog partition=xxx-repartition-11,
> > dir=/var/Lib/kafka/data-0/kafka-log1] Incremented log start offset to
> > 383149649 due to leader offset increment
> >
> > 11:15:43.277 [UnifiedLog partition=xxx-repartition-11,
> > dir=/var/Lib/kafka/data-0/kafka-log2] Incremented log start offset to
> > 383149649 due to leader offset increment
> >
> > 11:15:43.263 [UnifiedLog partition=xxx-repartition-11,
> > dir=/var/Lib/kafka/data-0/kafka-log5] Incremented log start offset to
> > 383149649 due to client delete records request
> >
> > 11:15:13.253 [UnifiedLog partition=xxx-repartition-11,
> > dir=/var/Lib/kafka/data-0/kafka-log1] Incremented log start offset to
> > 383116552 due to leader offset increment
> >
> >
> >
> > What might be the reason for such an enormous offset change? Looking at
> the
> > client logs seems that this is rather some corrupted processing, as it
> > clearly cannot find the committed offset.
> >
>

Reply via email to