This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch fix-mapasync-ordered-pull
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 8618d9e2d09c9d71157df00dd11901725f4a9d8d
Author: He-Pin <[email protected]>
AuthorDate: Sat Apr 25 14:50:03 2026 +0800

    fix: MapAsyncPartitioned ordered path defers pullIfNeeded to preserve FIFO
    
    Motivation:
    In ordered mode, when the head of the buffer was pushed to downstream the
    loop called pullIfNeeded() before drainQueue(). With fused stages tryPull
    synchronously re-enters onPush, which then evaluates canStartNextElement
    on the new element. Because the slot freed by the just-pushed element is
    already available, the new element claims that partition slot, leaving any
    older buffered waiter for the same partition suspended out of order.
    A second issue: the failure-resume branch and the exit-on-NotYetThere path
    never invoked pullIfNeeded, so streams whose buffer drained only via
    resumed failures could miss completion after onUpstreamFinish.
    
    Modification:
    Removed the pullIfNeeded() calls inside the success branch of the ordered
    loop. Added a single pullIfNeeded() after drainQueue() so that:
      * suspended FIFO waiters for the just-freed partition are restarted
        before upstream is asked for a new element;
      * completion is triggered after buffer drains via resumed failures;
      * tryPull still happens once the buffer has room.
    
    Result:
    Ordered output respects per-partition FIFO order even when downstream
    demand is bursty. Failure-resume paths complete the stage as expected.
    Existing tests in stream-tests and stream-typed-tests continue to pass.
---
 .../org/apache/pekko/stream/MapAsyncPartitioned.scala     | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala 
b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
index 24f2d77c1b..cf4bf0f63e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
@@ -188,12 +188,12 @@ private[stream] final class MapAsyncPartitioned[In, Out, 
Partition](
               case Success(elem) =>
                 if (elem != null) {
                   push(out, elem)
-                  pullIfNeeded()
-                } else {
-                  // elem is null
-                  pullIfNeeded()
                 }
-
+              // pullIfNeeded is deferred until after drainQueue below so that
+              // any FIFO-suspended waiter for the just-freed partition gets 
started
+              // before upstream is asked for a new element. Pulling early 
would
+              // synchronously trigger onPush, which would claim the partition 
slot
+              // (canStartNextElement returns true) and starve the older 
waiter.
               case Failure(NonFatal(ex)) =>
                 holder.supervisionDirectiveFor(decider, ex) match {
                   // this could happen if we are looping in pushNextIfPossible 
and end up on a failed future before the
@@ -209,6 +209,11 @@ private[stream] final class MapAsyncPartitioned[In, Out, 
Partition](
             }
           }
           drainQueue()
+          // Required even when no element was pushed (failure-resume path or
+          // exit on NotYetThere head): completes the stage when upstream is
+          // closed and the buffer drained, and triggers a tryPull when buffer
+          // shrunk via failures.
+          pullIfNeeded()
         }
 
       private def pushNextIfPossibleUnordered(): Unit =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to