This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch minimum-runnable-auto-jdk21 in repository https://gitbox.apache.org/repos/asf/pekko.git
commit df8a322606a2b56be51ff81f49bf1dd74852bd3c Author: He-Pin <[email protected]> AuthorDate: Thu Apr 23 13:50:37 2026 +0800 fix: auto-tune ForkJoinPool minimum-runnable on JDK 21+ Motivation: JDK-8300995 / JDK-8321335 changed compensation-thread creation in ForkJoinPool asyncMode (FIFO) to be much more conservative. Pekko fork-join dispatchers using the prior default `minimum-runnable = 1` are then prone to starvation under blocking workloads on JDK 21+, which has shown up as flaky nightly runs (#2870) and is the root cause behind the workflow override added in #2889. Modification: * Introduce `ForkJoinExecutorConfigurator.resolveMinimumRunnable`, an internal helper that computes the effective `minimum-runnable` value from the configured value, the dispatcher parallelism, and the running JDK major version. A negative configured value (the new default `-1`) triggers the JDK-aware policy: on JDK 21+ the value becomes `min(8, max(1, parallelism / 2))`; on JDK < 21 it stays at `1`. Non-negative values are honoured verbatim, so explicit `0` still disables compensation entirely and explicit positive values (including `1`) keep their existing meaning. * Change `pekko.actor.default-dispatcher.fork-join-executor.minimum-runnable` in `reference.conf` to the sentinel `-1` and update the doc block to describe the new auto-selection rule. * Add `ForkJoinExecutorConfiguratorSpec` with three groups of assertions: (1) pure-function matrix on `resolveMinimumRunnable`; (2) directional checks asserting the auto policy strictly raises the value on JDK 21+ and never exceeds the documented cap of 8; (3) wiring integration that builds a `ForkJoinExecutorServiceFactory` from a real dispatcher config and verifies the resolved value reaches the factory (guarding against regressions of the resolver wiring). Result: Production users on JDK 21+ now benefit from the same starvation mitigation that #2889 bolted onto the nightly CI workflow. Source and binary compatibility are preserved (constructor defaults stay at `1`, no signature changes, no MiMa filter required). Users wanting to opt out can set `minimum-runnable = 1` (or any explicit value) to restore the previous behaviour. --- .../ForkJoinExecutorConfiguratorSpec.scala | 165 +++++++++++++++++++++ actor/src/main/resources/reference.conf | 15 +- .../dispatch/ForkJoinExecutorConfigurator.scala | 38 ++++- 3 files changed, 207 insertions(+), 11 deletions(-) diff --git a/actor-tests/src/test/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfiguratorSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfiguratorSpec.scala new file mode 100644 index 0000000000..b9e9fc9f91 --- /dev/null +++ b/actor-tests/src/test/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfiguratorSpec.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project. + */ + +package org.apache.pekko.dispatch + +import java.util.concurrent.ThreadFactory + +import org.apache.pekko +import pekko.testkit.PekkoSpec +import pekko.util.JavaVersion + +import com.typesafe.config.{ Config, ConfigFactory } + +object ForkJoinExecutorConfiguratorSpec { + // Keep the root config explicit so the dispatcher-config integration checks below + // see exactly the values this spec is asserting on (and not whatever the project's + // global test configuration happens to set). + val config: Config = ConfigFactory.parseString(""" + |fj-auto-default-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = 8 + | parallelism-factor = 1.0 + | parallelism-max = 64 + | } + |} + |fj-auto-small-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = 1 + | parallelism-factor = 1.0 + | parallelism-max = 1 + | } + |} + |fj-explicit-zero-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = 8 + | parallelism-factor = 1.0 + | parallelism-max = 64 + | minimum-runnable = 0 + | } + |} + |fj-explicit-seven-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = 8 + | parallelism-factor = 1.0 + | parallelism-max = 64 + | minimum-runnable = 7 + | } + |} + """.stripMargin) +} + +class ForkJoinExecutorConfiguratorSpec extends PekkoSpec(ForkJoinExecutorConfiguratorSpec.config) { + + import ForkJoinExecutorConfigurator.resolveMinimumRunnable + + "ForkJoinExecutorConfigurator.resolveMinimumRunnable" must { + + "honour explicit zero (compensation disabled)" in { + resolveMinimumRunnable(configured = 0, parallelism = 16, jdkMajorVersion = 21) shouldBe 0 + resolveMinimumRunnable(configured = 0, parallelism = 16, jdkMajorVersion = 17) shouldBe 0 + } + + "honour explicit positive overrides verbatim" in { + resolveMinimumRunnable(configured = 1, parallelism = 16, jdkMajorVersion = 21) shouldBe 1 + resolveMinimumRunnable(configured = 7, parallelism = 16, jdkMajorVersion = 21) shouldBe 7 + resolveMinimumRunnable(configured = 100, parallelism = 16, jdkMajorVersion = 25) shouldBe 100 + } + + "auto-resolve to 1 on JDK < 21 regardless of parallelism" in { + resolveMinimumRunnable(configured = -1, parallelism = 1, jdkMajorVersion = 17) shouldBe 1 + resolveMinimumRunnable(configured = -1, parallelism = 8, jdkMajorVersion = 17) shouldBe 1 + resolveMinimumRunnable(configured = -1, parallelism = 64, jdkMajorVersion = 11) shouldBe 1 + } + + "auto-resolve using parallelism / 2 on JDK 21+ with min cap 1 and max cap 8" in { + resolveMinimumRunnable(configured = -1, parallelism = 1, jdkMajorVersion = 21) shouldBe 1 + resolveMinimumRunnable(configured = -1, parallelism = 2, jdkMajorVersion = 21) shouldBe 1 + resolveMinimumRunnable(configured = -1, parallelism = 4, jdkMajorVersion = 21) shouldBe 2 + resolveMinimumRunnable(configured = -1, parallelism = 8, jdkMajorVersion = 21) shouldBe 4 + resolveMinimumRunnable(configured = -1, parallelism = 16, jdkMajorVersion = 21) shouldBe 8 + resolveMinimumRunnable(configured = -1, parallelism = 64, jdkMajorVersion = 25) shouldBe 8 + } + + "produce a strictly higher value on JDK 21+ than on JDK 17 for plausible dispatcher sizes" in { + // Directional check: the auto policy must move the needle for the JDKs that need it. + for (parallelism <- Seq(4, 8, 16, 32, 64)) { + val legacy = resolveMinimumRunnable(configured = -1, parallelism, jdkMajorVersion = 17) + val modern = resolveMinimumRunnable(configured = -1, parallelism, jdkMajorVersion = 21) + withClue(s"parallelism=$parallelism legacy=$legacy modern=$modern: ") { + modern should be > legacy + } + } + } + + "never exceed the documented max cap of 8" in { + for (parallelism <- 1 to 256; jdk <- Seq(21, 25, 30)) { + resolveMinimumRunnable(configured = -1, parallelism, jdk) should be <= 8 + } + } + } + + "ForkJoinExecutorConfigurator wiring" must { + + // Build a factory from a real dispatcher config and return the resolved + // minimum-runnable. This proves the config value actually reaches the + // ForkJoinExecutorServiceFactory — guarding against the trivial regression + // of reverting resolveMinimumRunnable to a direct `config.getInt` read. + def resolvedMinimumRunnable(dispatcherId: String): Int = { + // `system.dispatchers.config(id)` resolves the dispatcher's full config with + // reference.conf defaults applied (so `virtualize`, `minimum-runnable`, etc. + // all have values). + val dispatcherConfig = system.dispatchers.config(dispatcherId) + val configurator = new ForkJoinExecutorConfigurator( + dispatcherConfig.getConfig("fork-join-executor"), + system.dispatchers.prerequisites) + val tf: ThreadFactory = system.dispatchers.prerequisites.threadFactory + val factory = configurator + .createExecutorServiceFactory(dispatcherId, tf) + .asInstanceOf[configurator.ForkJoinExecutorServiceFactory] + factory.minimumRunnable + } + + "respect explicit minimum-runnable = 0" in { + resolvedMinimumRunnable("fj-explicit-zero-dispatcher") shouldBe 0 + } + + "respect explicit minimum-runnable = 7" in { + resolvedMinimumRunnable("fj-explicit-seven-dispatcher") shouldBe 7 + } + + "auto-scale the default (minimum-runnable not set) on JDK 21+" in { + if (JavaVersion.majorVersion < 21) pending + + val resolved = resolvedMinimumRunnable("fj-auto-default-dispatcher") + // The dispatcher declares parallelism-min = 8 so effective parallelism is at + // least 8; auto = min(8, max(1, parallelism/2)) must be at least 4 and never + // exceed the documented cap of 8. + resolved should be >= 4 + resolved should be <= 8 + } + + "keep the legacy value of 1 on JDK < 21 when the default is left untouched" in { + if (JavaVersion.majorVersion >= 21) pending + + resolvedMinimumRunnable("fj-auto-default-dispatcher") shouldBe 1 + } + + "never drop below 1 even for parallelism = 1 dispatchers" in { + val resolved = resolvedMinimumRunnable("fj-auto-small-dispatcher") + // parallelism = 1 implies parallelism/2 = 0, which the min-cap must lift to 1. + // On JDK < 21 the legacy value of 1 is already the expected answer. + resolved shouldBe 1 + } + } +} diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index b2afe5f374..ce337a56fb 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -490,12 +490,15 @@ pekko { # When blocked worker count causes active threads to drop below this threshold, the pool # may create a compensation thread to maintain progress. # - # The default value of 1 matches the JDK ForkJoinPool behaviour prior to this setting being - # exposed. Higher values (e.g. parallelism/2) can reduce starvation risk for actor-heavy - # workloads on JDK 21+ where ForkJoinPool asyncMode (FIFO) compensation-thread creation - # became more conservative (see JDK-8300995 / JDK-8321335). - # Set to 0 to disable compensation entirely. - minimum-runnable = 1 + # The special value -1 (default) selects a JDK-aware policy: + # * JDK 21+ : effective value = min(8, max(1, parallelism / 2)) + # * JDK < 21: effective value = 1 (preserves the JDK behaviour prior to this setting) + # Auto-selection on JDK 21+ mitigates the asyncMode (FIFO) compensation-thread regression + # tracked in JDK-8300995 / JDK-8321335 that can cause actor-heavy workloads to starve. + # + # Set explicitly to 0 to disable compensation entirely. Set to any non-negative integer + # to override the auto-selection (e.g. 1 to restore the previous default behaviour). + minimum-runnable = -1 # This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above, # When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown. diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala index 9dc2c95318..466f7f0eae 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala @@ -17,8 +17,31 @@ import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, Threa import com.typesafe.config.Config +import org.apache.pekko.annotation.InternalApi +import org.apache.pekko.util.JavaVersion + object ForkJoinExecutorConfigurator { + /** + * INTERNAL API + * + * Resolves the effective `minimum-runnable` value for a fork-join dispatcher. + * + * A negative value (default `-1` in reference.conf) selects the JDK-aware policy: + * on JDK 21+ the value is `min(8, max(1, parallelism / 2))` to mitigate the + * asyncMode (FIFO) compensation-thread regression tracked in + * JDK-8300995 / JDK-8321335; on older JDKs the value stays at `1` to preserve + * legacy behaviour. Non-negative configured values are honoured verbatim, so + * `0` still disables compensation entirely. + */ + @InternalApi private[pekko] def resolveMinimumRunnable( + configured: Int, + parallelism: Int, + jdkMajorVersion: Int): Int = + if (configured >= 0) configured + else if (jdkMajorVersion >= 21) math.min(8, math.max(1, parallelism / 2)) + else 1 + /** * INTERNAL PEKKO USAGE ONLY */ @@ -140,16 +163,21 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer """"task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""") } + val parallelism = ThreadPoolConfig.scaledPoolSize( + config.getInt("parallelism-min"), + config.getDouble("parallelism-factor"), + config.getInt("parallelism-max")) + new ForkJoinExecutorServiceFactory( id, validate(tf), - ThreadPoolConfig.scaledPoolSize( - config.getInt("parallelism-min"), - config.getDouble("parallelism-factor"), - config.getInt("parallelism-max")), + parallelism, asyncMode, config.getInt("maximum-pool-size"), - config.getInt("minimum-runnable") + ForkJoinExecutorConfigurator.resolveMinimumRunnable( + config.getInt("minimum-runnable"), + parallelism, + JavaVersion.majorVersion) ) } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
