lucasbru commented on code in PR #21558:
URL: https://github.com/apache/kafka/pull/21558#discussion_r2853141793


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -291,55 +291,56 @@ private boolean ownsRevokedPartitions(
      * This method is a lot faster than running the full reconciliation logic in computeNextAssignment.
      *
      * @param memberEpoch               The epoch of the member to use.
-     * @param memberAssignedPartitions  The assigned partitions of the member to use.
+     * @param memberAssignedPartitionsWithEpochs  The assigned partitions with epochs of the member to use.
      * @return A new ConsumerGroupMember.
      */
     private ConsumerGroupMember updateCurrentAssignment(
         int memberEpoch,
-        Map<Uuid, Set<Integer>> memberAssignedPartitions
+        Map<Uuid, Map<Integer, Integer>> memberAssignedPartitionsWithEpochs
     ) {
         Set<Uuid> subscribedTopicIds = subscribedTopicIds();
 
         // Reuse the original map if no topics need to be removed.
-        Map<Uuid, Set<Integer>> newAssignedPartitions;
-        Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
+        Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs;
+        Map<Uuid, Map<Integer, Integer>> newPartitionsPendingRevocationWithEpochs;
+
         if (subscribedTopicIds.isEmpty() && member.partitionsPendingRevocation().isEmpty()) {
-            newAssignedPartitions = Map.of();
-            newPartitionsPendingRevocation = memberAssignedPartitions;
+            newAssignedPartitionsWithEpochs = Map.of();
+            // Move all assigned to pending revocation with their epochs
+            newPartitionsPendingRevocationWithEpochs = new HashMap<>(member.assignedPartitions());
         } else {
-            newAssignedPartitions = memberAssignedPartitions;
-            newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation());
-            for (Map.Entry<Uuid, Set<Integer>> entry : memberAssignedPartitions.entrySet()) {
+            newAssignedPartitionsWithEpochs = new HashMap<>(member.assignedPartitions());

Review Comment:
   @lucliu1108 This seems to break the lazy-copy optimization that Sean implemented here. The map is now copied on every call, instead of only when a topic actually needs to be removed.
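
   For reference, a minimal sketch of the lazy-copy pattern in question. The class and method names are hypothetical, and plain String keys stand in for Uuid so the snippet compiles on its own. It is not the actual CurrentAssignmentBuilder code, only an illustration of "reuse the input map until the first removal".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-alone illustration; not part of the Kafka code base.
public class LazyCopySketch {

    /**
     * Drops every topic that is no longer subscribed.
     * Returns the input map itself when nothing has to be removed, and
     * allocates a copy only when the first unsubscribed topic is found.
     */
    public static Map<String, Map<Integer, Integer>> removeUnsubscribed(
        Map<String, Map<Integer, Integer>> assigned,
        Set<String> subscribedTopics
    ) {
        Map<String, Map<Integer, Integer>> result = assigned;
        for (String topic : assigned.keySet()) {
            if (!subscribedTopics.contains(topic)) {
                if (result == assigned) {
                    // First removal: copy now, never before.
                    result = new HashMap<>(assigned);
                }
                result.remove(topic);
            }
        }
        return result;
    }
}
```

   In the common case where all topics are still subscribed, the caller gets the same instance back and no allocation happens. Copying up front, as the new code does, gives up exactly that.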



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -222,8 +256,12 @@ public Builder updateWith(ConsumerGroupCurrentMemberAssignmentValue record) {
             setMemberEpoch(record.memberEpoch());
             setPreviousMemberEpoch(record.previousMemberEpoch());
             setState(MemberState.fromValue(record.state()));
-            setAssignedPartitions(Utils.assignmentFromTopicPartitions(record.assignedPartitions()));
-            setPartitionsPendingRevocation(Utils.assignmentFromTopicPartitions(record.partitionsPendingRevocation()));
+            setAssignedPartitions(
+                Utils.assignmentFromTopicPartitions(record.assignedPartitions(), record.memberEpoch())

Review Comment:
   Could memberEpoch be something invalid here, like -2 for a static member? It seems that for a legacy static member we would get -2 here.
   
   I believe that goes back to the KIP discussion. I proposed to just omit the assignment epoch for static members, while David preferred setting it to 0. Here, we now have a mix of both situations:
   
   - legacy "static member" records get -2 as assignment epoch
   - new "static member" records get 0 as assignment epoch
   
   That's a bit unfortunate. Here, we should at least make sure that the default is >= 0.
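
   One way to enforce that, sketched with a hypothetical helper. This is not the real Utils.assignmentFromTopicPartitions, whose signature is only partly visible in the diff. The assumption is that the default epoch is applied to every partition of a record that carries no per-partition epochs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone illustration; not part of the Kafka code base.
public class AssignmentEpochSketch {

    /**
     * Maps each partition to a default assignment epoch, clamping negative
     * member epochs (such as the -2 discussed above) to 0.
     */
    public static Map<Integer, Integer> withDefaultEpoch(List<Integer> partitions, int memberEpoch) {
        int defaultEpoch = Math.max(0, memberEpoch);
        Map<Integer, Integer> result = new HashMap<>();
        for (int partition : partitions) {
            result.put(partition, defaultEpoch);
        }
        return result;
    }
}
```

   With this, legacy and new static member records would both end up with an assignment epoch of 0.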



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -395,7 +487,13 @@ public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(
             .setMemberEpoch(memberEpoch)
             .setMemberId(memberId)
             .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
-                .setTopicPartitions(topicPartitionsFromMap(assignedPartitions, image)))
+                .setTopicPartitions(topicPartitionsFromMap(

Review Comment:
   It seems we should create a new version of topicPartitionsFromMap that handles assignment epochs natively. We should definitely avoid these kinds of intermediate collections.
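
   A rough sketch of what such an overload could look like. All names here are hypothetical. TopicPartitions is a stand-in for the describe response type, String keys replace Uuid, and the image lookup that the real method performs is left out. The point is only that the partition list is built straight from the key set of the epoch map, without first converting to a Map of Sets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone illustration; not part of the Kafka code base.
public class TopicPartitionsSketch {

    /** Stand-in for the per-topic entry of the describe response. */
    public static final class TopicPartitions {
        public final String topicId;
        public final List<Integer> partitions;

        public TopicPartitions(String topicId, List<Integer> partitions) {
            this.topicId = topicId;
            this.partitions = partitions;
        }
    }

    /**
     * Converts topic -> (partition -> assignment epoch) directly into the
     * response shape, reading partition ids from the inner map's keys.
     */
    public static List<TopicPartitions> fromMapWithEpochs(
        Map<String, Map<Integer, Integer>> partitionsWithEpochs
    ) {
        List<TopicPartitions> result = new ArrayList<>(partitionsWithEpochs.size());
        partitionsWithEpochs.forEach((topicId, epochByPartition) ->
            result.add(new TopicPartitions(topicId, new ArrayList<>(epochByPartition.keySet()))));
        return result;
    }
}
```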



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -291,55 +291,56 @@ private boolean ownsRevokedPartitions(
      * This method is a lot faster than running the full reconciliation logic in computeNextAssignment.
      *
      * @param memberEpoch               The epoch of the member to use.
-     * @param memberAssignedPartitions  The assigned partitions of the member to use.
+     * @param memberAssignedPartitionsWithEpochs  The assigned partitions with epochs of the member to use.
      * @return A new ConsumerGroupMember.
      */
     private ConsumerGroupMember updateCurrentAssignment(
         int memberEpoch,
-        Map<Uuid, Set<Integer>> memberAssignedPartitions
+        Map<Uuid, Map<Integer, Integer>> memberAssignedPartitionsWithEpochs
     ) {
         Set<Uuid> subscribedTopicIds = subscribedTopicIds();
 
         // Reuse the original map if no topics need to be removed.
-        Map<Uuid, Set<Integer>> newAssignedPartitions;
-        Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
+        Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs;
+        Map<Uuid, Map<Integer, Integer>> newPartitionsPendingRevocationWithEpochs;
+
         if (subscribedTopicIds.isEmpty() && member.partitionsPendingRevocation().isEmpty()) {
-            newAssignedPartitions = Map.of();
-            newPartitionsPendingRevocation = memberAssignedPartitions;
+            newAssignedPartitionsWithEpochs = Map.of();
+            // Move all assigned to pending revocation with their epochs
+            newPartitionsPendingRevocationWithEpochs = new HashMap<>(member.assignedPartitions());
         } else {
-            newAssignedPartitions = memberAssignedPartitions;
-            newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation());
-            for (Map.Entry<Uuid, Set<Integer>> entry : memberAssignedPartitions.entrySet()) {
+            newAssignedPartitionsWithEpochs = new HashMap<>(member.assignedPartitions());
+            newPartitionsPendingRevocationWithEpochs = new HashMap<>(member.partitionsPendingRevocation());
+            for (Map.Entry<Uuid, Map<Integer, Integer>> entry : memberAssignedPartitionsWithEpochs.entrySet()) {
                 if (!subscribedTopicIds.contains(entry.getKey())) {
-                    if (newAssignedPartitions == memberAssignedPartitions) {
-                        newAssignedPartitions = new HashMap<>(memberAssignedPartitions);
-                        newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation());

Review Comment:
   @squah-confluent Why another copy? We already copy it above.
   
   It seems Lucy is fixing a minor duplication here. I'm just making sure we are not breaking anything.


