lucasbru commented on code in PR #21508:
URL: https://github.com/apache/kafka/pull/21508#discussion_r2827683614


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +851,46 @@ private void validateMemberEpoch(
         }
     }
 
+    /**
+     * Creates a validator that checks per-partition assignment epochs.
+     * A commit is rejected if the partition is not assigned to the member
+     * or if the received client-side epoch is older than the partition's assignment epoch(KIP-1251).
+     *
+     * @param member              The consumer group member.
+     * @param receivedMemberEpoch The member epoch from the offset commit request.
+     * @return A validator that checks each partition's assignment epoch.
+     */
+    private CommitPartitionValidator createAssignmentEpochValidator(
+        ConsumerGroupMember member,
+        int receivedMemberEpoch
+    ) {
+        return (topicName, topicId, partitionId) -> {
+            // Check if the partition is in the assigned partitions.
+            // If not found in assigned, check partitions pending revocation.
+            Integer assignmentEpoch = member.getAssignmentEpoch(topicId, partitionId);
+            if (assignmentEpoch == null) {
+                assignmentEpoch = member.getPendingRevocationEpoch(topicId, partitionId);
+            }
+
+            // If the partition is not assigned to this member, reject.
+            if (assignmentEpoch == null) {
+                throw new StaleMemberEpochException(
+                    String.format("Partition %s-%d is not assigned to member %s.",

Review Comment:
   Above, we allow commits for unassigned partitions when the "strict epoch 
match" passes, that is, when the client-side member epoch equals the 
server-side member epoch. 
   
   So this error message alone is probably not a sufficient explanation. It 
should also state that the client-side member epoch != server-side member 
epoch.
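   
   A minimal sketch of a message that carries both reasons. The class 
`AssignmentEpochMessages`, the helper `notAssignedMessage`, and its parameters 
are hypothetical names used for illustration only; they are not part of the PR.

```java
import java.util.Locale;

public final class AssignmentEpochMessages {
    private AssignmentEpochMessages() {}

    // Hypothetical helper: formats a rejection message that names both
    // conditions, i.e. the partition is not assigned to the member AND the
    // received (client-side) epoch differs from the current (server-side) epoch.
    public static String notAssignedMessage(
        String topicName,
        int partitionId,
        String memberId,
        int receivedMemberEpoch,
        int currentMemberEpoch
    ) {
        return String.format(
            Locale.ROOT,
            "Partition %s-%d is not assigned to member %s and the received member epoch %d "
                + "does not match the current member epoch %d.",
            topicName, partitionId, memberId, receivedMemberEpoch, currentMemberEpoch);
    }
}
```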



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4138,9 +4138,12 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
         ConsumerGroupMember member
     ) {
         // We will write a member epoch of -2 for this departing static member.
+        // Assignment epochs are reset to 0 so when the static member rejoins, partitions
+        // are considered assigned from epoch 0 to the new member ID.
         ConsumerGroupMember leavingStaticMember = new ConsumerGroupMember.Builder(member)

Review Comment:
   It seems we also need to update the dynamic member leave code. We seem to 
call `setAssignedPartitions` there. 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -66,6 +69,8 @@ public static class Builder {
         private Map<Uuid, Set<Integer>> assignedPartitions = Map.of();
         private Map<Uuid, Set<Integer>> partitionsPendingRevocation = Map.of();
         private ConsumerGroupMemberMetadataValue.ClassicMemberMetadata classicMemberMetadata = null;
+        private Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs = Map.of();

Review Comment:
   Why are we adding `assignedPartitionsWithEpochs` in addition to 
`assignedPartitions`? The two will get out of sync. I think we should keep only 
the latter and replace all uses of `assignedPartitions` with 
`assignedPartitionsWithEpochs`.
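   
   For callers that only need the partition sets, the view could be derived from 
the single map on demand. This is a rough sketch under assumptions: the class 
`AssignedPartitionsView` and the method `partitionsOnly` are hypothetical, and a 
generic key type `K` stands in for `Uuid` so the snippet compiles on its own.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public final class AssignedPartitionsView {
    private AssignedPartitionsView() {}

    // Hypothetical helper: turns topicId -> (partition -> assignment epoch)
    // into topicId -> partitions, so the epoch map stays the only stored state.
    public static <K> Map<K, Set<Integer>> partitionsOnly(
        Map<K, Map<Integer, Integer>> withEpochs
    ) {
        return withEpochs.entrySet().stream()
            .collect(Collectors.toUnmodifiableMap(
                Map.Entry::getKey,
                e -> Set.copyOf(e.getValue().keySet())));
    }
}
```

   Deriving the view costs a copy per call, so hot paths may prefer to read the 
epoch map's key sets directly.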



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

