This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch feature/artery-frequency-sketch
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 378d21d6722188a718b39f2211c592c72f2bdb2c
Author: He-Pin <[email protected]>
AuthorDate: Mon Jun 1 02:05:35 2026 +0800

    feat: replace CountMinSketch with FastFrequencySketch in Artery compression
    
    Motivation:
    The Artery compression system uses a CountMinSketch (128KB per connection)
    for heavy hitter detection. Pekko already has FastFrequencySketch in
    pekko-actor (used for cluster sharding passivation) which is ~32x more
    memory efficient (~4KB) and includes TinyLFU reset for natural aging of
    frequency data, enabling the heavy hitters table to adapt to changing
    traffic patterns.
    
    Modification:
    - Replace CountMinSketch with FastFrequencySketch in InboundCompression
    - Modify TopHeavyHitters to support non-monotonic weights (needed because
      FrequencySketch's periodic reset halves all counters)
    - Add fixHeapUp method for bidirectional heap property restoration
    - Remove CountMinSketch.java (INTERNAL API, no external users)
    - Update benchmark to use FastFrequencySketch
    - Update legal files to remove CountMinSketch attribution
    - Add tests for weight decrease and heap upward restoration
    
    Result:
    Artery compression uses ~4KB instead of ~128KB per inbound connection.
    Heavy hitters now adapt to changing traffic patterns via TinyLFU aging.
    
    Tests:
    - remote/testOnly ...HeavyHittersSpec: 13/13 passed
    - remote/testOnly ...CompressionTableSpec: 3/3 passed
    - remote/testOnly ...OutboundCompressionSpec: 2/2 passed
    
    Refs: https://github.com/akka/akka-core/issues/31093
---
 .../artery/compress/CountMinSketchBenchmark.scala  |  28 +--
 legal/pekko-remote-jar-license.txt                 |   8 -
 legal/pekko-remote-jar-notice.txt                  |   9 -
 .../remote/artery/compress/CountMinSketch.java     | 264 ---------------------
 .../artery/compress/InboundCompressions.scala      |  14 +-
 .../remote/artery/compress/TopHeavyHitters.scala   |  97 +++++---
 .../remote/artery/compress/HeavyHittersSpec.scala  |  34 +++
 7 files changed, 115 insertions(+), 339 deletions(-)

diff --git a/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/compress/CountMinSketchBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/compress/CountMinSketchBenchmark.scala
index 7c2ad73078..d991250df4 100644
--- a/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/compress/CountMinSketchBenchmark.scala
+++ b/bench-jmh/src/main/scala/org/apache/pekko/remote/artery/compress/CountMinSketchBenchmark.scala
@@ -18,41 +18,37 @@ import java.util.Random
 import org.openjdk.jmh.annotations._
 import org.openjdk.jmh.infra.Blackhole
 
+import org.apache.pekko.util.FastFrequencySketch
+
 @State(Scope.Benchmark)
 @BenchmarkMode(Array(Mode.Throughput))
 @Fork(2)
 class CountMinSketchBenchmark {
 
-  //  @Param(Array("4", "8", "12", "16"))
-  @Param(Array("16", "256", "4096", "65536"))
-  var w: Int = _
-  @Param(Array("16", "128", "1024"))
-  var d: Int = _
-
-  private val seed: Int = 20160726
+  @Param(Array("256", "1024", "4096"))
+  var capacity: Int = _
 
-  val rand = new Random(seed)
+  val rand = new Random(20160726)
 
-  val preallocateIds = Array.ofDim[Int](8192)
-  val preallocateValues = Array.ofDim[Long](8192)
+  val preallocatedIds = Array.ofDim[Int](8192)
 
-  var countMinSketch: CountMinSketch = _
+  var frequencySketch: FastFrequencySketch[Int] = _
 
   @Setup
   def init(): Unit = {
-    countMinSketch = new CountMinSketch(d, w, seed)
+    frequencySketch = FastFrequencySketch[Int](capacity)
     (0 to 8191).foreach { index =>
-      preallocateIds(index) = rand.nextInt()
-      preallocateValues(index) = Math.abs(rand.nextInt())
+      preallocatedIds(index) = rand.nextInt()
     }
   }
 
   @Benchmark
   @OperationsPerInvocation(8192)
-  def updateRandomNumbers(blackhole: Blackhole): Unit = {
+  def incrementAndFrequency(blackhole: Blackhole): Unit = {
     var i: Int = 0
     while (i < 8192) {
-      blackhole.consume(countMinSketch.addObjectAndEstimateCount(preallocateIds(i), preallocateValues(i)))
+      frequencySketch.increment(preallocatedIds(i))
+      blackhole.consume(frequencySketch.frequency(preallocatedIds(i)))
       i += 1
     }
   }
diff --git a/legal/pekko-remote-jar-license.txt b/legal/pekko-remote-jar-license.txt
index 1fbb21b303..e6a71a3f56 100644
--- a/legal/pekko-remote-jar-license.txt
+++ b/legal/pekko-remote-jar-license.txt
@@ -202,14 +202,6 @@
 
 ---------------
 
-pekko-remote contains CountMinSketch.java which contains code derived from 
MurmurHash3,
-written by Austin Appleby. He has placed his code in the public domain.
-The author has disclaimed copyright to that source code.
-CountMinSketch.java also contains additional code developed under an Apache 
2.0 license.
-Copyright 2016 AddThis
-
----------------
-
 pekko-remote contains code from Aeron <https://github.com/real-logic/aeron>.
 
 ./remote/src/test/java/org/apache/pekko/remote/artery/aeron/AeronStat.java
diff --git a/legal/pekko-remote-jar-notice.txt b/legal/pekko-remote-jar-notice.txt
index e6cafef8bb..8a9cabaec0 100644
--- a/legal/pekko-remote-jar-notice.txt
+++ b/legal/pekko-remote-jar-notice.txt
@@ -9,12 +9,3 @@ Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
 
 Apache Pekko is derived from Akka 2.6.x, the last version that was distributed under the
 Apache License, Version 2.0 License.
-
----------------
-
-pekko-remote contains CountMinSketch.java which was developed under an Apache 
2.0 license.
-
-stream-lib
-Copyright 2016 AddThis
-
-This product includes software developed by AddThis.
diff --git a/remote/src/main/java/org/apache/pekko/remote/artery/compress/CountMinSketch.java b/remote/src/main/java/org/apache/pekko/remote/artery/compress/CountMinSketch.java
deleted file mode 100644
index 737841dff3..0000000000
--- a/remote/src/main/java/org/apache/pekko/remote/artery/compress/CountMinSketch.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
- */
-
-package org.apache.pekko.remote.artery.compress;
-
-import java.nio.ByteOrder;
-import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.util.SWARUtil;
-
-/**
- * INTERNAL API: Count-Min Sketch datastructure.
- *
- * <p>Not thread-safe.
- *
- * <p>An Improved Data Stream Summary: The Count-Min Sketch and its 
Applications
- * 
https://web.archive.org/web/20060907232042/http://www.eecs.harvard.edu/~michaelm/CS222/countmin.pdf
- * This implementation is mostly taken and adjusted from the Apache V2 
licensed project
- * `stream-lib`, located here:
- * 
https://github.com/clearspring/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/frequency/CountMinSketch.java
- */
-public class CountMinSketch {
-
-  private int depth;
-  private int width;
-  private long[][] table;
-  private long size;
-  private double eps;
-  private double confidence;
-
-  private int[] recyclableCMSHashBuckets;
-
-  public CountMinSketch(int depth, int width, int seed) {
-    if ((width & (width - 1)) != 0) {
-      throw new IllegalArgumentException("width must be a power of 2, was: " + 
width);
-    }
-    this.depth = depth;
-    this.width = width;
-    this.eps = 2.0 / width;
-    this.confidence = 1 - 1 / Math.pow(2, depth);
-    recyclableCMSHashBuckets = preallocateHashBucketsArray(depth);
-    initTablesWith(depth, width, seed);
-  }
-
-  private void initTablesWith(int depth, int width, int seed) {
-    this.table = new long[depth][width];
-  }
-
-  /** Referred to as {@code epsilon} in the whitepaper */
-  public double relativeError() {
-    return eps;
-  }
-
-  public double confidence() {
-    return confidence;
-  }
-
-  /**
-   * Similar to {@code add}, however we reuse the fact that the hask buckets 
have to be calculated
-   * for {@code add} already, and a separate {@code estimateCount} operation 
would have to calculate
-   * them again, so we do it all in one go.
-   */
-  public long addObjectAndEstimateCount(Object item, long count) {
-    if (count < 0) {
-      // Actually for negative increments we'll need to use the median
-      // instead of minimum, and accuracy will suffer somewhat.
-      // Probably makes sense to add an "allow negative increments"
-      // parameter to constructor.
-      throw new IllegalArgumentException("Negative increments not 
implemented");
-    }
-    Murmur3.hashBuckets(item, recyclableCMSHashBuckets, width);
-    for (int i = 0; i < depth; ++i) {
-      table[i][recyclableCMSHashBuckets[i]] += count;
-    }
-    size += count;
-    return estimateCount(recyclableCMSHashBuckets);
-  }
-
-  public long size() {
-    return size;
-  }
-
-  /**
-   * The estimate is correct within {@code 'epsilon' * (total item count)}, 
with probability {@code
-   * confidence}.
-   */
-  public long estimateCount(Object item) {
-    Murmur3.hashBuckets(item, recyclableCMSHashBuckets, width);
-    return estimateCount(recyclableCMSHashBuckets);
-  }
-
-  /**
-   * The estimate is correct within {@code 'epsilon' * (total item count)}, 
with probability {@code
-   * confidence}.
-   *
-   * @param buckets the "indexes" of buckets from which we want to calculate 
the count
-   */
-  private long estimateCount(int[] buckets) {
-    long res = Long.MAX_VALUE;
-    for (int i = 0; i < depth; ++i) {
-      res = Math.min(res, table[i][buckets[i]]);
-    }
-    return res;
-  }
-
-  /**
-   * Local implementation of murmur3 hash optimized to used in count min sketch
-   *
-   * <p>Inspired by scala (scala.util.hashing.MurmurHash3) and C port of 
MurmurHash3
-   *
-   * <p>scala.util.hashing =>
-   * 
https://github.com/scala/scala/blob/2.12.x/src/library/scala/util/hashing/MurmurHash3.scala
 C
-   * port of MurmurHash3 => 
https://github.com/PeterScott/murmur3/blob/master/murmur3.c
-   */
-  private static class Murmur3 {
-
-    /** Force all bits of the hash to avalanche. Used for finalizing the hash. 
*/
-    private static int avalanche(int hash) {
-      int h = hash;
-
-      h ^= h >>> 16;
-      h *= 0x85ebca6b;
-      h ^= h >>> 13;
-      h *= 0xc2b2ae35;
-      h ^= h >>> 16;
-
-      return h;
-    }
-
-    private static int mixLast(int hash, int data) {
-      int k = data;
-
-      k *= 0xcc9e2d51; // c1
-      k = Integer.rotateLeft(k, 15);
-      k *= 0x1b873593; // c2
-
-      return hash ^ k;
-    }
-
-    private static int mix(int hash, int data) {
-      int h = mixLast(hash, data);
-      h = Integer.rotateLeft(h, 13);
-      return h * 5 + 0xe6546b64;
-    }
-
-    public static int hash(Object o) {
-      if (o == null) {
-        return 0;
-      }
-      if (o instanceof ActorRef) { // TODO possibly scary optimisation
-        // ActorRef hashcode is the ActorPath#uid, which is a random number 
assigned at its
-        // creation,
-        // thus no hashing happens here - the value is already cached.
-        // TODO it should be thought over if this preciseness (just a random 
number, and not
-        // hashing) is good enough here?
-        // this is not cryptographic one, anything which is stable and random 
is good enough
-        return o.hashCode();
-      }
-      if (o instanceof String s) {
-        return hash(s.getBytes());
-      }
-      if (o instanceof Long l) {
-        return hashLong(l, 0);
-      }
-      if (o instanceof Integer i) {
-        return hashLong(i, 0);
-      }
-      if (o instanceof Double d) {
-        return hashLong(Double.doubleToRawLongBits(d), 0);
-      }
-      if (o instanceof Float f) {
-        return hashLong(Float.floatToRawIntBits((Float) o), 0);
-      }
-      if (o instanceof byte[] array) {
-        return bytesHash(array, 0);
-      }
-      return hash(o.toString());
-    }
-
-    static int hashLong(long value, int seed) {
-      int h = seed;
-      h = mix(h, (int) (value));
-      h = mixLast(h, (int) (value >>> 32));
-      return avalanche(h ^ 2);
-    }
-
-    static int bytesHash(final byte[] data, int seed) {
-      int len = data.length;
-      int h = seed;
-
-      // Body
-      int i = 0;
-      while (len >= 4) {
-        int k = SWARUtil.getInt(data, i, ByteOrder.LITTLE_ENDIAN);
-        h = mix(h, k);
-
-        i += 4;
-        len -= 4;
-      }
-
-      // Tail
-      int k = 0;
-      if (len == 3) k ^= (data[i + 2] & 0xFF) << 16;
-      if (len >= 2) k ^= (data[i + 1] & 0xFF) << 8;
-      if (len >= 1) {
-        k ^= (data[i] & 0xFF);
-        h = mixLast(h, k);
-      }
-
-      // Finalization
-      return avalanche(h ^ data.length);
-    }
-
-    /**
-     * Hash item using pair independent hash functions.
-     *
-     * <p>Implementation based on "Less Hashing, Same Performance: Building a 
Better Bloom Filter"
-     * https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
-     *
-     * @param item what should be hashed
-     * @param hashBuckets where hashes should be placed
-     * @param limit value to shrink result
-     */
-    static void hashBuckets(Object item, final int[] hashBuckets, int limit) {
-      final int hash1 = hash(item); // specialized hash for ActorRef and 
Strings
-      final int hash2 = hashLong(hash1, hash1);
-      final int depth = hashBuckets.length;
-      final int mask = limit - 1;
-      for (int i = 0; i < depth; i++) {
-        hashBuckets[i] =
-            Math.abs(
-                (hash1 + i * hash2)
-                    & mask); // shrink done by AND instead MOD. Assume limit 
is power of 2
-      }
-    }
-  }
-
-  private int[] preallocateHashBucketsArray(int depth) {
-    return new int[depth];
-  }
-
-  @Override
-  public String toString() {
-    return "CountMinSketch{"
-        + "confidence="
-        + confidence
-        + ", size="
-        + size
-        + ", depth="
-        + depth
-        + ", width="
-        + width
-        + '}';
-  }
-}
diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/compress/InboundCompressions.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/compress/InboundCompressions.scala
index 4238af2490..63c99e11f3 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/artery/compress/InboundCompressions.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/artery/compress/InboundCompressions.scala
@@ -28,6 +28,7 @@ import pekko.actor.InternalActorRef
 import pekko.event.Logging
 import pekko.event.LoggingAdapter
 import pekko.remote.artery._
+import pekko.util.FastFrequencySketch
 import pekko.util.OptionVal
 
 /**
@@ -352,7 +353,7 @@ private[remote] abstract class InboundCompression[T >: Null](
   private[this] var resendCount = 0
   private[this] val maxResendCount = 3
 
-  private[this] val cms = new CountMinSketch(16, 1024, System.currentTimeMillis().toInt)
+  private[this] val frequencySketch = FastFrequencySketch[T](capacity = settings.ActorRefs.Max)
 
   log.debug("Initializing {} for originUid [{}]", Logging.simpleName(getClass), originUid)
 
@@ -442,8 +443,13 @@ private[remote] abstract class InboundCompression[T >: Null](
    * Empty keys are omitted.
    */
   def increment(@nowarn("msg=never used") remoteAddress: Address, value: T, n: Long): Unit = {
-    val count = cms.addObjectAndEstimateCount(value, n)
-    addAndCheckIfheavyHitterDetected(value, count)
+    var i = 0
+    while (i < n) {
+      frequencySketch.increment(value)
+      i += 1
+    }
+    val frequency = frequencySketch.frequency(value)
+    addAndCheckIfheavyHitterDetected(value, frequency)
     alive = true
   }
 
@@ -537,7 +543,7 @@ private[remote] abstract class InboundCompression[T >: Null](
   }
 
   override def toString =
-    s"""${Logging.simpleName(getClass)}(countMinSketch: $cms, heavyHitters: $heavyHitters)"""
+    s"""${Logging.simpleName(getClass)}(frequencySketch: $frequencySketch, heavyHitters: $heavyHitters)"""
 
 }
 
diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/compress/TopHeavyHitters.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/compress/TopHeavyHitters.scala
index be6e27f532..192880c4af 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/artery/compress/TopHeavyHitters.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/artery/compress/TopHeavyHitters.scala
@@ -155,45 +155,46 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
    * @return `true` if the added item has become a heavy hitter.
    */
   // TODO possibly can be optimised further? (there is a benchmark)
-  def update(item: T, count: Long): Boolean =
-    isHeavy(count) && { // O(1) terminate execution ASAP if known to not be a 
heavy hitter anyway
-      val hashCode = new HashCodeVal(item.hashCode()) // avoid re-calculating 
hashCode
-      val startIndex = hashCode.get & mask
-
-      // We first try to find the slot where an element with an equal hash 
value is. This is a possible candidate
-      // for an actual matching entry (unless it is an entry with a colliding 
hash value).
-      // worst case O(n), common O(1 + alpha), can't really bin search here 
since indexes are kept in synch with other arrays hmm...
-      val candidateIndex = findHashIdx(startIndex, hashCode)
-
-      if (candidateIndex == -1) {
-        // No matching hash value entry is found, so we are sure we don't have 
this entry yet.
-        insertKnownNewHeavy(hashCode, item, count) // O(log n + alpha)
-        true
+  def update(item: T, count: Long): Boolean = {
+    val hashCode = new HashCodeVal(item.hashCode()) // avoid re-calculating 
hashCode
+    val startIndex = hashCode.get & mask
+
+    // We first try to find the slot where an element with an equal hash value 
is. This is a possible candidate
+    // for an actual matching entry (unless it is an entry with a colliding 
hash value).
+    // worst case O(n), common O(1 + alpha), can't really bin search here 
since indexes are kept in synch with other arrays hmm...
+    val candidateIndex = findHashIdx(startIndex, hashCode)
+
+    if (candidateIndex == -1) {
+      // No matching hash value entry is found, so we are sure we don't have 
this entry yet.
+      // Only insert new entries if the weight qualifies as heavy.
+      isHeavy(count) && { insertKnownNewHeavy(hashCode, item, count); true }
+    } else {
+      // We now found, relatively cheaply, the first index where our searched 
entry *might* be (hashes are equal).
+      // This is not guaranteed to be the one we are searching for, yet (hash 
values may collide).
+      // From this position we can invoke the more costly search which checks 
actual object equalities.
+      // With this two step search we avoid equality checks completely for 
many non-colliding entries.
+      val actualIdx = findItemIdx(candidateIndex, hashCode, item)
+
+      // usually O(1), worst case O(n) if we need to scan due to hash conflicts
+      if (actualIdx == -1) {
+        // So we don't have this entry so far (only a colliding one, it was a 
false positive from findHashIdx).
+        // Only insert new entries if the weight qualifies as heavy.
+        isHeavy(count) && { insertKnownNewHeavy(hashCode, item, count); true }
       } else {
-        // We now found, relatively cheaply, the first index where our 
searched entry *might* be (hashes are equal).
-        // This is not guaranteed to be the one we are searching for, yet 
(hash values may collide).
-        // From this position we can invoke the more costly search which 
checks actual object equalities.
-        // With this two step search we avoid equality checks completely for 
many non-colliding entries.
-        val actualIdx = findItemIdx(candidateIndex, hashCode, item)
-
-        // usually O(1), worst case O(n) if we need to scan due to hash 
conflicts
-        if (actualIdx == -1) {
-          // So we don't have this entry so far (only a colliding one, it was 
a false positive from findHashIdx).
-          insertKnownNewHeavy(hashCode, item, count) // O(1 + log n), we 
simply replace the current lowest heavy hitter
-          true
-        } else {
-          // The entry exists, let's update it.
-          updateExistingHeavyHitter(actualIdx, count)
-          // not a "new" heavy hitter, since we only replaced it (so it was 
signaled as new once before)
-          false
-        }
+        // The entry exists, let's update it.
+        // Existing heavy hitters always get their weight updated (even if 
decreased),
+        // which is needed when using a frequency sketch with periodic reset 
(aging).
+        updateExistingHeavyHitter(actualIdx, count)
+        // not a "new" heavy hitter, since we only replaced it (so it was 
signaled as new once before)
+        false
       }
