This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 880b0075d6 fix: Fix virtualized dispatcher shutdown ordering (#2925)
880b0075d6 is described below
commit 880b0075d6459b64f99cea10ba90f57daa591b14
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Tue May 5 00:06:03 2026 +0800
fix: Fix virtualized dispatcher shutdown ordering (#2925)
* fix: Fix virtualized dispatcher shutdown ordering
Motivation:
JDK 21 and 25 can reject virtual-thread continuations when a custom
scheduler is shut down before already-started virtual threads have finished.
This breaks virtualized dispatcher shutdown paths in nightly runs.
Modification:
Shut down the thread-per-task virtual-thread executor first, then cascade
shutdown to the owned underlying scheduler only after the virtual-thread
executor has terminated. Add a regression test that verifies a blocked virtual
thread can resume and finish after dispatcher shutdown.
Result:
The shutdown behavior now matches ExecutorService semantics while avoiding
premature scheduler teardown across JDK 21 and JDK 25.
References:
apache/pekko nightly run 24984219012
* fix: Stabilize nightly failure tests
* fix: Preserve virtualized shutdownNow cascade
Motivation:
PR review identified that deferred cascade shutdown changed shutdownNow
semantics for virtualized dispatchers by eventually invoking
underlying.shutdown() instead of underlying.shutdownNow().
Modification:
Track shutdownNow intent separately from the deferred shutdown waiter and
use a small atomic state machine so an owned scheduler is shut down gracefully
for shutdown(), but escalates to shutdownNow() for shutdownNow() after virtual
threads have terminated. Restore unrelated delegate-method formatting and
strengthen the JDK21+ regression spec with explicit scheduler shutdown-mode
assertions.
Result:
Virtualized dispatchers keep the delayed scheduler shutdown needed for
blocked virtual threads while preserving ExecutorService shutdownNow semantics
for owned schedulers.
* test: stabilize DNS and mapAsyncPartitioned CI checks
* Improve virtualized dispatcher shutdown coordination
---
.../dispatch/ForkJoinPoolVirtualThreadSpec.scala | 145 +++++++++++++++++++++
.../apache/pekko/io/dns/DockerBindDnsService.scala | 26 +++-
.../pekko/routing/MetricsBasedResizerSpec.scala | 3 +
.../pekko/dispatch/VirtualThreadSupport.scala | 10 ++
.../dispatch/VirtualizedExecutorService.scala | 83 +++++++++---
.../pekko/remote/artery/LateConnectSpec.scala | 14 +-
.../scaladsl/FlowMapAsyncPartitionedSpec.scala | 8 +-
7 files changed, 260 insertions(+), 29 deletions(-)
diff --git
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
index dd97a74a0d..00f0fb6e0b 100644
---
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
+++
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
@@ -17,6 +17,10 @@
package org.apache.pekko.dispatch
+import java.util
+import java.util.concurrent.{ AbstractExecutorService, CountDownLatch,
ExecutorService, Executors, TimeUnit }
+import java.util.concurrent.atomic.AtomicBoolean
+
import org.apache.pekko
import pekko.actor.{ Actor, Props }
import pekko.testkit.{ ImplicitSender, PekkoSpec }
@@ -61,6 +65,33 @@ object ForkJoinPoolVirtualThreadSpec {
}
}
+ final class TrackingExecutorService(delegate: ExecutorService) extends
AbstractExecutorService {
+ val shutdownCalled = new AtomicBoolean(false)
+ val shutdownNowCalled = new AtomicBoolean(false)
+
+ override def shutdown(): Unit = {
+ shutdownCalled.set(true)
+ delegate.shutdown()
+ }
+
+ override def shutdownNow(): util.List[Runnable] = {
+ shutdownNowCalled.set(true)
+ delegate.shutdownNow()
+ }
+
+ override def isShutdown: Boolean = delegate.isShutdown
+
+ override def isTerminated: Boolean = delegate.isTerminated
+
+ override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = {
+ delegate.awaitTermination(timeout, unit)
+ }
+
+ override def execute(command: Runnable): Unit = {
+ delegate.execute(command)
+ }
+ }
+
}
class ForkJoinPoolVirtualThreadSpec extends
PekkoSpec(ForkJoinPoolVirtualThreadSpec.config) with ImplicitSender {
@@ -86,5 +117,119 @@ class ForkJoinPoolVirtualThreadSpec extends
PekkoSpec(ForkJoinPoolVirtualThreadS
}
}
+ "allow blocked virtual threads to finish after dispatcher shutdown" in {
+ val underlying = new
TrackingExecutorService(Executors.newFixedThreadPool(1))
+ val executor = new VirtualizedExecutorService(
+
VirtualThreadSupport.newVirtualThreadFactory("fork-join-pool-virtual-thread-spec",
0, underlying),
+ underlying,
+ _ => false,
+ cascadeShutdown = true)
+
+ val started = new CountDownLatch(1)
+ val release = new CountDownLatch(1)
+ val finished = new CountDownLatch(1)
+
+ try {
+ executor.execute(new Runnable {
+ override def run(): Unit = {
+ started.countDown()
+ release.await()
+ finished.countDown()
+ }
+ })
+
+ started.await(5, TimeUnit.SECONDS) should ===(true)
+ executor.shutdown()
+ underlying.isShutdown should ===(false)
+
+ release.countDown()
+ finished.await(5, TimeUnit.SECONDS) should ===(true)
+ executor.awaitTermination(5, TimeUnit.SECONDS) should ===(true)
+ underlying.shutdownCalled.get should ===(true)
+ underlying.shutdownNowCalled.get should ===(false)
+ underlying.isTerminated should ===(true)
+ } finally {
+ release.countDown()
+ executor.shutdownNow()
+ underlying.shutdownNow()
+ }
+ }
+
+ "interrupt blocked virtual threads on dispatcher shutdownNow" in {
+ val underlying = new
TrackingExecutorService(Executors.newFixedThreadPool(1))
+ val executor = new VirtualizedExecutorService(
+
VirtualThreadSupport.newVirtualThreadFactory("fork-join-pool-virtual-thread-spec",
0, underlying),
+ underlying,
+ _ => false,
+ cascadeShutdown = true)
+
+ val started = new CountDownLatch(1)
+ val release = new CountDownLatch(1)
+ val interrupted = new CountDownLatch(1)
+
+ try {
+ executor.execute(new Runnable {
+ override def run(): Unit = {
+ started.countDown()
+ try release.await()
+ catch {
+ case _: InterruptedException => interrupted.countDown()
+ }
+ }
+ })
+
+ started.await(5, TimeUnit.SECONDS) should ===(true)
+ executor.shutdownNow()
+
+ interrupted.await(5, TimeUnit.SECONDS) should ===(true)
+ executor.awaitTermination(5, TimeUnit.SECONDS) should ===(true)
+ underlying.shutdownNowCalled.get should ===(true)
+ underlying.isTerminated should ===(true)
+ } finally {
+ release.countDown()
+ executor.shutdownNow()
+ underlying.shutdownNow()
+ }
+ }
+
+ "escalate to underlying shutdownNow after dispatcher shutdown" in {
+ val underlying = new
TrackingExecutorService(Executors.newFixedThreadPool(1))
+ val executor = new VirtualizedExecutorService(
+
VirtualThreadSupport.newVirtualThreadFactory("fork-join-pool-virtual-thread-spec",
0, underlying),
+ underlying,
+ _ => false,
+ cascadeShutdown = true)
+
+ val started = new CountDownLatch(1)
+ val release = new CountDownLatch(1)
+ val interrupted = new CountDownLatch(1)
+
+ try {
+ executor.execute(new Runnable {
+ override def run(): Unit = {
+ started.countDown()
+ try release.await()
+ catch {
+ case _: InterruptedException => interrupted.countDown()
+ }
+ }
+ })
+
+ started.await(5, TimeUnit.SECONDS) should ===(true)
+ executor.shutdown()
+ underlying.isShutdown should ===(false)
+
+ executor.shutdownNow()
+ interrupted.await(5, TimeUnit.SECONDS) should ===(true)
+ executor.awaitTermination(5, TimeUnit.SECONDS) should ===(true)
+ underlying.shutdownNowCalled.get should ===(true)
+ underlying.isTerminated should ===(true)
+ } finally {
+ release.countDown()
+ executor.shutdownNow()
+ underlying.shutdownNow()
+ }
+ }
+
}
}
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/DockerBindDnsService.scala
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/DockerBindDnsService.scala
index 66326512a6..ab27bb2514 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/DockerBindDnsService.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/DockerBindDnsService.scala
@@ -13,6 +13,8 @@
package org.apache.pekko.io.dns
+import java.net.{ InetAddress, InetSocketAddress }
+
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.Try
@@ -25,7 +27,8 @@ import com.github.dockerjava.core.{
DefaultDockerClientConfig, DockerClientConfi
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient
import org.apache.pekko
-import pekko.io.{ Dns, IO }
+import pekko.actor.Props
+import pekko.io.dns.internal.DnsClient
import pekko.util.Timeout
import pekko.testkit.PekkoSpec
@@ -122,11 +125,22 @@ abstract class DockerBindDnsService(config: Config)
extends PekkoSpec(config) wi
reader.toString should include("Starting BIND")
}
- eventually(timeout(25.seconds)) {
- import pekko.pattern.ask
- implicit val timeout: Timeout = 2.seconds
- (IO(Dns) ? DnsProtocol.Resolve("a-single.foo.test", DnsProtocol.Ip(ipv6
= false))).mapTo[
- DnsProtocol.Resolved].futureValue
+ val readinessNameserver = new
InetSocketAddress(InetAddress.getByName("127.0.0.1"), hostPort)
+ val readinessClient = system.actorOf(Props(new
DnsClient(readinessNameserver)))
+ try {
+ var requestId = 0
+ eventually(timeout(25.seconds)) {
+ import pekko.pattern.ask
+ implicit val timeout: Timeout = 2.seconds
+ requestId += 1
+ val answer = (readinessClient ? DnsClient.Question4(requestId.toShort,
"a-single.foo.test"))
+ .mapTo[DnsClient.Answer]
+ .futureValue
+ answer.rrs.collect { case record: ARecord => record.ip } shouldEqual
Seq(
+ InetAddress.getByName("192.168.1.20"))
+ }
+ } finally {
+ system.stop(readinessClient)
}
}
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/routing/MetricsBasedResizerSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/routing/MetricsBasedResizerSpec.scala
index 59cf8343d0..984725d17d 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/routing/MetricsBasedResizerSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/routing/MetricsBasedResizerSpec.scala
@@ -241,6 +241,9 @@ class MetricsBasedResizerSpec extends
PekkoSpec(ResizerSpec.config) with Default
router.mockSend(await = true, routeeIdx = 0)
router.mockSend(await = false, routeeIdx = 1)
+ awaitAssert {
+ resizer.updatedStats(router.routees, router.msgs.size)._1.get(2)
should not be empty
+ }
resizer.reportMessageCount(router.routees, router.msgs.size)
resizer.performanceLog.get(2) should not be empty
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
index 257da4d97b..4d2595bce6 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
@@ -55,6 +55,16 @@ private[dispatch] object VirtualThreadSupport {
}
}
+ /**
+ * Start a virtual thread using the default virtual thread scheduler.
+ */
+ def startVirtualThread(prefix: String, runnable: Runnable): Thread = {
+ require(runnable != null, "runnable should not be null.")
+ val thread = newVirtualThreadFactory(prefix, -1).newThread(runnable)
+ thread.start()
+ thread
+ }
+
/**
* Create a virtual thread factory with the default Virtual Thread executor.
* @param prefix the prefix of the virtual thread name.
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala
index 5a6a80a38d..be6cf1b0a3 100644
---
a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala
+++
b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala
@@ -19,6 +19,7 @@ package org.apache.pekko.dispatch
import java.util
import java.util.concurrent.{ Callable, Executor, ExecutorService, Future,
ThreadFactory, TimeUnit }
+import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import org.apache.pekko.annotation.InternalApi
@@ -39,35 +40,36 @@ final class VirtualizedExecutorService(
require(vtFactory != null, "Virtual thread factory must not be null")
require(loadMetricsProvider != null, "Load metrics provider must not be
null")
+ private final val UnderlyingRunning = 0
+ private final val UnderlyingShutdown = 1
+ private final val UnderlyingShutdownNow = 2
+
private val executor =
VirtualThreadSupport.newThreadPerTaskExecutor(vtFactory)
+ private val underlyingShutdownScheduled = new AtomicBoolean(false)
+ private val underlyingShutdownNowRequested = new AtomicBoolean(false)
+ private val underlyingShutdownState = new AtomicInteger(UnderlyingRunning)
override def atFullThrottle(): Boolean = loadMetricsProvider(this)
override def shutdown(): Unit = {
executor.shutdown()
- if (cascadeShutdown && (underlying ne null)) {
- underlying.shutdown()
- }
+ shutdownUnderlyingWhenVirtualThreadsHaveTerminated()
}
override def shutdownNow(): util.List[Runnable] = {
- val r = executor.shutdownNow()
- if (cascadeShutdown && (underlying ne null)) {
- underlying.shutdownNow()
- }
- r
+ underlyingShutdownNowRequested.set(true)
+ val result = executor.shutdownNow()
+ shutdownUnderlyingWhenVirtualThreadsHaveTerminated()
+ result
}
- override def isShutdown: Boolean = {
- if (cascadeShutdown) {
- executor.isShutdown && ((underlying eq null) || underlying.isShutdown)
- } else {
- executor.isShutdown
- }
- }
+ override def isShutdown: Boolean = executor.isShutdown
override def isTerminated: Boolean = {
if (cascadeShutdown) {
+ if (executor.isTerminated) {
+ shutdownUnderlying()
+ }
executor.isTerminated && ((underlying eq null) ||
underlying.isTerminated)
} else {
executor.isTerminated
@@ -76,12 +78,61 @@ final class VirtualizedExecutorService(
override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = {
if (cascadeShutdown) {
- executor.awaitTermination(timeout, unit) && ((underlying eq null) ||
underlying.awaitTermination(timeout, unit))
+ val timeoutNanos = unit.toNanos(timeout)
+ val deadline = System.nanoTime() + timeoutNanos
+ executor.awaitTermination(timeout, unit) && {
+ shutdownUnderlying()
+ (underlying eq null) || {
+ val remainingNanos = deadline - System.nanoTime()
+ if (remainingNanos <= 0L) underlying.isTerminated
+ else underlying.awaitTermination(remainingNanos,
TimeUnit.NANOSECONDS)
+ }
+ }
} else {
executor.awaitTermination(timeout, unit)
}
}
+ private def shutdownUnderlyingWhenVirtualThreadsHaveTerminated(): Unit = {
+ if (cascadeShutdown && (underlying ne null)) {
+ if (executor.isTerminated) {
+ shutdownUnderlying()
+ } else if (underlyingShutdownScheduled.compareAndSet(false, true)) {
+ VirtualThreadSupport.startVirtualThread(
+ "pekko-virtualized-executor-shutdown",
+ new Runnable {
+ override def run(): Unit = {
+ var interrupted = false
+ try {
+ while (!executor.isTerminated) {
+ try executor.awaitTermination(Long.MaxValue,
TimeUnit.NANOSECONDS)
+ catch {
+ case _: InterruptedException => interrupted = true
+ }
+ }
+ } finally {
+ shutdownUnderlying()
+ if (interrupted) {
+ Thread.currentThread().interrupt()
+ }
+ }
+ }
+ })
+ }
+ }
+ }
+
+ private def shutdownUnderlying(): Unit = {
+ if (cascadeShutdown && (underlying ne null) &&
underlyingShutdownNowRequested.get) {
+ if (underlyingShutdownState.getAndSet(UnderlyingShutdownNow) !=
UnderlyingShutdownNow) {
+ underlying.shutdownNow()
+ }
+ } else if (cascadeShutdown && (underlying ne null) &&
+ underlyingShutdownState.compareAndSet(UnderlyingRunning,
UnderlyingShutdown)) {
+ underlying.shutdown()
+ }
+ }
+
override def submit[T](task: Callable[T]): Future[T] = {
executor.submit(task)
}
diff --git
a/remote/src/test/scala/org/apache/pekko/remote/artery/LateConnectSpec.scala
b/remote/src/test/scala/org/apache/pekko/remote/artery/LateConnectSpec.scala
index 7f036dbc7d..d12b6ab814 100644
--- a/remote/src/test/scala/org/apache/pekko/remote/artery/LateConnectSpec.scala
+++ b/remote/src/test/scala/org/apache/pekko/remote/artery/LateConnectSpec.scala
@@ -55,11 +55,15 @@ class LateConnectSpec extends
ArteryMultiNodeSpec(LateConnectSpec.config) with I
val probeB = TestProbe()(systemB)
val echoA =
systemB.actorSelection(RootActorPath(RARP(system).provider.getDefaultAddress) /
"user" / "echoA")
- echoA.tell("ping2", probeB.ref)
- probeB.expectMsg(10.seconds, "ping2")
-
- echoB ! "ping3"
- expectMsg("ping3")
+ awaitAssert({
+ echoA.tell("ping2", probeB.ref)
+ probeB.expectMsg(1.second, "ping2")
+ }, 20.seconds)
+
+ awaitAssert({
+ echoB ! "ping3"
+ expectMsg(1.second, "ping3")
+ }, 20.seconds)
}
}
}
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
index af8ba74235..5d563cf5e8 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
@@ -381,11 +381,15 @@ class FlowMapAsyncPartitionedSpec extends StreamSpec with
WithLogCapturing {
}
"resume after multiple failures if resume supervision is in place" in {
+ implicit val ec: ExecutionContext = system.dispatcher
+
val expected =
Source(1 to 10)
.mapAsyncPartitioned(4)(_ % 3) { (elem, _) =>
- if (elem % 4 < 3) Future.failed(new TE("BOOM!"))
- else Future.successful(elem)
+ Future {
+ if (elem % 4 < 3) throw new TE("BOOM!")
+ else elem
+ }
}
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.seq)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]