guozhangwang commented on pull request #11451:
URL: https://github.com/apache/kafka/pull/11451#issuecomment-964444781


   Hi @showuon I think I agree with you that, if we are going to encode both 
`ownedPartitions` and `generation` into the protocol in the new bytecode, then 
we do not need to try to "fix" 2), but only detect and fail it.
   
   As for this specific case, I'm actually thinking that we could consider 
having a slight different version of `resetStateAndGeneration` which, only 
reset the generation id, but not the member id field of the `Generation`. More 
specifically, we have three callers of `resetStateAndRejoin`, and one of them 
is `resetGenerationOnResponseError` (other two should always reset both 
generation and member ids). And there are several callers of 
`resetGenerationOnResponseError`:
   
   * UNKNOWN_MEMBER_ID in JoinGroup: reset both memberId and generation.
   * UNKNOWN_MEMBER_ID in SyncGroup: reset both.
   * ILLEGAL_GENERATION in SyncGroup: reset generation only.
   * UNKNOWN_MEMBER_ID in Heartbeat: reset both.
   * ILLEGAL_GENERATION in Heartbeat: reset generation only.
   * UNKNOWN_MEMBER_ID in OffsetCommit: reset both.
   * ILLEGAL_GENERATION in OffsetCommit: reset generation only.
   
   When we add the generation id to the join group protocol, it means the 
response could also include UNKNOWN_MEMBER_ID as well:
   
   * UNKNOWN_MEMBER_ID in JoinGroup: reset both.
   
   Now back to your original question:
   
   1) StickyAssignor in the new byte code would get the ownedPartitions from 
protocol directly, as in CooperativeStickyAssignor, and bump up the metadata to 
v2 with empty serialized data; the assign function would depend on the encoded 
metadata version to decide whether to retrieve the generation  / 
ownedPartitions from the protocol (v0,v1) or from user-data (v2). Note that for 
old versions where the version is not actually encoded, we'd need to rely on 
deserialization exception with higher versions to fallback to lower versions 
still.
   
   2) CooperativeAssignor in the new byte code would get the generation from 
protocol directly, and bump up the metadata to V2; the assign function would 
depend on the encoded metadata version to decide whether to retrieve the 
generation  / ownedPartitions from the protocol (v0,v1) or from user-data (v2). 
Same as 1) above.
   
   3) In the AbstractPartitionAssignor, we would have a `validateSubscription` 
function which takes in the ownedPartitions across all members, and needs to be 
called by all assignors (it is the customized assignor's own responsibility to 
call it), to check that ownedPartitions do not have overlaps.
   
   4) The broker-side coordinator would check for the generation upon 
Join-Group: if it is a sentinel value (e.g. null) then assume it is a new 
member that have never been in the group yet, and hence always for the current 
generation; if it is not sentinel value and stale, then return the error 
directly. Again, upon getting such error the member should not clear its 
memberId if there's one but only reset the generation to null and also its 
ownedPartitions before re-joining.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to