dajac commented on a change in pull request #9958:
URL: https://github.com/apache/kafka/pull/9958#discussion_r569206619



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -185,9 +185,9 @@ class GroupCoordinator(val brokerId: Int,
               group.remove(memberId)
               
responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
Errors.GROUP_MAX_SIZE_REACHED))
             } else if (isUnknownMember) {
-              doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, 
clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, 
protocols, responseCallback)
+              doNewMemberJoinGroup(group, groupInstanceId, 
requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, 
sessionTimeoutMs, protocolType, protocols, responseCallback)

Review comment:
       nit: Could we break this long line and the one below as well?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int,
                                  offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
                                  responseCallback: 
immutable.Map[TopicPartition, Errors] => Unit): Unit = {
     group.inLock {
-      if (group.is(Dead)) {
-        // if the group is marked as dead, it means some other thread has just 
removed the group
-        // from the coordinator metadata; it is likely that the group has 
migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the 
member retry
-        // finding the correct coordinator and rejoin.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.COORDINATOR_NOT_AVAILABLE })
-      } else if (group.isStaticMemberFenced(memberId, groupInstanceId, 
"txn-commit-offsets")) {
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.FENCED_INSTANCE_ID })
-      } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && 
!group.has(memberId)) {
-        // Enforce member id when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.UNKNOWN_MEMBER_ID })
-      } else if (generationId >= 0 && generationId != group.generationId) {
-        // Enforce generation check when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.ILLEGAL_GENERATION })
+      val validationErrorOpt = validateTxnOffsetCommit(
+        group,
+        generationId,
+        memberId,
+        groupInstanceId
+      )
+
+      if (validationErrorOpt.isDefined) {
+        responseCallback(offsetMetadata.map { case (k, _) => k -> 
validationErrorOpt.get })

Review comment:
       nit: I would use `match` here as well.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -220,67 +222,163 @@ class GroupCoordinator(val brokerId: Int,
         responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
Errors.INCONSISTENT_GROUP_PROTOCOL))
       } else {
         val newMemberId = group.generateMemberId(clientId, groupInstanceId)
+        groupInstanceId match {
+          case Some(instanceId) =>
+            doStaticNewMemberJoinGroup(
+              group,
+              instanceId,
+              newMemberId,
+              clientId,
+              clientHost,
+              rebalanceTimeoutMs,
+              sessionTimeoutMs,
+              protocolType,
+              protocols,
+              responseCallback
+            )
 
-        if (group.hasStaticMember(groupInstanceId)) {
-          updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, 
protocols, responseCallback)
-        } else if (requireKnownMemberId) {
-            // If member id required (dynamic membership), register the member 
in the pending member list
-            // and send back a response to call for another join group request 
with allocated member id.
-          debug(s"Dynamic member with unknown member id joins group 
${group.groupId} in " +
-              s"${group.currentState} state. Created a new member id 
$newMemberId and request the member to rejoin with this id.")
-          group.addPendingMember(newMemberId)
-          addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
-          responseCallback(JoinGroupResult(newMemberId, 
Errors.MEMBER_ID_REQUIRED))
-        } else {
-          info(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} 
Member with unknown member id joins group ${group.groupId} in " +
-            s"${group.currentState} state. Created a new member id 
$newMemberId for this member and add to the group.")
-          addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, 
newMemberId, groupInstanceId,
-            clientId, clientHost, protocolType, protocols, group, 
responseCallback)
+          case None =>
+            doDynamicNewMemberJoinGroup(
+              group,
+              requireKnownMemberId,
+              newMemberId,
+              clientId,
+              clientHost,
+              rebalanceTimeoutMs,
+              sessionTimeoutMs,
+              protocolType,
+              protocols,
+              responseCallback
+            )
         }
       }
     }
   }
 
