[ https://issues.apache.org/jira/browse/KAFKA-19407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chia-Ping Tsai resolved KAFKA-19407.
------------------------------------
    Resolution: Fixed

trunk: https://github.com/apache/kafka/commit/959021de59fb03bafc8a5e0a8cfde748c38a3c9c
4.1: https://github.com/apache/kafka/commit/1dad77615d345a68cddaa50416279c34b6f4ac8b
4.0: https://github.com/apache/kafka/commit/9fcfe546d1313b77ad5dc6b10be3095fe6d32569

> OffsetsOutOfOrderException on followers due to the race condition in the leader
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-19407
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19407
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.3.2
>            Reporter: Haruki Okada
>            Assignee: Haruki Okada
>            Priority: Major
>             Fix For: 4.0.1, 4.1.0
>
>         Attachments: image-2025-06-13-23-01-40-371.png
>
>
> h2. Environment
> * Kafka version: 3.3.2 (but we suppose this issue still exists in the latest Kafka)
> * Replication factor: 3
> * Topic's cleanup.policy: delete
>
> h2. Phenomenon
> We experienced a partition in our cluster suddenly becoming UnderMinISR without any hardware/network issue or any operation.
> {code:java}
> [2025-06-10 20:27:14,310] INFO [Partition topic-X-49 broker=17] Shrinking ISR from 15,16,17 to 17. Leader: (highWatermark: 572579089, endOffset: 572579215). Out of sync replicas: (brokerId: 15, endOffset: 572579089) (brokerId: 16, endOffset: 572579089). (kafka.cluster.Partition)
> {code}
> On both followers, we saw the log below:
> {code:java}
> [2025-06-10 20:26:59,804] ERROR [ReplicaFetcher replicaId=16, leaderId=17, fetcherId=1] Unexpected error occurred while processing data for partition topic-X-49 at offset 572579089 (kafka.server.ReplicaFetcherThread)
> kafka.common.OffsetsOutOfOrderException: Out of order offsets found in append to topic-X-49: ArrayBuffer(572579089, 572579090, 572579091, 572579092, 572579093, 572579094, 572579095, 572579096, 572579097, 572579098, 572579099, 572579100, 572579101, 572579102, 572579103, 572579104, 572579105, 572579106, 572579107, 572579108, 572579109, 572579089,...
> {code}
> The log shows that there is an offset regression after 572579109 (back to 572579089).
>
> h2. Analysis
> h3. The cause of the offset regression
> We dumped the active log segment on the leader and confirmed that the leader's log actually contains records with non-monotonically assigned offsets.
> So the problem must exist on the leader rather than the followers.
> On the leader, we found that the log below was output right before the followers experienced the OffsetsOutOfOrderException.
> {code:java}
> [2025-06-10 20:26:37,463] ERROR [ReplicaManager broker=17] Error processing append operation on partition topic-X-49 (kafka.server.ReplicaManager)
> java.lang.IllegalStateException: Attempt to append a timestamp (1749554796970) to slot 1 no larger than the last timestamp appended (1749554797191) to /path/to/kafka-log-dir/topic-X-49/00000000000572579060.timeindex.
>         at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:128)
>         at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
>         at kafka.log.LogSegment.append(LogSegment.scala:167)
>         at kafka.log.LocalLog.append(LocalLog.scala:442)
>         at kafka.log.UnifiedLog.append(UnifiedLog.scala:950)
>         at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
>         at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
>         at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
>         at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>         at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>         at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
>         at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
> This explains the offset regression as follows:
> * Due to the IllegalStateException in the middle of handling the produce request, the LogEndOffset was not updated even though the records had already been written to the log segment.
> * When the leader handles subsequent produce requests, it assigns the same offsets to the next record batch.
> ** => A non-monotonic offset sequence ends up in the log, which causes the OffsetsOutOfOrderException on the followers.
>
> h3. The cause of IllegalStateException
> The exception shows an attempt to append timestamp 1749554796970, which is smaller than the last entry in the time index (1749554797191) and therefore invalid.
> The timestamp about to be appended here is [LogSegment#maxTimestampSoFar|https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/log/LogSegment.scala#L355].
> It means that maxTimestampSoFar regressed somehow.
> After reading the code, we found that the race condition below is possible, and it explains the phenomenon we experienced:
> !image-2025-06-13-23-01-40-371.png|width=597,height=471!
> We also confirmed that the log segment was rolled right before the phenomenon (i.e. entering the [if block at LogSegment#L106|https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/log/LogSegment.scala#L106] is possible) and that there were clients calling the listOffsets API on the partition, which matches the scenario.
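>
> To make this concrete, the following is a minimal, self-contained sketch of one possible interleaving (the exact sequence is in the attached diagram). The class, fields, and helper below are simplified stand-ins for LogSegment's lazily initialized maxTimestampAndOffsetSoFar and TimeIndex.maybeAppend/lastEntry, not the actual Kafka code, and the "reader" steps are replayed inline instead of on a real listOffsets thread:
> {code:java}
> /**
>  * Deterministic replay of the suspected interleaving. All names are
>  * simplified stand-ins; this is NOT the actual LogSegment/TimeIndex code.
>  */
> public class MaxTimestampRaceSketch {
>     static final long UNKNOWN = -1L;
>
>     // In-memory max timestamp of the active segment (lazily initialized).
>     static long maxTimestampSoFar = UNKNOWN;
>     // Last timestamp actually written to the segment's time index.
>     static long timeIndexLastEntry = UNKNOWN;
>
>     // Appender path (produce request): advance the in-memory max and,
>     // when asked, write a time-index entry for it.
>     static void append(long batchMaxTimestamp, boolean writeIndexEntry) {
>         if (batchMaxTimestamp > maxTimestampSoFar)
>             maxTimestampSoFar = batchMaxTimestamp;
>         if (writeIndexEntry) {
>             // Mirrors TimeIndex.maybeAppend's invariant: timestamps must not regress.
>             if (timeIndexLastEntry != UNKNOWN && maxTimestampSoFar < timeIndexLastEntry)
>                 throw new IllegalStateException("Attempt to append a timestamp ("
>                         + maxTimestampSoFar + ") no larger than the last timestamp appended ("
>                         + timeIndexLastEntry + ")");
>             timeIndexLastEntry = maxTimestampSoFar;
>         }
>     }
>
>     public static void main(String[] args) {
>         // 1. Right after the roll, a listOffsets-style reader enters the lazy
>         //    getter: it sees UNKNOWN and reads the (still empty) time index,
>         //    but is descheduled before it assigns the result to the field.
>         boolean readerSawUnknown = (maxTimestampSoFar == UNKNOWN);
>         long staleLastEntry = timeIndexLastEntry;
>
>         // 2. The appender writes a batch, advancing both the in-memory max
>         //    and the time index (timestamp 1749554797191 from the log above).
>         append(1749554797191L, true);
>
>         // 3. The reader resumes and completes the unsynchronized lazy init,
>         //    clobbering the appender's value with the stale one => regression.
>         if (readerSawUnknown)
>             maxTimestampSoFar = staleLastEntry;
>
>         // 4. The next batch carries a smaller (but perfectly legal) timestamp,
>         //    1749554796970. It now looks larger than the regressed in-memory
>         //    max, so the appender tries to put it into the time index, whose
>         //    last entry is still 1749554797191 -> IllegalStateException; in the
>         //    real append path this fires after the records were written but
>         //    before the LogEndOffset was advanced, as described above.
>         append(1749554796970L, true);
>     }
> }
> {code}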
>
> h2. Suggested Fix
> We should protect [maxTimestampAndOffsetSoFar here|https://github.com/apache/kafka/blob/4.0.0/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java#L195-L197] by double-checked locking.
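>
> A minimal sketch of that idea, assuming the lazily initialized field falls back to the time index's last entry (the nested TimestampOffset record, the lock object, and the updateMaxTimestamp method are simplified stand-ins, not the actual LogSegment code):
> {code:java}
> /** Simplified double-checked-locking sketch; names are stand-ins, not Kafka code. */
> public class LazyMaxTimestampSketch {
>     public record TimestampOffset(long timestamp, long offset) {
>         static final TimestampOffset UNKNOWN = new TimestampOffset(-1L, -1L);
>     }
>
>     private final Object lock = new Object();
>     // volatile so the unlocked fast path always sees a fully published value.
>     private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN;
>
>     // Stand-in for reading the time index's last entry.
>     private TimestampOffset readLastTimeIndexEntry() {
>         return new TimestampOffset(0L, 0L);
>     }
>
>     /** Lazy getter used by readers (e.g. listOffsets) and by the append path. */
>     public TimestampOffset maxTimestampAndOffsetSoFar() {
>         TimestampOffset current = maxTimestampAndOffsetSoFar;
>         if (current == TimestampOffset.UNKNOWN) {          // first check, no lock
>             synchronized (lock) {
>                 current = maxTimestampAndOffsetSoFar;
>                 if (current == TimestampOffset.UNKNOWN) {  // second check, under lock
>                     current = readLastTimeIndexEntry();
>                     maxTimestampAndOffsetSoFar = current;
>                 }
>             }
>         }
>         return current;
>     }
>
>     /** In this sketch the append path publishes its updates through the same lock. */
>     public void updateMaxTimestamp(TimestampOffset candidate) {
>         synchronized (lock) {
>             if (candidate.timestamp() > maxTimestampAndOffsetSoFar.timestamp())
>                 maxTimestampAndOffsetSoFar = candidate;
>         }
>     }
> }
> {code}
> With the second check performed under the lock, a listOffsets reader that raced past the first check can no longer overwrite a value that an appender has already advanced (the append path takes the same lock in this sketch), so the in-memory max timestamp no longer regresses behind the time index.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)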