[ 
https://issues.apache.org/jira/browse/KAFKA-14832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zou shengfu updated KAFKA-14832:
--------------------------------
    Description: 
```
          groupManager.storeGroup(group, groupAssignment, error => {
            if (error != Errors.NONE) {
              warn(s"Failed to persist metadata for group ${group.groupId}: 
${error.message}")

              // Failed to persist member.id of the given static member, revert 
the update of the static member in the group.
              group.updateMember(knownStaticMember, oldProtocols, 
oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
              val oldMember = group.replaceStaticMember(groupInstanceId, 
newMemberId, oldMemberId)
              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
              responseCallback(JoinGroupResult(
                List.empty,
                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = currentLeader,
                skipAssignment = false,
                error = error
              ))
            } else if (supportSkippingAssignment) {
              // Starting from version 9 of the JoinGroup API, static members 
are able to
              // skip running the assignor based on the `SkipAssignment` field. 
We leverage
              // this to tell the leader that it is the leader of the group but 
by skipping
              // running the assignor while the group is in stable state.
              // Notes:
              // 1) This allows the leader to continue monitoring metadata 
changes for the
              // group. Note that any metadata changes happening while the 
static leader is
              // down won't be noticed.
              // 2) The assignors are not idempotent nor free from side 
effects. This is why
              // we skip entirely the assignment step as it could generate a 
different group
              // assignment which would be ignored by the group coordinator 
because the group
              // is the stable state.
              val isLeader = group.isLeader(newMemberId)
              group.maybeInvokeJoinCallback(member, JoinGroupResult(
                members = if (isLeader) {
                  group.currentMemberMetadata
                } else {
                  List.empty
                },
                memberId = newMemberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                skipAssignment = isLeader,
                error = Errors.NONE
              ))
            } else {
              // Prior to version 9 of the JoinGroup API, we wanted to avoid 
current leader
              // performing trivial assignment while the group is in stable 
stage, because
              // the new assignment in leader's next sync call won't be 
broadcast by a stable group.
              // This could be guaranteed by always returning the old leader id 
so that the current
              // leader won't assume itself as a leader based on the returned 
message, since the new
              // member.id won't match returned leader id, therefore no 
assignment will be performed.
              group.maybeInvokeJoinCallback(member, JoinGroupResult(
                members = List.empty,
                memberId = newMemberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = currentLeader,
                skipAssignment = false,
                error = Errors.NONE
              ))
            }
```

  was:
          groupManager.storeGroup(group, groupAssignment, error => {
            if (error != Errors.NONE) {
              warn(s"Failed to persist metadata for group ${group.groupId}: 
${error.message}")

              // Failed to persist member.id of the given static member, revert 
the update of the static member in the group.
              group.updateMember(knownStaticMember, oldProtocols, 
oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
              val oldMember = group.replaceStaticMember(groupInstanceId, 
newMemberId, oldMemberId)
              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
              responseCallback(JoinGroupResult(
                List.empty,
                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = currentLeader,
                skipAssignment = false,
                error = error
              ))
            } else if (supportSkippingAssignment) {
              // Starting from version 9 of the JoinGroup API, static members 
are able to
              // skip running the assignor based on the `SkipAssignment` field. 
We leverage
              // this to tell the leader that it is the leader of the group but 
by skipping
              // running the assignor while the group is in stable state.
              // Notes:
              // 1) This allows the leader to continue monitoring metadata 
changes for the
              // group. Note that any metadata changes happening while the 
static leader is
              // down won't be noticed.
              // 2) The assignors are not idempotent nor free from side 
effects. This is why
              // we skip entirely the assignment step as it could generate a 
different group
              // assignment which would be ignored by the group coordinator 
because the group
              // is the stable state.
              val isLeader = group.isLeader(newMemberId)
              group.maybeInvokeJoinCallback(member, JoinGroupResult(
                members = if (isLeader) {
                  group.currentMemberMetadata
                } else {
                  List.empty
                },
                memberId = newMemberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                skipAssignment = isLeader,
                error = Errors.NONE
              ))
            } else {
              // Prior to version 9 of the JoinGroup API, we wanted to avoid 
current leader
              // performing trivial assignment while the group is in stable 
stage, because
              // the new assignment in leader's next sync call won't be 
broadcast by a stable group.
              // This could be guaranteed by always returning the old leader id 
so that the current
              // leader won't assume itself as a leader based on the returned 
message, since the new
              // member.id won't match returned leader id, therefore no 
assignment will be performed.
              group.maybeInvokeJoinCallback(member, JoinGroupResult(
                members = List.empty,
                memberId = newMemberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = currentLeader,
                skipAssignment = false,
                error = Errors.NONE
              ))
            }


> Thread unsafe for GroupMetadata
> -------------------------------
>
>                 Key: KAFKA-14832
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14832
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.3.2
>            Reporter: zou shengfu
>            Assignee: zou shengfu
>            Priority: Major
>
> ```
>           groupManager.storeGroup(group, groupAssignment, error => {
>             if (error != Errors.NONE) {
>               warn(s"Failed to persist metadata for group ${group.groupId}: 
> ${error.message}")
>               // Failed to persist member.id of the given static member, 
> revert the update of the static member in the group.
>               group.updateMember(knownStaticMember, oldProtocols, 
> oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
>               val oldMember = group.replaceStaticMember(groupInstanceId, 
> newMemberId, oldMemberId)
>               completeAndScheduleNextHeartbeatExpiration(group, oldMember)
>               responseCallback(JoinGroupResult(
>                 List.empty,
>                 memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
>                 generationId = group.generationId,
>                 protocolType = group.protocolType,
>                 protocolName = group.protocolName,
>                 leaderId = currentLeader,
>                 skipAssignment = false,
>                 error = error
>               ))
>             } else if (supportSkippingAssignment) {
>               // Starting from version 9 of the JoinGroup API, static members 
> are able to
>               // skip running the assignor based on the `SkipAssignment` 
> field. We leverage
>               // this to tell the leader that it is the leader of the group 
> but by skipping
>               // running the assignor while the group is in stable state.
>               // Notes:
>               // 1) This allows the leader to continue monitoring metadata 
> changes for the
>               // group. Note that any metadata changes happening while the 
> static leader is
>               // down won't be noticed.
>               // 2) The assignors are not idempotent nor free from side 
> effects. This is why
>               // we skip entirely the assignment step as it could generate a 
> different group
>               // assignment which would be ignored by the group coordinator 
> because the group
>               // is the stable state.
>               val isLeader = group.isLeader(newMemberId)
>               group.maybeInvokeJoinCallback(member, JoinGroupResult(
>                 members = if (isLeader) {
>                   group.currentMemberMetadata
>                 } else {
>                   List.empty
>                 },
>                 memberId = newMemberId,
>                 generationId = group.generationId,
>                 protocolType = group.protocolType,
>                 protocolName = group.protocolName,
>                 leaderId = group.leaderOrNull,
>                 skipAssignment = isLeader,
>                 error = Errors.NONE
>               ))
>             } else {
>               // Prior to version 9 of the JoinGroup API, we wanted to avoid 
> current leader
>               // performing trivial assignment while the group is in stable 
> stage, because
>               // the new assignment in leader's next sync call won't be 
> broadcast by a stable group.
>               // This could be guaranteed by always returning the old leader 
> id so that the current
>               // leader won't assume itself as a leader based on the returned 
> message, since the new
>               // member.id won't match returned leader id, therefore no 
> assignment will be performed.
>               group.maybeInvokeJoinCallback(member, JoinGroupResult(
>                 members = List.empty,
>                 memberId = newMemberId,
>                 generationId = group.generationId,
>                 protocolType = group.protocolType,
>                 protocolName = group.protocolName,
>                 leaderId = currentLeader,
>                 skipAssignment = false,
>                 error = Errors.NONE
>               ))
>             }
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to