Joel Koshy created KAFKA-916:
--------------------------------

             Summary: Deadlock between fetcher shutdown and handling partitions with error
                 Key: KAFKA-916
                 URL: https://issues.apache.org/jira/browse/KAFKA-916
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8
            Reporter: Joel Koshy
             Fix For: 0.8
Here is another consumer deadlock that we encountered. All consumers are vulnerable to this during a rebalance if there happen to be partitions in error.

On a rebalance, the fetcher manager closes all fetchers while holding the fetcher thread map's lock (mapLock in AbstractFetcherManager) [t1]. While the fetcher manager is iterating over the fetchers to stop them, a fetcher that has not yet been stopped hits an error on a partition and proceeds to handle partitions with error [t2]. That handling looks up the fetcher for the partition and removes the partition from the fetcher's set of partitions to consume, which requires grabbing the same map lock already held in [t1]; hence the deadlock. A minimal sketch of the lock cycle follows the thread dumps below.

[t1]
2013-05-22_20:23:11.95767 "main" prio=10 tid=0x00007f1b24007800 nid=0x573b waiting on condition [0x00007f1b2bd38000]
2013-05-22_20:23:11.95767    java.lang.Thread.State: WAITING (parking)
2013-05-22_20:23:11.95767       at sun.misc.Unsafe.park(Native Method)
2013-05-22_20:23:11.95767       - parking to wait for <0x00007f1a25780598> (a java.util.concurrent.CountDownLatch$Sync)
2013-05-22_20:23:11.95767       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-22_20:23:11.95767       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-22_20:23:11.95768       at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-22_20:23:11.95768       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-22_20:23:11.95768       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-22_20:23:11.95768       at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-22_20:23:11.95769       at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
2013-05-22_20:23:11.95769       at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
2013-05-22_20:23:11.95769       at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
2013-05-22_20:23:11.95769       at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-22_20:23:11.95769       at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-22_20:23:11.95770       at scala.collection.Iterator$class.foreach(Iterator.scala:631)
2013-05-22_20:23:11.95770       at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
2013-05-22_20:23:11.95770       at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
2013-05-22_20:23:11.95770       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
2013-05-22_20:23:11.95771       at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
2013-05-22_20:23:11.95771       at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
---> 2013-05-22_20:23:11.95771       - locked <0x00007f1a2ae92510> (a java.lang.Object)
2013-05-22_20:23:11.95771       at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
2013-05-22_20:23:11.95771       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
2013-05-22_20:23:11.95772       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
2013-05-22_20:23:11.95772       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
2013-05-22_20:23:11.95772       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
2013-05-22_20:23:11.95772       at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-22_20:23:11.95773       at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-22_20:23:11.95773       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
2013-05-22_20:23:11.95773       - locked <0x00007f1a2a29b450> (a java.lang.Object)
2013-05-22_20:23:11.95773       at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
2013-05-22_20:23:11.95774       at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:754)
2013-05-22_20:23:11.95774       at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:74)
2013-05-22_20:23:11.95774       at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:69)
2013-05-22_20:23:11.95774       - locked <0x00007f1a2a69b1d8> (a java.lang.Object)
2013-05-22_20:23:11.95774       at kafka.consumer.ZookeeperTopicEventWatcher.startWatchingTopicEvents(ZookeeperTopicEventWatcher.scala:46)
2013-05-22_20:23:11.95775       at kafka.consumer.ZookeeperTopicEventWatcher.<init>(ZookeeperTopicEventWatcher.scala:33)
2013-05-22_20:23:11.95775       at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:721)
2013-05-22_20:23:11.95775       at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-22_20:23:11.95776       at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
2013-05-22_20:23:11.95776       at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
2013-05-22_20:23:11.95776       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-22_20:23:11.95776       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-22_20:23:11.95776       at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-22_20:23:11.95777       at scala.collection.immutable.List.foreach(List.scala:45)
2013-05-22_20:23:11.95777       at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-22_20:23:11.95777       at scala.collection.immutable.List.map(List.scala:45)
2013-05-22_20:23:11.95777       at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-22_20:23:11.95777       at kafka.tools.MirrorMaker.main(MirrorMaker.scala)

[t2]
2013-05-22_20:23:11.87465 "ConsumerFetcherThread-xxxx-1369238724254-cff180ff-0-505" prio=10 tid=0x00007f196401a800 nid=0x717a waiting for monitor entry [0x00007f19bf0ef000]
2013-05-22_20:23:11.87466    java.lang.Thread.State: BLOCKED (on object monitor)
2013-05-22_20:23:11.87467       at kafka.server.AbstractFetcherManager.removeFetcher(AbstractFetcherManager.scala:57)
---> 2013-05-22_20:23:11.87467       - waiting to lock <0x00007f1a2ae92510> (a java.lang.Object)
2013-05-22_20:23:11.87468       at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
2013-05-22_20:23:11.95682       at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
2013-05-22_20:23:11.95683       at scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
2013-05-22_20:23:11.95684       at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:170)
2013-05-22_20:23:11.95684       at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:69)
2013-05-22_20:23:11.95684       at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:168)
2013-05-22_20:23:11.95684       at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
2013-05-22_20:23:11.95684       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2013-05-22_20:23:11.95686
2013-05-22_20:23:11.95686 "main-EventThread" daemon prio=10 tid=0x00007f1b2471d000 nid=0x605a waiting on condition [0x00007f19bedec000]
2013-05-22_20:23:11.95686    java.lang.Thread.State: WAITING (parking)
2013-05-22_20:23:11.95686       at sun.misc.Unsafe.park(Native Method)
2013-05-22_20:23:11.95686       - parking to wait for <0x00007f1a2a4426f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
2013-05-22_20:23:11.95687       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-22_20:23:11.95687       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
2013-05-22_20:23:11.95687       at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
2013-05-22_20:23:11.95687       at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)
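To make the cycle concrete, here is a minimal, self-contained sketch in Scala. Every name in it (FetcherDeadlockSketch, mapLock, isRunning, shutdownComplete) is a hypothetical stand-in that mirrors AbstractFetcherManager and ShutdownableThread in shape only; it is not the actual Kafka code. The main thread plays the role of [t1] and the fetcher thread the role of [t2]:

import java.util.concurrent.CountDownLatch

object FetcherDeadlockSketch {
  private val mapLock = new Object                      // stand-in for the fetcher thread map's lock
  private val shutdownComplete = new CountDownLatch(1)  // stand-in for ShutdownableThread's latch
  @volatile private var isRunning = true

  // [t2]: on a partition error the fetcher re-enters the manager
  // (removeFetcher), which synchronizes on the same map lock.
  private val fetcher = new Thread(new Runnable {
    override def run(): Unit = {
      while (isRunning) {
        mapLock.synchronized {
          // handlePartitionsWithErrors -> addPartitionsWithError -> removeFetcher
        }
      }
      shutdownComplete.countDown()                      // unreachable once main holds mapLock
    }
  })

  def main(args: Array[String]): Unit = {
    fetcher.start()
    mapLock.synchronized {                              // [t1]: closeAllFetchers holds mapLock...
      while (fetcher.getState != Thread.State.BLOCKED)
        Thread.sleep(1)                                 // wait until the fetcher parks on mapLock
      isRunning = false
      shutdownComplete.await()                          // ...then awaits a latch that only the
                                                        // blocked fetcher can count down: deadlock
    }
  }
}

Run as-is, this hangs with exactly the two states shown in the dumps above: main WAITING on the CountDownLatch while holding the map lock's monitor, and the fetcher BLOCKED waiting to lock that monitor. One common way to break such a cycle is to signal shutdown while holding the lock but await the thread's completion only after releasing it, so the exiting thread can still acquire the lock it needs on its way out.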