squah-confluent commented on code in PR #20781:
URL: https://github.com/apache/kafka/pull/20781#discussion_r2469007977
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java:
##########
@@ -498,6 +498,81 @@ public void testUnrevokedPartitionsToUnreleasedPartitions() {
);
}
+ @Test
+ public void testUnrevokedPartitionsToStableWithReturnedPartitionsPendingRevocation() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, topic1, 10)
+ .addTopic(topicId2, topic2, 10)
+ .buildCoordinatorMetadataImage();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setSubscribedTopicNames(List.of(topic1, topic2))
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 2, 3),
+ mkTopicAssignment(topicId2, 5, 6)))
+ .setPartitionsPendingRevocation(mkAssignment(
+ // Partition 4 is pending revocation by the member but is back in the
+ // latest target assignment.
+ mkTopicAssignment(topicId1, 4)))
+ .build();
+
+ ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withMetadataImage(metadataImage)
+ .withTargetAssignment(12, new Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 2, 3, 4),
+ mkTopicAssignment(topicId2, 5, 6, 7))))
+ .withCurrentPartitionEpoch((topicId, partitionId) -> {
+ if (topicId.equals(topicId1)) {
+ // Partitions 2 and 3 are in the member's current assignment.
+ // Partition 4 is pending revocation by the member.
+ switch (partitionId) {
+ case 2:
+ case 3:
+ case 4:
+ return 10;
+ }
+ } else if (topicId.equals(topicId2)) {
+ // Partitions 5 and 6 are in the member's current assignment.
+ switch (partitionId) {
+ case 5:
+ case 6:
+ return 10;
+ }
+ }
+ return -1;
+ })
+ .withOwnedTopicPartitions(Arrays.asList(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId1)
+ .setPartitions(Arrays.asList(2, 3)),
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId2)
+ .setPartitions(Arrays.asList(5, 6))))
+ .build();
+
+ assertEquals(
+ new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
Review Comment:
The epoch just needs to be at least 2 greater than the member's previous epoch
(10) to be realistic: epoch 11's assignment would have moved the partition away
from the member, and epoch 12's assignment would have returned it. The exact
value doesn't actually affect the outcome of the test.
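
To make the epoch reasoning concrete, here is a rough, standalone sketch of the timeline for topic1. It is not part of the PR and does not use any Kafka classes: `EpochTimeline`, `epochPartitionReturned`, and `exampleTimeline` are hypothetical names, and the per-epoch target sets are my assumption about what the test scenario implies (partition 4 in the target at epoch 10, removed at 11, back at 12).

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration only; none of these names exist in Kafka.
public class EpochTimeline {

    // Returns the first epoch at which `partition` is in the target assignment
    // again after having been absent from an earlier epoch, or -1 if it never
    // leaves and comes back.
    static int epochPartitionReturned(TreeMap<Integer, Set<Integer>> targetByEpoch, int partition) {
        boolean removed = false;
        for (Map.Entry<Integer, Set<Integer>> entry : targetByEpoch.entrySet()) {
            boolean present = entry.getValue().contains(partition);
            if (!present) {
                removed = true;
            } else if (removed) {
                return entry.getKey();
            }
        }
        return -1;
    }

    // Assumed target assignment of topic1 for the member, keyed by epoch.
    static TreeMap<Integer, Set<Integer>> exampleTimeline() {
        TreeMap<Integer, Set<Integer>> targetByEpoch = new TreeMap<>();
        targetByEpoch.put(10, Set.of(2, 3, 4)); // member's epoch: partition 4 still assigned
        targetByEpoch.put(11, Set.of(2, 3));    // partition 4 moved away, member starts revoking it
        targetByEpoch.put(12, Set.of(2, 3, 4)); // partition 4 returned before the revocation completed
        return targetByEpoch;
    }

    public static void main(String[] args) {
        System.out.println(epochPartitionReturned(exampleTimeline(), 4));
    }
}
```

With these assumed target sets, the earliest epoch at which the partition can be back is two past the member's epoch, which is why 12 is the smallest realistic value here.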