[
https://issues.apache.org/jira/browse/KAFKA-13428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
David Jacot resolved KAFKA-13428.
---------------------------------
Resolution: Duplicate
> server hang on shutdown
> -----------------------
>
> Key: KAFKA-13428
> URL: https://issues.apache.org/jira/browse/KAFKA-13428
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 2.5.0
> Reporter: yuhuo
> Priority: Major
>
> Kafka Server start as follow step:
> 1. socketServer.startup
> 2. zkClient.registerBroker
> 3. socketServer.startDataPlaneProcessors
> after step1,the port can be connnected, but default processors queue size is
> 20, if there is many connections, the AcceptorThread will wait on processors
> queue put. and then, if registerBroker error(such as zk session not expired,
> the broker id still exists), server will go shutdown and never start network
> processors, AcceptorThread will shutdown fail because thread still wait on
> queue, at last server is hang.
> stack:
> {code:java}
> //代码占位符
> ...
> "data-plane-kafka-socket-acceptor-ListenerName(ING_INSIDE)-PLAINTEXT-9094"
> #35 prio=5 os_prio=0 tid=0x000055fe58048800 nid=0x6c5 runnable
> [0x00007f5f60f8b000]
> java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x00000006497b86a0> (a sun.nio.ch.Util$3)
> - locked <0x00000006497b8690> (a java.util.Collections$UnmodifiableSet)
> - locked <0x00000006497b86b0> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at kafka.network.Acceptor.run(SocketServer.scala:534)
> at java.lang.Thread.run(Thread.java:748)
> "data-plane-kafka-socket-acceptor-ListenerName(INSIDE)-PLAINTEXT-9092" #34
> prio=5 os_prio=0 tid=0x000055fe55773800 nid=0x6c4 waiting on condition
> [0x00007f5f63315000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000006497b88d8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at
> java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
> at kafka.network.Processor.accept(SocketServer.scala:1002)
> at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:633)
> at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:560)
> at kafka.network.Acceptor.run(SocketServer.scala:544)
> at java.lang.Thread.run(Thread.java:748)
> ...
> "main" #1 prio=5 os_prio=0 tid=0x000055fe5519a000 nid=0x69f waiting on
> condition [0x00007f5f8a0cf000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000006497b8df8> (a
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
> at kafka.network.Acceptor.shutdown(SocketServer.scala:517)
> at
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
> at
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
> at kafka.network.SocketServer$$Lambda$408/1620459733.apply(Unknown
> Source)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
> at
> kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
> - locked <0x00000006497b8e98> (a kafka.network.SocketServer)
> at kafka.server.KafkaServer.$anonfun$shutdown$4(KafkaServer.scala:617)
> at kafka.server.KafkaServer$$Lambda$406/1338368149.apply$mcV$sp(Unknown
> Source)
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:67)
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:617)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:358)
> at
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
> at kafka.Kafka$.main(Kafka.scala:82)
> at kafka.Kafka.main(Kafka.scala)
>
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)