This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 880b0075d6 fix: Fix virtualized dispatcher shutdown ordering (#2925)
880b0075d6 is described below

commit 880b0075d6459b64f99cea10ba90f57daa591b14
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Tue May 5 00:06:03 2026 +0800

    fix: Fix virtualized dispatcher shutdown ordering (#2925)
    
    * fix: Fix virtualized dispatcher shutdown ordering
    
    Motivation:
    JDK 21 and JDK 25 can reject virtual-thread continuations when a custom
    scheduler is shut down before already-started virtual threads have
    finished. This breaks the virtualized dispatcher shutdown paths in
    nightly runs.
    
    Modification:
    Shut down the thread-per-task virtual-thread executor first, and cascade
    the shutdown to the owned underlying scheduler only after the
    virtual-thread executor has terminated. Add a regression test verifying
    that a blocked virtual thread can resume and finish after the dispatcher
    has been shut down.
    
    Result:
    The shutdown behavior now matches ExecutorService semantics while
    avoiding premature scheduler teardown on both JDK 21 and JDK 25.
    
    References:
    apache/pekko nightly run 24984219012
    
    * fix: Stabilize tests that failed in nightly runs
    
    * fix: Preserve virtualized shutdownNow cascade
    
    Motivation:
    PR review identified that the deferred cascade shutdown changed the
    shutdownNow semantics of virtualized dispatchers: it eventually invoked
    underlying.shutdown() instead of underlying.shutdownNow().
    
    Modification:
    Track the shutdownNow intent separately from the deferred shutdown waiter,
    and use a small atomic state machine so that an owned scheduler is shut
    down gracefully for shutdown(), but is escalated to shutdownNow() for
    shutdownNow(); in both cases this happens only after the virtual threads
    have terminated. Restore the formatting of unrelated delegate methods, and
    strengthen the JDK 21+ regression spec with explicit assertions on the
    scheduler's shutdown mode.
    
    Result:
    Virtualized dispatchers keep the delayed scheduler shutdown that blocked
    virtual threads need in order to finish, while preserving the
    ExecutorService shutdownNow semantics for owned schedulers.
    
    * test: stabilize DNS and mapAsyncPartitioned CI checks
    
    * Improve virtualized dispatcher shutdown coordination
---
 .../dispatch/ForkJoinPoolVirtualThreadSpec.scala   | 145 +++++++++++++++++++++
 .../apache/pekko/io/dns/DockerBindDnsService.scala |  26 +++-
 .../pekko/routing/MetricsBasedResizerSpec.scala    |   3 +
 .../pekko/dispatch/VirtualThreadSupport.scala      |  10 ++
 .../dispatch/VirtualizedExecutorService.scala      |  83 +++++++++---
 .../pekko/remote/artery/LateConnectSpec.scala      |  14 +-
 .../scaladsl/FlowMapAsyncPartitionedSpec.scala     |   8 +-
 7 files changed, 260 insertions(+), 29 deletions(-)

diff --git 
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
 
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
index dd97a74a0d..00f0fb6e0b 100644
--- 
a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
+++ 
b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala
@@ -17,6 +17,10 @@
 
 package org.apache.pekko.dispatch
 
+import java.util
+import java.util.concurrent.{ AbstractExecutorService, CountDownLatch, 
ExecutorService, Executors, TimeUnit }
+import java.util.concurrent.atomic.AtomicBoolean
+
 import org.apache.pekko
 import pekko.actor.{ Actor, Props }
 import pekko.testkit.{ ImplicitSender, PekkoSpec }
@@ -61,6 +65,33 @@ object ForkJoinPoolVirtualThreadSpec {
     }
   }
 
+  final class TrackingExecutorService(delegate: ExecutorService) extends 
AbstractExecutorService {
+    val shutdownCalled = new AtomicBoolean(false)
+    val shutdownNowCalled = new AtomicBoolean(false)
+
+    override def shutdown(): Unit = {
+      shutdownCalled.set(true)
+      delegate.shutdown()
+    }
+
+    override def shutdownNow(): util.List[Runnable] = {
+      shutdownNowCalled.set(true)
+      delegate.shutdownNow()
+    }
+
+    override def isShutdown: Boolean = delegate.isShutdown
+
+    override def isTerminated: Boolean = delegate.isTerminated
+
+    override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = {
+      delegate.awaitTermination(timeout, unit)
+    }
+
+    override def execute(command: Runnable): Unit = {
+      delegate.execute(command)
+    }
+  }
+
 }
 
 class ForkJoinPoolVirtualThreadSpec extends 
