dajac commented on a change in pull request #9958: URL: https://github.com/apache/kafka/pull/9958#discussion_r582631095
########## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ########## @@ -602,6 +765,30 @@ class GroupCoordinator(val brokerId: Int, groupError -> partitionErrors } + private def validateHeartbeat( + group: GroupMetadata, + generationId: Int, + memberId: String, + groupInstanceId: Option[String] + ): Option[Errors] = { + if (group.is(Dead)) { + Some(Errors.COORDINATOR_NOT_AVAILABLE) + } else { + validateCurrentMember( + group, + memberId, + groupInstanceId, + operation = "sync-group" Review comment: `sync-group` => `heartbeat`? ########## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala ########## @@ -247,16 +250,20 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState if (leaderId.isEmpty) leaderId = Some(member.memberId) + members.put(member.memberId, member) - member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 } + member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) += 1 } member.awaitingJoinCallback = callback + if (member.isAwaitingJoin) numMembersAwaitingJoin += 1 + + pendingMembers.remove(member.memberId) } def remove(memberId: String): Unit = { members.remove(memberId).foreach { member => - member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 } + member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) -= 1 } Review comment: I have noticed that we update `supportedProtocols` in multiple places with the exact same line (modulo +/-). I think that we should consolidate them into one or two helper methods. We can address this separately though. I just wanted to point that out. 
########## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ########## @@ -378,24 +537,19 @@ class GroupCoordinator(val brokerId: Int, groupAssignment: Map[String, Array[Byte]], responseCallback: SyncCallback): Unit = { group.inLock { - if (group.is(Dead)) { - // if the group is marked as dead, it means some other thread has just removed the group - // from the coordinator metadata; this is likely that the group has migrated to some other - // coordinator OR the group is in a transient unstable phase. Let the member retry - // finding the correct coordinator and rejoin. - responseCallback(SyncGroupResult(Errors.COORDINATOR_NOT_AVAILABLE)) - } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "sync-group")) { - responseCallback(SyncGroupResult(Errors.FENCED_INSTANCE_ID)) - } else if (!group.has(memberId)) { - responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID)) - } else if (generationId != group.generationId) { - responseCallback(SyncGroupResult(Errors.ILLEGAL_GENERATION)) - } else if (protocolType.isDefined && !group.protocolType.contains(protocolType.get)) { - responseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL)) - } else if (protocolName.isDefined && !group.protocolName.contains(protocolName.get)) { - responseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL)) - } else { - group.currentState match { + val validationErrorOpt = validateSyncGroup( + group, + generationId, + memberId, + protocolType, + protocolName, + groupInstanceId + ) + + validationErrorOpt match { + case Some(error) => responseCallback(SyncGroupResult(error)) Review comment: nit: There is an extra space before `responseCallback`. 
########## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala ########## @@ -430,7 +429,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState private def candidateProtocols: Set[String] = { // get the set of protocols that are commonly supported by all members val numMembers = members.size - supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet + supportedProtocols.filter(_._2 == numMembers).keys.toSet Review comment: nit: Could we directly use `keySet` instead of `.keys.toSet`? ########## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ########## @@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = { group.inLock { - if (group.is(Dead)) { - // if the group is marked as dead, it means some other thread has just removed the group - // from the coordinator metadata; it is likely that the group has migrated to some other - // coordinator OR the group is in a transient unstable phase. Let the member retry - // finding the correct coordinator and rejoin. - responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE }) - } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "txn-commit-offsets")) { - responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID }) - } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) { - // Enforce member id when it is set. - responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.UNKNOWN_MEMBER_ID }) - } else if (generationId >= 0 && generationId != group.generationId) { - // Enforce generation check when it is set. 
- responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION }) + val validationErrorOpt = validateTxnOffsetCommit( + group, + generationId, + memberId, + groupInstanceId + ) + + if (validationErrorOpt.isDefined) { + responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get }) Review comment: Ah, I see your point regarding the indentation. It makes sense to avoid them. I do not see the reasonable way that you are referring to, though. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org