This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 1ab254acfb feat: make ForkJoinPool minimum-runnable configurable and
improve pool documentation (#2871)
1ab254acfb is described below
commit 1ab254acfba9603214b073a51e6fa11cfb0fb30a
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Apr 19 04:33:50 2026 +0800
feat: make ForkJoinPool minimum-runnable configurable and improve pool
documentation (#2871)
* feat: make ForkJoinPool minimum-runnable configurable to address JDK 21+
starvation
Motivation:
PekkoForkJoinPool hard-coded minimumRunnable=1 in the JDK 9+ ForkJoinPool
constructor. This is the threshold below which the pool creates compensation
threads to maintain progress. On JDK 21+ (see JDK-8300995 / JDK-8321335),
the compensation-thread mechanism for asyncMode=FIFO pools became more
conservative, making it harder to recover from actor-heavy workloads where
the single "runnable" threshold is insufficient under load.
Modification:
- Added minimum-runnable = 1 to fork-join-executor in reference.conf
(the default preserves existing behaviour; its relationship to
JDK-8300995 is documented alongside the setting).
- PekkoForkJoinPool now accepts minimumRunnable as a constructor parameter
(default 1 for binary compatibility).
- ForkJoinExecutorServiceFactory reads minimum-runnable from config and
forwards it to PekkoForkJoinPool.
Result:
Users experiencing dispatcher starvation on JDK 21+ can now increase
minimum-runnable (e.g. to parallelism/2) to prompt earlier compensation-
thread creation without needing to patch Pekko itself. All existing
actor-tests/dispatch tests pass unchanged (71 passed, 1 pending).
Fixes: https://github.com/apache/pekko/issues/2870
Co-authored-by: Copilot <[email protected]>
* docs: improve maximum-pool-size comment explaining JDK 21+ compensation
thread behavior
Motivation:
The previous comment was minimal and did not explain how maximum-pool-size
interacts with ForkJoinPool compensation threads on JDK 21+.
Modification:
Expand the maximum-pool-size comment to describe that on JDK 21+ the pool
uses this as a cap for compensation threads. Tuning it closer to the
target parallelism can reduce latency under actor-heavy workloads
(see JDK-8300995 tracked in #2870).
Result:
Users who see JDK 21+ ForkJoinPool scheduling jitter now have actionable
guidance directly in the default configuration.
References: #2870
---
actor/src/main/resources/reference.conf | 18 +++++++++++++++++-
.../pekko/dispatch/ForkJoinExecutorConfigurator.scala | 14 +++++++++-----
2 files changed, 26 insertions(+), 6 deletions(-)
diff --git a/actor/src/main/resources/reference.conf
b/actor/src/main/resources/reference.conf
index 5bb1c64888..b2afe5f374 100644
--- a/actor/src/main/resources/reference.conf
+++ b/actor/src/main/resources/reference.conf
@@ -478,9 +478,25 @@ pekko {
task-peeking-mode = "FIFO"
# This config is new in Pekko v1.1.0 and only has an effect if you are
running with JDK 9 and above.
- # Read the documentation on `java.util.concurrent.ForkJoinPool` to
find out more. Default in hex is 0x7fff.
+ # Read the documentation on `java.util.concurrent.ForkJoinPool` for
details. Default in hex is 0x7fff.
+ #
+ # On JDK 21+ the pool creates compensation threads to maintain the
target parallelism when workers
+ # block (e.g. awaiting actor replies). This value caps how many
such threads may exist: the default of 32767 allows up to (32767 - parallelism)
+ # spare threads. Lowering it to a few multiples of parallelism
bounds compensation-thread creation and can reduce latency spikes under
+ # heavy actor-round-trip workloads (see also minimum-runnable below
and JDK-8300995).
maximum-pool-size = 32767
+ # Minimum number of non-blocked (runnable) worker threads the pool
tries to maintain.
+ # When blocked worker count causes active threads to drop below this
threshold, the pool
+ # may create a compensation thread to maintain progress.
+ #
+ # The default value of 1 is the value Pekko hard-coded before this
setting was exposed, so existing
+ # behaviour is unchanged. Higher values (e.g. parallelism/2) can reduce starvation
risk for actor-heavy
+ # workloads on JDK 21+ where ForkJoinPool asyncMode (FIFO)
compensation-thread creation
+ # became more conservative (see JDK-8300995 / JDK-8321335).
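+ # Set to 0 to effectively disable compensation; per the ForkJoinPool
documentation this is only
+ # acceptable when submitted tasks cannot depend on additional threads
to make progress.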
+ minimum-runnable = 1
+
# This config is new in Pekko v1.2.0 and only has an effect if you are
running with JDK 21 and above.
# When set to `on` but the underlying runtime does not support virtual
threads, an Exception will be thrown.
# Virtualize this dispatcher as a virtual-thread-executor
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
index e5d7e7f8ce..9dc2c95318 100644
---
a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
+++
b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala
@@ -27,9 +27,10 @@ object ForkJoinExecutorConfigurator {
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
maximumPoolSize: Int,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
- asyncMode: Boolean)
+ asyncMode: Boolean,
+ minimumRunnable: Int = 1)
extends ForkJoinPool(parallelism, threadFactory,
unhandledExceptionHandler, asyncMode,
- 0, maximumPoolSize, 1, null,
ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS)
+ 0, maximumPoolSize, minimumRunnable, null,
ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS)
with LoadMetrics {
override def execute(r: Runnable): Unit =
@@ -85,7 +86,8 @@ class ForkJoinExecutorConfigurator(config: Config,
prerequisites: DispatcherPrer
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int,
val asyncMode: Boolean,
- val maxPoolSize: Int)
+ val maxPoolSize: Int,
+ val minimumRunnable: Int = 1)
extends ExecutorServiceFactory {
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
@@ -109,7 +111,8 @@ class ForkJoinExecutorConfigurator(config: Config,
prerequisites: DispatcherPrer
}
} else threadFactory
- val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize,
MonitorableThreadFactory.doNothing, asyncMode)
+ val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize,
MonitorableThreadFactory.doNothing, asyncMode,
+ minimumRunnable)
if (isVirtualized) {
// we need to cast here,
@@ -145,7 +148,8 @@ class ForkJoinExecutorConfigurator(config: Config,
prerequisites: DispatcherPrer
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")),
asyncMode,
- config.getInt("maximum-pool-size")
+ config.getInt("maximum-pool-size"),
+ config.getInt("minimum-runnable")
)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]