[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14142261#comment-14142261 ]
Jiangjie Qin edited comment on KAFKA-1305 at 9/21/14 12:45 AM:
---------------------------------------------------------------
I looked into this problem and it seems to me the issue is mainly that the default controller queue size is too small. The problem flow is as below:
1. Controller 265 received a controlled shutdown request.
2. Controller 265 put the leaderAndIsrRequest into the controller message queue and responded to broker 265.
3. Broker 265 received the response from controller 265, shut down successfully and de-registered itself from ZooKeeper.
4. The controller 265 request send thread started to send the leaderAndIsrRequests queued in step 2 to the brokers. Since broker 265 had already shut down, the send thread went into infinite retries. From this point on, the controller message queue never drains. (Thread t@113)
5. The scheduled preferred leader election started, grabbed the controller lock and tried to put its LeaderAndIsr requests into the controller message queue. Because the queue capacity is only 10, it could not finish and blocked on the put method while still holding the controller lock. (Thread t@117)
6. The broker change listener on controller 265 was triggered by the broker path change in step 3. It tried to grab the controller lock and stop thread t@113, but could not, because thread t@117 was holding the controller lock while waiting on the controller message queue.
Currently the controller message queue size is 10. IMO if we increase it to 100 or even larger, this problem won't happen again. In practice the queue will hold only a few messages, and often none, because there should not be many controller messages in flight, so increasing the queue size won't meaningfully increase memory consumption.
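To make the lock/queue interaction above concrete, here is a minimal, self-contained Scala sketch (hypothetical names, not Kafka code): one thread takes the controller lock and then blocks in put() on a full, never-drained capacity-10 LinkedBlockingQueue, so a second thread that needs the same lock can never proceed. Running it hangs by design, mirroring threads t@117 and t@18/t@17 in the dump below.

{code:scala}
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock

// Minimal sketch of the hang described above (hypothetical names, not Kafka code).
// The queue is never drained, standing in for the send thread (t@113) that is
// stuck retrying against the already-shut-down broker 265.
object ControllerQueueHangSketch extends App {
  val controllerLock = new ReentrantLock()
  val messageQueue   = new LinkedBlockingQueue[String](10) // capacity 10, like the controller queue

  // Stand-in for t@117: grab the controller lock, then block in put() once the
  // 11th request no longer fits.
  val electionTask = new Thread(new Runnable {
    override def run(): Unit = {
      controllerLock.lock()
      try {
        for (i <- 1 to 20) messageQueue.put(s"LeaderAndIsrRequest-$i") // blocks at i = 11
      } finally controllerLock.unlock()
    }
  }, "election-task")
  electionTask.start()

  // Stand-in for t@18 / t@17: the broker change listener (or broker shutdown)
  // needs the same lock to make progress, but never acquires it.
  Thread.sleep(500)
  controllerLock.lock() // hangs here, mirroring the thread dump
  println("never reached")
}
{code}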
> Controller can hang on controlled shutdown with auto leader balance enabled
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-1305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1305
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joel Koshy
>            Assignee: Neha Narkhede
>            Priority: Blocker
>             Fix For: 0.9.0
>
>
> This is relatively easy to reproduce especially when doing a rolling bounce.
> What happened here is as follows:
> 1. The previous controller was bounced and broker 265 became the new controller.
> 2. I went on to do a controlled shutdown of broker 265 (the new controller).
> 3. In the mean time the automatically scheduled preferred replica leader election process started doing its thing and starts sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers). (t@113 below).
> 4. While that's happening, the controlled shutdown process on 265 succeeds and proceeds to deregister itself from ZooKeeper and shuts down the socket server.
> 5. (ReplicaStateMachine actually removes deregistered brokers from the controller channel manager's list of brokers to send requests to. However, that removal cannot take place (t@18 below) because preferred replica leader election task owns the controller lock.)
> 6. So the request thread to broker 265 gets into infinite retries.
> 7. The entire broker shutdown process is blocked on controller shutdown for the same reason (it needs to acquire the controller lock).
> Relevant portions from the thread-dump:
> "Controller-265-to-broker-265-send-thread" - Thread t@113
>   java.lang.Thread.State: TIMED_WAITING
>     at java.lang.Thread.sleep(Native Method)
>     at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
>     at kafka.utils.Utils$.swallow(Utils.scala:167)
>     at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
>     at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
>     at kafka.utils.Logging$class.swallow(Logging.scala:94)
>     at kafka.utils.Utils$.swallow(Utils.scala:46)
>     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
>     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>     - locked java.lang.Object@6dbf14a7
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>   Locked ownable synchronizers:
>     - None
> ...
> "Thread-4" - Thread t@17 > java.lang.Thread.State: WAITING on > java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: > kafka-scheduler-0 > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186) > at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262) > at kafka.utils.Utils$.inLock(Utils.scala:536) > at kafka.controller.KafkaController.shutdown(KafkaController.scala:642) > at > kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242) > at kafka.utils.Utils$.swallow(Utils.scala:167) > at kafka.utils.Logging$class.swallowWarn(Logging.scala:92) > at kafka.utils.Utils$.swallowWarn(Utils.scala:46) > at kafka.utils.Logging$class.swallow(Logging.scala:94) > at kafka.utils.Utils$.swallow(Utils.scala:46) > at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242) > at > kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46) > at kafka.Kafka$$anon$1.run(Kafka.scala:42) > ... > "kafka-scheduler-0" - Thread t@117 > java.lang.Thread.State: WAITING on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) > at > java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306) > at > kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57) > - locked java.lang.Object@578b748f > at > kafka.controller.KafkaController.sendRequest(KafkaController.scala:657) > at > kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290) > at > kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:282) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at scala.collection.Iterator$class.foreach(Iterator.scala:631) > at > scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) > at > kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:282) > at > kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:126) > at > kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:612) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1119) > at > 
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114) > at kafka.utils.Utils$.inLock(Utils.scala:538) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1111) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > at scala.collection.Iterator$class.foreach(Iterator.scala:631) > at > scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1109) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1088) > at > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125) > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344) > at > kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1088) > at > kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:323) > at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) > at > java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204) > at > java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) > at java.lang.Thread.run(Thread.java:662) > Locked ownable synchronizers: > - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 > - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4918530 > ... 
> "ZkClient-EventThread-18-/kafka-shadow" - Thread t@18 > java.lang.Thread.State: WAITING on > java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: > kafka-scheduler-0 > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186) > at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262) > at kafka.utils.Utils$.inLock(Utils.scala:536) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:328) > at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) -- This message was sent by Atlassian JIRA (v6.3.4#6332)