[ https://issues.apache.org/jira/browse/KAFKA-13428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Jacot resolved KAFKA-13428. --------------------------------- Resolution: Duplicate > server hang on shutdown > ----------------------- > > Key: KAFKA-13428 > URL: https://issues.apache.org/jira/browse/KAFKA-13428 > Project: Kafka > Issue Type: Improvement > Components: core > Affects Versions: 2.5.0 > Reporter: yuhuo > Priority: Major > > The Kafka server starts up in the following steps: > 1. socketServer.startup > 2. zkClient.registerBroker > 3. socketServer.startDataPlaneProcessors > After step 1, the port accepts connections, but the default processor queue > size is 20. If there are many connections, the acceptor thread blocks on the > processor queue put. Then, if registerBroker fails (for example, when the zk > session has not expired and the broker id still exists), the server starts > shutting down and never starts the network processors. The acceptor thread > cannot shut down because it is still blocked on the queue, so the server > hangs. > Stack trace: > {code:java} > //代码占位符 > ... > "data-plane-kafka-socket-acceptor-ListenerName(ING_INSIDE)-PLAINTEXT-9094" > #35 prio=5 os_prio=0 tid=0x000055fe58048800 nid=0x6c5 runnable > [0x00007f5f60f8b000] > java.lang.Thread.State: RUNNABLE > at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) > at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) > - locked <0x00000006497b86a0> (a sun.nio.ch.Util$3) > - locked <0x00000006497b8690> (a java.util.Collections$UnmodifiableSet) > - locked <0x00000006497b86b0> (a sun.nio.ch.EPollSelectorImpl) > at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > at kafka.network.Acceptor.run(SocketServer.scala:534) > at java.lang.Thread.run(Thread.java:748) > "data-plane-kafka-socket-acceptor-ListenerName(INSIDE)-PLAINTEXT-9092" #34 > prio=5 os_prio=0 tid=0x000055fe55773800 nid=0x6c4 waiting on condition > [0x00007f5f63315000] > java.lang.Thread.State: WAITING (parking) > at 
sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000006497b88d8> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > at > java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353) > at kafka.network.Processor.accept(SocketServer.scala:1002) > at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:633) > at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:560) > at kafka.network.Acceptor.run(SocketServer.scala:544) > at java.lang.Thread.run(Thread.java:748) > ... > "main" #1 prio=5 os_prio=0 tid=0x000055fe5519a000 nid=0x69f waiting on > condition [0x00007f5f8a0cf000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000006497b8df8> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430) > at kafka.network.Acceptor.shutdown(SocketServer.scala:517) > at > kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267) > at > kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267) > at kafka.network.SocketServer$$Lambda$408/1620459733.apply(Unknown > Source) > at scala.collection.Iterator.foreach(Iterator.scala:941) > 
at scala.collection.Iterator.foreach$(Iterator.scala:941) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) > at > scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213) > at > kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267) > - locked <0x00000006497b8e98> (a kafka.network.SocketServer) > at kafka.server.KafkaServer.$anonfun$shutdown$4(KafkaServer.scala:617) > at kafka.server.KafkaServer$$Lambda$406/1338368149.apply$mcV$sp(Unknown > Source) > at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:67) > at kafka.server.KafkaServer.shutdown(KafkaServer.scala:617) > at kafka.server.KafkaServer.startup(KafkaServer.scala:358) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) > at kafka.Kafka$.main(Kafka.scala:82) > at kafka.Kafka.main(Kafka.scala) > > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)