[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179574#comment-14179574 ]
Chris Richardson commented on KAFKA-1716:
-----------------------------------------

I have some tests that create 30+ ZookeeperConsumerConnectors, use them for a while, and then attempt to shut them down. On each test run, a few of them shut down successfully, but most consumers hang in shutdown() with a stack trace identical to the one in the issue description. The ConsumerFetcherThread stack trace is different, though. I'll repeat the tests in the next couple of days and share what I find.

> hang during shutdown of ZookeeperConsumerConnector
> --------------------------------------------------
>
>                 Key: KAFKA-1716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1716
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.1.1
>            Reporter: Sean Fay
>            Assignee: Neha Narkhede
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process.
>
> Shutdown thread:
> {code}
>     -- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>     at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
>     at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>     at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
>     at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
>     at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
>     at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
>     ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
>     at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
>     at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
>     at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167)
> {code}
>
> ConsumerFetcherThread:
> {code}
>     -- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>     at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>     at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
>     at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
>     at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/utils/Utils$.inLock(Utils.scala:538)
>     at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
>     at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
>     at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method)
> {code}
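
For readers trying to picture the interaction in the two traces, the following is a minimal standalone sketch of the same blocking pattern: one thread waits on a `CountDownLatch` while the thread that is supposed to count it down is parked in `LinkedBlockingQueue.put()` on a full queue. It does not use any Kafka code. The class name `ShutdownHangSketch`, the method `shutdownCompletes`, and the `drainQueue` flag are hypothetical names chosen for illustration, and the wait is bounded by a timeout so the example terminates. It is not meant as a statement about the root cause of this issue or about how it should be fixed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownHangSketch {

    /**
     * Simulates a shutdown that waits for a worker blocked on a full queue.
     * Returns true if the worker signalled completion within one second.
     * All names here are illustrative; none of this is Kafka's code.
     */
    public static boolean shutdownCompletes(boolean drainQueue) {
        // Bounded queue that is already full, so the worker's put() will block.
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("chunk-0");

        AtomicBoolean running = new AtomicBoolean(true);
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            try {
                do {
                    // Stand-in for enqueueing fetched data; parks while the queue is full.
                    queue.put("chunk");
                } while (running.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Reached only once put() has returned or been interrupted.
                shutdownLatch.countDown();
            }
        }, "fetcher-sketch");
        fetcher.setDaemon(true);
        fetcher.start();

        try {
            // Shutdown side: clear the flag, then wait for the worker to finish.
            running.set(false);
            if (drainQueue) {
                queue.clear(); // frees capacity, which lets a blocked put() return
            }
            // Bounded wait so the sketch terminates; an unbounded await() would hang.
            return shutdownLatch.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            // Cleanup for the sketch only: release the worker if it is still parked.
            fetcher.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("queue never drained:           " + shutdownCompletes(false));
        System.out.println("queue drained during shutdown: " + shutdownCompletes(true));
    }
}
```

In this sketch the first call is expected to report `false`, because nothing frees space in the queue and the worker never reaches `countDown()`, while the second reports `true`. The point is only that clearing a running flag is not enough on its own when the worker is parked inside a blocking `put()`.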