showuon edited a comment on pull request #11451: URL: https://github.com/apache/kafka/pull/11451#issuecomment-957039415
@guozhangwang , thanks for your comment. Answer your question below. > Only resetting the generation at the coordinator layer would not change what user data would be serialized and sent to the brokers. Am I missing something? --> The point here is, when reset state and `generation`, we'll mark them as default value (i.e. `NO_GENERATION` for generation), and also mark the consumer as `needsJoinPrepare`, and `needsRejoin`. That means, when the consumer do next poll, it'll enter `onJoinPrepare`, and in `ConsumerCoordinator`, we'll clean up all the assigned partition in this consumer if it's `NO_GENERATION`, and then rejoin the group: ```java if (generation == Generation.NO_GENERATION.generationId && memberId.equals(Generation.NO_GENERATION.memberId)) { revokedPartitions = new HashSet<>(subscriptions.assignedPartitions()); if (!revokedPartitions.isEmpty()) { log.info("Giving away all assigned partitions as lost since generation has been reset," + "indicating that consumer is no longer part of the group"); exception = invokePartitionsLost(revokedPartitions); subscriptions.assignFromSubscribed(Collections.emptySet()); } } ``` Does that make sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org