lucliu1108 commented on code in PR #21558:
URL: https://github.com/apache/kafka/pull/21558#discussion_r2867077216


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -201,17 +231,31 @@ public static List<ConsumerGroupHeartbeatRequestData.TopicPartitions> toTopicPar
     }
 
     /**
-     * Creates a map of topic id and partition set from a list of consumer group TopicPartitions.
+     * Creates a map of topic id and partition-epoch map from a list of consumer group TopicPartitions.
      *
-     * @param topicPartitionsList   The list of TopicPartitions.
-     * @return a map of topic id and partition set.
+     * @param topicPartitions The list of TopicPartitions.
+     * @param defaultEpoch The default epoch to use when the epoch information is not available for a partition.
+     * @return a map of topic id and partition-epoch map.
      */
-    public static Map<Uuid, Set<Integer>> assignmentFromTopicPartitions(
-        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> topicPartitionsList
+    public static Map<Uuid, Map<Integer, Integer>> assignmentFromTopicPartitions(
+        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> topicPartitions,
+        int defaultEpoch
     ) {
-        return topicPartitionsList.stream().collect(Collectors.toMap(
-            ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions::topicId,
-            topicPartitions -> Collections.unmodifiableSet(new HashSet<>(topicPartitions.partitions()))));
+        // For legacy static member, the defaultEpoch could be -2(LEAVE_GROUP_STATIC_MEMBER_EPOCH).
+        // But we want to ensure the default memberEpoch assigned is non-negative.
+        int adjustedDefaultEpoch = Math.max(defaultEpoch, 0);

Review Comment:
   I added the following tests:
   1. `UtilsTest#testAssignmentFromTopicPartitionsWithNegativeDefaultEpoch` and `testAssignmentFromTopicPartitionsWithEpochsProvided`, which cover the static-member leave-group case (negative default epoch) and the case where epochs are provided.
   2. `ConsumerGroupMemberTest#testUpdateWithConsumerGroupCurrentMemberAssignmentValueWithNegativeEpoch`, which covers the static-member leave-group case.
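
For illustration, here is a minimal standalone sketch of the clamping behavior the negative-epoch tests target. It is not the code from the PR: the class `EpochClampSketch`, the method `assignmentWithDefaultEpoch`, and the use of `String` topic ids in place of `Uuid` are assumptions made so the example runs without Kafka on the classpath, and it only models the case where no per-partition epoch is available.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EpochClampSketch {
    // Hypothetical stand-in for LEAVE_GROUP_STATIC_MEMBER_EPOCH (-2, per the comment in the diff).
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    // Hypothetical helper: maps each topic id to a partition -> epoch map,
    // assigning every partition the default epoch clamped to be non-negative.
    static Map<String, Map<Integer, Integer>> assignmentWithDefaultEpoch(
        Map<String, List<Integer>> topicPartitions,
        int defaultEpoch
    ) {
        // Same clamp as in the diff: a negative default becomes 0.
        int adjustedDefaultEpoch = Math.max(defaultEpoch, 0);
        Map<String, Map<Integer, Integer>> result = new HashMap<>();
        topicPartitions.forEach((topicId, partitions) -> {
            Map<Integer, Integer> partitionEpochs = new HashMap<>();
            for (int partition : partitions) {
                partitionEpochs.put(partition, adjustedDefaultEpoch);
            }
            result.put(topicId, partitionEpochs);
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, Integer>> assignment = assignmentWithDefaultEpoch(
            Map.of("topic-a", List.of(0, 1)),
            LEAVE_GROUP_STATIC_MEMBER_EPOCH
        );
        // Prints 0: the -2 default was clamped before being assigned.
        System.out.println(assignment.get("topic-a").get(0));
    }
}
```

A non-negative default passes through unchanged, so only the legacy static-member case is affected by the clamp.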



