[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-1305:
------------------------------

    Description: 
This is relatively easy to reproduce, especially when doing a rolling bounce.
What happened here is as follows:

1. The previous controller was bounced and broker 265 became the new controller.
2. I went on to do a controlled shutdown of broker 265 (the new controller).
3. In the meantime, the automatically scheduled preferred replica leader
election process started and began sending
LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers)
(t@113 below).
4. While that was happening, the controlled shutdown process on 265 succeeded
and proceeded to deregister the broker from ZooKeeper and shut down the socket
server.
5. (ReplicaStateMachine normally removes deregistered brokers from the
controller channel manager's list of brokers to send requests to. However,
that removal cannot take place (t@18 below) because the preferred replica
leader election task owns the controller lock.)
6. So the request thread to broker 265 gets into infinite retries, and the
election task (t@117 below) stays blocked in LinkedBlockingQueue.put while
still holding the controller lock.
7. The entire broker shutdown process (t@17 below) is blocked on controller
shutdown for the same reason (it needs to acquire the controller lock).
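The sequence above can be illustrated with a minimal, self-contained Java sketch. It is not Kafka code: the class name, the queue capacity of 1 and the request strings are assumptions made for illustration. It only models the pattern described in the steps, where one thread holds a lock while blocked on a full bounded queue that nobody drains, so a second thread that needs the same lock cannot proceed.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the hang; none of these names come from Kafka itself.
final class ControllerLockSketch {

    // Returns true if the "shutdown" caller obtains the lock within waitMillis.
    static boolean shutdownCanAcquireLock(long waitMillis) {
        final ReentrantLock controllerLock = new ReentrantLock();
        // Bounded queue that is already full and never drained, standing in for
        // a send thread that is stuck retrying against a dead socket server.
        final LinkedBlockingQueue<String> requestQueue = new LinkedBlockingQueue<>(1);
        requestQueue.offer("LeaderAndIsrRequest-1");

        // Stand-in for the scheduled election task: takes the lock, then blocks in put().
        Thread scheduler = new Thread(() -> {
            controllerLock.lock();
            try {
                requestQueue.put("LeaderAndIsrRequest-2"); // blocks: queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                controllerLock.unlock();
            }
        }, "kafka-scheduler-0");
        scheduler.setDaemon(true);
        scheduler.start();

        try {
            // Wait until the scheduler thread actually owns the lock.
            while (!controllerLock.isLocked()) {
                Thread.sleep(1);
            }
            // Stand-in for controller shutdown: it needs the same lock.
            boolean acquired = controllerLock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                controllerLock.unlock();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.interrupt(); // let the demo thread exit
        }
    }

    public static void main(String[] args) {
        System.out.println("shutdown acquired controller lock: "
                + shutdownCanAcquireLock(200));
    }
}
```

In this sketch the shutdown side uses a timed tryLock only so that the demo terminates; with a plain lock() call, as in the thread dump, it would wait indefinitely.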

Relevant portions from the thread-dump:

"Controller-265-to-broker-265-send-thread" - Thread t@113
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
        at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
        at kafka.utils.Logging$class.swallow(Logging.scala:94)
        at kafka.utils.Utils$.swallow(Utils.scala:46)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        - locked java.lang.Object@6dbf14a7
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

   Locked ownable synchronizers:
        - None

...

"Thread-4" - Thread t@17
   java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
        at kafka.utils.Utils$.inLock(Utils.scala:536)
        at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
        at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
        at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
        at kafka.utils.Logging$class.swallow(Logging.scala:94)
        at kafka.utils.Utils$.swallow(Utils.scala:46)
        at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
        at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
        at kafka.Kafka$$anon$1.run(Kafka.scala:42)

...

"kafka-scheduler-0" - Thread t@117
   java.lang.Thread.State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
        at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
        - locked java.lang.Object@578b748f
        at kafka.controller.KafkaController.sendRequest(KafkaController.scala:657)
        at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
        at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:282)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:282)
        at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:126)
        at kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:612)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1119)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1111)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1109)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1088)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
        at kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1088)
        at kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:323)
        at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

   Locked ownable synchronizers:
        - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840
        - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4918530

...

"ZkClient-EventThread-18-/kafka-shadow" - Thread t@18
   java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
        at kafka.utils.Utils$.inLock(Utils.scala:536)
        at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:328)
        at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
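
The dump shows kafka-scheduler-0 parked in LinkedBlockingQueue.put while it owns the ReentrantLock that Thread-4 and the ZkClient event thread are waiting on. As a purely illustrative sketch (not the fix adopted for this issue, and not Kafka code), the following Java snippet shows one way to bound that wait: enqueue with a timed offer so the lock holder gives up and releases the lock when the queue stays full. The class name, method names, capacity and timeout are all hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; none of these names exist in Kafka.
final class BoundedEnqueueSketch {

    // Tries to enqueue for at most timeoutMillis; returns false if the queue stayed full.
    static boolean enqueueWithTimeout(BlockingQueue<String> queue,
                                      String request,
                                      long timeoutMillis) {
        try {
            return queue.offer(request, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns true if the enqueue failed AND the lock was released afterwards.
    static boolean lockReleasedAfterFailedEnqueue() {
        ReentrantLock controllerLock = new ReentrantLock();
        // Full queue that nobody drains, as when the send thread is stuck retrying.
        LinkedBlockingQueue<String> requestQueue = new LinkedBlockingQueue<>(1);
        requestQueue.offer("pending-request");

        boolean enqueued;
        controllerLock.lock();
        try {
            enqueued = enqueueWithTimeout(requestQueue, "next-request", 50);
        } finally {
            controllerLock.unlock(); // reached even though the queue never drained
        }
        return !enqueued && !controllerLock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println("lock released after failed enqueue: "
                + lockReleasedAfterFailedEnqueue());
    }
}
```

The trade-off in such a design is that the caller must decide what to do with a request that could not be queued; the sketch simply reports the failure.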


> Controller can hang on controlled shutdown with auto leader balance enabled
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-1305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1305
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joel Koshy
>             Fix For: 0.8.2
>
>


