Earlier this year, our cluster crashed multiple times due to the same fatal error concerning replica offsets.
In our production cluster, the replication factor is 3 and min ISR is 2. On the day this error occurred, we had temporarily doubled the number of producers, and all Kafka brokers were under high load. After about 30 minutes, the first fatal error happened because the offset of one replica was higher than the leader's and unclean leader election was disabled. Over the following few hours, the same error on different partitions caused other brokers to fail. At some point, both replicas of the same partition had an offset higher than the leader's, and the only way to recover from this fatal error and bring the partition back online was to enable unclean leader election.

Here are some related logs from around the time of the crash; some similar lines are skipped. Broker 3 was the broker that crashed. For some reason, both broker 1 and broker 2 were updating the ISR before the error happened. We tried to reproduce this bug in a smaller cluster, but it did not happen again.

On controller 5

[14:48:37,961] WARN [Controller 5]: Cannot remove replica 2 from ISR of partition [topic1,26] since it is not in the ISR. Leader = 1 ; ISR = List(1, 3) (kafka.controller.KafkaController)
[14:48:41,887] WARN [Channel manager on controller 5]: Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 67344; ClientId: ; DeletePartitions: false; ControllerId: 5; ControllerEpoch: 130; Partitions: [topic1,26] to broker 2, since it is offline. (kafka.controller.ControllerChannelManager)
[14:49:05,449] WARN [Controller-5-to-broker-1-send-thread], Controller 5 epoch 130 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:5;ControllerEpoch:130;CorrelationId:67343;ClientId:id_5-host_null-port_9092; [topic1,26] -> (LeaderAndIsrInfo:(Leader:1,ISR:1,3,LeaderEpoch:41,ControllerEpoch:130),ReplicationFactor:3),AllReplicas:2,1,3)
[14:52:43,116] WARN [Controller-5-to-broker-3-send-thread], Controller 5 epoch 130 fails to send request Name:UpdateMetadataRequest;Version:0;Controller:5;ControllerEpoch:130;CorrelationId:67343;ClientId:id_5-host_null-port_9092; [topic1,26] -> (LeaderAndIsrInfo:(Leader:1,ISR:1,3,LeaderEpoch:41,ControllerEpoch:130),ReplicationFactor:3),AllReplicas:2,1,3)
[14:48:37,961] WARN [Controller 5]: Cannot remove replica 2 from ISR of partition [topic1,26] since it is not in the ISR. Leader = 1 ; ISR = List(1, 3) (kafka.controller.KafkaController)

On broker 3

[14:46:22,743] INFO [ReplicaFetcherManager on broker 3] Added fetcher for partitions List([[topic1,26], initOffset 3274057000 to broker id:2,host2,port:9092] ) (kafka.server.ReplicaFetcherManager)
[14:52:41,774] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [topic1,26] (kafka.server.ReplicaFetcherManager)
[14:52:42,194] INFO [ReplicaFetcherManager on broker 3] Added fetcher for partitions List([topic1,26], initOffset 3274808121 to broker id:1,host:host1,port:9092] (kafka.server.ReplicaFetcherManager)
[14:52:42,625] FATAL [ReplicaFetcherThread-0-1], Halting because log truncation is not allowed for topic topic1, Current leader 1's latest offset 3274790195 is less than replica 3's latest offset 3274808121 (kafka.server.ReplicaFetcherThread)

On broker 2

[14:46:23,627] INFO Partition [topic1,26] on broker 2: Expanding ISR for partition [topic1,26] from 2,1 to 2,1,3 (kafka.cluster.Partition)
[14:49:04,040] INFO Partition [topic1,26] on broker 2: Shrinking ISR for partition [topic1,26] from 2,1,3 to 2,1 (kafka.cluster.Partition)
[14:49:04,047] INFO Partition [topic1,26] on broker 2: Cached zkVersion [252] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[14:49:04,791] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [topic1,26] (kafka.server.ReplicaFetcherManager)
[14:49:05,344] INFO [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([topic1,26], initOffset 3274531203 to broker id:1,host:host1,port:9092] ) (kafka.server.ReplicaFetcherManager)
[14:52:04,409] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [topic1,26] (kafka.server.ReplicaFetcherManager)
[14:52:33,845] INFO Partition [topic1,26] on broker 2: Shrinking ISR for partition [topic1,26] from 2,3 to 2 (kafka.cluster.Partition)
[14:52:37,264] INFO Partition [topic1,26] on broker 2: Expanding ISR for partition [topic1,26] from 2 to 2,3 (kafka.cluster.Partition)

On broker 1

[14:48:35,421] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [topic1,26] (kafka.server.ReplicaFetcherManager)
[14:49:17,812] INFO Partition [topic1,26] on broker 1: Expanding ISR for partition [topic1,26] from 1 to 1,2 (kafka.cluster.Partition)
[14:49:58,541] INFO Partition [topic1,26] on broker 1: Shrinking ISR for partition [topic1,26] from 1,2 to 1 (kafka.cluster.Partition)
[14:49:59,225] INFO Partition [topic1,26] on broker 1: Expanding ISR for partition [topic1,26] from 1 to 1,2 (kafka.cluster.Partition)
[14:52:38,602] INFO Partition [topic1,26] on broker 1: Shrinking ISR for partition [topic1,26] from 1,2 to 1 (kafka.cluster.Partition)
[14:52:38,611] INFO Partition [topic1,26] on broker 1: Cached zkVersion [267] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[14:52:48,560] INFO Partition [topic1,26] on broker 1: Shrinking ISR for partition [topic1,26] from 1,2 to 1 (kafka.cluster.Partition)
[14:52:48,567] INFO Partition [topic1,26] on broker 1: Cached zkVersion [267] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
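
As a rough illustration of the condition behind the FATAL entry on broker 3, the Python sketch below models the choice a follower seems to face when its latest offset is ahead of the leader's: halt if unclean leader election is disabled, otherwise truncate to the leader's offset. This is a simplified reading of the log message, not Kafka's source code. The function name `follower_resume_offset` and the exception `LogTruncationNotAllowed` are hypothetical, and only the two offsets are taken from the logs above.

```python
class LogTruncationNotAllowed(Exception):
    """Hypothetical stand-in for the broker halting with a FATAL log entry."""


def follower_resume_offset(leader_latest, replica_latest, unclean_election_enabled):
    """Return the offset a follower would resume fetching from.

    Simplified model (an assumption, not Kafka code): it only covers the
    case where the follower is at or behind the leader, and the case where
    the follower is ahead of the leader.
    """
    if replica_latest <= leader_latest:
        # Follower is not ahead of the leader, so nothing has to be truncated.
        return replica_latest
    if not unclean_election_enabled:
        # Truncating would discard messages the follower already holds,
        # which is refused while unclean leader election is disabled.
        raise LogTruncationNotAllowed(
            f"leader's latest offset {leader_latest} is less than "
            f"replica's latest offset {replica_latest}"
        )
    # With unclean leader election enabled, the follower truncates its log
    # to the leader's latest offset and continues from there.
    return leader_latest


# Offsets copied from the FATAL line logged by broker 3.
LEADER_LATEST = 3274790195
REPLICA_LATEST = 3274808121

try:
    follower_resume_offset(LEADER_LATEST, REPLICA_LATEST, unclean_election_enabled=False)
    halted = False
except LogTruncationNotAllowed as err:
    halted = True
    print("halt:", err)

resume = follower_resume_offset(LEADER_LATEST, REPLICA_LATEST, unclean_election_enabled=True)
discarded = REPLICA_LATEST - resume
print("resume at", resume, "after discarding", discarded, "messages")
```

Under this model, the offsets from the log put replica 3 ahead of leader 1 by 17926 messages, which is the amount that would be discarded from broker 3's log once unclean leader election is enabled and the replica truncates.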