-
     }
+  }
 
   /**
    * Checks the lowest weight entry in this structure and returns true if the 
given count is larger than that. In
    * other words this checks if a new entry can be added as it is larger than 
the known least weight.
+   * Note: this only gates insertion of new entries; existing entries always 
get their weight updated.
    */
   private def isHeavy(count: Long): Boolean =
     count > lowestHitterWeight
@@ -233,15 +234,18 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
   /**
    * Replace existing heavy hitter – give it a new `count` value. This will 
also restore the heap property, so this
    * might make a previously lowest hitter no longer be one.
+   *
+   * Supports both weight increase and decrease. When using a frequency sketch 
with periodic reset (aging),
+   * the estimated frequency can decrease, so the weight must be allowed to go 
down as well.
    */
   private def updateExistingHeavyHitter(foundHashIndex: Int, count: Long): 
Unit = {
-    if (weights(foundHashIndex) > count)
-      throw new IllegalArgumentException(
-        s"Weights can be only incremented or kept the same, not decremented. " 
+
-        s"Previous weight was [${weights(foundHashIndex)}], attempted to 
modify it to [$count].")
+    val oldCount = weights(foundHashIndex)
     weights(foundHashIndex) = count // we don't need to change `hashCode`, 
`heapIndex` or `item`, those remain the same
-    // Position in the heap might have changed as count was incremented
-    fixHeap(heapIndex(foundHashIndex))
+    // Position in the heap might have changed as count was updated
+    if (count > oldCount)
+      fixHeap(heapIndex(foundHashIndex)) // weight increased: push down 
towards children
+    else if (count < oldCount)
+      fixHeapUp(heapIndex(foundHashIndex)) // weight decreased: bubble up 
towards parent
   }
 
   /**
@@ -311,6 +315,23 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
     }
   }
 
+  /**
+   * Call this if the weight of an entry at heap node `index` was decremented. Since the weight decreased,
+   * the element may now be smaller than its parent, so we need to restore the heap property "upwards".
+   */
+  @tailrec
+  private def fixHeapUp(index: Int): Unit = {
+    if (index > 0) {
+      val parentIndex = (index - 1) / 2
+      val currentWeight: Long = weights(heap(index))
+      val parentWeight: Long = weights(heap(parentIndex))
+      if (currentWeight < parentWeight) {
+        swapHeapNode(index, parentIndex)
+        fixHeapUp(parentIndex)
+      }
+    }
+  }
+
   /**
    * Swaps two elements in `heap` array and maintain correct index in 
`heapIndex`.
    *
diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/compress/HeavyHittersSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/compress/HeavyHittersSpec.scala
index 5c61ebe9e4..c1128e7f41 100644
--- a/remote/src/test/scala/org/apache/pekko/remote/artery/compress/HeavyHittersSpec.scala
+++ b/remote/src/test/scala/org/apache/pekko/remote/artery/compress/HeavyHittersSpec.scala
@@ -170,6 +170,40 @@ class HeavyHittersSpec extends AnyWordSpecLike with Matchers {
       hitters.lowestHitterWeight should ===(3)
     }
 
+    "support weight decrease (e.g. from frequency sketch reset)" in {
+      val hitters = new TopHeavyHitters[String](2)
+      hitters.update("A", 10) should ===(true)
+      hitters.update("B", 20) should ===(true)
+      hitters.lowestHitterWeight should ===(10)
+
+      // Simulate frequency sketch reset: weight decreases
+      hitters.update("A", 5) should ===(false)
+      // A is now the lowest hitter
+      hitters.lowestHitterWeight should ===(5)
+
+      // A new item with weight > 5 should replace A
+      hitters.update("C", 8) should ===(true)
+      hitters.iterator.toSet should ===(Set("B", "C"))
+    }
+
+    "restore heap property upward when weight decreases" in {
+      val hitters = new TopHeavyHitters[String](4)
+      hitters.update("A", 10) should ===(true)
+      hitters.update("B", 20) should ===(true)
+      hitters.update("C", 30) should ===(true)
+      hitters.update("D", 40) should ===(true)
+      hitters.lowestHitterWeight should ===(10)
+
+      // Decrease D's weight significantly - it should bubble up to become the lowest
+      hitters.update("D", 1) should ===(false)
+      hitters.lowestHitterWeight should ===(1)
+      hitters.iterator.toSet should ===(Set("A", "B", "C", "D"))
+
+      // A new item should replace D (the new lowest)
+      hitters.update("E", 15) should ===(true)
+      hitters.iterator.toSet should ===(Set("A", "B", "C", "E"))
+    }
+
     "be disabled with max=0" in {
       val hitters = new TopHeavyHitters[String](0)
       hitters.update("A", 10) shouldBe true


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to