Warren Jin created KAFKA-2575:
---------------------------------

             Summary: kafka.server.OffsetCheckpoint found inconsistent offset 
entry count, leading to NotAssignedReplicaException
                 Key: KAFKA-2575
                 URL: https://issues.apache.org/jira/browse/KAFKA-2575
             Project: Kafka
          Issue Type: Bug
          Components: kafka streams
    Affects Versions: 0.8.2.1
            Reporter: Warren Jin


We have more than 100 topics in production; the default partition count is 24 
per topic.

We noticed the following errors in recent days.
2015-09-22 22:25:12,529 ERROR Error on broker 1 while processing LeaderAndIsr 
request correlationId 438501 received from controller 2 epoch 12 for partition 
[LOGIST.DELIVERY.SUBSCRIBE,7] (state.change.logger)
java.io.IOException: Expected 3918 entries but found only 3904
        at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:99)
        at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:91)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
        at scala.collection.immutable.Set$Set3.foreach(Set.scala:115)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1.apply$mcZ$sp(Partition.scala:171)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
        at kafka.utils.Utils$.inLock(Utils.scala:535)
        at kafka.utils.Utils$.inWriteLock(Utils.scala:543)
        at kafka.cluster.Partition.makeLeader(Partition.scala:163)
        at 
kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:427)
        at 
kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:426)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:426)
        at 
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:378)
        at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:120)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:63)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:745)

Then it repeatedly prints the following error messages:
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 14943530 from client ReplicaFetcherThread-2-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,22] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,22] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 15022337 from client ReplicaFetcherThread-1-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,1] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,1] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 15078431 from client ReplicaFetcherThread-0-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,4] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,4] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 13477660 from client ReplicaFetcherThread-2-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,10] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,10] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 15022337 from client ReplicaFetcherThread-1-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,13] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,13] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 15078431 from client ReplicaFetcherThread-0-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,16] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,16] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 14988525 from client ReplicaFetcherThread-3-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,19] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,19] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: 
FetchRequest; Version: 0; CorrelationId: 15022337; ClientId: 
ReplicaFetcherThread-1-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; 
RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,1] -> 
PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,13] -> 
PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 
0's position -1 since the replica is not recognized to be one of the assigned 
replicas  for partition [LOGIST.DELIVERY.SUBSCRIBE,1]
        at 
kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
        at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
        at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
        at 
kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:745)
2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: 
FetchRequest; Version: 0; CorrelationId: 15078431; ClientId: 
ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; 
RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,4] -> 
PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,16] -> 
PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 
0's position -1 since the replica is not recognized to be one of the assigned 
replicas  for partition [LOGIST.DELIVERY.SUBSCRIBE,4]
        at 
kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
        at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
        at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
        at 
kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:745)


I checked the Kafka source code: the replica manager writes the offsets of 
all partitions to replication-offset-checkpoint every 5 seconds, and each 
OffsetCheckpoint holds an internal lock on this file, so it should be 
impossible for the declared offset count to be 3918 while the actual number 
of offset entries is only 3904. Could this be a multithreading issue, where 
some other thread flushes content to the same file despite the internal lock 
in OffsetCheckpoint?
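To make the failure mode concrete, here is a minimal Python sketch of the checkpoint file layout (version line, declared entry count, then one "topic partition offset" line per entry) and a reader that fails the same way OffsetCheckpoint.read does when the declared count disagrees with the entry lines. The helper names and the atomic temp-file-plus-rename write are illustrative assumptions, not the actual broker code; they only show why a torn or interleaved write would produce the "Expected N entries but found only M" IOException above.

```python
import os
import tempfile

def write_checkpoint(path, offsets):
    """Sketch of an atomic checkpoint write: temp file, fsync, rename.
    `offsets` maps (topic, partition) -> offset."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        f.write("0\n")                      # format version line
        f.write("%d\n" % len(offsets))      # declared entry count
        for (topic, partition), offset in offsets.items():
            f.write("%s %d %d\n" % (topic, partition, offset))
        f.flush()
        os.fsync(f.fileno())
    os.rename(tmp, path)                    # atomic replace on POSIX

def read_checkpoint(path):
    """Parse the checkpoint; raise if the declared count does not
    match the number of entry lines actually present."""
    with open(path) as f:
        _version = int(f.readline())
        expected = int(f.readline())
        entries = {}
        for line in f:
            topic, partition, offset = line.split()
            entries[(topic, int(partition))] = int(offset)
        if len(entries) != expected:
            raise IOError("Expected %d entries but found only %d"
                          % (expected, len(entries)))
        return entries
```

If two writers ever raced on the same file without the rename step, a reader could observe one writer's count line followed by the other writer's (shorter) entry list, which is exactly the mismatch reported in the stack trace.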





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
