This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new b96d35d79d fix: MapAsyncPartitioned drainQueue concurrent-modification race (#2899)
b96d35d79d is described below
commit b96d35d79d8482c0408d57721a0e68cf9c500b95
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri Apr 24 19:52:14 2026 +0800
fix: MapAsyncPartitioned drainQueue concurrent-modification race (#2899)
Motivation:
On JDK 25 CI the property test 'should process elements in parallel
preserving order in partition' (MapAsyncPartitionedSpec) repeatedly
hung past the 4-minute futureValue timeout. In local reproduction,
0 of 5 attempts completed within 240 s on the baseline.
Root cause: drainQueue iterated `buffer` (a mutable.Queue / ArrayDeque)
with foreach while processElement could synchronously complete the
returned Future (the #20217 fast-path). The synchronous completion
re-entered pushNextIfPossible which either dequeued from the front of
the buffer (ordered mode) or rebuilt it via filter (unordered mode).
The outer foreach then walked the now-shifted underlying array and
silently skipped buffered elements, leaving them with no future ever
registered, so the stream stalled.
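To illustrate why the completion can run on the caller's stack, here is a
minimal sketch of a synchronous-completion fast path. It assumes the fast
path inspects Future.value before registering a callback; FastPathSketch
and handle are hypothetical names for illustration, not the stage's code.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

// Hypothetical illustration only; not the MapAsyncPartitioned implementation.
object FastPathSketch {
  // Returns true when the callback ran synchronously on the caller's stack.
  def handle[A](f: Future[A])(callback: Try[A] => Unit)(
      implicit ec: ExecutionContext): Boolean =
    f.value match {
      case Some(result) =>
        // Already completed: the callback runs here, before `handle` returns,
        // so any buffer mutation it performs is re-entrant for the caller.
        callback(result)
        true
      case None =>
        // Not completed yet: the callback runs later on the execution context.
        f.onComplete(callback)
        false
    }
}
```

With an already-completed Future such as Future.successful(42), the
callback fires before handle returns, which is the re-entrancy a caller
iterating over a mutable buffer has to tolerate.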
Modification:
Snapshot the buffer to a List before iterating, and guard each item
with `holder.out eq NotYetThere` so an item already started or already
pushed during synchronous re-entry is never processed twice. This
preserves the existing #20217 fast-path while making the iteration
safe against re-entrant buffer mutation.
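The snapshot-and-guard pattern can be sketched in isolation as follows.
This is a simplified model under stated assumptions: Holder, started, and
processElement here are hypothetical stand-ins (a Boolean flag plays the
role of the `holder.out eq NotYetThere` check), and the re-entrant
behaviour is simulated rather than driven by real Futures.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the stage's internals; not Pekko's actual types.
object SnapshotDrainSketch {
  final class Holder(val id: Int) { var started: Boolean = false }

  val buffer: mutable.Queue[Holder] = mutable.Queue.empty
  val processed: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer.empty

  // Simulates synchronous completion: processing mutates `buffer`
  // while the drain loop is still running.
  def processElement(h: Holder): Unit = {
    h.started = true
    processed += h.id
    // Re-entrant mutation: dequeue started holders from the front.
    while (buffer.nonEmpty && buffer.head.started) buffer.dequeue()
    // Re-entry may also start the next buffered element itself.
    if (h.id == 1 && buffer.nonEmpty) processElement(buffer.head)
  }

  def drainQueue(): Unit = {
    // Immutable copy: later mutation of `buffer` cannot disturb this loop.
    val snapshot = buffer.toList
    snapshot.foreach { h =>
      // Guard: skip anything already started during re-entry.
      if (!h.started) processElement(h)
    }
  }
}
```

In this model, element 2 is started re-entrantly while element 1 is being
processed; the guard skips it when the loop reaches it, and elements 3 and
4 are still visited because the loop walks the snapshot rather than the
mutated queue.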
Result:
The previously failing test passes in 4 of 5 local runs (vs 0 of 5 on
baseline) in well under a second, and the full MapAsyncPartitionedSpec
(22 tests) finishes in ~17 s.
---
.../scala/org/apache/pekko/stream/MapAsyncPartitioned.scala | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
index c50eeb42d2..24f2d77c1b 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala
@@ -252,9 +252,17 @@ private[stream] final class MapAsyncPartitioned[In, Out, Partition](
private def drainQueue(): Unit = {
if (buffer.nonEmpty) {
- buffer.foreach {
+ // Snapshot the buffer because processElement may complete a Future synchronously
+ // (the #20217 optimization), which re-enters pushNextIfPossible and either
+ // dequeues from the buffer (ordered) or rebuilds it via filter (unordered),
+ // invalidating any in-flight iterator over `buffer`. Items already started or
+ // already pushed during re-entry have holder.out set, so the NotYetThere
+ // guard ensures we never invoke processElement twice for the same Holder.
+ val snapshot = buffer.toList
+ snapshot.foreach {
case (partition, wrappedInput) =>
- if (canStartNextElement(partition)) {
+ val holder = wrappedInput.element
+ if ((holder.out eq NotYetThere) && canStartNextElement(partition)) {
wrappedInput.resume()
processElement(partition, wrappedInput)
}