[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143383#comment-14143383 ]
Guozhang Wang commented on KAFKA-1305: -------------------------------------- Thanks [~becket_qin] for the great findings. It seems to me that as long as the controller's channel manager is async, no matter how large is its queue the corner-case issue can still happen in (i.e. request blocked in the queue for brokers that is already shutdown but the ZK watcher not fired yet), and causing some chain of lock conflicts. Currently the controller has multiple threads for admin commands, ZK listeners, scheduled operations (leader electioner), etc, which complicates the locking mechanism inside controller. After going through the code I think it would be better to refactor the controller as following: 1. Besides the async channel manager's sender thread, we use only a single controller thread and have a single working queue for the controller thread. 3. ZK fire handling logic determines the event (topic/partition/broker change, admin operation, etc), and put the task into the queue. 4. Scheduled task is also created periodically and put into the queue. 5. The controller did one task at a time, which do not need to compete locks on controller metadata. 6. Make the channel manager's queue size infinite and add a metric on monitoring its size. With this the controller logic would be easier to read / debug, may also help KAFKA-1558. The downside is that since a single thread is used, it loses parallelism for controller task handling, and the unbounded channel queue may also be an issue (when there is a bug). But since controller tasks are usually rare in practice, this should not be an issue. > Controller can hang on controlled shutdown with auto leader balance enabled > --------------------------------------------------------------------------- > > Key: KAFKA-1305 > URL: https://issues.apache.org/jira/browse/KAFKA-1305 > Project: Kafka > Issue Type: Bug > Reporter: Joel Koshy > Assignee: Neha Narkhede > Priority: Blocker > Fix For: 0.9.0 > > > This is relatively easy to reproduce especially when doing a rolling bounce. > What happened here is as follows: > 1. The previous controller was bounced and broker 265 became the new > controller. > 2. I went on to do a controlled shutdown of broker 265 (the new controller). > 3. In the mean time the automatically scheduled preferred replica leader > election process started doing its thing and starts sending > LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers). > (t@113 below). > 4. While that's happening, the controlled shutdown process on 265 succeeds > and proceeds to deregister itself from ZooKeeper and shuts down the socket > server. > 5. (ReplicaStateMachine actually removes deregistered brokers from the > controller channel manager's list of brokers to send requests to. However, > that removal cannot take place (t@18 below) because preferred replica leader > election task owns the controller lock.) > 6. So the request thread to broker 265 gets into infinite retries. > 7. The entire broker shutdown process is blocked on controller shutdown for > the same reason (it needs to acquire the controller lock). > Relevant portions from the thread-dump: > "Controller-265-to-broker-265-send-thread" - Thread t@113 > java.lang.Thread.State: TIMED_WAITING > at java.lang.Thread.sleep(Native Method) > at > kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143) > at kafka.utils.Utils$.swallow(Utils.scala:167) > at kafka.utils.Logging$class.swallowWarn(Logging.scala:92) > at kafka.utils.Utils$.swallowWarn(Utils.scala:46) > at kafka.utils.Logging$class.swallow(Logging.scala:94) > at kafka.utils.Utils$.swallow(Utils.scala:46) > at > kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143) > at > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) > - locked java.lang.Object@6dbf14a7 > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > Locked ownable synchronizers: > - None > ... > "Thread-4" - Thread t@17 > java.lang.Thread.State: WAITING on > java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: > kafka-scheduler-0 > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186) > at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262) > at kafka.utils.Utils$.inLock(Utils.scala:536) > at kafka.controller.KafkaController.shutdown(KafkaController.scala:642) > at > kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242) > at kafka.utils.Utils$.swallow(Utils.scala:167) > at kafka.utils.Logging$class.swallowWarn(Logging.scala:92) > at kafka.utils.Utils$.swallowWarn(Utils.scala:46) > at kafka.utils.Logging$class.swallow(Logging.scala:94) > at kafka.utils.Utils$.swallow(Utils.scala:46) > at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242) > at > kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46) > at kafka.Kafka$$anon$1.run(Kafka.scala:42) > ... > "kafka-scheduler-0" - Thread t@117 > java.lang.Thread.State: WAITING on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) > at > java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306) > at > kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57) > - locked java.lang.Object@578b748f > at > kafka.controller.KafkaController.sendRequest(KafkaController.scala:657) > at > kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290) > at > kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:282) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at scala.collection.Iterator$class.foreach(Iterator.scala:631) > at > scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) > at > kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:282) > at > kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:126) > at > kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:612) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1119) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114) > at kafka.utils.Utils$.inLock(Utils.scala:538) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1111) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at scala.collection.Iterator$class.foreach(Iterator.scala:631) > at > scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1109) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1088) > at > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125) > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344) > at > kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1088) > at > kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:323) > at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) > at > java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204) > at > java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) > at java.lang.Thread.run(Thread.java:662) > Locked ownable synchronizers: > - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 > - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4918530 > ... > "ZkClient-EventThread-18-/kafka-shadow" - Thread t@18 > java.lang.Thread.State: WAITING on > java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: > kafka-scheduler-0 > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186) > at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262) > at kafka.utils.Utils$.inLock(Utils.scala:536) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:328) > at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) -- This message was sent by Atlassian JIRA (v6.3.4#6332)