dajac commented on a change in pull request #9958:
URL: https://github.com/apache/kafka/pull/9958#discussion_r582631095



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -602,6 +765,30 @@ class GroupCoordinator(val brokerId: Int,
     groupError -> partitionErrors
   }
 
+  private def validateHeartbeat(
+    group: GroupMetadata,
+    generationId: Int,
+    memberId: String,
+    groupInstanceId: Option[String]
+  ): Option[Errors] = {
+    if (group.is(Dead)) {
+      Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else {
+      validateCurrentMember(
+        group,
+        memberId,
+        groupInstanceId,
+        operation = "sync-group"

Review comment:
       `sync-group` => `heartbeat`?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
##########
@@ -247,16 +250,20 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
 
     if (leaderId.isEmpty)
       leaderId = Some(member.memberId)
+
     members.put(member.memberId, member)
-    member.supportedProtocols.foreach{ case (protocol, _) => 
supportedProtocols(protocol) += 1 }
+    member.supportedProtocols.foreach { case (protocol, _) => 
supportedProtocols(protocol) += 1 }
     member.awaitingJoinCallback = callback
+
     if (member.isAwaitingJoin)
       numMembersAwaitingJoin += 1
+
+    pendingMembers.remove(member.memberId)
   }
 
   def remove(memberId: String): Unit = {
     members.remove(memberId).foreach { member =>
-      member.supportedProtocols.foreach{ case (protocol, _) => 
supportedProtocols(protocol) -= 1 }
+      member.supportedProtocols.foreach { case (protocol, _) => 
supportedProtocols(protocol) -= 1 }

Review comment:
       I have noticed that we update `supportedProtocols` in multiple places 
with the exact same line (modulo +/-). I think that we should consolidate them 
into one or two helper methods. We can address this separately though. I just 
wanted to point that out.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -378,24 +537,19 @@ class GroupCoordinator(val brokerId: Int,
                           groupAssignment: Map[String, Array[Byte]],
                           responseCallback: SyncCallback): Unit = {
     group.inLock {
-      if (group.is(Dead)) {
-        // if the group is marked as dead, it means some other thread has just 
removed the group
-        // from the coordinator metadata; this is likely that the group has 
migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the 
member retry
-        // finding the correct coordinator and rejoin.
-        responseCallback(SyncGroupResult(Errors.COORDINATOR_NOT_AVAILABLE))
-      } else if (group.isStaticMemberFenced(memberId, groupInstanceId, 
"sync-group")) {
-        responseCallback(SyncGroupResult(Errors.FENCED_INSTANCE_ID))
-      } else if (!group.has(memberId)) {
-        responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
-      } else if (generationId != group.generationId) {
-        responseCallback(SyncGroupResult(Errors.ILLEGAL_GENERATION))
-      } else if (protocolType.isDefined && 
!group.protocolType.contains(protocolType.get)) {
-        responseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
-      } else if (protocolName.isDefined && 
!group.protocolName.contains(protocolName.get)) {
-        responseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
-      } else {
-        group.currentState match {
+      val validationErrorOpt = validateSyncGroup(
+        group,
+        generationId,
+        memberId,
+        protocolType,
+        protocolName,
+        groupInstanceId
+      )
+
+      validationErrorOpt match {
+        case Some(error) =>  responseCallback(SyncGroupResult(error))

Review comment:
       nit: There is an extra space before `responseCallback`.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
##########
@@ -430,7 +429,7 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
   private def candidateProtocols: Set[String] = {
     // get the set of protocols that are commonly supported by all members
     val numMembers = members.size
-    supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet
+    supportedProtocols.filter(_._2 == numMembers).keys.toSet

Review comment:
       nit: Could we directly use `keysSet` instead of `.keys.toSet`?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int,
                                  offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
                                  responseCallback: 
immutable.Map[TopicPartition, Errors] => Unit): Unit = {
     group.inLock {
-      if (group.is(Dead)) {
-        // if the group is marked as dead, it means some other thread has just 
removed the group
-        // from the coordinator metadata; it is likely that the group has 
migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the 
member retry
-        // finding the correct coordinator and rejoin.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.COORDINATOR_NOT_AVAILABLE })
-      } else if (group.isStaticMemberFenced(memberId, groupInstanceId, 
"txn-commit-offsets")) {
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.FENCED_INSTANCE_ID })
-      } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && 
!group.has(memberId)) {
-        // Enforce member id when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.UNKNOWN_MEMBER_ID })
-      } else if (generationId >= 0 && generationId != group.generationId) {
-        // Enforce generation check when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.ILLEGAL_GENERATION })
+      val validationErrorOpt = validateTxnOffsetCommit(
+        group,
+        generationId,
+        memberId,
+        groupInstanceId
+      )
+
+      if (validationErrorOpt.isDefined) {
+        responseCallback(offsetMetadata.map { case (k, _) => k -> 
validationErrorOpt.get })

Review comment:
       Ah.. I see your point regarding the indentation. That makes sense to 
avoid them. I do not see the reasonable way that you are referring to though.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to