David Jacot created KAFKA-9796:
----------------------------------

             Summary: Broker shutdown could be stuck forever under certain conditions
                 Key: KAFKA-9796
                 URL: https://issues.apache.org/jira/browse/KAFKA-9796
             Project: Kafka
          Issue Type: Bug
            Reporter: David Jacot
            Assignee: David Jacot

During broker initialisation, the Acceptor threads are started early so that the bound port is known, while the start of the processors is delayed until the end of the initialisation sequence. We have found that the shutdown of a broker can be stuck forever under the following conditions:
- the shutdown procedure is started before the processors are started;
- the `newConnections` queues of the processors are full; and
- an extra new connection has been accepted but can't be queued up in a processor.

For instance, this could happen if a `NodeExistsException` is raised when the broker tries to register itself in ZK.

When the above conditions hold, the shutdown procedure triggers the shutdown of the acceptor threads and waits until they have stopped (first thread dump below). If an acceptor has a pending connection which can't be queued up in a processor, it ends up waiting until space is made in that processor's new connection queue (second thread dump below). As the processors are not started, the new connection queues are never drained, so the acceptor thread is never released.

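The blocking pattern described above can be reproduced in isolation. The following is a minimal sketch in plain Java, not Kafka's code: the class `AcceptorShutdownSketch`, the method `acceptorStops`, the queue capacity of 1, and the string "connections" are all hypothetical stand-ins. It assumes only what the thread dumps show, namely an acceptor blocked in `ArrayBlockingQueue.put` and a shutdown path waiting on a `CountDownLatch`. Timed waits are used so that the sketch terminates instead of hanging.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AcceptorShutdownSketch {

    /**
     * Simulates an acceptor handing a connection to a processor whose
     * newConnections queue is already full.
     *
     * @param drainQueue whether something plays the role of a started
     *                   processor and removes one queued connection
     * @param waitMs     how long the simulated shutdown waits for the acceptor
     * @return true if the acceptor thread finished within waitMs
     */
    public static boolean acceptorStops(boolean drainQueue, long waitMs)
            throws InterruptedException {
        // Hypothetical capacity of 1; the queue is full after this put.
        BlockingQueue<String> newConnections = new ArrayBlockingQueue<>(1);
        newConnections.put("conn-1");

        // Counted down when the acceptor thread exits, like the latch the
        // shutdown path awaits in the first thread dump.
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        Thread acceptor = new Thread(() -> {
            try {
                // Blocks while the queue is full, as in the second thread dump.
                newConnections.put("conn-2");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown();
            }
        }, "acceptor-sketch");
        acceptor.setDaemon(true); // lets the JVM exit even if it stays blocked
        acceptor.start();

        if (drainQueue) {
            // Stand-in for a running processor draining its queue.
            newConnections.take();
        }

        // The real shutdown waits without a timeout; a timeout is used here
        // only so the sketch can report the outcome.
        return shutdownLatch.await(waitMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processors not started: acceptor stopped = "
                + acceptorStops(false, 300));
        System.out.println("queue drained:          acceptor stopped = "
                + acceptorStops(true, 2000));
    }
}
```

With nothing draining the queue, the timed wait expires and the first call returns `false`; an untimed `await()` at that point would block forever, which is the hang reported here. Once one element is taken from the queue, the pending `put` completes and the second call returns `true`.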
*Shutdown waits on acceptor to shut down*
{noformat}
"main" #1 prio=5 os_prio=0 cpu=3626.89ms elapsed=106360.56s tid=0x00007f625001c800 nid=0x272 waiting on condition [0x00007f6257ca4000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
    - parking to wait for <0x0000000689a61800> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.5/AbstractQueuedSynchronizer.java:885)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1039)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1345)
    at java.util.concurrent.CountDownLatch.await(java.base@11.0.5/CountDownLatch.java:232)
    at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
    at kafka.network.Acceptor.shutdown(SocketServer.scala:521)
    at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
    at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
    at kafka.network.SocketServer$$Lambda$604/0x0000000840540840.apply(Unknown Source)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
    at kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
    - locked <0x0000000689a61ac0> (a kafka.network.SocketServer)
    at kafka.server.KafkaServer.$anonfun$shutdown$5(KafkaServer.scala:806)
    at kafka.server.KafkaServer$$Lambda$602/0x000000084052b040.apply$mcV$sp(Unknown Source)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
    at kafka.server.KafkaServer.shutdown(KafkaServer.scala:806)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:522)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)
{noformat}

*Acceptor waits on processor to accept the new connection*
{noformat}
"data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-9092" #54 prio=5 os_prio=0 cpu=16.23ms elapsed=106346.62s tid=0x00007f62523b5000 nid=0x2ca waiting on condition [0x00007f6157130000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
    - parking to wait for <0x0000000689a7cad8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.5/AbstractQueuedSynchronizer.java:2081)
    at java.util.concurrent.ArrayBlockingQueue.put(java.base@11.0.5/ArrayBlockingQueue.java:367)
    at kafka.network.Processor.accept(SocketServer.scala:1020)
    at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:639)
    at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:566)
    at kafka.network.Acceptor.run(SocketServer.scala:550)
    at java.lang.Thread.run(java.base@11.0.5/Thread.java:834)
{noformat}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)