zou shengfu created KAFKA-14832: ----------------------------------- Summary: Thread unsafe for GroupMetadata Key: KAFKA-14832 URL: https://issues.apache.org/jira/browse/KAFKA-14832 Project: Kafka Issue Type: Bug Components: core Affects Versions: 3.3.2 Reporter: zou shengfu Assignee: zou shengfu
groupManager.storeGroup(group, groupAssignment, error => { if (error != Errors.NONE) { warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}") // Failed to persist member.id of the given static member, revert the update of the static member in the group. group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null) val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId) completeAndScheduleNextHeartbeatExpiration(group, oldMember) responseCallback(JoinGroupResult( List.empty, memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = currentLeader, skipAssignment = false, error = error )) } else if (supportSkippingAssignment) { // Starting from version 9 of the JoinGroup API, static members are able to // skip running the assignor based on the `SkipAssignment` field. We leverage // this to tell the leader that it is the leader of the group but by skipping // running the assignor while the group is in stable state. // Notes: // 1) This allows the leader to continue monitoring metadata changes for the // group. Note that any metadata changes happening while the static leader is // down won't be noticed. // 2) The assignors are not idempotent nor free from side effects. This is why // we skip entirely the assignment step as it could generate a different group // assignment which would be ignored by the group coordinator because the group // is the stable state. val isLeader = group.isLeader(newMemberId) group.maybeInvokeJoinCallback(member, JoinGroupResult( members = if (isLeader) { group.currentMemberMetadata } else { List.empty }, memberId = newMemberId, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = group.leaderOrNull, skipAssignment = isLeader, error = Errors.NONE )) } else { // Prior to version 9 of the JoinGroup API, we wanted to avoid current leader // performing trivial assignment while the group is in stable stage, because // the new assignment in leader's next sync call won't be broadcast by a stable group. // This could be guaranteed by always returning the old leader id so that the current // leader won't assume itself as a leader based on the returned message, since the new // member.id won't match returned leader id, therefore no assignment will be performed. group.maybeInvokeJoinCallback(member, JoinGroupResult( members = List.empty, memberId = newMemberId, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = currentLeader, skipAssignment = false, error = Errors.NONE )) } -- This message was sent by Atlassian Jira (v8.20.10#820010)