squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2565524884


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -288,22 +289,17 @@ public void 
testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsNotReleased()
 
         // m2 should not be able to acquire foo-1 because the partition is
         // still owned by another member.
-        assertThrows(IllegalStateException.class, () -> 
consumerGroup.updateMember(m2));
+        consumerGroup.updateMember(m2);
+        assertEquals(mkAssignment(mkTopicAssignment(fooTopicId, 1)), 
+            consumerGroup.getOrMaybeCreateMember("m1", 
false).assignedPartitions()
+        );

Review Comment:
   ^ Ignore the above.
   
   We should however test that an `IllegalStateException` is thrown when the 
epoch goes backwards and the update is accepted when the epoch is the same.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1087,8 +1100,10 @@ void addPartitionEpochs(
                     partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedPartitions.size());
                 }
                 for (Integer partitionId : assignedPartitions) {
-                    Integer prevValue = partitionsOrNull.put(partitionId, 
epoch);
-                    if (prevValue != null) {
+                    Integer prevValue = partitionsOrNull.get(partitionId);
+                    if (prevValue == null || prevValue <= epoch) {

Review Comment:
   Actually I was wrong. It's possible to see a replacement with the same epoch:
   
   1. Member A unsubscribes from bar at epoch N - 1. `Member A { epoch: N - 1, 
assigned partitions: [foo], pending revocation: [bar] }`
   2. A new assignment is available with epoch N.
   3. Member A yields bar. `Member A { epoch: N, assigned partitions: [], 
pending revocation: [foo] }`
   4. Member A yields foo. `Member A { epoch: N, assigned partitions: [], 
pending revocation: []}`
   5. Member B is assigned foo. `Member B { epoch: N, assigned partitions: 
[foo] }`
   
   When the record from 4 is dropped by compaction, foo's partition epoch is 
replaced with an identical epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to