This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new b96d35d79d fix: MapAsyncPartitioned drainQueue concurrent-modification 
race (#2899)
b96d35d79d is described below

commit b96d35d79d8482c0408d57721a0e68cf9c500b95
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri Apr 24 19:52:14 2026 +0800

    fix: MapAsyncPartitioned drainQueue concurrent-modification race (#2899)
    
    Motivation:
    On JDK 25 CI the property test 'should process elements in parallel
    preserving order in partition' (MapAsyncPartitionedSpec) repeatedly
    hung past the 4 minute futureValue timeout. Local reproduction showed
    0 of 5 attempts completed within 240 s on baseline.
    
    Root cause: drainQueue iterated `buffer` (a mutable.Queue / ArrayDeque)
    with foreach while processElement could synchronously complete the
    returned Future (the #20217 fast-path). The synchronous completion
    re-entered pushNextIfPossible which either dequeued from the front of
    the buffer (ordered mode) or rebuilt it via filter (unordered mode).
    The outer foreach then walked the now-shifted underlying array and
    silently skipped buffered elements, leaving them with no future ever
    registered, so the stream stalled.
    
    Modification:
    Snapshot the buffer to a List before iterating, and guard each item
    with `holder.out eq NotYetThere` so an item already started or already
    pushed during synchronous re-entry is never processed twice. This
    preserves the existing #20217 fast-path while making the iteration
    safe against re-entrant buffer mutation.
    
    Result:
    The previously-failing test passes 4 of 5 local runs (vs 0 of 5 on
    baseline) in well under a second, and the full MapAsyncPartitionedSpec
    (22 tests) finishes in ~17 s.
---
 .../scala/org/apache/pekko/stream/MapAsyncPartitioned.scala  | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala 
b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
index c50eeb42d2..24f2d77c1b 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
@@ -252,9 +252,17 @@ private[stream] final class MapAsyncPartitioned[In, Out, 
Partition](
 
       private def drainQueue(): Unit = {
         if (buffer.nonEmpty) {
-          buffer.foreach {
+          // Snapshot the buffer because processElement may complete a Future 
synchronously
+          // (the #20217 optimization), which re-enters pushNextIfPossible and 
either
+          // dequeues from the buffer (ordered) or rebuilds it via filter 
(unordered),
+          // invalidating any in-flight iterator over `buffer`. Items already 
started or
+          // already pushed during re-entry have holder.out set, so the 
NotYetThere
+          // guard ensures we never invoke processElement twice for the same 
Holder.
+          val snapshot = buffer.toList
+          snapshot.foreach {
             case (partition, wrappedInput) =>
-              if (canStartNextElement(partition)) {
+              val holder = wrappedInput.element
+              if ((holder.out eq NotYetThere) && 
canStartNextElement(partition)) {
                 wrappedInput.resume()
                 processElement(partition, wrappedInput)
               }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to