[ https://issues.apache.org/jira/browse/KAFKA-1903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14296572#comment-14296572 ]
yufeng.chen commented on KAFKA-1903:
------------------------------------

kafka_2.11-0.8.2-beta does not have this issue. There, the deleteTopicThread no longer shares a ReentrantLock with KafkaController; it has its own ReentrantLock. Thanks!

> Zk Expiration causes controller deadlock
> ----------------------------------------
>
>                 Key: KAFKA-1903
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1903
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 0.8.1, 0.8.1.1
>         Environment: java version "1.7.0_55"
> Java(TM) SE Runtime Environment (build 1.7.0_55-b13)
> Java HotSpot(TM) 64-Bit Server VM (build 24.55-b03, mixed mode)
> kafka_2.9.2-0.8.1
>            Reporter: yufeng.chen
>            Assignee: Neha Narkhede
>            Priority: Critical
>
> When the controller's ZK session expires, the ZooKeeper node /broker/ids
> loses the Kafka controller's entry. Suppose there are three nodes, 1, 2,
> and 3, and node 1 has started the delete-topics thread. At that point
> node 1 is lost. Why? When the ZK session expires, the zk-event-thread
> calls KafkaController.SessionExpirationListener.handleNewSession. Once the
> zk-event-thread holds controllerContext.controllerLock, it calls
> onControllerResignation->deleteTopicManager.shutdown()->deleteTopicsThread.shutdown().
> Meanwhile the delete-topics-thread is running and waiting in
> awaitTopicDeletionNotification(). The zk-event-thread calls
> deleteTopicsThread.shutdown() and waits until the run() method has
> completed. Because the zk-event-thread holds the lock,
> "deleteTopicsCond.await()" cannot actually return when it is interrupted:
> it has to reacquire the lock first. The zk-event-thread therefore stalls
> and never executes kafkaHealthcheck->SessionExpireListener.handleNewSession,
> so the controller never registers again.
> The jstack log:
> "delete-topics-thread" prio=10 tid=0x00007fb0bc21b000 nid=0x2825 waiting on condition [0x00007fb0f534a000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00000000e4952da0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2047)
>         at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$awaitTopicDeletionNotification(TopicDeletionManager.scala:178)
>         at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:334)
>         at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
>         at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
>         at kafka.utils.Utils$.inLock(Utils.scala:538)
>         at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:333)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> "ZkClient-EventThread-12-10.3.63.8:2181,10.3.63.9:2181" daemon prio=10 tid=0x00007fb10038e800 nid=0x7d93 waiting on condition [0x00007fb0f544a000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00000000e4f4a760> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
>         at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>         at kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93)
>         at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340)
>         at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
>         at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
>         at kafka.utils.Utils$.inLock(Utils.scala:538)
>         at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337)
>         at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068)
>         at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
>         at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
>         at kafka.utils.Utils$.inLock(Utils.scala:538)
>         at kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067)
>         at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
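The lock interaction described in the report can be reproduced in isolation with plain java.util.concurrent classes. The following is only a minimal sketch, not Kafka code: the class name, the method name, and the 500 ms bounded wait are all made up for illustration (the real ShutdownableThread.shutdown() in the trace waits on its latch without a timeout, which is why the broker hangs). It shows that a thread parked in Condition.await() must reacquire the lock before it can react to an interrupt, so a caller that holds the same lock while waiting for that thread to finish will stall, while a worker with its own lock (as the comment says 0.8.2-beta has) finishes promptly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the controller / delete-topics-thread interaction.
final class ControllerLockSketch {

    /**
     * Interrupts a worker parked in Condition.await() and waits (bounded) for
     * it to finish, all while the caller holds controllerLock.
     * Returns true if the worker finished while the caller still held the lock.
     */
    static boolean workerStopsWhileCallerHoldsLock(boolean shareLock) {
        ReentrantLock controllerLock = new ReentrantLock();
        // shareLock = true mimics the 0.8.1 behaviour described in the report.
        ReentrantLock workerLock = shareLock ? controllerLock : new ReentrantLock();
        Condition cond = workerLock.newCondition();
        CountDownLatch waiting = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            workerLock.lock();
            try {
                waiting.countDown();
                // No signal is ever sent in this sketch; only an interrupt ends the wait.
                while (true) {
                    cond.await(); // releases workerLock while parked
                }
            } catch (InterruptedException e) {
                // Reached only after await() has reacquired workerLock.
            } finally {
                workerLock.unlock();
                done.countDown();
            }
        }, "delete-topics-sketch");
        worker.start();

        try {
            waiting.await();
            controllerLock.lock(); // plays the role of the zk-event-thread
            try {
                worker.interrupt();
                // Bounded wait so the sketch terminates instead of hanging.
                return done.await(500, TimeUnit.MILLISECONDS);
            } finally {
                controllerLock.unlock();
                worker.join(); // worker can now reacquire the lock and exit
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("caller interrupted", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("shared lock:   " + workerStopsWhileCallerHoldsLock(true));
        System.out.println("separate lock: " + workerStopsWhileCallerHoldsLock(false));
    }
}
```

With a shared lock the bounded wait should time out, which matches the two stack traces above: one thread parked in acquireQueued inside await(), the other parked on the CountDownLatch. With a separate lock the worker should exit well within the timeout.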