FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066016813


##########
gradle/dependencies.gradle:
##########
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   Based on the benchmark results below, lz4's XXHash64 performs better than 
Murmur3 in our case, so I will switch to it.
   
   ```
   Benchmark                      (numReplicasPerBroker)  (partitionsPerTopic)  (replicationFactor)  Mode  Cnt     Score    Error  Units
   TopicHashBenchmark.testLz4                         10                    10                    3  avgt   15   194.553 ±  1.631  ns/op
   TopicHashBenchmark.testLz4                         10                    50                    3  avgt   15   484.640 ±  1.721  ns/op
   TopicHashBenchmark.testLz4                         10                   100                    3  avgt   15   883.435 ±  4.001  ns/op
   TopicHashBenchmark.testMurmur                      10                    10                    3  avgt   15   205.529 ±  0.701  ns/op
   TopicHashBenchmark.testMurmur                      10                    50                    3  avgt   15  1066.528 ± 42.856  ns/op
   TopicHashBenchmark.testMurmur                      10                   100                    3  avgt   15  2082.821 ± 10.935  ns/op
   ```
   
   
   <details>
   <summary> TopicHashBenchmark.java
   </summary>
   
   ```java
   package org.apache.kafka.jmh.metadata;
   
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.common.internals.Murmur3;
   import org.apache.kafka.common.metadata.RegisterBrokerRecord;
   import org.apache.kafka.image.ClusterDelta;
   import org.apache.kafka.image.ClusterImage;
   import org.apache.kafka.image.TopicsDelta;
   import org.apache.kafka.image.TopicImage;
   import org.apache.kafka.metadata.BrokerRegistration;
   
   import net.jpountz.xxhash.XXHash64;
   import net.jpountz.xxhash.XXHashFactory;
   
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.BenchmarkMode;
   import org.openjdk.jmh.annotations.Fork;
   import org.openjdk.jmh.annotations.Level;
   import org.openjdk.jmh.annotations.Measurement;
   import org.openjdk.jmh.annotations.Mode;
   import org.openjdk.jmh.annotations.OutputTimeUnit;
   import org.openjdk.jmh.annotations.Param;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.Setup;
   import org.openjdk.jmh.annotations.State;
   import org.openjdk.jmh.annotations.TearDown;
   import org.openjdk.jmh.annotations.Warmup;
   
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.util.Arrays;
   import java.util.List;
   import java.util.Objects;
   import java.util.Optional;
   import java.util.concurrent.TimeUnit;
   import java.util.stream.Collectors;
   import java.util.stream.IntStream;
   
   import static 
org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta;
   import static 
org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getNumBrokers;
   
   @State(Scope.Benchmark)
   @Fork(value = 1)
   @Warmup(iterations = 5)
   @Measurement(iterations = 15)
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.NANOSECONDS)
   public class TopicHashBenchmark {
       @Param({"10", "50", "100"})
       private int partitionsPerTopic;
       @Param({"3"})
       private int replicationFactor;
       @Param({"10"})
       private int numReplicasPerBroker;
   
       private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
       private final DataOutputStream dos = new DataOutputStream(baos);
   
       @Setup(Level.Trial)
       public void setup() throws IOException {
           TopicsDelta topicsDelta = getInitialTopicsDelta(1, 
partitionsPerTopic, replicationFactor, numReplicasPerBroker);
           int numBrokers = getNumBrokers(1, partitionsPerTopic, 
replicationFactor, numReplicasPerBroker);
           ClusterDelta clusterDelta = new ClusterDelta(ClusterImage.EMPTY);
           for (int i = 0; i < numBrokers; i++) {
               clusterDelta.replay(new RegisterBrokerRecord()
                   .setBrokerId(i)
                   .setRack(Uuid.randomUuid().toString())
               );
           }
           TopicImage topicImage = 
topicsDelta.apply().topicsById().values().stream().findFirst().get();
           ClusterImage clusterImage = clusterDelta.apply();
   
           dos.writeByte(0); // magic byte
           dos.writeLong(topicImage.id().hashCode()); // topic ID
           dos.writeUTF(topicImage.name()); // topic name
           dos.writeInt(topicImage.partitions().size()); // number of partitions
           for (int i = 0; i < topicImage.partitions().size(); i++) {
               dos.writeInt(i); // partition id
               List<String> sortedRacksList = 
Arrays.stream(topicImage.partitions().get(i).replicas)
                   .mapToObj(clusterImage::broker)
                   .filter(Objects::nonNull)
                   .map(BrokerRegistration::rack)
                   .filter(Optional::isPresent)
                   .map(Optional::get)
                   .sorted()
                   .toList();
   
               String racks = IntStream.range(0, sortedRacksList.size())
                   .mapToObj(idx -> idx + ":" + sortedRacksList.get(idx)) // 
Format: "index:value"
                   .collect(Collectors.joining(",")); // Separator between 
"index:value" pairs
               dos.writeUTF(racks); // sorted racks
           }
           dos.flush();
       }
   
       @TearDown(Level.Trial)
       public void tearDown() throws IOException {
           dos.close();
           baos.close();
       }
   
       @Benchmark
       public void testLz4() {
           XXHash64 hash = XXHashFactory.fastestInstance().hash64();
           hash.hash(baos.toByteArray(), 0, baos.size(), 0);
       }
   
       @Benchmark
       public void testMurmur() {
           Murmur3.hash64(baos.toByteArray());
       }
   }
   ```
   
   </details>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to