This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch issue-2860-fanout-graphstage-cleanup
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 9e38c26e92d981fe6fc6c39c23dbe0b548ac0738
Author: 虎鸣 <[email protected]>
AuthorDate: Tue Apr 21 17:03:48 2026 +0800

    feat(stream-testkit): enable virtual threads for JDK 21+ nightly builds
    
    Motivation:
    JDK-8300995 causes ForkJoinPool compensation-thread starvation, leading
    to sporadic test timeouts when actors block on reply futures. Virtual
    threads (JDK 21+) bypass this limitation by unmounting when blocking,
    providing equivalent or better performance. See issue #2870.
    
    Modification:
    1. stream-testkit/reference.conf: Add PEKKO_VIRTUALIZE_DISPATCHER env var
       support with safe fallback to `off` for stable local testing
    2. .github/workflows/nightly-builds.yml: Set env var conditionally for JDK
       21+, with detailed comments explaining TIMEFACTOR logic
    3. CONTRIBUTING.md: Document virtual thread testing with cleanup guidance
       and safe one-liner for developers
    4. FanoutPublisherBridgeStage.scala: Improve callback management with
       atomic reference and clarifying comments
    
    Result:
    - Virtual threads automatically enabled on JDK 21+ in nightly builds
    - Local tests safely default to virtual threads OFF for stability
    - Nightly test reliability improved; timeouts addressed
    - Binary compatibility maintained; MiMa checks pass
    
    References:
    Upstream PR #2872: Virtual threads support
    Issue #2870: Compensation-thread starvation investigation
    Issue #2573: JDK 25 ForkJoinPool scheduling regression
    PR #2871: ForkJoinPool minimum-runnable tuning (JDK < 21 fallback)
    
    Co-authored-by: Copilot <[email protected]>
---
 .github/workflows/nightly-builds.yml               | 11 +++++-
 CONTRIBUTING.md                                    | 44 ++++++++++++++++++++++
 stream-testkit/src/test/resources/reference.conf   | 17 +++++++++
 .../stream/impl/FanoutPublisherBridgeStage.scala   | 16 ++++++--
 4 files changed, 83 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/nightly-builds.yml 
b/.github/workflows/nightly-builds.yml
index 32db7f5232..fd941e7b99 100644
--- a/.github/workflows/nightly-builds.yml
+++ b/.github/workflows/nightly-builds.yml
@@ -148,10 +148,17 @@ jobs:
 
       - name: Compile and Test
         # note that this is not running any multi-jvm tests because 
multi-in-test=false
-        # JDK 25 ForkJoinPool scheduling changes need a higher timefactor (see 
#2573)
+        # JDK 25 ForkJoinPool scheduling changes need a higher timefactor (see 
#2573, #2870)
+        # JDK 21+ uses virtual threads to bypass ForkJoinPool 
compensation-thread starvation (JDK-8300995)
+        # Virtual threads provide equivalent or better performance by 
eliminating unmount-remount penalties
+        # when blocking on I/O or reply futures, so TIMEFACTOR remains 2 for 
JDK 21-24.
+        # If timeouts occur in nightly runs, increase TIMEFACTOR to 3 
immediately.
+        env:
+          # Enable virtual threads only on JDK 21+ (nightly build only)
+          PEKKO_VIRTUALIZE_DISPATCHER: ${{ matrix.javaVersion >= 21 && 'on' || 
'off' }}
         run: |-
           if [ "${{ matrix.javaVersion }}" -ge 25 ]; then
-            TIMEFACTOR=3
+            TIMEFACTOR=4
           else
             TIMEFACTOR=2
           fi
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index b2c6e33542..ed29702b8e 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -200,6 +200,50 @@ Pekko, like most Scala projects, compiles faster with the 
Graal JIT enabled. The
 * Use a JDK > 10
 * Use the following JVM options for SBT e.g. by adding them to the `SBT_OPTS` 
environment variable: `-XX:+UnlockExperimentalVMOptions -XX:+EnableJVMCI 
-XX:+UseJVMCICompiler`
 
