[ 
https://issues.apache.org/jira/browse/KAFKA-14832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zou shengfu updated KAFKA-14832:
--------------------------------
    Description: 
{code:java}
//代码占位符
{code}
          groupManager.storeGroup(group, groupAssignment, error => \{           
  if (error != Errors.NONE) {               warn(s"Failed to persist metadata 
for group ${group.groupId}: ${error.message}")              // Failed to 
persist member.id of the given static member, revert the update of the static 
member in the group.               group.updateMember(knownStaticMember, 
oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)               
val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, 
oldMemberId)               completeAndScheduleNextHeartbeatExpiration(group, 
oldMember)               responseCallback(JoinGroupResult(                 
List.empty,                 memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,      
           generationId = group.generationId,                 protocolType = 
group.protocolType,                 protocolName = group.protocolName,          
       leaderId = currentLeader,                 skipAssignment = false,        
         error = error               ))             } else if 
(supportSkippingAssignment) \{               // Starting from version 9 of the 
JoinGroup API, static members are able to               // skip running the 
assignor based on the `SkipAssignment` field. We leverage               // this 
to tell the leader that it is the leader of the group but by skipping           
    // running the assignor while the group is in stable state.               
// Notes:               // 1) This allows the leader to continue monitoring 
metadata changes for the               // group. Note that any metadata changes 
happening while the static leader is               // down won't be noticed.    
           // 2) The assignors are not idempotent nor free from side effects. 
This is why               // we skip entirely the assignment step as it could 
generate a different group               // assignment which would be ignored 
by the group coordinator because the group               // is the stable 
state.               val isLeader = group.isLeader(newMemberId)               
group.maybeInvokeJoinCallback(member, JoinGroupResult(                 members 
= if (isLeader) {                   group.currentMemberMetadata                 
} else \{                   List.empty                 },                 
memberId = newMemberId,                 generationId = group.generationId,      
           protocolType = group.protocolType,                 protocolName = 
group.protocolName,                 leaderId = group.leaderOrNull,              
   skipAssignment = isLeader,                 error = Errors.NONE               
))             } else \{               // Prior to version 9 of the JoinGroup 
API, we wanted to avoid current leader               // performing trivial 
assignment while the group is in stable stage, because               // the new 
assignment in leader's next sync call won't be broadcast by a stable group.     
          // This could be guaranteed by always returning the old leader id so 
that the current               // leader won't assume itself as a leader based 
on the returned message, since the new               // member.id won't match 
returned leader id, therefore no assignment will be performed.               
group.maybeInvokeJoinCallback(member, JoinGroupResult(                 members 
= List.empty,                 memberId = newMemberId,                 
generationId = group.generationId,                 protocolType = 
group.protocolType,                 protocolName = group.protocolName,          
       leaderId = currentLeader,                 skipAssignment = false,        
         error = Errors.NONE               ))             }

  was:
```          groupManager.storeGroup(group, groupAssignment, error => {
            if (error != Errors.NONE) {
              warn(s"Failed to persist metadata for group ${group.groupId}: 
${error.message}")

              // Failed to persist member.id of the given static member, revert 
the update of the static member in the group.
              group.updateMember(knownStaticMember, oldProtocols, 
oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
              val oldMember = group.replaceStaticMember(groupInstanceId, 
newMemberId, oldMemberId)
              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
              responseCallback(JoinGroupResult(
                List.empty,
                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = currentLeader,
                skipAssignment = false,
                error = error
              ))
            } else if (supportSkippingAssignment) {
              // Starting from version 9 of the JoinGroup API, static members 
are able to
              // skip running the assignor based on the `SkipAssignment` field. 
We leverage
              // this to tell the leader that it is the leader of the group but 
by skipping
              // running the assignor while the group is in stable state.
              // Notes:
              // 1) This allows the leader to continue monitoring metadata 
changes for the
              // group. Note that any metadata changes happening while the 
static leader is
              // down won't be noticed.
              // 2) The assignors are not idempotent nor free from side 
effects. This is why
              // we skip entirely the assignment step as it could generate a 
different group
              // assignment which would be ignored by the group coordinator 
because the group
              // is the stable state.
              val isLeader = group.isLeader(newMemberId)
              group.maybeInvokeJoinCallback(member, JoinGroupResult(
                members = if (isLeader)

{                   group.currentMemberMetadata                 }

else

{                   List.empty                 }

,
                memberId = newMemberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                skipAssignment = isLeader,
                error = Errors.NONE
              ))
            } else

{               // Prior to version 9 of the JoinGroup API, we wanted to avoid 
current leader               // performing trivial assignment while the group 
is in stable stage, because               // the new assignment in leader's 
next sync call won't be broadcast by a stable group.               // This 
could be guaranteed by always returning the old leader id so that the current   
            // leader won't assume itself as a leader based on the returned 
message, since the new               // member.id won't match returned leader 
id, therefore no assignment will be performed.               
group.maybeInvokeJoinCallback(member, JoinGroupResult(                 members 
= List.empty,                 memberId = newMemberId,                 
generationId = group.generationId,                 protocolType = 
group.protocolType,                 protocolName = group.protocolName,          
       leaderId = currentLeader,                 skipAssignment = false,        
         error = Errors.NONE               ))             }```


> Thread unsafe for GroupMetadata
> -------------------------------
>
>                 Key: KAFKA-14832
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14832
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.3.2
>            Reporter: zou shengfu
>            Assignee: zou shengfu
>            Priority: Major
>
> {code:java}
> //代码占位符
> {code}
>           groupManager.storeGroup(group, groupAssignment, error => \{         
>     if (error != Errors.NONE) {               warn(s"Failed to persist 
> metadata for group ${group.groupId}: ${error.message}")              // 
> Failed to persist member.id of the given static member, revert the update of 
> the static member in the group.               
> group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, 
> oldSessionTimeoutMs, null)               val oldMember = 
> group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId)          
>      completeAndScheduleNextHeartbeatExpiration(group, oldMember)             
>   responseCallback(JoinGroupResult(                 List.empty,               
>   memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,                 generationId 
> = group.generationId,                 protocolType = group.protocolType,      
>            protocolName = group.protocolName,                 leaderId = 
> currentLeader,                 skipAssignment = false,                 error 
> = error               ))             } else if (supportSkippingAssignment) \{ 
>               // Starting from version 9 of the JoinGroup API, static members 
> are able to               // skip running the assignor based on the 
> `SkipAssignment` field. We leverage               // this to tell the leader 
> that it is the leader of the group but by skipping               // running 
> the assignor while the group is in stable state.               // Notes:      
>          // 1) This allows the leader to continue monitoring metadata changes 
> for the               // group. Note that any metadata changes happening 
> while the static leader is               // down won't be noticed.            
>    // 2) The assignors are not idempotent nor free from side effects. This is 
> why               // we skip entirely the assignment step as it could 
> generate a different group               // assignment which would be ignored 
> by the group coordinator because the group               // is the stable 
> state.               val isLeader = group.isLeader(newMemberId)               
> group.maybeInvokeJoinCallback(member, JoinGroupResult(                 
> members = if (isLeader) {                   group.currentMemberMetadata       
>           } else \{                   List.empty                 },           
>       memberId = newMemberId,                 generationId = 
> group.generationId,                 protocolType = group.protocolType,        
>          protocolName = group.protocolName,                 leaderId = 
> group.leaderOrNull,                 skipAssignment = isLeader,                
>  error = Errors.NONE               ))             } else \{               // 
> Prior to version 9 of the JoinGroup API, we wanted to avoid current leader    
>            // performing trivial assignment while the group is in stable 
> stage, because               // the new assignment in leader's next sync call 
> won't be broadcast by a stable group.               // This could be 
> guaranteed by always returning the old leader id so that the current          
>      // leader won't assume itself as a leader based on the returned message, 
> since the new               // member.id won't match returned leader id, 
> therefore no assignment will be performed.               
> group.maybeInvokeJoinCallback(member, JoinGroupResult(                 
> members = List.empty,                 memberId = newMemberId,                 
> generationId = group.generationId,                 protocolType = 
> group.protocolType,                 protocolName = group.protocolName,        
>          leaderId = currentLeader,                 skipAssignment = false,    
>              error = Errors.NONE               ))             }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to