chenzhongyu created KAFKA-13581:
-----------------------------------

             Summary: Error getting old protocol
                 Key: KAFKA-13581
                 URL: https://issues.apache.org/jira/browse/KAFKA-13581
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 2.5.0
            Reporter: chenzhongyu
             Fix For: 2.5.0


In this case,oldProtocols will always be the protocols,because 
knownStaticMember is updated before.So, I think oldProtocol should be assigned 
before updateMember.
{code:java}
private def updateStaticMemberAndRebalance(group: GroupMetadata,
                                           newMemberId: String,
                                           groupInstanceId: Option[String],
                                           protocols: List[(String, 
Array[Byte])],
                                           responseCallback: JoinCallback): 
Unit = {
  val oldMemberId = group.getStaticMemberId(groupInstanceId)
  info(s"Static member $groupInstanceId of group ${group.groupId} with unknown 
member id rejoins, assigning new member id $newMemberId, while " +
    s"old member id $oldMemberId will be removed.")

  val currentLeader = group.leaderOrNull
  val member = group.replaceGroupInstance(oldMemberId, newMemberId, 
groupInstanceId)
  // Heartbeat of old member id will expire without effect since the group no 
longer contains that member id.
  // New heartbeat shall be scheduled with new member id.
  completeAndScheduleNextHeartbeatExpiration(group, member)

  val knownStaticMember = group.get(newMemberId)
  group.updateMember(knownStaticMember, protocols, responseCallback)
  val oldProtocols = knownStaticMember.supportedProtocols

  group.currentState match {
    case Stable =>
      // check if group's selectedProtocol of next generation will change, if 
not, simply store group to persist the
      // updated static member, if yes, rebalance should be triggered to let 
the group's assignment and selectProtocol consistent
      val selectedProtocolOfNextGeneration = group.selectProtocol
      if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
        info(s"Static member which joins during Stable stage and doesn't affect 
selectProtocol will not trigger rebalance.")
        val groupAssignment: Map[String, Array[Byte]] = 
group.allMemberMetadata.map(member => member.memberId -> 
member.assignment).toMap
        groupManager.storeGroup(group, groupAssignment, error => {
          if (error != Errors.NONE) {
            warn(s"Failed to persist metadata for group ${group.groupId}: 
${error.message}")

            // Failed to persist member.id of the given static member, revert 
the update of the static member in the group.
            group.updateMember(knownStaticMember, oldProtocols, null)
            val oldMember = group.replaceGroupInstance(newMemberId, 
oldMemberId, groupInstanceId)
            completeAndScheduleNextHeartbeatExpiration(group, oldMember)
            responseCallback(JoinGroupResult(
              List.empty,
              memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
              generationId = group.generationId,
              protocolType = group.protocolType,
              protocolName = group.protocolName,
              leaderId = currentLeader,
              error = error
            ))
          } else {
            group.maybeInvokeJoinCallback(member, JoinGroupResult(
              members = List.empty,
              memberId = newMemberId,
              generationId = group.generationId,
              protocolType = group.protocolType,
              protocolName = group.protocolName,
              // We want to avoid current leader performing trivial assignment 
while the group
              // is in stable stage, because the new assignment in leader's 
next sync call
              // won't be broadcast by a stable group. This could be guaranteed 
by
              // always returning the old leader id so that the current leader 
won't assume itself
              // as a leader based on the returned message, since the new 
member.id won't match
              // returned leader id, therefore no assignment will be performed.
              leaderId = currentLeader,
              error = Errors.NONE))
          }
        })
      } else {
        maybePrepareRebalance(group, s"Group's selectedProtocol will change 
because static member ${member.memberId} with instance id $groupInstanceId 
joined with change of protocol")
      }
    case CompletingRebalance =>
      // if the group is in after-sync stage, upon getting a new join-group of 
a known static member
      // we should still trigger a new rebalance, since the old member may 
already be sent to the leader
      // for assignment, and hence when the assignment gets back there would be 
a mismatch of the old member id
      // with the new replaced member id. As a result the new member id would 
not get any assignment.
      prepareRebalance(group, s"Updating metadata for static member 
${member.memberId} with instance id $groupInstanceId")
    case Empty | Dead =>
      throw new IllegalStateException(s"Group ${group.groupId} was not supposed 
to be " +
        s"in the state ${group.currentState} when the unknown static member 
$groupInstanceId rejoins.")
    case PreparingRebalance =>
  }
} {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to