This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch minimum-runnable-auto-jdk21
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit df8a322606a2b56be51ff81f49bf1dd74852bd3c
Author: He-Pin <[email protected]>
AuthorDate: Thu Apr 23 13:50:37 2026 +0800

    fix: auto-tune ForkJoinPool minimum-runnable on JDK 21+
    
    Motivation:
    JDK-8300995 / JDK-8321335 changed compensation-thread creation in
    ForkJoinPool asyncMode (FIFO) to be much more conservative. Pekko fork-join
    dispatchers using the prior default `minimum-runnable = 1` are then prone
    to starvation under blocking workloads on JDK 21+, which has shown up as
    flaky nightly runs (#2870) and is the root cause behind the workflow
    override added in #2889.
    
    Modification:
    * Introduce `ForkJoinExecutorConfigurator.resolveMinimumRunnable`, an
      internal helper that computes the effective `minimum-runnable` value
      from the configured value, the dispatcher parallelism, and the running
      JDK major version. A negative configured value (the new default `-1`)
      triggers the JDK-aware policy: on JDK 21+ the value becomes
      `min(8, max(1, parallelism / 2))`; on JDK < 21 it stays at `1`.
      Non-negative values are honoured verbatim, so explicit `0` still
      disables compensation entirely and explicit positive values (including
      `1`) keep their existing meaning.
    * Change `pekko.actor.default-dispatcher.fork-join-executor.minimum-runnable`
      in `reference.conf` to the sentinel `-1` and update the doc block to
      describe the new auto-selection rule.
    * Add `ForkJoinExecutorConfiguratorSpec` with three groups of assertions:
      (1) pure-function matrix on `resolveMinimumRunnable`; (2) directional
      checks asserting the auto policy strictly raises the value on JDK 21+
      and never exceeds the documented cap of 8; (3) wiring integration that
      builds a `ForkJoinExecutorServiceFactory` from a real dispatcher config
      and verifies the resolved value reaches the factory (guarding against
      regressions of the resolver wiring).
    
    Result:
    Production users on JDK 21+ now benefit from the same starvation
    mitigation that #2889 bolted onto the nightly CI workflow. Source and
    binary compatibility are preserved (constructor defaults stay at `1`,
    no signature changes, no MiMa filter required). Users wanting to opt
    out can set `minimum-runnable = 1` to restore the previous behaviour,
    or any other non-negative value to bypass the auto-selection.
---
 .../ForkJoinExecutorConfiguratorSpec.scala         | 165 +++++++++++++++++++++
 actor/src/main/resources/reference.conf            |  15 +-
 .../dispatch/ForkJoinExecutorConfigurator.scala    |  38 ++++-
 3 files changed, 207 insertions(+), 11 deletions(-)

diff --git a/actor-tests/src/test/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfiguratorSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfiguratorSpec.scala
new file mode 100644
index 0000000000..b9e9fc9f91
--- /dev/null
+++ b/actor-tests/src/test/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfiguratorSpec.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project.
+ */
+
+package org.apache.pekko.dispatch
+
+import java.util.concurrent.ThreadFactory
+
+import org.apache.pekko
+import pekko.testkit.PekkoSpec
+import pekko.util.JavaVersion
+
+import com.typesafe.config.{ Config, ConfigFactory }
+
+object ForkJoinExecutorConfiguratorSpec {
+  // Keep the root config explicit so the dispatcher-config integration checks below
+  // see exactly the values this spec is asserting on (and not whatever the project's
+  // global test configuration happens to set).
+  val config: Config = ConfigFactory.parseString("""
+      |fj-auto-default-dispatcher {
+      |  executor = "fork-join-executor"
+      |  fork-join-executor {
+      |    parallelism-min = 8
+      |    parallelism-factor = 1.0
+      |    parallelism-max = 64
+      |  }
+      |}
+      |fj-auto-small-dispatcher {
+      |  executor = "fork-join-executor"
+      |  fork-join-executor {
+      |    parallelism-min = 1
+      |    parallelism-factor = 1.0
+      |    parallelism-max = 1
+      |  }
+      |}
+      |fj-explicit-zero-dispatcher {
+      |  executor = "fork-join-executor"
+      |  fork-join-executor {
+      |    parallelism-min = 8
+      |    parallelism-factor = 1.0
+      |    parallelism-max = 64
+      |    minimum-runnable = 0
+      |  }
+      |}
+      |fj-explicit-seven-dispatcher {
+      |  executor = "fork-join-executor"
+      |  fork-join-executor {
+      |    parallelism-min = 8
+      |    parallelism-factor = 1.0
+      |    parallelism-max = 64
+      |    minimum-runnable = 7
+      |  }
+      |}
+    """.stripMargin)
+}
+
+class ForkJoinExecutorConfiguratorSpec extends 
PekkoSpec(ForkJoinExecutorConfiguratorSpec.config) {
+
+  import ForkJoinExecutorConfigurator.resolveMinimumRunnable
+
+  "ForkJoinExecutorConfigurator.resolveMinimumRunnable" must {
+
+    "honour explicit zero (compensation disabled)" in {
+      resolveMinimumRunnable(configured = 0, parallelism = 16, jdkMajorVersion 
= 21) shouldBe 0
+      resolveMinimumRunnable(configured = 0, parallelism = 16, jdkMajorVersion 
= 17) shouldBe 0
+    }
+
+    "honour explicit positive overrides verbatim" in {
+      resolveMinimumRunnable(configured = 1, parallelism = 16, jdkMajorVersion 
= 21) shouldBe 1
+      resolveMinimumRunnable(configured = 7, parallelism = 16, jdkMajorVersion 
= 21) shouldBe 7
+      resolveMinimumRunnable(configured = 100, parallelism = 16, 
jdkMajorVersion = 25) shouldBe 100
+    }
+
+    "auto-resolve to 1 on JDK < 21 regardless of parallelism" in {
+      resolveMinimumRunnable(configured = -1, parallelism = 1, jdkMajorVersion 
= 17) shouldBe 1
+      resolveMinimumRunnable(configured = -1, parallelism = 8, jdkMajorVersion 
= 17) shouldBe 1
+      resolveMinimumRunnable(configured = -1, parallelism = 64, 
jdkMajorVersion = 11) shouldBe 1
+    }
+
+    "auto-resolve using parallelism / 2 on JDK 21+ with min cap 1 and max cap 
8" in {
+      resolveMinimumRunnable(configured = -1, parallelism = 1, jdkMajorVersion 
= 21) shouldBe 1
+      resolveMinimumRunnable(configured = -1, parallelism = 2, jdkMajorVersion 
= 21) shouldBe 1
+      resolveMinimumRunnable(configured = -1, parallelism = 4, jdkMajorVersion 
= 21) shouldBe 2
+      resolveMinimumRunnable(configured = -1, parallelism = 8, jdkMajorVersion 
= 21) shouldBe 4
+      resolveMinimumRunnable(configured = -1, parallelism = 16, 
jdkMajorVersion = 21) shouldBe 8
+      resolveMinimumRunnable(configured = -1, parallelism = 64, 
jdkMajorVersion = 25) shouldBe 8
+    }
+
+    "produce a strictly higher value on JDK 21+ than on JDK 17 for plausible 
dispatcher sizes" in {
+      // Directional check: the auto policy must move the needle for the JDKs 
that need it.
+      for (parallelism <- Seq(4, 8, 16, 32, 64)) {
+        val legacy = resolveMinimumRunnable(configured = -1, parallelism, 
jdkMajorVersion = 17)
+        val modern = resolveMinimumRunnable(configured = -1, parallelism, 
jdkMajorVersion = 21)
+        withClue(s"parallelism=$parallelism legacy=$legacy modern=$modern: ") {
+          modern should be > legacy
+        }
+      }
+    }
+
+    "never exceed the documented max cap of 8" in {
+      for (parallelism <- 1 to 256; jdk <- Seq(21, 25, 30)) {
+        resolveMinimumRunnable(configured = -1, parallelism, jdk) should be <= 
8
+      }
+    }
+  }
+
+  "ForkJoinExecutorConfigurator wiring" must {
+
+    // Build a factory from a real dispatcher config and return the resolved
+    // minimum-runnable. This proves the config value actually reaches the
+    // ForkJoinExecutorServiceFactory — guarding against the trivial regression
+    // of reverting resolveMinimumRunnable to a direct `config.getInt` read.
+    def resolvedMinimumRunnable(dispatcherId: String): Int = {
+      // `system.dispatchers.config(id)` resolves the dispatcher's full config with
+      // reference.conf defaults applied (so `virtualize`, `minimum-runnable`, etc.
+      // all have values).
+      val dispatcherConfig = system.dispatchers.config(dispatcherId)
+      val configurator = new ForkJoinExecutorConfigurator(
+        dispatcherConfig.getConfig("fork-join-executor"),
+        system.dispatchers.prerequisites)
+      val tf: ThreadFactory = system.dispatchers.prerequisites.threadFactory
+      val factory = configurator
+        .createExecutorServiceFactory(dispatcherId, tf)
+        .asInstanceOf[configurator.ForkJoinExecutorServiceFactory]
+      factory.minimumRunnable
+    }
+
+    "respect explicit minimum-runnable = 0" in {
+      resolvedMinimumRunnable("fj-explicit-zero-dispatcher") shouldBe 0
+    }
+
+    "respect explicit minimum-runnable = 7" in {
+      resolvedMinimumRunnable("fj-explicit-seven-dispatcher") shouldBe 7
+    }
+
+    "auto-scale the default (minimum-runnable not set) on JDK 21+" in {
+      if (JavaVersion.majorVersion < 21) pending
+
+      val resolved = resolvedMinimumRunnable("fj-auto-default-dispatcher")
+      // The dispatcher declares parallelism-min = 8 so effective parallelism is at
+      // least 8; auto = min(8, max(1, parallelism/2)) must be at least 4 and never
+      // exceed the documented cap of 8.
+      resolved should be >= 4
+      resolved should be <= 8
+    }
+
+    "keep the legacy value of 1 on JDK < 21 when the default is left 
untouched" in {
+      if (JavaVersion.majorVersion >= 21) pending
+
+      resolvedMinimumRunnable("fj-auto-default-dispatcher") shouldBe 1
+    }
+
+    "never drop below 1 even for parallelism = 1 dispatchers" in {
+      val resolved = resolvedMinimumRunnable("fj-auto-small-dispatcher")
+      // parallelism = 1 implies parallelism/2 = 0, which the lower bound must lift to 1.
+      // On JDK < 21 the legacy value of 1 is already the expected answer.
+      resolved shouldBe 1
+    }
+  }
+}
diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf
index b2afe5f374..ce337a56fb 100644
--- a/actor/src/main/resources/reference.conf
+++ b/actor/src/main/resources/reference.conf
@@ -490,12 +490,15 @@ pekko {
         # When blocked worker count causes active threads to drop below this threshold, the pool
         # may create a compensation thread to maintain progress.
         #
-        # The default value of 1 matches the JDK ForkJoinPool behaviour prior to this setting being
-        # exposed. Higher values (e.g. parallelism/2) can reduce starvation risk for actor-heavy
-        # workloads on JDK 21+ where ForkJoinPool asyncMode (FIFO) compensation-thread creation
-        # became more conservative (see JDK-8300995 / JDK-8321335).
-        # Set to 0 to disable compensation entirely.
-        minimum-runnable = 1
+        # Any negative value (the default is -1) selects a JDK-aware policy:
+        #   * JDK 21+ : effective value = min(8, max(1, parallelism / 2))
+        #   * JDK < 21: effective value = 1 (preserves the JDK behaviour prior to this setting)
+        # Auto-selection on JDK 21+ mitigates the asyncMode (FIFO) compensation-thread regression
+        # tracked in JDK-8300995 / JDK-8321335 that can cause actor-heavy workloads to starve.
+        #
+        # Set explicitly to 0 to disable compensation entirely. Set to any non-negative integer
+        # to override the auto-selection (e.g. 1 to restore the previous default behaviour).
+        minimum-runnable = -1
 
         # This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
         # When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown.
diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
index 9dc2c95318..466f7f0eae 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
@@ -17,8 +17,31 @@ import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, Threa
 
 import com.typesafe.config.Config
 
+import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko.util.JavaVersion
+
 object ForkJoinExecutorConfigurator {
 
+  /**
+   * INTERNAL API
+   *
+   * Resolves the effective `minimum-runnable` value for a fork-join dispatcher.
+   *
+   * A negative value (default `-1` in reference.conf) selects the JDK-aware policy:
+   * on JDK 21+ the value is `min(8, max(1, parallelism / 2))` to mitigate the
+   * asyncMode (FIFO) compensation-thread regression tracked in
+   * JDK-8300995 / JDK-8321335; on older JDKs the value stays at `1` to preserve
+   * legacy behaviour. Non-negative configured values are honoured verbatim, so
+   * `0` still disables compensation entirely.
+   */
+  @InternalApi private[pekko] def resolveMinimumRunnable(
+      configured: Int,
+      parallelism: Int,
+      jdkMajorVersion: Int): Int =
+    if (configured >= 0) configured
+    else if (jdkMajorVersion >= 21) math.min(8, math.max(1, parallelism / 2))
+    else 1
+
   /**
    * INTERNAL PEKKO USAGE ONLY
    */
@@ -140,16 +163,21 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
           """"task-peeking-mode" in "fork-join-executor" section could only 
set to "FIFO" or "LIFO".""")
     }
 
+    val parallelism = ThreadPoolConfig.scaledPoolSize(
+      config.getInt("parallelism-min"),
+      config.getDouble("parallelism-factor"),
+      config.getInt("parallelism-max"))
+
     new ForkJoinExecutorServiceFactory(
       id,
       validate(tf),
-      ThreadPoolConfig.scaledPoolSize(
-        config.getInt("parallelism-min"),
-        config.getDouble("parallelism-factor"),
-        config.getInt("parallelism-max")),
+      parallelism,
       asyncMode,
       config.getInt("maximum-pool-size"),
-      config.getInt("minimum-runnable")
+      ForkJoinExecutorConfigurator.resolveMinimumRunnable(
+        config.getInt("minimum-runnable"),
+        parallelism,
+        JavaVersion.majorVersion)
     )
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to