dajac commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2111387854


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -490,6 +490,13 @@ GroupMetadataManager build() {
      */
     private MetadataImage metadataImage;
 
+    /**
+     * The cache for topic hash value by topic name.
+     * A topic hash is calculated when there is a group subscribes to it.
+     * A topic hash is removed when it's updated in MetadataImage or there is no group subscribes to it.
+     */
+    private final Map<String, Long> topicHashCache;

Review Comment:
   This is just an idea for future work. I wonder whether we could better 
encapsulate all the logic related to those hashes in a class, e.g. 
`MetadataHasher`. This class could have a method to compute the hash of a list 
of topic names, a method to update the MetadataImage, etc. This would keep all 
the logic for maintaining the hashes in one place. As I said, we can consider 
this as a follow-up.
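
   To make the idea concrete, here is a rough sketch of what such a class could 
look like. Everything in it is hypothetical: the `MetadataHasher` name, its 
method names, the generic image type `I`, and the injected `topicHashFn` are 
placeholders rather than existing Kafka APIs, and the way per-topic hashes are 
combined is a stand-in for whatever `computeGroupHash` actually does.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.function.BiFunction;

/**
 * Hypothetical sketch: owns the topic hash cache and the image it was computed from.
 * The type parameter I stands in for MetadataImage so the sketch compiles on its own.
 */
final class MetadataHasher<I> {
    // Cached hash per topic name, filled lazily when a group hash is requested.
    private final Map<String, Long> topicHashCache = new HashMap<>();
    // Assumed stand-in for the real per-topic hash function.
    private final BiFunction<String, I, Long> topicHashFn;
    private I image;

    MetadataHasher(I image, BiFunction<String, I, Long> topicHashFn) {
        this.image = image;
        this.topicHashFn = topicHashFn;
    }

    /** Computes a group-level hash over the given topic names, reusing cached topic hashes. */
    long computeGroupHash(Collection<String> topicNames) {
        long hash = 0L;
        // Sort the names so the result does not depend on iteration order.
        for (String name : new TreeSet<>(topicNames)) {
            long topicHash = topicHashCache.computeIfAbsent(name, n -> topicHashFn.apply(n, image));
            hash = 31 * hash + topicHash; // placeholder combination, not the real algorithm
        }
        return hash;
    }

    /** Swaps in a new image and drops the cached hashes of the topics that changed. */
    void updateImage(I newImage, Collection<String> changedTopicNames) {
        this.image = newImage;
        changedTopicNames.forEach(topicHashCache::remove);
    }

    /** Drops a topic that no group subscribes to anymore. */
    void removeTopic(String topicName) {
        topicHashCache.remove(topicName);
    }

    /** Exposed for tests only. */
    boolean isCached(String topicName) {
        return topicHashCache.containsKey(topicName);
    }
}
```

   With something like this, GroupMetadataManager would only call 
`computeGroupHash`, `updateImage` and `removeTopic`, and the tests could check 
the cache through the hasher instead of reaching into the manager.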



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -639,6 +751,281 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testNewRacksDataInMetadataImageTriggersEpochBump() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .build();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setSubscribedTopicNames(List.of("foo"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                .withAssignmentEpoch(10)
+                .withMetadataHash(computeGroupHash(Map.of(
+                    fooTopicName,
+                    computeTopicHash(fooTopicName, metadataImage))

Review Comment:
   nit: `fooTopicName, computeTopicHash(fooTopicName, metadataImage)`



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -541,6 +546,109 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testTopicHashIsRemoveFromCacheIfNoGroupSubscribesIt() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addRacks()
+            .build();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(List.of("foo"))
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+            ))),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
+            GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+        assertTrue(context.groupMetadataManager.topicHashCache().containsKey(fooTopicName));
+
+        // Use LEAVE_GROUP_STATIC_MEMBER_EPOCH to leave group, so there is no group subscribes to foo.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH));

Review Comment:
   You should actually use LEAVE_GROUP_MEMBER_EPOCH. 
LEAVE_GROUP_STATIC_MEMBER_EPOCH is only used by static members.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -541,6 +546,109 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testTopicHashIsRemoveFromCacheIfNoGroupSubscribesIt() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addRacks()
+            .build();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(List.of("foo"))
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+            ))),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
+            GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+        assertTrue(context.groupMetadataManager.topicHashCache().containsKey(fooTopicName));

Review Comment:
   nit: Could we use assertEquals?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
