[ https://issues.apache.org/jira/browse/KAFKA-2253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang updated KAFKA-2253: --------------------------------- Resolution: Fixed Status: Resolved (was: Patch Available) > Deadlock in delayed operation purgatory > --------------------------------------- > > Key: KAFKA-2253 > URL: https://issues.apache.org/jira/browse/KAFKA-2253 > Project: Kafka > Issue Type: Bug > Reporter: Mayuresh Gharat > Assignee: Guozhang Wang > Fix For: 0.8.3 > > Attachments: KAFKA-2253.patch, KAFKA-2253.patch, > KAFKA-2253_2015-06-08_11:47:40.patch > > > We hit a deadlock while running brokers with git hash: > 9e894aa0173b14d64a900bcf780d6b7809368384 > There's a circular wait between the removeWatchersLock and an operations > intrinsic lock. > {code} > Found one Java-level deadlock: > ============================= > "kafka-request-handler-a": > waiting for ownable synchronizer 0x00000006da08f9e0, (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync), > which is held by "ExpirationReaper-xyz" > "ExpirationReaper-xyz": > waiting to lock monitor 0x00007f4500004e18 (object 0x00000006b0563fe8, a > java.util.LinkedList), > which is held by "kafka-request-handler-b" > "kafka-request-handler-b": > waiting for ownable synchronizer 0x00000006da08f9e0, (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync), > which is held by "ExpirationReaper-xyz" > "kafka-request-handler-a": > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000006da08f9e0> (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283) > at > java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:224) > at > kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166) > at > kafka.cluster.Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Partition.scala:358) > at > kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply$mcV$sp(Partition.scala:288) > at > kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270) > at > kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306) > at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:268) > at > kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:244) > at > kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:790) > at > kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:787) > at scala.collection.immutable.Map$Map4.foreach(Map.scala:181) > at > kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:787) > at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:432) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:312) > at kafka.server.KafkaApis.handle(KafkaApis.scala:60) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > "ExpirationReaper-xyz": > at > kafka.server.DelayedOperationPurgatory$Watchers.watched(DelayedOperation.scala:278) > - waiting to lock <0x00000006b0563fe8> (a java.util.LinkedList) > at > kafka.server.DelayedOperationPurgatory$$anonfun$kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty$1.apply(DelayedOperation.scala:258) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306) > at > kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256) > at > kafka.server.DelayedOperationPurgatory$Watchers.purgeCompleted(DelayedOperation.scala:322) > - locked <0x000000071a86a478> (a java.util.LinkedList) > at > kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347) > at > kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:347) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) > "kafka-request-handler-b": > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000006da08f9e0> (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) > at > java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306) > at > kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:303) > - locked <0x00000006b0563fe8> (a java.util.LinkedList) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:228) > at > kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166) > at > kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:426) > at > kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:410) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304) > at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:410) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:365) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:350) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:350) > at > kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:286) > at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:272) > at kafka.server.KafkaApis.handle(KafkaApis.scala:59) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Found 1 deadlock. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)