Joel Koshy created KAFKA-914:
--------------------------------
Summary: Deadlock between initial rebalance and watcher-triggered
rebalances
Key: KAFKA-914
URL: https://issues.apache.org/jira/browse/KAFKA-914
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
Fix For: 0.8
The summary doesn't give the full picture, and the fetcher-manager/fetcher-thread
code is complex enough that the following is hard to articulate clearly. I will
try to describe the sequence that results in a deadlock when a large number
of consumers start up at around the same time:
- When a consumer's createMessageStream method is called, it starts an
initial inline rebalance.
- However, before that initial rebalance actually begins, a ZK watch
may trigger (due to some other consumers starting up) and initiate a
rebalance. This rebalance completes successfully, so fetchers start and
begin filling up the chunk queues.
- Another watch triggers and initiates yet another rebalance. This rebalance
attempt tries to close the fetchers. Before the fetchers are stopped, we
shut down the leader-finder-thread to prevent new fetchers from being
started.
- The shutdown is accomplished by interrupting the leader-finder-thread and
then awaiting its shutdown latch. [td3]
- If the leader-finder-thread still has a partition without a leader to
process and tries to add a fetcher for it, it will get an exception
(InterruptedException if acquiring the partitionMapLock or
ClosedByInterruptException if performing an offset request). If we get an
InterruptedException, the thread's interrupted flag is cleared.
- However, the leader-finder-thread may have multiple partitions without a
leader that it is currently processing. So if the interrupted flag is
cleared and the leader-finder-thread tries to add a fetcher for another
partition, it does not receive an InterruptedException when it tries to
acquire the partitionMapLock. It can end up blocking indefinitely at that
point.
- The problem is that, up to this point, createMessageStream's initial inline
rebalance has not yet returned: it is blocked on the rebalance lock,
waiting for the second watch-triggered rebalance to complete. That is, the
consumer iterators have not been created yet, so nothing drains the fetcher
queues and they fill up. [td1]
- As a result, processPartitionData (which holds on to the partitionMapLock)
in one or more fetchers will be blocked trying to enqueue into a full
chunk queue. [td2]
- So the leader-finder-thread cannot finish adding fetchers for the
remaining partitions without a leader and thus cannot shut down. [td4]
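To make the interrupted-flag step in the sequence above concrete, here is a
minimal Java sketch. It is not Kafka's code: the class name InterruptFlagDemo,
the run() helper, and the two-iteration loop are hypothetical stand-ins, and
the lock is only named partitionMapLock to mirror the description. The demo
also assumes a timed tryLock in place of lockInterruptibly purely so that it
terminates; in the real scenario the second attempt would block indefinitely.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of Kafka.
final class InterruptFlagDemo {

    /**
     * Returns three observations:
     * [0] the first lock attempt threw InterruptedException,
     * [1] the interrupted flag was still set inside the catch block,
     * [2] the second lock attempt did not see the interrupt and timed out.
     */
    static boolean[] run() {
        final ReentrantLock partitionMapLock = new ReentrantLock();
        final boolean[] observed = new boolean[3];

        // Stands in for a fetcher that holds the lock while stuck on a full queue.
        partitionMapLock.lock();

        Thread leaderFinder = new Thread(() -> {
            // Two "partitions without a leader" processed one after the other.
            for (int partition = 0; partition < 2; partition++) {
                try {
                    // Assumption: timed tryLock replaces lockInterruptibly so the
                    // demo ends instead of hanging on the second iteration.
                    if (partitionMapLock.tryLock(500, TimeUnit.MILLISECONDS)) {
                        partitionMapLock.unlock();
                    } else {
                        observed[2] = true;
                    }
                } catch (InterruptedException e) {
                    observed[0] = true;
                    // Throwing InterruptedException clears the flag.
                    observed[1] = Thread.currentThread().isInterrupted();
                }
            }
        });

        leaderFinder.start();
        leaderFinder.interrupt(); // the shutdown request
        try {
            leaderFinder.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            partitionMapLock.unlock();
        }
        return observed;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("firstAttemptInterrupted=" + r[0]
                + " flagStillSet=" + r[1]
                + " secondAttemptTimedOut=" + r[2]);
    }
}
```

The point of the sketch is only that one interrupt is consumed by one
exception: once it has been caught and swallowed, later interruptible lock
acquisitions in the same loop wait like any ordinary lock request.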
One way to fix this would be to let the exception from the leader-finder-thread
propagate out if the thread is currently shutting down, avoiding the
subsequent (unnecessary) attempt to add a fetcher and lock
partitionMapLock. There may be more elegant fixes (such as rewriting the
whole consumer manager logic), but we obviously want to avoid extensive
changes at this point in 0.8.
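A rough Java sketch of that idea follows. It is an illustration under stated
assumptions, not the actual patch: the class ShutdownAwareLeaderFinder, the
running flag, the attempts counter, and the addFetchers/run methods are all
hypothetical names, and the real code paths in ConsumerFetcherManager are more
involved. The sketch only shows the control flow: when an interrupt arrives
while the thread is shutting down, rethrow instead of moving on to the next
partition.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; names do not correspond to Kafka classes.
final class ShutdownAwareLeaderFinder {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ReentrantLock partitionMapLock = new ReentrantLock();
    private final AtomicInteger attempts = new AtomicInteger();

    /** Tries to add a fetcher per partition; aborts if interrupted during shutdown. */
    void addFetchers(int partitionCount) throws InterruptedException {
        for (int p = 0; p < partitionCount; p++) {
            attempts.incrementAndGet();
            try {
                partitionMapLock.lockInterruptibly();
                try {
                    // Placeholder for registering partition p with a fetcher.
                } finally {
                    partitionMapLock.unlock();
                }
            } catch (InterruptedException e) {
                if (!running.get()) {
                    throw e; // shutting down: propagate, skip remaining partitions
                }
                // Not shutting down: swallow and continue with the next partition.
            }
        }
    }

    /** Returns the number of lock attempts made before the thread exited, or -1 if it hung. */
    static int run() {
        ShutdownAwareLeaderFinder finder = new ShutdownAwareLeaderFinder();
        finder.partitionMapLock.lock(); // stands in for a fetcher stuck on a full queue

        Thread worker = new Thread(() -> {
            try {
                finder.addFetchers(3);
            } catch (InterruptedException e) {
                // Exits promptly; nothing further to do in this sketch.
            }
        });
        worker.start();

        // Shutdown request: clear the flag first, then interrupt.
        finder.running.set(false);
        worker.interrupt();
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        boolean exited = !worker.isAlive();
        finder.partitionMapLock.unlock();
        return exited ? finder.attempts.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("lock attempts before exit: " + run());
    }
}
```

Clearing the flag before interrupting matters in this sketch: the worker
decides whether to rethrow by reading the flag after it has observed the
interrupt, so the flag has to be false by then.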
Relevant portions of the thread-dump are here:
[td1] createMessageStream's initial inline rebalance (blocked on the ongoing
watch-triggered rebalance)
2013-05-20_17:50:13.04848 "main" prio=10 tid=0x00007f5960008000 nid=0x3772
waiting for monitor entry [0x00007f59666c3000]
2013-05-20_17:50:13.04848 java.lang.Thread.State: BLOCKED (on object
monitor)
2013-05-20_17:50:13.04848 at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-20_17:50:13.04849 - waiting to lock <0x00007f58637dfe10> (a
java.lang.Object)
2013-05-20_17:50:13.04849 at
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
2013-05-20_17:50:13.04850 at
kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:712)
2013-05-20_17:50:13.04850 at
kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-20_17:50:13.04850 at
kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-20_17:50:13.04850 at
kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-20_17:50:13.04850 at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-20_17:50:13.04851 at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-20_17:50:13.04851 at
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-20_17:50:13.04851 at
scala.collection.immutable.List.foreach(List.scala:45)
2013-05-20_17:50:13.04851 at
scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-20_17:50:13.04852 at
scala.collection.immutable.List.map(List.scala:45)
2013-05-20_17:50:13.04852 at
kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-20_17:50:13.04852 at
kafka.tools.MirrorMaker.main(MirrorMaker.scala)
[td2] A consumer fetcher thread blocked on full queue.
2013-05-20_17:50:13.04703
"ConsumerFetcherThread-xxxx-1368836182178-2009023c-0-3248" prio=10
tid=0x00007f57a4010800 nid=0x3920 waiting on condition
[0x00007f58316ae000]
2013-05-20_17:50:13.04703 java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04703 at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04704 - parking to wait for <0x00007f586381d6c0> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
2013-05-20_17:50:13.04704 at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04704 at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
2013-05-20_17:50:13.04704 at
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
2013-05-20_17:50:13.04704 at
kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
2013-05-20_17:50:13.04705 at
kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:50)
2013-05-20_17:50:13.04706 at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:131)
2013-05-20_17:50:13.04707 at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:112)
2013-05-20_17:50:13.04708 at
scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
2013-05-20_17:50:13.04709 at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:112)
2013-05-20_17:50:13.04709 at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
2013-05-20_17:50:13.04709 at
kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[td3] Second watch-triggered rebalance
2013-05-20_17:50:13.04725 "xxxx-1368836182178-2009023c_watcher_executor"
prio=10 tid=0x00007f5960777800 nid=0x37af waiting on condition
[0x00007f58318b0000]
2013-05-20_17:50:13.04725 java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04726 at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04726 - parking to wait for <0x00007f5863728de8> (a
java.util.concurrent.CountDownLatch$Sync)
2013-05-20_17:50:13.04726 at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04727 at
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-20_17:50:13.04727 at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-20_17:50:13.04728 at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-20_17:50:13.04728 at
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-20_17:50:13.04729 at
kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-20_17:50:13.04729 at
kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
2013-05-20_17:50:13.04730 at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
2013-05-20_17:50:13.04730 at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
2013-05-20_17:50:13.04731 at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
2013-05-20_17:50:13.04731 at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
2013-05-20_17:50:13.04732 at
scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-20_17:50:13.04733 at
scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-20_17:50:13.04733 at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-20_17:50:13.04733 - locked <0x00007f58637dfe10> (a
java.lang.Object)
2013-05-20_17:50:13.04734 at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:325)
[td4] leader-finder-thread still trying to process partitions without leader,
blocked on the partitionMapLock held by processPartitionData in td2.
2013-05-20_17:50:13.04712 "xxxx-1368836182178-2009023c-leader-finder-thread"
prio=10 tid=0x00007f57b0027800 nid=0x38d8 waiting on condition
[0x00007f58317af000]
2013-05-20_17:50:13.04712 java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04713 at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04713 - parking to wait for <0x00007f586375e3d8> (a
java.util.concurrent.locks.ReentrantLock$NonfairSync)
2013-05-20_17:50:13.04713 at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04714 at
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-20_17:50:13.04714 at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:867)
2013-05-20_17:50:13.04717 at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1201)
2013-05-20_17:50:13.04718 at
java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:312)
2013-05-20_17:50:13.04718 at
kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:173)
2013-05-20_17:50:13.04719 at
kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:48)
2013-05-20_17:50:13.04719 - locked <0x00007f586374b040> (a
java.lang.Object)
2013-05-20_17:50:13.04719 at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:83)
2013-05-20_17:50:13.04720 at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
2013-05-20_17:50:13.04721 at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-20_17:50:13.04721 at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-20_17:50:13.04721 at
scala.collection.Iterator$class.foreach(Iterator.scala:631)
2013-05-20_17:50:13.04722 at
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
2013-05-20_17:50:13.04723 at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
2013-05-20_17:50:13.04723 at
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
2013-05-20_17:50:13.04723 at
scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
2013-05-20_17:50:13.04724 at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
2013-05-20_17:50:13.04724 at
kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
