This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 61c3d960bf optimize: avoid Holder allocation in ordered mapAsync for already-completed futures (#3018)
61c3d960bf is described below

commit 61c3d960bf9acbba8823057d6968dfb681e856b2
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun May 31 21:21:37 2026 +0800

    optimize: avoid Holder allocation in ordered mapAsync for already-completed futures (#3018)
    
    Motivation:
    Ordered `MapAsync` allocates a `Holder` and performs a buffer enqueue/dequeue
    round-trip for every element, even when the future returned by the user function
    is already completed successfully and can be emitted immediately. With a
    synchronously-completing function (caching, `Future.successful`, ...), this is
    pure per-element garbage.
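
"Already completed" here means the future's `value` is already `Some(...)` at the moment the stage inspects it (the stage does this with `future.value match` in the diff). The following minimal Scala sketch shows the difference between a synchronously-completing function, such as a cache hit, and a still-pending one. `AlreadyCompletedDemo`, `lookup`, and the in-memory cache are hypothetical stand-ins for a user function, not Pekko code.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.Try

// Hypothetical user function of the kind the motivation describes (not Pekko code):
// a cache hit returns an already-completed Future, a miss returns a pending one.
object AlreadyCompletedDemo {
  private val cache = Map(1 -> "one", 2 -> "two")
  private val pending = Promise[String]() // stands in for a slow remote lookup

  def lookup(key: Int): Future[String] =
    cache.get(key) match {
      case Some(v) => Future.successful(v) // completed before anyone inspects it
      case None    => pending.future       // not completed yet
    }

  // `Future.value` is Some(...) once the future is completed, None while it is pending.
  def status(key: Int): Option[Try[String]] = lookup(key).value

  def main(args: Array[String]): Unit = {
    println(status(1)) // Some(Success(one))
    println(status(3)) // None
  }
}
```

Only the first case is a candidate for the zero-allocation path; a pending future always needs somewhere to park its eventual result.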
    
    Modification:
    In `MapAsync.onPush`, add a zero-allocation fast path: when the future is already
    completed with a non-null element, nothing is buffered ahead of it (so ordering is
    already satisfied) and downstream demand is present, push the element straight
    through without allocating a `Holder` or touching the buffer. The not-yet-completed
    case is matched first so the pending path is unchanged (single type check, never
    evaluates the fast-path guard).
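
The fast-path decision can be modeled outside Pekko. The following is a minimal, single-threaded Scala sketch under stated assumptions: `OrderedMapAsyncSketch` and `Slot` are hypothetical (`Slot` plays the role of `Holder`), a `demand` counter stands in for `isAvailable(out)`, and null/failure handling is simplified rather than copied from the real stage. It counts slot allocations so the effect of the fast path is visible.

```scala
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

// Hypothetical, single-threaded model of an ordered mapAsync stage (not Pekko code).
// `Slot` plays the role of `Holder`; `demand > 0` plays the role of `isAvailable(out)`.
final class OrderedMapAsyncSketch[In, Out](f: In => Future[Out]) {
  final class Slot { var result: Option[Try[Out]] = None }

  private val buffer = mutable.Queue.empty[Slot] // reserved slots, in arrival order
  private var demand = 0                         // outstanding downstream requests
  val emitted = mutable.ArrayBuffer.empty[Out]   // what "downstream" has received
  var slotsAllocated = 0                         // number of Slot objects created
  var failure: Option[Throwable] = None

  def request(n: Int): Unit = { demand += n; drain() }

  def onPush(in: In): Unit = {
    val future = f(in)
    future.value match {
      case None =>
        // Pending: reserve an ordered slot and fill it when the future completes.
        val slot = newSlot()
        future.onComplete { r => slot.result = Some(r); drain() }(ExecutionContext.parasitic)
      case Some(Success(elem)) if elem != null && buffer.isEmpty && demand > 0 =>
        // Fast path: nothing is queued ahead and demand exists, so no Slot is needed.
        emit(elem)
      case Some(r) =>
        // Completed, but ordering, missing demand or a null/failed result forces the buffer.
        val slot = newSlot()
        slot.result = Some(r)
        drain()
    }
  }

  private def newSlot(): Slot = {
    slotsAllocated += 1
    val slot = new Slot
    buffer.enqueue(slot)
    slot
  }

  private def emit(elem: Out): Unit = { demand -= 1; emitted += elem }

  // Emit completed slots from the head of the queue while demand lasts.
  private def drain(): Unit =
    while (failure.isEmpty && demand > 0 && buffer.nonEmpty && buffer.head.result.isDefined) {
      buffer.dequeue().result.get match {
        case Success(elem) if elem != null => emit(elem)
        case Success(_)                    => () // sketch choice: null results are dropped
        case Failure(ex) =>
          failure = Some(ex) // sketch choice: stop; the real stage consults its supervision decider
          buffer.clear()
      }
    }
}
```

With an always-completed function and enough demand, `slotsAllocated` stays at 0. If a pending future sits at the head of the queue, later elements still take a slot even when already completed, so output order matches input order; this is why the guard checks `buffer.isEmpty` and not just completion.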
    
    Result:
    JMH MapAsyncBenchmark (parallelism=4, spawn=false): 18.0M -> 20.3M ops/s (+12%),
    with the per-element `Holder` allocation eliminated (confirmed via async-profiler
    alloc profile). No regression on the asynchronous path (spawn=true). All 27
    FlowMapAsyncSpec tests pass; binary compatible (internal API).
    
    References:
    bench-jmh MapAsyncBenchmark; relates to the existing already-completed-future
    optimization (#20217).
---
 .../org/apache/pekko/stream/impl/fusing/Ops.scala  | 23 ++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index a16955d24a..4aec981690 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -1319,19 +1319,30 @@ private[stream] object Collect {
       override def onPush(): Unit = {
         try {
           val future = f(grab(in))
-          val holder = new Holder[Out](NotYetThere, futureCB)
-          buffer.enqueue(holder)
-
           future.value match {
-            case None    => future.onComplete(holder)(scala.concurrent.ExecutionContext.parasitic)
+            // Not-yet-completed future: reserve this element's ordered slot with a Holder. Listed first so
+            // the pending path costs only a single type check and never evaluates the fast-path guard below
+            // (behaviourally and cost-wise identical to before this fast path was introduced).
+            case None =>
+              val holder = new Holder[Out](NotYetThere, futureCB)
+              buffer.enqueue(holder)
+              future.onComplete(holder)(scala.concurrent.ExecutionContext.parasitic)
+            // Zero-allocation fast path: the future is already completed with a non-null element and
+            // nothing is buffered ahead of it, so ordering is already satisfied and downstream demand is
+            // present. Push straight through, skipping the Holder allocation and the buffer round-trip.
+            case Some(Success(elem)) if (elem != null) && buffer.isEmpty && isAvailable(out) =>
+              push(out, elem)
             case Some(v) =>
+              val holder = new Holder[Out](NotYetThere, futureCB)
+              buffer.enqueue(holder)
               // #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and
               // run the logic directly on this thread
               holder.setElem(v)
               v match {
                 // this optimization also requires us to stop the stage to fail fast if the decider says so:
-                case Failure(ex) if holder.supervisionDirectiveFor(decider, ex) == Supervision.Stop => failStage(ex)
-                case _                                                                              => pushNextIfPossible()
+                case Failure(ex) if holder.supervisionDirectiveFor(decider, ex) == Supervision.Stop =>
+                  failStage(ex)
+                case _ => pushNextIfPossible()
               }
           }
 

