[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ying Zheng updated KAFKA-7142:
------------------------------
    Description: 
In our production cluster, we noticed that when a large consumer group (a few 
thousand members) is rebalancing, the produce latency of the coordinator broker 
can jump to several seconds.

 

When this happens, jstack shows that all but one of the broker's request handler 
threads are waiting for the group lock:
{noformat}
"kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x00007f9a32b16000 nid=0x1b985 waiting on condition [0x00007f98f1adb000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000024aa73b20> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
        at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
        at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
        at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
        at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
        at java.lang.Thread.run(Thread.java:745){noformat}
  

The one remaining thread is either running GroupMetadata.supportsProtocols():
{noformat}
"kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x00007f9a32b14000 nid=0x1b984 runnable [0x00007f98f1bdc000]
   java.lang.Thread.State: RUNNABLE
        at scala.collection.immutable.List.map(List.scala:284)
        at kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68)
        at kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
        at kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
        at scala.collection.immutable.List.map(List.scala:288)
        at kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265)
        at kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270)
        at kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
        at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
        at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
        at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
        at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
        at java.lang.Thread.run(Thread.java:745){noformat}
or GroupCoordinator.tryCompleteJoin:
{noformat}
"kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x00007fe9f6ad1000 nid=0x1ceff runnable [0x00007fe8701ca000]
   java.lang.Thread.State: RUNNABLE
        at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
        at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
        at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:139)
        at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
        at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
        at kafka.coordinator.group.GroupMetadata.notYetRejoinedMembers(GroupMetadata.scala:229)
        at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply$mcZ$sp(GroupCoordinator.scala:767)
        at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply(GroupCoordinator.scala:767)
        at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply(GroupCoordinator.scala:767)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
        at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:189)
        at kafka.coordinator.group.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:766)
        at kafka.coordinator.group.DelayedJoin.tryComplete(DelayedJoin.scala:38)
        at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
        at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:396)
        at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:298)
        at kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:233)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
        at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:189)
        at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
        at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
        at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
        at java.lang.Thread.run(Thread.java:745){noformat}
 

Both GroupMetadata.supportsProtocols and GroupCoordinator.tryCompleteJoin are 
O(N) operations, which makes a group rebalance an O(N^2) operation. No matter 
how many brokers are in the cluster or how many cores each broker has, these 
consumer group operations can only be processed by a single thread.
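
To make the cost concrete, here is a simplified sketch of what each JoinGroup 
request ends up doing while the group lock is held (illustrative Scala only, not 
the actual GroupMetadata/GroupCoordinator code; the Member and Group types below 
are made up):
{noformat}
import scala.collection.mutable

// Illustrative model of a consumer group; not the real GroupMetadata class.
case class Member(id: String, supportedProtocols: Set[String], var hasRejoined: Boolean)

class Group(val members: mutable.Map[String, Member]) {
  // supportsProtocols-style check: keep only the protocols that every
  // member supports -> walks all N members.
  def supportsProtocols(offered: Set[String]): Boolean =
    members.values.foldLeft(offered)((acc, m) => acc intersect m.supportedProtocols).nonEmpty

  // tryCompleteJoin-style check: scan all N members to see whether
  // anyone has not rejoined yet.
  def canCompleteJoin: Boolean = members.values.forall(_.hasRejoined)
}
{noformat}
Because both scans run inside the group lock, all the other request handler 
threads that touch the same group queue up behind them, which matches the jstack 
output above.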

My trace log messages show that each GroupMetadata.supportsProtocols() call on 
a 3,000-member group takes 30 ms on average.
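Assuming each of the ~3,000 JoinGroup requests in a full rebalance triggers one 
such scan, the supportsProtocols checks alone amount to roughly 3,000 × 30 ms ≈ 
90 seconds of work serialized behind the group lock.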

Both operations can be done in O(1) time by maintaining data structures that 
track the supported protocols and the number of "not yet joined" members 
whenever members are added, removed, or updated.
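
As a rough sketch of that idea (made-up names, not a patch against the real 
GroupMetadata), the group could keep a per-protocol supporter count and an 
"awaiting join" counter up to date as members change, so that neither check 
depends on the group size:
{noformat}
import scala.collection.mutable

// Hypothetical incremental bookkeeping; field and method names are illustrative.
class GroupProtocolTracker {
  // protocol name -> number of members that support it
  private val supporters = mutable.Map.empty[String, Int].withDefaultValue(0)
  private var memberCount = 0
  // members that have not yet rejoined in the current rebalance
  private var numAwaitingJoin = 0

  def addMember(protocols: Set[String]): Unit = {
    memberCount += 1
    numAwaitingJoin += 1
    protocols.foreach(p => supporters(p) += 1)
  }

  def removeMember(protocols: Set[String], hadRejoined: Boolean): Unit = {
    memberCount -= 1
    if (!hadRejoined) numAwaitingJoin -= 1
    protocols.foreach(p => supporters(p) -= 1)
  }

  def memberRejoined(): Unit = numAwaitingJoin -= 1

  // O(|protocols|), independent of group size: a protocol is viable
  // iff every current member supports it.
  def supportsProtocols(protocols: Set[String]): Boolean =
    protocols.exists(p => supporters(p) == memberCount)

  // O(1): the join phase can complete once no member is still missing.
  def canCompleteJoin: Boolean = numAwaitingJoin == 0
}
{noformat}
With this bookkeeping, supportsProtocols only touches the handful of protocols 
in the incoming JoinGroup request, and tryCompleteJoin just compares a counter 
to zero.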

> Rebalancing large consumer group can block the coordinator broker for several seconds
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7142
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7142
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.10.2.0
>            Reporter: Ying Zheng
>            Priority: Minor
>


