This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 61c3d960bf optimize: avoid Holder allocation in ordered mapAsync for already-completed futures (#3018)
61c3d960bf is described below
commit 61c3d960bf9acbba8823057d6968dfb681e856b2
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun May 31 21:21:37 2026 +0800
optimize: avoid Holder allocation in ordered mapAsync for already-completed futures (#3018)
Motivation:
Ordered `MapAsync` allocates a `Holder` and performs a buffer
enqueue/dequeue round-trip for every element, even when the future
returned by the user function is already completed successfully and can
be emitted immediately. With a synchronously-completing function
(caching, `Future.successful`, ...) this is pure per-element garbage.
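As a minimal illustration of what "already completed" means here, the sketch below uses only the Scala standard library (no Pekko dependency). The object name `CompletedFutureDemo` and the `cached` helper are hypothetical and not part of this commit. It shows that `Future.successful` yields a future whose `value` is already `Some(Success(_))`, while a future backed by an unfulfilled `Promise` reports `None`.

```scala
import scala.concurrent.{ Future, Promise }
import scala.util.Success

object CompletedFutureDemo {
  // Hypothetical synchronously-completing function, e.g. a cache hit.
  def cached(n: Int): Future[Int] = Future.successful(n * 2)

  def main(args: Array[String]): Unit = {
    val done = cached(21)
    // Already completed: the result can be read without registering a callback.
    assert(done.value == Some(Success(42)))

    val promise = Promise[Int]()
    val pending = promise.future
    // Not yet completed: value stays None until the promise is fulfilled.
    assert(pending.value.isEmpty)

    promise.success(1)
    assert(pending.value == Some(Success(1)))
  }
}
```
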
Modification:
In `MapAsync.onPush`, add a zero-allocation fast path: when the future is
already completed with a non-null element, nothing is buffered ahead of it
(so ordering is already satisfied) and downstream demand is present, push
the element straight through without allocating a `Holder` or touching the
buffer. The not-yet-completed case is matched first so the pending path is
unchanged (single type check, never evaluates the fast-path guard).
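The case ordering described above can be modelled roughly as follows. This is a simplified stand-alone sketch, not the actual `MapAsync` stage: `FastPathSketch`, `Model`, `Path` and the `demand` flag are hypothetical stand-ins for the stage's buffer and `isAvailable(out)`, and supervision handling is left out.

```scala
import scala.collection.mutable
import scala.concurrent.{ Future, Promise }
import scala.util.{ Success, Try }

object FastPathSketch {
  sealed trait Path
  case object Pending extends Path  // not completed: an ordered slot is reserved
  case object FastPath extends Path // completed, non-null, empty buffer, demand present
  case object Buffered extends Path // completed, but must go through the ordered buffer

  // Hypothetical model of the stage state; `demand` stands in for isAvailable(out).
  final class Model[A](var demand: Boolean) {
    val buffer = mutable.Queue.empty[Option[Try[A]]]
    val emitted = mutable.ArrayBuffer.empty[A]

    def onPush(future: Future[A]): Path = future.value match {
      // Matched first, so a pending future never evaluates the guard below.
      case None =>
        buffer.enqueue(None)
        Pending
      // Nothing is queued ahead, so emitting directly preserves ordering.
      case Some(Success(elem)) if elem != null && buffer.isEmpty && demand =>
        emitted += elem
        demand = false // in this model a push consumes the outstanding demand
        FastPath
      // Failures, nulls, a non-empty buffer or missing demand take the buffered path.
      case completed =>
        buffer.enqueue(completed)
        Buffered
    }
  }

  def main(args: Array[String]): Unit = {
    val m = new Model[String](demand = true)
    assert(m.onPush(Future.successful("a")) == FastPath)
    assert(m.onPush(Promise[String]().future) == Pending)
    m.demand = true
    // Demand is back, but an earlier element is still pending, so "c" is buffered.
    assert(m.onPush(Future.successful("c")) == Buffered)
    assert(m.emitted.toList == List("a") && m.buffer.size == 2)
  }
}
```

In this model the third element cannot take the fast path even though it is complete and demand is present, because emitting it ahead of the pending element would break ordering.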
Result:
JMH MapAsyncBenchmark (parallelism=4, spawn=false): 18.0M -> 20.3M ops/s
(+12%), with the per-element `Holder` allocation eliminated (confirmed via
async-profiler alloc profile). No regression on the asynchronous path
(spawn=true). All 27 FlowMapAsyncSpec tests pass; binary compatible
(internal API).
References:
bench-jmh MapAsyncBenchmark; relates to the existing
already-completed-future optimization (#20217).
---
.../org/apache/pekko/stream/impl/fusing/Ops.scala | 23 ++++++++++++++++------
1 file changed, 17 insertions(+), 6 deletions(-)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index a16955d24a..4aec981690 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -1319,19 +1319,30 @@ private[stream] object Collect {
       override def onPush(): Unit = {
         try {
           val future = f(grab(in))
-          val holder = new Holder[Out](NotYetThere, futureCB)
-          buffer.enqueue(holder)
-
           future.value match {
-            case None => future.onComplete(holder)(scala.concurrent.ExecutionContext.parasitic)
+            // Not-yet-completed future: reserve this element's ordered slot with a Holder. Listed first so
+            // the pending path costs only a single type check and never evaluates the fast-path guard below
+            // (behaviourally and cost-wise identical to before this fast path was introduced).
+            case None =>
+              val holder = new Holder[Out](NotYetThere, futureCB)
+              buffer.enqueue(holder)
+              future.onComplete(holder)(scala.concurrent.ExecutionContext.parasitic)
+            // Zero-allocation fast path: the future is already completed with a non-null element and
+            // nothing is buffered ahead of it, so ordering is already satisfied and downstream demand is
+            // present. Push straight through, skipping the Holder allocation and the buffer round-trip.
+            case Some(Success(elem)) if (elem != null) && buffer.isEmpty && isAvailable(out) =>
+              push(out, elem)
             case Some(v) =>
+              val holder = new Holder[Out](NotYetThere, futureCB)
+              buffer.enqueue(holder)
               // #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and
               // run the logic directly on this thread
               holder.setElem(v)
               v match {
                 // this optimization also requires us to stop the stage to fail fast if the decider says so:
-                case Failure(ex) if holder.supervisionDirectiveFor(decider, ex) == Supervision.Stop => failStage(ex)
-                case _ => pushNextIfPossible()
+                case Failure(ex) if holder.supervisionDirectiveFor(decider, ex) == Supervision.Stop =>
+                  failStage(ex)
+                case _ => pushNextIfPossible()
               }
           }