chia7712 commented on code in PR #19796:
URL: https://github.com/apache/kafka/pull/19796#discussion_r2115934867


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java:
##########
@@ -72,29 +70,8 @@ public List<CoordinatorRecord> build(TopicsImage 
topicsImage) {
             
records.add(GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId,
 member))
         );
 
-        // Add subscription metadata.
-        if (subscriptionMetadata == null) {
-            subscriptionMetadata = new HashMap<>();
-            members.forEach((memberId, member) ->
-                member.subscribedTopicNames().forEach(topicName -> {
-                    TopicImage topicImage = topicsImage.getTopic(topicName);
-                    if (topicImage != null) {
-                        subscriptionMetadata.put(topicName, new TopicMetadata(
-                            topicImage.id(),
-                            topicImage.name(),
-                            topicImage.partitions().size()
-                        ));
-                    }
-                })
-            );
-        }
-
-        if (!subscriptionMetadata.isEmpty()) {
-            
records.add(GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId,
 subscriptionMetadata));
-        }
-
         // Add group epoch record.
-        
records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 
groupEpoch, 0));
+        
records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 
groupEpoch, metadataHash));

Review Comment:
   Should we calculate the `metadataHash` if it is not defined? That is 
analogous to `subscriptionMetadata`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2693,18 +2688,20 @@ Map<Uuid, InitMapValue> 
subscribedTopicsChangeMap(String groupId, Map<String, To
                     .filter(entry -> curTimestamp - 
entry.getValue().timestamp() < delta)
                     .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue))
             );
-
         // Here will add any topics which are subscribed but not initialized 
and initializing
         // topics whose timestamp indicates that they are older than delta 
elapsed.
-        subscriptionMetadata.forEach((topicName, topicMetadata) -> {
-            Set<Integer> alreadyInitializedPartSet = 
alreadyInitialized.containsKey(topicMetadata.id()) ? 
alreadyInitialized.get(topicMetadata.id()).partitions() : Set.of();
-            if (alreadyInitializedPartSet.isEmpty() || 
alreadyInitializedPartSet.size() < topicMetadata.numPartitions()) {
-                Set<Integer> partitionSet = IntStream.range(0, 
topicMetadata.numPartitions()).boxed().collect(Collectors.toSet());
-                partitionSet.removeAll(alreadyInitializedPartSet);
-                // alreadyInitialized contains all initialized topics and 
initializing topics which are less than delta old
-                // which means we are putting subscribed topics which are 
unseen or initializing for more than delta. But, we
-                // are also updating the timestamp here which means, old 
initializing will not be included repeatedly.
-                topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k 
-> new InitMapValue(topicMetadata.name(), partitionSet, curTimestamp));
+        subscriptionTopicNames.forEach(topicName -> {
+            TopicImage topicImage = metadataImage.topics().getTopic(topicName);
+            if (topicImage != null) {
+                Set<Integer> alreadyInitializedPartSet = 
alreadyInitialized.containsKey(topicImage.id()) ? 
alreadyInitialized.get(topicImage.id()).partitions() : Set.of();
+                if (alreadyInitializedPartSet.isEmpty() || 
alreadyInitializedPartSet.size() < topicImage.partitions().size()) {
+                    Set<Integer> partitionSet = IntStream.range(0, 
topicImage.partitions().size()).boxed().collect(Collectors.toSet());

Review Comment:
   Could you consider using `alreadyInitializedPartSet` to filter out the 
partitions from `partitionSet`?
   ```
                       Set<Integer> partitionSet = IntStream.range(0, 
topicImage.partitions().size()).boxed()
                               .filter(p -> 
!alreadyInitializedPartSet.contains(p)).collect(Collectors.toSet());
   ```
   
   It could be a small optimization, I guess :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to