PekkoSpec(ForkJoinPoolVirtualThreadSpec.config) with ImplicitSender {
@@ -86,5 +117,119 @@ class ForkJoinPoolVirtualThreadSpec extends 
PekkoSpec(ForkJoinPoolVirtualThreadS
       }
     }
 
+    "allow blocked virtual threads to finish after dispatcher shutdown" in {
+      val underlying = new 
TrackingExecutorService(Executors.newFixedThreadPool(1))
+      val executor = new VirtualizedExecutorService(
+        
VirtualThreadSupport.newVirtualThreadFactory("fork-join-pool-virtual-thread-spec",
 0, underlying),
+        underlying,
+        _ => false,
+        cascadeShutdown = true)
+
+      val started = new CountDownLatch(1)
+      val release = new CountDownLatch(1)
+      val finished = new CountDownLatch(1)
+
+      try {
+        executor.execute(new Runnable {
+          override def run(): Unit = {
+            started.countDown()
+            release.await()
+            finished.countDown()
+          }
+        })
+
+        started.await(5, TimeUnit.SECONDS) should ===(true)
+        executor.shutdown()
+        underlying.isShutdown should ===(false)
+
+        release.countDown()
+        finished.await(5, TimeUnit.SECONDS) should ===(true)
+        executor.awaitTermination(5, TimeUnit.SECONDS) should ===(true)
+        underlying.shutdownCalled.get should ===(true)
+        underlying.shutdownNowCalled.get should ===(false)
+        underlying.isTerminated should ===(true)
+      } finally {
+        release.countDown()
+        executor.shutdownNow()
+        underlying.shutdownNow()
+      }
+    }
+
+    "interrupt blocked virtual threads on dispatcher shutdownNow" in {
+      val underlying = new 
TrackingExecutorService(Executors.newFixedThreadPool(1))
+      val executor = new VirtualizedExecutorService(
+        
VirtualThreadSupport.newVirtualThreadFactory("fork-join-pool-virtual-thread-spec",
 0, underlying),
+        underlying,
+        _ => false,
+        cascadeShutdown = true)
+
+      val started = new CountDownLatch(1)
+      val release = new CountDownLatch(1)
+      val interrupted = new CountDownLatch(1)
+
+      try {
+        executor.execute(new Runnable {
+          override def run(): Unit = {
+            started.countDown()
+            try release.await()
+            catch {
+              case _: InterruptedException => interrupted.countDown()
+            }
+          }
+        })
+
+        started.await(5, TimeUnit.SECONDS) should ===(true)
+        executor.shutdownNow()
+
+        interrupted.await(5, TimeUnit.SECONDS) should ===(true)
+        executor.awaitTermination(5, TimeUnit.SECONDS) should ===(true)
+        underlying.shutdownNowCalled.get should ===(true)
+        underlying.isTerminated should ===(true)
+      } finally {
+        release.countDown()
+        executor.shutdownNow()
+        underlying.shutdownNow()
+      }
+    }
+
+    "escalate to underlying shutdownNow after dispatcher shutdown" in {
+      val underlying = new 
TrackingExecutorService(Executors.newFixedThreadPool(1))
+      val executor = new VirtualizedExecutorService(
+        
VirtualThreadSupport.newVirtualThreadFactory("fork-join-pool-virtual-thread-spec",
 0, underlying),
+        underlying,
+        _ => false,
+        cascadeShutdown = true)
+
+      val started = new CountDownLatch(1)
+      val release = new CountDownLatch(1)
+      val interrupted = new CountDownLatch(1)
+
+      try {
+        executor.execute(new Runnable {
+          override def run(): Unit = {
+            started.countDown()
+            try release.await()
+            catch {
+              case _: InterruptedException => interrupted.countDown()
+            }
+          }
+        })
+
+        started.await(5, TimeUnit.SECONDS) should ===(true)
+        executor.shutdown()
+        underlying.isShutdown should ===(false)
+
+        executor.shutdownNow()
+        interrupted.await(5, TimeUnit.SECONDS) should ===(true)
+        executor.awaitTermination(5, TimeUnit.SECONDS) should ===(true)
+        underlying.shutdownNowCalled.get should ===(true)
+        underlying.isTerminated should ===(true)
+      } finally {
+        release.countDown()
+        executor.shutdownNow()
+        underlying.shutdownNow()
+      }
+    }
+
   }
 }
