Hi,

I think you could be hitting KAFKA-17635
<https://issues.apache.org/jira/browse/KAFKA-17635>, which has been fixed in
Kafka Streams 3.7.2.
It was released this week; is it possible to upgrade and try it out?
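
If it helps, assuming a Maven build, picking up the fix should only require
bumping the kafka-streams dependency:

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>3.7.2</version>
    </dependency>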

-Bill

On Tue, Dec 17, 2024 at 4:10 AM TheKaztek <thekaz...@gmail.com> wrote:

> Hi, we have a worrying problem in a project where we use Kafka Streams.
>
>
>
> We had an incident where, during heavy load on our app (tens of millions
> of records in a 15-minute span on an input topic of the stream), we decided
> to add additional instances of the app (5 -> 10), and some of the
> already-running instances were restarted (there are 16 partitions on the
> input topic and 5 brokers in the cluster).
>
>
>
> During the restart/rebalance we saw a massive drop in consumer group lag on
> one of the repartition topics (within one minute of the rebalance it dropped
> by 40 million records, from around 50 million). This is visible in the broker
> logs, where we can see delete-records requests on that topic issued by a
> stream thread (effectively moving the logStartOffset forward). This is of
> course a normal operation for a repartition topic, but in this case the
> offset was advanced by a concerning number of messages, for example 3.5
> million records forward on one of the partitions (there are 16 partitions
> on that repartition topic). We are worried that those messages were
> effectively lost somehow and are trying to figure out the reason.
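>
> To make that purge step concrete, below is a minimal sketch (not our actual
> code) of the kind of delete-records request a stream thread issues, which is
> what the broker logs as a "client delete records request"; the bootstrap
> address and the cut-off offset are placeholders:
>
>     // Sketch of the admin call behind the "client delete records request"
>     // broker log lines; the address, topic and offset are placeholders.
>     import java.util.Map;
>     import java.util.Properties;
>     import org.apache.kafka.clients.admin.Admin;
>     import org.apache.kafka.clients.admin.AdminClientConfig;
>     import org.apache.kafka.clients.admin.RecordsToDelete;
>     import org.apache.kafka.common.TopicPartition;
>
>     public class PurgeSketch {
>         public static void main(String[] args) throws Exception {
>             Properties props = new Properties();
>             props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>             try (Admin admin = Admin.create(props)) {
>                 TopicPartition tp = new TopicPartition("xxx-repartition", 11);
>                 // Everything below the given offset is deleted, i.e. the
>                 // partition's logStartOffset is advanced to it.
>                 admin.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(386_334_471L)))
>                      .all().get();
>             }
>         }
>     }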
>
>
>
> I checked whether the segment had been deleted by the retention policy, but it
> had not: there was only a single segment for this partition at that moment,
> and retention could not have kicked in anyway given these settings:
>
> retention.bytes=-1
>
> retention.ms=1 week
>
>
>
> Other settings for the repartition topic: replicas=3, min.insync.replicas=1,
> unclean.leader.election.enable=false, segment.bytes=1GB.
>
> We created this repartition topic manually due to security policies at the
> company.
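>
> For completeness, creating a topic with those settings via the Java
> AdminClient looks roughly like the sketch below (topic name and bootstrap
> address are placeholders; retention.ms is one week in milliseconds):
>
>     import java.util.List;
>     import java.util.Map;
>     import java.util.Properties;
>     import org.apache.kafka.clients.admin.Admin;
>     import org.apache.kafka.clients.admin.AdminClientConfig;
>     import org.apache.kafka.clients.admin.NewTopic;
>
>     public class CreateRepartitionTopic {
>         public static void main(String[] args) throws Exception {
>             Properties props = new Properties();
>             props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>             try (Admin admin = Admin.create(props)) {
>                 // 16 partitions, replication factor 3, configs as listed above.
>                 NewTopic topic = new NewTopic("xxx-repartition", 16, (short) 3)
>                     .configs(Map.of(
>                         "retention.bytes", "-1",
>                         "retention.ms", "604800000",          // 1 week
>                         "min.insync.replicas", "1",
>                         "unclean.leader.election.enable", "false",
>                         "segment.bytes", "1073741824"));      // 1 GB
>                 admin.createTopics(List.of(topic)).all().get();
>             }
>         }
>     }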
>
>
>
> We use the Kafka Streams client library in version 3.7.1.
>
> The Kafka cluster itself is fairly old, version 3.4.0.
>
>
>
> A simplified topology is as follows:
>
>
>
>   Input topic            Input topic 2            Input topic 3
>        |                       |                        |
>        |                       |                        |
>      JOIN  <---------------- KTable                     |
>        |                                                |
>   .selectKey()                                          |
>        |                                                |
>      JOIN  <----------------------------------------- KTable
>        |
>   .process() -------> changelog 1
>        |
>   .process() -------> changelog 2
>        |
>   .process() -------> changelog 1
>        |
>   .process() -------> changelog 3
>        |
>        |
>   Output topic
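>
> In DSL terms the shape is roughly the following sketch (placeholder topic
> names, String serdes, and trivial joiners; the real .process() steps and
> their state stores/changelogs are omitted):
>
>     import org.apache.kafka.common.serialization.Serdes;
>     import org.apache.kafka.streams.StreamsBuilder;
>     import org.apache.kafka.streams.Topology;
>     import org.apache.kafka.streams.kstream.Consumed;
>     import org.apache.kafka.streams.kstream.KTable;
>     import org.apache.kafka.streams.kstream.Produced;
>
>     public class TopologySketch {
>         public static Topology build() {
>             StreamsBuilder builder = new StreamsBuilder();
>
>             // KTables for the two table sides of the joins (placeholder names).
>             KTable<String, String> table1 =
>                 builder.table("input-topic-2", Consumed.with(Serdes.String(), Serdes.String()));
>             KTable<String, String> table2 =
>                 builder.table("input-topic-3", Consumed.with(Serdes.String(), Serdes.String()));
>
>             builder.stream("input-topic-1", Consumed.with(Serdes.String(), Serdes.String()))
>                    // first stream-table join
>                    .join(table1, (streamValue, tableValue) -> streamValue + "|" + tableValue)
>                    // selectKey() changes the key, so Streams repartitions here;
>                    // this is the repartition topic in question
>                    .selectKey((key, value) -> value)
>                    // second stream-table join, on the repartitioned stream
>                    .join(table2, (streamValue, tableValue) -> streamValue + "|" + tableValue)
>                    // the real topology has several .process() steps with state
>                    // stores and changelogs here; omitted in this sketch
>                    .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
>
>             return builder.build();
>         }
>     }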
>
>
>
> The problem occurs on the repartition topic created for the .selectKey() operation.
>
>
>
> Here is part of the client log showing the offset changes for one of the
> partitions during the startup/rebalance of the instances:
>
>
>
> 11:18:55.359 [Consumer clientId=xxx, groupId=xxx] Resetting offset for
> partition xxx-repartition-11 to position FetchPosition{offset=386328485,
> offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
> epoch=133}}
>
>
>
> 11:18:55.359 [Consumer clientId=xxx, groupId=xxx] Resetting offset for
> partition xxx-repartition-11 to position FetchPosition{offset=386328485,
> offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional
> [xxx:9093 (id: 5 rack: xxx)], epoch=133}}
>
>
>
> 11:18:47.090 [Consumer clientId=xxx, groupId=xxx] Fetch position
> FetchPosition{offset=382808207, offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
> epoch=133}} is out of range for partition xxx-repartition-11, resetting
> offset
>
>
>
> 11:18:47.090 [Consumer clientId=xxx, groupId=xxx] Fetch position
> FetchPosition{offset=382808207, offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
> epoch=133}} is out of range for partition xxx-repartition-11, resetting
> offset
>
>
>
> 11:17:36.834 Setting offset for partition xxx-repartition-11 to the
> committed offset FetchPosition{offset=382808207,
> offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
> epoch=133}}
>
>
>
> And some relevant broker log lines:
>
>
>
> 11:19:44.323 [UnifiedLog partition=xxx-repartition-11,
> dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
> 386368143 due to client delete records request
>
> 11:19:14.124 [UnifiedLog partition=xxx-repartition-11,
> dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to
> 386334471 due to leader offset increment
>
> 11:19:14.124 [UnifiedLog partition=xxx-repartition-11,
> dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to
> 386334471 due to leader offset increment
>
> 11:19:14.121 [UnifiedLog partition=xxx-repartition-11,
> dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to
> 386334471 due to leader offset increment
>
> 11:19:14.121 [UnifiedLog partition=xxx-repartition-11,
> dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to
> 386334471 due to leader offset increment
>
> 11:19:14.109 [UnifiedLog partition=xxx-repartition-11,
> dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
> 386334471 due to client delete records request
>
> 11:19:14.109 [UnifiedLog partition=xxx-repartition-11,
> dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
> 386334471 due to client delete records request
>
> 11:15:43.277 [UnifiedLog partition=xxx-repartition-11,
> dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to
> 383149649 due to leader offset increment
>
> 11:15:43.277 [UnifiedLog partition=xxx-repartition-11,
> dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to
> 383149649 due to leader offset increment
>
> 11:15:43.263 [UnifiedLog partition=xxx-repartition-11,
> dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
> 383149649 due to client delete records request
>
> 11:15:13.253 [UnifiedLog partition=xxx-repartition-11,
> dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to
> 383116552 due to leader offset increment
>
>
>
> What might be the reason for such an enormous offset change? Looking at the
> client logs, it seems this is rather some kind of corrupted processing, as the
> consumer clearly cannot find its committed offset.
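>
> For reference, this is the kind of check one could run to compare the group's
> committed offset with the partition's current log start offset (a sketch with
> the Java AdminClient; the group id and bootstrap address are placeholders):
>
>     import java.util.Map;
>     import java.util.Properties;
>     import org.apache.kafka.clients.admin.Admin;
>     import org.apache.kafka.clients.admin.AdminClientConfig;
>     import org.apache.kafka.clients.admin.OffsetSpec;
>     import org.apache.kafka.common.TopicPartition;
>
>     public class OffsetCheck {
>         public static void main(String[] args) throws Exception {
>             Properties props = new Properties();
>             props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>             try (Admin admin = Admin.create(props)) {
>                 TopicPartition tp = new TopicPartition("xxx-repartition", 11);
>
>                 // Offset the group last committed for this partition
>                 // (throws NPE if the group has no commit for it; fine for a sketch).
>                 long committed = admin.listConsumerGroupOffsets("xxx-group")
>                     .partitionsToOffsetAndMetadata().get()
>                     .get(tp).offset();
>
>                 // Current earliest available offset (logStartOffset) on the broker.
>                 Map<TopicPartition, OffsetSpec> request = Map.of(tp, OffsetSpec.earliest());
>                 long logStart = admin.listOffsets(request)
>                     .partitionResult(tp).get().offset();
>
>                 // If committed < logStart, the committed position points into a
>                 // purged range and the consumer resets, as in the client log above.
>                 System.out.printf("committed=%d logStart=%d%n", committed, logStart);
>             }
>         }
>     }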
>
