FrankYang0529 commented on code in PR #19523: URL: https://github.com/apache/kafka/pull/19523#discussion_r2066166230
########## gradle/dependencies.gradle: ########## @@ -147,6 +148,7 @@ libs += [ caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine", classgraph: "io.github.classgraph:classgraph:$versions.classgraph", commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator", + guava: "com.google.guava:guava:$versions.guava", Review Comment: Thanks for the suggestion. Updated benchmark result. ``` Benchmark (numReplicasPerBroker) (partitionsPerTopic) (replicationFactor) Mode Cnt Score Error Units TopicHashBenchmark.testLz4 10 10 3 avgt 15 166.389 ± 1.542 ns/op TopicHashBenchmark.testLz4 10 50 3 avgt 15 375.660 ± 2.771 ns/op TopicHashBenchmark.testLz4 10 100 3 avgt 15 636.176 ± 8.305 ns/op TopicHashBenchmark.testMurmur 10 10 3 avgt 15 238.242 ± 1.664 ns/op TopicHashBenchmark.testMurmur 10 50 3 avgt 15 1143.583 ± 5.981 ns/op TopicHashBenchmark.testMurmur 10 100 3 avgt 15 2278.680 ± 29.007 ns/op ``` <details> <summary> TopicHashBenchmark.java </summary> ```java package org.apache.kafka.jmh.metadata; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.image.ClusterDelta; import org.apache.kafka.image.ClusterImage; import org.apache.kafka.image.TopicsDelta; import org.apache.kafka.image.TopicImage; import org.apache.kafka.metadata.BrokerRegistration; import org.apache.kafka.streams.state.internals.Murmur3; import net.jpountz.xxhash.XXHash64; import net.jpountz.xxhash.XXHashFactory; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta; import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getNumBrokers; @State(Scope.Benchmark) @Fork(value = 1) @Warmup(iterations = 5) @Measurement(iterations = 15) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) public class TopicHashBenchmark { @Param({"10", "50", "100"}) private int partitionsPerTopic; @Param({"3"}) private int replicationFactor; @Param({"10"}) private int numReplicasPerBroker; private byte[] topicBytes; @Setup(Level.Trial) public void setup() throws IOException { TopicsDelta topicsDelta = getInitialTopicsDelta(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker); int numBrokers = getNumBrokers(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker); ClusterDelta clusterDelta = new ClusterDelta(ClusterImage.EMPTY); for (int i = 0; i < numBrokers; i++) { clusterDelta.replay(new RegisterBrokerRecord() .setBrokerId(i) .setRack(Uuid.randomUuid().toString()) ); } TopicImage topicImage = topicsDelta.apply().topicsById().values().stream().findFirst().get(); ClusterImage clusterImage = clusterDelta.apply(); try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos)) { dos.writeByte(0); // magic byte dos.writeLong(topicImage.id().hashCode()); // topic ID dos.writeUTF(topicImage.name()); // topic name dos.writeInt(topicImage.partitions().size()); // number of partitions for (int i = 0; i < topicImage.partitions().size(); i++) { dos.writeInt(i); // partition id List<String> sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas) .mapToObj(clusterImage::broker) .filter(Objects::nonNull) .map(BrokerRegistration::rack) .filter(Optional::isPresent) .map(Optional::get) .sorted() .toList(); String racks = IntStream.range(0, sortedRacksList.size()) .mapToObj(idx -> idx + ":" + sortedRacksList.get(idx)) // Format: "index:value" .collect(Collectors.joining(",")); // Separator between "index:value" pairs dos.writeUTF(racks); // sorted racks } dos.flush(); topicBytes = baos.toByteArray(); } } @Benchmark public void testLz4() { XXHash64 hash = XXHashFactory.fastestInstance().hash64(); hash.hash(topicBytes, 0, topicBytes.length, 0); } @Benchmark public void testMurmur() { Murmur3.hash64(topicBytes); } } ``` </details> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org