This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new fab9594069 fix: stabilise JDK 25 nightly flakes (#2907)
fab9594069 is described below
commit fab959406911262a38f14f25782c0254d2f9efb6
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Apr 25 22:57:58 2026 +0800
fix: stabilise JDK 25 nightly flakes (#2907)
---
.github/workflows/nightly-builds.yml | 23 +++++++++++++++++-
CONTRIBUTING.md | 28 ++++++++++++++++++++++
docs/src/main/paradox/dispatchers.md | 10 +++++---
docs/src/main/paradox/typed/dispatchers.md | 14 ++++++++---
.../DurableStateBehaviorStashOverflowSpec.scala | 2 +-
stream-tests/src/test/resources/application.conf | 3 ++-
.../stream/scaladsl/BoundedSourceQueueSpec.scala | 12 ++++++----
7 files changed, 78 insertions(+), 14 deletions(-)
diff --git a/.github/workflows/nightly-builds.yml
b/.github/workflows/nightly-builds.yml
index 5d4ff1bbe6..8ac060072e 100644
--- a/.github/workflows/nightly-builds.yml
+++ b/.github/workflows/nightly-builds.yml
@@ -150,15 +150,36 @@ jobs:
# note that this is not running any multi-jvm tests because
multi-in-test=false
# JDK 21+ ForkJoinPool scheduling changes need a higher timefactor and
a larger
# minimum-runnable to avoid compensation-thread starvation (see #2573,
#2870).
+ # Run the internal fork-join dispatchers virtualized in nightly builds
so
+ # blocking work can unmount from carrier threads. JDK 25 is more
sensitive
+ # on the GitHub runners and needs the capped minimum-runnable value.
run: |-
EXTRA_JVM_OPTS=""
+ VIRTUAL_THREAD_JVM_OPTS=""
+ MINIMUM_RUNNABLE=4
if [ "${{ matrix.javaVersion }}" -ge 25 ]; then
TIMEFACTOR=4
+ MINIMUM_RUNNABLE=8
else
TIMEFACTOR=2
fi
if [ "${{ matrix.javaVersion }}" -ge 21 ]; then
-
EXTRA_JVM_OPTS="-Dpekko.actor.default-dispatcher.fork-join-executor.minimum-runnable=4
-Dpekko.remote.default-remote-dispatcher.fork-join-executor.minimum-runnable=4"
+
VIRTUAL_THREAD_JVM_OPTS="--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED"
+ export JDK_JAVA_OPTIONS="${JDK_JAVA_OPTIONS:-}
$VIRTUAL_THREAD_JVM_OPTS"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.actor.default-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.actor.internal-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.remote.default-remote-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.remote.classic.backoff-remote-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.persistence.dispatchers.default-replay-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.persistence.dispatchers.default-stream-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.test.stream-dispatcher.fork-join-executor.minimum-runnable=$MINIMUM_RUNNABLE"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.actor.internal-dispatcher.fork-join-executor.virtualize=on"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.remote.default-remote-dispatcher.fork-join-executor.virtualize=on"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.remote.classic.backoff-remote-dispatcher.fork-join-executor.virtualize=on"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.persistence.dispatchers.default-replay-dispatcher.fork-join-executor.virtualize=on"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.persistence.dispatchers.default-stream-dispatcher.fork-join-executor.virtualize=on"
+ EXTRA_JVM_OPTS="$EXTRA_JVM_OPTS
-Dpekko.test.stream-dispatcher.fork-join-executor.virtualize=on"
fi
sbt \
-Dpekko.cluster.assert=on \
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 2c84774d18..0521bb95fc 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -200,6 +200,34 @@ Pekko, like most Scala projects, compiles faster with the
Graal JIT enabled. The
* Use a JDK > 10
* Use the following JVM options for SBT e.g. by adding them to the `SBT_OPTS`
environment variable: `-XX:+UnlockExperimentalVMOptions -XX:+EnableJVMCI
-XX:+UseJVMCICompiler`
+### JDK 21+ Nightly Virtual Threads
+
+The JDK nightly build enables virtual threads for selected dispatchers on JDK
21 and newer with system properties in
+`.github/workflows/nightly-builds.yml`. This is a CI-only opt-in; local runs,
PR validation, and the reference
+configuration keep `virtualize = off` unless explicitly overridden.
+For fork-join dispatchers, `virtualize = on` uses each dispatcher's own
fork-join pool as the virtual-thread scheduler,
+preserving dispatcher isolation rather than routing the selected dispatchers
through the JVM-wide default virtual-thread
+scheduler.
+
+The nightly override covers:
+
+* `pekko.actor.default-dispatcher.fork-join-executor.virtualize=on`
+* `pekko.actor.internal-dispatcher.fork-join-executor.virtualize=on`
+* `pekko.remote.default-remote-dispatcher.fork-join-executor.virtualize=on`
+*
`pekko.remote.classic.backoff-remote-dispatcher.fork-join-executor.virtualize=on`
+*
`pekko.persistence.dispatchers.default-replay-dispatcher.fork-join-executor.virtualize=on`
+*
`pekko.persistence.dispatchers.default-stream-dispatcher.fork-join-executor.virtualize=on`
+* `pekko.test.stream-dispatcher.fork-join-executor.virtualize=on`
+
+When reproducing this workflow locally on JDK 21+, use `cp .jvmopts-ci
.jvmopts` for the CI launcher settings and add
+the same virtual-thread `--add-opens` options that the nightly workflow
exports through `JDK_JAVA_OPTIONS`:
+
+* `--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED`
+* `--add-opens=java.base/java.lang=ALL-UNNAMED`
+
+The nightly build also keeps the JDK 21+ `minimum-runnable` override because
virtualized fork-join dispatchers still use
+carrier threads.
+
### The `validatePullRequest` task
The Pekko build includes a special task called `validatePullRequest`, which
investigates the changes made as well as dirty
diff --git a/docs/src/main/paradox/dispatchers.md
b/docs/src/main/paradox/dispatchers.md
index 27fbd42803..ba28c97da1 100644
--- a/docs/src/main/paradox/dispatchers.md
+++ b/docs/src/main/paradox/dispatchers.md
@@ -53,8 +53,13 @@ Set `minimum-runnable` explicitly (any non-negative integer)
to opt out of the a
`minimum-runnable = 1` restores the previous default, and `minimum-runnable =
0` disables compensation entirely.
**Experimental**: When running on Java 21+, you can use `virtualize=on` to
enable the virtual threads feature.
-When using virtual threads, all virtual threads will use the same `unparker`,
so you may want to
-increase the number of `jdk.unparker.maxPoolSize`.
+When `virtualize=on` is used with a dispatcher executor such as
`fork-join-executor` or `thread-pool-executor`, Pekko
+uses that dispatcher's own executor as the virtual-thread scheduler. This
preserves dispatcher isolation instead of
+routing all virtual threads through the JVM-wide default virtual-thread
scheduler. The JVM still uses a shared
+`unparker`, so you may want to increase the value of
`jdk.unparker.maxPoolSize`.
+The setting can also be enabled with a system property such as
+`-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on`. Pekko's
reference configuration keeps it `off` by
+default; use an explicit application or test configuration when opting in.
#### Requirements
@@ -107,4 +112,3 @@ So in this example it's a top-level section, but you could
for instance put it a
where you'd use periods to denote sub-sections, like this:
`"foo.bar.my-dispatcher"`
@@@
-
diff --git a/docs/src/main/paradox/typed/dispatchers.md
b/docs/src/main/paradox/typed/dispatchers.md
index abbb694586..dc547e4ec0 100644
--- a/docs/src/main/paradox/typed/dispatchers.md
+++ b/docs/src/main/paradox/typed/dispatchers.md
@@ -138,8 +138,13 @@ Set `minimum-runnable` explicitly (any non-negative
integer) to opt out of the a
`minimum-runnable = 1` restores the previous default, and `minimum-runnable =
0` disables compensation entirely.
**Experimental**: When running on Java 21+, you can use `virtualize=on` to
enable the virtual threads feature.
-When using virtual threads, all virtual threads will use the same `unparker`,
so you may want to
-increase the number of `jdk.unparker.maxPoolSize`.
+When `virtualize=on` is used with a dispatcher executor such as
`fork-join-executor` or `thread-pool-executor`, Pekko
+uses that dispatcher's own executor as the virtual-thread scheduler. This
preserves dispatcher isolation instead of
+routing all virtual threads through the JVM-wide default virtual-thread
scheduler. The JVM still uses a shared
+`unparker`, so you may want to increase the value of
`jdk.unparker.maxPoolSize`.
+The setting can also be enabled with a system property such as
+`-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on`. Pekko's
reference configuration keeps it `off` by
+default; use an explicit application or test configuration when opting in.
#### Requirements
@@ -387,7 +392,10 @@ Configuring a dispatcher with virtual threads, requires
Java 21 or above:
@@snip
[DispatcherDocSpec.scala](/docs/src/test/scala/docs/dispatcher/DispatcherDocSpec.scala)
{ #virtual-thread-dispatcher-config }
With this, an actor will run in a virtual thread, so you may want to configure
it further with
-`jdk.virtualThreadScheduler.parallelism`
,`jdk.virtualThreadScheduler.maxPoolSize` and `jdk.unparker.maxPoolSize`.
+dispatcher pool settings such as `parallelism-min` and `parallelism-max`, and
with `jdk.unparker.maxPoolSize` if the
+shared unparker becomes a bottleneck.
+You can enable the same setting from the command line with a system property
such as
+`-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on`.
### Fixed pool size
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
index dfbf1ed3e7..e710612d59 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/state/scaladsl/DurableStateBehaviorStashOverflowSpec.scala
@@ -129,7 +129,7 @@ class DurableStateBehaviorStashOverflowSpec
journal.completeUpsertFuture()
// exactly how many is racy but at least the first stash buffer full
should complete
- probe.receiveMessages(stashCapacity)
+ probe.receiveMessages(stashCapacity, 30.seconds)
}
}
}
diff --git a/stream-tests/src/test/resources/application.conf
b/stream-tests/src/test/resources/application.conf
index 98f92a5d50..c5f24920a5 100644
--- a/stream-tests/src/test/resources/application.conf
+++ b/stream-tests/src/test/resources/application.conf
@@ -1,7 +1,8 @@
# SPDX-License-Identifier: Apache-2.0
# Test-only: Virtual thread configuration.
-# Default: off for local development. Nightly CI sets
PEKKO_VIRTUALIZE_DISPATCHER=on for JDK 21+.
+# Default: off for local development. Nightly CI enables this with explicit
system properties for JDK 21+.
+# The PEKKO_VIRTUALIZE_DISPATCHER environment variable remains available for module-local experiments.
pekko.test.stream-dispatcher.fork-join-executor {
# Default to off, then allow env var to override when set.
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
index 1c25fe1697..cb858c897e 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
@@ -22,6 +22,7 @@ import org.apache.pekko
import pekko.stream.QueueOfferResult
import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
import pekko.stream.testkit.scaladsl.TestSink
+import pekko.testkit.TestDuration
import pekko.testkit.WithLogCapturing
class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
@@ -143,9 +144,9 @@ class BoundedSourceQueueSpec extends
StreamSpec("""pekko.loglevel = debug
class QueueingThread extends Thread {
override def run(): Unit = {
- var numElemsEnqueued = 0
- var numElemsDropped = 0
- def runLoop(): Unit = {
+ def runLoop(): (Int, Int) = {
+ var numElemsEnqueued = 0
+ var numElemsDropped = 0
val r = ThreadLocalRandom.current()
var done = false
@@ -164,11 +165,12 @@ class BoundedSourceQueueSpec extends
StreamSpec("""pekko.loglevel = debug
done = true
} else if (i % 100 == 0) Thread.sleep(1) // probabilistic
producer throttling delay
}
+ (numElemsEnqueued, numElemsDropped)
}
startBarrier.countDown()
startBarrier.await() // wait for all threads being in this state
before starting race
- runLoop()
+ val (numElemsEnqueued, numElemsDropped) = runLoop()
stopBarrier.countDown()
log.debug(
f"Thread $getName%-20s enqueued: $numElemsEnqueued%7d dropped:
$numElemsDropped%7d before completion")
@@ -204,7 +206,7 @@ class BoundedSourceQueueSpec extends
StreamSpec("""pekko.loglevel = debug
if (sendQueue.offer(round * 1000 + n) != QueueOfferResult.Enqueued)
fail(s"offer failed at round $round message $n")
}
- downstream.expectNext((1 to burstSize).map(_ + round * 1000).toList)
+ downstream.expectNext(30.seconds.dilated, (1 to burstSize).map(_ +
round * 1000).toList)
downstream.request(1)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]