diff --git 
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/DockerBindDnsService.scala 
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/DockerBindDnsService.scala
index 66326512a6..ab27bb2514 100644
--- 
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/DockerBindDnsService.scala
+++ 
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/DockerBindDnsService.scala
@@ -13,6 +13,8 @@
 
 package org.apache.pekko.io.dns
 
+import java.net.{ InetAddress, InetSocketAddress }
+
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
 import scala.util.Try
@@ -25,7 +27,8 @@ import com.github.dockerjava.core.{ 
DefaultDockerClientConfig, DockerClientConfi
 import com.github.dockerjava.httpclient5.ApacheDockerHttpClient
 
 import org.apache.pekko
-import pekko.io.{ Dns, IO }
+import pekko.actor.Props
+import pekko.io.dns.internal.DnsClient
 import pekko.util.Timeout
 
 import pekko.testkit.PekkoSpec
@@ -122,11 +125,22 @@ abstract class DockerBindDnsService(config: Config) 
extends PekkoSpec(config) wi
 
       reader.toString should include("Starting BIND")
     }
-    eventually(timeout(25.seconds)) {
-      import pekko.pattern.ask
-      implicit val timeout: Timeout = 2.seconds
-      (IO(Dns) ? DnsProtocol.Resolve("a-single.foo.test", DnsProtocol.Ip(ipv6 
= false))).mapTo[
-        DnsProtocol.Resolved].futureValue
+    val readinessNameserver = new 
InetSocketAddress(InetAddress.getByName("127.0.0.1"), hostPort)
+    val readinessClient = system.actorOf(Props(new 
DnsClient(readinessNameserver)))
+    try {
+      var requestId = 0
+      eventually(timeout(25.seconds)) {
+        import pekko.pattern.ask
+        implicit val timeout: Timeout = 2.seconds
+        requestId += 1
+        val answer = (readinessClient ? DnsClient.Question4(requestId.toShort, 
"a-single.foo.test"))
+          .mapTo[DnsClient.Answer]
+          .futureValue
+        answer.rrs.collect { case record: ARecord => record.ip } shouldEqual 
Seq(
+          InetAddress.getByName("192.168.1.20"))
+      }
+    } finally {
+      system.stop(readinessClient)
     }
   }
 
diff --git 
a/actor-tests/src/test/scala/org/apache/pekko/routing/MetricsBasedResizerSpec.scala
 
b/actor-tests/src/test/scala/org/apache/pekko/routing/MetricsBasedResizerSpec.scala
index 59cf8343d0..984725d17d 100644
--- 
a/actor-tests/src/test/scala/org/apache/pekko/routing/MetricsBasedResizerSpec.scala
+++ 
b/actor-tests/src/test/scala/org/apache/pekko/routing/MetricsBasedResizerSpec.scala
@@ -241,6 +241,9 @@ class MetricsBasedResizerSpec extends 
PekkoSpec(ResizerSpec.config) with Default
 
       router.mockSend(await = true, routeeIdx = 0)
       router.mockSend(await = false, routeeIdx = 1)
