chia7712 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066288123


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -209,4 +221,90 @@ void validateOffsetFetch(
     default boolean shouldExpire() {
         return true;
     }
+
+    /**
+     * The magic byte used to identify the version of topic hash function.
+     */
+    byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+    XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        // Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+        // 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+        LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+            (byte) value,
+            (byte) (value >> 8),
+            (byte) (value >> 16),
+            (byte) (value >> 24),
+            (byte) (value >> 32),
+            (byte) (value >> 40),
+            (byte) (value >> 48),
+            (byte) (value >> 56)
+        };
+
+        // Combine the sorted topic hashes.
+        byte[] resultBytes = new byte[8];
+        topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey()) // sort by topic name
+            .map(Map.Entry::getValue)
+            .map(longToBytes::apply)
+            .forEach(nextBytes -> {
+                // Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+                // 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+                for (int i = 0; i < nextBytes.length; i++) {
+                    resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+                }
+            });
+
+        // Convert the byte array to long. This is taken from guava 
BytesHashCode#asLong.
+        // 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L279-L295
+        long retVal = (resultBytes[0] & 0xFF);
+        for (int i = 1; i < resultBytes.length; i++) {
+            retVal |= (resultBytes[i] & 0xFFL) << (i * 8);
+        }
+        return retVal;
+    }
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.

Review Comment:
   please update the docs



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -209,4 +221,90 @@ void validateOffsetFetch(
     default boolean shouldExpire() {
         return true;
     }
+
+    /**
+     * The magic byte used to identify the version of topic hash function.
+     */
+    byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+    XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        // Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+        // 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+        LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+            (byte) value,
+            (byte) (value >> 8),
+            (byte) (value >> 16),
+            (byte) (value >> 24),
+            (byte) (value >> 32),
+            (byte) (value >> 40),
+            (byte) (value >> 48),
+            (byte) (value >> 56)
+        };
+
+        // Combine the sorted topic hashes.
+        byte[] resultBytes = new byte[8];
+        topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey()) // sort by topic name
+            .map(Map.Entry::getValue)
+            .map(longToBytes::apply)
+            .forEach(nextBytes -> {
+                // Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+                // 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+                for (int i = 0; i < nextBytes.length; i++) {
+                    resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+                }
+            });
+
+        // Convert the byte array to long. This is taken from guava 
BytesHashCode#asLong.
+        // 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L279-L295
+        long retVal = (resultBytes[0] & 0xFF);
+        for (int i = 1; i < resultBytes.length; i++) {
+            retVal |= (resultBytes[i] & 0xFFL) << (i * 8);
+        }
+        return retVal;
+    }
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+     *
+     * @param topicImage   The topic image.
+     * @param clusterImage The cluster image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(TopicImage topicImage, ClusterImage 
clusterImage) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream dos = new DataOutputStream(baos)) {
+            dos.writeByte(TOPIC_HASH_MAGIC_BYTE); // magic byte
+            dos.writeLong(topicImage.id().hashCode()); // topic ID
+            dos.writeUTF(topicImage.name()); // topic name
+            dos.writeInt(topicImage.partitions().size()); // number of 
partitions
+            for (int i = 0; i < topicImage.partitions().size(); i++) {
+                dos.writeInt(i); // partition id
+                List<String> sortedRacksList = 
Arrays.stream(topicImage.partitions().get(i).replicas)
+                    .mapToObj(clusterImage::broker)
+                    .filter(Objects::nonNull)
+                    .map(BrokerRegistration::rack)
+                    .filter(Optional::isPresent)
+                    .map(Optional::get)
+                    .sorted()
+                    .toList();
+
+                String racks = IntStream.range(0, sortedRacksList.size())

Review Comment:
   the KIP does not mention the "index" for the rack. could it be replaced by 
`String.join(",", sortedRacksList)`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -209,4 +221,90 @@ void validateOffsetFetch(
     default boolean shouldExpire() {
         return true;
     }
+
+    /**
+     * The magic byte used to identify the version of topic hash function.
+     */
+    byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+    XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        // Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+        // 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+        LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+            (byte) value,
+            (byte) (value >> 8),
+            (byte) (value >> 16),
+            (byte) (value >> 24),
+            (byte) (value >> 32),
+            (byte) (value >> 40),
+            (byte) (value >> 48),
+            (byte) (value >> 56)
+        };
+
+        // Combine the sorted topic hashes.
+        byte[] resultBytes = new byte[8];
+        topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey()) // sort by topic name
+            .map(Map.Entry::getValue)
+            .map(longToBytes::apply)
+            .forEach(nextBytes -> {
+                // Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+                // 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+                for (int i = 0; i < nextBytes.length; i++) {
+                    resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+                }
+            });
+
+        // Convert the byte array to long. This is taken from guava 
BytesHashCode#asLong.
+        // 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L279-L295
+        long retVal = (resultBytes[0] & 0xFF);
+        for (int i = 1; i < resultBytes.length; i++) {
+            retVal |= (resultBytes[i] & 0xFFL) << (i * 8);
+        }
+        return retVal;
+    }
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+     *
+     * @param topicImage   The topic image.
+     * @param clusterImage The cluster image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(TopicImage topicImage, ClusterImage 
clusterImage) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review Comment:
   We can do a small optimization for it by using `ByteBufferOutputStream`. for 
example:
   
   ```java
           try (var baos = new ByteBufferOutputStream(100);
                var dos = new DataOutputStream(baos)) {
               ...
               dos.flush();
               var topicBytes = baos.buffer().flip();
               return LZ4_HASH_INSTANCE.hash(topicBytes, 0);
           }
   ```
   `LZ4_HASH_INSTANCE.hash` takes an array of ByteBuffer to compute the hash, 
which avoids an array copy.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -209,4 +221,90 @@ void validateOffsetFetch(
     default boolean shouldExpire() {
         return true;
     }
+
+    /**
+     * The magic byte used to identify the version of topic hash function.
+     */
+    byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+    XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+    /**
+     * Computes the hash of the topics in a group.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        // Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+        // 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+        LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+            (byte) value,
+            (byte) (value >> 8),
+            (byte) (value >> 16),
+            (byte) (value >> 24),
+            (byte) (value >> 32),
+            (byte) (value >> 40),
+            (byte) (value >> 48),
+            (byte) (value >> 56)
+        };
+
+        // Combine the sorted topic hashes.
+        byte[] resultBytes = new byte[8];
+        topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey()) // sort by topic name
+            .map(Map.Entry::getValue)
+            .map(longToBytes::apply)
+            .forEach(nextBytes -> {
+                // Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+                // 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+                for (int i = 0; i < nextBytes.length; i++) {
+                    resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+                }
+            });
+
+        // Convert the byte array to long. This is taken from guava 
BytesHashCode#asLong.

Review Comment:
   why not using `LZ4_HASH_INSTANCE`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to