[ 
https://issues.apache.org/jira/browse/KAFKA-14832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zou shengfu updated KAFKA-14832:
--------------------------------
    Description: 
{code:java}
groupManager.storeGroup(group, groupAssignment, error => {            
   if (error != Errors.NONE) {
             warn(s"Failed to persist metadata for group ${group.groupId}: 
${error.message}")
             // Failed to persist member.id of the given static member, revert 
the update of the static member in the group.  
      group.updateMember(knownStaticMember, oldProtocols, 
oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
      val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, 
oldMemberId)                      .... }{code}

  was:
{code:java}
          groupManager.storeGroup(group, groupAssignment, error => {            
if (error != Errors.NONE) {              warn(s"Failed to persist metadata for 
group ${group.groupId}: ${error.message}")
              // Failed to persist member.id of the given static member, revert 
the update of the static member in the group.              
group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, 
oldSessionTimeoutMs, null)              val oldMember = 
group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId)            
  completeAndScheduleNextHeartbeatExpiration(group, oldMember)              
responseCallback(JoinGroupResult(                List.empty,                
memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,                generationId = 
group.generationId,                protocolType = group.protocolType,           
     protocolName = group.protocolName,                leaderId = 
currentLeader,                skipAssignment = false,                error = 
error              ))            } else if (supportSkippingAssignment) {        
      // Starting from version 9 of the JoinGroup API, static members are able 
to              // skip running the assignor based on the `SkipAssignment` 
field. We leverage              // this to tell the leader that it is the 
leader of the group but by skipping              // running the assignor while 
the group is in stable state.              // Notes:              // 1) This 
allows the leader to continue monitoring metadata changes for the              
// group. Note that any metadata changes happening while the static leader is   
           // down won't be noticed.              // 2) The assignors are not 
idempotent nor free from side effects. This is why              // we skip 
entirely the assignment step as it could generate a different group             
 // assignment which would be ignored by the group coordinator because the 
group              // is the stable state.              val isLeader = 
group.isLeader(newMemberId)              group.maybeInvokeJoinCallback(member, 
JoinGroupResult(                members = if (isLeader) {                  
group.currentMemberMetadata                } else {                  List.empty 
               },                memberId = newMemberId,                
generationId = group.generationId,                protocolType = 
group.protocolType,                protocolName = group.protocolName,           
     leaderId = group.leaderOrNull,                skipAssignment = isLeader,   
             error = Errors.NONE              ))            } else {            
  // Prior to version 9 of the JoinGroup API, we wanted to avoid current leader 
             // performing trivial assignment while the group is in stable 
stage, because              // the new assignment in leader's next sync call 
won't be broadcast by a stable group.              // This could be guaranteed 
by always returning the old leader id so that the current              // 
leader won't assume itself as a leader based on the returned message, since the 
new              // member.id won't match returned leader id, therefore no 
assignment will be performed.              
group.maybeInvokeJoinCallback(member, JoinGroupResult(                members = 
List.empty,                memberId = newMemberId,                generationId 
= group.generationId,                protocolType = group.protocolType,         
       protocolName = group.protocolName,                leaderId = 
currentLeader,                skipAssignment = false,                error = 
Errors.NONE              ))            }{code}


> Thread unsafe for GroupMetadata when persisting metadata 
> ---------------------------------------------------------
>
>                 Key: KAFKA-14832
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14832
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.3.2
>            Reporter: zou shengfu
>            Assignee: zou shengfu
>            Priority: Major
>
> {code:java}
> groupManager.storeGroup(group, groupAssignment, error => {            
>    if (error != Errors.NONE) {
>              warn(s"Failed to persist metadata for group ${group.groupId}: 
> ${error.message}")
>              // Failed to persist member.id of the given static member, 
> revert the update of the static member in the group.  
>       group.updateMember(knownStaticMember, oldProtocols, 
> oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
>       val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, 
> oldMemberId)                      .... }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to