+      awaitAssert {
+        resizer.updatedStats(router.routees, router.msgs.size)._1.get(2) 
should not be empty
+      }
       resizer.reportMessageCount(router.routees, router.msgs.size)
       resizer.performanceLog.get(2) should not be empty
 
diff --git 
a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala 
b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
index 257da4d97b..4d2595bce6 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala
@@ -55,6 +55,16 @@ private[dispatch] object VirtualThreadSupport {
     }
   }
 
+  /**
+   * Start a virtual thread using the default virtual thread scheduler.
+   */
+  def startVirtualThread(prefix: String, runnable: Runnable): Thread = {
+    require(runnable != null, "runnable should not be null.")
+    val thread = newVirtualThreadFactory(prefix, -1).newThread(runnable)
+    thread.start()
+    thread
+  }
+
   /**
    * Create a virtual thread factory with the default Virtual Thread executor.
    * @param prefix the prefix of the virtual thread name.
diff --git 
a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala
 
b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala
index 5a6a80a38d..be6cf1b0a3 100644
--- 
a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala
+++ 
b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala
@@ -19,6 +19,7 @@ package org.apache.pekko.dispatch
 
 import java.util
 import java.util.concurrent.{ Callable, Executor, ExecutorService, Future, 
ThreadFactory, TimeUnit }
+import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
 
 import org.apache.pekko.annotation.InternalApi
 
@@ -39,35 +40,36 @@ final class VirtualizedExecutorService(
   require(vtFactory != null, "Virtual thread factory must not be null")
   require(loadMetricsProvider != null, "Load metrics provider must not be 
null")
 
+  private final val UnderlyingRunning = 0
+  private final val UnderlyingShutdown = 1
+  private final val UnderlyingShutdownNow = 2
+
   private val executor = 
VirtualThreadSupport.newThreadPerTaskExecutor(vtFactory)
+  private val underlyingShutdownScheduled = new AtomicBoolean(false)
+  private val underlyingShutdownNowRequested = new AtomicBoolean(false)
+  private val underlyingShutdownState = new AtomicInteger(UnderlyingRunning)
 
   override def atFullThrottle(): Boolean = loadMetricsProvider(this)
 
   override def shutdown(): Unit = {
     executor.shutdown()
-    if (cascadeShutdown && (underlying ne null)) {
-      underlying.shutdown()
-    }
+    shutdownUnderlyingWhenVirtualThreadsHaveTerminated()
   }
 
   override def shutdownNow(): util.List[Runnable] = {
-    val r = executor.shutdownNow()
-    if (cascadeShutdown && (underlying ne null)) {
-      underlying.shutdownNow()
-    }
-    r
+    underlyingShutdownNowRequested.set(true)
+    val result = executor.shutdownNow()
+    shutdownUnderlyingWhenVirtualThreadsHaveTerminated()
+    result
   }
 
-  override def isShutdown: Boolean = {
-    if (cascadeShutdown) {
-      executor.isShutdown && ((underlying eq null) || underlying.isShutdown)
-    } else {
-      executor.isShutdown
-    }
-  }
+  override def isShutdown: Boolean = executor.isShutdown
 
   override def isTerminated: Boolean = {
     if (cascadeShutdown) {
+      if (executor.isTerminated) {
+        shutdownUnderlying()
+      }
       executor.isTerminated && ((underlying eq null) || 
underlying.isTerminated)
     } else {
       executor.isTerminated
@@ -76,12 +78,61 @@ final class VirtualizedExecutorService(
 
   override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = {
     if (cascadeShutdown) {
-      executor.awaitTermination(timeout, unit) && ((underlying eq null) || 
underlying.awaitTermination(timeout, unit))
+      val timeoutNanos = unit.toNanos(timeout)
+      val deadline = System.nanoTime() + timeoutNanos
+      executor.awaitTermination(timeout, unit) && {
+        shutdownUnderlying()
+        (underlying eq null) || {
+          val remainingNanos = deadline - System.nanoTime()
+          if (remainingNanos <= 0L) underlying.isTerminated
+          else underlying.awaitTermination(remainingNanos, 
TimeUnit.NANOSECONDS)
+        }
+      }
     } else {
       executor.awaitTermination(timeout, unit)
     }
   }
 
+  private def shutdownUnderlyingWhenVirtualThreadsHaveTerminated(): Unit = {
+    if (cascadeShutdown && (underlying ne null)) {
+      if (executor.isTerminated) {
+        shutdownUnderlying()
+      } else if (underlyingShutdownScheduled.compareAndSet(false, true)) {
+        VirtualThreadSupport.startVirtualThread(
+          "pekko-virtualized-executor-shutdown",
+          new Runnable {
+            override def run(): Unit = {
+              var interrupted = false
+              try {
+                while (!executor.isTerminated) {
+                  try executor.awaitTermination(Long.MaxValue, 
TimeUnit.NANOSECONDS)
+                  catch {
+                    case _: InterruptedException => interrupted = true
+                  }
+                }
+              } finally {
+                shutdownUnderlying()
+                if (interrupted) {
+                  Thread.currentThread().interrupt()
+                }
+              }
+            }
+          })
+      }
+    }
+  }
+
+  private def shutdownUnderlying(): Unit = {
+    if (cascadeShutdown && (underlying ne null) && 
underlyingShutdownNowRequested.get) {
+      if (underlyingShutdownState.getAndSet(UnderlyingShutdownNow) != 
UnderlyingShutdownNow) {
+        underlying.shutdownNow()
+      }
+    } else if (cascadeShutdown && (underlying ne null) &&
+      underlyingShutdownState.compareAndSet(UnderlyingRunning, 
UnderlyingShutdown)) {
+      underlying.shutdown()
+    }
+  }
+
   override def submit[T](task: Callable[T]): Future[T] = {
     executor.submit(task)
   }
diff --git 
a/remote/src/test/scala/org/apache/pekko/remote/artery/LateConnectSpec.scala 
b/remote/src/test/scala/org/apache/pekko/remote/artery/LateConnectSpec.scala
index 7f036dbc7d..d12b6ab814 100644
--- a/remote/src/test/scala/org/apache/pekko/remote/artery/LateConnectSpec.scala
+++ b/remote/src/test/scala/org/apache/pekko/remote/artery/LateConnectSpec.scala
@@ -55,11 +55,15 @@ class LateConnectSpec extends 
ArteryMultiNodeSpec(LateConnectSpec.config) with I
 
       val probeB = TestProbe()(systemB)
       val echoA = 
systemB.actorSelection(RootActorPath(RARP(system).provider.getDefaultAddress) / 
"user" / "echoA")
-      echoA.tell("ping2", probeB.ref)
-      probeB.expectMsg(10.seconds, "ping2")
-
-      echoB ! "ping3"
-      expectMsg("ping3")
+      awaitAssert({
+          echoA.tell("ping2", probeB.ref)
+          probeB.expectMsg(1.second, "ping2")
+        }, 20.seconds)
+
+      awaitAssert({
+          echoB ! "ping3"
+          expectMsg(1.second, "ping3")
+        }, 20.seconds)
     }
   }
 }
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
index af8ba74235..5d563cf5e8 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
@@ -381,11 +381,15 @@ class FlowMapAsyncPartitionedSpec extends StreamSpec with 
WithLogCapturing {
     }
 
     "resume after multiple failures if resume supervision is in place" in {
+      implicit val ec: ExecutionContext = system.dispatcher
+
       val expected =
         Source(1 to 10)
           .mapAsyncPartitioned(4)(_ % 3) { (elem, _) =>
-            if (elem % 4 < 3) Future.failed(new TE("BOOM!"))
-            else Future.successful(elem)
+            Future {
+              if (elem % 4 < 3) throw new TE("BOOM!")
+              else elem
+            }
           }
           
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
           .runWith(Sink.seq)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to