Joel Koshy created KAFKA-1305:
---------------------------------

             Summary: Controller can hang on controlled shutdown with auto leader balance enabled
                 Key: KAFKA-1305
                 URL: https://issues.apache.org/jira/browse/KAFKA-1305
             Project: Kafka
          Issue Type: Bug
            Reporter: Joel Koshy
             Fix For: 0.8.2
This is relatively easy to reproduce, especially when doing a rolling bounce. What happened here is as follows:
# The previous controller was bounced and broker 265 became the new controller.
# I went on to do a controlled shutdown of broker 265 (the new controller).
# In the meantime, the automatically scheduled preferred replica leader election process kicked in and started sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers) (t@113 below).
# While that was happening, the controlled shutdown of 265 succeeded, and the broker proceeded to deregister itself from ZooKeeper and shut down its socket server.
# (ReplicaStateMachine normally removes deregistered brokers from the controller channel manager's list of brokers to send requests to. However, that removal cannot take place (t@18 below) because the preferred replica leader election task owns the controller lock.)
# So the request send thread to broker 265 gets into infinite retries.
# The entire broker shutdown process is blocked on controller shutdown for the same reason (it needs to acquire the controller lock).

A minimal sketch of this lock/queue interaction follows the thread dump below.

Relevant portions from the thread-dump:
{code}
"Controller-265-to-broker-265-send-thread" - Thread t@113
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
        at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
        at kafka.utils.Logging$class.swallow(Logging.scala:94)
        at kafka.utils.Utils$.swallow(Utils.scala:46)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        - locked java.lang.Object@6dbf14a7
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

   Locked ownable synchronizers:
        - None

...

"Thread-4" - Thread t@17
   java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
        at kafka.utils.Utils$.inLock(Utils.scala:536)
        at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
        at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
        at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
        at kafka.utils.Logging$class.swallow(Logging.scala:94)
        at kafka.utils.Utils$.swallow(Utils.scala:46)
        at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
        at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
        at kafka.Kafka$$anon$1.run(Kafka.scala:42)

...
"kafka-scheduler-0" - Thread t@117 java.lang.Thread.State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306) at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57) - locked java.lang.Object@578b748f at kafka.controller.KafkaController.sendRequest(KafkaController.scala:657) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:282) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) at scala.collection.Iterator$class.foreach(Iterator.scala:631) at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:282) at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:126) at kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:612) at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1119) at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114) at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1111) at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) at scala.collection.Iterator$class.foreach(Iterator.scala:631) at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1109) at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1088) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125) at 
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
        at kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1088)
        at kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:323)
        at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

   Locked ownable synchronizers:
        - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840
        - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4918530

...

"ZkClient-EventThread-18-/kafka-shadow" - Thread t@18
   java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
        at kafka.utils.Utils$.inLock(Utils.scala:536)
        at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:328)
        at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
{code}
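For illustration, here is a minimal, self-contained Scala sketch of the interaction above. This is not the actual Kafka code: the thread names, queue capacity, and retry loop are assumptions chosen only to reproduce the shape of the hang, i.e., one thread holds the controller lock while blocked on a put() into a bounded per-broker request queue whose consumer never drains it, so every other thread that needs the lock (broker-change listener, server shutdown) waits forever.
{code}
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-in for the hang, not Kafka code: queue capacity and
// thread bodies are assumptions for illustration only.
object ControllerHangSketch extends App {
  val controllerLock = new ReentrantLock()
  // Stand-in for the bounded per-broker request queue in ControllerChannelManager.
  val requestQueue = new LinkedBlockingQueue[String](1)

  // Stand-in for "Controller-265-to-broker-265-send-thread" (t@113): the target
  // broker is shut down, so it retries forever and never drains the queue.
  val sendThread = new Thread(new Runnable {
    override def run(): Unit = while (true) Thread.sleep(300) // retry backoff
  }, "send-thread-to-dead-broker")
  sendThread.setDaemon(true)
  sendThread.start()

  // Stand-in for "kafka-scheduler-0" (t@117): preferred replica election holds the
  // controller lock while enqueueing requests; the second put() blocks forever.
  val scheduler = new Thread(new Runnable {
    override def run(): Unit = {
      controllerLock.lock()
      try {
        requestQueue.put("LeaderAndIsrRequest-1") // fills the queue
        requestQueue.put("LeaderAndIsrRequest-2") // blocks: nothing drains the queue
      } finally controllerLock.unlock()
    }
  }, "kafka-scheduler-0")
  scheduler.start()

  Thread.sleep(500)

  // Stand-in for "Thread-4" (KafkaServer.shutdown) and the ZkClient event thread
  // (t@18): both need the controller lock and park indefinitely.
  val shutdownThread = new Thread(new Runnable {
    override def run(): Unit = {
      controllerLock.lock() // never acquired
      try println("controller shut down") finally controllerLock.unlock()
    }
  }, "Thread-4")
  shutdownThread.start()

  shutdownThread.join(2000)
  println("shutdown thread state after 2s: " + shutdownThread.getState) // prints WAITING
}
{code}
Running the sketch prints WAITING for the shutdown thread and the JVM never exits, which mirrors the states shown in the dump.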