This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch fix-jdk21-fjp-configuration
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 4aa5e9b83e673116c429ec4b3abd5448211d06d8
Author: He-Pin <[email protected]>
AuthorDate: Sun Apr 19 02:26:35 2026 +0800

    feat: make ForkJoinPool minimum-runnable configurable to address JDK 21+ starvation
    
    Motivation:
    PekkoForkJoinPool hard-coded minimumRunnable=1 in the JDK 9+ ForkJoinPool
    constructor. This is the threshold below which the pool creates compensation
    threads to maintain progress. On JDK 21+ (see JDK-8300995 / JDK-8321335),
    the compensation-thread mechanism for asyncMode=FIFO pools became more
    conservative, making it harder to recover from actor-heavy workloads where
    the single "runnable" threshold is insufficient under load.
    
    Modification:
    - Added minimum-runnable = 1 to fork-join-executor in reference.conf
      (the default preserves existing behaviour; the relationship to
      JDK-8300995 is documented alongside the setting).
    - PekkoForkJoinPool now accepts minimumRunnable as a constructor parameter
      (default 1 keeps existing callers source compatible; note that a Scala
      default argument alone does not preserve binary compatibility).
    - ForkJoinExecutorServiceFactory reads minimum-runnable from config and
      forwards it to PekkoForkJoinPool.
    
    Result:
    Users experiencing dispatcher starvation on JDK 21+ can now increase
    minimum-runnable (e.g. to parallelism/2) to prompt earlier compensation-
    thread creation without needing to patch Pekko itself. All existing
    actor-tests/dispatch tests pass unchanged (71 passed, 1 pending).
    
    Fixes: https://github.com/apache/pekko/issues/2870
    
    Co-authored-by: Copilot <[email protected]>
---
 actor/src/main/resources/reference.conf                    | 11 +++++++++++
 .../pekko/dispatch/ForkJoinExecutorConfigurator.scala      | 14 +++++++++-----
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/actor/src/main/resources/reference.conf 
b/actor/src/main/resources/reference.conf
index 5bb1c64888..81e10e9801 100644
--- a/actor/src/main/resources/reference.conf
+++ b/actor/src/main/resources/reference.conf
@@ -481,6 +481,17 @@ pekko {
         # Read the documentation on `java.util.concurrent.ForkJoinPool` to 
find out more. Default in hex is 0x7fff.
         maximum-pool-size = 32767
 
+        # Minimum number of worker threads not blocked by a join or ManagedBlocker that the pool
+        # tries to maintain. When too few unblocked workers remain and unexecuted tasks may exist,
+        # the pool may create a compensation thread (up to maximum-pool-size) to maintain progress.
+        #
+        # The default value of 1 is what Pekko hard-coded before this setting was exposed, and is
+        # also the JDK's own default. Higher values (e.g. parallelism/2) can reduce starvation risk
+        # for actor-heavy workloads on JDK 21+, where compensation-thread creation for asyncMode
+        # (FIFO) pools became more conservative (see JDK-8300995 / JDK-8321335).
+        # Set to 0 to disable compensation; only acceptable when tasks never depend on work needing extra threads.
+        minimum-runnable = 1
+
         # This config is new in Pekko v1.2.0 and only has an effect if you are 
running with JDK 21 and above,
         # When set to `on` but the underlying runtime does not support virtual 
threads, an Exception will be thrown.
         # Virtualize this dispatcher as a virtual-thread-executor
diff --git 
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
 
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
index e5d7e7f8ce..9dc2c95318 100644
--- 
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
+++ 
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
@@ -27,9 +27,10 @@ object ForkJoinExecutorConfigurator {
       threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
       maximumPoolSize: Int,
       unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
-      asyncMode: Boolean)
+      asyncMode: Boolean,
+      minimumRunnable: Int = 1)
       extends ForkJoinPool(parallelism, threadFactory, 
unhandledExceptionHandler, asyncMode,
-        0, maximumPoolSize, 1, null, 
ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS)
+        0, maximumPoolSize, minimumRunnable, null, 
ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS)
       with LoadMetrics {
 
     override def execute(r: Runnable): Unit =
@@ -85,7 +86,8 @@ class ForkJoinExecutorConfigurator(config: Config, 
prerequisites: DispatcherPrer
       val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
       val parallelism: Int,
       val asyncMode: Boolean,
-      val maxPoolSize: Int)
+      val maxPoolSize: Int,
+      val minimumRunnable: Int = 1)
       extends ExecutorServiceFactory {
     def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
         parallelism: Int,
@@ -109,7 +111,8 @@ class ForkJoinExecutorConfigurator(config: Config, 
prerequisites: DispatcherPrer
         }
       } else threadFactory
 
-      val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, 
MonitorableThreadFactory.doNothing, asyncMode)
+      val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, 
MonitorableThreadFactory.doNothing, asyncMode,
+        minimumRunnable)
 
       if (isVirtualized) {
         // we need to cast here,
@@ -145,7 +148,8 @@ class ForkJoinExecutorConfigurator(config: Config, 
prerequisites: DispatcherPrer
         config.getDouble("parallelism-factor"),
         config.getInt("parallelism-max")),
       asyncMode,
-      config.getInt("maximum-pool-size")
+      config.getInt("maximum-pool-size"),
+      config.getInt("minimum-runnable")
     )
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to