Joel Koshy created KAFKA-1305:
---------------------------------
Summary: Controller can hang on controlled shutdown with auto leader balance enabled
Key: KAFKA-1305
URL: https://issues.apache.org/jira/browse/KAFKA-1305
Project: Kafka
Issue Type: Bug
Reporter: Joel Koshy
Fix For: 0.8.2
This is relatively easy to reproduce, especially when doing a rolling bounce.
What happened here is as follows:
# The previous controller was bounced and broker 265 became the new controller.
# I went on to do a controlled shutdown of broker 265 (the new controller).
# In the meantime, the automatically scheduled preferred replica leader
election task kicked in and started sending
LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers)
(t@113 below).
# While that was happening, the controlled shutdown of broker 265 succeeded:
the broker deregistered itself from ZooKeeper and shut down its socket server.
# (ReplicaStateMachine does remove deregistered brokers from the
controller channel manager's list of brokers to send requests to. However,
that removal could not take place here (t@18 below) because the preferred
replica leader election task owned the controller lock.)
# So the request send thread to broker 265 went into infinite retries (see the sketch after this list).
# The entire broker shutdown process was then blocked on controller shutdown
for the same reason: it also needs to acquire the controller lock.
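The retry behavior in step 6 is the swallow/sleep visible at the top of
t@113's stack below. A rough paraphrase of the loop's shape (simplified,
hypothetical names; not the actual 0.8.1 source):

{code}
// Sketch of the send thread's retry loop (names approximate). Once
// broker 265's socket server is down, the send always fails, so the
// loop never exits and the per-broker request queue is never drained.
var sent = false
while (isRunning.get() && !sent) {
  try {
    channel.send(request)   // fails: socket server already shut down
    sent = true
  } catch {
    case e: Throwable =>
      warn("Failed to send request, retrying", e)
      Thread.sleep(300)     // the TIMED_WAITING sleep seen in t@113
      channel.connect()     // reconnect attempt, also doomed
  }
}
{code}

The send thread would normally be stopped when the broker is removed from
the channel manager, but as noted in step 5 that removal path is itself
blocked on the controller lock.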
Relevant portions of the thread dump:
{code}
"Controller-265-to-broker-265-send-thread" - Thread t@113
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at
kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.Utils$.swallow(Utils.scala:46)
at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
- locked java.lang.Object@6dbf14a7
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Locked ownable synchronizers:
- None
...
"Thread-4" - Thread t@17
java.lang.Thread.State: WAITING on
java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by:
kafka-scheduler-0
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
at
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
at kafka.utils.Utils$.inLock(Utils.scala:536)
at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
at
kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.Utils$.swallow(Utils.scala:46)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
at
kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
at kafka.Kafka$$anon$1.run(Kafka.scala:42)
...
"kafka-scheduler-0" - Thread t@117
java.lang.Thread.State: WAITING on
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
at
kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
- locked java.lang.Object@578b748f
at
kafka.controller.KafkaController.sendRequest(KafkaController.scala:657)
at
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
at
kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:282)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at
kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:282)
at
kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:126)
at
kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:612)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1119)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1111)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1109)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1088)
at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
at
kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1088)
at
kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:323)
at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at
java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Locked ownable synchronizers:
- locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840
- locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4918530
...
"ZkClient-EventThread-18-/kafka-shadow" - Thread t@18
java.lang.Thread.State: WAITING on
java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by:
kafka-scheduler-0
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
at
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
at kafka.utils.Utils$.inLock(Utils.scala:536)
at
kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:328)
at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
{code}
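To make the cycle concrete, here is a self-contained sketch of the hang in
plain Scala (2.12+ for the Runnable lambdas). The names and the toy queue
capacity are hypothetical; this is only the lock/queue interaction the dump
shows, not Kafka code: the scheduler thread holds the controller lock while
blocking on a full bounded request queue, the queue's only consumer retries
a doomed send forever, and the shutdown and ZK event threads park on the
same lock.

{code}
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.locks.ReentrantLock

object ControllerHangSketch extends App {
  val controllerLock = new ReentrantLock()
  // Toy stand-in for the bounded per-broker request queue; capacity 1
  // makes the hang immediate.
  val requestQueue = new ArrayBlockingQueue[String](1)

  // Stand-in for Controller-265-to-broker-265-send-thread (t@113):
  // takes one request, then retries it forever, so it never drains
  // the queue again.
  new Thread(() => {
    requestQueue.take()
    while (true) Thread.sleep(300) // swallow, sleep, retry
  }, "send-thread").start()

  // Stand-in for kafka-scheduler-0 (t@117): preferred replica election
  // acquires the controller lock, then blocks inside put() once the
  // queue fills, never releasing the lock.
  new Thread(() => {
    controllerLock.lock()
    try {
      requestQueue.put("LeaderAndIsrRequest-1") // consumed by send-thread
      requestQueue.put("LeaderAndIsrRequest-2") // fills the queue
      requestQueue.put("LeaderAndIsrRequest-3") // blocks forever
    } finally controllerLock.unlock()           // never reached
  }, "kafka-scheduler-0").start()

  Thread.sleep(500) // give the scheduler time to wedge

  // Stand-ins for Thread-4 (KafkaController.shutdown) and the ZkClient
  // event thread (t@18): both park on the controller lock forever.
  new Thread(() => controllerLock.lock(), "Thread-4").start()
  new Thread(() => controllerLock.lock(), "ZkClient-EventThread").start()
}
{code}

A thread dump of this sketch shows the same shape as above: one thread
TIMED_WAITING in sleep, one WAITING in put(), and two WAITING on the
ReentrantLock owned by "kafka-scheduler-0".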