This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new a57a0db001 fix: support virtualized dispatcher test assumptions (#2912)
a57a0db001 is described below
commit a57a0db001ec3133b51c8c884dd52bfe35673279
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Apr 26 21:34:41 2026 +0800
fix: support virtualized dispatcher test assumptions (#2912)
Motivation:
Nightly builds can enable virtualized fork-join dispatchers on newer JDKs.
That legitimately changes observable dispatcher thread names and disables
Dispatcher batching, so tests that asserted numeric worker suffixes or
default-dispatcher batching failed.
Modification:
- Accept both platform-thread and virtual-thread dispatcher name suffixes
in thread-name assertions.
- Keep production virtual-thread naming unchanged.
- Run ExecutionContext batching checks on a dedicated non-virtualized
dispatcher.
Result:
Nightly can virtualize the default and internal dispatchers while the
affected tests still validate their intended behavior on JDK 17, JDK 21, and
JDK 25.
Tests:
- JDK 25: sbt -Dpekko.test.timefactor=4
-Dpekko.actor.testkit.typed.timefactor=4
-Dpekko.actor.default-dispatcher.fork-join-executor.minimum-runnable=8
-Dpekko.actor.internal-dispatcher.fork-join-executor.minimum-runnable=8
-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on
-Dpekko.actor.internal-dispatcher.fork-join-executor.virtualize=on "slf4j /
Test / testOnly org.apache.pekko.event.slf4j.Slf4jLoggerSpec" "actor-tests /
Test / testOnly org.apache.pekko.actor.dispatch.DispatchersSpec
org.apache.pekko.dispatch.ExecutionContextSpec"
- JDK 21: sbt -Dpekko.test.timefactor=2
-Dpekko.actor.testkit.typed.timefactor=2
-Dpekko.actor.default-dispatcher.fork-join-executor.minimum-runnable=4
-Dpekko.actor.internal-dispatcher.fork-join-executor.minimum-runnable=4
-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on
-Dpekko.actor.internal-dispatcher.fork-join-executor.virtualize=on "slf4j /
Test / testOnly org.apache.pekko.event.slf4j.Slf4jLoggerSpec" "actor-tests /
Test / testOnly org.apache.pekko.actor.dispatch.DispatchersSpec
org.apache.pekko.dispatch.ExecutionContextSpec"
- JDK 21 / Scala 3.3.x: sbt -Dpekko.test.timefactor=2
-Dpekko.actor.testkit.typed.timefactor=2
-Dpekko.actor.default-dispatcher.fork-join-executor.minimum-runnable=4
-Dpekko.actor.internal-dispatcher.fork-join-executor.minimum-runnable=4
-Dpekko.actor.default-dispatcher.fork-join-executor.virtualize=on
-Dpekko.actor.internal-dispatcher.fork-join-executor.virtualize=on "++ 3.3.x"
"slf4j / Test / testOnly org.apache.pekko.event.slf4j.Slf4jLoggerSpec"
"actor-tests / Test / testOnly org.apache.pekko.actor.dispatch.DispatchersSpec
org.apache.pekko.dispatch.ExecutionContextSpec"
- JDK 17: sbt -Dpekko.test.timefactor=2
-Dpekko.actor.testkit.typed.timefactor=2 "slf4j / Test / testOnly
org.apache.pekko.event.slf4j.Slf4jLoggerSpec" "actor-tests / Test / testOnly
org.apache.pekko.actor.dispatch.DispatchersSpec
org.apache.pekko.dispatch.ExecutionContextSpec"
- sbt "slf4j / Test / scalafmt" "actor-tests / Test / scalafmtCheck" "slf4j
/ Test / scalafmtCheck"
- git diff --check
References:
- https://github.com/apache/pekko/pull/2907
- https://github.com/apache/pekko/actions/runs/24933645620
---
.../pekko/actor/dispatch/DispatchersSpec.scala | 20 +++++++++-----
.../pekko/dispatch/ExecutionContextSpec.scala | 31 +++++++++++++++-------
.../apache/pekko/event/slf4j/Slf4jLoggerSpec.scala | 4 ++-
3 files changed, 37 insertions(+), 18 deletions(-)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/actor/dispatch/DispatchersSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/actor/dispatch/DispatchersSpec.scala
index ec98c13810..2fe8900f41 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/actor/dispatch/DispatchersSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/actor/dispatch/DispatchersSpec.scala
@@ -15,6 +15,7 @@ package org.apache.pekko.actor.dispatch
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
+import java.util.regex.Pattern
import scala.annotation.nowarn
import scala.reflect.ClassTag
@@ -109,6 +110,11 @@ object DispatchersSpec {
private val r = s.r
def unapplySeq(arg: CharSequence) = r.unapplySeq(arg)
}
+
+ private val DispatcherThreadSuffixRegex =
"(?:[1-9][0-9]*|virtual-thread(?:-[0-9]+)?)"
+
+ def dispatcherThreadName(prefix: String): R =
+ R(s"(${Pattern.quote(prefix)}-$DispatcherThreadSuffixRegex)")
}
class DispatchersSpec extends PekkoSpec(DispatchersSpec.config) with
ImplicitSender {
@@ -145,7 +151,7 @@ class DispatchersSpec extends
PekkoSpec(DispatchersSpec.config) with ImplicitSen
def assertMyDispatcherIsUsed(actor: ActorRef): Unit = {
actor ! "what's the name?"
- val Expected = R("(DispatchersSpec-myapp.mydispatcher-[1-9][0-9]*)")
+ val Expected = dispatcherThreadName("DispatchersSpec-myapp.mydispatcher")
expectMsgPF() {
case Expected(_) =>
}
@@ -234,7 +240,7 @@ class DispatchersSpec extends
PekkoSpec(DispatchersSpec.config) with ImplicitSen
"include system name and dispatcher id in thread names for
thread-pool-executor" in {
system.actorOf(Props[ThreadNameEcho]().withDispatcher("myapp.thread-pool-dispatcher"))
! "what's the name?"
- val Expected =
R("(DispatchersSpec-myapp.thread-pool-dispatcher-[1-9][0-9]*)")
+ val Expected =
dispatcherThreadName("DispatchersSpec-myapp.thread-pool-dispatcher")
expectMsgPF() {
case Expected(_) =>
}
@@ -242,7 +248,7 @@ class DispatchersSpec extends
PekkoSpec(DispatchersSpec.config) with ImplicitSen
"include system name and dispatcher id in thread names for
default-dispatcher" in {
system.actorOf(Props[ThreadNameEcho]()) ! "what's the name?"
- val Expected =
R("(DispatchersSpec-pekko.actor.default-dispatcher-[1-9][0-9]*)")
+ val Expected =
dispatcherThreadName("DispatchersSpec-pekko.actor.default-dispatcher")
expectMsgPF() {
case Expected(_) =>
}
@@ -250,7 +256,7 @@ class DispatchersSpec extends
PekkoSpec(DispatchersSpec.config) with ImplicitSen
"include system name and dispatcher id in thread names for pinned
dispatcher" in {
system.actorOf(Props[ThreadNameEcho]().withDispatcher("myapp.my-pinned-dispatcher"))
! "what's the name?"
- val Expected =
R("(DispatchersSpec-myapp.my-pinned-dispatcher-[1-9][0-9]*)")
+ val Expected =
dispatcherThreadName("DispatchersSpec-myapp.my-pinned-dispatcher")
expectMsgPF() {
case Expected(_) =>
}
@@ -258,7 +264,7 @@ class DispatchersSpec extends
PekkoSpec(DispatchersSpec.config) with ImplicitSen
"include system name and dispatcher id in thread names for balancing
dispatcher" in {
system.actorOf(Props[ThreadNameEcho]().withDispatcher("myapp.balancing-dispatcher"))
! "what's the name?"
- val Expected =
R("(DispatchersSpec-myapp.balancing-dispatcher-[1-9][0-9]*)")
+ val Expected =
dispatcherThreadName("DispatchersSpec-myapp.balancing-dispatcher")
expectMsgPF() {
case Expected(_) =>
}
@@ -278,7 +284,7 @@ class DispatchersSpec extends
PekkoSpec(DispatchersSpec.config) with ImplicitSen
pool ! Identify(None)
val routee = expectMsgType[ActorIdentity].ref.get
routee ! "what's the name?"
- val Expected =
R("""(DispatchersSpec-pekko\.actor\.deployment\./pool1\.pool-dispatcher-[1-9][0-9]*)""")
+ val Expected =
dispatcherThreadName("DispatchersSpec-pekko.actor.deployment./pool1.pool-dispatcher")
expectMsgPF() {
case Expected(_) =>
}
@@ -286,7 +292,7 @@ class DispatchersSpec extends
PekkoSpec(DispatchersSpec.config) with ImplicitSen
"use balancing-pool router with special routees mailbox of deployment
config" in {
system.actorOf(FromConfig.props(Props[ThreadNameEcho]()), name =
"balanced") ! "what's the name?"
- val Expected =
R("""(DispatchersSpec-BalancingPool-/balanced-[1-9][0-9]*)""")
+ val Expected =
dispatcherThreadName("DispatchersSpec-BalancingPool-/balanced")
expectMsgPF() {
case Expected(_) =>
}
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/dispatch/ExecutionContextSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/dispatch/ExecutionContextSpec.scala
index 7cc6c33f12..36a164f9d8 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/dispatch/ExecutionContextSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/dispatch/ExecutionContextSpec.scala
@@ -29,7 +29,23 @@ import pekko.testkit.TestActorRef
import pekko.testkit.TestProbe
import pekko.util.SerializedSuspendableExecutionContext
-class ExecutionContextSpec extends PekkoSpec with DefaultTimeout {
+object ExecutionContextSpec {
+ val BatchingDispatcher = "pekko.test.batching-dispatcher"
+
+ val config = s"""
+ $BatchingDispatcher = $${pekko.actor.default-dispatcher}
+ $BatchingDispatcher.fork-join-executor.virtualize = off
+ """
+}
+
+class ExecutionContextSpec extends PekkoSpec(ExecutionContextSpec.config) with
DefaultTimeout {
+ import ExecutionContextSpec._
+
+ def batchingDispatcher: ExecutionContextExecutor = {
+ val dispatcher = system.dispatchers.lookup(BatchingDispatcher)
+ dispatcher.isInstanceOf[BatchingExecutor] should ===(true)
+ dispatcher
+ }
"An ExecutionContext" must {
@@ -53,9 +69,7 @@ class ExecutionContextSpec extends PekkoSpec with
DefaultTimeout {
}
"be able to use Batching" in {
- system.dispatcher.isInstanceOf[BatchingExecutor] should ===(true)
-
- import system.dispatcher
+ implicit val dispatcher: ExecutionContextExecutor = batchingDispatcher
def batchable[T](f: => T)(implicit ec: ExecutionContext): Unit =
ec.execute(new Batchable {
@@ -83,8 +97,7 @@ class ExecutionContextSpec extends PekkoSpec with
DefaultTimeout {
}
"be able to avoid starvation when Batching is used and Await/blocking is
called" in {
- system.dispatcher.isInstanceOf[BatchingExecutor] should ===(true)
- import system.dispatcher
+ implicit val dispatcher: ExecutionContextExecutor = batchingDispatcher
def batchable[T](f: => T)(implicit ec: ExecutionContext): Unit =
ec.execute(new Batchable {
@@ -108,8 +121,7 @@ class ExecutionContextSpec extends PekkoSpec with
DefaultTimeout {
}
"work with tasks that use blocking{} multiple times" in {
- system.dispatcher.isInstanceOf[BatchingExecutor] should be(true)
- import system.dispatcher
+ implicit val dispatcher: ExecutionContextExecutor = batchingDispatcher
val f = Future(()).flatMap { _ =>
// this needs to be within an OnCompleteRunnable so that things are
added to the batch
@@ -135,8 +147,7 @@ class ExecutionContextSpec extends PekkoSpec with
DefaultTimeout {
}
"work with tasks that block inside blocking" in {
- system.dispatcher.isInstanceOf[BatchingExecutor] should be(true)
- import system.dispatcher
+ implicit val dispatcher: ExecutionContextExecutor = batchingDispatcher
val f = Future(()).flatMap { _ =>
blocking {
diff --git
a/slf4j/src/test/scala/org/apache/pekko/event/slf4j/Slf4jLoggerSpec.scala
b/slf4j/src/test/scala/org/apache/pekko/event/slf4j/Slf4jLoggerSpec.scala
index 616aefc961..cbaa2b2c7b 100644
--- a/slf4j/src/test/scala/org/apache/pekko/event/slf4j/Slf4jLoggerSpec.scala
+++ b/slf4j/src/test/scala/org/apache/pekko/event/slf4j/Slf4jLoggerSpec.scala
@@ -94,7 +94,9 @@ class Slf4jLoggerSpec extends
PekkoSpec(Slf4jLoggerSpec.config) with BeforeAndAf
output.reset()
}
- val sourceThreadRegex =
"sourceThread=Slf4jLoggerSpec-pekko.actor.default-dispatcher-[1-9][0-9]*"
+ val dispatcherThreadSuffixRegex =
"(?:[1-9][0-9]*|virtual-thread(?:-[0-9]+)?)"
+ val sourceThreadRegex =
+
s"sourceThread=Slf4jLoggerSpec-pekko\\.actor\\.default-dispatcher-$dispatcherThreadSuffixRegex"
"Slf4jLogger" must {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]