This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch fix-jdk21-fjp-configuration in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 4aa5e9b83e673116c429ec4b3abd5448211d06d8 Author: He-Pin <[email protected]> AuthorDate: Sun Apr 19 02:26:35 2026 +0800 feat: make ForkJoinPool minimum-runnable configurable to address JDK 21+ starvation Motivation: PekkoForkJoinPool hard-coded minimumRunnable=1 in the JDK 9+ ForkJoinPool constructor. This is the threshold below which the pool creates compensation threads to maintain progress. On JDK 21+ (see JDK-8300995 / JDK-8321335), the compensation-thread mechanism for asyncMode=FIFO pools became more conservative, making it harder to recover from actor-heavy workloads where the single "runnable" threshold is insufficient under load. Modification: - Added minimum-runnable = 1 to fork-join-executor in reference.conf (default preserves existing behaviour; documented relationship to JDK-8300995). - PekkoForkJoinPool now accepts minimumRunnable as a constructor parameter (default 1 for binary compatibility). - ForkJoinExecutorServiceFactory reads minimum-runnable from config and forwards it to PekkoForkJoinPool. Result: Users experiencing dispatcher starvation on JDK 21+ can now increase minimum-runnable (e.g. to parallelism/2) to prompt earlier compensation- thread creation without needing to patch Pekko itself. All existing actor-tests/dispatch tests pass unchanged (71 passed, 1 pending). Fixes: https://github.com/apache/pekko/issues/2870 Co-authored-by: Copilot <[email protected]> --- actor/src/main/resources/reference.conf | 11 +++++++++++ .../pekko/dispatch/ForkJoinExecutorConfigurator.scala | 14 +++++++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index 5bb1c64888..81e10e9801 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -481,6 +481,17 @@ pekko { # Read the documentation on `java.util.concurrent.ForkJoinPool` to find out more. Default in hex is 0x7fff. maximum-pool-size = 32767 + # Minimum number of non-blocked (runnable) worker threads the pool tries to maintain. + # When blocked worker count causes active threads to drop below this threshold, the pool + # may create a compensation thread to maintain progress. + # + # The default value of 1 matches the JDK ForkJoinPool behaviour prior to this setting being + # exposed. Higher values (e.g. parallelism/2) can reduce starvation risk for actor-heavy + # workloads on JDK 21+ where ForkJoinPool asyncMode (FIFO) compensation-thread creation + # became more conservative (see JDK-8300995 / JDK-8321335). + # Set to 0 to disable compensation entirely. + minimum-runnable = 1 + # This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above, # When set to `on` but the underlying runtime does not support virtual threads, an Exception will be thrown. # Virtualize this dispatcher as a virtual-thread-executor diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala index e5d7e7f8ce..9dc2c95318 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala @@ -27,9 +27,10 @@ object ForkJoinExecutorConfigurator { threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, maximumPoolSize: Int, unhandledExceptionHandler: Thread.UncaughtExceptionHandler, - asyncMode: Boolean) + asyncMode: Boolean, + minimumRunnable: Int = 1) extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode, - 0, maximumPoolSize, 1, null, ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS) + 0, maximumPoolSize, minimumRunnable, null, ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS) with LoadMetrics { override def execute(r: Runnable): Unit = @@ -85,7 +86,8 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, val parallelism: Int, val asyncMode: Boolean, - val maxPoolSize: Int) + val maxPoolSize: Int, + val minimumRunnable: Int = 1) extends ExecutorServiceFactory { def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int, @@ -109,7 +111,8 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer } } else threadFactory - val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, MonitorableThreadFactory.doNothing, asyncMode) + val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, MonitorableThreadFactory.doNothing, asyncMode, + minimumRunnable) if (isVirtualized) { // we need to cast here, @@ -145,7 +148,8 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer config.getDouble("parallelism-factor"), config.getInt("parallelism-max")), asyncMode, - config.getInt("maximum-pool-size") + config.getInt("maximum-pool-size"), + config.getInt("minimum-runnable") ) } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
