dajac commented on code in PR #17444: URL: https://github.com/apache/kafka/pull/17444#discussion_r2047369968
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java: ########## @@ -348,54 +376,58 @@ public Map<String, Assignment> targetAssignment() { } /** - * @return An immutable Map of subscription metadata for - * each topic that the consumer group is subscribed to. - */ - public Map<String, TopicMetadata> subscriptionMetadata() { - return Collections.unmodifiableMap(subscribedTopicMetadata); - } - - /** - * Updates the subscription metadata. This replaces the previous one. + * Compute metadata hash based on the current subscription info. * - * @param subscriptionMetadata The new subscription metadata. + * @param subscribedTopicNames Map of topic names to the number of subscribers. + * @param metadataImage The current metadata for all available topics. + * @param topicHashCache The cache of topic hashes. */ - public void setSubscriptionMetadata( - Map<String, TopicMetadata> subscriptionMetadata + public long computeMetadataHash( + Map<String, SubscriptionCount> subscribedTopicNames, + MetadataImage metadataImage, + Map<String, Long> topicHashCache ) { - this.subscribedTopicMetadata.clear(); - this.subscribedTopicMetadata.putAll(subscriptionMetadata); + TopicsImage topicsImage = metadataImage.topics(); + List<HashCode> hashCodes = subscribedTopicNames.keySet().stream() + .filter(topicName -> topicsImage.getTopic(topicName) != null) + .map(topicName -> HashCode.fromLong( + topicHashCache.computeIfAbsent( + topicName, + key -> computeTopicHash(topicsImage.getTopic(topicName), metadataImage.cluster()) + ) + )) + .toList(); + return hashCodes.isEmpty() ? 0 : Hashing.combineUnordered( + hashCodes + ).asLong(); } /** - * Computes the subscription metadata based on the current subscription info. - * - * @param subscribedTopicNames Map of topic names to the number of subscribers. - * @param topicsImage The current metadata for all available topics. - * @param clusterImage The current metadata for the Kafka cluster. 
+ * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3. * - * @return An immutable map of subscription metadata for each topic that the consumer group is subscribed to. - */ - public Map<String, TopicMetadata> computeSubscriptionMetadata( - Map<String, SubscriptionCount> subscribedTopicNames, - TopicsImage topicsImage, - ClusterImage clusterImage - ) { - // Create the topic metadata for each subscribed topic. - Map<String, TopicMetadata> newSubscriptionMetadata = new HashMap<>(subscribedTopicNames.size()); - - subscribedTopicNames.forEach((topicName, count) -> { - TopicImage topicImage = topicsImage.getTopic(topicName); - if (topicImage != null) { - newSubscriptionMetadata.put(topicName, new TopicMetadata( - topicImage.id(), - topicImage.name(), - topicImage.partitions().size() - )); - } + * @param topicImage The topic image. + * @param clusterImage The cluster image. + */ + public static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) { + HashFunction hf = Hashing.murmur3_128(); + Hasher topicHasher = hf.newHasher() + .putByte((byte) 0) // magic byte + .putLong(topicImage.id().hashCode()) // topic Id + .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name + .putLong(topicImage.partitions().size()); // number of partitions + + topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> { + topicHasher.putInt(entry.getKey()); // partition id + Arrays.stream(entry.getValue().replicas) + .mapToObj(clusterImage::broker) + .filter(Objects::nonNull) + .map(BrokerRegistration::rack) + .filter(Optional::isPresent) + .map(Optional::get) + .sorted() + .forEach(rack -> topicHasher.putString(rack, StandardCharsets.UTF_8)); // sorted racks Review Comment: I think we could introduce a separator between the racks to avoid this ambiguity: without one, consecutive rack names are fed into the hasher back to back, so racks "ab" + "c" and "a" + "bc" produce the same bytes and therefore the same hash. We just need to pick a separator character that never appears in a rack name. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org