dajac commented on code in PR #17444:
URL: https://github.com/apache/kafka/pull/17444#discussion_r2047369968


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##########
@@ -348,54 +376,58 @@ public Map<String, Assignment> targetAssignment() {
     }
 
     /**
-     * @return An immutable Map of subscription metadata for
-     *         each topic that the consumer group is subscribed to.
-     */
-    public Map<String, TopicMetadata> subscriptionMetadata() {
-        return Collections.unmodifiableMap(subscribedTopicMetadata);
-    }
-
-    /**
-     * Updates the subscription metadata. This replaces the previous one.
+     * Compute metadata hash based on the current subscription info.
      *
-     * @param subscriptionMetadata The new subscription metadata.
+     * @param subscribedTopicNames Map of topic names to the number of subscribers.
+     * @param metadataImage        The current metadata for all available topics.
+     * @param topicHashCache       The cache of topic hashes.
      */
-    public void setSubscriptionMetadata(
-        Map<String, TopicMetadata> subscriptionMetadata
+    public long computeMetadataHash(
+        Map<String, SubscriptionCount> subscribedTopicNames,
+        MetadataImage metadataImage,
+        Map<String, Long> topicHashCache
     ) {
-        this.subscribedTopicMetadata.clear();
-        this.subscribedTopicMetadata.putAll(subscriptionMetadata);
+        TopicsImage topicsImage = metadataImage.topics();
+        List<HashCode> hashCodes = subscribedTopicNames.keySet().stream()
+            .filter(topicName -> topicsImage.getTopic(topicName) != null)
+            .map(topicName -> HashCode.fromLong(
+                topicHashCache.computeIfAbsent(
+                    topicName,
+                    key -> computeTopicHash(topicsImage.getTopic(topicName), metadataImage.cluster())
+                )
+            ))
+            .toList();
+        return hashCodes.isEmpty() ? 0 : Hashing.combineUnordered(
+            hashCodes
+        ).asLong();
     }
 
     /**
-     * Computes the subscription metadata based on the current subscription info.
-     *
-     * @param subscribedTopicNames      Map of topic names to the number of subscribers.
-     * @param topicsImage               The current metadata for all available topics.
-     * @param clusterImage              The current metadata for the Kafka cluster.
+     * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
      *
-     * @return An immutable map of subscription metadata for each topic that the consumer group is subscribed to.
-     */
-    public Map<String, TopicMetadata> computeSubscriptionMetadata(
-        Map<String, SubscriptionCount> subscribedTopicNames,
-        TopicsImage topicsImage,
-        ClusterImage clusterImage
-    ) {
-        // Create the topic metadata for each subscribed topic.
-        Map<String, TopicMetadata> newSubscriptionMetadata = new HashMap<>(subscribedTopicNames.size());
-
-        subscribedTopicNames.forEach((topicName, count) -> {
-            TopicImage topicImage = topicsImage.getTopic(topicName);
-            if (topicImage != null) {
-                newSubscriptionMetadata.put(topicName, new TopicMetadata(
-                    topicImage.id(),
-                    topicImage.name(),
-                    topicImage.partitions().size()
-                ));
-            }
+     * @param topicImage   The topic image.
+     * @param clusterImage The cluster image.
+     */
+    public static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
+        HashFunction hf = Hashing.murmur3_128();
+        Hasher topicHasher = hf.newHasher()
+            .putByte((byte) 0) // magic byte
+            .putLong(topicImage.id().hashCode()) // topic Id
+            .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+            .putLong(topicImage.partitions().size()); // number of partitions
+
+        topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
+            topicHasher.putInt(entry.getKey()); // partition id
+            Arrays.stream(entry.getValue().replicas)
+                .mapToObj(clusterImage::broker)
+                .filter(Objects::nonNull)
+                .map(BrokerRegistration::rack)
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .sorted()
+                .forEach(rack -> topicHasher.putString(rack, StandardCharsets.UTF_8)); // sorted racks

Review Comment:
   I think we could introduce a separator between the racks to avoid this.
   We just need to pick a separator that is not already used.
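
   A minimal sketch of the concern, not the PR's code: when strings are fed into a hasher back to back with no delimiter, the rack lists `["ab", "c"]` and `["a", "bc"]` produce the same byte stream and therefore the same hash. The example below uses `java.util.zip.CRC32` from the JDK as a stand-in for the Murmur3 hasher in the diff, and `";"` is a hypothetical separator chosen only for illustration, on the assumption that it never appears in a rack name.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

public class RackSeparatorSketch {
    // Hypothetical separator; assumes this character never appears in a rack name.
    static final String SEPARATOR = ";";

    // Feeds each rack into the checksum back to back, with no delimiter,
    // so only the concatenated bytes matter.
    static long hashWithoutSeparator(List<String> racks) {
        CRC32 crc = new CRC32();
        for (String rack : racks) {
            crc.update(rack.getBytes(StandardCharsets.UTF_8));
        }
        return crc.getValue();
    }

    // Joins the racks with the separator first, so the boundaries between
    // racks become part of the hashed bytes.
    static long hashWithSeparator(List<String> racks) {
        CRC32 crc = new CRC32();
        crc.update(String.join(SEPARATOR, racks).getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    public static void main(String[] args) {
        List<String> first = List.of("ab", "c");
        List<String> second = List.of("a", "bc");

        // Both lists concatenate to "abc", so the hashes collide.
        System.out.println(hashWithoutSeparator(first) == hashWithoutSeparator(second)); // true

        // "ab;c" and "a;bc" are different byte streams, so the hashes differ.
        System.out.println(hashWithSeparator(first) == hashWithSeparator(second)); // false
    }
}
```

   Writing the length of each rack before its bytes would be an alternative that does not depend on finding an unused character.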



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
