FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2073493044


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.image.MetadataImage;
+
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class UtilsTest {
+    private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+    private static final String FOO_TOPIC_NAME = "foo";
+    private static final String BAR_TOPIC_NAME = "bar";
+    private static final int FOO_NUM_PARTITIONS = 2;
+    private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
+        .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+        .addRacks()
+        .build();
+    private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();

Review Comment:
   Thanks for the suggestion. I benchmarked streaming XXH3, streaming XXH64, non-streaming XXH3, and non-streaming XXH64. Streaming XXH3 gives the best result; however, it requires adding a new library, `com.dynatrace.hash4j`. Do we want to take on that dependency?
   
   cc @chia7712 @dajac 
   
   ```
   Benchmark                                      (numReplicasPerBroker)  (partitionsPerTopic)  (replicationFactor)  Mode  Cnt      Score      Error  Units
   TopicHashBenchmark.testDynatraceStreamingXXH3                      10                    10                    3  avgt    5    879.241 ±    6.788  ns/op
   TopicHashBenchmark.testDynatraceStreamingXXH3                      10                    50                    3  avgt    5   4192.380 ±  195.424  ns/op
   TopicHashBenchmark.testDynatraceStreamingXXH3                      10                   100                    3  avgt    5   8027.227 ±  210.403  ns/op
   TopicHashBenchmark.testDynatraceXXH3                               10                    10                    3  avgt    5   1676.398 ±    2.249  ns/op
   TopicHashBenchmark.testDynatraceXXH3                               10                    50                    3  avgt    5   9256.175 ±   45.298  ns/op
   TopicHashBenchmark.testDynatraceXXH3                               10                   100                    3  avgt    5  20195.772 ±   37.651  ns/op
   TopicHashBenchmark.testLz4StreamingXXHash64                        10                    10                    3  avgt    5   9739.833 ±  188.303  ns/op
   TopicHashBenchmark.testLz4StreamingXXHash64                        10                    50                    3  avgt    5  45540.195 ±  455.747  ns/op
   TopicHashBenchmark.testLz4StreamingXXHash64                        10                   100                    3  avgt    5  89084.689 ± 2164.862  ns/op
   TopicHashBenchmark.testLz4XXHash64                                 10                    10                    3  avgt    5   1755.391 ±    6.436  ns/op
   TopicHashBenchmark.testLz4XXHash64                                 10                    50                    3  avgt    5   9421.643 ±   79.838  ns/op
   TopicHashBenchmark.testLz4XXHash64                                 10                   100                    3  avgt    5  19461.960 ±  425.881  ns/op
   JMH benchmarks done
   ```
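   
   For reference, a minimal sketch of the hash4j API that the XXH3 variants rely on, using only calls that also appear in `TopicHashBenchmark` below; the class name and the `"foo"`/`2`/byte-array inputs are placeholders:
   
   ```java
   import com.dynatrace.hash4j.hashing.HashStream64;
   import com.dynatrace.hash4j.hashing.Hashing;
   
   public class Xxh3Sketch {
       public static void main(String[] args) {
           // Streaming: feed fields into the hasher directly, no intermediate buffer.
           HashStream64 stream = Hashing.xxh3_64().hashStream();
           long streamed = stream.putByte((byte) 0)
               .putString("foo")
               .putInt(2)
               .getAsLong();
   
           // One-shot: hash an already-materialized byte array.
           long oneShot = Hashing.xxh3_64().hashBytesToLong(new byte[]{0, 1, 2, 3});
   
           System.out.printf("streamed=%d oneShot=%d%n", streamed, oneShot);
       }
   }
   ```
   
   The streaming variant never materializes the encoded bytes, which is what lets it skip the `ByteBufferOutputStream`/`DataOutputStream` buffering the non-streaming variants pay for.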
   
   <details>
   <summary>TopicHashBenchmark.java</summary>
   
   ```java
   package org.apache.kafka.jmh.metadata;
   
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.common.metadata.RegisterBrokerRecord;
   import org.apache.kafka.common.utils.ByteBufferOutputStream;
   import org.apache.kafka.image.ClusterDelta;
   import org.apache.kafka.image.ClusterImage;
   import org.apache.kafka.image.TopicsDelta;
   import org.apache.kafka.image.TopicImage;
   import org.apache.kafka.metadata.BrokerRegistration;
   
   import com.dynatrace.hash4j.hashing.HashStream64;
   import com.dynatrace.hash4j.hashing.Hashing;
   
   import net.jpountz.xxhash.StreamingXXHash64;
   import net.jpountz.xxhash.XXHashFactory;
   
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.BenchmarkMode;
   import org.openjdk.jmh.annotations.Fork;
   import org.openjdk.jmh.annotations.Level;
   import org.openjdk.jmh.annotations.Measurement;
   import org.openjdk.jmh.annotations.Mode;
   import org.openjdk.jmh.annotations.OutputTimeUnit;
   import org.openjdk.jmh.annotations.Param;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.Setup;
   import org.openjdk.jmh.annotations.State;
   import org.openjdk.jmh.annotations.Warmup;
   
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Optional;
   import java.util.concurrent.TimeUnit;
   
   
   import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta;
   import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getNumBrokers;
   
   @State(Scope.Benchmark)
   @Fork(value = 1)
   @Warmup(iterations = 3)
   @Measurement(iterations = 5)
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.NANOSECONDS)
   public class TopicHashBenchmark {
       @Param({"10", "50", "100"})
       private int partitionsPerTopic;
       @Param({"3"})
       private int replicationFactor;
       @Param({"10"})
       private int numReplicasPerBroker;
   
       private TopicImage topicImage;
       private ClusterImage clusterImage;
   
       @Setup(Level.Trial)
       public void setup() throws IOException {
           TopicsDelta topicsDelta = getInitialTopicsDelta(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
           int numBrokers = getNumBrokers(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
           ClusterDelta clusterDelta = new ClusterDelta(ClusterImage.EMPTY);
           for (int i = 0; i < numBrokers; i++) {
               clusterDelta.replay(new RegisterBrokerRecord()
                   .setBrokerId(i)
                   .setRack(Uuid.randomUuid().toString())
               );
           }
           topicImage = topicsDelta.apply().topicsById().values().stream().findFirst().get();
           clusterImage = clusterDelta.apply();
       }
   
       @Benchmark
       public void testLz4StreamingXXHash64() {
           try (StreamingXXHash64 hash = XXHashFactory.fastestInstance().newStreamingHash64(0)) {
               hash.update(new byte[]{(byte) 0}, 0, 1); // magic byte
   
               // topic id
               hash.update(intToBytes(topicImage.id().hashCode()), 0, 4);
   
               // topic name
               byte[] topicNameBytes = topicImage.name().getBytes();
               hash.update(topicNameBytes, 0, topicNameBytes.length);
   
               // number of partitions
               hash.update(intToBytes(topicImage.partitions().size()), 0, 4);
   
               for (int i = 0; i < topicImage.partitions().size(); i++) {
                   // partition id
                   hash.update(intToBytes(i), 0, 4);
   
                   // sorted racks
                   List<String> racks = new ArrayList<>();
                   for (int replicaId : topicImage.partitions().get(i).replicas) {
                       BrokerRegistration broker = clusterImage.broker(replicaId);
                       if (broker != null) {
                           Optional<String> rackOptional = broker.rack();
                           rackOptional.ifPresent(racks::add);
                       }
                   }
   
                   Collections.sort(racks);
                   for (String rack : racks) {
                       // Format: "<length><value>"
                       byte[] rackBytes = rack.getBytes();
                       hash.update(intToBytes(rack.length()), 0, 4);
                       hash.update(rackBytes, 0, rackBytes.length);
                   }
               }
               hash.getValue();
           }
       }
   
       @Benchmark
       public void testLz4XXHash64() throws IOException {
           try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
                DataOutputStream dos = new DataOutputStream(bbos)) {
               dos.writeByte(0); // magic byte
               dos.writeLong(topicImage.id().hashCode()); // topic ID
               dos.writeUTF(topicImage.name()); // topic name
               dos.writeInt(topicImage.partitions().size()); // number of partitions
               for (int i = 0; i < topicImage.partitions().size(); i++) {
                   dos.writeInt(i); // partition id
                   // The rack strings cannot be joined with a simple separator like ",",
                   // because rack names may contain any character: e.g. ",," + ",,," and ",,," + ",," would collide.
                   // Prefixing each rack string with its length avoids this ambiguity.
                   List<String> racks = new ArrayList<>();
                   for (int replicaId : topicImage.partitions().get(i).replicas) {
                       BrokerRegistration broker = clusterImage.broker(replicaId);
                       if (broker != null) {
                           Optional<String> rackOptional = broker.rack();
                           rackOptional.ifPresent(racks::add);
                       }
                   }
   
                   Collections.sort(racks);
                   for (String rack : racks) {
                       // Format: "<length><value>"
                       dos.writeInt(rack.length());
                       dos.writeUTF(rack);
                   }
               }
               dos.flush();
               ByteBuffer topicBytes = bbos.buffer().flip();
               XXHashFactory.fastestInstance().hash64().hash(topicBytes, 0);
           }
       }
   
       @Benchmark
       public void testDynatraceStreamingXXH3() {
           HashStream64 hash = Hashing.xxh3_64().hashStream();
           hash = hash.putByte((byte) 0)
               .putLong(topicImage.id().hashCode())
               .putString(topicImage.name())
               .putInt(topicImage.partitions().size());
   
           for (int i = 0; i < topicImage.partitions().size(); i++) {
               // partition id
               hash = hash.putInt(i);
   
               // sorted racks
               List<String> racks = new ArrayList<>();
               for (int replicaId : topicImage.partitions().get(i).replicas) {
                   BrokerRegistration broker = clusterImage.broker(replicaId);
                   if (broker != null) {
                       Optional<String> rackOptional = broker.rack();
                       rackOptional.ifPresent(racks::add);
                   }
               }
   
               Collections.sort(racks);
               for (String rack : racks) {
                   // Format: "<length><value>"
                   hash.putInt(rack.length());
                   hash.putString(rack);
               }
           }
           hash.getAsLong();
       }
   
       @Benchmark
       public void testDynatraceXXH3() throws IOException {
           try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
                DataOutputStream dos = new DataOutputStream(bbos)) {
               dos.writeByte(0); // magic byte
               dos.writeLong(topicImage.id().hashCode()); // topic ID
               dos.writeUTF(topicImage.name()); // topic name
               dos.writeInt(topicImage.partitions().size()); // number of partitions
               for (int i = 0; i < topicImage.partitions().size(); i++) {
                   dos.writeInt(i); // partition id
                   // The rack strings cannot be joined with a simple separator like ",",
                   // because rack names may contain any character: e.g. ",," + ",,," and ",,," + ",," would collide.
                   // Prefixing each rack string with its length avoids this ambiguity.
                   List<String> racks = new ArrayList<>();
                   for (int replicaId : topicImage.partitions().get(i).replicas) {
                       BrokerRegistration broker = clusterImage.broker(replicaId);
                       if (broker != null) {
                           Optional<String> rackOptional = broker.rack();
                           rackOptional.ifPresent(racks::add);
                       }
                   }
   
                   Collections.sort(racks);
                   for (String rack : racks) {
                       // Format: "<length><value>"
                       dos.writeInt(rack.length());
                       dos.writeUTF(rack);
                   }
               }
               dos.flush();
               ByteBuffer topicBytes = bbos.buffer().flip();
               Hashing.xxh3_64().hashBytesToLong(topicBytes.array(), 0, topicBytes.limit());
           }
       }
   
       private byte[] intToBytes(int value) {
           return new byte[] {
               (byte)(value >>> 24),
               (byte)(value >>> 16),
               (byte)(value >>> 8),
               (byte)value
           };
       }
   }
   ```
   
   </details>
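   
   As an aside, the length prefix mentioned in the rack-encoding comments guards against separator collisions. A small self-contained illustration (the rack values and the `:` prefix format are hypothetical, for demonstration only; the benchmark writes the length with `writeInt`/`putInt` instead):
   
   ```java
   import java.util.List;
   
   public class RackSeparatorDemo {
       public static void main(String[] args) {
           // Two distinct sorted rack lists that collide under a bare "," separator:
           List<String> a = List.of(",,", ",,");  // joins to ",,,,,"
           List<String> b = List.of(",,,,,");     // also ",,,,,"
           System.out.println(String.join(",", a).equals(String.join(",", b))); // true
   
           // Length-prefixing each element keeps them distinct: "2:,,2:,," vs "5:,,,,,"
           System.out.println(lengthPrefixed(a).equals(lengthPrefixed(b))); // false
       }
   
       static String lengthPrefixed(List<String> racks) {
           StringBuilder sb = new StringBuilder();
           for (String rack : racks) {
               sb.append(rack.length()).append(':').append(rack);
           }
           return sb.toString();
       }
   }
   ```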
   


