Joel Koshy created KAFKA-1305:
---------------------------------

             Summary: Controller can hang on controlled shutdown with auto leader balance enabled
                 Key: KAFKA-1305
                 URL: https://issues.apache.org/jira/browse/KAFKA-1305
             Project: Kafka
          Issue Type: Bug
            Reporter: Joel Koshy
             Fix For: 0.8.2


This is relatively easy to reproduce, especially during a rolling bounce.
The sequence of events was as follows:

# The previous controller was bounced and broker 265 became the new controller.
# I then did a controlled shutdown of broker 265 (the new controller).
# In the meantime, the automatically scheduled preferred replica leader election task (t@117 below) started and began sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and to other brokers) (t@113 below).
# While that was happening, the controlled shutdown process on 265 succeeded, and the broker proceeded to deregister itself from ZooKeeper and shut down its socket server.
# (ReplicaStateMachine normally removes deregistered brokers from the controller channel manager's list of brokers to send requests to. However, that removal cannot take place (t@18 below) because the preferred replica leader election task owns the controller lock.)
# So the request send thread to broker 265 retries indefinitely, since the socket server it is trying to reach is already down. Because that thread no longer drains its queue, the election task presumably blocks on a full request queue (see LinkedBlockingQueue.put in t@117 below) while still holding the controller lock.
# The entire broker shutdown process (t@17 below) is blocked on controller shutdown for the same reason: it needs to acquire the controller lock.
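
To make the interaction in the steps above concrete, here is a minimal sketch in Python rather than Kafka's Scala. It is not Kafka code: the names (controller_lock, request_queue, preferred_replica_election, controller_shutdown, demo) are hypothetical stand-ins, and it assumes the election task is blocked on a full bounded queue that nobody drains, which is what the LinkedBlockingQueue.put frame in the dump suggests.

```python
import queue
import threading


def preferred_replica_election(controller_lock, request_queue, started):
    """Stand-in for the scheduled election task (kafka-scheduler-0)."""
    with controller_lock:              # the task holds the controller lock...
        started.set()
        for i in range(3):
            # ...and blocks here once the bounded queue is full, because the
            # send thread (stuck in retries) never takes anything off it.
            request_queue.put(f"LeaderAndIsrRequest-{i}")


def controller_shutdown(controller_lock, timeout_s):
    """Stand-in for controller shutdown: needs the same lock.

    Returns True if the lock could be acquired within timeout_s seconds.
    """
    acquired = controller_lock.acquire(timeout=timeout_s)
    if acquired:
        controller_lock.release()
    return acquired


def demo():
    controller_lock = threading.Lock()
    request_queue = queue.Queue(maxsize=1)   # bounded, like the request queue
    started = threading.Event()
    election = threading.Thread(
        target=preferred_replica_election,
        args=(controller_lock, request_queue, started),
        daemon=True,
    )
    election.start()
    started.wait()                     # the election task now owns the lock

    # Nobody drains the queue, so the lock is never released: shutdown hangs
    # (here it gives up after a timeout instead of waiting forever).
    hung = not controller_shutdown(controller_lock, timeout_s=0.5)

    # Drain the queue the way a healthy send thread would; the election task
    # can then finish its puts and release the lock.
    while election.is_alive():
        try:
            request_queue.get(timeout=0.1)
        except queue.Empty:
            pass
    election.join()
    recovered = controller_shutdown(controller_lock, timeout_s=0.5)
    return hung, recovered


if __name__ == "__main__":
    print(demo())
```

In this model the only difference between the hung and the recovered case is whether something consumes from the queue, which is why a send thread stuck in retries is enough to block everything else that needs the controller lock.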

Relevant portions from the thread-dump:

{code}
"Controller-265-to-broker-265-send-thread" - Thread t@113
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
        at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
        at kafka.utils.Logging$class.swallow(Logging.scala:94)
        at kafka.utils.Utils$.swallow(Utils.scala:46)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        - locked java.lang.Object@6dbf14a7
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

   Locked ownable synchronizers:
        - None

...

"Thread-4" - Thread t@17
   java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
        at kafka.utils.Utils$.inLock(Utils.scala:536)
        at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
        at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
        at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
        at kafka.utils.Logging$class.swallow(Logging.scala:94)
        at kafka.utils.Utils$.swallow(Utils.scala:46)
        at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
        at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
        at kafka.Kafka$$anon$1.run(Kafka.scala:42)

...

"kafka-scheduler-0" - Thread t@117
   java.lang.Thread.State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
        at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
        - locked java.lang.Object@578b748f
        at kafka.controller.KafkaController.sendRequest(KafkaController.scala:657)
        at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
        at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:282)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:282)
        at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:126)
        at kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:612)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1119)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1111)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1109)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1088)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
        at kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1088)
        at kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:323)
        at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

   Locked ownable synchronizers:
        - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840
        - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4918530

...

"ZkClient-EventThread-18-/kafka-shadow" - Thread t@18
   java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
        at kafka.utils.Utils$.inLock(Utils.scala:536)
        at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:328)
        at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
{code}



