dajac commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2107327586


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3624,24 +3629,28 @@ private UpdateSubscriptionMetadataResult updateSubscriptionMetadata(
             numMembers
         );
 
-        if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+        if (groupMetadataHash != group.metadataHash()) {
             if (log.isDebugEnabled()) {
-                log.debug("[GroupId {}] Computed new subscription metadata: {}.",
-                    groupId, subscriptionMetadata);
+                log.debug("[GroupId {}] Computed new metadata hash: {}.",
+                    groupId, groupMetadataHash);
             }
             bumpGroupEpoch = true;
-            records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
         }
 
         if (bumpGroupEpoch) {
             groupEpoch += 1;
-            records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
-            log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
+            records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, groupMetadataHash));
+            log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash);
             metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
         }
 
         group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
 
+        if (group.addSubscriptionMetadataTombstoneRecord()) {

Review Comment:
   nit: I wonder whether we should call it `hasSubscriptionMetadataRecord()`. What do you think? I also suggest adding a comment explaining this block.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -490,6 +490,11 @@ GroupMetadataManager build() {
      */
     private MetadataImage metadataImage;
 
+    /**
+     * The topic hash value by topic name.
+     */

Review Comment:
   It would be great if we could expand the comment to explain how we maintain this cache.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid(
      * @return The hash of the group.
      */
     static long computeGroupHash(Map<String, Long> topicHashes) {
-        if (topicHashes.isEmpty()) {
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>();
+        for (Map.Entry<String, Long> entry : topicHashes.entrySet()) {
+            // Filter out entries with a hash value of 0, which indicates no topic
+            if (entry.getValue() != 0) {

Review Comment:
   Sure, I understand that we may have nonexistent topics here. I was more trying to understand whether having those zeros in the final hash was an issue. I suppose that you're saying that it is an issue.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3624,24 +3629,28 @@ private UpdateSubscriptionMetadataResult updateSubscriptionMetadata(
             numMembers
         );
 
-        if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+        if (groupMetadataHash != group.metadataHash()) {
             if (log.isDebugEnabled()) {
-                log.debug("[GroupId {}] Computed new subscription metadata: {}.",
-                    groupId, subscriptionMetadata);
+                log.debug("[GroupId {}] Computed new metadata hash: {}.",
+                    groupId, groupMetadataHash);
             }
             bumpGroupEpoch = true;
-            records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
         }
 
         if (bumpGroupEpoch) {
             groupEpoch += 1;
-            records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
-            log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
+            records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, groupMetadataHash));
+            log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash);
             metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
         }
 
         group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
 
+        if (group.addSubscriptionMetadataTombstoneRecord()) {
+            records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+            group.setAddSubscriptionMetadataTombstoneRecord(false);

Review Comment:
   We should remove this as it will be updated when the record is replayed.
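   To make the point concrete, here is a minimal standalone sketch of the write/replay split behind this comment (plain Java with hypothetical names, not the Kafka implementation): the write path only appends records, and state changes when those records are replayed, so mutating the flag directly in the write path duplicates what replay already does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a coordinator-style write/replay split.
// All names are illustrative; this is not Kafka code.
public class ReplaySketch {
    // Stand-in for the group's subscription-metadata flag.
    static boolean hasSubscriptionMetadataRecord = true;
    static final List<String> records = new ArrayList<>();

    // Write path: only emits the tombstone record, never mutates state.
    static void writeTombstone() {
        records.add("SubscriptionMetadataTombstone");
    }

    // Replay path: state is updated when the record is applied.
    static void replay(String record) {
        if ("SubscriptionMetadataTombstone".equals(record)) {
            hasSubscriptionMetadataRecord = false;
        }
    }

    public static void main(String[] args) {
        writeTombstone();
        System.out.println(hasSubscriptionMetadataRecord); // still true: write path does not mutate
        records.forEach(ReplaySketch::replay);
        System.out.println(hasSubscriptionMetadataRecord); // false only after replay
    }
}
```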



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##########
@@ -398,6 +423,21 @@ public Map<String, TopicMetadata> computeSubscriptionMetadata(
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }
 
+    public static long computeMetadataHash(
+        Map<String, SubscriptionCount> subscribedTopicNames,
+        Map<String, Long> topicHashCache,
+        MetadataImage metadataImage
+    ) {
+        Map<String, Long> topicHash = new HashMap<>(subscribedTopicNames.size());
+        subscribedTopicNames.keySet().forEach(topicName ->
+            topicHash.put(
+                topicName,
+                topicHashCache.computeIfAbsent(topicName, k -> Utils.computeTopicHash(topicName, metadataImage))

Review Comment:
   We may have an issue here. If the topic does not exist, we cache zero. However, we don't remove it when the topic is created, do we? Or is it covered by `topicsDelta.changedTopics()`? It would be great if we could add more tests.
   
   We should also add unit tests for this new method.
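   A minimal standalone sketch of the staleness concern raised above (hypothetical names, not the Kafka API): once `computeIfAbsent` caches 0 for a nonexistent topic, it never recomputes the value after the topic is created unless something explicitly invalidates the entry.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the computeIfAbsent caching pitfall; all names
// are hypothetical stand-ins, not the actual Kafka classes.
public class StaleCacheSketch {
    static final Map<String, Long> topicHashCache = new HashMap<>();
    static final Map<String, Long> liveTopics = new HashMap<>(); // stand-in for the metadata image

    static long hashFor(String topic) {
        // A missing topic hashes to 0, and that 0 gets cached.
        return topicHashCache.computeIfAbsent(topic, t -> liveTopics.getOrDefault(t, 0L));
    }

    public static void main(String[] args) {
        long before = hashFor("foo");    // topic does not exist yet, so 0 is cached
        liveTopics.put("foo", 123L);     // topic is created afterwards
        long after = hashFor("foo");     // still 0: computeIfAbsent returns the cached value
        System.out.println(before + " " + after);
        // Some invalidation path (e.g. on a metadata delta) must drop the entry:
        topicHashCache.remove("foo");
        System.out.println(hashFor("foo")); // recomputed from the live topics
    }
}
```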



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -151,6 +151,8 @@ public String toLowerCaseString() {
      */
     private final TimelineHashMap<String, ResolvedRegularExpression> resolvedRegularExpressions;
 
+    private final TimelineObject<Boolean> addSubscriptionMetadataTombstoneRecord;

Review Comment:
   nit: I re-iterate my previous comment here. `hasSubscriptionMetadataRecord` seems a bit nicer.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid(
      * @return The hash of the group.
      */
     static long computeGroupHash(Map<String, Long> topicHashes) {
-        if (topicHashes.isEmpty()) {
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>();
+        for (Map.Entry<String, Long> entry : topicHashes.entrySet()) {
+            // Filter out entries with a hash value of 0, which indicates no topic
+            if (entry.getValue() != 0) {

Review Comment:
   btw, it seems that we could move the zero check to L369 to simplify the code a bit.
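   For reference, here is a self-contained sketch of the sort-and-filter idea under discussion (not the actual `Utils.computeGroupHash` implementation; the combining scheme here is a simple placeholder): sorting by topic name makes the result deterministic, and skipping zero hashes keeps nonexistent topics out of the combined value.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical standalone sketch of computing a group hash from per-topic
// hashes; the 31-based combiner is illustrative, not Kafka's algorithm.
public class GroupHashSketch {
    static long computeGroupHash(Map<String, Long> topicHashes) {
        long hash = 0;
        // TreeMap iteration sorts entries by topic name for determinism.
        for (Map.Entry<String, Long> e : new TreeMap<>(topicHashes).entrySet()) {
            if (e.getValue() == 0) continue; // zero marks a nonexistent topic; skip it
            hash = 31 * hash + e.getKey().hashCode();
            hash = 31 * hash + e.getValue();
        }
        return hash;
    }

    public static void main(String[] args) {
        long withZero = computeGroupHash(Map.of("foo", 42L, "bar", 0L));
        long withoutZero = computeGroupHash(Map.of("foo", 42L));
        // Zero-valued entries do not change the combined hash.
        System.out.println(withZero == withoutZero);
    }
}
```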



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -541,6 +546,109 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testTopicHashIsRemoveFromCacheIfNoGroupSubscribesIt() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addRacks()
+            .build();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(List.of("foo"))
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+            ))),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
+            GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+        assertTrue(context.groupMetadataManager.topicHashCache().containsKey(fooTopicName));
+
+        // leave group
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(-2));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(-2)
+                .setHeartbeatIntervalMs(0),
+            result.response()
+        );
+
+        expectedRecords = List.of(
+            GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId),
+            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
+        );
+        assertRecordsEquals(expectedRecords, result.records());
+        assertTrue(context.groupMetadataManager.topicHashCache().isEmpty());

Review Comment:
   nit: I would prefer to use `assertEquals(Map.of(), context.groupMetadataManager.topicHashCache())`. If there is a failure, it gives you more information.
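   A quick hand-rolled illustration of the point (no JUnit dependency; `assertEqualsLike` is a hypothetical stand-in for `assertEquals`): an equality assertion can report the unexpected contents in its failure message, while a plain boolean assertion only reports true/false.

```java
import java.util.Map;

// Illustrative sketch: why asserting equality against Map.of() produces a
// more informative failure than asserting isEmpty(). Not JUnit itself.
public class AssertionMessageSketch {
    static void assertEqualsLike(Object expected, Object actual) {
        if (!expected.equals(actual)) {
            // The message carries both values, so stale entries are visible.
            throw new AssertionError("expected " + expected + " but was " + actual);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> cache = Map.of("foo", 42L); // a leftover cache entry
        try {
            assertEqualsLike(Map.of(), cache);
        } catch (AssertionError e) {
            // Unlike a bare assertTrue(cache.isEmpty()), the message names the
            // offending entry, e.g. "expected {} but was {foo=42}".
            System.out.println(e.getMessage());
        }
    }
}
```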



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -541,6 +546,109 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testTopicHashIsRemoveFromCacheIfNoGroupSubscribesIt() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addRacks()
+            .build();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(List.of("foo"))
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+            ))),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
+            GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+        assertTrue(context.groupMetadataManager.topicHashCache().containsKey(fooTopicName));
+
+        // leave group

Review Comment:
   nit: All the comments in this file start with a capital letter and end with a dot. Let's try to stay consistent.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -20635,12 +21067,10 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
                 ),
                 // Remove regex.
                 List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")),
-                // Updated subscription metadata.
-                List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
-                    barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
-                ))),
                 // Bumped epoch.
-                List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0))
+                List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
+                    barTopicName, barTopicHash
+                ))))
             ),
             result.records()
Review Comment:
   Not related to this line but it would be great if we could add more tests for the cache cleaning (e.g. when a topic is updated or deleted).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
