lucasbru commented on code in PR #21508:
URL: https://github.com/apache/kafka/pull/21508#discussion_r2832233381
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +851,46 @@ private void validateMemberEpoch(
}
}
+ /**
+ * Creates a validator that checks per-partition assignment epochs.
+ * A commit is rejected if the partition is not assigned to the member
+ * or if the received client-side epoch is older than the partition's
assignment epoch(KIP-1251).
+ *
+ * @param member The consumer group member.
+ * @param receivedMemberEpoch The member epoch from the offset commit
request.
+ * @return A validator that checks each partition's assignment epoch.
+ */
+ private CommitPartitionValidator createAssignmentEpochValidator(
+ ConsumerGroupMember member,
+ int receivedMemberEpoch
+ ) {
+ return (topicName, topicId, partitionId) -> {
+ // Check if the partition is in the assigned partitions.
+ // If not found in assigned, check partitions pending revocation.
+ Integer assignmentEpoch = member.getAssignmentEpoch(topicId,
partitionId);
+ if (assignmentEpoch == null) {
+ assignmentEpoch = member.getPendingRevocationEpoch(topicId,
partitionId);
+ }
+
+ // If the partition is not assigned to this member, reject.
+ if (assignmentEpoch == null) {
+ throw new StaleMemberEpochException(
+ String.format("Partition %s-%d is not assigned to member
%s.",
Review Comment:
Sound good!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]