[ 
https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13665299#comment-13665299
 ] 

Jun Rao commented on KAFKA-916:
-------------------------------

Thanks for finding this. The deadlock arises because 
AbstractFetcherManager.removeFetcher() can be called from an 
AbstractFetcherThread while AbstractFetcherManager.closeAllFetchers() holds 
the same lock and waits for that AbstractFetcherThread to stop. This only 
happens for ConsumerFetcherThread. So, one possible fix is to remove the 
error partitions from the fetcher directly in 
ConsumerFetcherThread.handlePartitionsWithErrors(), instead of going through 
ConsumerFetcherManager. This will break the cycle.
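
The cycle and the proposed fix can be illustrated with a minimal Java sketch. 
This is not Kafka's code: the class FetcherDeadlockSketch, the Fetcher class, 
the "topic-0" partition name and the 1000 ms timeout are all hypothetical 
stand-ins. The real shutdown path waits on the latch without a timeout, which 
is why it hangs; the sketch uses a timed wait so that it terminates.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class FetcherDeadlockSketch {
    // Stand-in for mapLock in AbstractFetcherManager.
    static final Object mapLock = new Object();

    /** Hypothetical, heavily simplified fetcher thread (not Kafka's class). */
    static class Fetcher extends Thread {
        final Set<String> partitions = ConcurrentHashMap.newKeySet();
        final CountDownLatch lockHeld = new CountDownLatch(1);      // manager now holds mapLock
        final CountDownLatch shutdownLatch = new CountDownLatch(1); // fetcher has stopped
        final boolean removeViaManager;

        Fetcher(boolean removeViaManager) {
            this.removeViaManager = removeViaManager;
            partitions.add("topic-0");
            setDaemon(true);
        }

        @Override
        public void run() {
            try {
                lockHeld.await(); // force the error to occur in the middle of shutdown
                handlePartitionsWithErrors("topic-0");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown();
            }
        }

        void handlePartitionsWithErrors(String partition) {
            if (removeViaManager) {
                // Old path: needs mapLock, which the closing thread already holds.
                synchronized (mapLock) {
                    partitions.remove(partition);
                }
            } else {
                // Proposed path: touch only the fetcher's own partition set.
                partitions.remove(partition);
            }
        }
    }

    /** Returns true if the fetcher stopped while mapLock was still held. */
    public static boolean closeAllFetchers(Fetcher f, long timeoutMs) {
        synchronized (mapLock) {
            f.lockHeld.countDown();
            try {
                // A timed wait so the sketch ends; an untimed wait would hang here.
                return f.shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static boolean run(boolean removeViaManager) {
        Fetcher f = new Fetcher(removeViaManager);
        f.start();
        return closeAllFetchers(f, 1000);
    }

    public static void main(String[] args) {
        System.out.println("removal via manager, stopped in time: " + run(true));
        System.out.println("direct removal, stopped in time: " + run(false));
    }
}
```

With removal routed through the lock, the timed wait should expire (false); 
with direct removal the fetcher never needs mapLock and should stop promptly 
(true).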
                
> Deadlock between fetcher shutdown and handling partitions with error
> --------------------------------------------------------------------
>
>                 Key: KAFKA-916
>                 URL: https://issues.apache.org/jira/browse/KAFKA-916
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Joel Koshy
>             Fix For: 0.8
>
>
> Here is another consumer deadlock that we encountered. All consumers are
> vulnerable to this during a rebalance if any partitions happen to be in
> error.
> On a rebalance, the fetcher manager closes all fetchers while holding the
> fetcher thread map's lock (mapLock in AbstractFetcherManager). [t1]
> While the fetcher manager is iterating over the fetchers to stop them, a
> fetcher that has yet to be stopped hits an error on a partition and proceeds
> to handle partitions with error [t2]. This handling involves looking up the
> fetcher for that partition and then removing the partition from the
> fetcher's set of partitions to consume. This requires grabbing the same map
> lock held in [t1], while [t1] is waiting for that fetcher to shut down,
> hence the deadlock.
> [t1]
> 2013-05-22_20:23:11.95767 "main" prio=10 tid=0x00007f1b24007800 nid=0x573b 
> waiting on condition [0x00007f1b2bd38000]
> 2013-05-22_20:23:11.95767    java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95767     at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95767     - parking to wait for  <0x00007f1a25780598> (a 
> java.util.concurrent.CountDownLatch$Sync)
> 2013-05-22_20:23:11.95767     at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95767     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 2013-05-22_20:23:11.95768     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> 2013-05-22_20:23:11.95768     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> 2013-05-22_20:23:11.95768     at 
> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> 2013-05-22_20:23:11.95768     at 
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> 2013-05-22_20:23:11.95769     at 
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
> 2013-05-22_20:23:11.95769     at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
> 2013-05-22_20:23:11.95769     at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
> 2013-05-22_20:23:11.95769     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95769     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95770     at 
> scala.collection.Iterator$class.foreach(Iterator.scala:631)
> 2013-05-22_20:23:11.95770     at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> 2013-05-22_20:23:11.95770     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> 2013-05-22_20:23:11.95770     at 
> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 2013-05-22_20:23:11.95771     at 
> scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> 2013-05-22_20:23:11.95771     at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
> ---> 2013-05-22_20:23:11.95771        - locked <0x00007f1a2ae92510> (a 
> java.lang.Object)
> 2013-05-22_20:23:11.95771     at 
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
> 2013-05-22_20:23:11.95771     at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
> 2013-05-22_20:23:11.95772     at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
> 2013-05-22_20:23:11.95772     at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
> 2013-05-22_20:23:11.95772     at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
> 2013-05-22_20:23:11.95772     at 
> scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> 2013-05-22_20:23:11.95773     at 
> scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> 2013-05-22_20:23:11.95773     at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
> 2013-05-22_20:23:11.95773     - locked <0x00007f1a2a29b450> (a 
> java.lang.Object)
> 2013-05-22_20:23:11.95773     at 
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
> 2013-05-22_20:23:11.95774     at 
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:754)
> 2013-05-22_20:23:11.95774     at 
> kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:74)
> 2013-05-22_20:23:11.95774     at 
> kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:69)
> 2013-05-22_20:23:11.95774     - locked <0x00007f1a2a69b1d8> (a 
> java.lang.Object)
> 2013-05-22_20:23:11.95774     at 
> kafka.consumer.ZookeeperTopicEventWatcher.startWatchingTopicEvents(ZookeeperTopicEventWatcher.scala:46)
> 2013-05-22_20:23:11.95775     at 
> kafka.consumer.ZookeeperTopicEventWatcher.<init>(ZookeeperTopicEventWatcher.scala:33)
> 2013-05-22_20:23:11.95775     at 
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:721)
> 2013-05-22_20:23:11.95775     at 
> kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
> 2013-05-22_20:23:11.95776     at 
> kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95776     at 
> kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95776     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95776     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95776     at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> 2013-05-22_20:23:11.95777     at 
> scala.collection.immutable.List.foreach(List.scala:45)
> 2013-05-22_20:23:11.95777     at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95777     at 
> scala.collection.immutable.List.map(List.scala:45)
> 2013-05-22_20:23:11.95777     at 
> kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95777     at 
> kafka.tools.MirrorMaker.main(MirrorMaker.scala)
> [t2]
> 2013-05-22_20:23:11.87465 
> "ConsumerFetcherThread-xxxx-1369238724254-cff180ff-0-505" prio=10 
> tid=0x00007f196401a800 nid=0x717a waiting for monitor entry 
> [0x00007f19bf0ef000]
> 2013-05-22_20:23:11.87466    java.lang.Thread.State: BLOCKED (on object 
> monitor)
> 2013-05-22_20:23:11.87467     at 
> kafka.server.AbstractFetcherManager.removeFetcher(AbstractFetcherManager.scala:57)
> ---> 2013-05-22_20:23:11.87467        - waiting to lock <0x00007f1a2ae92510> 
> (a java.lang.Object)
> 2013-05-22_20:23:11.87468     at 
> kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95682     at 
> kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95683     at 
> scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
> 2013-05-22_20:23:11.95684     at 
> kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95684     at 
> kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:69)
> 2013-05-22_20:23:11.95684     at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:168)
> 2013-05-22_20:23:11.95684     at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> 2013-05-22_20:23:11.95684     at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 2013-05-22_20:23:11.95686 
> 2013-05-22_20:23:11.95686 "main-EventThread" daemon prio=10 
> tid=0x00007f1b2471d000 nid=0x605a waiting on condition [0x00007f19bedec000]
> 2013-05-22_20:23:11.95686    java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95686     at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95686     - parking to wait for  <0x00007f1a2a4426f8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2013-05-22_20:23:11.95687     at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95687     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> 2013-05-22_20:23:11.95687     at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> 2013-05-22_20:23:11.95687     at 
> org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)

