This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch fix-mapasync-ordered-pull
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 8618d9e2d09c9d71157df00dd11901725f4a9d8d
Author: He-Pin <[email protected]>
AuthorDate: Sat Apr 25 14:50:03 2026 +0800

    fix: MapAsyncPartitioned ordered path defers pullIfNeeded to preserve FIFO

    Motivation:
    In ordered mode, when the head of the buffer was pushed to downstream,
    the loop called pullIfNeeded() before drainQueue(). With fused stages,
    tryPull synchronously re-enters onPush, which then evaluates
    canStartNextElement on the new element. Because the slot freed by the
    just-pushed element is already available, the new element claims that
    partition slot, leaving any older buffered waiter for the same
    partition suspended out of order.

    A second issue: the failure-resume branch and the exit-on-NotYetThere
    path never invoked pullIfNeeded, so streams whose buffer drained only
    via resumed failures could miss completion after onUpstreamFinish.

    Modification:
    Removed the pullIfNeeded() calls inside the success branch of the
    ordered loop. Added a single pullIfNeeded() after drainQueue() so that:
    * suspended FIFO waiters for the just-freed partition are restarted
      before upstream is asked for a new element;
    * completion is triggered after the buffer drains via resumed failures;
    * tryPull still happens once the buffer has room.

    Result:
    Ordered output respects per-partition FIFO order even when downstream
    demand is bursty. Failure-resume paths complete the stage as expected.
    Existing tests in stream-tests and stream-typed-tests continue to pass.
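
The ordering problem described under Motivation can be reproduced in a small standalone model. The sketch below does not use Pekko and is not the actual stage. `Stage`, `complete`, `run` and the one-slot-per-partition rule are hypothetical simplifications that only mirror the names in the commit message. It assumes that a pull re-enters `onPush` synchronously, as the message says happens with fused stages.

```scala
import scala.collection.mutable

object FifoPullOrderSketch {

  // Hypothetical, simplified model of the ordered path: at most one running
  // element per partition, everything else waits in a FIFO queue.
  final class Stage(pullBeforeDrain: Boolean, upstream: Iterator[(String, Int)]) {
    private val busy = mutable.Set.empty[String]             // partitions with a running element
    private val waiting = mutable.Queue.empty[(String, Int)] // suspended elements, oldest first
    val started = mutable.ArrayBuffer.empty[Int]             // order in which elements were started

    private def canStart(partition: String): Boolean = !busy.contains(partition)

    private def start(partition: String, id: Int): Unit = {
      busy += partition
      started += id
    }

    // Models onPush: start the element if its partition slot is free, else suspend it.
    def onPush(partition: String, id: Int): Unit =
      if (canStart(partition)) start(partition, id)
      else waiting.enqueue((partition, id))

    // Models a pull that re-enters onPush synchronously.
    def pull(): Unit =
      if (upstream.hasNext) {
        val (partition, id) = upstream.next()
        onPush(partition, id)
      }

    // Models drainQueue: restart the oldest waiter of every free partition.
    private def drainQueue(): Unit = {
      val snapshot = waiting.toList
      waiting.clear()
      snapshot.foreach { case (partition, id) =>
        if (canStart(partition)) start(partition, id)
        else waiting.enqueue((partition, id))
      }
    }

    // Models the head element being pushed downstream, which frees its slot.
    def complete(partition: String): Unit = {
      busy -= partition
      if (pullBeforeDrain) { pull(); drainQueue() } // order before the fix
      else { drainQueue(); pull() }                 // order after the fix
    }
  }

  // Three elements for the same partition "a": 1 runs, 2 waits, 3 is still upstream.
  def run(pullBeforeDrain: Boolean): List[Int] = {
    val stage = new Stage(pullBeforeDrain, Iterator(("a", 1), ("a", 2), ("a", 3)))
    stage.pull()
    stage.pull()
    stage.complete("a")
    stage.complete("a")
    stage.started.toList
  }

  def main(args: Array[String]): Unit = {
    println(s"pull before drain: ${run(pullBeforeDrain = true)}")
    println(s"drain before pull: ${run(pullBeforeDrain = false)}")
  }
}
```

In this model, pulling before draining starts the elements in the order 1, 3, 2, because element 3 claims the freed slot while element 2 is still suspended. Draining first yields 1, 2, 3.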
---
 .../org/apache/pekko/stream/MapAsyncPartitioned.scala     | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
index 24f2d77c1b..cf4bf0f63e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
@@ -188,12 +188,12 @@ private[stream] final class MapAsyncPartitioned[In, Out, Partition](
             case Success(elem) =>
               if (elem != null) {
                 push(out, elem)
-                pullIfNeeded()
-              } else {
-                // elem is null
-                pullIfNeeded()
               }
-
+              // pullIfNeeded is deferred until after drainQueue below so that
+              // any FIFO-suspended waiter for the just-freed partition gets started
+              // before upstream is asked for a new element. Pulling early would
+              // synchronously trigger onPush, which would claim the partition slot
+              // (canStartNextElement returns true) and starve the older waiter.
             case Failure(NonFatal(ex)) =>
               holder.supervisionDirectiveFor(decider, ex) match {
                 // this could happen if we are looping in pushNextIfPossible and end up on a failed future before the
@@ -209,6 +209,11 @@ private[stream] final class MapAsyncPartitioned[In, Out, Partition](
           }
         }
         drainQueue()
+        // Required even when no element was pushed (failure-resume path or
+        // exit on NotYetThere head): completes the stage when upstream is
+        // closed and the buffer drained, and triggers a tryPull when buffer
+        // shrunk via failures.
+        pullIfNeeded()
       }

       private def pushNextIfPossibleUnordered(): Unit =

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
