dajac commented on code in PR #19761: URL: https://github.com/apache/kafka/pull/19761#discussion_r2111387854
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -490,6 +490,13 @@ GroupMetadataManager build() { */ private MetadataImage metadataImage; + /** + * The cache for topic hash value by topic name. + * A topic hash is calculated when there is a group subscribes to it. + * A topic hash is removed when it's updated in MetadataImage or there is no group subscribes to it. + */ + private final Map<String, Long> topicHashCache; Review Comment: This is just an idea for future work. I wonder whether we could better encapsulate all the logic related to those hashes in a class - e.g. MetadataHasher. This class could have a method to compute the hash of a list of topic names, it could also have a method to update the MetadataImage, etc. This would keep all the logic about maintaining it in one place. As I said, we can consider this as a follow up. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -639,6 +751,281 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testNewRacksDataInMetadataImageTriggersEpochBump() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, + computeTopicHash(fooTopicName, metadataImage)) Review Comment: nit: `fooTopicName, computeTopicHash(fooTopicName, metadataImage)` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -541,6 +546,109 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testTopicHashIsRemoveFromCacheIfNoGroupSubscribesIt() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + Map.of(memberId, new MemberAssignmentImpl(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + ))) + )); + + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setTopicPartitions(List.of())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1, 2, 3, 4, 5)) + ))), + result.response() + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build(); + + List<CoordinatorRecord> expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + ))), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + )), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + assertTrue(context.groupMetadataManager.topicHashCache().containsKey(fooTopicName)); + + // Use LEAVE_GROUP_STATIC_MEMBER_EPOCH to leave group, so there is no group subscribes to foo. + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)); Review Comment: You should actually use LEAVE_GROUP_MEMBER_EPOCH. LEAVE_GROUP_STATIC_MEMBER_EPOCH is only used by static members. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -541,6 +546,109 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testTopicHashIsRemoveFromCacheIfNoGroupSubscribesIt() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + Map.of(memberId, new MemberAssignmentImpl(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + ))) + )); + + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setTopicPartitions(List.of())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1, 2, 3, 4, 5)) + ))), + result.response() + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build(); + + List<CoordinatorRecord> expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + ))), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + )), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + assertTrue(context.groupMetadataManager.topicHashCache().containsKey(fooTopicName)); Review Comment: nit: Could we use assertEquals? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org