FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2073493044
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.image.MetadataImage;
+
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class UtilsTest {
+    private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+    private static final String FOO_TOPIC_NAME = "foo";
+    private static final String BAR_TOPIC_NAME = "bar";
+    private static final int FOO_NUM_PARTITIONS = 2;
+    private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
+        .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+        .addRacks()
+        .build();
+    private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();

Review Comment:
   Thanks for the suggestion. I benchmarked streaming XXH3, streaming XXH64, non-streaming XXH3, and non-streaming XXH64. Streaming XXH3 gives the best result: in the numbers below it is roughly 2x faster than either non-streaming variant and about 10x faster than lz4-java's streaming XXHash64. However, it needs a new library, `com.dynatrace.hash4j`. Do we want to import it?
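If we do adopt it, hash4j is a single extra dependency. A sketch of what the Gradle addition might look like (coordinates as published on Maven Central; the version below is a placeholder, not a vetted pin):

```gradle
dependencies {
    // com.dynatrace.hash4j provides the streaming XXH3 used in the benchmark below.
    implementation 'com.dynatrace.hash4j:hash4j:0.18.0' // placeholder version, pick the current release
}
```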
cc @chia7712 @dajac

```
Benchmark                                      (numReplicasPerBroker)  (partitionsPerTopic)  (replicationFactor)  Mode  Cnt      Score      Error  Units
TopicHashBenchmark.testDynatraceStreamingXXH3                      10                    10                    3  avgt    5    879.241 ±    6.788  ns/op
TopicHashBenchmark.testDynatraceStreamingXXH3                      10                    50                    3  avgt    5   4192.380 ±  195.424  ns/op
TopicHashBenchmark.testDynatraceStreamingXXH3                      10                   100                    3  avgt    5   8027.227 ±  210.403  ns/op
TopicHashBenchmark.testDynatraceXXH3                               10                    10                    3  avgt    5   1676.398 ±    2.249  ns/op
TopicHashBenchmark.testDynatraceXXH3                               10                    50                    3  avgt    5   9256.175 ±   45.298  ns/op
TopicHashBenchmark.testDynatraceXXH3                               10                   100                    3  avgt    5  20195.772 ±   37.651  ns/op
TopicHashBenchmark.testLz4StreamingXXHash64                        10                    10                    3  avgt    5   9739.833 ±  188.303  ns/op
TopicHashBenchmark.testLz4StreamingXXHash64                        10                    50                    3  avgt    5  45540.195 ±  455.747  ns/op
TopicHashBenchmark.testLz4StreamingXXHash64                        10                   100                    3  avgt    5  89084.689 ± 2164.862  ns/op
TopicHashBenchmark.testLz4XXHash64                                 10                    10                    3  avgt    5   1755.391 ±    6.436  ns/op
TopicHashBenchmark.testLz4XXHash64                                 10                    50                    3  avgt    5   9421.643 ±   79.838  ns/op
TopicHashBenchmark.testLz4XXHash64                                 10                   100                    3  avgt    5  19461.960 ±  425.881  ns/op

JMH benchmarks done
```
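The main difference between the two XXH3 variants is where the bytes go: the non-streaming one writes every field through a `DataOutputStream` into a `ByteBufferOutputStream` and hashes the finished buffer, while the streaming one feeds fields straight into the hash state, which likely explains the ~2x gap. A minimal sketch of that streaming path, using only the hash4j calls that also appear in the benchmark source below (the field values are made up for illustration):

```java
import com.dynatrace.hash4j.hashing.Hashing;

public class Xxh3StreamingSketch {
    public static void main(String[] args) {
        // Each put* call updates the XXH3 state directly; no intermediate
        // byte buffer is allocated, copied, or flushed.
        long hash = Hashing.xxh3_64().hashStream()
            .putByte((byte) 0)  // magic byte
            .putLong(42L)       // e.g. a topic id hash
            .putString("foo")   // e.g. a topic name
            .putInt(2)          // e.g. a partition count
            .getAsLong();
        System.out.println(Long.toHexString(hash));
    }
}
```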
<details>
<summary>TopicHashBenchmark.java</summary>

```java
package org.apache.kafka.jmh.metadata;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.image.ClusterDelta;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.metadata.BrokerRegistration;

import com.dynatrace.hash4j.hashing.HashStream64;
import com.dynatrace.hash4j.hashing.Hashing;
import net.jpountz.xxhash.StreamingXXHash64;
import net.jpountz.xxhash.XXHashFactory;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta;
import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getNumBrokers;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class TopicHashBenchmark {
    @Param({"10", "50", "100"})
    private int partitionsPerTopic;

    @Param({"3"})
    private int replicationFactor;

    @Param({"10"})
    private int numReplicasPerBroker;

    private TopicImage topicImage;
    private ClusterImage clusterImage;

    @Setup(Level.Trial)
    public void setup() throws IOException {
        TopicsDelta topicsDelta = getInitialTopicsDelta(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
        int numBrokers = getNumBrokers(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
        ClusterDelta clusterDelta = new ClusterDelta(ClusterImage.EMPTY);
        for (int i = 0; i < numBrokers; i++) {
            clusterDelta.replay(new RegisterBrokerRecord()
                .setBrokerId(i)
                .setRack(Uuid.randomUuid().toString())
            );
        }
        topicImage = topicsDelta.apply().topicsById().values().stream().findFirst().get();
        clusterImage = clusterDelta.apply();
    }

    @Benchmark
    public long testLz4StreamingXXHash64() {
        try (StreamingXXHash64 hash = XXHashFactory.fastestInstance().newStreamingHash64(0)) {
            hash.update(new byte[]{(byte) 0}, 0, 1); // magic byte
            // topic id (intToBytes yields a 4-byte array)
            hash.update(intToBytes(topicImage.id().hashCode()), 0, 4);
            // topic name
            byte[] topicNameBytes = topicImage.name().getBytes();
            hash.update(topicNameBytes, 0, topicNameBytes.length);
            // number of partitions
            hash.update(intToBytes(topicImage.partitions().size()), 0, 4);
            for (int i = 0; i < topicImage.partitions().size(); i++) {
                // partition id
                hash.update(intToBytes(i), 0, 4);
                // sorted racks
                List<String> racks = new ArrayList<>();
                for (int replicaId : topicImage.partitions().get(i).replicas) {
                    BrokerRegistration broker = clusterImage.broker(replicaId);
                    if (broker != null) {
                        Optional<String> rackOptional = broker.rack();
                        rackOptional.ifPresent(racks::add);
                    }
                }
                Collections.sort(racks);
                for (String rack : racks) {
                    // Format: "<length><value>"
                    byte[] rackBytes = rack.getBytes();
                    hash.update(intToBytes(rack.length()), 0, 4);
                    hash.update(rackBytes, 0, rackBytes.length);
                }
            }
            return hash.getValue(); // return the hash so JMH consumes it
        }
    }

    @Benchmark
    public long testLz4XXHash64() throws IOException {
        try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
             DataOutputStream dos = new DataOutputStream(bbos)) {
            dos.writeByte(0); // magic byte
            dos.writeLong(topicImage.id().hashCode()); // topic ID
            dos.writeUTF(topicImage.name()); // topic name
            dos.writeInt(topicImage.partitions().size()); // number of partitions
            for (int i = 0; i < topicImage.partitions().size(); i++) {
                dos.writeInt(i); // partition id
                // The racks cannot be joined with a simple separator such as ",",
                // because rack names may contain any character, so different rack
                // lists like ",," + ",,," and ",,," + ",," could collide. Writing
                // the length before each rack string avoids that edge case.
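                // For instance, with a "," separator the single rack ",,," and
                // the two racks "," + "," both flatten to ",,,", whereas the
                // length-prefixed encodings (3,",,,") and (1,",")(1,",") differ.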
                List<String> racks = new ArrayList<>();
                for (int replicaId : topicImage.partitions().get(i).replicas) {
                    BrokerRegistration broker = clusterImage.broker(replicaId);
                    if (broker != null) {
                        Optional<String> rackOptional = broker.rack();
                        rackOptional.ifPresent(racks::add);
                    }
                }
                Collections.sort(racks);
                for (String rack : racks) {
                    // Format: "<length><value>"
                    dos.writeInt(rack.length());
                    dos.writeUTF(rack);
                }
            }
            dos.flush();
            ByteBuffer topicBytes = bbos.buffer().flip();
            return XXHashFactory.fastestInstance().hash64().hash(topicBytes, 0);
        }
    }

    @Benchmark
    public long testDynatraceStreamingXXH3() {
        HashStream64 hash = Hashing.xxh3_64().hashStream();
        hash = hash.putByte((byte) 0)
            .putLong(topicImage.id().hashCode())
            .putString(topicImage.name())
            .putInt(topicImage.partitions().size());
        for (int i = 0; i < topicImage.partitions().size(); i++) {
            // partition id
            hash = hash.putInt(i);
            // sorted racks
            List<String> racks = new ArrayList<>();
            for (int replicaId : topicImage.partitions().get(i).replicas) {
                BrokerRegistration broker = clusterImage.broker(replicaId);
                if (broker != null) {
                    Optional<String> rackOptional = broker.rack();
                    rackOptional.ifPresent(racks::add);
                }
            }
            Collections.sort(racks);
            for (String rack : racks) {
                // Format: "<length><value>"
                hash.putInt(rack.length());
                hash.putString(rack);
            }
        }
        return hash.getAsLong(); // return the hash so JMH consumes it
    }

    @Benchmark
    public long testDynatraceXXH3() throws IOException {
        try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
             DataOutputStream dos = new DataOutputStream(bbos)) {
            dos.writeByte(0); // magic byte
            dos.writeLong(topicImage.id().hashCode()); // topic ID
            dos.writeUTF(topicImage.name()); // topic name
            dos.writeInt(topicImage.partitions().size()); // number of partitions
            for (int i = 0; i < topicImage.partitions().size(); i++) {
                dos.writeInt(i); // partition id
                // Length-prefix each rack string; see testLz4XXHash64 for why a
                // plain separator would be ambiguous.
                List<String> racks = new ArrayList<>();
                for (int replicaId : topicImage.partitions().get(i).replicas) {
                    BrokerRegistration broker = clusterImage.broker(replicaId);
                    if (broker != null) {
                        Optional<String> rackOptional = broker.rack();
                        rackOptional.ifPresent(racks::add);
                    }
                }
                Collections.sort(racks);
                for (String rack : racks) {
                    // Format: "<length><value>"
                    dos.writeInt(rack.length());
                    dos.writeUTF(rack);
                }
            }
            dos.flush();
            ByteBuffer topicBytes = bbos.buffer().flip();
            // Hash only the bytes actually written, not the whole backing array.
            return Hashing.xxh3_64().hashBytesToLong(topicBytes.array(), 0, topicBytes.limit());
        }
    }

    private byte[] intToBytes(int value) {
        return new byte[]{
            (byte) (value >>> 24),
            (byte) (value >>> 16),
            (byte) (value >>> 8),
            (byte) value
        };
    }
}
```

</details>

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org