chia7712 commented on code in PR #19796: URL: https://github.com/apache/kafka/pull/19796#discussion_r2115934867
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java: ########## @@ -72,29 +70,8 @@ public List<CoordinatorRecord> build(TopicsImage topicsImage) { records.add(GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId, member)) ); - // Add subscription metadata. - if (subscriptionMetadata == null) { - subscriptionMetadata = new HashMap<>(); - members.forEach((memberId, member) -> - member.subscribedTopicNames().forEach(topicName -> { - TopicImage topicImage = topicsImage.getTopic(topicName); - if (topicImage != null) { - subscriptionMetadata.put(topicName, new TopicMetadata( - topicImage.id(), - topicImage.name(), - topicImage.partitions().size() - )); - } - }) - ); - } - - if (!subscriptionMetadata.isEmpty()) { - records.add(GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); - } - // Add group epoch record. - records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, groupEpoch, 0)); + records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, groupEpoch, metadataHash)); Review Comment: Should we calculate the `metadataHash` if it is not defined? That is analogous to `subscriptionMetadata` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2693,18 +2688,20 @@ Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Map<String, To .filter(entry -> curTimestamp - entry.getValue().timestamp() < delta) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) ); - // Here will add any topics which are subscribed but not initialized and initializing // topics whose timestamp indicates that they are older than delta elapsed. - subscriptionMetadata.forEach((topicName, topicMetadata) -> { - Set<Integer> alreadyInitializedPartSet = alreadyInitialized.containsKey(topicMetadata.id()) ? alreadyInitialized.get(topicMetadata.id()).partitions() : Set.of(); - if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicMetadata.numPartitions()) { - Set<Integer> partitionSet = IntStream.range(0, topicMetadata.numPartitions()).boxed().collect(Collectors.toSet()); - partitionSet.removeAll(alreadyInitializedPartSet); - // alreadyInitialized contains all initialized topics and initializing topics which are less than delta old - // which means we are putting subscribed topics which are unseen or initializing for more than delta. But, we - // are also updating the timestamp here which means, old initializing will not be included repeatedly. - topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k -> new InitMapValue(topicMetadata.name(), partitionSet, curTimestamp)); + subscriptionTopicNames.forEach(topicName -> { + TopicImage topicImage = metadataImage.topics().getTopic(topicName); + if (topicImage != null) { + Set<Integer> alreadyInitializedPartSet = alreadyInitialized.containsKey(topicImage.id()) ? alreadyInitialized.get(topicImage.id()).partitions() : Set.of(); + if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicImage.partitions().size()) { + Set<Integer> partitionSet = IntStream.range(0, topicImage.partitions().size()).boxed().collect(Collectors.toSet()); Review Comment: Could you consider using `alreadyInitializedPartSet` to filter out the partitions from `partitionSet`? ``` Set<Integer> partitionSet = IntStream.range(0, topicImage.partitions().size()).boxed() .filter(p -> !alreadyInitializedPartSet.contains(p)).collect(Collectors.toSet()); ``` It could be a small optimization, I guess :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org