[ 
https://issues.apache.org/jira/browse/KAFKA-2575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Warren Jin updated KAFKA-2575:
------------------------------
    Description: 
We have 3 brokers, more than 100 topics in production, the default partition 
number is 24 for each topic, the replication factor is 3.

We noticed the following errors in recent days.
2015-09-22 22:25:12,529 ERROR Error on broker 1 while processing LeaderAndIsr 
request correlationId 438501 received from controller 2 epoch 12 for partition 
[LOGIST.DELIVERY.SUBSCRIBE,7] (state.change.logger)
java.io.IOException: Expected 3918 entries but found only 3904
        at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:99)
        at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:91)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
        at scala.collection.immutable.Set$Set3.foreach(Set.scala:115)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1.apply$mcZ$sp(Partition.scala:171)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
        at kafka.utils.Utils$.inLock(Utils.scala:535)
        at kafka.utils.Utils$.inWriteLock(Utils.scala:543)
        at kafka.cluster.Partition.makeLeader(Partition.scala:163)
        at 
kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:427)
        at 
kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:426)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:426)
        at 
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:378)
        at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:120)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:63)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:745)

It occurs in LOGIST.DELIVERY.SUBSCRIBE partition election, 
then it repeatly pring out the error message:
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 14943530 from client ReplicaFetcherThread-2-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,22] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,22] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 15022337 from client ReplicaFetcherThread-1-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,1] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,1] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 15078431 from client ReplicaFetcherThread-0-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,4] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,4] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 13477660 from client ReplicaFetcherThread-2-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,10] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,10] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 15022337 from client ReplicaFetcherThread-1-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,13] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,13] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 15078431 from client ReplicaFetcherThread-0-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,16] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,16] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 14988525 from client ReplicaFetcherThread-3-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,19] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,19] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: 
FetchRequest; Version: 0; CorrelationId: 15022337; ClientId: 
ReplicaFetcherThread-1-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; 
RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,1] -> 
PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,13] -> 
PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 
0's position -1 since the replica is not recognized to be one of the assigned 
replicas  for partition [LOGIST.DELIVERY.SUBSCRIBE,1]
        at 
kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
        at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
        at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
        at 
kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:745)
2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: 
FetchRequest; Version: 0; CorrelationId: 15078431; ClientId: 
ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; 
RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,4] -> 
PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,16] -> 
PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 
0's position -1 since the replica is not recognized to be one of the assigned 
replicas  for partition [LOGIST.DELIVERY.SUBSCRIBE,4]
        at 
kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
        at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
        at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
        at 
kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:745)


I checked the kafka source code:OffsetCheckpoint.scala, ReplicaManager.scala, 
LogManager.Scala, the replication manager write the offsets of all the 
partitions to replication-offset-checkpoint every 5 seconds, and it has the 
internel lock for this file for every OffsetCheckpoint, it shoud be impossible 
that the offset count is 3918, but the actual count of offset entries is 3904? 
Is it the multihread issue that some other thread flush the content to the same 
file due to the internal lock in OffsetCheckPoint? I'm not familiar with the 
Scala, but the 

class OffsetCheckpoint(val file: File) extends Logging {
  private val lock = new Object()

it looks like the instance lock not the static class lock.

If it's the issue in Kafka, is there any quick work around for this problem?
We restart this broker, the error was disappearred, but the replicas for this 
topic is not that correct although it could produce and consume the message.

Please let me know if you need more information.

Thanks and best regards,
 Warren





  was:
We have 3 brokers, more than 100 topics in production, the default partition 
number is 24 for each topic, the replication factor is 3.

We noticed the following errors in recent days.
2015-09-22 22:25:12,529 ERROR Error on broker 1 while processing LeaderAndIsr 
request correlationId 438501 received from controller 2 epoch 12 for partition 
[LOGIST.DELIVERY.SUBSCRIBE,7] (state.change.logger)
java.io.IOException: Expected 3918 entries but found only 3904
        at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:99)
        at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:91)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
        at scala.collection.immutable.Set$Set3.foreach(Set.scala:115)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1.apply$mcZ$sp(Partition.scala:171)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
        at 
kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
        at kafka.utils.Utils$.inLock(Utils.scala:535)
        at kafka.utils.Utils$.inWriteLock(Utils.scala:543)
        at kafka.cluster.Partition.makeLeader(Partition.scala:163)
        at 
kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:427)
        at 
kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:426)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:426)
        at 
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:378)
        at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:120)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:63)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:745)

It occurs in LOGIST.DELIVERY.SUBSCRIBE partition election, 
then it repeatly pring out the error message:
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 14943530 from client ReplicaFetcherThread-2-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,22] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,22] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 15022337 from client ReplicaFetcherThread-1-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,1] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,1] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 15078431 from client ReplicaFetcherThread-0-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,4] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,4] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 13477660 from client ReplicaFetcherThread-2-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,10] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,10] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 15022337 from client ReplicaFetcherThread-1-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,13] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,13] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 15078431 from client ReplicaFetcherThread-0-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,16] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,16] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with 
correlation id 14988525 from client ReplicaFetcherThread-3-1 on partition 
[LOGIST.DELIVERY.SUBSCRIBE,19] failed due to Leader not local for partition 
[LOGIST.DELIVERY.SUBSCRIBE,19] on broker 1 (kafka.server.ReplicaManager)
2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: 
FetchRequest; Version: 0; CorrelationId: 15022337; ClientId: 
ReplicaFetcherThread-1-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; 
RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,1] -> 
PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,13] -> 
PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 
0's position -1 since the replica is not recognized to be one of the assigned 
replicas  for partition [LOGIST.DELIVERY.SUBSCRIBE,1]
        at 
kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
        at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
        at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
        at 
kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:745)
2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: 
FetchRequest; Version: 0; CorrelationId: 15078431; ClientId: 
ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; 
RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,4] -> 
PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,16] -> 
PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 
0's position -1 since the replica is not recognized to be one of the assigned 
replicas  for partition [LOGIST.DELIVERY.SUBSCRIBE,4]
        at 
kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
        at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
        at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
        at 
kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:745)


I checked the kafka source code:OffsetCheckpoint.scala, ReplicaManager.scala, 
LogManager.Scala, the replication manager write the offsets of all the 
partitions to replication-offset-checkpoint every 5 seconds, and it has the 
internel lock for this file for every OffsetCheckpoint, it shoud be impossible 
that the offset count is 3918, but the actual count of offset entries is 3904? 
Is it the multihread issue that some other thread flush the content to the same 
file due to the internal lock in OffsetCheckPoint? I'm not familiar with the 
Scala, but the 

class OffsetCheckpoint(val file: File) extends Logging {
  private val lock = new Object()

it looks like the instance lock not the lock in class leve.

If it's the issue in Kafka, is there any quick work around for this problem?
We restart this broker, the error was disappearred, but the replicas for this 
topic is not that correct although it could produce and consume the message.

Please let me know if you need more information.

Thanks and best regards,
 Warren






> inconsistant offset count in replication-offset-checkpoint during lead 
> election leads to huge exceptions
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2575
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2575
>             Project: Kafka
>          Issue Type: Bug
>          Components: kafka streams
>    Affects Versions: 0.8.2.1
>            Reporter: Warren Jin
>
> We have 3 brokers, more than 100 topics in production, the default partition 
> number is 24 for each topic, the replication factor is 3.
> We noticed the following errors in recent days.
> 2015-09-22 22:25:12,529 ERROR Error on broker 1 while processing LeaderAndIsr 
> request correlationId 438501 received from controller 2 epoch 12 for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,7] (state.change.logger)
> java.io.IOException: Expected 3918 entries but found only 3904
>       at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:99)
>       at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:91)
>       at 
> kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
>       at 
> kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
>       at scala.collection.immutable.Set$Set3.foreach(Set.scala:115)
>       at 
> kafka.cluster.Partition$$anonfun$makeLeader$1.apply$mcZ$sp(Partition.scala:171)
>       at 
> kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
>       at 
> kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
>       at kafka.utils.Utils$.inLock(Utils.scala:535)
>       at kafka.utils.Utils$.inWriteLock(Utils.scala:543)
>       at kafka.cluster.Partition.makeLeader(Partition.scala:163)
>       at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:427)
>       at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:426)
>       at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>       at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>       at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>       at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>       at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:426)
>       at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:378)
>       at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:120)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:63)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>       at java.lang.Thread.run(Thread.java:745)
> It occurs in LOGIST.DELIVERY.SUBSCRIBE partition election, 
> then it repeatly pring out the error message:
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 14943530 from client ReplicaFetcherThread-2-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,22] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,22] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 15022337 from client ReplicaFetcherThread-1-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,1] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,1] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 15078431 from client ReplicaFetcherThread-0-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,4] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,4] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 13477660 from client ReplicaFetcherThread-2-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,10] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,10] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 15022337 from client ReplicaFetcherThread-1-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,13] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,13] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 15078431 from client ReplicaFetcherThread-0-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,16] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,16] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request 
> with correlation id 14988525 from client ReplicaFetcherThread-3-1 on 
> partition [LOGIST.DELIVERY.SUBSCRIBE,19] failed due to Leader not local for 
> partition [LOGIST.DELIVERY.SUBSCRIBE,19] on broker 1 
> (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: 
> FetchRequest; Version: 0; CorrelationId: 15022337; ClientId: 
> ReplicaFetcherThread-1-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; 
> RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,1] -> 
> PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,13] -> 
> PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
> kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 
> 0's position -1 since the replica is not recognized to be one of the assigned 
> replicas  for partition [LOGIST.DELIVERY.SUBSCRIBE,1]
>       at 
> kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
>       at 
> kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
>       at 
> kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
>       at 
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>       at 
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>       at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>       at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
>       at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>       at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>       at 
> kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
>       at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>       at java.lang.Thread.run(Thread.java:745)
> 2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: 
> FetchRequest; Version: 0; CorrelationId: 15078431; ClientId: 
> ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; 
> RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,4] -> 
> PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,16] -> 
> PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
> kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 
> 0's position -1 since the replica is not recognized to be one of the assigned 
> replicas  for partition [LOGIST.DELIVERY.SUBSCRIBE,4]
>       at 
> kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
>       at 
> kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
>       at 
> kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
>       at 
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>       at 
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>       at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>       at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
>       at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>       at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>       at 
> kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
>       at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>       at java.lang.Thread.run(Thread.java:745)
> I checked the kafka source code:OffsetCheckpoint.scala, ReplicaManager.scala, 
> LogManager.Scala, the replication manager write the offsets of all the 
> partitions to replication-offset-checkpoint every 5 seconds, and it has the 
> internel lock for this file for every OffsetCheckpoint, it shoud be 
> impossible that the offset count is 3918, but the actual count of offset 
> entries is 3904? Is it the multihread issue that some other thread flush the 
> content to the same file due to the internal lock in OffsetCheckPoint? I'm 
> not familiar with the Scala, but the 
> class OffsetCheckpoint(val file: File) extends Logging {
>   private val lock = new Object()
> it looks like the instance lock not the static class lock.
> If it's the issue in Kafka, is there any quick work around for this problem?
> We restart this broker, the error was disappearred, but the replicas for this 
> topic is not that correct although it could produce and consume the message.
> Please let me know if you need more information.
> Thanks and best regards,
>  Warren



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to