[ https://issues.apache.org/jira/browse/KAFKA-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15003164#comment-15003164 ]

ASF GitHub Bot commented on KAFKA-2821:
---------------------------------------

GitHub user hachikuji opened a pull request:

    https://github.com/apache/kafka/pull/519

    KAFKA-2821: fix deadlock in group metadata write callback

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hachikuji/kafka KAFKA-2821

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/519.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #519
    
----
commit 06b66cd2270f1936972e487ba8670463804ac109
Author: Jason Gustafson <ja...@confluent.io>
Date:   2015-11-12T22:21:56Z

    KAFKA-2821: fix deadlock in group metadata write callback

commit f33d96612eb8162387ea94b80c7bab1f2e5d868b
Author: Jason Gustafson <ja...@confluent.io>
Date:   2015-11-12T22:56:29Z

    Move metadata/offset message append outside group metadata lock

commit b4d02dc2331b6e80f530a7e0963e3bc50433f434
Author: Jason Gustafson <ja...@confluent.io>
Date:   2015-11-12T23:08:25Z

    minor renaming

----
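
The second commit above points at the shape of the fix: perform the log append only after the group lock has been released. Below is a rough, self-contained sketch of that idea; the names Group, PendingAppend and replicaAppend are illustrative stand-ins, not Kafka's actual API.

{code}
object AppendOutsideGroupLockSketch {

  // Stand-in for GroupMetadata: per-group state guarded by its monitor.
  final class Group(val groupId: String) {
    var assignment: Map[String, Array[Byte]] = Map.empty
  }

  // Everything the log append needs, captured while the group lock was held.
  final case class PendingAppend(groupId: String,
                                 payload: Map[String, Array[Byte]],
                                 onComplete: () => Unit)

  // Stand-in for ReplicaManager.appendMessages, which internally takes the
  // partition's leaderIsrUpdateLock before invoking the callback.
  def replicaAppend(pending: PendingAppend): Unit = pending.onComplete()

  def storeGroup(group: Group, assignment: Map[String, Array[Byte]]): Unit = {
    // 1. Snapshot the state to persist while synchronized on the group ...
    val pending = group.synchronized {
      group.assignment = assignment
      PendingAppend(group.groupId, assignment,
        () => println(s"assignment for ${group.groupId} persisted"))
    }
    // 2. ... then do the append outside the group lock, so a DelayedProduce
    //    completion on another thread can re-take the group lock without
    //    inverting lock order against leaderIsrUpdateLock.
    replicaAppend(pending)
  }
}
{code}

With the append moved outside the synchronized block, the write callback that later runs under the purgatory/ISR path never needs the group lock while the reverse ordering is in flight.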


> Deadlock in group metadata persistence callback
> -----------------------------------------------
>
>                 Key: KAFKA-2821
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2821
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Blocker
>
> Found this when system testing:
> {code}
> Found one Java-level deadlock:
> =============================
> "kafka-request-handler-7":
>   waiting for ownable synchronizer 0x00000000bc0007f8, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
>   which is held by "kafka-request-handler-6"
> "kafka-request-handler-6":
>   waiting to lock monitor 0x00007fcb94004e28 (object 0x00000000bc000c70, a kafka.coordinator.GroupMetadata),
>   which is held by "kafka-request-handler-2"
> "kafka-request-handler-2":
>   waiting for ownable synchronizer 0x00000000bc0007f8, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
>   which is held by "kafka-request-handler-6"
> Java stack information for the threads listed above:
> ===================================================
> "kafka-request-handler-7":
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000000bc0007f8> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
>         at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:945)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260)
>         at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:270)
>         at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:258)
>         at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:235)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:841)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:838)
>         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>         at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:838)
>         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:471)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:433)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>         at java.lang.Thread.run(Thread.java:745)
> "kafka-request-handler-6":
>         at kafka.coordinator.GroupCoordinator$$anonfun$doSyncGroup$2.apply(GroupCoordinator.scala:289)
>         - waiting to lock <0x00000000bc000c70> (a kafka.coordinator.GroupMetadata)
>         at kafka.coordinator.GroupCoordinator$$anonfun$doSyncGroup$2.apply(GroupCoordinator.scala:284)
>         at kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$putCacheCallback$1(GroupMetadataManager.scala:220)
>         at kafka.coordinator.GroupMetadataManager$$anonfun$storeGroup$1.apply(GroupMetadataManager.scala:229)
>         at kafka.coordinator.GroupMetadataManager$$anonfun$storeGroup$1.apply(GroupMetadataManager.scala:229)
>         at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
>         at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:111)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
>         - locked <0x00000000bc000d20> (a kafka.server.DelayedProduce)
>         - locked <0x00000000bc000d60> (a java.util.LinkedList)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:194)
>         at kafka.cluster.Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Partition.scala:349)
>         at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply$mcV$sp(Partition.scala:278)
>         at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:260)
>         at kafka.cluster.Partition$$anonfun$maybeExpandIsr$1.apply(Partition.scala:260)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>         at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:270)
>         at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:258)
>         at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:235)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:841)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:838)
>         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>         at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:838)
>         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:471)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:433)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>         at java.lang.Thread.run(Thread.java:745)
> "kafka-request-handler-2":
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000000bc0007f8> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
>         at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260)
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
>         at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:400)
>         at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:405)
>         at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:390)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:390)
>         at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:326)
>         at kafka.coordinator.GroupMetadataManager.storeGroup(GroupMetadataManager.scala:224)
>         at kafka.coordinator.GroupCoordinator.doSyncGroup(GroupCoordinator.scala:284)
>         - locked <0x00000000bc000c70> (a kafka.coordinator.GroupMetadata)
>         at kafka.coordinator.GroupCoordinator.handleSyncGroup(GroupCoordinator.scala:248)
>         at kafka.server.KafkaApis.handleSyncGroupRequest(KafkaApis.scala:818)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:81)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> It looks like the cause is trying to grab the group metadata lock inside the 
> storeGroup callback while holding the leaderIsrUpdateLock in 
> kafka.cluster.Partition.
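
The cycle can be reproduced in isolation with just the two locks involved. Here is a minimal stand-alone sketch (not Kafka code; the names only mirror the objects in the trace): one thread takes the group monitor and then needs the partition's read lock, as the SyncGroup path does via appendMessagesToLeader, while the other holds the write lock as in maybeExpandIsr and then re-enters the group monitor from the DelayedProduce callback.

{code}
import java.util.concurrent.locks.ReentrantReadWriteLock

object GroupLockDeadlockSketch extends App {
  val groupLock = new Object                              // stands in for the GroupMetadata monitor
  val leaderIsrUpdateLock = new ReentrantReadWriteLock()  // stands in for Partition's lock

  def thread(name: String)(body: => Unit): Thread =
    new Thread(new Runnable { def run(): Unit = body }, name)

  // Like "kafka-request-handler-2": doSyncGroup holds the group monitor,
  // then storeGroup -> appendMessagesToLeader needs the read lock.
  val syncGroup = thread("sync-group") {
    groupLock.synchronized {
      Thread.sleep(100)
      leaderIsrUpdateLock.readLock().lock()
      try println("append done") finally leaderIsrUpdateLock.readLock().unlock()
    }
  }

  // Like "kafka-request-handler-6": maybeExpandIsr holds the write lock,
  // then completing the DelayedProduce runs the group metadata write
  // callback, which synchronizes on the same group.
  val followerFetch = thread("follower-fetch") {
    leaderIsrUpdateLock.writeLock().lock()
    try {
      Thread.sleep(100)
      groupLock.synchronized { println("callback done") }
    } finally leaderIsrUpdateLock.writeLock().unlock()
  }

  syncGroup.start(); followerFetch.start()
  syncGroup.join(); followerFetch.join()   // hangs: each thread waits on the lock the other holds
}
{code}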



