This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new fab9594069 fix: stabilise JDK 25 nightly flakes (#2907)
fab9594069 is described below

commit fab959406911262a38f14f25782c0254d2f9efb6
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Apr 25 22:57:58 2026 +0800

    fix: stabilise JDK 25 nightly flakes (#2907)
---
 .github/workflows/nightly-builds.yml               | 23 +++++++++++++++++-
 CONTRIBUTING.md                                    | 28 ++++++++++++++++++++++
 docs/src/main/paradox/dispatchers.md               | 10 +++++---
 docs/src/main/paradox/typed/dispatchers.md         | 14 ++++++++---
 .../DurableStateBehaviorStashOverflowSpec.scala    |  2 +-
 stream-tests/src/test/resources/application.conf   |  3 ++-
 .../stream/scaladsl/BoundedSourceQueueSpec.scala   | 12 ++++++----
 7 files changed, 78 insertions(+), 14 deletions(-)

diff --git a/.github/workflows/nightly-builds.yml 
b/.github/workflows/nightly-builds.yml
index 5d4ff1bbe6..8ac060072e 100644
--- a/.github/workflows/nightly-builds.yml
+++ b/.github/workflows/nightly-builds.yml
@@ -150,15 +150,36 @@ jobs:
         # note that this is not running any multi-jvm tests because 
multi-in-test=false
         # JDK 21+ ForkJoinPool scheduling changes need a higher timefactor and 
a larger
         # minimum-runnable to avoid compensation-thread starvation (see #2573, 
#2870).
+        # Run the internal fork-join dispatchers virtualized in nightly builds 
so
+        # blocking work can unmount from carrier threads. JDK 25 is more 
sensitive
+        # on the GitHub runners and needs the capped minimum-runnable value.
         run: |-
           EXTRA_JVM_OPTS=""
+          VIRTUAL_THREAD_JVM_OPTS=""
+          MINIMUM_RUNNABLE=4
           if [ "${{ matrix.javaVersion }}" -ge 25 ]; then
             TIMEFACTOR=4
+            MINIMUM_RUNNABLE=8
           else
             TIMEFACTOR=2
           fi
           if [ "${{ matrix.javaVersion }}" -ge 21 ]; then
-            
EXTRA_JVM_OPTS="-Dpekko.actor.default-dispatcher.fork-join-executor.minimum-runnable=4
 -Dpekko.remote.default-remote-dispatcher.fork-join-executor.minimum-runnable=4"
+            
VIRTUAL_THREAD_JVM_OPTS="--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED 
--add-opens=java.base/java.lang=ALL-UNNAMED"
+            export JDK_JAVA_OPTIONS="${JDK_JAVA_OPTIONS:-} 
$VIRTUAL_THREAD_JVM_OPTS"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.actor.default-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.actor.internal-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.remote.default-remote-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.remote.classic.backoff-remote-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.persistence.dispatchers.default-replay-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.persistence.dispatchers.default-stream-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.test.stream-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.actor.internal-dispatcher.fork-join-executor.virtualize=on"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.remote.default-remote-dispatcher.fork-join-executor.virtualize=on"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.remote.classic.backoff-remote-dispatcher.fork-join-executor.virtualize=on"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.persistence.dispatchers.default-replay-dispatcher.fork-join-executor.virtualize=on"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.persistence.dispatchers.default-stream-dispatcher.fork-join-executor.virtualize=on"
+            EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS 
-Dpekko.test.stream-dispatcher.fork-join-executor.virtualize=on"
           fi
           sbt \
             -Dpekko.cluster.assert=on \
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 2c84774d18..0521bb95fc 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -200,6 +200,34 @@ Pekko, like most Scala projects, compiles faster with the 
Graal JIT enabled. The
 * Use a JDK > 10
 * Use the following JVM options for SBT e.g. by adding them to the `SBT_OPTS` 
environment variable: `-XX:+UnlockExperimentalVMOptions -XX:+EnableJVMCI 
-XX:+UseJVMCICompiler`
 
+### JDK 21+ Nightly Virtual Threads
+
+The JDK nightly build enables virtual threads for selected dispatchers on JDK 
21 and newer with system properties in
+`.github/workflows/nightly-builds.yml`. This is a CI-only opt-in; local runs, 
PR validation, and the reference
+configuration keep `virtualize = off` unless explicitly overridden.
+For fork-join dispatchers, `virtualize = on` uses each dispatcher's own 
fork-join pool as the virtual-thread scheduler,
+preserving dispatcher isolation rather than routing the selected dispatchers 
through the JVM-wide default virtual-thread
+scheduler.
+
+The nightly override covers:
+
+* `pekko.actor.default-dispatcher.fork-join-executor.virtualize=on`
+* `pekko.actor.internal-dispatcher.fork-join-executor.virtualize=on`
+* `pekko.remote.default-remote-dispatcher.fork-join-executor.virtualize=on`
+* 
`pekko.remote.classic.backoff-remote-dispatcher.fork-join-executor.virtualize=on`
+* 
`pekko.persistence.dispatchers.default-replay-dispatcher.fork-join-executor.virtualize=on`
+* 
`pekko.persistence.dispatchers.default-stream-dispatcher.fork-join-executor.virtualize=on`
+* `pekko.test.stream-dispatcher.fork-join-executor.virtualize=on`
+
+When reproducing this workflow locally on JDK 21+, use `cp .jvmopts-ci 
.jvmopts` for the CI launcher settings and add
+the same virtual-thread `--add-opens` options that the nightly workflow 
exports through `JDK_JAVA_OPTIONS`:
+
+* `--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED`
+* `--add-opens=java.base/java.lang=ALL-UNNAMED`
+
+The nightly build also keeps the JDK 21+ `minimum-runnable` override because 
virtualized fork-join dispatchers still use
+carrier threads.
+
 ### The `validatePullRequest` task
 
 The Pekko build includes a special task called `validatePullRequest`, which 
investigates the changes made as well as dirty
diff --git a/docs/src/main/paradox/dispatchers.md 
b/docs/src/main/paradox/dispatchers.md
index 27fbd42803..ba28c97da1 100644
--- a/docs/src/main/paradox/dispatchers.md
+++ b/docs/src/main/paradox/dispatchers.md
@@ -53,8 +53,13 @@ Set `minimum-runnable` explicitly (any non-negative integer) 
to opt out of the a
 `minimum-runnable = 1` restores the previous default, and `minimum-runnable = 
0` disables compensation entirely.
 
 **Experimental**: When Running on Java 21+, you can use `virtualize=on` to 
enable the virtual threads feature.
-When using virtual threads, all virtual threads will use the same `unparker`, 
so you may want to 
-increase the number of `jdk.unparker.maxPoolSize`. 
+When `virtualize=on` is used with a dispatcher executor such as 
`fork-join-executor` or `thread-pool-executor`, Pekko
+uses that dispatcher's own executor as the virtual-thread scheduler. This 
preserves dispatcher isolation instead of
+routing all virtual threads through the JVM-wide default virtual-thread 
scheduler. The JVM still uses a shared
+`unparker`, so you may want to increase the number of 
`jdk.unparker.maxPoolSize`.
+The setting can also be enabled with a system property such as
+`-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on`. Pekko's 
reference configuration keeps it `off` by
+default; use an explicit application or test configuration when opting in.
 
 #### Requirements
 
@@ -107,4 +112,3 @@ So in this example it's a top-level section, but you could 
for instance put it a
 where you'd use periods to denote sub-sections, like this: 
`"foo.bar.my-dispatcher"`
 
 @@@
-
diff --git a/docs/src/main/paradox/typed/dispatchers.md 
b/docs/src/main/paradox/typed/dispatchers.md
index abbb694586..dc547e4ec0 100644
--- a/docs/src/main/paradox/typed/dispatchers.md
+++ b/docs/src/main/paradox/typed/dispatchers.md
@@ -138,8 +138,13 @@ Set `minimum-runnable` explicitly (any non-negative 
integer) to opt out of the a
 `minimum-runnable = 1` restores the previous default, and `minimum-runnable = 
0` disables compensation entirely.
 
 **Experimental**: When Running on Java 21+, you can use `virtualize=on` to 
enable the virtual threads feature.
-When using virtual threads, all virtual threads will use the same `unparker`, 
so you may want to
-increase the number of `jdk.unparker.maxPoolSize`.
+When `virtualize=on` is used with a dispatcher executor such as 
`fork-join-executor` or `thread-pool-executor`, Pekko
+uses that dispatcher's own executor as the virtual-thread scheduler. This 
preserves dispatcher isolation instead of
+routing all virtual threads through the JVM-wide default virtual-thread 
scheduler. The JVM still uses a shared
+`unparker`, so you may want to increase the number of 
`jdk.unparker.maxPoolSize`.
+The setting can also be enabled with a system property such as
+`-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on`. Pekko's 
reference configuration keeps it `off` by
+default; use an explicit application or test configuration when opting in.
 
 #### Requirements
 
@@ -387,7 +392,10 @@ Configuring a dispatcher with virtual threads, requires 
Java 21 or above:
 @@snip 
[DispatcherDocSpec.scala](/docs/src/test/scala/docs/dispatcher/DispatcherDocSpec.scala)
 { #virtual-thread-dispatcher-config }
 
 With this, an actor will run in a virtual thread, so you may want to configure 
it further with :
-`jdk.virtualThreadScheduler.parallelism` 
,`jdk.virtualThreadScheduler.maxPoolSize` and `jdk.unparker.maxPoolSize`.
+dispatcher pool settings such as `parallelism-min` and `parallelism-max`, and 
with `jdk.unparker.maxPoolSize` if the
+shared unparker becomes a bottleneck.
+You can enable the same setting from the command line with a system property 
such as
+`-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on`.
 
 ### Fixed pool size
 
diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
index dfbf1ed3e7..e710612d59 100644
--- 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
@@ -129,7 +129,7 @@ class DurableStateBehaviorStashOverflowSpec
       journal.completeUpsertFuture()
 
       // exactly how many is racy but at least the first stash buffer full 
should complete
-      probe.receiveMessages(stashCapacity)
+      probe.receiveMessages(stashCapacity, 30.seconds)
     }
   }
 }
