[ 
https://issues.apache.org/jira/browse/KAFKA-14832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zou shengfu updated KAFKA-14832:
--------------------------------
    Description: 
{code:java}
          groupManager.storeGroup(group, groupAssignment, error => {            
if (error != Errors.NONE) {              warn(s"Failed to persist metadata for 
group ${group.groupId}: ${error.message}")
              // Failed to persist member.id of the given static member, revert 
the update of the static member in the group.              
group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, 
oldSessionTimeoutMs, null)              val oldMember = 
group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId)            
  completeAndScheduleNextHeartbeatExpiration(group, oldMember)              
responseCallback(JoinGroupResult(                List.empty,                
memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,                generationId = 
group.generationId,                protocolType = group.protocolType,           
     protocolName = group.protocolName,                leaderId = 
currentLeader,                skipAssignment = false,                error = 
error              ))            } else if (supportSkippingAssignment) {        
      // Starting from version 9 of the JoinGroup API, static members are able 
to              // skip running the assignor based on the `SkipAssignment` 
field. We leverage              // this to tell the leader that it is the 
leader of the group but by skipping              // running the assignor while 
the group is in stable state.              // Notes:              // 1) This 
allows the leader to continue monitoring metadata changes for the              
// group. Note that any metadata changes happening while the static leader is   
           // down won't be noticed.              // 2) The assignors are not 
idempotent nor free from side effects. This is why              // we skip 
entirely the assignment step as it could generate a different group             
 // assignment which would be ignored by the group coordinator because the 
group              // is the stable state.              val isLeader = 
group.isLeader(newMemberId)              group.maybeInvokeJoinCallback(member, 
JoinGroupResult(                members = if (isLeader) {                  
group.currentMemberMetadata                } else {                  List.empty 
               },                memberId = newMemberId,                
generationId = group.generationId,                protocolType = 
group.protocolType,                protocolName = group.protocolName,           
     leaderId = group.leaderOrNull,                skipAssignment = isLeader,   
             error = Errors.NONE              ))            } else {            
  // Prior to version 9 of the JoinGroup API, we wanted to avoid current leader 
             // performing trivial assignment while the group is in stable 
stage, because              // the new assignment in leader's next sync call 
won't be broadcast by a stable group.              // This could be guaranteed 
by always returning the old leader id so that the current              // 
leader won't assume itself as a leader based on the returned message, since the 
new              // member.id won't match returned leader id, therefore no 
assignment will be performed.              
group.maybeInvokeJoinCallback(member, JoinGroupResult(                members = 
List.empty,                memberId = newMemberId,                generationId 
= group.generationId,                protocolType = group.protocolType,         
       protocolName = group.protocolName,                leaderId = 
currentLeader,                skipAssignment = false,                error = 
Errors.NONE              ))            }{code}

  was:
{code:java}
//代码占位符
{code}
          groupManager.storeGroup(group, groupAssignment, error => \{           
  if (error != Errors.NONE) {               warn(s"Failed to persist metadata 
for group ${group.groupId}: ${error.message}")              // Failed to 
persist member.id of the given static member, revert the update of the static 
member in the group.               group.updateMember(knownStaticMember, 
oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)               
val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, 
oldMemberId)               completeAndScheduleNextHeartbeatExpiration(group, 
oldMember)               responseCallback(JoinGroupResult(                 
List.empty,                 memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,      
           generationId = group.generationId,                 protocolType = 
group.protocolType,                 protocolName = group.protocolName,          
       leaderId = currentLeader,                 skipAssignment = false,        
         error = error               ))             } else if 
(supportSkippingAssignment) \{               // Starting from version 9 of the 
JoinGroup API, static members are able to               // skip running the 
assignor based on the `SkipAssignment` field. We leverage               // this 
to tell the leader that it is the leader of the group but by skipping           
    // running the assignor while the group is in stable state.               
// Notes:               // 1) This allows the leader to continue monitoring 
metadata changes for the               // group. Note that any metadata changes 
happening while the static leader is               // down won't be noticed.    
           // 2) The assignors are not idempotent nor free from side effects. 
This is why               // we skip entirely the assignment step as it could 
generate a different group               // assignment which would be ignored 
by the group coordinator because the group               // is the stable 
state.               val isLeader = group.isLeader(newMemberId)               
group.maybeInvokeJoinCallback(member, JoinGroupResult(                 members 
= if (isLeader) {                   group.currentMemberMetadata                 
} else \{                   List.empty                 },                 
memberId = newMemberId,                 generationId = group.generationId,      
           protocolType = group.protocolType,                 protocolName = 
group.protocolName,                 leaderId = group.leaderOrNull,              
   skipAssignment = isLeader,                 error = Errors.NONE               
))             } else \{               // Prior to version 9 of the JoinGroup 
API, we wanted to avoid current leader               // performing trivial 
assignment while the group is in stable stage, because               // the new 
assignment in leader's next sync call won't be broadcast by a stable group.     
          // This could be guaranteed by always returning the old leader id so 
that the current               // leader won't assume itself as a leader based 
on the returned message, since the new               // member.id won't match 
returned leader id, therefore no assignment will be performed.               
group.maybeInvokeJoinCallback(member, JoinGroupResult(                 members 
= List.empty,                 memberId = newMemberId,                 
generationId = group.generationId,                 protocolType = 
group.protocolType,                 protocolName = group.protocolName,          
       leaderId = currentLeader,                 skipAssignment = false,        
         error = Errors.NONE               ))             }

        Summary: Thread unsafe for GroupMetadata when persisting metadata   
(was: Thread unsafe for GroupMetadata)

> Thread unsafe for GroupMetadata when persisting metadata 
> ---------------------------------------------------------
>
>                 Key: KAFKA-14832
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14832
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.3.2
>            Reporter: zou shengfu
>            Assignee: zou shengfu
>            Priority: Major
>
> {code:java}
>           groupManager.storeGroup(group, groupAssignment, error => {          
>   if (error != Errors.NONE) {              warn(s"Failed to persist metadata 
> for group ${group.groupId}: ${error.message}")
>               // Failed to persist member.id of the given static member, 
> revert the update of the static member in the group.              
> group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, 
> oldSessionTimeoutMs, null)              val oldMember = 
> group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId)          
>     completeAndScheduleNextHeartbeatExpiration(group, oldMember)              
> responseCallback(JoinGroupResult(                List.empty,                
> memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,                generationId = 
> group.generationId,                protocolType = group.protocolType,         
>        protocolName = group.protocolName,                leaderId = 
> currentLeader,                skipAssignment = false,                error = 
> error              ))            } else if (supportSkippingAssignment) {      
>         // Starting from version 9 of the JoinGroup API, static members are 
> able to              // skip running the assignor based on the 
> `SkipAssignment` field. We leverage              // this to tell the leader 
> that it is the leader of the group but by skipping              // running 
> the assignor while the group is in stable state.              // Notes:       
>        // 1) This allows the leader to continue monitoring metadata changes 
> for the              // group. Note that any metadata changes happening while 
> the static leader is              // down won't be noticed.              // 
> 2) The assignors are not idempotent nor free from side effects. This is why   
>            // we skip entirely the assignment step as it could generate a 
> different group              // assignment which would be ignored by the 
> group coordinator because the group              // is the stable state.      
>         val isLeader = group.isLeader(newMemberId)              
> group.maybeInvokeJoinCallback(member, JoinGroupResult(                members 
> = if (isLeader) {                  group.currentMemberMetadata                
> } else {                  List.empty                },                
> memberId = newMemberId,                generationId = group.generationId,     
>            protocolType = group.protocolType,                protocolName = 
> group.protocolName,                leaderId = group.leaderOrNull,             
>    skipAssignment = isLeader,                error = Errors.NONE              
> ))            } else {              // Prior to version 9 of the JoinGroup 
> API, we wanted to avoid current leader              // performing trivial 
> assignment while the group is in stable stage, because              // the 
> new assignment in leader's next sync call won't be broadcast by a stable 
> group.              // This could be guaranteed by always returning the old 
> leader id so that the current              // leader won't assume itself as a 
> leader based on the returned message, since the new              // member.id 
> won't match returned leader id, therefore no assignment will be performed.    
>           group.maybeInvokeJoinCallback(member, JoinGroupResult(              
>   members = List.empty,                memberId = newMemberId,                
> generationId = group.generationId,                protocolType = 
> group.protocolType,                protocolName = group.protocolName,         
>        leaderId = currentLeader,                skipAssignment = false,       
>          error = Errors.NONE              ))            }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to