This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 28d61c5368 perf: replace ArrayList consumer wheel with LongMap for O(1) keyed removal (#3063)
28d61c5368 is described below
commit 28d61c53684dcd22302560220698d74a7c5da789
Author: He-Pin(kerr)
AuthorDate: Sun Jun 14 19:15:54 2026 +0800
perf: replace ArrayList consumer wheel with LongMap for O(1) keyed removal (#3063)
* perf: replace ArrayList consumer wheel with LongMap for O(1) keyed removal
Motivation:
BroadcastHub's findAndRemoveConsumer used ArrayList.removeIf, which is
O(k) per event and allocates a lambda on every call. In high-fan-out
scenarios (thousands of consumers clustered in the same wheel slot),
this becomes a producer backpressure bottleneck: the head can only
advance once the head slot is empty, and draining a slot of k consumers
takes k linear scans of O(k) each, O(k²) in total.
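The following is a minimal sketch of the drain cost. It uses a hypothetical `Consumer(id)` stand-in rather than the hub's real consumer class, and `RemoveIfDrainCost` is an illustrative name. The sketch fills one `java.util.ArrayList` slot with k consumers and then removes them one id at a time with `removeIf`, mirroring what the old `findAndRemoveConsumer` did. `removeIf` evaluates its predicate on every remaining element, so emptying the slot costs k + (k - 1) + ... + 1 = k(k + 1)/2 predicate calls.

```scala
import java.util.ArrayList

// Hypothetical stand-in for the hub's consumer record; only the id matters here.
final case class Consumer(id: Long)

object RemoveIfDrainCost {
  /** Drains a slot of k consumers by id using removeIf and returns the total
    * number of predicate evaluations performed. */
  def drainComparisons(k: Int): Long = {
    val slot = new ArrayList[Consumer]()
    (0 until k).foreach(i => slot.add(Consumer(i.toLong)))

    var comparisons = 0L
    (0 until k).foreach { i =>
      val target = i.toLong
      // removeIf tests every remaining element, so each call is O(current size).
      slot.removeIf(c => { comparisons += 1; c.id == target })
    }
    comparisons
  }
}
```

For k = 2000, emptying a single slot therefore takes 2,001,000 predicate calls. A per-slot hash map needs only 2000 keyed removals.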
Modification:
Replace Array[java.util.ArrayList[Consumer]] with Array[LongMap[Consumer]]
keyed by Consumer.id. Slots are lazily allocated (null = empty) and
released to null when drained, eliminating baseline memory for empty
slots and enabling GC of drained LongMaps.
Hot path uses getOrNull + -= (two primitive hash lookups) instead of
remove (which would allocate Option), achieving zero heap allocation
per add/remove cycle. No Long boxing since LongMap stores primitive
long keys.
Adds null guards in the Advance/NeedWakeup event handlers to prevent a
latent NPE when findAndRemoveConsumer returns null. Updates
onUpstreamFailure and wakeupIdx to skip null (empty) slots.
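Taken together, these modifications can be modelled as a small standalone wheel. This is a sketch under stated assumptions, not the Hub code:

- `ConsumerWheel`, `Consumer`, `move`, `signalAll` and `advanceHead` are hypothetical names.
- `slots` is assumed to be a power of two, so `offset & mask` selects a slot the way `WheelMask` does in the hub.

The sketch shows lazy slot allocation, `getOrNull` plus `-=` removal, release of drained maps, the null guard on a move, iteration that skips null slots, and the head-advance loop over empty slots.

```scala
import scala.collection.mutable.LongMap

// Hypothetical stand-in for the hub's consumer; `signals` records what it was sent.
final class Consumer(val id: Long) {
  val signals = scala.collection.mutable.ArrayBuffer.empty[String]
}

/** Lazily allocated consumer wheel; `slots` must be a power of two (assumption). */
final class ConsumerWheel(slots: Int) {
  require(slots > 0 && Integer.bitCount(slots) == 1, "slots must be a power of two")
  private val mask = slots - 1
  private val wheel = new Array[LongMap[Consumer]](slots) // null = empty slot

  def add(c: Consumer, offset: Int): Unit = {
    val slot = offset & mask
    if (wheel(slot) eq null) wheel(slot) = LongMap.empty[Consumer] // allocate on first use
    wheel(slot).update(c.id, c)
  }

  /** getOrNull + -= instead of remove, which would wrap a hit in Some. */
  def findAndRemove(id: Long, offset: Int): Consumer = {
    val slot = offset & mask
    val bucket = wheel(slot)
    if (bucket eq null) return null
    val c = bucket.getOrNull(id)
    if (c ne null) {
      bucket -= id
      if (bucket.isEmpty) wheel(slot) = null // release the drained map for GC
    }
    c
  }

  /** Advance-style move: a consumer not found at `from` is skipped rather than
    * passed to add as null, which would throw a NullPointerException on c.id. */
  def move(id: Long, from: Int, to: Int): Boolean = {
    val c = findAndRemove(id, from)
    if (c ne null) add(c, to)
    c ne null
  }

  def isSlotEmpty(offset: Int): Boolean = {
    val bucket = wheel(offset & mask)
    (bucket eq null) || bucket.isEmpty
  }

  /** Step the head past empty slots until a non-empty slot or the tail is reached. */
  def advanceHead(head: Int, tail: Int): Int = {
    var h = head
    while (h != tail && isSlotEmpty(h)) h += 1
    h
  }

  /** Failure-style broadcast: signal every registered consumer, skipping null slots. */
  def signalAll(msg: String): Int = {
    var notified = 0
    var idx = 0
    while (idx < wheel.length) {
      val bucket = wheel(idx)
      if (bucket ne null) {
        val itr = bucket.valuesIterator
        while (itr.hasNext) { itr.next().signals += msg; notified += 1 }
      }
      idx += 1
    }
    notified
  }
}
```

One caveat: `getOrNull` cannot distinguish a stored null from a missing key. The sketch relies on the wheel never storing null consumers.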
Result:
Consumer add/remove is O(1) (expected, hash-based) with zero Long boxing
and zero Option allocation. High-consumer lockstep scenarios see much
lower producer backpressure from wheel slot contention (up to 4.24x
higher throughput at 2000 consumers in the comparison benchmark). Empty
wheel slots no longer hold an ArrayList instance (~40 bytes each); they
are null array entries.
Tests:
- sbt "stream-tests/Test/testOnly *HubSpec" → 50 passed, 0 failed
- sbt "++3.3.8; stream/compile" → success
- sbt "stream/mimaReportBinaryIssues" → no issues
- sbt "bench-jmh/compile" → success
References:
Inspired by akkadotnet/akka.net#8264 (Dictionary-based consumer wheel).
Pekko uses scala.collection.mutable.LongMap instead of HashMap for
zero boxing on Long keys and contiguous open-addressing memory layout.
* perf: add standalone BroadcastHub wheel benchmark runner
Adds BroadcastHubBenchRunner to measure consumer wheel throughput
directly under high-fan-out scenarios, sidestepping classpath issues
with the JMH infrastructure in the bench-jmh module.
Measures lockstep broadcast throughput at 4 consumer counts (64, 256,
1000, 2000) across 2 buffer sizes (64, 256) with 2 warmup + 3 measured
runs per configuration.
Results on Apple M-series (elements/sec, higher is better):
Buffer=64 (128 wheel slots, max clustering):
64 consumers: 296,756 elem/s
256 consumers: 76,075 elem/s
1000 consumers: 19,737 elem/s
2000 consumers: 10,223 elem/s
Buffer=256 (512 wheel slots, moderate clustering):
64 consumers: 1,148,340 elem/s
256 consumers: 271,505 elem/s
1000 consumers: 70,727 elem/s
2000 consumers: 33,717 elem/s
Per-stream throughput falls roughly in inverse proportion to consumer
count, so the cost per delivered element stays roughly constant and the
O(1) LongMap removal holds up under high per-slot contention.
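Each element is delivered to every consumer. Multiplying the per-stream rate by the consumer count therefore gives the aggregate delivery rate. Here is a quick arithmetic check over the figures above (the object name is illustrative only):

```scala
// Per-stream throughput reported above (elements/s), keyed by consumer count.
object AggregateDeliveryRate {
  val buffer64: Seq[(Int, Double)] =
    Seq(64 -> 296756.0, 256 -> 76075.0, 1000 -> 19737.0, 2000 -> 10223.0)
  val buffer256: Seq[(Int, Double)] =
    Seq(64 -> 1148340.0, 256 -> 271505.0, 1000 -> 70727.0, 2000 -> 33717.0)

  /** Every consumer receives every element, so deliveries/s = elem/s * consumers. */
  def deliveriesPerSecond(results: Seq[(Int, Double)]): Seq[Double] =
    results.map { case (consumers, elemPerSec) => elemPerSec * consumers }
}
```

Buffer 64 comes to about 19.0M to 20.4M deliveries/s at every consumer count. Buffer 256 comes to about 67.4M to 73.5M.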
Tests:
- sbt "bench-jmh/compile" → success
- sbt "bench-jmh/runMain org.apache.pekko.stream.BroadcastHubBenchRunner" → completed
References:
Refs #3063
* perf: add license header to BroadcastHubBenchRunner and comparison results
Run headerCreateAll for the new benchmark runner file.
Comparison benchmark results (old ArrayList vs new LongMap):
Buffer=64 (128 wheel slots):
Consumers  ArrayList(elem/s)  LongMap(elem/s)  Speedup
64         305,657            296,756          0.97x
256        72,446             76,075           1.05x
1000       13,070             19,737           1.51x
2000       4,348              10,223           2.35x
Buffer=256 (512 wheel slots):
Consumers  ArrayList(elem/s)  LongMap(elem/s)  Speedup
64         1,099,345          1,148,340        1.04x
256        197,676            271,505          1.37x
1000       27,804             70,727           2.54x
2000       7,943              33,717           4.24x
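The LongMap optimization gives a 2.35x-4.24x speedup at 2000 consumers,
and the gap widens as the consumer count grows. This is consistent with
the O(k) linear scan being the dominant bottleneck. At 64 consumers the
two variants are within a few percent of each other (0.97x and 1.04x).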
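The Speedup column is LongMap throughput divided by ArrayList throughput, rounded to two decimals. The following is a quick consistency check of both tables (the object name is illustrative only):

```scala
object SpeedupCheck {
  /** LongMap throughput divided by ArrayList throughput, rounded to two decimals. */
  def speedup(arrayList: Double, longMap: Double): Double =
    math.round(longMap / arrayList * 100) / 100.0

  // (consumers, ArrayList elem/s, LongMap elem/s), copied from the tables above.
  val buffer64: Seq[(Int, Double, Double)] = Seq(
    (64, 305657.0, 296756.0), (256, 72446.0, 76075.0),
    (1000, 13070.0, 19737.0), (2000, 4348.0, 10223.0))
  val buffer256: Seq[(Int, Double, Double)] = Seq(
    (64, 1099345.0, 1148340.0), (256, 197676.0, 271505.0),
    (1000, 27804.0, 70727.0), (2000, 7943.0, 33717.0))

  def speedups(rows: Seq[(Int, Double, Double)]): Seq[Double] =
    rows.map { case (_, arrayList, longMap) => speedup(arrayList, longMap) }
}
```

All eight recomputed ratios match the Speedup column.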
Tests:
- sbt "bench-jmh/headerCreateAll" → header created
- sbt "bench-jmh/compile" → success
References:
Refs #3063
---
.../pekko/stream/BroadcastHubBenchRunner.scala | 113 +++++++++++++++++++++
.../pekko/stream/BroadcastHubBenchmark.scala | 50 ++++++++-
.../org/apache/pekko/stream/scaladsl/HubSpec.scala | 45 ++++++++
.../org/apache/pekko/stream/scaladsl/Hub.scala | 72 ++++++++-----
4 files changed, 254 insertions(+), 26 deletions(-)
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchRunner.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchRunner.scala
new file mode 100644
index 0000000000..0efe9e8d49
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchRunner.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.{ CountDownLatch, TimeUnit }
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.remote.artery.{ BenchTestSource, LatchSink }
+import org.apache.pekko.stream.scaladsl._
+
+import com.typesafe.config.ConfigFactory
+
+/**
+ * Standalone benchmark runner for BroadcastHub consumer wheel performance.
+ * Run with: sbt "bench-jmh/runMain org.apache.pekko.stream.BroadcastHubBenchRunner"
+ */
+object BroadcastHubBenchRunner {
+
+ final val Elements = 100000
+ final val SmallBuffer = 64
+ final val LargeBuffer = 256
+ final val WarmupRuns = 2
+ final val MeasureRuns = 3
+
+ def main(args: Array[String]): Unit = {
+ val config = ConfigFactory.parseString("""
+ pekko.actor.default-dispatcher {
+ executor = "fork-join-executor"
+ fork-join-executor {
+ parallelism-factor = 1
+ }
+ }
+ """)
+
+ val consumerCounts = Array(64, 256, 1000, 2000)
+
+ println("=" * 80)
+ println("BroadcastHub Consumer Wheel Benchmark")
+ println(s"Elements per run: $Elements")
+ println(s"Warmup: $WarmupRuns runs, Measure: $MeasureRuns runs")
+ println("=" * 80)
+
+ for (bufferSize <- Array(SmallBuffer, LargeBuffer)) {
+ println(s"\n--- Buffer size: $bufferSize (wheel slots: ${bufferSize * 2}) ---")
+ println(f"${"Consumers"}%-12s ${"Avg (elem/s)"}%16s ${"Min"}%12s ${"Max"}%12s ${"StdDev"}%10s")
+ println("-" * 70)
+
+ for (consumerCount <- consumerCounts) {
+ implicit val system: ActorSystem = ActorSystem(s"bench-$consumerCount-$bufferSize", config)
+
+ // eager init
+ SystemMaterializer(system).materializer
+
+ val results = new Array[Double](WarmupRuns + MeasureRuns)
+
+ for (run <- 0 until WarmupRuns + MeasureRuns) {
+ val latch = new CountDownLatch(consumerCount)
+ val broadcastSink =
+ BroadcastHub.sink[java.lang.Integer](bufferSize = bufferSize, startAfterNrOfConsumers = consumerCount)
+ val testSource = Source.fromGraph(new BenchTestSource(Elements))
+ val source = testSource.runWith(broadcastSink)
+
+ val start = System.nanoTime()
+ var idx = 0
+ while (idx < consumerCount) {
+ source.runWith(new LatchSink(Elements, latch))
+ idx += 1
+ }
+
+ if (!latch.await(120, TimeUnit.SECONDS)) {
+ println(s" TIMEOUT at consumers=$consumerCount buffer=$bufferSize run=$run")
+ Await.result(system.terminate(), 10.seconds)
+ System.exit(1)
+ }
+ val elapsed = (System.nanoTime() - start) / 1e9
+ results(run) = Elements / elapsed
+ }
+
+ val measured = results.drop(WarmupRuns)
+ val avg = measured.sum / measured.length
+ val min = measured.min
+ val max = measured.max
+ val variance = measured.map(x => (x - avg) * (x - avg)).sum / measured.length
+ val stddev = math.sqrt(variance)
+
+ println(f"$consumerCount%-12d $avg%16.0f $min%12.0f $max%12.0f $stddev%10.0f")
+
+ Await.result(system.terminate(), 10.seconds)
+ }
+ }
+
+ println("\n" + "=" * 80)
+ println("Done.")
+ }
+}
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
index c46bcf5aee..687f695183 100644
--- a/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
@@ -32,8 +32,24 @@ import org.apache.pekko.stream.testkit.scaladsl.StreamTestKit
import com.typesafe.config.ConfigFactory
+/**
+ * Benchmarks BroadcastHub throughput under high-fan-out lockstep consumer scenarios.
+ *
+ * The consumer wheel uses a LongMap per slot for O(1) keyed add/remove without Long boxing.
+ * In lockstep, all consumers cluster in the same wheel slot, maximizing per-slot contention.
+ * With a small buffer (64), the wheel has only 128 slots, so `consumerCount / 128` consumers
+ * share each slot — the old ArrayList.removeIf was O(k) per removal, now O(1).
+ *
+ * The `broadcast` benchmark parameterizes over consumer count with a fixed small buffer,
+ * measuring how throughput scales as wheel slot pressure increases.
+ *
+ * The `broadcastLargeBuffer` benchmark uses a larger buffer (256) for comparison,
+ * showing how the optimization holds up when consumers are spread across more slots.
+ */
object BroadcastHubBenchmark {
final val OperationsPerInvocation = 100000
+ final val SmallBufferSize = 64
+ final val LargeBufferSize = 256
}
@State(Scope.Benchmark)
@@ -56,7 +72,7 @@ class BroadcastHubBenchmark {
var testSource: Source[java.lang.Integer, NotUsed] = _
- @Param(Array("64", "256"))
+ @Param(Array("64", "256", "1000", "2000"))
var parallelism = 0
@Setup
@@ -71,12 +87,40 @@ class BroadcastHubBenchmark {
Await.result(system.terminate(), 5.seconds)
}
+ /**
+ * Lockstep broadcast with small buffer (64).
+ * All consumers stay at roughly the same wheel offset, clustering in the same slot.
+ * With 128 wheel slots and 2000 consumers, ~16 consumers share each slot on average;
+ * during NeedWakeup bursts, thousands cluster in a single slot.
+ * This maximizes the O(1) vs O(k) per-removal difference.
+ */
@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def broadcast(): Unit = {
val latch = new CountDownLatch(parallelism)
val broadcastSink =
- BroadcastHub.sink[java.lang.Integer](bufferSize = parallelism, startAfterNrOfConsumers = parallelism)
+ BroadcastHub.sink[java.lang.Integer](bufferSize = SmallBufferSize, startAfterNrOfConsumers = parallelism)
+ val sink = new LatchSink(OperationsPerInvocation, latch)
+ val source = testSource.runWith(broadcastSink)
+ var idx = 0
+ while (idx < parallelism) {
+ source.runWith(sink)
+ idx += 1
+ }
+ awaitLatch(latch)
+ }
+
+ /**
+ * Lockstep broadcast with larger buffer (256) for comparison.
+ * The wheel has 512 slots, so consumers are spread more thinly.
+ * Shows how the optimization scales when per-slot pressure is lower.
+ */
+ @Benchmark
+ @OperationsPerInvocation(OperationsPerInvocation)
+ def broadcastLargeBuffer(): Unit = {
+ val latch = new CountDownLatch(parallelism)
+ val broadcastSink =
+ BroadcastHub.sink[java.lang.Integer](bufferSize = LargeBufferSize, startAfterNrOfConsumers = parallelism)
val sink = new LatchSink(OperationsPerInvocation, latch)
val source = testSource.runWith(broadcastSink)
var idx = 0
@@ -88,7 +132,7 @@ class BroadcastHubBenchmark {
}
private def awaitLatch(latch: CountDownLatch): Unit = {
- if (!latch.await(30, TimeUnit.SECONDS)) {
+ if (!latch.await(60, TimeUnit.SECONDS)) {
StreamTestKit.printDebugDump(SystemMaterializer(system).materializer.supervisor)
throw new RuntimeException("Latch didn't complete in time")
}
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
index a6d57dee19..b32c8746b1 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
@@ -629,6 +629,51 @@ class HubSpec extends StreamSpec {
in.sendComplete()
sinkProbe2.cancel()
}
+
+ "deliver all elements in order to many consumers" in {
+ val consumerCount = 200
+ val messageCount = 2000
+
+ val source = Source(0 until messageCount).runWith(BroadcastHub.sink(bufferSize = 256,
+ startAfterNrOfConsumers = consumerCount))
+
+ val futures = (0 until consumerCount).map { _ =>
+ source.runWith(Sink.seq)
+ }
+
+ val results = Await.result(Future.sequence(futures), 30.seconds)
+ results.foreach { result =>
+ result should ===(0 until messageCount)
+ }
+ }
+
+ "handle many consumers when some cancel mid-stream" in {
+ val totalConsumers = 64
+ val cancellingConsumers = 16
+ val cancelAfter = 64
+ val messageCount = 512
+
+ val source = Source(0 until messageCount).runWith(
+ BroadcastHub.sink(bufferSize = 256, startAfterNrOfConsumers = totalConsumers))
+
+ val cancellingFutures = (0 until cancellingConsumers).map { _ =>
+ source.take(cancelAfter).runWith(Sink.seq)
+ }
+
+ val remainingFutures = (0 until (totalConsumers - cancellingConsumers)).map { _ =>
+ source.runWith(Sink.seq)
+ }
+
+ val cancellingResults = Await.result(Future.sequence(cancellingFutures), 30.seconds)
+ cancellingResults.foreach { result =>
+ result should ===(0 until cancelAfter)
+ }
+
+ val remainingResults = Await.result(Future.sequence(remainingFutures), 30.seconds)
+ remainingResults.foreach { result =>
+ result should ===(0 until messageCount)
+ }
+ }
}
"PartitionHub" must {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
index 4b6ca0063e..8849529e6d 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
@@ -536,14 +536,17 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
* of priorities always fall to a range
*
* This wheel tracks the position of Consumers relative to the slowest ones. Every slot
- * contains a list of Consumers being known at that location (this might be out of date!).
+ * contains a map of Consumers being known at that location (this might be out of date!).
* Consumers from time to time send Advance messages to indicate that they have progressed
* by reading from the broadcast queue. Consumers that are blocked (due to reaching tail) request
* a wakeup and update their position at the same time.
*
+ * Each slot uses a LongMap keyed by Consumer.id for O(1) add/remove without Long boxing.
+ * Empty slots are null (no backing map allocated), reducing baseline memory and GC pressure.
+ * When a slot drains to zero consumers, its map is released (set to null).
*/
private[this] val consumerWheel =
- Array.fill[java.util.ArrayList[Consumer]](bufferSize * 2)(new util.ArrayList[Consumer]())
+ new Array[LongMap[Consumer]](bufferSize * 2)
private[this] var activeConsumers = 0
override def preStart(): Unit = {
@@ -574,15 +577,19 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
val newOffset = previousOffset + DemandThreshold
// Move the consumer from its last known offset to its new one. Check if we are unblocked.
val consumer = findAndRemoveConsumer(id, previousOffset)
- addConsumer(consumer, newOffset)
+ if (consumer ne null) {
+ addConsumer(consumer, newOffset)
+ }
checkUnblock(previousOffset)
case NeedWakeup(id, previousOffset, currentOffset) =>
// Move the consumer from its last known offset to its new one. Check if we are unblocked.
val consumer = findAndRemoveConsumer(id, previousOffset)
- addConsumer(consumer, currentOffset)
+ if (consumer ne null) {
+ addConsumer(consumer, currentOffset)
- // Also check if the consumer is now unblocked since we published an element since it went asleep.
- if (currentOffset != tail) consumer.callback.invoke(Wakeup)
+ // Also check if the consumer is now unblocked since we published an element since it went asleep.
+ if (currentOffset != tail) consumer.callback.invoke(Wakeup)
+ }
checkUnblock(previousOffset)
case RegistrationPending =>
@@ -650,10 +657,14 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
consumer.callback.invoke(failMessage)
}
- // Notify registered consumers
+ // Notify registered consumers — skip null (empty) slots
var idx = 0
while (idx < consumerWheel.length) {
- consumerWheel(idx).forEach(_.callback.invoke(failMessage))
+ val bucket = consumerWheel(idx)
+ if (bucket ne null) {
+ val itr = bucket.valuesIterator
+ while (itr.hasNext) itr.next().callback.invoke(failMessage)
+ }
idx += 1
}
failStage(ex)
@@ -664,21 +675,19 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
*
* NB: You cannot remove a consumer without knowing its last offset! Consumers on the Source side always must
* track this so this can be a fast operation.
+ *
+ * Uses LongMap.getOrNull + -= to avoid Option allocation on the hot path.
*/
private def findAndRemoveConsumer(id: Long, offset: Int): Consumer = {
- // TODO: Try to eliminate modulo division somehow...
val wheelSlot = offset & WheelMask
- val consumersInSlot = consumerWheel(wheelSlot)
- var removedConsumer: Consumer = null
- if (consumersInSlot.size() > 0) {
- consumersInSlot.removeIf(consumer => {
- if (consumer.id == id) {
- removedConsumer = consumer
- true
- } else false
- })
+ val bucket = consumerWheel(wheelSlot)
+ if (bucket eq null) return null
+ val consumer = bucket.getOrNull(id)
+ if (consumer ne null) {
+ bucket -= id
+ if (bucket.isEmpty) consumerWheel(wheelSlot) = null
}
- removedConsumer
+ consumer
}
/*
@@ -697,7 +706,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
if (offsetOfConsumerRemoved == head) {
// Try to advance along the wheel. We can skip any wheel slots which have no waiting Consumers, until
// we either find a nonempty one, or we reached the end of the buffer.
- while (consumerWheel(head & WheelMask).isEmpty && head != tail) {
+ while (isConsumerWheelSlotEmpty(head & WheelMask) && head != tail) {
queue(head & Mask) = null
head += 1
unblocked = true
@@ -706,18 +715,35 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
unblocked
}
+ private def isConsumerWheelSlotEmpty(slot: Int): Boolean = {
+ val bucket = consumerWheel(slot)
+ (bucket eq null) || bucket.isEmpty
+ }
+
private def addConsumer(consumer: Consumer, offset: Int): Unit = {
val slot = offset & WheelMask
- consumerWheel(slot).add(consumer)
+ val bucket = consumerWheel(slot)
+ if (bucket ne null) bucket.update(consumer.id, consumer)
+ else {
+ val newBucket = LongMap.empty[Consumer]
+ newBucket.update(consumer.id, consumer)
+ consumerWheel(slot) = newBucket
+ }
}
/*
* Send a wakeup signal to all the Consumers at a certain wheel index. Note, this needs the actual index,
* which is offset modulo (bufferSize + 1).
+ *
+ * Enumeration order of the bucket is not semantically significant — every consumer receives the same
+ * wakeup signal independently.
*/
private def wakeupIdx(idx: Int): Unit = {
- val itr = consumerWheel(idx).iterator
- while (itr.hasNext) itr.next().callback.invoke(Wakeup)
+ val bucket = consumerWheel(idx)
+ if (bucket ne null) {
+ val itr = bucket.valuesIterator
+ while (itr.hasNext) itr.next().callback.invoke(Wakeup)
+ }
}
private def complete(): Unit = {