dajac commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2111387854
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -490,6 +490,13 @@ GroupMetadataManager build() {
*/
private MetadataImage metadataImage;
+ /**
+ * The cache for topic hash value by topic name.
+ * A topic hash is calculated when there is a group subscribes to it.
+ * A topic hash is removed when it's updated in MetadataImage or there is
no group subscribes to it.
+ */
+ private final Map<String, Long> topicHashCache;
Review Comment:
This is just an idea for future work. I wonder whether we could better
encapsulate all the logic related to those hashes in a class - e.g.
MetadataHasher. This class could have a method to compute the hash of a list of
topic names, it could also have a method to update the MetadataImage, etc. This
would keep all the logic about maintaining it in one place. As I said, we can
consider this as a follow up.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -639,6 +751,281 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName,
3)
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testNewRacksDataInMetadataImageTriggersEpochBump() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build();
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build())
+ .withAssignment(memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .withAssignmentEpoch(10)
+ .withMetadataHash(computeGroupHash(Map.of(
+ fooTopicName,
+ computeTopicHash(fooTopicName, metadataImage))
Review Comment:
nit: `fooTopicName, computeTopicHash(fooTopicName, metadataImage)`
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -541,6 +546,109 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName,
3)
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testTopicHashIsRemoveFromCacheIfNoGroupSubscribesIt() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addRacks()
+ .build();
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )))
+ ));
+
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setServerAssignor("range")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setTopicPartitions(List.of()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+ ))),
+ result.response()
+ );
+
+ ConsumerGroupMember expectedMember = new
ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build();
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember),
+ GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
1, computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ))),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+
assertTrue(context.groupMetadataManager.topicHashCache().containsKey(fooTopicName));
+
+ // Use LEAVE_GROUP_STATIC_MEMBER_EPOCH to leave group, so there is no
group subscribes to foo.
+ result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH));
Review Comment:
You should actually use LEAVE_GROUP_MEMBER_EPOCH.
LEAVE_GROUP_STATIC_MEMBER_EPOCH is only used by static members.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -541,6 +546,109 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName,
3)
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testTopicHashIsRemoveFromCacheIfNoGroupSubscribesIt() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addRacks()
+ .build();
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )))
+ ));
+
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setServerAssignor("range")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setTopicPartitions(List.of()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+ ))),
+ result.response()
+ );
+
+ ConsumerGroupMember expectedMember = new
ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build();
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember),
+ GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
1, computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ))),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+
assertTrue(context.groupMetadataManager.topicHashCache().containsKey(fooTopicName));
Review Comment:
nit: Could we use assertEquals?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]