Sean Fay created KAFKA-1716: ------------------------------- Summary: hang during shutdown of ZookeeperConsumerConnector Key: KAFKA-1716 URL: https://issues.apache.org/jira/browse/KAFKA-1716 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1 Reporter: Sean Fay Assignee: Neha Narkhede
It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process. Shutdown thread: {code} -- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969) at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281) at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207) at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36) at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120) at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226) at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39) at scala/collection/mutable/HashMap.foreach(HashMap.scala:98) at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120) ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock] at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148) at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171) at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code} ConsumerFetcherThread: {code} -- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306) at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224) at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/utils/Utils$.inLock(Utils.scala:538) at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51) at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method){code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)