This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new e69d8b39dd Recommend property change when Fate threads are mostly idle
(#5129)
e69d8b39dd is described below
commit e69d8b39dd4ecd8bf355e315e954d8db9059b54d
Author: Dave Marion <[email protected]>
AuthorDate: Fri Dec 6 16:01:31 2024 -0500
Recommend property change when Fate threads are mostly idle (#5129)
Added a new property that controls the checking of the number
of idle Fate threads in the background thread that resizes the
Fate thread pool. This new logic logs a warning recommending
that the Fate thread pool size property be increased if
the number of idle Fate threads is zero at least 95% of the time.
Co-authored-by: Keith Turner <[email protected]>
---
.../org/apache/accumulo/core/conf/Property.java | 6 ++++
.../java/org/apache/accumulo/core/fate/Fate.java | 41 +++++++++++++++++++++-
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 98412a97d7..b9b7f72181 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -425,6 +425,12 @@ public enum Property {
"The number of threads used to run fault-tolerant executions (FATE)."
+ " These are primarily table operations like merge.",
"1.4.3"),
+ MANAGER_FATE_IDLE_CHECK_INTERVAL("manager.fate.idle.check.interval", "60m",
+ PropertyType.TIMEDURATION,
+ "The interval at which to check if the number of idle Fate threads has
consistently been zero."
+ + " The way this is checked is an approximation. Logs a warning in
the Manager log to increase"
+ + " MANAGER_FATE_THREADPOOL_SIZE. A value of zero disables this
check and has a maximum value of 60m.",
+ "4.0.0"),
MANAGER_STATUS_THREAD_POOL_SIZE("manager.status.threadpool.size", "0",
PropertyType.COUNT,
"The number of threads to use when fetching the tablet server status for
balancing. Zero "
+ "indicates an unlimited number of threads will be used.",
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 1350cce652..f46cc1aa43 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -34,6 +34,7 @@ import static
org.apache.accumulo.core.util.ShutdownUtil.isIOException;
import java.time.Duration;
import java.util.EnumSet;
import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionException;
@@ -83,6 +84,7 @@ public class Fate<T> {
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
private final TransferQueue<FateId> workQueue;
private final Thread workFinder;
+ private final ConcurrentLinkedQueue<Integer> idleCountHistory = new
ConcurrentLinkedQueue<>();
public enum TxInfo {
TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
@@ -355,7 +357,8 @@ public class Fate<T> {
// resize the pool if the property changed
ThreadPools.resizePool(pool, conf,
Property.MANAGER_FATE_THREADPOOL_SIZE);
// If the pool grew, then ensure that there is a TransactionRunner for
each thread
- int needed = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) -
pool.getQueue().size();
+ final int configured =
conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
+ final int needed = configured - pool.getQueue().size();
if (needed > 0) {
for (int i = 0; i < needed; i++) {
try {
@@ -372,6 +375,41 @@ public class Fate<T> {
break;
}
}
+ idleCountHistory.clear();
+ } else {
+ // The property did not change, but should it based on idle Fate
threads? Maintain
+ // count of the last X minutes of idle Fate threads. If zero 95% of
the time, then suggest
+ // that the
+ // MANAGER_FATE_THREADPOOL_SIZE be increased.
+ final long interval = Math.min(60, TimeUnit.MILLISECONDS
+
.toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL)));
+ if (interval == 0) {
+ idleCountHistory.clear();
+ } else {
+ if (idleCountHistory.size() >= interval * 2) { // this task runs
every 30s
+ int zeroFateThreadsIdleCount = 0;
+ for (Integer idleConsumerCount : idleCountHistory) {
+ if (idleConsumerCount == 0) {
+ zeroFateThreadsIdleCount++;
+ }
+ }
+ boolean needMoreThreads =
+ (zeroFateThreadsIdleCount / (double) idleCountHistory.size())
>= 0.95;
+ if (needMoreThreads) {
+ log.warn(
+ "All Fate threads appear to be busy for the last {} minutes,"
+ + " consider increasing property: {}",
+ interval, Property.MANAGER_FATE_THREADPOOL_SIZE.getKey());
+ // Clear the history so that we don't log for interval minutes.
+ idleCountHistory.clear();
+ } else {
+ while (idleCountHistory.size() >= interval * 2) {
+ idleCountHistory.remove();
+ }
+ }
+ }
+ idleCountHistory.add(workQueue.getWaitingConsumerCount());
+ }
}
}, 3, 30, SECONDS));
this.transactionExecutor = pool;
@@ -611,5 +649,6 @@ public class Fate<T> {
if (deadResCleanerExecutor != null) {
deadResCleanerExecutor.shutdownNow();
}
+ idleCountHistory.clear();
}
}