Hi, we have a worrying problem in a project where we use Kafka Streams.


We had an incident during heavy load on our app (tens of millions of
records within a 15-minute span on an input topic of the stream). We
decided to scale the app out from 5 to 10 instances, and some of the
already-running instances were restarted. The input topic has 16
partitions and the cluster has 5 brokers.



During the restart/rebalance we saw a massive drop in consumer group lag on
one of the repartition topics: within 1 minute after the rebalance it went
down by 40 million records, from around 50 million. The drop is visible in
the broker logs, where we can see requests from a stream thread to delete
records on that topic (effectively moving logStartOffset forward). This is
of course a normal operation for a repartition topic, but in this case the
offset was advanced by a concerning number of messages, for example about
3.5 million records on one of the partitions (the repartition topic has 16
partitions). We are worried that those messages were effectively lost and
are trying to figure out why.



I checked whether a segment had been deleted by the retention policy, but
that was not the case: there was only a single segment for this partition
at that moment, and retention could not have kicked in anyway given these
settings:

retention.bytes=-1

retention.ms=1 week



Other settings for the repartition topic: replicas = 3,
min.insync.replicas = 1, unclean.leader.election.enable = false,
segment.bytes = 1GB

We created this repartition topic manually due to security policies at the
company.



We use the Kafka Streams client library, version 3.7.1.

The Kafka cluster is fairly old, version 3.4.0.



A simplified topology is as follows:



Input topic          Input topic 2                    Input topic 3
     |                     |                                |
     |                     |                                |
     |                     |                                |
   JOIN  <------------  KTable                              |
     |                                                      |
     |                                                      |
.selectKey()                                                |
     |                                                      |
     |                                                      |
   JOIN  <---------------------------------------------  KTable
     |
     |
.process()  ------->  changelog 1
     |
.process()  ------->  changelog 2
     |
.process()  ------->  changelog 1
     |
.process()  ------->  changelog 3
     |
     |
Output topic



The problem occurs on the repartition topic created by the .selectKey()
operation.



Here is part of the client log (newest entries first) showing the offset
change for one of the partitions during startup/rebalance of the instances:



11:18:55.359 [Consumer clientId=xxx, groupId=xxx] Resetting offset for
partition xxx-repartition-11 to position FetchPosition{offset=386328485,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
epoch=133}}



11:18:55.359 [Consumer clientId=xxx, groupId=xxx] Resetting offset for
partition xxx-repartition-11 to position FetchPosition{offset=386328485,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
epoch=133}}



11:18:47.090 [Consumer clientId=xxx, groupId=xxx] Fetch position
FetchPosition{offset=382808207, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
epoch=133}} is out of range for partition xxx-repartition-11, resetting
offset



11:18:47.090 [Consumer clientId=xxx, groupId=xxx] Fetch position
FetchPosition{offset=382808207, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
epoch=133}} is out of range for partition xxx-repartition-11, resetting
offset



11:17:36.834 Setting offset for partition xxx-repartition-11 to the
committed offset FetchPosition{offset=382808207,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[xxx:9093 (id: 5 rack: xxx)],
epoch=133}}



And the relevant broker log entries (newest first):



11:19:44.323 [UnifiedLog partition=xxx-repartition-11,
dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
386368143 due to client delete records request

11:19:14.124 [UnifiedLog partition=xxx-repartition-11,
dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to
386334471 due to leader offset increment

11:19:14.124 [UnifiedLog partition=xxx-repartition-11,
dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to
386334471 due to leader offset increment

11:19:14.121 [UnifiedLog partition=xxx-repartition-11,
dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to
386334471 due to leader offset increment

11:19:14.121 [UnifiedLog partition=xxx-repartition-11,
dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to
386334471 due to leader offset increment

11:19:14.109 [UnifiedLog partition=xxx-repartition-11,
dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
386334471 due to client delete records request

11:19:14.109 [UnifiedLog partition=xxx-repartition-11,
dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
386334471 due to client delete records request

11:15:43.277 [UnifiedLog partition=xxx-repartition-11,
dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to
383149649 due to leader offset increment

11:15:43.277 [UnifiedLog partition=xxx-repartition-11,
dir=/var/lib/kafka/data-0/kafka-log2] Incremented log start offset to
383149649 due to leader offset increment

11:15:43.263 [UnifiedLog partition=xxx-repartition-11,
dir=/var/lib/kafka/data-0/kafka-log5] Incremented log start offset to
383149649 due to client delete records request

11:15:13.253 [UnifiedLog partition=xxx-repartition-11,
dir=/var/lib/kafka/data-0/kafka-log1] Incremented log start offset to
383116552 due to leader offset increment
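
As a rough cross-check of the numbers in the two log excerpts, here is a
small sketch of the arithmetic for xxx-repartition-11. It only uses the
offsets quoted in the logs; the helper names are made up for illustration.
It assumes that the number of skipped records equals the difference between
the reset position and the committed offset, and that a fetch position
below the log start offset is what the consumer reports as "out of range".
Note also that 11:17:36 is only when the committed offset was read back,
not necessarily when it was committed.

```python
# Offsets copied from the client log excerpt for xxx-repartition-11.
COMMITTED_OFFSET = 382_808_207  # 11:17:36.834, "Setting offset ... to the committed offset"
RESET_OFFSET = 386_328_485      # 11:18:55.359, "Resetting offset ... to position"

# (timestamp, log start offset) pairs from the broker log excerpt, oldest first.
LOG_START_OFFSETS = [
    ("11:15:13.253", 383_116_552),
    ("11:15:43.263", 383_149_649),
    ("11:19:14.109", 386_334_471),
    ("11:19:44.323", 386_368_143),
]


def skipped_records(committed, reset):
    """Records between the committed offset and the position after the reset."""
    return max(0, reset - committed)


def first_out_of_range(committed, log_starts):
    """First quoted broker entry whose log start offset is already past
    the committed offset, together with the size of the gap."""
    for timestamp, log_start in log_starts:
        if committed < log_start:
            return timestamp, log_start - committed
    return None


if __name__ == "__main__":
    print("skipped by the reset:", skipped_records(COMMITTED_OFFSET, RESET_OFFSET))
    print("committed offset behind log start:",
          first_out_of_range(COMMITTED_OFFSET, LOG_START_OFFSETS))
```

With these values the reset skips 3,520,278 records, which matches the
"3.5 million" mentioned above. The oldest broker entry quoted (11:15:13)
already shows a log start offset 308,345 records past the offset that the
consumer later picked up as committed.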



What might be the reason for such an enormous offset change? Judging by the
client logs, this looks like some kind of corrupted processing, since the
consumer clearly cannot find its committed offset.
