dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2059807814


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -209,4 +219,50 @@ void validateOffsetFetch(
     default boolean shouldExpire() {
         return true;
     }
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        return Hashing.combineOrdered(
+            topicHashes.entrySet()
+                .stream()
+                .sorted(Map.Entry.comparingByKey())
+                .map(e -> HashCode.fromLong(e.getValue()))
+                .toList()
+        ).asLong();
+    }
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+     *
+     * @param topicImage   The topic image.
+     * @param clusterImage The cluster image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(TopicImage topicImage, ClusterImage 
clusterImage) {
+        HashFunction hf = Hashing.murmur3_128();
+        Hasher topicHasher = hf.newHasher()
+            .putByte((byte) 0) // magic byte
+            .putLong(topicImage.id().hashCode()) // topic Id
+            .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+            .putInt(topicImage.partitions().size()); // number of partitions
+
+        
topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry
 -> {
+            topicHasher.putInt(entry.getKey()); // partition id
+            String racks = Arrays.stream(entry.getValue().replicas)
+                .mapToObj(clusterImage::broker)
+                .filter(Objects::nonNull)
+                .map(BrokerRegistration::rack)
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .sorted()
+                .collect(Collectors.joining(";"));

Review Comment:
   `;` is allowed in the `rack` field too so it does really protect us.



##########
gradle/dependencies.gradle:
##########
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   This is something that we haven't really discussed in the KIP because it is 
an implementation detail but we should discuss whether we really want to take a 
dependency on Guava.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -209,4 +219,50 @@ void validateOffsetFetch(
     default boolean shouldExpire() {
         return true;
     }
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {

Review Comment:
   I wonder whether it is worth inlining the implementation from Guava or 
something similar to combine the hashes. It would avoid the extra collections. 
I am not sure whether it makes a real difference though. What are your thoughts?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -209,4 +219,50 @@ void validateOffsetFetch(
     default boolean shouldExpire() {
         return true;
     }
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        return Hashing.combineOrdered(
+            topicHashes.entrySet()
+                .stream()
+                .sorted(Map.Entry.comparingByKey())
+                .map(e -> HashCode.fromLong(e.getValue()))
+                .toList()
+        ).asLong();
+    }
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+     *
+     * @param topicImage   The topic image.
+     * @param clusterImage The cluster image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(TopicImage topicImage, ClusterImage 
clusterImage) {
+        HashFunction hf = Hashing.murmur3_128();
+        Hasher topicHasher = hf.newHasher()
+            .putByte((byte) 0) // magic byte
+            .putLong(topicImage.id().hashCode()) // topic Id
+            .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+            .putInt(topicImage.partitions().size()); // number of partitions
+
+        
topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry
 -> {

Review Comment:
   We know that partitions go from 0 to N. I wonder whether we should use a 
good old for loop instead of sorting the partitions. What do you think?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -209,4 +219,50 @@ void validateOffsetFetch(
     default boolean shouldExpire() {
         return true;
     }
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        return Hashing.combineOrdered(
+            topicHashes.entrySet()
+                .stream()
+                .sorted(Map.Entry.comparingByKey())
+                .map(e -> HashCode.fromLong(e.getValue()))
+                .toList()
+        ).asLong();
+    }
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+     *
+     * @param topicImage   The topic image.
+     * @param clusterImage The cluster image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(TopicImage topicImage, ClusterImage 
clusterImage) {
+        HashFunction hf = Hashing.murmur3_128();
+        Hasher topicHasher = hf.newHasher()
+            .putByte((byte) 0) // magic byte

Review Comment:
   Should we define a constant for the magic byte? 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -209,4 +219,50 @@ void validateOffsetFetch(
     default boolean shouldExpire() {
         return true;
     }
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {

Review Comment:
   In the KIP, you also mentioned combining the index with the hash. Is this 
something done within `combineOrdered`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to