[ https://issues.apache.org/jira/browse/KAFKA-14832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zou shengfu updated KAFKA-14832: -------------------------------- Description: ``` groupManager.storeGroup(group, groupAssignment, error => { if (error != Errors.NONE) { warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}") // Failed to persist member.id of the given static member, revert the update of the static member in the group. group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null) val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId) completeAndScheduleNextHeartbeatExpiration(group, oldMember) responseCallback(JoinGroupResult( List.empty, memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = currentLeader, skipAssignment = false, error = error )) } else if (supportSkippingAssignment) { // Starting from version 9 of the JoinGroup API, static members are able to // skip running the assignor based on the `SkipAssignment` field. We leverage // this to tell the leader that it is the leader of the group but by skipping // running the assignor while the group is in stable state. // Notes: // 1) This allows the leader to continue monitoring metadata changes for the // group. Note that any metadata changes happening while the static leader is // down won't be noticed. // 2) The assignors are not idempotent nor free from side effects. This is why // we skip entirely the assignment step as it could generate a different group // assignment which would be ignored by the group coordinator because the group // is the stable state. val isLeader = group.isLeader(newMemberId) group.maybeInvokeJoinCallback(member, JoinGroupResult( members = if (isLeader) { group.currentMemberMetadata } else { List.empty } , memberId = newMemberId, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = group.leaderOrNull, skipAssignment = isLeader, error = Errors.NONE )) } else { // Prior to version 9 of the JoinGroup API, we wanted to avoid current leader // performing trivial assignment while the group is in stable stage, because // the new assignment in leader's next sync call won't be broadcast by a stable group. // This could be guaranteed by always returning the old leader id so that the current // leader won't assume itself as a leader based on the returned message, since the new // member.id won't match returned leader id, therefore no assignment will be performed. group.maybeInvokeJoinCallback(member, JoinGroupResult( members = List.empty, memberId = newMemberId, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = currentLeader, skipAssignment = false, error = Errors.NONE )) }``` was: groupManager.storeGroup(group, groupAssignment, error => { if (error != Errors.NONE) { warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}") // Failed to persist member.id of the given static member, revert the update of the static member in the group. group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null) val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId) completeAndScheduleNextHeartbeatExpiration(group, oldMember) responseCallback(JoinGroupResult( List.empty, memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = currentLeader, skipAssignment = false, error = error )) } else if (supportSkippingAssignment) { // Starting from version 9 of the JoinGroup API, static members are able to // skip running the assignor based on the `SkipAssignment` field. We leverage // this to tell the leader that it is the leader of the group but by skipping // running the assignor while the group is in stable state. // Notes: // 1) This allows the leader to continue monitoring metadata changes for the // group. Note that any metadata changes happening while the static leader is // down won't be noticed. // 2) The assignors are not idempotent nor free from side effects. This is why // we skip entirely the assignment step as it could generate a different group // assignment which would be ignored by the group coordinator because the group // is the stable state. val isLeader = group.isLeader(newMemberId) group.maybeInvokeJoinCallback(member, JoinGroupResult( members = if (isLeader) { group.currentMemberMetadata } else { List.empty }, memberId = newMemberId, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = group.leaderOrNull, skipAssignment = isLeader, error = Errors.NONE )) } else { // Prior to version 9 of the JoinGroup API, we wanted to avoid current leader // performing trivial assignment while the group is in stable stage, because // the new assignment in leader's next sync call won't be broadcast by a stable group. // This could be guaranteed by always returning the old leader id so that the current // leader won't assume itself as a leader based on the returned message, since the new // member.id won't match returned leader id, therefore no assignment will be performed. group.maybeInvokeJoinCallback(member, JoinGroupResult( members = List.empty, memberId = newMemberId, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = currentLeader, skipAssignment = false, error = Errors.NONE )) } > Thread unsafe for GroupMetadata > ------------------------------- > > Key: KAFKA-14832 > URL: https://issues.apache.org/jira/browse/KAFKA-14832 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 3.3.2 > Reporter: zou shengfu > Assignee: zou shengfu > Priority: Major > > ``` groupManager.storeGroup(group, groupAssignment, error => { > if (error != Errors.NONE) { > warn(s"Failed to persist metadata for group ${group.groupId}: > ${error.message}") > // Failed to persist member.id of the given static member, > revert the update of the static member in the group. > group.updateMember(knownStaticMember, oldProtocols, > oldRebalanceTimeoutMs, oldSessionTimeoutMs, null) > val oldMember = group.replaceStaticMember(groupInstanceId, > newMemberId, oldMemberId) > completeAndScheduleNextHeartbeatExpiration(group, oldMember) > responseCallback(JoinGroupResult( > List.empty, > memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID, > generationId = group.generationId, > protocolType = group.protocolType, > protocolName = group.protocolName, > leaderId = currentLeader, > skipAssignment = false, > error = error > )) > } else if (supportSkippingAssignment) { > // Starting from version 9 of the JoinGroup API, static members > are able to > // skip running the assignor based on the `SkipAssignment` > field. We leverage > // this to tell the leader that it is the leader of the group > but by skipping > // running the assignor while the group is in stable state. > // Notes: > // 1) This allows the leader to continue monitoring metadata > changes for the > // group. Note that any metadata changes happening while the > static leader is > // down won't be noticed. > // 2) The assignors are not idempotent nor free from side > effects. This is why > // we skip entirely the assignment step as it could generate a > different group > // assignment which would be ignored by the group coordinator > because the group > // is the stable state. > val isLeader = group.isLeader(newMemberId) > group.maybeInvokeJoinCallback(member, JoinGroupResult( > members = if (isLeader) > { group.currentMemberMetadata } > else > { List.empty } > , > memberId = newMemberId, > generationId = group.generationId, > protocolType = group.protocolType, > protocolName = group.protocolName, > leaderId = group.leaderOrNull, > skipAssignment = isLeader, > error = Errors.NONE > )) > } else > { // Prior to version 9 of the JoinGroup API, we wanted to > avoid current leader // performing trivial assignment while the > group is in stable stage, because // the new assignment in > leader's next sync call won't be broadcast by a stable group. > // This could be guaranteed by always returning the old leader id so that the > current // leader won't assume itself as a leader based on the > returned message, since the new // member.id won't match > returned leader id, therefore no assignment will be performed. > group.maybeInvokeJoinCallback(member, JoinGroupResult( > members = List.empty, memberId = newMemberId, > generationId = group.generationId, protocolType = > group.protocolType, protocolName = group.protocolName, > leaderId = currentLeader, skipAssignment = false, > error = Errors.NONE )) }``` -- This message was sent by Atlassian Jira (v8.20.10#820010)