vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370542261
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1908,6 +1909,715 @@ public void testLeavingMemberBumpsGroupEpoch() {
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testGroupEpochBumpWhenNewStaticMemberJoins() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String memberId3 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ // Consumer group with two static members.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(memberId1)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .setInstanceId(memberId2)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ // Use zar only here to ensure that metadata needs to be
recomputed.
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar",
"zar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ new HashMap<String, MemberAssignment>() {
+ {
+ put(memberId1, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1),
+ mkTopicAssignment(barTopicId, 0)
+ )));
+ put(memberId2, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 2, 3),
+ mkTopicAssignment(barTopicId, 1)
+ )));
+ put(memberId3, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5),
+ mkTopicAssignment(barTopicId, 2)
+ )));
+ }
+ }
+ ));
+
+ // Member 3 joins the consumer group.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId3)
+ .setInstanceId(memberId3)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId3)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()),
+ result.response()
+ );
+
+ ConsumerGroupMember expectedMember3 = new
ConsumerGroupMember.Builder(memberId3)
+ .setMemberEpoch(11)
+ .setInstanceId(memberId3)
+ .setPreviousMemberEpoch(0)
+ .setTargetMemberEpoch(11)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setPartitionsPendingAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .build();
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newMemberSubscriptionRecord(groupId,
expectedMember3),
+ RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new
HashMap<String, TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ }
+ }),
+ RecordHelpers.newGroupEpochRecord(groupId, 11),
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId1,
mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1),
+ mkTopicAssignment(barTopicId, 0)
+ )),
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId2,
mkAssignment(
+ mkTopicAssignment(fooTopicId, 2, 3),
+ mkTopicAssignment(barTopicId, 1)
+ )),
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId3,
mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5),
+ mkTopicAssignment(barTopicId, 2)
+ )),
+ RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+ RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
+ );
+
+ assertRecordsEquals(expectedRecords.subList(0, 3),
result.records().subList(0, 3));
+ assertUnorderedListEquals(expectedRecords.subList(3, 6),
result.records().subList(3, 6));
+ assertRecordsEquals(expectedRecords.subList(6, 8),
result.records().subList(6, 8));
+ }
+
+ @Test
+ public void testStaticMemberGetsBackAssignmentUponRejoin() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String member2RejoinId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(memberId1)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .build();
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setInstanceId(memberId2)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(10)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .build();
+
+ // Consumer group with two static members.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(member1)
+ .withMember(member2)
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)))
+ .withAssignmentEpoch(10)
+ .withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ }
+ }))
+ .build();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ new HashMap<String, MemberAssignment>() {
+ {
+ put(memberId1, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)
+ )));
+ put(memberId2, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)
+ )));
+ // When the member rejoins, it gets the same assignments.
+ put(member2RejoinId, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 2)
+ )));
+ }
+ }
+ ));
+
+ // Member 2 leaves the consumer group.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setInstanceId(memberId2)
+ .setMemberEpoch(-2)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList()));
+
+ // member epoch of the response would be set to -2
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(-2),
+ result.response()
+ );
+
+ ConsumerGroupMember member2UpdatedEpoch = new
ConsumerGroupMember.Builder(member2)
+ .setMemberEpoch(-2)
+ .build();
+ // The departing static member will have it's epoch set to -2.
+ List<Record> expectedRecords = Collections.singletonList(
+ RecordHelpers.newCurrentAssignmentRecord(groupId,
member2UpdatedEpoch)
+ );
+
+ assertEquals(result.records(), expectedRecords);
+ // Replay the updated record with -2 epoch so that the group can
update itself.
+ context.replay(RecordHelpers.newCurrentAssignmentRecord(groupId,
member2UpdatedEpoch));
+
+ // member 2 rejoins the group with the same instance id
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record>
rejoinResult = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setMemberId(member2RejoinId)
+ .setGroupId(groupId)
+ .setInstanceId(memberId2)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(member2RejoinId)
+ .setMemberEpoch(10)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()),
+ rejoinResult.response()
+ );
+
+ ConsumerGroupMember expectedRejoinedMember = new
ConsumerGroupMember.Builder(member2RejoinId)
+ .setMemberEpoch(10)
+ .setInstanceId(memberId2)
+ .setPreviousMemberEpoch(0)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setPartitionsPendingAssignment(mkAssignment(
Review Comment:
@dajac , I realised that even in the static member re-joining case, while
the group epoch doesn't bump, the partitions would be in pending assignment
state. I believe, eventually the member would get it's assignments. In that
case, this state seems correct to me. WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]