This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch feature/artery-frequency-sketch in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 378d21d6722188a718b39f2211c592c72f2bdb2c
Author: He-Pin <[email protected]>
AuthorDate: Mon Jun 1 02:05:35 2026 +0800

feat: replace CountMinSketch with FastFrequencySketch in Artery compression

Motivation:
The Artery compression system uses a CountMinSketch (128KB per connection) for heavy hitter detection. Pekko already has FastFrequencySketch in pekko-actor (used for cluster sharding passivation) which is ~32x more memory efficient (~4KB) and includes TinyLFU reset for natural aging of frequency data, enabling the heavy hitters table to adapt to changing traffic patterns.

Modification:
- Replace CountMinSketch with FastFrequencySketch in InboundCompression
- Modify TopHeavyHitters to support non-monotonic weights (needed because FrequencySketch's periodic reset halves all counters)
- Add fixHeapUp method for bidirectional heap property restoration
- Remove CountMinSketch.java (INTERNAL API, no external users)
- Update benchmark to use FastFrequencySketch
- Update legal files to remove CountMinSketch attribution
- Add tests for weight decrease and heap upward restoration

Result:
Artery compression uses ~4KB instead of ~128KB per inbound connection. Heavy hitters now adapt to changing traffic patterns via TinyLFU aging.
Tests: - remote/testOnly ...HeavyHittersSpec: 13/13 passed - remote/testOnly ...CompressionTableSpec: 3/3 passed - remote/testOnly ...OutboundCompressionSpec: 2/2 passed Refs: https://github.com/akka/akka-core/issues/31093 --- .../artery/compress/CountMinSketchBenchmark.scala | 28 +-- legal/pekko-remote-jar-license.txt | 8 - legal/pekko-remote-jar-notice.txt | 9 - .../remote/artery/compress/CountMinSketch.java | 264 --------------------- .../artery/compress/InboundCompressions.scala | 14 +- .../remote/artery/compress/TopHeavyHitters.scala | 97 +++++--- .../remote/artery/compress/HeavyHittersSpec.scala | 34 +++ 7 files changed, 115 insertions(+), 339 deletions(-) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/compress/CountMinSketchBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/compress/CountMinSketchBenchmark.scala index 7c2ad73078..d991250df4 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/compress/CountMinSketchBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/compress/CountMinSketchBenchmark.scala @@ -18,41 +18,37 @@ import java.util.Random import org.openjdk.jmh.annotations._ import org.openjdk.jmh.infra.Blackhole +import org.apache.pekko.util.FastFrequencySketch + @State(Scope.Benchmark) @BenchmarkMode(Array(Mode.Throughput)) @Fork(2) class CountMinSketchBenchmark { - // @Param(Array("4", "8", "12", "16")) - @Param(Array("16", "256", "4096", "65536")) - var w: Int = _ - @Param(Array("16", "128", "1024")) - var d: Int = _ - - private val seed: Int = 20160726 + @Param(Array("256", "1024", "4096")) + var capacity: Int = _ - val rand = new Random(seed) + val rand = new Random(20160726) - val preallocateIds = Array.ofDim[Int](8192) - val preallocateValues = Array.ofDim[Long](8192) + val preallocatedIds = Array.ofDim[Int](8192) - var countMinSketch: CountMinSketch = _ + var frequencySketch: FastFrequencySketch[Int] = _ @Setup def init(): Unit = { - countMinSketch = new 
CountMinSketch(d, w, seed) + frequencySketch = FastFrequencySketch[Int](capacity) (0 to 8191).foreach { index => - preallocateIds(index) = rand.nextInt() - preallocateValues(index) = Math.abs(rand.nextInt()) + preallocatedIds(index) = rand.nextInt() } } @Benchmark @OperationsPerInvocation(8192) - def updateRandomNumbers(blackhole: Blackhole): Unit = { + def incrementAndFrequency(blackhole: Blackhole): Unit = { var i: Int = 0 while (i < 8192) { - blackhole.consume(countMinSketch.addObjectAndEstimateCount(preallocateIds(i), preallocateValues(i))) + frequencySketch.increment(preallocatedIds(i)) + blackhole.consume(frequencySketch.frequency(preallocatedIds(i))) i += 1 } } diff --git a/legal/pekko-remote-jar-license.txt b/legal/pekko-remote-jar-license.txt index 1fbb21b303..e6a71a3f56 100644 --- a/legal/pekko-remote-jar-license.txt +++ b/legal/pekko-remote-jar-license.txt @@ -202,14 +202,6 @@ --------------- -pekko-remote contains CountMinSketch.java which contains code derived from MurmurHash3, -written by Austin Appleby. He has placed his code in the public domain. -The author has disclaimed copyright to that source code. -CountMinSketch.java also contains additional code developed under an Apache 2.0 license. -Copyright 2016 AddThis - ---------------- - pekko-remote contains code from Aeron <https://github.com/real-logic/aeron>. ./remote/src/test/java/org/apache/pekko/remote/artery/aeron/AeronStat.java diff --git a/legal/pekko-remote-jar-notice.txt b/legal/pekko-remote-jar-notice.txt index e6cafef8bb..8a9cabaec0 100644 --- a/legal/pekko-remote-jar-notice.txt +++ b/legal/pekko-remote-jar-notice.txt @@ -9,12 +9,3 @@ Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com> Apache Pekko is derived from Akka 2.6.x, the last version that was distributed under the Apache License, Version 2.0 License. - ---------------- - -pekko-remote contains CountMinSketch.java which was developed under an Apache 2.0 license. 
- -stream-lib -Copyright 2016 AddThis - -This product includes software developed by AddThis. diff --git a/remote/src/main/java/org/apache/pekko/remote/artery/compress/CountMinSketch.java b/remote/src/main/java/org/apache/pekko/remote/artery/compress/CountMinSketch.java deleted file mode 100644 index 737841dff3..0000000000 --- a/remote/src/main/java/org/apache/pekko/remote/artery/compress/CountMinSketch.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com> - */ - -package org.apache.pekko.remote.artery.compress; - -import java.nio.ByteOrder; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.util.SWARUtil; - -/** - * INTERNAL API: Count-Min Sketch datastructure. - * - * <p>Not thread-safe. 
- * - * <p>An Improved Data Stream Summary: The Count-Min Sketch and its Applications - * https://web.archive.org/web/20060907232042/http://www.eecs.harvard.edu/~michaelm/CS222/countmin.pdf - * This implementation is mostly taken and adjusted from the Apache V2 licensed project - * `stream-lib`, located here: - * https://github.com/clearspring/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/frequency/CountMinSketch.java - */ -public class CountMinSketch { - - private int depth; - private int width; - private long[][] table; - private long size; - private double eps; - private double confidence; - - private int[] recyclableCMSHashBuckets; - - public CountMinSketch(int depth, int width, int seed) { - if ((width & (width - 1)) != 0) { - throw new IllegalArgumentException("width must be a power of 2, was: " + width); - } - this.depth = depth; - this.width = width; - this.eps = 2.0 / width; - this.confidence = 1 - 1 / Math.pow(2, depth); - recyclableCMSHashBuckets = preallocateHashBucketsArray(depth); - initTablesWith(depth, width, seed); - } - - private void initTablesWith(int depth, int width, int seed) { - this.table = new long[depth][width]; - } - - /** Referred to as {@code epsilon} in the whitepaper */ - public double relativeError() { - return eps; - } - - public double confidence() { - return confidence; - } - - /** - * Similar to {@code add}, however we reuse the fact that the hask buckets have to be calculated - * for {@code add} already, and a separate {@code estimateCount} operation would have to calculate - * them again, so we do it all in one go. - */ - public long addObjectAndEstimateCount(Object item, long count) { - if (count < 0) { - // Actually for negative increments we'll need to use the median - // instead of minimum, and accuracy will suffer somewhat. - // Probably makes sense to add an "allow negative increments" - // parameter to constructor. 
- throw new IllegalArgumentException("Negative increments not implemented"); - } - Murmur3.hashBuckets(item, recyclableCMSHashBuckets, width); - for (int i = 0; i < depth; ++i) { - table[i][recyclableCMSHashBuckets[i]] += count; - } - size += count; - return estimateCount(recyclableCMSHashBuckets); - } - - public long size() { - return size; - } - - /** - * The estimate is correct within {@code 'epsilon' * (total item count)}, with probability {@code - * confidence}. - */ - public long estimateCount(Object item) { - Murmur3.hashBuckets(item, recyclableCMSHashBuckets, width); - return estimateCount(recyclableCMSHashBuckets); - } - - /** - * The estimate is correct within {@code 'epsilon' * (total item count)}, with probability {@code - * confidence}. - * - * @param buckets the "indexes" of buckets from which we want to calculate the count - */ - private long estimateCount(int[] buckets) { - long res = Long.MAX_VALUE; - for (int i = 0; i < depth; ++i) { - res = Math.min(res, table[i][buckets[i]]); - } - return res; - } - - /** - * Local implementation of murmur3 hash optimized to used in count min sketch - * - * <p>Inspired by scala (scala.util.hashing.MurmurHash3) and C port of MurmurHash3 - * - * <p>scala.util.hashing => - * https://github.com/scala/scala/blob/2.12.x/src/library/scala/util/hashing/MurmurHash3.scala C - * port of MurmurHash3 => https://github.com/PeterScott/murmur3/blob/master/murmur3.c - */ - private static class Murmur3 { - - /** Force all bits of the hash to avalanche. Used for finalizing the hash. 
*/ - private static int avalanche(int hash) { - int h = hash; - - h ^= h >>> 16; - h *= 0x85ebca6b; - h ^= h >>> 13; - h *= 0xc2b2ae35; - h ^= h >>> 16; - - return h; - } - - private static int mixLast(int hash, int data) { - int k = data; - - k *= 0xcc9e2d51; // c1 - k = Integer.rotateLeft(k, 15); - k *= 0x1b873593; // c2 - - return hash ^ k; - } - - private static int mix(int hash, int data) { - int h = mixLast(hash, data); - h = Integer.rotateLeft(h, 13); - return h * 5 + 0xe6546b64; - } - - public static int hash(Object o) { - if (o == null) { - return 0; - } - if (o instanceof ActorRef) { // TODO possibly scary optimisation - // ActorRef hashcode is the ActorPath#uid, which is a random number assigned at its - // creation, - // thus no hashing happens here - the value is already cached. - // TODO it should be thought over if this preciseness (just a random number, and not - // hashing) is good enough here? - // this is not cryptographic one, anything which is stable and random is good enough - return o.hashCode(); - } - if (o instanceof String s) { - return hash(s.getBytes()); - } - if (o instanceof Long l) { - return hashLong(l, 0); - } - if (o instanceof Integer i) { - return hashLong(i, 0); - } - if (o instanceof Double d) { - return hashLong(Double.doubleToRawLongBits(d), 0); - } - if (o instanceof Float f) { - return hashLong(Float.floatToRawIntBits((Float) o), 0); - } - if (o instanceof byte[] array) { - return bytesHash(array, 0); - } - return hash(o.toString()); - } - - static int hashLong(long value, int seed) { - int h = seed; - h = mix(h, (int) (value)); - h = mixLast(h, (int) (value >>> 32)); - return avalanche(h ^ 2); - } - - static int bytesHash(final byte[] data, int seed) { - int len = data.length; - int h = seed; - - // Body - int i = 0; - while (len >= 4) { - int k = SWARUtil.getInt(data, i, ByteOrder.LITTLE_ENDIAN); - h = mix(h, k); - - i += 4; - len -= 4; - } - - // Tail - int k = 0; - if (len == 3) k ^= (data[i + 2] & 0xFF) << 16; - if 
(len >= 2) k ^= (data[i + 1] & 0xFF) << 8; - if (len >= 1) { - k ^= (data[i] & 0xFF); - h = mixLast(h, k); - } - - // Finalization - return avalanche(h ^ data.length); - } - - /** - * Hash item using pair independent hash functions. - * - * <p>Implementation based on "Less Hashing, Same Performance: Building a Better Bloom Filter" - * https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf - * - * @param item what should be hashed - * @param hashBuckets where hashes should be placed - * @param limit value to shrink result - */ - static void hashBuckets(Object item, final int[] hashBuckets, int limit) { - final int hash1 = hash(item); // specialized hash for ActorRef and Strings - final int hash2 = hashLong(hash1, hash1); - final int depth = hashBuckets.length; - final int mask = limit - 1; - for (int i = 0; i < depth; i++) { - hashBuckets[i] = - Math.abs( - (hash1 + i * hash2) - & mask); // shrink done by AND instead MOD. Assume limit is power of 2 - } - } - } - - private int[] preallocateHashBucketsArray(int depth) { - return new int[depth]; - } - - @Override - public String toString() { - return "CountMinSketch{" - + "confidence=" - + confidence - + ", size=" - + size - + ", depth=" - + depth - + ", width=" - + width - + '}'; - } -} diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/compress/InboundCompressions.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/compress/InboundCompressions.scala index 4238af2490..63c99e11f3 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/compress/InboundCompressions.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/compress/InboundCompressions.scala @@ -28,6 +28,7 @@ import pekko.actor.InternalActorRef import pekko.event.Logging import pekko.event.LoggingAdapter import pekko.remote.artery._ +import pekko.util.FastFrequencySketch import pekko.util.OptionVal /** @@ -352,7 +353,7 @@ private[remote] abstract class InboundCompression[T >: Null]( private[this] var 
resendCount = 0 private[this] val maxResendCount = 3 - private[this] val cms = new CountMinSketch(16, 1024, System.currentTimeMillis().toInt) + private[this] val frequencySketch = FastFrequencySketch[T](capacity = settings.ActorRefs.Max) log.debug("Initializing {} for originUid [{}]", Logging.simpleName(getClass), originUid) @@ -442,8 +443,13 @@ private[remote] abstract class InboundCompression[T >: Null]( * Empty keys are omitted. */ def increment(@nowarn("msg=never used") remoteAddress: Address, value: T, n: Long): Unit = { - val count = cms.addObjectAndEstimateCount(value, n) - addAndCheckIfheavyHitterDetected(value, count) + var i = 0 + while (i < n) { + frequencySketch.increment(value) + i += 1 + } + val frequency = frequencySketch.frequency(value) + addAndCheckIfheavyHitterDetected(value, frequency) alive = true } @@ -537,7 +543,7 @@ private[remote] abstract class InboundCompression[T >: Null]( } override def toString = - s"""${Logging.simpleName(getClass)}(countMinSketch: $cms, heavyHitters: $heavyHitters)""" + s"""${Logging.simpleName(getClass)}(frequencySketch: $frequencySketch, heavyHitters: $heavyHitters)""" } diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/compress/TopHeavyHitters.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/compress/TopHeavyHitters.scala index be6e27f532..192880c4af 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/compress/TopHeavyHitters.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/compress/TopHeavyHitters.scala @@ -155,45 +155,46 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl * @return `true` if the added item has become a heavy hitter. */ // TODO possibly can be optimised further? 
(there is a benchmark) - def update(item: T, count: Long): Boolean = - isHeavy(count) && { // O(1) terminate execution ASAP if known to not be a heavy hitter anyway - val hashCode = new HashCodeVal(item.hashCode()) // avoid re-calculating hashCode - val startIndex = hashCode.get & mask - - // We first try to find the slot where an element with an equal hash value is. This is a possible candidate - // for an actual matching entry (unless it is an entry with a colliding hash value). - // worst case O(n), common O(1 + alpha), can't really bin search here since indexes are kept in synch with other arrays hmm... - val candidateIndex = findHashIdx(startIndex, hashCode) - - if (candidateIndex == -1) { - // No matching hash value entry is found, so we are sure we don't have this entry yet. - insertKnownNewHeavy(hashCode, item, count) // O(log n + alpha) - true + def update(item: T, count: Long): Boolean = { + val hashCode = new HashCodeVal(item.hashCode()) // avoid re-calculating hashCode + val startIndex = hashCode.get & mask + + // We first try to find the slot where an element with an equal hash value is. This is a possible candidate + // for an actual matching entry (unless it is an entry with a colliding hash value). + // worst case O(n), common O(1 + alpha), can't really bin search here since indexes are kept in synch with other arrays hmm... + val candidateIndex = findHashIdx(startIndex, hashCode) + + if (candidateIndex == -1) { + // No matching hash value entry is found, so we are sure we don't have this entry yet. + // Only insert new entries if the weight qualifies as heavy. + isHeavy(count) && { insertKnownNewHeavy(hashCode, item, count); true } + } else { + // We now found, relatively cheaply, the first index where our searched entry *might* be (hashes are equal). + // This is not guaranteed to be the one we are searching for, yet (hash values may collide). + // From this position we can invoke the more costly search which checks actual object equalities. 
+ // With this two step search we avoid equality checks completely for many non-colliding entries. + val actualIdx = findItemIdx(candidateIndex, hashCode, item) + + // usually O(1), worst case O(n) if we need to scan due to hash conflicts + if (actualIdx == -1) { + // So we don't have this entry so far (only a colliding one, it was a false positive from findHashIdx). + // Only insert new entries if the weight qualifies as heavy. + isHeavy(count) && { insertKnownNewHeavy(hashCode, item, count); true } } else { - // We now found, relatively cheaply, the first index where our searched entry *might* be (hashes are equal). - // This is not guaranteed to be the one we are searching for, yet (hash values may collide). - // From this position we can invoke the more costly search which checks actual object equalities. - // With this two step search we avoid equality checks completely for many non-colliding entries. - val actualIdx = findItemIdx(candidateIndex, hashCode, item) - - // usually O(1), worst case O(n) if we need to scan due to hash conflicts - if (actualIdx == -1) { - // So we don't have this entry so far (only a colliding one, it was a false positive from findHashIdx). - insertKnownNewHeavy(hashCode, item, count) // O(1 + log n), we simply replace the current lowest heavy hitter - true - } else { - // The entry exists, let's update it. - updateExistingHeavyHitter(actualIdx, count) - // not a "new" heavy hitter, since we only replaced it (so it was signaled as new once before) - false - } + // The entry exists, let's update it. + // Existing heavy hitters always get their weight updated (even if decreased), + // which is needed when using a frequency sketch with periodic reset (aging). + updateExistingHeavyHitter(actualIdx, count) + // not a "new" heavy hitter, since we only replaced it (so it was signaled as new once before) + false } - } + } /** * Checks the lowest weight entry in this structure and returns true if the given count is larger than that. 
In * other words this checks if a new entry can be added as it is larger than the known least weight. + * Note: this only gates insertion of new entries; existing entries always get their weight updated. */ private def isHeavy(count: Long): Boolean = count > lowestHitterWeight @@ -233,15 +234,18 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl /** * Replace existing heavy hitter – give it a new `count` value. This will also restore the heap property, so this * might make a previously lowest hitter no longer be one. + * + * Supports both weight increase and decrease. When using a frequency sketch with periodic reset (aging), + * the estimated frequency can decrease, so the weight must be allowed to go down as well. */ private def updateExistingHeavyHitter(foundHashIndex: Int, count: Long): Unit = { - if (weights(foundHashIndex) > count) - throw new IllegalArgumentException( - s"Weights can be only incremented or kept the same, not decremented. " + - s"Previous weight was [${weights(foundHashIndex)}], attempted to modify it to [$count].") + val oldCount = weights(foundHashIndex) weights(foundHashIndex) = count // we don't need to change `hashCode`, `heapIndex` or `item`, those remain the same - // Position in the heap might have changed as count was incremented - fixHeap(heapIndex(foundHashIndex)) + // Position in the heap might have changed as count was updated + if (count > oldCount) + fixHeap(heapIndex(foundHashIndex)) // weight increased: push down towards children + else if (count < oldCount) + fixHeapUp(heapIndex(foundHashIndex)) // weight decreased: bubble up towards parent } /** @@ -311,6 +315,23 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl } } + /** + * Call this if the weight of an entry at heap node `index` was decremented. Since the weight decreased, + * the element may now be smaller than its parent, so we need to restore the heap property "upwards". 
+ */ + @tailrec + private def fixHeapUp(index: Int): Unit = { + if (index > 0) { + val parentIndex = (index - 1) / 2 + val currentWeight: Long = weights(heap(index)) + val parentWeight: Long = weights(heap(parentIndex)) + if (currentWeight < parentWeight) { + swapHeapNode(index, parentIndex) + fixHeapUp(parentIndex) + } + } + } + /** * Swaps two elements in `heap` array and maintain correct index in `heapIndex`. * diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/compress/HeavyHittersSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/compress/HeavyHittersSpec.scala index 5c61ebe9e4..c1128e7f41 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/artery/compress/HeavyHittersSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/artery/compress/HeavyHittersSpec.scala @@ -170,6 +170,40 @@ class HeavyHittersSpec extends AnyWordSpecLike with Matchers { hitters.lowestHitterWeight should ===(3) } + "support weight decrease (e.g. from frequency sketch reset)" in { + val hitters = new TopHeavyHitters[String](2) + hitters.update("A", 10) should ===(true) + hitters.update("B", 20) should ===(true) + hitters.lowestHitterWeight should ===(10) + + // Simulate frequency sketch reset: weight decreases + hitters.update("A", 5) should ===(false) + // A is now the lowest hitter + hitters.lowestHitterWeight should ===(5) + + // A new item with weight > 5 should replace A + hitters.update("C", 8) should ===(true) + hitters.iterator.toSet should ===(Set("B", "C")) + } + + "restore heap property upward when weight decreases" in { + val hitters = new TopHeavyHitters[String](4) + hitters.update("A", 10) should ===(true) + hitters.update("B", 20) should ===(true) + hitters.update("C", 30) should ===(true) + hitters.update("D", 40) should ===(true) + hitters.lowestHitterWeight should ===(10) + + // Decrease D's weight significantly - it should bubble up to become the lowest + hitters.update("D", 1) should ===(false) + hitters.lowestHitterWeight 
should ===(1) + hitters.iterator.toSet should ===(Set("A", "B", "C", "D")) + + // A new item should replace D (the new lowest) + hitters.update("E", 15) should ===(true) + hitters.iterator.toSet should ===(Set("A", "B", "C", "E")) + } + "be disabled with max=0" in { val hitters = new TopHeavyHitters[String](0) hitters.update("A", 10) shouldBe true --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
