This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch fix-mapasync-drainqueue-race in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 0b93cb02dc3c6a0cbc5afaa2cb67cf5245794c10 Author: He-Pin <[email protected]> AuthorDate: Fri Apr 24 18:38:55 2026 +0800 fix: MapAsyncPartitioned drainQueue concurrent-modification race Motivation: On JDK 25 CI the property test 'should process elements in parallel preserving order in partition' (MapAsyncPartitionedSpec) repeatedly hung past the 4 minute futureValue timeout. Local reproduction showed 0 of 5 attempts completed within 240 s on baseline. Root cause: drainQueue iterated `buffer` (a mutable.Queue / ArrayDeque) with foreach while processElement could synchronously complete the returned Future (the #20217 fast-path). The synchronous completion re-entered pushNextIfPossible which either dequeued from the front of the buffer (ordered mode) or rebuilt it via filter (unordered mode). The outer foreach then walked the now-shifted underlying array and silently skipped buffered elements, leaving them with no future ever registered, so the stream stalled. Modification: Snapshot the buffer to a List before iterating, and guard each item with `holder.out eq NotYetThere` so an item already started or already pushed during synchronous re-entry is never processed twice. This preserves the existing #20217 fast-path while making the iteration safe against re-entrant buffer mutation. Result: The previously-failing test passes 4 of 5 local runs (vs 0 of 5 on baseline) in well under a second, and the full MapAsyncPartitionedSpec (22 tests) finishes in ~17 s. --- .../scala/org/apache/pekko/stream/MapAsyncPartitioned.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala index c50eeb42d2..24f2d77c1b 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala @@ -252,9 +252,17 @@ private[stream] final class MapAsyncPartitioned[In, Out, Partition]( private def drainQueue(): Unit = { if (buffer.nonEmpty) { - buffer.foreach { + // Snapshot the buffer because processElement may complete a Future synchronously + // (the #20217 optimization), which re-enters pushNextIfPossible and either + // dequeues from the buffer (ordered) or rebuilds it via filter (unordered), + // invalidating any in-flight iterator over `buffer`. Items already started or + // already pushed during re-entry have holder.out set, so the NotYetThere + // guard ensures we never invoke processElement twice for the same Holder. + val snapshot = buffer.toList + snapshot.foreach { case (partition, wrappedInput) => - if (canStartNextElement(partition)) { + val holder = wrappedInput.element + if ((holder.out eq NotYetThere) && canStartNextElement(partition)) { wrappedInput.resume() processElement(partition, wrappedInput) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
