[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14159830#comment-14159830 ]
Jun Rao commented on KAFKA-1305:
--------------------------------

Thinking about this a bit more: as a short-term fix, I think Jiangjie's suggestion actually has a point. Basically, if a channel queue is full (because the target broker is down), a thread that tries to put a request into the queue will block while holding the controller lock. This essentially stalls the controller, since it can't process any other events, which is bad.

Now, imagine we make the channel queue unbounded. In that case, no thread will block on putting requests into the queue. So, if a broker is down, the controller will always be able to act on it, which will clear up the queue and remove the channel to the dead broker. The only downside is that if there are outstanding requests in a queue, new important requests may be delayed. This is not a big concern, because in the common case the channel queue shouldn't build up. (A minimal sketch of the two queue behaviors follows the quoted issue description below.)

Sriharsha, could you do a bit of testing on this? Basically, set the default value of controller.message.queue.size to something large (e.g., 10K). Create a cluster with a few thousand partitions per broker. Enable auto leader balancing, keep doing rolling bounces of the cluster (with controlled shutdown enabled), and see whether any issue shows up. Ideally, we want to hit the case where auto leader balancing happens concurrently with a controlled shutdown in the controller, so you may want to play with leader.imbalance.check.interval.seconds. (A sketch of the suggested broker settings appears at the end of this message.)

> Controller can hang on controlled shutdown with auto leader balance enabled
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-1305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1305
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joel Koshy
>            Assignee: Sriharsha Chintalapani
>            Priority: Blocker
>             Fix For: 0.8.2, 0.9.0
>
>
> This is relatively easy to reproduce, especially when doing a rolling bounce.
> What happened here is as follows:
> 1. The previous controller was bounced and broker 265 became the new
>    controller.
> 2. I went on to do a controlled shutdown of broker 265 (the new controller).
> 3. In the meantime, the automatically scheduled preferred replica leader
>    election process started doing its thing and began sending
>    LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers)
>    (t@113 below).
> 4. While that was happening, the controlled shutdown process on 265 succeeded
>    and proceeded to deregister itself from ZooKeeper and shut down the socket
>    server.
> 5. (ReplicaStateMachine actually removes deregistered brokers from the
>    controller channel manager's list of brokers to send requests to. However,
>    that removal cannot take place (t@18 below) because the preferred replica
>    leader election task owns the controller lock.)
> 6. So the request thread to broker 265 gets into infinite retries.
> 7. The entire broker shutdown process is then blocked on controller shutdown
>    for the same reason (it needs to acquire the controller lock).
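To make steps 3-7 and the proposed mitigation concrete, here is a minimal sketch of the blocking pattern, assuming hypothetical names: controllerLock, enqueueUnderLock, and both queues are illustrative stand-ins, not the actual identifiers in the Kafka source.

    import java.util.concurrent.LinkedBlockingQueue
    import java.util.concurrent.locks.ReentrantLock

    object BoundedQueueDeadlockSketch {
      private val controllerLock = new ReentrantLock()

      // Bounded, like the per-broker channel queue sized by
      // controller.message.queue.size: put() blocks once the queue is full.
      private val boundedQueue = new LinkedBlockingQueue[String](10)

      // Unbounded alternative: put() never blocks, so the controller lock
      // taken below is always released again.
      private val unboundedQueue = new LinkedBlockingQueue[String]()

      def enqueueUnderLock(queue: LinkedBlockingQueue[String], request: String): Unit = {
        controllerLock.lock()
        try {
          // If `queue` is bounded, full, and its target broker is down, nothing
          // drains it, so this put() blocks forever *while holding the
          // controller lock*. The thread that would remove the dead broker's
          // channel (and the broker shutdown path) can then never acquire the
          // lock, which is the hang shown in the thread dump below.
          queue.put(request)
        } finally {
          controllerLock.unlock()
        }
      }
    }

With the unbounded queue, put() always returns immediately and the lock is always released; the only cost is that requests already sitting in the queue delay important new ones.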
> Relevant portions from the thread-dump:
>
> "Controller-265-to-broker-265-send-thread" - Thread t@113
>   java.lang.Thread.State: TIMED_WAITING
>       at java.lang.Thread.sleep(Native Method)
>       at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
>       at kafka.utils.Utils$.swallow(Utils.scala:167)
>       at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
>       at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
>       at kafka.utils.Logging$class.swallow(Logging.scala:94)
>       at kafka.utils.Utils$.swallow(Utils.scala:46)
>       at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
>       at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>       - locked java.lang.Object@6dbf14a7
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
>   Locked ownable synchronizers:
>       - None
> ...
>
> "Thread-4" - Thread t@17
>   java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
>       at sun.misc.Unsafe.park(Native Method)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>       at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>       at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>       at kafka.utils.Utils$.inLock(Utils.scala:536)
>       at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
>       at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
>       at kafka.utils.Utils$.swallow(Utils.scala:167)
>       at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
>       at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
>       at kafka.utils.Logging$class.swallow(Logging.scala:94)
>       at kafka.utils.Utils$.swallow(Utils.scala:46)
>       at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
>       at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
>       at kafka.Kafka$$anon$1.run(Kafka.scala:42)
> ...
> "kafka-scheduler-0" - Thread t@117 > java.lang.Thread.State: WAITING on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) > at > java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306) > at > kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57) > - locked java.lang.Object@578b748f > at > kafka.controller.KafkaController.sendRequest(KafkaController.scala:657) > at > kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290) > at > kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:282) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at scala.collection.Iterator$class.foreach(Iterator.scala:631) > at > scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) > at > kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:282) > at > kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:126) > at > kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:612) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1119) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114) > at kafka.utils.Utils$.inLock(Utils.scala:538) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1111) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at scala.collection.Iterator$class.foreach(Iterator.scala:631) > at > scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1109) > at > 
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1088) > at > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125) > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344) > at > kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1088) > at > kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:323) > at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) > at > java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204) > at > java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) > at java.lang.Thread.run(Thread.java:662) > Locked ownable synchronizers: > - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 > - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4918530 > ... > "ZkClient-EventThread-18-/kafka-shadow" - Thread t@18 > java.lang.Thread.State: WAITING on > java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: > kafka-scheduler-0 > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186) > at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262) > at kafka.utils.Utils$.inLock(Utils.scala:536) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:328) > at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
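For the test suggested above, here is a sketch of the broker settings involved, assuming the 0.8.x property names; the exact property strings beyond controller.message.queue.size and leader.imbalance.check.interval.seconds are my best guess at the config keys for "auto leader balancing" and "controlled shutdown", and the values are illustrative, not recommendations.

    import java.util.Properties

    object RollingBounceTestConfig {
      def brokerProps: Properties = {
        val props = new Properties()
        // Large enough to be effectively unbounded for the test.
        props.setProperty("controller.message.queue.size", "10000")
        // Auto leader balancing on, so preferred replica election runs on a timer.
        props.setProperty("auto.leader.rebalance.enable", "true")
        // Controlled shutdown on, as used during the rolling bounces.
        props.setProperty("controlled.shutdown.enable", "true")
        // A short check interval raises the odds that the rebalance task runs
        // concurrently with a controlled shutdown.
        props.setProperty("leader.imbalance.check.interval.seconds", "30")
        props
      }
    }

Shortening leader.imbalance.check.interval.seconds makes the scheduled rebalance fire often enough that it is likely to overlap a controlled shutdown during a rolling bounce, which is exactly the race the thread dump above captures.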