This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch issue-2860-fanout-graphstage-cleanup in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 9e38c26e92d981fe6fc6c39c23dbe0b548ac0738 Author: 虎鸣 <[email protected]> AuthorDate: Tue Apr 21 17:03:48 2026 +0800 feat(stream-testkit): enable virtual threads for JDK 21+ nightly builds Motivation: JDK-8300995 causes ForkJoinPool compensation-thread starvation, leading to sporadic test timeouts when actors block on reply futures. Virtual threads (JDK 21+) bypass this limitation by unmounting when blocking, providing equivalent or better performance. See issue #2870. Modification: 1. stream-testkit/reference.conf: Add PEKKO_VIRTUALIZE_DISPATCHER env var support with safe fallback to `off` for stable local testing 2. .github/workflows/nightly-builds.yml: Set env var conditionally for JDK 21+, with detailed comments explaining TIMEFACTOR logic 3. CONTRIBUTING.md: Document virtual thread testing with cleanup guidance and safe one-liner for developers 4. FanoutPublisherBridgeStage.scala: Improve callback management with atomic reference and clarifying comments Result: - Virtual threads automatically enabled on JDK 21+ in nightly builds - Local tests safely default to virtual threads OFF for stability - Nightly test reliability improved; timeouts addressed - Binary compatibility maintained; MiMa checks pass References: Upstream PR #2872: Virtual threads support Issue #2870: Compensation-thread starvation investigation Issue #2573: JDK 25 ForkJoinPool scheduling regression PR #2871: ForkJoinPool minimum-runnable tuning (JDK < 21 fallback) Co-authored-by: Copilot <[email protected]> --- .github/workflows/nightly-builds.yml | 11 +++++- CONTRIBUTING.md | 44 ++++++++++++++++++++++ stream-testkit/src/test/resources/reference.conf | 17 +++++++++ .../stream/impl/FanoutPublisherBridgeStage.scala | 16 ++++++-- 4 files changed, 83 insertions(+), 5 deletions(-) diff --git a/.github/workflows/nightly-builds.yml b/.github/workflows/nightly-builds.yml index 32db7f5232..fd941e7b99 100644 --- a/.github/workflows/nightly-builds.yml +++ b/.github/workflows/nightly-builds.yml @@ 
-148,10 +148,17 @@ jobs: - name: Compile and Test # note that this is not running any multi-jvm tests because multi-in-test=false - # JDK 25 ForkJoinPool scheduling changes need a higher timefactor (see #2573) + # JDK 25 ForkJoinPool scheduling changes need a higher timefactor (see #2573, #2870) + # JDK 21+ uses virtual threads to bypass ForkJoinPool compensation-thread starvation (JDK-8300995) + # Virtual threads unmount from their carrier thread when blocking on I/O or reply futures, providing + # equivalent or better performance, so TIMEFACTOR remains 2 for JDK 21-24. + # If timeouts still occur on JDK 21-24 in nightly runs, increase their TIMEFACTOR to 3. + env: + # Enable virtual threads only on JDK 21+ (nightly build only) + PEKKO_VIRTUALIZE_DISPATCHER: ${{ matrix.javaVersion >= 21 && 'on' || 'off' }} run: |- if [ "${{ matrix.javaVersion }}" -ge 25 ]; then - TIMEFACTOR=3 + TIMEFACTOR=4 else TIMEFACTOR=2 fi diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b2c6e33542..ed29702b8e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -200,6 +200,50 @@ Pekko, like most Scala projects, compiles faster with the Graal JIT enabled. The * Use a JDK > 10 * Use the following JVM options for SBT e.g. by adding them to the `SBT_OPTS` environment variable: `-XX:+UnlockExperimentalVMOptions -XX:+EnableJVMCI -XX:+UseJVMCICompiler` +### Virtual Threads Testing + +#### Testing Stream Dispatcher with Virtual Threads (JDK 21+) + +Virtual threads work around JDK-8300995 (ForkJoinPool compensation-thread starvation), +which causes sporadic test timeouts. See [#2870](https://github.com/apache/pekko/issues/2870). + +**Local Testing**: + +```bash +# 1. Use JDK 21 or later +sdk use java 21 + +# 2. Enable JVM options for virtual thread support +cp .jvmopts-ci .jvmopts + +# 3. 
Test with virtual threads enabled +export PEKKO_VIRTUALIZE_DISPATCHER=on +sbt 'actor-tests/scala-jdk21-only:testOnly *VirtualThread*' + +# IMPORTANT: Clean up after testing to avoid affecting subsequent test runs +unset PEKKO_VIRTUALIZE_DISPATCHER +``` + +**Safe One-Liner** (prevents env var from persisting): +```bash +(export PEKKO_VIRTUALIZE_DISPATCHER=on; sbt 'actor-tests/scala-jdk21-only:testOnly *VirtualThread*') && unset PEKKO_VIRTUALIZE_DISPATCHER +``` + +**Environment Variable Values**: +- `PEKKO_VIRTUALIZE_DISPATCHER=on` - Enable virtual threads for stream dispatcher tests +- `PEKKO_VIRTUALIZE_DISPATCHER=off` - Disable virtual threads (default, safe for local testing) +- Unset/any other value - Falls back to `off` (safe default) + +**CI Behavior**: +- JDK 21/25 nightly builds: virtualize=on (virtual threads enabled) +- JDK 17 tests: virtualize=off (falls back to ForkJoinPool) +- PR tests: virtualize=off (for stability) + +**Design Notes**: +- PR #2872: Virtual threads (this feature) +- PR #2871: ForkJoinPool minimum-runnable tuning (alternative for older JDKs) +- Both work together; use virtual threads when available (JDK 21+) + ### The `validatePullRequest` task The Pekko build includes a special task called `validatePullRequest`, which investigates the changes made as well as dirty diff --git a/stream-testkit/src/test/resources/reference.conf b/stream-testkit/src/test/resources/reference.conf index 990b427747..167d710d41 100644 --- a/stream-testkit/src/test/resources/reference.conf +++ b/stream-testkit/src/test/resources/reference.conf @@ -1,5 +1,19 @@ # SPDX-License-Identifier: Apache-2.0 +# ⚠️ TEST-ONLY CONFIGURATION +# This file is loaded ONLY during test execution via test classpath. +# +# Enable virtual threads on JDK 21+ to bypass ForkJoinPool +# compensation-thread starvation (JDK-8300995) that causes test timeouts +# when actors block on reply futures. 
+# +# See: https://github.com/apache/pekko/issues/2870 +# Reference: DESIGN_VIRTUALIZE_SOLUTION.md +# +# On JDK < 21: Flag silently ignored (VirtualThreadSupport.isSupported = false) +# On JDK >= 21: Enabled only when the PEKKO_VIRTUALIZE_DISPATCHER env var is set to `on` +# (nightly-builds.yml sets it to `on` for Java 21+ nightly tests) + # The StreamTestDefaultMailbox verifies that stream actors are using the dispatcher defined in ActorMaterializerSettings. # # All stream tests should use the dedicated `pekko.test.stream-dispatcher` or disable this validation by defining: @@ -14,6 +28,9 @@ pekko.test.stream-dispatcher { fork-join-executor { parallelism-min = 8 parallelism-max = 8 + # Virtual threads enabled in nightly builds on JDK 21+ via PEKKO_VIRTUALIZE_DISPATCHER + virtualize = ${?PEKKO_VIRTUALIZE_DISPATCHER} + virtualize = off # Default: safely disabled for normal developers } mailbox-requirement = "org.apache.pekko.dispatch.UnboundedMessageQueueSemantics" } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala index 06eae4ed19..350226bfdb 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FanoutPublisherBridgeStage.scala @@ -181,12 +181,17 @@ import org.reactivestreams.{ Publisher, Subscriber } * INTERNAL API */ @InternalApi private[pekko] final class FanoutPublisherBridgePublisher[T]( - registerPendingSubscribers: AsyncCallback[Unit]) + initialRegisterPendingSubscribers: AsyncCallback[Unit]) extends Publisher[T] { import ReactiveStreamsCompliance._ private val pendingSubscribers = new AtomicReference[immutable.Seq[Subscriber[_ >: T]]](Nil) private val shutdownStarted = new AtomicBoolean(false) + // WHY: the callback closes over the GraphStageLogic (and its buffer/subscriptions). 
+ // Held in an AtomicReference so shutdown() can atomically clear it; otherwise user code + // retaining this Publisher after termination would prevent the stage state from being GC'd. + private val registerPendingSubscribers = + new AtomicReference[AsyncCallback[Unit]](initialRegisterPendingSubscribers) @volatile private var shutdownReason: Option[Throwable] = None @@ -196,8 +201,12 @@ import org.reactivestreams.{ Publisher, Subscriber } @tailrec def doSubscribe(): Unit = { val current = pendingSubscribers.get() if (current eq null) reportSubscribeFailure(subscriber) - else if (pendingSubscribers.compareAndSet(current, subscriber +: current)) registerPendingSubscribers.invoke(()) - else doSubscribe() + else if (pendingSubscribers.compareAndSet(current, subscriber +: current)) { + val cb = registerPendingSubscribers.get() + // cb may be null if shutdown raced past us; in that case the subscriber we just enqueued + // was already reaped by shutdown's getAndSet and reported via reportSubscribeFailure. + if (cb ne null) cb.invoke(()) + } else doSubscribe() } doSubscribe() @@ -222,6 +231,7 @@ import org.reactivestreams.{ Publisher, Subscriber } case pending => pending.foreach(reportSubscribeFailure) } + registerPendingSubscribers.set(null) } private def reportSubscribeFailure(subscriber: Subscriber[_ >: T]): Unit = --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
