Updating Kafka Streams would be enough.

-Bill

On Wed, Dec 18, 2024 at 6:50 AM TheKaztek <thekaz...@gmail.com> wrote:

> Would updating the kafka streams client library be enough ? Or should the
> cluster be updated to ?
>
> wt., 17 gru 2024 o 19:28 Bill Bejeck <b...@confluent.io.invalid>
> napisał(a):
>
> > Hi,
> >
> > I think you could be hitting KAFKA-17635
> > <https://issues.apache.org/jira/browse/KAFKA-17635> which has been fixed
> > in
> > Kafka Streams v 3.7.2 .
> > It's been released this week, is it possible to upgrade and try it out?
> >
> > -Bill
> >
> > On Tue, Dec 17, 2024 at 4:10 AM TheKaztek <thekaz...@gmail.com> wrote:
> >
> > > Hi, we have a worrying problem in the project where we use kafka
> streams.
> > >
> > >
> > >
> > > We had an incident where during heavy load on our app (dozens of
> millions
> > > of records in 15 minutes span on an input topic of the stream) we
> decided
> > > to add additional instances of the app (5 -> 10) and some of the
> > > already-running instances were restarted (there are 16 partitions on an
> > > input topic, 5 brokers in a cluster).
> > >
> > >
> > >
> > > During the restart/rebalance we saw a massive drop in consumer group
> lag
> > on
> > > one of repartition topics (in 1 minute after rebalance it went down by
> 40
> > > million records from around 50 million). This fact is visible in broker
> > > logs, where we can see requests to delete records on that topic (so
> > > effectively moving forward logStartOffset) by a stream thread. This is
> a
> > > normal operation of course for a repartition topic, but in this case
> the
> > > offset was forwarded by a concerning number of messages, for example
> 3.5
> > > million records forward on one of the partitions (there are 16
> partitions
> > > on that repartition topic). We are worried that those messages were
> > > effectively lost somehow and are trying to figure out the reason for
> > that.
> > >
> > >
> > >
> > > I checked if segment had not been deleted by retention policies, but it
> > was
> > > not as there was only a single segment for this partition at that
> moment
> > > and retention could not be executed anyway due to given settings:
> > >
> > > retention.bytes=-1
> > >
> > > retention.ms=1 week
> > >
> > >
> > >
> > > Other settings for the repartition topic: replicas = 3,
> > min.insync.replicas
> > > = 1, unclean.leader.election.enable = false, segment.bytes=1GB
> > >
> > > We created this repartition topic manually due to security policies at
> > the
> > > company
> > >
> > >
> > >
> > > We use kafka streams client library in version 3.7.1
> > >
> > > Kafka cluster is pretty old, version 3.4.0
> > >
> > >
> > >
> > > A simplified topology is as follows:
> > >
> > >
> > >
> > > Input topic                                           Input topic
> > > 2                                  Input topic 3
> > >
> > >         |
> > >  |                                                     |
> > >
> > >         |
> > >  |                                                     |
> > >
> > >         |
> > >    |                                                     |
> > >
> > >     JOIN  <————————————  KTable
> > >     |
> > >
> > >
> > > |
> > >                      |
> > >
> > >
> > > |
> > >                      |
> > >
> > > .selectKey()
> > >                       |
> > >
> > >
> > > |
> > >                      |
> > >
> > >
> > > |
> > >            |
> > >
> > >     JOIN <———————————————————————————-  KTable
> > >
> > >         |
> > >
> > >         |
> > >
> > >     .process() ———————> changelog 1
> > >
> > >         |
> > >
> > >     .process()  ———————> changelog 2
> > >
> > >         |
> > >
> > >     .process()  ———————> changelog 1
> > >
> > >         |
> > >
> > >      .process()  ———————> changelog 3
> > >
> > >         |
> > >
> > >         |
> > >
> > > Output topic
> > >
> > >
> > >
> > > The problem occurs at the repartition topic at the .selectKey()
> operation
> > >
> > >
> > >
> > > Here is a part of the client log showing the offset change for one of
> the
> > > partitions during startup/rebalance of instances:
> > >
> > >
> > >
> > > 11:18:55.359 [Consumer clientId=xxx, groupId=xxx] Resetting offset for
> > > partition xxx-repartition-11 to position
> FetchPosition{offset=386328485,
> > > offsetEpoch=Optional.empty,
> > > currentLeader=LeaderAndEpoch(xxx=0ptional[xxx:9093 (id: 5 rack: xxx)],
> > > epoch=133}}
> > >
> > >
> > >
> > > 11:18:55.359 [Consumer clientId=xxx, groupId=xxx] Resetting offset for
> > > partition xxx-repartition-11 to position
> FetchPosition{offset=386328485,
> > > offsetEpoch=0ptional.empty,
> currentLeader=LeaderAndEpoch{leader=0ptional
> > > [xxx:9093 (id: 5 rack: xxx)], epoch=133}}
> > >
> > >
> > >
> > > 11:18:47.090 [Consumer clientId=xxx, groupId=xxx] Fetch position
> > > FetchPosition{offset=382808207, offsetEpoch=0ptional.empty,
> > > currentLeader=LeaderAndEpoch{leader=0ptional[xxx:9093 (id: 5 rack:
> xxx)],
> > > epoch=133}} is out of range for partition xxx-repartition-11, resetting
> > > offset
> > >
> > >
> > >
> > > 11:18:47.090 [Consumer clientId=xxx, groupId=xxx] Fetch position
> > > FetchPosition{offset=382808207, offsetEpoch=0ptional.empty,
> > > currentLeader=LeaderAndEpoch{leader=Optional [xxx:9093 (id: 5 rack:
> > xxx)],
> > > epoch=133}} is out of range for partition xxx-repartition-11, resetting
> > > offset
> > >
> > >
> > >
> > > 11:17:36.834 Setting offset for partition xxx-repartition-11 to the
> > > committed offset FetchPosition{offset=382808207,
> > > offsetEpoch=Optional.empty, current
> > > leader=LeaderAndEpoch{leader=0ptional[xxx:9093 (id: 5 rack: xxx)],
> > > epoch=133}}
> > >
> > >
> > >
> > > And some relevant broker log:
> > >
> > >
> > >
> > > 11:19:44.323 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
> > > 386368143 due to client delete records request
> > >
> > > 11:19:14.124 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to
> > > 386334471 due to leader offset increment
> > >
> > > 11:19:14.124 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/Lib/kafka/data-0/kafka-log2] Incremented log start offset to
> > > 386334471 due to leader offset increment
> > >
> > > 11:19:14.121 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/Lib/kafka/data-0/kafka-log1] Incremented log start offset to
> > > 386334471 due to leader offset increment
> > >
> > > 11:19:14.121 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/Lib/kafka/data-0/kafka-log1] Incremented log start offset to
> > > 386334471 due to leader offset increment
> > >
> > > 11:19:14.109 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/Lib/kafka/data-0/kafka-log5] Incremented log start offset to
> > > 386334471 due to client delete records request
> > >
> > > 11:19:14.109 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/Lib/kafka/data-0/kafka-log5] Incremented log start offset to
> > > 386334471 due to client delete records request
> > >
> > > 11:15:43.277 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/Lib/kafka/data-0/kafka-log1] Incremented log start offset to
> > > 383149649 due to leader offset increment
> > >
> > > 11:15:43.277 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/Lib/kafka/data-0/kafka-log2] Incremented log start offset to
> > > 383149649 due to leader offset increment
> > >
> > > 11:15:43.263 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/Lib/kafka/data-0/kafka-log5] Incremented log start offset to
> > > 383149649 due to client delete records request
> > >
> > > 11:15:13.253 [UnifiedLog partition=xxx-repartition-11,
> > > dir=/var/Lib/kafka/data-0/kafka-log1] Incremented log start offset to
> > > 383116552 due to leader offset increment
> > >
> > >
> > >
> > > What might be the reason for such an enormous offset change? Looking at
> > the
> > > client logs seems that this is rather some corrupted processing, as it
> > > clearly cannot find the committed offset.
> > >
> >
>

Reply via email to