This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ab254acfb feat: make ForkJoinPool minimum-runnable configurable and 
improve pool documentation (#2871)
1ab254acfb is described below

commit 1ab254acfba9603214b073a51e6fa11cfb0fb30a
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Apr 19 04:33:50 2026 +0800

    feat: make ForkJoinPool minimum-runnable configurable and improve pool 
documentation (#2871)
    
    * feat: make ForkJoinPool minimum-runnable configurable to address JDK 21+ 
starvation
    
    Motivation:
    PekkoForkJoinPool hard-coded minimumRunnable=1 in the JDK 9+ ForkJoinPool
    constructor. This is the threshold below which the pool creates compensation
    threads to maintain progress. On JDK 21+ (see JDK-8300995 / JDK-8321335),
    the compensation-thread mechanism for asyncMode=FIFO pools became more
    conservative, making it harder to recover from actor-heavy workloads where
    the single "runnable" threshold is insufficient under load.
    
    Modification:
    - Added minimum-runnable = 1 to fork-join-executor in reference.conf 
(default
      preserves existing behaviour; documented relationship to JDK-8300995).
    - PekkoForkJoinPool now accepts minimumRunnable as a constructor parameter
      (default 1 for binary compatibility).
    - ForkJoinExecutorServiceFactory reads minimum-runnable from config and
      forwards it to PekkoForkJoinPool.
    
    Result:
    Users experiencing dispatcher starvation on JDK 21+ can now increase
    minimum-runnable (e.g. to parallelism/2) to prompt earlier compensation-
    thread creation without needing to patch Pekko itself. All existing
    actor-tests/dispatch tests pass unchanged (71 passed, 1 pending).
    
    Fixes: https://github.com/apache/pekko/issues/2870
    
    Co-authored-by: Copilot <[email protected]>
    
    * docs: improve maximum-pool-size comment explaining JDK 21+ compensation 
thread behavior
    
    Motivation:
    The previous comment was minimal and did not explain how maximum-pool-size
    interacts with ForkJoinPool compensation threads on JDK 21+.
    
    Modification:
    Expand the maximum-pool-size comment to describe that on JDK 21+ the pool
    uses this as a cap for compensation threads. Tuning it closer to the
    target parallelism can reduce latency under actor-heavy workloads
    (see JDK-8300995 tracked in #2870).
    
    Result:
    Users who see JDK 21+ ForkJoinPool scheduling jitter now have actionable
    guidance directly in the default configuration.
    
    References: #2870
---
 actor/src/main/resources/reference.conf                | 18 +++++++++++++++++-
 .../pekko/dispatch/ForkJoinExecutorConfigurator.scala  | 14 +++++++++-----
 2 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/actor/src/main/resources/reference.conf 
b/actor/src/main/resources/reference.conf
index 5bb1c64888..b2afe5f374 100644
--- a/actor/src/main/resources/reference.conf
+++ b/actor/src/main/resources/reference.conf
@@ -478,9 +478,25 @@ pekko {
         task-peeking-mode = "FIFO"
 
         # This config is new in Pekko v1.1.0 and only has an effect if you are 
running with JDK 9 and above.
-        # Read the documentation on `java.util.concurrent.ForkJoinPool` to 
find out more. Default in hex is 0x7fff.
+        # Read the documentation on `java.util.concurrent.ForkJoinPool` for 
details. Default in hex is 0x7fff.
+        #
+        # On JDK 21+ the pool creates compensation threads to maintain the 
target parallelism when workers
+        # block (e.g. awaiting actor replies); this value caps how many threads 
the pool may create. Lowering it
+        # from the default (32767, the JDK's own limit) to a few multiples of 
parallelism may reduce latency
+        # spikes under heavy actor-round-trip workloads (see also 
minimum-runnable below and JDK-8300995).
         maximum-pool-size = 32767
 
+        # Minimum number of non-blocked (runnable) worker threads the pool 
tries to maintain.
+        # When blocked workers cause the number of runnable threads to drop 
below this threshold, the pool
+        # may create a compensation thread to maintain progress.
+        #
+        # The default value of 1 is the value Pekko hard-coded before this 
setting was exposed (it is also
+        # the JDK ForkJoinPool default). Higher values (e.g. parallelism/2) can 
reduce starvation risk for actor-heavy
+        # workloads on JDK 21+ where ForkJoinPool asyncMode (FIFO) 
compensation-thread creation
+        # became more conservative (see JDK-8300995 / JDK-8321335).
+        # A value of 0 gives up the liveness guarantee; use it only when tasks never block on other tasks in this pool.
+        minimum-runnable = 1
+
         # This config is new in Pekko v1.2.0 and only has an effect if you are 
running with JDK 21 and above,
         # When set to `on` but the underlying runtime does not support virtual 
threads, an Exception will be thrown.
         # Virtualize this dispatcher as a virtual-thread-executor
diff --git 
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
 
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
index e5d7e7f8ce..9dc2c95318 100644
--- 
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
+++ 
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
@@ -27,9 +27,10 @@ object ForkJoinExecutorConfigurator {
       threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
       maximumPoolSize: Int,
       unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
-      asyncMode: Boolean)
+      asyncMode: Boolean,
+      minimumRunnable: Int = 1)
       extends ForkJoinPool(parallelism, threadFactory, 
unhandledExceptionHandler, asyncMode,
-        0, maximumPoolSize, 1, null, 
ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS)
+        0, maximumPoolSize, minimumRunnable, null, 
ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS)
       with LoadMetrics {
 
     override def execute(r: Runnable): Unit =
@@ -85,7 +86,8 @@ class ForkJoinExecutorConfigurator(config: Config, 
prerequisites: DispatcherPrer
       val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
       val parallelism: Int,
       val asyncMode: Boolean,
-      val maxPoolSize: Int)
+      val maxPoolSize: Int,
+      val minimumRunnable: Int = 1)
       extends ExecutorServiceFactory {
     def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
         parallelism: Int,
@@ -109,7 +111,8 @@ class ForkJoinExecutorConfigurator(config: Config, 
prerequisites: DispatcherPrer
         }
       } else threadFactory
 
-      val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, 
MonitorableThreadFactory.doNothing, asyncMode)
+      val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, 
MonitorableThreadFactory.doNothing, asyncMode,
+        minimumRunnable)
 
       if (isVirtualized) {
         // we need to cast here,
@@ -145,7 +148,8 @@ class ForkJoinExecutorConfigurator(config: Config, 
prerequisites: DispatcherPrer
         config.getDouble("parallelism-factor"),
         config.getInt("parallelism-max")),
       asyncMode,
-      config.getInt("maximum-pool-size")
+      config.getInt("maximum-pool-size"),
+      config.getInt("minimum-runnable")
     )
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to