[
https://issues.apache.org/jira/browse/KAFKA-13581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477859#comment-17477859
]
Kvicii.Yu commented on KAFKA-13581:
-----------------------------------
[~chenzy]
Thanks for reporting this.
I agree with the suggestion, but what are the benefits of making this change?
Please let me know.
> Error getting old protocol
> --------------------------
>
> Key: KAFKA-13581
> URL: https://issues.apache.org/jira/browse/KAFKA-13581
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.5.0
> Reporter: chenzhongyu
> Priority: Minor
> Fix For: 2.5.0
>
>
> In this case, oldProtocols will always be equal to the new protocols, because
> knownStaticMember has already been updated before oldProtocols is read. So I
> think oldProtocols should be assigned before updateMember is called.
> {code:java}
> private def updateStaticMemberAndRebalance(group: GroupMetadata,
> newMemberId: String,
> groupInstanceId: Option[String],
> protocols: List[(String,
> Array[Byte])],
> responseCallback: JoinCallback):
> Unit = {
> val oldMemberId = group.getStaticMemberId(groupInstanceId)
> info(s"Static member $groupInstanceId of group ${group.groupId} with
> unknown member id rejoins, assigning new member id $newMemberId, while " +
> s"old member id $oldMemberId will be removed.")
> val currentLeader = group.leaderOrNull
> val member = group.replaceGroupInstance(oldMemberId, newMemberId,
> groupInstanceId)
> // Heartbeat of old member id will expire without effect since the group no
> longer contains that member id.
> // New heartbeat shall be scheduled with new member id.
> completeAndScheduleNextHeartbeatExpiration(group, member)
> val knownStaticMember = group.get(newMemberId)
> group.updateMember(knownStaticMember, protocols, responseCallback)
> val oldProtocols = knownStaticMember.supportedProtocols
> group.currentState match {
> case Stable =>
> // check if group's selectedProtocol of next generation will change, if
> not, simply store group to persist the
> // updated static member, if yes, rebalance should be triggered to let
> the group's assignment and selectProtocol consistent
> val selectedProtocolOfNextGeneration = group.selectProtocol
> if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
> info(s"Static member which joins during Stable stage and doesn't
> affect selectProtocol will not trigger rebalance.")
> val groupAssignment: Map[String, Array[Byte]] =
> group.allMemberMetadata.map(member => member.memberId ->
> member.assignment).toMap
> groupManager.storeGroup(group, groupAssignment, error => {
> if (error != Errors.NONE) {
> warn(s"Failed to persist metadata for group ${group.groupId}:
> ${error.message}")
> // Failed to persist member.id of the given static member, revert
> the update of the static member in the group.
> group.updateMember(knownStaticMember, oldProtocols, null)
> val oldMember = group.replaceGroupInstance(newMemberId,
> oldMemberId, groupInstanceId)
> completeAndScheduleNextHeartbeatExpiration(group, oldMember)
> responseCallback(JoinGroupResult(
> List.empty,
> memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
> generationId = group.generationId,
> protocolType = group.protocolType,
> protocolName = group.protocolName,
> leaderId = currentLeader,
> error = error
> ))
> } else {
> group.maybeInvokeJoinCallback(member, JoinGroupResult(
> members = List.empty,
> memberId = newMemberId,
> generationId = group.generationId,
> protocolType = group.protocolType,
> protocolName = group.protocolName,
> // We want to avoid current leader performing trivial
> assignment while the group
> // is in stable stage, because the new assignment in leader's
> next sync call
> // won't be broadcast by a stable group. This could be
> guaranteed by
> // always returning the old leader id so that the current
> leader won't assume itself
> // as a leader based on the returned message, since the new
> member.id won't match
> // returned leader id, therefore no assignment will be
> performed.
> leaderId = currentLeader,
> error = Errors.NONE))
> }
> })
> } else {
> maybePrepareRebalance(group, s"Group's selectedProtocol will change
> because static member ${member.memberId} with instance id $groupInstanceId
> joined with change of protocol")
> }
> case CompletingRebalance =>
> // if the group is in after-sync stage, upon getting a new join-group
> of a known static member
> // we should still trigger a new rebalance, since the old member may
> already be sent to the leader
> // for assignment, and hence when the assignment gets back there would
> be a mismatch of the old member id
> // with the new replaced member id. As a result the new member id would
> not get any assignment.
> prepareRebalance(group, s"Updating metadata for static member
> ${member.memberId} with instance id $groupInstanceId")
> case Empty | Dead =>
> throw new IllegalStateException(s"Group ${group.groupId} was not
> supposed to be " +
> s"in the state ${group.currentState} when the unknown static member
> $groupInstanceId rejoins.")
> case PreparingRebalance =>
> }
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)