[ https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13665299#comment-13665299 ]
Jun Rao commented on KAFKA-916:
-------------------------------

Thanks for finding this. The deadlock is introduced because AbstractFetcherManager.removeFetcher() can be called from AbstractFetcherThread, while AbstractFetcherManager.closeAllFetchers() can wait for AbstractFetcherThread to stop. This only happens for the ConsumerFetcherThread. So, one possible fix is to remove the error partitions from the fetcher directly in ConsumerFetcherThread.handlePartitionsWithErrors(), instead of going through ConsumerFetcherManager. This will break the cycle.

> Deadlock between fetcher shutdown and handling partitions with error
> --------------------------------------------------------------------
>
>                 Key: KAFKA-916
>                 URL: https://issues.apache.org/jira/browse/KAFKA-916
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Joel Koshy
>             Fix For: 0.8
>
>
> Here is another consumer deadlock that we encountered. All consumers are
> vulnerable to this during a rebalance if there happen to be partitions in
> error.
> On a rebalance, the fetcher manager closes all fetchers and this holds on to
> the fetcher thread map's lock (mapLock in AbstractFetcherManager). [t1]
> While the fetcher manager is iterating over fetchers to stop them, a fetcher
> that is yet to be stopped hits an error on a partition and proceeds to
> handle partitions with error [t2]. This handling involves looking up the
> fetcher for that partition and then removing it from the fetcher's set of
> partitions to consume. This requires grabbing the same map lock held in [t1],
> hence the deadlock.
> [t1]
> 2013-05-22_20:23:11.95767 "main" prio=10 tid=0x00007f1b24007800 nid=0x573b waiting on condition [0x00007f1b2bd38000]
> 2013-05-22_20:23:11.95767 java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95767 at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95767 - parking to wait for <0x00007f1a25780598> (a java.util.concurrent.CountDownLatch$Sync)
> 2013-05-22_20:23:11.95767 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95767 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 2013-05-22_20:23:11.95768 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> 2013-05-22_20:23:11.95768 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> 2013-05-22_20:23:11.95768 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> 2013-05-22_20:23:11.95768 at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> 2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
> 2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
> 2013-05-22_20:23:11.95769 at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
> 2013-05-22_20:23:11.95769 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95769 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95770 at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> 2013-05-22_20:23:11.95770 at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> 2013-05-22_20:23:11.95770 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> 2013-05-22_20:23:11.95770 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 2013-05-22_20:23:11.95771 at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> 2013-05-22_20:23:11.95771 at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
> ---> 2013-05-22_20:23:11.95771 - locked <0x00007f1a2ae92510> (a java.lang.Object)
> 2013-05-22_20:23:11.95771 at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
> 2013-05-22_20:23:11.95771 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
> 2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
> 2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
> 2013-05-22_20:23:11.95772 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
> 2013-05-22_20:23:11.95772 at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> 2013-05-22_20:23:11.95773 at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> 2013-05-22_20:23:11.95773 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
> 2013-05-22_20:23:11.95773 - locked <0x00007f1a2a29b450> (a java.lang.Object)
> 2013-05-22_20:23:11.95773 at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
> 2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:754)
> 2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:74)
> 2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:69)
> 2013-05-22_20:23:11.95774 - locked <0x00007f1a2a69b1d8> (a java.lang.Object)
> 2013-05-22_20:23:11.95774 at kafka.consumer.ZookeeperTopicEventWatcher.startWatchingTopicEvents(ZookeeperTopicEventWatcher.scala:46)
> 2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperTopicEventWatcher.<init>(ZookeeperTopicEventWatcher.scala:33)
> 2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:721)
> 2013-05-22_20:23:11.95775 at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
> 2013-05-22_20:23:11.95776 at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95776 at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95776 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95776 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95776 at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> 2013-05-22_20:23:11.95777 at scala.collection.immutable.List.foreach(List.scala:45)
> 2013-05-22_20:23:11.95777 at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95777 at scala.collection.immutable.List.map(List.scala:45)
> 2013-05-22_20:23:11.95777 at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95777 at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
>
> [t2]
> 2013-05-22_20:23:11.87465 "ConsumerFetcherThread-xxxx-1369238724254-cff180ff-0-505" prio=10 tid=0x00007f196401a800 nid=0x717a waiting for monitor entry [0x00007f19bf0ef000]
> 2013-05-22_20:23:11.87466 java.lang.Thread.State: BLOCKED (on object monitor)
> 2013-05-22_20:23:11.87467 at kafka.server.AbstractFetcherManager.removeFetcher(AbstractFetcherManager.scala:57)
> ---> 2013-05-22_20:23:11.87467 - waiting to lock <0x00007f1a2ae92510> (a java.lang.Object)
> 2013-05-22_20:23:11.87468 at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95682 at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95683 at scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
> 2013-05-22_20:23:11.95684 at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95684 at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:69)
> 2013-05-22_20:23:11.95684 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:168)
> 2013-05-22_20:23:11.95684 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> 2013-05-22_20:23:11.95684 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 2013-05-22_20:23:11.95686
> 2013-05-22_20:23:11.95686 "main-EventThread" daemon prio=10 tid=0x00007f1b2471d000 nid=0x605a waiting on condition [0x00007f19bedec000]
> 2013-05-22_20:23:11.95686 java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95686 at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95686 - parking to wait for <0x00007f1a2a4426f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2013-05-22_20:23:11.95687 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95687 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> 2013-05-22_20:23:11.95687 at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> 2013-05-22_20:23:11.95687 at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)
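
For illustration only, the lock cycle in [t1] and [t2] and the proposed fix can be reproduced in a small standalone program. This is a minimal sketch in plain Java, not the Kafka code: the class FetcherDeadlockSketch, its fields, and the viaManager flag are hypothetical names, and mapLock merely stands in for the lock of the same name in AbstractFetcherManager. The shutdown wait uses a timeout (an assumption made so the demo terminates), whereas the wait in the real trace has no timeout and blocks forever.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the manager/fetcher pair; not Kafka's classes.
final class FetcherDeadlockSketch {
    // Plays the role of mapLock in AbstractFetcherManager.
    private final Object mapLock = new Object();
    // The fetcher's set of partitions to consume.
    private final Set<String> partitions = ConcurrentHashMap.newKeySet();
    // Released once the manager holds mapLock, so the error is always
    // handled while the shutdown is in progress.
    private final CountDownLatch lockHeld = new CountDownLatch(1);
    // Counted down when the fetcher thread finishes its work.
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    // true: remove through the manager (needs mapLock); false: remove directly.
    private final boolean viaManager;

    FetcherDeadlockSketch(boolean viaManager) {
        this.viaManager = viaManager;
        partitions.add("topic-0");
    }

    // Manager-side removal, analogous to the path blocked in [t2].
    private void removeViaManager(String partition) {
        synchronized (mapLock) {
            partitions.remove(partition);
        }
    }

    // Body of the fetcher thread: hit an error, drop the partition, then stop.
    private void fetcherWork() {
        try {
            lockHeld.await();
            if (viaManager) {
                removeViaManager("topic-0"); // blocks while the manager holds mapLock
            } else {
                partitions.remove("topic-0"); // proposed fix: no manager lock needed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            shutdownLatch.countDown();
        }
    }

    // Analogous to [t1]: hold mapLock while waiting for the fetcher to stop.
    // Returns true if the fetcher stopped within timeoutMs.
    boolean closeAllFetchers(long timeoutMs) throws InterruptedException {
        Thread fetcher = new Thread(this::fetcherWork, "fetcher");
        fetcher.start();
        boolean stopped;
        synchronized (mapLock) {
            lockHeld.countDown();
            stopped = shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
        fetcher.join(); // lock is released here, so the fetcher can always finish
        return stopped;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("removal via manager, stopped in time: "
                + new FetcherDeadlockSketch(true).closeAllFetchers(500));
        System.out.println("direct removal, stopped in time: "
                + new FetcherDeadlockSketch(false).closeAllFetchers(500));
    }
}
```

With removal routed through the manager, the fetcher cannot enter the synchronized block until the shutdown wait gives up, so the wait should time out; with direct removal the fetcher never touches mapLock and the shutdown should complete promptly.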