-  private def doJoinGroup(group: GroupMetadata,
-                          memberId: String,
-                          groupInstanceId: Option[String],
-                          clientId: String,
-                          clientHost: String,
-                          rebalanceTimeoutMs: Int,
-                          sessionTimeoutMs: Int,
-                          protocolType: String,
-                          protocols: List[(String, Array[Byte])],
-                          responseCallback: JoinCallback): Unit = {
+  private def doStaticNewMemberJoinGroup(
+    group: GroupMetadata,
+    groupInstanceId: String,
+    newMemberId: String,
+    clientId: String,
+    clientHost: String,
+    rebalanceTimeoutMs: Int,
+    sessionTimeoutMs: Int,
+    protocolType: String,
+    protocols: List[(String, Array[Byte])],
+    responseCallback: JoinCallback
+  ): Unit = {
+    group.currentStaticMemberId(groupInstanceId) match {
+      case Some(oldMemberId) =>
+        info(s"Static member with groupInstanceId=$groupInstanceId and unknown 
member id joins " +
+          s"group ${group.groupId} in ${group.currentState} state. Replacing 
previously mapped " +
+          s"member $oldMemberId with this groupInstanceId.")
+        updateStaticMemberAndRebalance(group, newMemberId, oldMemberId, 
groupInstanceId, protocols, responseCallback)
+
+      case None =>
+        info(s"Static member with groupInstanceId=$groupInstanceId and unknown 
member id joins " +
+          s"group ${group.groupId} in ${group.currentState} state. Created a 
new member id $newMemberId " +
+          s"for this member and add to the group.")
+        addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, 
newMemberId, Some(groupInstanceId),
+          clientId, clientHost, protocolType, protocols, group, 
responseCallback)
+    }
+  }
+
+  private def doDynamicNewMemberJoinGroup(
+    group: GroupMetadata,
+    requireKnownMemberId: Boolean,
+    newMemberId: String,
+    clientId: String,
+    clientHost: String,
+    rebalanceTimeoutMs: Int,
+    sessionTimeoutMs: Int,
+    protocolType: String,
+    protocols: List[(String, Array[Byte])],
+    responseCallback: JoinCallback
+  ): Unit = {
+    if (requireKnownMemberId) {
+      // If member id required, register the member in the pending member list 
and send
+      // back a response to call for another join group request with allocated 
member id.
+      info(s"Dynamic member with unknown member id joins group 
${group.groupId} in " +
+        s"${group.currentState} state. Created a new member id $newMemberId 
and request the " +
+        s"member to rejoin with this id.")
+      group.addPendingMember(newMemberId)
+      addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
+      responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
+    } else {
+      info(s"Dynamic Member with unknown member id joins group 
${group.groupId} in " +
+        s"${group.currentState} state. Created a new member id $newMemberId 
for this member " +
+        s"and add to the group.")
+      addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, 
None,
+        clientId, clientHost, protocolType, protocols, group, responseCallback)
+    }
+  }
+
+  private def validateCurrentMember(
+    group: GroupMetadata,
+    memberId: String,
+    groupInstanceId: Option[String],
+    operation: String
+  ): Option[Errors] = {
+    // We are validating two things:
+    // 1. If `groupInstanceId` is present, then it exists and is mapped to 
`memberId`
+    // 2. The `memberId` exists in the group
+

Review comment:
       nit: This empty line could be removed.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -603,6 +731,25 @@ class GroupCoordinator(val brokerId: Int,
     groupError -> partitionErrors
   }
 
+  private def validateHeartbeat(
+    group: GroupMetadata,
+    generationId: Int,
+    memberId: String,
+    groupInstanceId: Option[String]
+  ): Option[Errors] = {
+    if (group.is(Dead)) {
+      Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else {
+      validateCurrentMember(group, memberId, groupInstanceId, 
"sync-group").orElse {

Review comment:
       `heartbeat` instead of `sync-group` should be used here.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1017,26 +1198,17 @@ class GroupCoordinator(val brokerId: Int,
     // for new members. If the new member is still there, we expect it to 
retry.
     completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
 
-    if (member.isStaticMember) {
-      info(s"Adding new static member $groupInstanceId to group 
${group.groupId} with member id $memberId.")
-      group.addStaticMember(groupInstanceId, memberId)
-    } else {
-      group.removePendingMember(memberId)
-    }
     maybePrepareRebalance(group, s"Adding new member $memberId with group 
instance id $groupInstanceId")
   }
 
   private def updateStaticMemberAndRebalance(group: GroupMetadata,
                                              newMemberId: String,
-                                             groupInstanceId: Option[String],
+                                             oldMemberId: String,

Review comment:
       nit: I would put `oldMemberId` before `newMemberId`. It feels a bit more 
natural when reading it.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1055,11 +1227,15 @@ class GroupCoordinator(val brokerId: Int,
           val groupAssignment: Map[String, Array[Byte]] = 
group.allMemberMetadata.map(member => member.memberId -> 
member.assignment).toMap
           groupManager.storeGroup(group, groupAssignment, error => {
             if (error != Errors.NONE) {
+
+              // TODO: This logic seems questionable. The write was not 
committed, but that doesn't
+              //  mean it wasn't written to the log and cannot eventually 
become committed.

Review comment:
       I do agree. Should we turn this into a Jira and address it in a follow-up 
PR?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
##########
@@ -247,11 +250,15 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
 
     if (leaderId.isEmpty)
       leaderId = Some(member.memberId)
+
     members.put(member.memberId, member)
     member.supportedProtocols.foreach{ case (protocol, _) => 
supportedProtocols(protocol) += 1 }

Review comment:
       nit: Not related to your PR but a space is missing before `{`.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int,
                                  offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
                                  responseCallback: 
immutable.Map[TopicPartition, Errors] => Unit): Unit = {
     group.inLock {
-      if (group.is(Dead)) {
-        // if the group is marked as dead, it means some other thread has just 
removed the group
-        // from the coordinator metadata; it is likely that the group has 
migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the 
member retry
-        // finding the correct coordinator and rejoin.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.COORDINATOR_NOT_AVAILABLE })
-      } else if (group.isStaticMemberFenced(memberId, groupInstanceId, 
"txn-commit-offsets")) {
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.FENCED_INSTANCE_ID })
-      } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && 
!group.has(memberId)) {
-        // Enforce member id when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.UNKNOWN_MEMBER_ID })
-      } else if (generationId >= 0 && generationId != group.generationId) {
-        // Enforce generation check when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.ILLEGAL_GENERATION })
+      val validationErrorOpt = validateTxnOffsetCommit(
+        group,
+        generationId,
+        memberId,
+        groupInstanceId
+      )
+
+      if (validationErrorOpt.isDefined) {
+        responseCallback(offsetMetadata.map { case (k, _) => k -> 
validationErrorOpt.get })
       } else {
         groupManager.storeOffsets(group, memberId, offsetMetadata, 
responseCallback, producerId, producerEpoch)
       }
     }
   }
 
+
+  private def validateOffsetCommit(
+    group: GroupMetadata,
+    generationId: Int,
+    memberId: String,
+    groupInstanceId: Option[String]
+  ): Option[Errors] = {
+    if (group.is(Dead)) {
+      Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (generationId >= 0 || memberId != 
JoinGroupRequest.UNKNOWN_MEMBER_ID || groupInstanceId.isDefined) {
+      validateCurrentMember(group, memberId, groupInstanceId, 
"offset-commit").orElse {
+        if (generationId != group.generationId) {
+          Some(Errors.ILLEGAL_GENERATION)
+        } else {
+          None
+        }
+      }
+    } else if (!group.is(Empty)) {
+      // When the group is non-empty, only members can commit offsets
+      Some(Errors.UNKNOWN_MEMBER_ID)
+    } else {
+      None
+    }
+  }
+
   private def doCommitOffsets(group: GroupMetadata,
                               memberId: String,
                               groupInstanceId: Option[String],
                               generationId: Int,
                               offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
                               responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit): Unit = {
     group.inLock {
-      if (group.is(Dead)) {
-        // if the group is marked as dead, it means some other thread has just 
removed the group
-        // from the coordinator metadata; it is likely that the group has 
migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the 
member retry
-        // finding the correct coordinator and rejoin.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.COORDINATOR_NOT_AVAILABLE })
-      } else if (group.isStaticMemberFenced(memberId, groupInstanceId, 
"commit-offsets")) {
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.FENCED_INSTANCE_ID })
-      } else if (generationId < 0 && group.is(Empty)) {
-        // The group is only using Kafka to store offsets.
-        groupManager.storeOffsets(group, memberId, offsetMetadata, 
responseCallback)
-      } else if (!group.has(memberId)) {
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.UNKNOWN_MEMBER_ID })
-      } else if (generationId != group.generationId) {
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.ILLEGAL_GENERATION })
+      val validationErrorOpt = validateOffsetCommit(
+        group,
+        generationId,
+        memberId,
+        groupInstanceId
+      )
+
+      if (validationErrorOpt.isDefined) {
+        responseCallback(offsetMetadata.map { case (k, _) => k -> 
validationErrorOpt.get })

Review comment:
       nit: I would use `match` here as well.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -470,33 +600,31 @@ class GroupCoordinator(val brokerId: Int,
                 val memberErrors = leavingMembers.map { leavingMember =>
                   val memberId = leavingMember.memberId
                   val groupInstanceId = Option(leavingMember.groupInstanceId)
-                  if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID
-                    && group.isStaticMemberFenced(memberId, groupInstanceId, 
"leave-group")) {
-                    memberLeaveError(leavingMember, Errors.FENCED_INSTANCE_ID)
+
+                  // The LeaveGroup API allows administrative removal of 
members by GroupInstanceId
+                  // in which case we expect the MemberId to be undefined.
+                  if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+                    groupInstanceId.flatMap(group.currentStaticMemberId) match 
{
+                      case Some(currentMemberId) =>
+                        removeCurrentMemberFromGroup(group, currentMemberId)
+                        memberLeaveError(leavingMember, Errors.NONE)
+                      case None =>
+                        memberLeaveError(leavingMember, 
Errors.UNKNOWN_MEMBER_ID)
+                    }
                   } else if (group.isPendingMember(memberId)) {
-                    if (groupInstanceId.isDefined) {
-                      throw new IllegalStateException(s"the static member 
$groupInstanceId was not expected to be leaving " +
-                        s"from pending member bucket with member id $memberId")
+                    removePendingMemberAndUpdateGroup(group, memberId)
+                    
heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, memberId))
+                    info(s"Pending member with memberId=$memberId has left 
group ${group.groupId} " +
+                      s"through explicit `LeaveGroup` request")
+                    memberLeaveError(leavingMember, Errors.NONE)
+                  } else {
+                    val memberErrorOpt = validateCurrentMember(group, 
memberId, groupInstanceId, "leave-group")
+                    if (memberErrorOpt.isDefined) {
+                      memberLeaveError(leavingMember, memberErrorOpt.get)

Review comment:
       nit: It might be better to use `match` here instead of using `isDefined` 
and `get` on `memberErrorOpt`.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int,
                                  offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
                                  responseCallback: 
immutable.Map[TopicPartition, Errors] => Unit): Unit = {
     group.inLock {
-      if (group.is(Dead)) {
-        // if the group is marked as dead, it means some other thread has just 
removed the group
-        // from the coordinator metadata; it is likely that the group has 
migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the 
member retry
-        // finding the correct coordinator and rejoin.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.COORDINATOR_NOT_AVAILABLE })
-      } else if (group.isStaticMemberFenced(memberId, groupInstanceId, 
"txn-commit-offsets")) {
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.FENCED_INSTANCE_ID })
-      } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && 
!group.has(memberId)) {
-        // Enforce member id when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.UNKNOWN_MEMBER_ID })
-      } else if (generationId >= 0 && generationId != group.generationId) {
-        // Enforce generation check when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> 
Errors.ILLEGAL_GENERATION })
+      val validationErrorOpt = validateTxnOffsetCommit(
+        group,
+        generationId,
+        memberId,
+        groupInstanceId
+      )
+
+      if (validationErrorOpt.isDefined) {
+        responseCallback(offsetMetadata.map { case (k, _) => k -> 
validationErrorOpt.get })
       } else {
         groupManager.storeOffsets(group, memberId, offsetMetadata, 
responseCallback, producerId, producerEpoch)
       }
     }
   }
 
+
+  private def validateOffsetCommit(
+    group: GroupMetadata,
+    generationId: Int,
+    memberId: String,
+    groupInstanceId: Option[String]
+  ): Option[Errors] = {
+    if (group.is(Dead)) {
+      Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (generationId >= 0 || memberId != 
JoinGroupRequest.UNKNOWN_MEMBER_ID || groupInstanceId.isDefined) {
+      validateCurrentMember(group, memberId, groupInstanceId, 
"offset-commit").orElse {
+        if (generationId != group.generationId) {
+          Some(Errors.ILLEGAL_GENERATION)
+        } else {
+          None
+        }
+      }

Review comment:
       This block is similar to the one in `validateTxnOffsetCommit` modulo the 
reason. I wonder if we could define a common helper method for both cases.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to