[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862668#comment-16862668 ]
leibo edited comment on KAFKA-8532 at 6/13/19 4:00 AM:
-------------------------------------------------------

Here is my analysis based on the jstack log and the source code. When the deadlock occurred:

# The zk-session-expiry-handler0 thread was reinitializing a new ZooKeeper instance; the Expire event had been put onto the ControllerEventManager queue and was waiting to be handled by the controller-event-thread. (*At this point Kafka was still disconnected from ZooKeeper.*)
# However, at the same time the controller-event-thread was handling a Startup ControllerEvent, as below:

{code:java}
// KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    elect()
  }
}
{code}

While processing this event it is blocked in ZooKeeperClient.handleRequests, shown below:

{code:java}
// ZooKeeperClient.scala
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}

At this time the ZooKeeper server itself was running normally and all of its functions were working well. I suspect the key point is the *initializationLock*: if inReadLock succeeded, the request would throw an exception while Kafka is disconnected from ZooKeeper, but we never see any such exception in the Kafka server.log. So the controller-event-thread probably cannot enter inReadLock(initializationLock), perhaps because the write lock was never released. There are two places that acquire inWriteLock(initializationLock):

1. close():
{code:java}
expiryScheduler.startup()
try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
catch {
  case e: Throwable =>
    close() // close() here takes the write lock
    throw e
}

def close(): Unit = {
  info("Closing.")
  inWriteLock(initializationLock) {
    zNodeChangeHandlers.clear()
    zNodeChildChangeHandlers.clear()
    stateChangeHandlers.clear()
    zooKeeper.close()
    metricNames.foreach(removeMetric(_))
  }
  // Shutdown scheduler outside of lock to avoid deadlock if scheduler
  // is waiting for lock to process session expiry
  expiryScheduler.shutdown()
  info("Closed.")
}
{code}
2. reinitialize():
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)  // zk-session-expiry-handler0 is blocked here

  inWriteLock(initializationLock) {  // the program never reaches this point
    if (!connectionState.isAlive) {
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}

So I think the problem occurs in close().
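To make the circular wait explicit, here is a small standalone sketch of the pattern I am describing (illustrative code only, not Kafka source; the queue, latch and variable names are invented): the "controller-event-thread" cannot drain the event queue until a ZooKeeper response arrives, and the "zk-session-expiry-handler0" cannot recreate the session, which is what would produce that response, until the event queue is drained.

{code:java}
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}

object DeadlockSketch {
  def main(args: Array[String]): Unit = {
    val eventQueue        = new LinkedBlockingQueue[String]()
    val zkResponse        = new CountDownLatch(1) // the ZooKeeper response the controller is waiting for
    val processingStarted = new CountDownLatch(1) // stands in for Expire.waitUntilProcessingStarted

    // "controller-event-thread": stuck waiting for a ZooKeeper response inside the current
    // event, so it never takes the next event (Expire) off the queue.
    val eventThread = new Thread(new Runnable {
      def run(): Unit = {
        zkResponse.await()
        eventQueue.take()             // the Expire event would only be picked up here
        processingStarted.countDown()
      }
    }, "controller-event-thread")

    // "zk-session-expiry-handler0": enqueues Expire and waits for it to start processing
    // *before* recreating the ZooKeeper session that would produce the response above.
    val expiryHandler = new Thread(new Runnable {
      def run(): Unit = {
        eventQueue.put("Expire")
        processingStarted.await()     // never satisfied
        zkResponse.countDown()        // the new session would eventually answer the request
      }
    }, "zk-session-expiry-handler0")

    eventThread.start()
    expiryHandler.start()
    eventThread.join(3000)
    expiryHandler.join(3000)
    println(s"controller-event-thread alive: ${eventThread.isAlive}, " +
      s"zk-session-expiry-handler0 alive: ${expiryHandler.isAlive}")
  }
}
{code}

Run as is, both threads stay parked on their latches forever, which matches the two WAITING (parking) stacks in the attached js.log.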
> controller-event-thread deadlock with zk-session-expiry-handler0
> ----------------------------------------------------------------
>
>                 Key: KAFKA-8532
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8532
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.1.1
>            Reporter: leibo
>            Priority: Blocker
>         Attachments: js.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the zk-session-expiry-handler0 thread. When this issue occurs, the only way to recover the Kafka cluster is to restart the Kafka server. Below is the jstack log of the controller-event-thread and the zk-session-expiry-handler0 thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 tid=0x00007fcc9c010000 nid=0xfb22 waiting on condition [0x00007fcbb01f8000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00000005ee3f7000> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // waiting for controller-event-thread to process the expireEvent
>         at kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>         at kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>         at kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>         at kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>         at kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>         at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown Source)
>         at scala.collection.Iterator.foreach(Iterator.scala:937)
>         at scala.collection.Iterator.foreach$(Iterator.scala:937)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>         at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>         at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>         at kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>         at kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown Source)
>         at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>         at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown Source)
>         at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
>    Locked ownable synchronizers:
>         - <0x0000000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x00007fceaeec4000 nid=0x310 waiting on condition [0x00007fccb55c8000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00000005d1be5a00> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>         at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157)
>         at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1596)
>         at kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1589)
>         at kafka.zk.KafkaZkClient.deletePreferredReplicaElection(KafkaZkClient.scala:989)
>         at kafka.controller.KafkaController.removePartitionsFromPreferredReplicaElection(KafkaController.scala:873)
>         at kafka.controller.KafkaController.kafka$controller$KafkaController$$onPreferredReplicaElection(KafkaController.scala:631)
>         at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:266)
>         at kafka.controller.KafkaController.kafka$controller$KafkaController$$elect(KafkaController.scala:1221)
>         at kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1508)
>         at kafka.controller.KafkaController$RegisterBrokerAndReelect$.process(KafkaController.scala:1517)
>         at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:89)
>         at kafka.controller.ControllerEventManager$ControllerEventThread$$Lambda$362/918424570.apply$mcV$sp(Unknown Source)
>         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>         at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:89)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>    Locked ownable synchronizers:
>         - None