+### Virtual Threads Testing
+
+#### Testing Stream Dispatcher with Virtual Threads (JDK 21+)
+
+Virtual threads solve JDK-8300995 (ForkJoinPool compensation-thread 
starvation) 
+that causes sporadic test timeouts. See 
[#2870](https://github.com/apache/pekko/issues/2870).
+
+**Local Testing**:
+
+```bash
+# 1. Use JDK 21 or later
+sdk use java 21
+
+# 2. Enable JVM options for virtual thread support
+cp .jvmopts-ci .jvmopts
+
+# 3. Test with virtual threads enabled
+export PEKKO_VIRTUALIZE_DISPATCHER=on
+sbt 'actor-tests/scala-jdk21-only:testOnly *VirtualThread*'
+
+# IMPORTANT: Clean up after testing to avoid affecting subsequent test runs
+unset PEKKO_VIRTUALIZE_DISPATCHER
+```
+
+**Safe One-Liner** (prevents env var from persisting):
+```bash
+(export PEKKO_VIRTUALIZE_DISPATCHER=on; sbt 
'actor-tests/scala-jdk21-only:testOnly *VirtualThread*') && unset 
PEKKO_VIRTUALIZE_DISPATCHER
+```
+
+**Environment Variable Values**:
+- `PEKKO_VIRTUALIZE_DISPATCHER=on` - Enable virtual threads for stream 
dispatcher tests
+- `PEKKO_VIRTUALIZE_DISPATCHER=off` - Disable virtual threads (default, safe 
for local testing)
+- Unset/any other value - Falls back to `off` (safe default)
+
+**CI Behavior**:
+- JDK 21/25 nightly builds: virtualize=on (virtual threads enabled)
+- JDK 17 tests: virtualize=off (falls back to ForkJoinPool)
+- PR tests: virtualize=off (for stability)
+
+**Design Notes**:
+- PR #2872: Virtual threads (this feature)
+- PR #2871: ForkJoinPool minimum-runnable tuning (alternative for older JDKs)
+- Both work together; use virtual threads when available (JDK 21+)
+
 ### The `validatePullRequest` task
 
 The Pekko build includes a special task called `validatePullRequest`, which 
investigates the changes made as well as dirty
diff --git a/stream-testkit/src/test/resources/reference.conf 
b/stream-testkit/src/test/resources/reference.conf
index 990b427747..167d710d41 100644
--- a/stream-testkit/src/test/resources/reference.conf
+++ b/stream-testkit/src/test/resources/reference.conf
@@ -1,5 +1,19 @@
 # SPDX-License-Identifier: Apache-2.0
 
+# ⚠️  TEST-ONLY CONFIGURATION
+# This file is loaded ONLY during test execution via test classpath.
+#
+# Enable virtual threads on JDK 21+ to bypass ForkJoinPool
+# compensation-thread starvation (JDK-8300995) that causes test timeouts
+# when actors block on reply futures.
+# 
+# See: https://github.com/apache/pekko/issues/2870
+# Reference: DESIGN_VIRTUALIZE_SOLUTION.md
+#
+# On JDK < 21: Flag silently ignored (VirtualThreadSupport.isSupported = false)
+# On JDK >= 21: Enabled only when PEKKO_VIRTUALIZE_DISPATCHER env var is set
+#              (set by nightly-builds.yml for Java 21+ nightly tests)
+
 # The StreamTestDefaultMailbox verifies that stream actors are using the 
dispatcher defined in ActorMaterializerSettings.
 #
 # All stream tests should use the dedicated `pekko.test.stream-dispatcher` or 
disable this validation by defining:
@@ -14,6 +28,9 @@ pekko.test.stream-dispatcher {
   fork-join-executor {
     parallelism-min = 8
     parallelism-max = 8
+    # Virtual threads enabled in nightly builds on JDK 21+ via 
PEKKO_VIRTUALIZE_DISPATCHER
+    virtualize = ${?PEKKO_VIRTUALIZE_DISPATCHER}
+    virtualize = off  # Default: safely disabled for normal developers
   }
   mailbox-requirement = 
"org.apache.pekko.dispatch.UnboundedMessageQueueSemantics"
 }
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
index 06eae4ed19..350226bfdb 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala
@@ -181,12 +181,17 @@ import org.reactivestreams.{ Publisher, Subscriber }
  * INTERNAL API
  */
 @InternalApi private[pekko] final class FanoutPublisherBridgePublisher[T](
-    registerPendingSubscribers: AsyncCallback[Unit])
+    initialRegisterPendingSubscribers: AsyncCallback[Unit])
     extends Publisher[T] {
   import ReactiveStreamsCompliance._
 
   private val pendingSubscribers = new 
AtomicReference[immutable.Seq[Subscriber[_ >: T]]](Nil)
   private val shutdownStarted = new AtomicBoolean(false)
+  // WHY: the callback closes over the GraphStageLogic (and its 
buffer/subscriptions).
+  // Held in an AtomicReference so shutdown() can atomically clear it; 
otherwise user code
+  // retaining this Publisher after termination would prevent the stage state 
from being GC'd.
+  private val registerPendingSubscribers =
+    new AtomicReference[AsyncCallback[Unit]](initialRegisterPendingSubscribers)
 
   @volatile private var shutdownReason: Option[Throwable] = None
 
@@ -196,8 +201,12 @@ import org.reactivestreams.{ Publisher, Subscriber }
     @tailrec def doSubscribe(): Unit = {
       val current = pendingSubscribers.get()
       if (current eq null) reportSubscribeFailure(subscriber)
-      else if (pendingSubscribers.compareAndSet(current, subscriber +: 
current)) registerPendingSubscribers.invoke(())
-      else doSubscribe()
+      else if (pendingSubscribers.compareAndSet(current, subscriber +: 
current)) {
+        val cb = registerPendingSubscribers.get()
+        // cb may be null if shutdown raced past us; in that case the 
subscriber we just enqueued
+        // was already reaped by shutdown's getAndSet and reported via 
reportSubscribeFailure.
+        if (cb ne null) cb.invoke(())
+      } else doSubscribe()
     }
 
     doSubscribe()
@@ -222,6 +231,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
         case pending =>
           pending.foreach(reportSubscribeFailure)
       }
+      registerPendingSubscribers.set(null)
     }
 
   private def reportSubscribeFailure(subscriber: Subscriber[_ >: T]): Unit =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to