Riley Zimmerman created KAFKA-6798:
--------------------------------------

             Summary: Kafka leader rebalance failures
                 Key: KAFKA-6798
                 URL: https://issues.apache.org/jira/browse/KAFKA-6798
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 1.0.1, 0.10.2.1
            Reporter: Riley Zimmerman


I am running 3 Kafka brokers (version 0.10.2.1, more recently moved to 1.0.1) with 3 
ZooKeeper nodes (v3.4.9) as StatefulSets in a Kubernetes v1.9.1 deployment.  My 
partitions have replication factor 3.  My main workload is a Kafka Streams 
consumer/producer (storing offsets in Kafka) plus a second Kafka consumer that 
stores offsets in ZooKeeper (it only commits every 30 seconds).  About 200,000 
messages per minute go through each of them.  The log.retention settings are all 
4 hours, and auto.leader.rebalance.enable is set to true.
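For reference, the relevant broker settings look roughly like this (paraphrased from 
our server.properties; offsets.topic.replication.factor is my assumption, based on 
the __consumer_offsets output further below):
{noformat}
auto.leader.rebalance.enable=true
log.retention.hours=4
default.replication.factor=3
# assumed - matches the ReplicationFactor:3 shown for __consumer_offsets below
offsets.topic.replication.factor=3
{noformat}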

I am randomly hitting failures during these rebalances.  The result is that 
partitions of both my data topics and __consumer_offsets go out of sync and the 
partition leader becomes -1.  After 4 hours there is another (automatic?) rebalance 
and sometimes it sorts itself out.  Sometimes the cluster runs for weeks without 
problems; other times this happens multiple times within a few days.  When it is 
going to happen, it tends to happen early in a test run.
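For example, describing the offsets topic with the stock tooling, roughly
{noformat}
kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic __consumer_offsets
{noformat}
(the ZooKeeper address is just our Kubernetes service name) shows leader -1 and an 
empty ISR for the affected partitions: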
{noformat}
Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
        Topic: __consumer_offsets       Partition: 0    Leader: -1      Replicas: 2,0,1 Isr:
        Topic: __consumer_offsets       Partition: 1    Leader: 0       Replicas: 0,1,2 Isr: 1,2,0
        Topic: __consumer_offsets       Partition: 2    Leader: 1       Replicas: 1,2,0 Isr: 2,1,0
        Topic: __consumer_offsets       Partition: 3    Leader: -1      Replicas: 2,1,0 Isr:
{noformat}
{noformat}
[2018-03-20 12:42:32,180] WARN [Controller 2]: Partition [agent.metadata,5] failed to complete preferred replica leader election. Leader is -1 (kafka.controller.KafkaController)
{noformat}
{noformat}
[2018-03-20 11:02:32,099] TRACE Controller 2 epoch 27 started leader election for partition [__consumer_offsets,30] (state.change.logger)
[2018-03-20 11:02:32,101] ERROR Controller 2 epoch 27 encountered error while electing leader for partition [__consumer_offsets,30] due to: Preferred replica 2 for partition [__consumer_offsets,30] is either not alive or not in the isr. Current leader and ISR: [{"leader":-1,"leader_epoch":59,"isr":[]}]. (state.change.logger)
[2018-03-20 11:02:32,101] ERROR Controller 2 epoch 27 initiated state change for partition [__consumer_offsets,30] from OnlinePartition to OnlinePartition failed (state.change.logger)
kafka.common.StateChangeFailedException: encountered error while electing leader for partition [__consumer_offsets,30] due to: Preferred replica 2 for partition [__consumer_offsets,30] is either not alive or not in the isr. Current leader and ISR: [{"leader":-1,"leader_epoch":59,"isr":[]}].
        at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:362)
        at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:202)
        at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:141)
        at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:140)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
        at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:140)
        at kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:662)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$16$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1230)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$16$$anonfun$apply$5.apply(KafkaController.scala:1225)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$16$$anonfun$apply$5.apply(KafkaController.scala:1225)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$16.apply(KafkaController.scala:1222)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$16.apply(KafkaController.scala:1221)
        at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
        at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1221)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1203)
        at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
        at kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1203)
        at kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:352)
        at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:319)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:191)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(Thread.java:811)
Caused by: kafka.common.StateChangeFailedException: Preferred replica 2 for partition [__consumer_offsets,30] is either not alive or not in the isr. Current leader and ISR: [{"leader":-1,"leader_epoch":59,"isr":[]}]
        at kafka.controller.PreferredReplicaPartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:157)
        at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:339)
        ... 31 more
{noformat}
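I assume a manually triggered preferred replica election, something like
{noformat}
kafka-preferred-replica-election.sh --zookeeper zookeeper:2181
{noformat}
would fail the same way while the ISR is empty, since it ends up in the same controller 
election path shown in the stack trace above.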
These messages show up in the ZooKeeper logs, but they appear all the time, not only 
when the failures happen:
{noformat}
2018-03-29 04:46:43,495 [myid:0] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@368] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket
        at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:239)
        at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
        at java.lang.Thread.run(Thread.java:811)
{noformat}
{noformat}
2018-03-29 08:56:46,195 [myid:1] - INFO  [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@648] - Got user-level KeeperException when processing sessionid:0x62633bc4724c26 type:setData cxid:0x654465 zxid:0x100361191 txntype:-1 reqpath:n/a Error Path:/brokers/topics/metric.json/partitions/1/state Error:KeeperErrorCode = BadVersion for /brokers/topics/metric.json/partitions/1/state
2018-03-29 08:56:46,201 [myid:1] - INFO  [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@648] - Got user-level KeeperException when processing sessionid:0x62633bc4724c26 type:setData cxid:0x654467 zxid:0x100361192 txntype:-1 reqpath:n/a Error Path:/brokers/topics/metric.json/partitions/10/state Error:KeeperErrorCode = BadVersion for /brokers/topics/metric.json/partitions/10/state
{noformat}
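For what it's worth, these BadVersion errors are on the per-partition state znodes, 
which are the same znodes holding the leader/ISR JSON the controller logs above; 
their contents can be read directly with the ZooKeeper CLI, e.g. (server address is 
again our service name):
{noformat}
zkCli.sh -server zookeeper:2181 get /brokers/topics/__consumer_offsets/partitions/30/state
{noformat}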
I saw https://issues.apache.org/jira/browse/KAFKA-4084, which involves major changes 
to how these rebalances are handled.  I'm in the process of moving to Kafka 1.1.0 to 
see if it helps.

 Any advice on what else to look into would be appreciated.  

 