diff --git a/stream-tests/src/test/resources/application.conf 
b/stream-tests/src/test/resources/application.conf
index 98f92a5d50..c5f24920a5 100644
--- a/stream-tests/src/test/resources/application.conf
+++ b/stream-tests/src/test/resources/application.conf
@@ -1,7 +1,8 @@
 # SPDX-License-Identifier: Apache-2.0
 
 # Test-only: Virtual thread configuration.
-# Default: off for local development. Nightly CI sets 
PEKKO_VIRTUALIZE_DISPATCHER=on for JDK 21+.
+# Default: off for local development. Nightly CI enables this with explicit 
system properties for JDK 21+.
+# This environment variable remains available for module-local experiments.
 
 pekko.test.stream-dispatcher.fork-join-executor {
   # Default to off, then allow env var to override when set.
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
index 1c25fe1697..cb858c897e 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
@@ -22,6 +22,7 @@ import org.apache.pekko
 import pekko.stream.QueueOfferResult
 import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
 import pekko.stream.testkit.scaladsl.TestSink
+import pekko.testkit.TestDuration
 import pekko.testkit.WithLogCapturing
 
 class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
@@ -143,9 +144,9 @@ class BoundedSourceQueueSpec extends 
StreamSpec("""pekko.loglevel = debug
 
       class QueueingThread extends Thread {
         override def run(): Unit = {
-          var numElemsEnqueued = 0
-          var numElemsDropped = 0
-          def runLoop(): Unit = {
+          def runLoop(): (Int, Int) = {
+            var numElemsEnqueued = 0
+            var numElemsDropped = 0
             val r = ThreadLocalRandom.current()
 
             var done = false
@@ -164,11 +165,12 @@ class BoundedSourceQueueSpec extends 
StreamSpec("""pekko.loglevel = debug
                 done = true
               } else if (i % 100 == 0) Thread.sleep(1) // probabilistic 
producer throttling delay
             }
+            (numElemsEnqueued, numElemsDropped)
           }
 
           startBarrier.countDown()
           startBarrier.await() // wait for all threads being in this state 
before starting race
-          runLoop()
+          val (numElemsEnqueued, numElemsDropped) = runLoop()
           stopBarrier.countDown()
           log.debug(
             f"Thread $getName%-20s enqueued: $numElemsEnqueued%7d dropped: 
$numElemsDropped%7d before completion")
@@ -204,7 +206,7 @@ class BoundedSourceQueueSpec extends 
StreamSpec("""pekko.loglevel = debug
           if (sendQueue.offer(round * 1000 + n) != QueueOfferResult.Enqueued)
             fail(s"offer failed at round $round message $n")
         }
-        downstream.expectNext((1 to burstSize).map(_ + round * 1000).toList)
+        downstream.expectNext(30.seconds.dilated, (1 to burstSize).map(_ + 
round * 1000).toList)
         downstream.request(1)
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to