[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14182258#comment-14182258 ]
Jun Rao commented on KAFKA-1716:
--------------------------------

Hmm, then it's not clear why SimpleConsumer is blocked on reading from the socket. Could you turn on the request log on the broker and see if the fetch request just takes long or it never completes?

> hang during shutdown of ZookeeperConsumerConnector
> --------------------------------------------------
>
>                 Key: KAFKA-1716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1716
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.1.1
>            Reporter: Sean Fay
>            Assignee: Neha Narkhede
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process.
> Shutdown thread:
> {code}
>     -- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>     at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
>     at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>     at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
>     at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
>     at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
>     at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
>     ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
>     at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
>     at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
>     at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167)
> {code}
> ConsumerFetcherThread:
> {code}
>     -- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>     at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>     at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
>     at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
>     at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/utils/Utils$.inLock(Utils.scala:538)
>     at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
>     at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
>     at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method)
> {code}
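
For readers trying to picture how the two traces in the quoted report relate, here is a minimal Java sketch of the general pattern they show: one thread parked in `CountDownLatch.await()` waiting for a worker to finish, while that worker is parked in `LinkedBlockingQueue.put()` on a full queue. This is not Kafka's code and makes no claim about the root cause of KAFKA-1716. The names `ShutdownHangSketch`, `Fetcher` and `shutdownCompletes` are hypothetical, and draining the queue is shown only as one way a blocked `put()` can be released in this toy setup.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; none of these names come from Kafka.
class ShutdownHangSketch {

    /** Stand-in for a fetcher thread that enqueues data for a consumer. */
    static final class Fetcher extends Thread {
        final LinkedBlockingQueue<String> queue;
        final AtomicBoolean running = new AtomicBoolean(true);
        final CountDownLatch shutdownLatch = new CountDownLatch(1);

        Fetcher(LinkedBlockingQueue<String> queue) {
            this.queue = queue;
            setDaemon(true); // a wedged fetcher must not keep the JVM alive
        }

        @Override
        public void run() {
            try {
                while (running.get()) {
                    queue.put("chunk"); // parks here once the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown(); // signals that run() has exited
            }
        }
    }

    /**
     * Starts a fetcher on a full bounded queue, requests shutdown, and reports
     * whether the fetcher exited within timeoutMs. If drainQueue is true, the
     * queue is cleared after the stop flag is set, which wakes the blocked put().
     */
    static boolean shutdownCompletes(boolean drainQueue, long timeoutMs) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        Fetcher fetcher = new Fetcher(queue);
        fetcher.start();
        try {
            // Wait until the queue is full and the fetcher is parked in put().
            while (queue.remainingCapacity() != 0
                    || fetcher.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            fetcher.running.set(false); // the flag alone cannot wake a parked thread
            if (drainQueue) {
                queue.clear(); // frees capacity, so put() returns and the loop exits
            }
            return fetcher.shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("no drain:   " + shutdownCompletes(false, 200));
        System.out.println("with drain: " + shutdownCompletes(true, 2000));
    }
}
```

In this sketch the first call is expected to time out, because nothing ever makes room in the queue, and the second is expected to complete. The bounded `await` is used here only so the demonstration terminates; an unbounded `await()` in the first case would wait indefinitely.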