[ 
https://issues.apache.org/jira/browse/KAFKA-19407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-19407.
------------------------------------
    Resolution: Fixed

trunk: 
https://github.com/apache/kafka/commit/959021de59fb03bafc8a5e0a8cfde748c38a3c9c

4.1: 
https://github.com/apache/kafka/commit/1dad77615d345a68cddaa50416279c34b6f4ac8b

4.0: 
https://github.com/apache/kafka/commit/9fcfe546d1313b77ad5dc6b10be3095fe6d32569

> OffsetsOutOfOrderException on followers due to the race condition in the 
> leader
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-19407
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19407
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.3.2
>            Reporter: Haruki Okada
>            Assignee: Haruki Okada
>            Priority: Major
>             Fix For: 4.0.1, 4.1.0
>
>         Attachments: image-2025-06-13-23-01-40-371.png
>
>
> h2. Environment
>  * Kafka version: 3.3.2 (but we believe this issue still exists in the latest
> Kafka)
>  * Replication factor: 3
>  * Topic's cleanup.policy: delete
> h2. Phenomenon
> A partition in our cluster suddenly became UnderMinISR, without any
> hardware/network issue or any operational change.
> {code:java}
> [2025-06-10 20:27:14,310] INFO [Partition topic-X-49 broker=17] Shrinking ISR 
> from 15,16,17 to 17. Leader: (highWatermark: 572579089, endOffset: 
> 572579215). Out of sync replicas: (brokerId: 15, endOffset: 572579089) 
> (brokerId: 16, endOffset: 572579089). (kafka.cluster.Partition)
> {code}
> On both followers, we saw the following log:
> {code:java}
> [2025-06-10 20:26:59,804] ERROR [ReplicaFetcher replicaId=16, leaderId=17, 
> fetcherId=1] Unexpected error occurred while processing data for partition 
> topic-X-49 at offset 572579089 (kafka.server.ReplicaFetcherThread)
> kafka.common.OffsetsOutOfOrderException: Out of order offsets found in append 
> to topic-X-49: ArrayBuffer(572579089, 572579090, 572579091, 572579092, 
> 572579093, 572579094, 572579095, 572579096, 572579097, 572579098, 572579099, 
> 572579100, 572579101, 572579102, 572579103, 572579104, 572579105, 572579106, 
> 572579107, 572579108, 572579109, 572579089,...
> {code}
> The log shows an offset regression after 572579109 (back to 572579089).
> h2. Analysis
> h3. The cause of the offset regression
> We dumped the active log segment on the leader and confirmed that the leader's
> log actually contains records with non-monotonic offsets assigned.
> So the problem must exist on the leader rather than on the followers.
> On the leader, we found that the following log was output right before the
> followers experienced OffsetsOutOfOrderException:
> {code:java}
> [2025-06-10 20:26:37,463] ERROR [ReplicaManager broker=17] Error processing 
> append operation on partition topic-X-49 (kafka.server.ReplicaManager)
> java.lang.IllegalStateException: Attempt to append a timestamp 
> (1749554796970) to slot 1 no larger than the last timestamp appended 
> (1749554797191) to 
> /path/to/kafka-log-dir/topic-X-49/00000000000572579060.timeindex.
>         at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:128)
>         at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
>         at kafka.log.LogSegment.append(LogSegment.scala:167)
>         at kafka.log.LocalLog.append(LocalLog.scala:442)
>         at kafka.log.UnifiedLog.append(UnifiedLog.scala:950)
>         at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
>         at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
>         at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
>         at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>         at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>         at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>         at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
>         at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
> This explains the offset regression as follows (see the sketch below):
>  * Due to the IllegalStateException thrown in the middle of handling the
> produce request, LogEndOffset was not updated even though the records had
> already been written to the log segment.
>  * When the leader handles subsequent produce requests, it assigns the same
> offsets to the next record batch.
>  ** => A non-monotonic offset sequence ends up in the log, which causes
> OffsetsOutOfOrderException on the followers.
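> Below is a minimal, self-contained sketch (illustrative only, not the actual
> UnifiedLog/LogSegment code; class and method names are made up) of why an
> exception thrown after the segment write but before the LogEndOffset update
> leads to duplicate offsets being assigned to the next batch:
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> 
> // Simplified model of leader-side offset assignment: offsets are taken from
> // logEndOffset, records are written to the segment, and only then is
> // logEndOffset advanced. If the write path throws after the segment write
> // (e.g. on the time-index append), the records stay in the segment but
> // logEndOffset never moves, so the next batch reuses the same offsets.
> public class OffsetReuseSketch {
>     static long logEndOffset = 572579089L;
>     static final List<Long> segment = new ArrayList<>();
> 
>     static void append(int batchSize, boolean failAfterWrite) {
>         long base = logEndOffset;                // 1. assign offsets from the current LEO
>         for (int i = 0; i < batchSize; i++) {
>             segment.add(base + i);               // 2. records hit the segment file
>         }
>         if (failAfterWrite) {
>             throw new IllegalStateException("time index append failed"); // LEO is never updated
>         }
>         logEndOffset = base + batchSize;         // 3. LEO is advanced only on success
>     }
> 
>     public static void main(String[] args) {
>         try {
>             append(21, true);   // first batch: bytes are written, but the LEO update is skipped
>         } catch (IllegalStateException e) {
>             // the produce request fails, yet the log keeps the already-written records
>         }
>         append(3, false);       // second batch: reuses offsets 572579089, 572579090, 572579091
>         System.out.println(segment);  // non-monotonic offsets, as later seen by the fetching follower
>     }
> }
> {code}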
> h3. The cause of IllegalStateException
> The exception indicates an attempt to append timestamp 1749554796970, which is
> smaller than the last entry in the time index (1749554797191) and therefore
> invalid.
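> For reference, the time index rejects an entry whose timestamp is smaller than
> the last one it persisted; a minimal sketch of that guard (illustrative only,
> not the actual kafka.log.TimeIndex implementation) looks like:
> {code:java}
> // Simplified model of the monotonicity check in the time index: an entry whose
> // timestamp is smaller than the last appended one is rejected. This is the
> // condition that fired in the log above.
> class TimeIndexSketch {
>     private long lastTimestamp = Long.MIN_VALUE;
>     private long lastOffset = -1L;
> 
>     void maybeAppend(long timestamp, long offset) {
>         if (timestamp < lastTimestamp) {
>             throw new IllegalStateException("Attempt to append a timestamp (" + timestamp
>                 + ") no larger than the last timestamp appended (" + lastTimestamp + ")");
>         }
>         if (timestamp > lastTimestamp) {  // entries with an unchanged timestamp are simply skipped
>             lastTimestamp = timestamp;
>             lastOffset = offset;
>         }
>     }
> }
> {code}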
> The timestamp about to be appended here is
> [LogSegment#maxTimestampSoFar|https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/log/LogSegment.scala#L355],
> which means that maxTimestampSoFar somehow regressed.
> After reading the code, we found that the race condition below is possible, and
> it explains the phenomenon we experienced:
> !image-2025-06-13-23-01-40-371.png|width=597,height=471!
> We confirmed that the log segment was rolled right before the phenomenon (i.e.
> entering the [if block at
> LogSegment#L106|https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/log/LogSegment.scala#L106]
> is possible), and that there were clients calling the listOffsets API on the
> partition, which matches this scenario.
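> A minimal sketch of the unsynchronized lazy initialization that makes such an
> interleaving possible (modeled loosely on the getter linked above; field names
> and types are simplified, and the interleaving in the comment is our reading of
> the scenario):
> {code:java}
> // Simplified model of the lazy maxTimestampAndOffsetSoFar getter: the field is
> // read and written without synchronization, so a read-only caller (e.g. a
> // listOffsets handler) racing with the append path can write a stale value
> // back after an append has already advanced it, making maxTimestampSoFar
> // regress. Roughly:
> //
> //   append thread (under the log lock)      listOffsets thread (no such lock)
> //   ----------------------------------      ---------------------------------
> //                                           sees maxTimestampSoFar == UNKNOWN
> //                                           reads the (still empty) time index
> //   appends a batch, advances
> //   maxTimestampSoFar, writes an index entry
> //                                           writes the stale value back, clobbering
> //                                           the advanced maxTimestampSoFar
> //   a later append sees the regressed value
> //   and timeIndex.maybeAppend() throws IllegalStateException
> class LazyGetterSketch {
>     private static final long UNKNOWN = -1L;
>     private long maxTimestampSoFar = UNKNOWN;      // not volatile, read path takes no lock
>     private long timeIndexLastTimestamp = UNKNOWN; // stands in for timeIndex.lastEntry
> 
>     long maxTimestampSoFar() {
>         if (maxTimestampSoFar == UNKNOWN)                // check ...
>             maxTimestampSoFar = timeIndexLastTimestamp;  // ... and write-back are not atomic
>         return maxTimestampSoFar;
>     }
> }
> {code}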
> h2. Suggested Fix
> We should protect [maxTimestampAndOffsetSoFar
> here|https://github.com/apache/kafka/blob/4.0.0/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java#L195-L197]
> with double-checked locking.
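> A rough sketch of that fix (illustrative only, assuming a lock object shared
> with the append path; not the actual patch):
> {code:java}
> // Double-checked locking around the lazy initialization: the field is volatile
> // and the index read plus write-back happen under a lock, so a read-only caller
> // can no longer clobber a value that the append path has already advanced.
> class DoubleCheckedGetterSketch {
>     private static final long UNKNOWN = -1L;
>     private final Object lock = new Object();      // stands in for the lock used by the append path
>     private volatile long maxTimestampSoFar = UNKNOWN;
>     private long timeIndexLastTimestamp = UNKNOWN; // stands in for timeIndex.lastEntry
> 
>     long maxTimestampSoFar() {
>         long current = maxTimestampSoFar;
>         if (current == UNKNOWN) {                       // first, cheap unsynchronized check
>             synchronized (lock) {
>                 current = maxTimestampSoFar;
>                 if (current == UNKNOWN) {               // second check under the lock
>                     current = timeIndexLastTimestamp;   // no concurrent append can interleave here
>                     maxTimestampSoFar = current;
>                 }
>             }
>         }
>         return current;
>     }
> }
> {code}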



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
