[ https://issues.apache.org/jira/browse/KAFKA-2253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Onur Karaman updated KAFKA-2253: -------------------------------- Description: We hit a deadlock while running brokers with git hash: 9e894aa0173b14d64a900bcf780d6b7809368384 {code} Found one Java-level deadlock: ============================= "kafka-request-handler-a": waiting for ownable synchronizer 0x00000006da08f9e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync), which is held by "ExpirationReaper-xyz" "ExpirationReaper-xyz": waiting to lock monitor 0x00007f4500004e18 (object 0x00000006b0563fe8, a java.util.LinkedList), which is held by "kafka-request-handler-b" "kafka-request-handler-b": waiting for ownable synchronizer 0x00000006da08f9e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync), which is held by "ExpirationReaper-xyz" "kafka-request-handler-a": at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000006da08f9e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283) at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296) at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:224) at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166) at kafka.cluster.Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Partition.scala:358) at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply$mcV$sp(Partition.scala:288) at 
kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270) at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298) at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306) at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:268) at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:244) at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:790) at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:787) at scala.collection.immutable.Map$Map4.foreach(Map.scala:181) at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:787) at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:432) at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:312) at kafka.server.KafkaApis.handle(KafkaApis.scala:60) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) at java.lang.Thread.run(Thread.java:745) "ExpirationReaper-xyz": at kafka.server.DelayedOperationPurgatory$Watchers.watched(DelayedOperation.scala:278) - waiting to lock <0x00000006b0563fe8> (a java.util.LinkedList) at kafka.server.DelayedOperationPurgatory$$anonfun$kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty$1.apply(DelayedOperation.scala:258) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298) at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306) at kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256) at kafka.server.DelayedOperationPurgatory$Watchers.purgeCompleted(DelayedOperation.scala:322) - locked <0x000000071a86a478> (a java.util.LinkedList) at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347) at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347) at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:347) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) "kafka-request-handler-b": at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000006da08f9e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296) at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306) at kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256) at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:303) - locked <0x00000006b0563fe8> (a java.util.LinkedList) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:228) at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166) at 
kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:426) at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:410) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298) at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304) at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:410) at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:365) at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:350) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:350) at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:286) at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:272) at kafka.server.KafkaApis.handle(KafkaApis.scala:59) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) at java.lang.Thread.run(Thread.java:745) Found 1 deadlock. {code} was: With purgatory usage in the consumer coordinator, watcher lists will commonly be very short and short-lived, so we should remove them from the watchersForKey pool once a list becomes empty in checkAndComplete() calls. 
> Deadlock in delayed operation purgatory > --------------------------------------- > > Key: KAFKA-2253 > URL: https://issues.apache.org/jira/browse/KAFKA-2253 > Project: Kafka > Issue Type: Bug > Reporter: Mayuresh Gharat > Assignee: Guozhang Wang > > We hit a deadlock while running brokers with git hash: > 9e894aa0173b14d64a900bcf780d6b7809368384 > {code} > Found one Java-level deadlock: > ============================= > "kafka-request-handler-a": > waiting for ownable synchronizer 0x00000006da08f9e0, (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync), > which is held by "ExpirationReaper-xyz" > "ExpirationReaper-xyz": > waiting to lock monitor 0x00007f4500004e18 (object 0x00000006b0563fe8, a > java.util.LinkedList), > which is held by "kafka-request-handler-b" > "kafka-request-handler-b": > waiting for ownable synchronizer 0x00000006da08f9e0, (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync), > which is held by "ExpirationReaper-xyz" > "kafka-request-handler-a": > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000006da08f9e0> (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283) > at > java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:224) > at > kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166) > at > 
kafka.cluster.Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Partition.scala:358) > at > kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply$mcV$sp(Partition.scala:288) > at > kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270) > at > kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:270) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306) > at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:268) > at > kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:244) > at > kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:790) > at > kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:787) > at scala.collection.immutable.Map$Map4.foreach(Map.scala:181) > at > kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:787) > at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:432) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:312) > at kafka.server.KafkaApis.handle(KafkaApis.scala:60) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > "ExpirationReaper-xyz": > at > kafka.server.DelayedOperationPurgatory$Watchers.watched(DelayedOperation.scala:278) > - waiting to lock <0x00000006b0563fe8> (a java.util.LinkedList) > at > kafka.server.DelayedOperationPurgatory$$anonfun$kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty$1.apply(DelayedOperation.scala:258) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306) > at > kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256) > at > kafka.server.DelayedOperationPurgatory$Watchers.purgeCompleted(DelayedOperation.scala:322) > - locked 
<0x000000071a86a478> (a java.util.LinkedList) > at > kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347) > at > kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper$$anonfun$3.apply(DelayedOperation.scala:347) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:347) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) > "kafka-request-handler-b": > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000006da08f9e0> (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) > at > java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:296) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:306) > at > kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$removeKeyIfEmpty(DelayedOperation.scala:256) > at > 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:303) > - locked <0x00000006b0563fe8> (a java.util.LinkedList) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:228) > at > kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:166) > at > kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:426) > at > kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:410) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:304) > at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:410) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:365) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:350) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:350) > at > kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:286) > at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:272) > at kafka.server.KafkaApis.handle(KafkaApis.scala:59) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Found 1 
deadlock. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)