[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570734#comment-16570734 ]
ASF GitHub Bot commented on KAFKA-7142: --------------------------------------- harshach closed pull request #5354: KAFKA-7142: fix joinGroup performance issues URL: https://github.com/apache/kafka/pull/5354 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 6ca443f66eb..c4e6dc97137 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -600,11 +600,9 @@ class GroupCoordinator(val brokerId: Int, case Empty | Dead => case PreparingRebalance => for (member <- group.allMemberMetadata) { - if (member.awaitingJoinCallback != null) { - member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR)) - member.awaitingJoinCallback = null - } + group.invokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR)) } + joinPurgatory.checkAndComplete(GroupKey(group.groupId)) case Stable | CompletingRebalance => @@ -704,12 +702,11 @@ class GroupCoordinator(val brokerId: Int, val memberId = clientId + "-" + group.generateMemberIdSuffix val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols) - member.awaitingJoinCallback = callback // update the newMemberAdded flag to indicate that the join group can be further delayed if (group.is(PreparingRebalance) && group.generationId == 0) group.newMemberAdded = true - group.add(member) + group.add(member, callback) maybePrepareRebalance(group, s"Adding new member $memberId") member } @@ -718,8 +715,7 @@ class GroupCoordinator(val brokerId: Int, member: 
MemberMetadata, protocols: List[(String, Array[Byte])], callback: JoinCallback) { - member.supportedProtocols = protocols - member.awaitingJoinCallback = callback + group.updateMember(member, protocols, callback) maybePrepareRebalance(group, s"Updating metadata for member ${member.memberId}") } @@ -765,7 +761,7 @@ class GroupCoordinator(val brokerId: Int, def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = { group.inLock { - if (group.notYetRejoinedMembers.isEmpty) + if (group.hasAllMembersJoined) forceComplete() else false } @@ -816,8 +812,7 @@ class GroupCoordinator(val brokerId: Int, leaderId = group.leaderOrNull, error = Errors.NONE) - member.awaitingJoinCallback(joinResult) - member.awaitingJoinCallback = null + group.invokeJoinCallback(member, joinResult) completeAndScheduleNextHeartbeatExpiration(group, member) } } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index d729449af4e..cbe78e980b6 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -128,7 +128,7 @@ private object GroupMetadata { group.protocol = Option(protocol) group.leaderId = Option(leaderId) group.currentStateTimestamp = currentStateTimestamp - members.foreach(group.add) + members.foreach(group.add(_, null)) group } } @@ -172,6 +172,8 @@ case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offs */ @nonthreadsafe private[group] class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) extends Logging { + type JoinCallback = JoinGroupResult => Unit + private[group] val lock = new ReentrantLock private var state: GroupState = initialState @@ -182,6 +184,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState private var protocol: Option[String] = None private val members = new mutable.HashMap[String, 
MemberMetadata] + private var numMembersAwaitingJoin = 0 + private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0) private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset] private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata] private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]() @@ -202,7 +206,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def protocolOrNull: String = protocol.orNull def currentStateTimestampOrDefault: Long = currentStateTimestamp.getOrElse(-1) - def add(member: MemberMetadata) { + def add(member: MemberMetadata, callback: JoinCallback = null) { if (members.isEmpty) this.protocolType = Some(member.protocolType) @@ -213,10 +217,19 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState if (leaderId.isEmpty) leaderId = Some(member.memberId) members.put(member.memberId, member) + member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 } + member.awaitingJoinCallback = callback + if (member.awaitingJoinCallback != null) + numMembersAwaitingJoin += 1; } def remove(memberId: String) { - members.remove(memberId) + members.remove(memberId).foreach { member => + member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 } + if (member.awaitingJoinCallback != null) + numMembersAwaitingJoin -= 1 + } + if (isLeader(memberId)) { leaderId = if (members.isEmpty) { None @@ -230,6 +243,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList + def hasAllMembersJoined = members.size <= numMembersAwaitingJoin + def allMembers = members.keySet def allMemberMetadata = members.values.toList @@ -268,13 +283,37 @@ private[group] class 
GroupMetadata(val groupId: String, initialState: GroupState private def candidateProtocols = { // get the set of protocols that are commonly supported by all members - allMemberMetadata - .map(_.protocols) - .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols) + val numMembers = members.size + supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet } def supportsProtocols(memberProtocols: Set[String]) = { - members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty + val numMembers = members.size + members.isEmpty || memberProtocols.exists(supportedProtocols(_) == numMembers) + } + + def updateMember(member: MemberMetadata, + protocols: List[(String, Array[Byte])], + callback: JoinCallback) = { + member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 } + protocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 } + member.supportedProtocols = protocols + + if (callback != null && member.awaitingJoinCallback == null) { + numMembersAwaitingJoin += 1; + } else if (callback == null && member.awaitingJoinCallback != null) { + numMembersAwaitingJoin -= 1; + } + member.awaitingJoinCallback = callback + } + + def invokeJoinCallback(member: MemberMetadata, + joinGroupResult: JoinGroupResult) : Unit = { + if (member.awaitingJoinCallback != null) { + member.awaitingJoinCallback(joinGroupResult) + member.awaitingJoinCallback = null + numMembersAwaitingJoin -= 1; + } } def initNextGeneration() = { diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 77e6fdc3683..21c13658e79 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -843,8 +843,7 @@ class GroupMetadataManagerTest { val member = new MemberMetadata(memberId, groupId, clientId, 
clientHost, rebalanceTimeout, sessionTimeout, protocolType, List(("protocol", Array[Byte]()))) - member.awaitingJoinCallback = _ => () - group.add(member) + group.add(member, _ => ()) group.transitionTo(PreparingRebalance) group.initNextGeneration() @@ -873,8 +872,7 @@ class GroupMetadataManagerTest { val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, protocolType, List(("protocol", Array[Byte]()))) - member.awaitingJoinCallback = _ => () - group.add(member) + group.add(member, _ => ()) group.transitionTo(PreparingRebalance) group.initNextGeneration() @@ -1372,8 +1370,7 @@ class GroupMetadataManagerTest { val subscription = new Subscription(List(topic).asJava) val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, protocolType, List(("protocol", ConsumerProtocol.serializeSubscription(subscription).array()))) - member.awaitingJoinCallback = _ => () - group.add(member) + group.add(member, _ => ()) group.transitionTo(PreparingRebalance) group.initNextGeneration() diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala index 90545339ca5..ac12804b1d2 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala @@ -266,8 +266,7 @@ class GroupMetadataTest extends JUnitSuite { protocolType, List(("roundrobin", Array.empty[Byte]))) group.transitionTo(PreparingRebalance) - member.awaitingJoinCallback = _ => () - group.add(member) + group.add(member, _ => ()) assertEquals(0, group.generationId) assertNull(group.protocolOrNull) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Rebalancing large consumer group can block the coordinator broker for several > seconds > ------------------------------------------------------------------------------------- > > Key: KAFKA-7142 > URL: https://issues.apache.org/jira/browse/KAFKA-7142 > Project: Kafka > Issue Type: Improvement > Affects Versions: 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 1.0.0, > 1.1.0 > Reporter: Ying Zheng > Assignee: Ying Zheng > Priority: Major > > In our production cluster, we noticed that when a large consumer group (a few > thousand members) is rebalancing, the produce latency of the coordinator > broker can jump to several seconds. > > Group rebalance is a very frequent operation, it can be triggered by adding / > removing / restarting a single member in the consumer group. > > When this happens, jstack shows all the request handler threads of the broker > are waiting for group lock: > {noformat} > "kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x00007f9a32b16000 > nid=0x1b985 waiting on condition [0x00007f98f1adb000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x000000024aa73b20> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209) > at > java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248) > at > 
kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188) > at > kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152) > at > kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241) > at kafka.server.KafkaApis.handle(KafkaApis.scala:115) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) > at java.lang.Thread.run(Thread.java:745){noformat} > > Besides, one thread is either doing GroupMetadata.supportsProtocols(): > {noformat} > "kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x00007f9a32b14000 > nid=0x1b984 runnable [0x00007f98f1bdc000] > java.lang.Thread.State: RUNNABLE > at scala.collection.immutable.List.map(List.scala:284) > at > kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68) > at > kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265) > at > kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265) > at scala.collection.immutable.List.map(List.scala:288) > at > kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265) > at > kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270) > at > kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188) > at > kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152) > at > kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241) > at kafka.server.KafkaApis.handle(KafkaApis.scala:115) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) > at 
java.lang.Thread.run(Thread.java:745){noformat} > or GroupCoordinator.tryCompleteJoin > {noformat} > "kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x00007fe9f6ad1000 > nid=0x1ceff runnable [0x00007fe8701ca000] > java.lang.Thread.State: RUNNABLE > at > scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248) > at > scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139) > at > scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:139) > at > scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247) > at > scala.collection.TraversableLike$class.filter(TraversableLike.scala:259) > at scala.collection.AbstractTraversable.filter(Traversable.scala:104) > at > kafka.coordinator.group.GroupMetadata.notYetRejoinedMembers(GroupMetadata.scala:229) > at > kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply$mcZ$sp(GroupCoordinator.scala:767) > at > kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply(GroupCoordinator.scala:767) > at > kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply(GroupCoordinator.scala:767) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:189) > at > kafka.coordinator.group.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:766) > at > kafka.coordinator.group.DelayedJoin.tryComplete(DelayedJoin.scala:38) > at > kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:396) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:298) 
> at > kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:233) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:189) > at > kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152) > at > kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241) > at kafka.server.KafkaApis.handle(KafkaApis.scala:115) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) > at java.lang.Thread.run(Thread.java:745){noformat} > > Both of GroupMetadata.supportsProtocols and GroupCoordinator.tryCompleteJoin > are O(N) operations. This makes the group rebalancing an O(N^2) > operation. Regardless of how many brokers there are in the cluster and how many > cores there are in the broker, those consumer group operations can only be > processed by a single thread. > My trace log messages show that each GroupMetadata.supportsProtocols() call > on a 3000 member group takes 30ms on average. > Both of the 2 operations can be done in O(1) time, with some data structures > tracking the supported protocols and # of "not yet joined" members when adding > / removing / updating members. -- This message was sent by Atlassian JIRA (v7.6.3#76005)