ijuma commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2070951806


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -324,4 +336,106 @@ static void throwIfRegularExpressionIsInvalid(
                     regex, ex.getDescription()));
         }
     }
+
+    /**
+     * The magic byte used to identify the version of topic hash function.
+     */
+    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+    static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+    /**
+     * Computes the hash of the topics in a group.
+     * <p>
+     * The computed hash value is stored as part of the metadata hash in the *GroupMetadataValue.
+     * <p>
+     * The hashing process involves the following steps:
+     * 1. Sort the topic hashes by topic name.
+     * 2. Convert each long hash value into a byte array.
+     * 3. Combine the sorted byte arrays to produce a final hash for the group.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        // Convert long to byte array. This is taken from guava LongHashCode#asBytes.
+        // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+        LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+            (byte) value,
+            (byte) (value >> 8),
+            (byte) (value >> 16),
+            (byte) (value >> 24),
+            (byte) (value >> 32),
+            (byte) (value >> 40),
+            (byte) (value >> 48),
+            (byte) (value >> 56)
+        };
+
+        // Combine the sorted topic hashes.
+        byte[] resultBytes = new byte[8];
+        topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey()) // sort by topic name
+            .map(Map.Entry::getValue)
+            .map(longToBytes::apply)
+            .forEach(nextBytes -> {
+                // Combine ordered hashes. This is taken from guava Hashing#combineOrdered.
+                // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+                for (int i = 0; i < nextBytes.length; i++) {
+                    resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]);
+                }
+            });
+
+        return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0);
+    }
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64.
+     * <p>
+     * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)}
+     * method and is stored as part of the metadata hash in the *GroupMetadataValue.
+     * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the
+     * new hash version.
+     * <p>
+     * The hashing process involves the following steps:
+     * 1. Write a magic byte to denote the version of the hash function.
+     * 2. Write the hash code of the topic ID.
+     * 3. Write the UTF-8 encoded topic name.
+     * 4. Write the number of partitions associated with the topic.
+     * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
+     *    - Rack identifiers are formatted as "length1:value1,length2:value2" to prevent issues with simple separators.
+     *
+     * @param topicImage   The topic image.
+     * @param clusterImage The cluster image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {
+        try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
+             DataOutputStream dos = new DataOutputStream(bbos)) {
+            dos.writeByte(TOPIC_HASH_MAGIC_BYTE); // magic byte
+            dos.writeLong(topicImage.id().hashCode()); // topic ID
+            dos.writeUTF(topicImage.name()); // topic name
+            dos.writeInt(topicImage.partitions().size()); // number of partitions
+            for (int i = 0; i < topicImage.partitions().size(); i++) {
+                dos.writeInt(i); // partition id
+                List<String> sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas)
+                    .mapToObj(clusterImage::broker)
+                    .filter(Objects::nonNull)
+                    .map(BrokerRegistration::rack)
+                    .filter(Optional::isPresent)
+                    .map(Optional::get)
+                    .sorted()
+                    .toList();
+
+                // The rack string combination cannot use simple separator like ",", because there is no limitation for rack character.
+                // If using simple separator like "," it may hit edge case like ",," and ",,," / ",,," and ",,".
+                // Add length before the rack string to avoid the edge case.
+                String racks = sortedRacksList.stream().map(s -> s.length() + ":" + s) // Format: "length:value"
+                    .collect(Collectors.joining(",")); // Separator between "length:value" pairs
+                dos.writeUTF(racks); // sorted racks
+            }
+            dos.flush();
+            ByteBuffer topicBytes = bbos.buffer().flip();
+            return LZ4_HASH_INSTANCE.hash(topicBytes, 0);

Review Comment:
   There is also a streaming hash class. Would that be a better option than creating the complete byte buffer?
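
   For illustration, here is a minimal sketch of what a streaming variant might look like. It is not the PR's code. To stay runnable with only the JDK it uses `java.util.zip.CRC32` as a stand-in `Checksum`, and the class name, `hashTopic`, and its parameters are hypothetical. With lz4-java, the assumption is that a `StreamingXXHash64` obtained from `XXHashFactory.fastestInstance().newStreamingHash64(0)` and exposed through `asChecksum()` could be passed in the same place; that should be verified against the library version Kafka depends on.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;

public class StreamingTopicHashSketch {

    /**
     * Hypothetical helper: writes the same field sequence as computeTopicHash,
     * but every byte is fed straight into the checksum, so no intermediate
     * buffer holding the whole serialized topic is built.
     */
    public static long hashTopic(Checksum checksum,
                                 long topicIdHash,
                                 String topicName,
                                 List<List<String>> sortedRacksByPartition) {
        checksum.reset();
        // The sink discards the bytes; CheckedOutputStream updates the checksum as they pass through.
        try (DataOutputStream dos = new DataOutputStream(
                new CheckedOutputStream(OutputStream.nullOutputStream(), checksum))) {
            dos.writeByte(0x00);                          // magic byte
            dos.writeLong(topicIdHash);                   // hash code of the topic ID
            dos.writeUTF(topicName);                      // topic name
            dos.writeInt(sortedRacksByPartition.size());  // number of partitions
            for (int i = 0; i < sortedRacksByPartition.size(); i++) {
                dos.writeInt(i);                          // partition id
                // Same "length:value" encoding as in the diff, joined with ",".
                StringBuilder racks = new StringBuilder();
                for (String rack : sortedRacksByPartition.get(i)) {
                    if (racks.length() > 0) {
                        racks.append(',');
                    }
                    racks.append(rack.length()).append(':').append(rack);
                }
                dos.writeUTF(racks.toString());
            }
            dos.flush();
        } catch (IOException e) {
            // Not expected with a null sink; rethrown unchecked to keep the sketch simple.
            throw new UncheckedIOException(e);
        }
        return checksum.getValue();
    }

    public static void main(String[] args) {
        // CRC32 is only a JDK stand-in for a streaming XXHash64 here.
        long hash = hashTopic(new CRC32(), 42L, "foo",
            List.of(List.of("rack-a", "rack-b"), List.of("rack-a")));
        System.out.println(Long.toHexString(hash));
    }
}
```

   Whether this is a better option probably depends on measurement: the streaming form avoids allocating and growing the byte buffer, while the buffered form hashes one contiguous region in a single call.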


