dajac commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2107327586
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3624,24 +3629,28 @@ private UpdateSubscriptionMetadataResult updateSubscriptionMetadata(
             numMembers
         );

-        if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+        if (groupMetadataHash != group.metadataHash()) {
             if (log.isDebugEnabled()) {
-                log.debug("[GroupId {}] Computed new subscription metadata: {}.",
-                    groupId, subscriptionMetadata);
+                log.debug("[GroupId {}] Computed new metadata hash: {}.",
+                    groupId, groupMetadataHash);
             }
             bumpGroupEpoch = true;
-            records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
         }

         if (bumpGroupEpoch) {
             groupEpoch += 1;
-            records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
-            log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
+            records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, groupMetadataHash));
+            log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash);
             metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
         }

         group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);

+        if (group.addSubscriptionMetadataTombstoneRecord()) {

Review Comment:
   nit: I wonder whether we should call it `hasSubscriptionMetadataRecord()`. What do you think? I also suggest adding a comment explaining this block.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -490,6 +490,11 @@ GroupMetadataManager build() {
      */
     private MetadataImage metadataImage;

+    /**
+     * The topic hash value by topic name.
+     */

Review Comment:
   It would be great if we could expand the comment to explain how we maintain this cache.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid(
      * @return The hash of the group.
      */
     static long computeGroupHash(Map<String, Long> topicHashes) {
-        if (topicHashes.isEmpty()) {
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>();
+        for (Map.Entry<String, Long> entry : topicHashes.entrySet()) {
+            // Filter out entries with a hash value of 0, which indicates no topic
+            if (entry.getValue() != 0) {

Review Comment:
   Sure, I understand that we may have nonexistent topics here. I was more trying to understand whether having those zeros in the final hash was an issue. I suppose you're saying that it is an issue.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3624,24 +3629,28 @@ private UpdateSubscriptionMetadataResult updateSubscriptionMetadata(
             numMembers
         );

-        if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+        if (groupMetadataHash != group.metadataHash()) {
             if (log.isDebugEnabled()) {
-                log.debug("[GroupId {}] Computed new subscription metadata: {}.",
-                    groupId, subscriptionMetadata);
+                log.debug("[GroupId {}] Computed new metadata hash: {}.",
+                    groupId, groupMetadataHash);
             }
             bumpGroupEpoch = true;
-            records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
         }

         if (bumpGroupEpoch) {
             groupEpoch += 1;
-            records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
-            log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
+            records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, groupMetadataHash));
+            log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash);
             metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
         }

         group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);

+        if (group.addSubscriptionMetadataTombstoneRecord()) {
+            records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+            group.setAddSubscriptionMetadataTombstoneRecord(false);

Review Comment:
   We should remove this as it will be updated when the record is replayed.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##########
@@ -398,6 +423,21 @@ public Map<String, TopicMetadata> computeSubscriptionMetadata(
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }

+    public static long computeMetadataHash(
+        Map<String, SubscriptionCount> subscribedTopicNames,
+        Map<String, Long> topicHashCache,
+        MetadataImage metadataImage
+    ) {
+        Map<String, Long> topicHash = new HashMap<>(subscribedTopicNames.size());
+        subscribedTopicNames.keySet().forEach(topicName ->
+            topicHash.put(
+                topicName,
+                topicHashCache.computeIfAbsent(topicName, k -> Utils.computeTopicHash(topicName, metadataImage))

Review Comment:
   We may have an issue here. If the topic does not exist, we cache zero. However, we don't remove it when the topic is created, do we? Or is it covered by `topicsDelta.changedTopics()`? It would be great if we could add more tests. We should also add unit tests for this new method.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -151,6 +151,8 @@ public String toLowerCaseString() {
      */
     private final TimelineHashMap<String, ResolvedRegularExpression> resolvedRegularExpressions;

+    private final TimelineObject<Boolean> addSubscriptionMetadataTombstoneRecord;

Review Comment:
   nit: I reiterate my previous comment here. `hasSubscriptionMetadataRecord` seems a bit nicer.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid(
      * @return The hash of the group.
      */
     static long computeGroupHash(Map<String, Long> topicHashes) {
-        if (topicHashes.isEmpty()) {
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>();
+        for (Map.Entry<String, Long> entry : topicHashes.entrySet()) {
+            // Filter out entries with a hash value of 0, which indicates no topic
+            if (entry.getValue() != 0) {

Review Comment:
   btw, it seems that we could move the zero check to L369 to simplify the code a bit.

##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -541,6 +546,109 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
         assertRecordsEquals(expectedRecords, result.records());
     }

+    @Test
+    public void testTopicHashIsRemoveFromCacheIfNoGroupSubscribesIt() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addRacks()
+            .build();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(List.of("foo"))
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+            ))),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
+            GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+        assertTrue(context.groupMetadataManager.topicHashCache().containsKey(fooTopicName));
+
+        // leave group
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(-2));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(-2)
+                .setHeartbeatIntervalMs(0),
+            result.response()
+        );
+
+        expectedRecords = List.of(
+            GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId),
+            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
+        );
+        assertRecordsEquals(expectedRecords, result.records());
+        assertTrue(context.groupMetadataManager.topicHashCache().isEmpty());

Review Comment:
   nit: I would prefer to use `assertEquals(Map.of(), context.groupMetadataManager.topicHashCache())`. If there is a failure, it gives you more information.

##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -541,6 +546,109 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
         assertRecordsEquals(expectedRecords, result.records());
     }

+    @Test
+    public void testTopicHashIsRemoveFromCacheIfNoGroupSubscribesIt() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
...
+        assertRecordsEquals(expectedRecords, result.records());
+        assertTrue(context.groupMetadataManager.topicHashCache().containsKey(fooTopicName));
+
+        // leave group

Review Comment:
   nit: All the comments in this file start with a capital letter and end with a dot. Let's try to stay consistent.

##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -20635,12 +21067,10 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
             ),
             // Remove regex.
             List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")),
-            // Updated subscription metadata.
-            List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
-                barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
-            ))),
             // Bumped epoch.
-            List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0))
+            List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
+                barTopicName, barTopicHash
+            ))))
         ),
         result.records()

Review Comment:
   Not related to this line, but it would be great if we could add more tests for the cache cleaning (e.g. when a topic is updated or deleted).

--
This is an automated message from the Apache Git Service.
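The caching concern raised above (a zero hash cached for a nonexistent topic going stale once the topic is created, unless a metadata delta evicts it) can be illustrated with a minimal sketch. This is not the actual Kafka implementation: `TopicHashCacheSketch`, `topicHash`, and `onTopicsChanged` are hypothetical stand-ins for the coordinator's `topicHashCache`, `Utils.computeTopicHash`, and the `topicsDelta.changedTopics()`-driven cleanup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for the coordinator's per-topic hash cache.
public class TopicHashCacheSketch {
    private final Map<String, Long> topicHashCache = new HashMap<>();
    private final Function<String, Long> computeTopicHash; // stand-in for Utils.computeTopicHash

    TopicHashCacheSketch(Function<String, Long> computeTopicHash) {
        this.computeTopicHash = computeTopicHash;
    }

    // Returns the cached hash, computing it on first access. Note that 0
    // (the "topic does not exist" sentinel) is cached too, which is exactly
    // the staleness concern raised in the review.
    long topicHash(String topicName) {
        return topicHashCache.computeIfAbsent(topicName, computeTopicHash);
    }

    // Invalidation hook, e.g. driven by a metadata delta: drop cached hashes
    // for created, updated, or deleted topics so the next lookup recomputes them.
    void onTopicsChanged(Set<String> changedTopics) {
        changedTopics.forEach(topicHashCache::remove);
    }

    public static void main(String[] args) {
        Map<String, Long> liveTopics = new HashMap<>(); // simulated metadata image
        TopicHashCacheSketch cache =
            new TopicHashCacheSketch(t -> liveTopics.getOrDefault(t, 0L));

        System.out.println(cache.topicHash("foo")); // 0: missing topic, sentinel is cached
        liveTopics.put("foo", 99L);                 // the topic gets created
        System.out.println(cache.topicHash("foo")); // still 0: the cached sentinel is stale
        cache.onTopicsChanged(Set.of("foo"));       // invalidate via the metadata delta
        System.out.println(cache.topicHash("foo")); // 99: recomputed from the new image
    }
}
```

If topic creation is not covered by the delta-driven invalidation, the stale zero keeps the group's metadata hash unchanged and the group epoch is never bumped, which is why a unit test covering the create-after-miss path would be valuable.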