squah-confluent commented on code in PR #21508:
URL: https://github.com/apache/kafka/pull/21508#discussion_r2838826022


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +673,23 @@ public CommitPartitionValidator validateOffsetCommit(
             throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
                 "by members using the modern group protocol");
         }
+        // For members using the classic protocol, use strict epoch validation.
+        if (member.useClassicProtocol()) {
+            validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+            return CommitPartitionValidator.NO_OP;
+        }
 
-        validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
-        return CommitPartitionValidator.NO_OP;
+        // For member using the consumer protocol

Review Comment:
   nit:
   ```suggestion
           // For members using the consumer protocol, the epoch must either match the last epoch sent
           // in a heartbeat or be greater than or equal to the partition's assignment epoch.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +673,23 @@ public CommitPartitionValidator validateOffsetCommit(
             throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
                 "by members using the modern group protocol");
         }
+        // For members using the classic protocol, use strict epoch validation.

Review Comment:
   nit:
   ```suggestion
   
           // For members using the classic protocol, use strict epoch validation.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -63,9 +65,9 @@ public static class Builder {
         private Set<String> subscribedTopicNames = Set.of();
         private String subscribedTopicRegex = "";
         private String serverAssignorName = null;
-        private Map<Uuid, Set<Integer>> assignedPartitions = Map.of();
-        private Map<Uuid, Set<Integer>> partitionsPendingRevocation = Map.of();
         private ConsumerGroupMemberMetadataValue.ClassicMemberMetadata classicMemberMetadata = null;
+        private Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs = Map.of();

Review Comment:
   nit: Could you minimize the diff churn in this file by making the 
replacement methods/fields occupy the same position as the previous ones?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -299,47 +300,56 @@ private ConsumerGroupMember updateCurrentAssignment(
         Map<Uuid, Set<Integer>> memberAssignedPartitions
     ) {
         Set<Uuid> subscribedTopicIds = subscribedTopicIds();
-
         // Reuse the original map if no topics need to be removed.
-        Map<Uuid, Set<Integer>> newAssignedPartitions;
-        Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
+        Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs;
+        Map<Uuid, Map<Integer, Integer>> newPartitionsPendingRevocationWithEpochs;
+        boolean changed = false;

Review Comment:
   Is this refactor necessary?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -370,10 +380,13 @@ private ConsumerGroupMember computeNextAssignment(
         Set<Uuid> subscribedTopicIds = subscribedTopicIds();
 
         boolean hasUnreleasedPartitions = false;
-        Map<Uuid, Set<Integer>> newAssignedPartitions = new HashMap<>();
-        Map<Uuid, Set<Integer>> newPartitionsPendingRevocation = new HashMap<>();
+        Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs = new HashMap<>();
+        Map<Uuid, Map<Integer, Integer>> newPartitionsPendingRevocationWithEpochs = new HashMap<>();
         Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = new HashMap<>();
 
+        // Get existing epochs from member
+        Map<Uuid, Map<Integer, Integer>> existingAssignedEpochs = member.assignedPartitionsWithEpochs();

Review Comment:
   Can we change the type of `memberAssignedPartitions` to `Map<topic id, 
Map<partition, assignment epoch>>` and work with epoch maps instead?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +673,23 @@ public CommitPartitionValidator validateOffsetCommit(
             throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
                 "by members using the modern group protocol");
         }
+        // For members using the classic protocol, use strict epoch validation.
+        if (member.useClassicProtocol()) {
+            validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+            return CommitPartitionValidator.NO_OP;
+        }
 
-        validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
-        return CommitPartitionValidator.NO_OP;
+        // For member using the consumer protocol
+        // Case 1: Strict epoch match

Review Comment:
   nit: Do these cases correspond to the KIP? I would drop the comments 
otherwise.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -190,21 +192,61 @@ public Builder setState(MemberState state) {
             return this;
         }
 
-        public Builder setAssignedPartitions(Map<Uuid, Set<Integer>> assignedPartitions) {
-            this.assignedPartitions = assignedPartitions;
+        public Builder setClassicMemberMetadata(ConsumerGroupMemberMetadataValue.ClassicMemberMetadata classicMemberMetadata) {
+            this.classicMemberMetadata = classicMemberMetadata;
             return this;
         }
 
-        public Builder setPartitionsPendingRevocation(Map<Uuid, Set<Integer>> partitionsPendingRevocation) {
-            this.partitionsPendingRevocation = partitionsPendingRevocation;
+        public Builder setAssignedPartitionsWithEpochs(Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs) {
+            this.assignedPartitionsWithEpochs = assignedPartitionsWithEpochs;
             return this;
         }
 
-        public Builder setClassicMemberMetadata(ConsumerGroupMemberMetadataValue.ClassicMemberMetadata classicMemberMetadata) {
-            this.classicMemberMetadata = classicMemberMetadata;
+        public Builder setPartitionsPendingRevocationWithEpochs(Map<Uuid, Map<Integer, Integer>> partitionsPendingRevocationWithEpochs) {
+            this.partitionsPendingRevocationWithEpochs = partitionsPendingRevocationWithEpochs;
+            return this;
+        }
+
+        /**
+         * Resets the assignment epochs to 0 for all assigned partitions.
+         * Used when a static member leaves, so that the rejoining member

Review Comment:
   ```suggestion
            * Used when a static member leaves, so that the rejoining member's
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +673,23 @@ public CommitPartitionValidator validateOffsetCommit(
             throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
                 "by members using the modern group protocol");
         }
+        // For members using the classic protocol, use strict epoch validation.
+        if (member.useClassicProtocol()) {
+            validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+            return CommitPartitionValidator.NO_OP;
+        }
 
-        validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
-        return CommitPartitionValidator.NO_OP;
+        // For member using the consumer protocol
+        // Case 1: Strict epoch match
+        if (memberEpoch == member.memberEpoch()) {
+            return CommitPartitionValidator.NO_OP;
+        }
+        // Case 2: Client epoch > broker epoch, which is an invalid request
+        if (memberEpoch > member.memberEpoch()) {
+            throw new StaleMemberEpochException(String.format("The received member epoch %d is larger than "
+                + "the expected member epoch %d.", memberEpoch, member.memberEpoch()));
+        }
+        return createAssignmentEpochValidator(member, memberEpoch);

Review Comment:
   nit: Could we follow the streams implementation?
   ```suggestion
   
           // Member epoch is older; validate against per-partition assignment epochs.
           return createAssignmentEpochValidator(member, memberEpoch);
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +673,23 @@ public CommitPartitionValidator validateOffsetCommit(
             throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
                 "by members using the modern group protocol");
         }
+        // For members using the classic protocol, use strict epoch validation.
+        if (member.useClassicProtocol()) {
+            validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+            return CommitPartitionValidator.NO_OP;
+        }
 
-        validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
-        return CommitPartitionValidator.NO_OP;
+        // For member using the consumer protocol
+        // Case 1: Strict epoch match
+        if (memberEpoch == member.memberEpoch()) {
+            return CommitPartitionValidator.NO_OP;
+        }
+        // Case 2: Client epoch > broker epoch, which is an invalid request
+        if (memberEpoch > member.memberEpoch()) {
+            throw new StaleMemberEpochException(String.format("The received member epoch %d is larger than "
+                + "the expected member epoch %d.", memberEpoch, member.memberEpoch()));

Review Comment:
   nit: Could we follow the streams implementation?
   ```suggestion
               throw new StaleMemberEpochException(String.format("Received member epoch %d is newer than "
                   + "current member epoch %d.", memberEpoch, member.memberEpoch()));
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1184,17 +1240,24 @@ public static ConsumerGroup fromClassicGroup(
             // assignment of the classic group. All the members are put in the Stable state. If the classic
             // group was in Preparing Rebalance or Completing Rebalance states, the classic members are
             // asked to rejoin the group to re-trigger a rebalance or collect their assignments.
+            int memberEpoch = classicGroup.generationId();
+            // Convert assigned partitions to epochs map
+            Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs = assignedPartitions.entrySet().stream()
+                .collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    e -> e.getValue().stream().collect(Collectors.toMap(p -> p, p -> memberEpoch))
+                ));

Review Comment:
   nit: We could move this into a 
`Builder.setAssignedPartitions(assignedPartitions, assignmentEpoch)` 
convenience method.
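
   A minimal sketch of what such a helper might compute, assuming every partition in the assignment receives the same epoch. The class and method names are hypothetical, and `String` stands in for `Uuid` so the example compiles without Kafka on the classpath:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class AssignmentEpochs {
    // Hypothetical helper: annotates every assigned partition with one epoch.
    // String topic ids are a stand-in for Kafka's Uuid.
    static Map<String, Map<Integer, Integer>> withEpoch(
        Map<String, Set<Integer>> assignedPartitions,
        int assignmentEpoch
    ) {
        return assignedPartitions.entrySet().stream()
            .collect(Collectors.toUnmodifiableMap(
                Map.Entry::getKey,
                e -> e.getValue().stream()
                    .collect(Collectors.toUnmodifiableMap(p -> p, p -> assignmentEpoch))
            ));
    }
}
```

   A builder overload could then delegate to this helper, which would keep the stream pipeline out of `fromClassicGroup`.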



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -337,6 +406,50 @@ public Map<Uuid, Set<Integer>> partitionsPendingRevocation() {
         return partitionsPendingRevocation;
     }
 
+    /**
+     * @return The epoch-annotated assigned partitions map.
+     */
+    public Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs() {
+        return assignedPartitionsWithEpochs;
+    }
+
+    /**
+     * @return The epoch-annotated pending revocation partitions map.
+     */
+    public Map<Uuid, Map<Integer, Integer>> partitionsPendingRevocationWithEpochs() {
+        return partitionsPendingRevocationWithEpochs;
+    }
+
+    /**
+     * Gets the assignment epoch for a specific partition.
+     *
+     * @param topicId     The topic UUID.
+     * @param partitionId The partition index.
+     * @return The epoch at which the partition was assigned, or null if not assigned.
+     */
+    public Integer getAssignmentEpoch(Uuid topicId, int partitionId) {
+        Map<Integer, Integer> partitionEpochs = assignedPartitionsWithEpochs.get(topicId);
+        if (partitionEpochs != null) {
+            return partitionEpochs.get(partitionId);
+        }
+        return null;
+    }
+
+    /**
+     * Gets the assignment epoch for a partition pending revocation.
+     *
+     * @param topicId     The topic UUID.
+     * @param partitionId The partition index.
+     * @return The epoch at which the partition was assigned, or null if not pending revocation.
+     */
+    public Integer getPendingRevocationEpoch(Uuid topicId, int partitionId) {

Review Comment:
   nit: The convention in this class is to omit `get` from method names.
   ```suggestion
       public Integer pendingRevocationEpoch(Uuid topicId, int partitionId) {
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -190,21 +192,61 @@ public Builder setState(MemberState state) {
             return this;
         }
 
-        public Builder setAssignedPartitions(Map<Uuid, Set<Integer>> assignedPartitions) {
-            this.assignedPartitions = assignedPartitions;
+        public Builder setClassicMemberMetadata(ConsumerGroupMemberMetadataValue.ClassicMemberMetadata classicMemberMetadata) {
+            this.classicMemberMetadata = classicMemberMetadata;
             return this;
         }
 
-        public Builder setPartitionsPendingRevocation(Map<Uuid, Set<Integer>> partitionsPendingRevocation) {
-            this.partitionsPendingRevocation = partitionsPendingRevocation;
+        public Builder setAssignedPartitionsWithEpochs(Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs) {
+            this.assignedPartitionsWithEpochs = assignedPartitionsWithEpochs;
             return this;
         }
 
-        public Builder setClassicMemberMetadata(ConsumerGroupMemberMetadataValue.ClassicMemberMetadata classicMemberMetadata) {
-            this.classicMemberMetadata = classicMemberMetadata;
+        public Builder setPartitionsPendingRevocationWithEpochs(Map<Uuid, Map<Integer, Integer>> partitionsPendingRevocationWithEpochs) {
+            this.partitionsPendingRevocationWithEpochs = partitionsPendingRevocationWithEpochs;
+            return this;
+        }
+
+        /**
+         * Resets the assignment epochs to 0 for all assigned partitions.
+         * Used when a static member leaves, so that the rejoining member
+         * partitions will be assigned from epoch 0 to the new member ID.
+         * All commits using the old member ID will be fenced.
+         */
+        public Builder resetAssignedPartitionsEpochsToZero() {
+            if (this.assignedPartitionsWithEpochs.isEmpty()) {
+                return this;
+            }
+            Map<Uuid, Map<Integer, Integer>> resetEpochs = new HashMap<>();
+            for (Map.Entry<Uuid, Map<Integer, Integer>> entry : this.assignedPartitionsWithEpochs.entrySet()) {
+                Map<Integer, Integer> partitionEpochs = new HashMap<>();
+                for (Integer partitionId : entry.getValue().keySet()) {
+                    partitionEpochs.put(partitionId, 0);
+                }
+                resetEpochs.put(entry.getKey(), Collections.unmodifiableMap(partitionEpochs));
+            }
+            this.assignedPartitionsWithEpochs = Collections.unmodifiableMap(resetEpochs);
             return this;
         }
 
+        private static Map<Uuid, Map<Integer, Integer>> assignmentWithEpochsFromTopicPartitions(
+            List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> topicPartitions,
+            int defaultEpoch
+        ) {

Review Comment:
   nit: This method should replace `Utils.assignmentFromTopicPartitions` and 
live in `Utils`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -299,47 +300,56 @@ private ConsumerGroupMember updateCurrentAssignment(
         Map<Uuid, Set<Integer>> memberAssignedPartitions
     ) {
         Set<Uuid> subscribedTopicIds = subscribedTopicIds();
-

Review Comment:
   nit: stray newline change



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -337,6 +406,50 @@ public Map<Uuid, Set<Integer>> partitionsPendingRevocation() {
         return partitionsPendingRevocation;
     }
 
+    /**
+     * @return The epoch-annotated assigned partitions map.
+     */
+    public Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs() {
+        return assignedPartitionsWithEpochs;
+    }
+
+    /**
+     * @return The epoch-annotated pending revocation partitions map.
+     */
+    public Map<Uuid, Map<Integer, Integer>> partitionsPendingRevocationWithEpochs() {
+        return partitionsPendingRevocationWithEpochs;
+    }
+
+    /**
+     * Gets the assignment epoch for a specific partition.
+     *
+     * @param topicId     The topic UUID.
+     * @param partitionId The partition index.
+     * @return The epoch at which the partition was assigned, or null if not assigned.
+     */
+    public Integer getAssignmentEpoch(Uuid topicId, int partitionId) {

Review Comment:
   nit: The convention in this class is to omit `get` from method names.
   ```suggestion
       public Integer assignmentEpoch(Uuid topicId, int partitionId) {
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +852,47 @@ private void validateMemberEpoch(
         }
     }
 
+    /**
+     * Creates a validator that checks per-partition assignment epochs.
+     * A commit is rejected if the partition is not assigned to the member
+     * or if the received client-side epoch is older than the partition's assignment epoch(KIP-1251).
+     *
+     * @param member              The consumer group member.
+     * @param receivedMemberEpoch The member epoch from the offset commit request.
+     * @return A validator that checks each partition's assignment epoch.
+     */
+    private CommitPartitionValidator createAssignmentEpochValidator(
+        ConsumerGroupMember member,
+        int receivedMemberEpoch
+    ) {
+        return (topicName, topicId, partitionId) -> {
+            // Check if the partition is in the assigned partitions.
+            // If not found in assigned, check partitions pending revocation.
+            Integer assignmentEpoch = member.getAssignmentEpoch(topicId, partitionId);
+            if (assignmentEpoch == null) {
+                assignmentEpoch = member.getPendingRevocationEpoch(topicId, partitionId);
+            }
+
+            // If client-side epoch != broker-side epoch, and the partition is not assigned to this member, reject.
+            if (assignmentEpoch == null) {
+                throw new StaleMemberEpochException(String.format(
+                    "Partition %s-%d is not assigned or pending revocation for member %s. " +
+                        "Committing unassigned partitions is only allowed when member epoch matches exactly " +
+                        "(received: %d, current: %d).",
+                    topicName, partitionId, member.memberId(), receivedMemberEpoch, member.memberEpoch()));
+            }
+
+            // If the received epoch is older than when this partition was assigned,
+            // It is a zombie commit and should be rejected.
+            if (receivedMemberEpoch < assignmentEpoch) {
+                throw new StaleMemberEpochException(
+                    String.format("The received member epoch %d is older than the assignment epoch %d for partition %s-%d.",
+                        receivedMemberEpoch, assignmentEpoch, topicName, partitionId)
+                );
+            }
+        };
+    }

Review Comment:
   nit: Could we follow the streams implementation?
   ```suggestion
       /**
        * Creates a validator that checks if the received member epoch is valid for each partition's assignment epoch.
        * A commit is rejected if the partition is not assigned to the member
        * or if the received client-side epoch is older than the partition's assignment epoch (KIP-1251).
        *
        * @param member The member whose assignments are being validated.
        * @param receivedMemberEpoch The received member epoch.
        * @return A validator for per-partition validation.
        */
       private CommitPartitionValidator createAssignmentEpochValidator(
           ConsumerGroupMember member,
           int receivedMemberEpoch
       ) {
           return (topicName, topicId, partitionId) -> {
               // Search for the partition in assigned partitions, then in partitions pending revocation
               Integer assignmentEpoch = member.getAssignmentEpoch(topicId, partitionId);
               if (assignmentEpoch == null) {
                   assignmentEpoch = member.getPendingRevocationEpoch(topicId, partitionId);
               }
   
               if (assignmentEpoch == null) {
                   throw new StaleMemberEpochException(String.format(
                       "Partition %s-%d is not assigned or pending revocation for member.",
                       topicName, partitionId));
               }
   
               if (receivedMemberEpoch < assignmentEpoch) {
                   throw new StaleMemberEpochException(
                       String.format("Received member epoch %d is older than assignment epoch %d for partition %s-%d.",
                           receivedMemberEpoch, assignmentEpoch, topicName, partitionId)
                   );
               }
           };
       }
   ```
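
   For readers without the coordinator sources at hand, the per-partition check can be sketched in isolation. This is only an illustration under stated assumptions: `StaleEpochException` and `PartitionValidator` are hypothetical stand-ins for `StaleMemberEpochException` and `CommitPartitionValidator`, and `String` topic ids replace `Uuid`.

```java
import java.util.Map;

final class AssignmentEpochValidatorSketch {
    /** Hypothetical stand-in for Kafka's StaleMemberEpochException. */
    static final class StaleEpochException extends RuntimeException {
        StaleEpochException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for CommitPartitionValidator. */
    @FunctionalInterface
    interface PartitionValidator {
        void validate(String topicName, String topicId, int partitionId);
    }

    static PartitionValidator create(
        Map<String, Map<Integer, Integer>> assigned,
        Map<String, Map<Integer, Integer>> pendingRevocation,
        int receivedMemberEpoch
    ) {
        return (topicName, topicId, partitionId) -> {
            Map<Integer, Integer> none = Map.of();
            // Look in the assigned partitions first, then in those pending revocation.
            Integer assignmentEpoch = assigned.getOrDefault(topicId, none).get(partitionId);
            if (assignmentEpoch == null) {
                assignmentEpoch = pendingRevocation.getOrDefault(topicId, none).get(partitionId);
            }
            // The member holds the partition in neither map: reject.
            if (assignmentEpoch == null) {
                throw new StaleEpochException(String.format(
                    "Partition %s-%d is not assigned or pending revocation for member.",
                    topicName, partitionId));
            }
            // The commit predates the assignment of this partition: reject.
            if (receivedMemberEpoch < assignmentEpoch) {
                throw new StaleEpochException(String.format(
                    "Received member epoch %d is older than assignment epoch %d for partition %s-%d.",
                    receivedMemberEpoch, assignmentEpoch, topicName, partitionId));
            }
        };
    }
}
```

   With `assigned = {id: {0: 3}}` and a received epoch of 3, a commit for partition 0 passes; the same commit with a received epoch of 2 is rejected as stale.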



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -407,19 +422,35 @@ private ConsumerGroupMember computeNextAssignment(
                 !member.partitionsPendingRevocation().getOrDefault(topicId, Set.of()).contains(partitionId)
             ) || hasUnreleasedPartitions;
 
+            // Build epochs map for assigned partitions (preserve existing epochs)
             if (!assignedPartitions.isEmpty()) {
-                newAssignedPartitions.put(topicId, assignedPartitions);
+                Map<Integer, Integer> partitionEpochs = new HashMap<>();
+                for (Integer partitionId : assignedPartitions) {
+                    partitionEpochs.put(partitionId, existingTopicEpochs.getOrDefault(partitionId, memberEpoch));

Review Comment:
   The default is never used because `assignedPartitions` comes from 
`member.assignedPartitions()`.
   ```suggestion
                       partitionEpochs.put(partitionId, existingTopicEpochs.get(partitionId));
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -66,6 +69,8 @@ public static class Builder {
         private Map<Uuid, Set<Integer>> assignedPartitions = Map.of();
         private Map<Uuid, Set<Integer>> partitionsPendingRevocation = Map.of();
         private ConsumerGroupMemberMetadataValue.ClassicMemberMetadata classicMemberMetadata = null;
+        private Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs = Map.of();

Review Comment:
   > I would leave `assignedPartitions` in place for now but would like to see 
it removed eventually.
   
   Actually if we remove it now, we can call the new methods 
`assignedPartitions` and `partitionsPendingRevocation` without conflict and we 
don't need to invent a view to satisfy the base class.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -299,47 +300,56 @@ private ConsumerGroupMember updateCurrentAssignment(
         Map<Uuid, Set<Integer>> memberAssignedPartitions
     ) {
         Set<Uuid> subscribedTopicIds = subscribedTopicIds();
-
         // Reuse the original map if no topics need to be removed.
-        Map<Uuid, Set<Integer>> newAssignedPartitions;
-        Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
+        Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs;
+        Map<Uuid, Map<Integer, Integer>> newPartitionsPendingRevocationWithEpochs;
+        boolean changed = false;
+
         if (subscribedTopicIds.isEmpty() && member.partitionsPendingRevocation().isEmpty()) {
-            newAssignedPartitions = Map.of();
-            newPartitionsPendingRevocation = memberAssignedPartitions;
+            newAssignedPartitionsWithEpochs = Map.of();
+            // Move all assigned to pending revocation with their epochs
+            newPartitionsPendingRevocationWithEpochs = new HashMap<>(member.assignedPartitionsWithEpochs());
+            changed = true;
         } else {
-            newAssignedPartitions = memberAssignedPartitions;
-            newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation());
+            newAssignedPartitionsWithEpochs = new HashMap<>(member.assignedPartitionsWithEpochs());
+            newPartitionsPendingRevocationWithEpochs = new HashMap<>(member.partitionsPendingRevocationWithEpochs());
             for (Map.Entry<Uuid, Set<Integer>> entry : memberAssignedPartitions.entrySet()) {
                 if (!subscribedTopicIds.contains(entry.getKey())) {
-                    if (newAssignedPartitions == memberAssignedPartitions) {
-                        newAssignedPartitions = new HashMap<>(memberAssignedPartitions);
-                        newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation());
+                    changed = true;
+                    Uuid topicId = entry.getKey();
+                    Map<Integer, Integer> removedEpochs = newAssignedPartitionsWithEpochs.remove(topicId);
+                    if (removedEpochs != null) {
+                        newPartitionsPendingRevocationWithEpochs.merge(
+                            topicId,
+                            removedEpochs,
+                            (existing, additional) -> {
+                                Map<Integer, Integer> merged = new HashMap<>(existing);
+                                merged.putAll(additional);
+                                return merged;
+                            }
+                        );
                     }
-                    newAssignedPartitions.remove(entry.getKey());
-                    newPartitionsPendingRevocation.merge(
-                        entry.getKey(),
-                        entry.getValue(),
-                        (existing, additional) -> {
-                            existing = new HashSet<>(existing);
-                            existing.addAll(additional);
-                            return existing;
-                        }
-                    );
                 }
             }
         }
 
-        if (newAssignedPartitions == memberAssignedPartitions) {
+        if (!changed) {
             // If no partitions were removed, we can return the member as is.
             return member;
         }
 
+        Map<Uuid, Set<Integer>> newPartitionsPendingRevocation = newPartitionsPendingRevocationWithEpochs.entrySet().stream()
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                e -> e.getValue().keySet()
+            ));

Review Comment:
   Can we try updating `ownsRevokedPartitions` to accept a `Map<topic id, 
Map<partition, assignment epoch>>` instead?
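
   A rough sketch of how such a check could read the epoch map directly, so the `Set` view would no longer need to be rebuilt. It assumes the method only asks whether any partition the member still reports as owned is pending revocation; the class name, the `owned` parameter shape and the `String` topic ids are hypothetical stand-ins, not the signature in `CurrentAssignmentBuilder`.

```java
import java.util.Map;
import java.util.Set;

final class OwnedPartitionsCheck {
    // Hypothetical variant: only the keys of the inner map (partition ids) are consulted,
    // so the assignment epochs ride along without being copied into a Set.
    static boolean ownsRevokedPartitions(
        Map<String, Set<Integer>> owned,
        Map<String, Map<Integer, Integer>> pendingRevocationWithEpochs
    ) {
        for (Map.Entry<String, Set<Integer>> entry : owned.entrySet()) {
            Map<Integer, Integer> epochs = pendingRevocationWithEpochs.get(entry.getKey());
            if (epochs == null) {
                continue;
            }
            for (Integer partitionId : entry.getValue()) {
                if (epochs.containsKey(partitionId)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```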



##########
group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json:
##########
@@ -36,7 +36,10 @@
       { "name": "TopicId", "type": "uuid", "versions": "0+",
         "about": "The topic Id." },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
-        "about": "The partition Ids." }
+        "about": "The partition Ids." },
+      { "name": "AssignmentEpochs", "versions": "0+", "nullableVersions": "0+",
+        "taggedVersions": "0+", "tag": 0, "type": "[]int32", "default": null,
+        "about": "The epoch at which each partition was assigned to this member. Aligned with Partitions array. Used to validate offset commits (KIP-1251)." }

Review Comment:
   nit: Could we follow the streams implementation?
   ```suggestion
           "about": "The epoch at which the partition was assigned to the member. Used to fence zombie commit requests. Of the same length as Partitions." }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

