This is an automated email from the ASF dual-hosted git repository.

krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 11d972cd8b Improves FATE metrics (#5798)
11d972cd8b is described below

commit 11d972cd8b9140aafd424f969050142dc68f669f
Author: Kevin Rathbun <[email protected]>
AuthorDate: Thu Aug 28 09:54:40 2025 -0400

    Improves FATE metrics (#5798)
    
    * Improves FATE metrics
    
    - Removes FATE metrics which are no longer useful (due to 4.0 FATE changes) 
and replaces them with more appropriate metrics.
            - These new metrics are: a Gauge to track the number of total 
threads per FateExecutor and a Gauge to track the total number of idle threads 
(not currently working on a fate operation) per FateExecutor.
                    - Background info/existing functionality:
                            - Fate's set of fate executors may change as Fate 
config is changed. Each FateExecutor has a pool of threads which work on some 
subset of fate operations based on the fate configuration
                            properties. If these properties are changed such 
that a FateExecutor is no longer applicable, the FateExecutor is stopped and 
new FateExecutor(s) are started to accurately represent
                            config changes.
                    - What this means for these metric changes:
                            - If a FateExecutor is stopped, its metrics will 
be removed from the registry. This avoids the registry getting flooded with 
metrics that are no longer applicable/no longer give
                            relevant info on the Fate system.
    - Adds testing for these new metrics in MetricsIT and adds two new classes 
to aid in testing these metrics: SlowFateSplitManager and SlowFateSplit.
    - Some minor misc. changes:
            - Improved description for the deprecated 
MANAGER_FATE_THREADPOOL_SIZE property
            - Improved the set of non-thrift ops in the Fate class to gather 
the ops at runtime
            - Improved the FATE_TX metric description
    
    closes #5147
---
 .../org/apache/accumulo/core/conf/Property.java    |  16 +-
 .../java/org/apache/accumulo/core/fate/Fate.java   |  38 +++-
 .../apache/accumulo/core/fate/FateExecutor.java    |  38 +++-
 .../accumulo/core/fate/FateExecutorMetrics.java    | 121 ++++++++++
 .../org/apache/accumulo/core/metrics/Metric.java   |  17 +-
 .../java/org/apache/accumulo/manager/Manager.java  |  14 +-
 .../accumulo/manager/metrics/ManagerMetrics.java   |  16 +-
 .../accumulo/manager/metrics/fate/FateMetrics.java |  28 ++-
 .../manager/metrics/fate/meta/MetaFateMetrics.java |   8 +-
 .../manager/metrics/fate/user/UserFateMetrics.java |   9 +-
 .../org/apache/accumulo/test/fate/FlakyFate.java   |   2 +-
 .../apache/accumulo/test/fate/SlowFateSplit.java   |  84 +++++++
 .../accumulo/test/fate/SlowFateSplitManager.java   |  57 +++++
 .../apache/accumulo/test/metrics/MetricsIT.java    | 250 ++++++++++++++++++++-
 14 files changed, 645 insertions(+), 53 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b63b6a0d9f..1b21986f5e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -458,13 +458,6 @@ public enum Property {
   
MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL("manager.fate.metrics.min.update.interval",
 "60s",
       PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper 
to update interval.",
       "1.9.3"),
-  @Deprecated(since = "4.0.0")
-  MANAGER_FATE_THREADPOOL_SIZE("manager.fate.threadpool.size", "64",
-      PropertyType.FATE_THREADPOOL_SIZE,
-      "Previously, the number of threads used to run fault-tolerant executions 
(FATE)."
-          + " This is no longer used in 4.0+. MANAGER_FATE_USER_CONFIG and"
-          + " MANAGER_FATE_META_CONFIG are the replacement and must be set 
instead.",
-      "1.4.3"),
   MANAGER_FATE_USER_CONFIG("manager.fate.user.config",
       
"{\"TABLE_CREATE,TABLE_DELETE,TABLE_RENAME,TABLE_ONLINE,TABLE_OFFLINE,NAMESPACE_CREATE,"
           + 
"NAMESPACE_DELETE,NAMESPACE_RENAME,TABLE_TABLET_AVAILABILITY,SHUTDOWN_TSERVER,"
@@ -491,6 +484,15 @@ public enum Property {
           + "more FATE operations and each value is the number of threads that 
will be assigned "
           + "to the pool.",
       "4.0.0"),
+  @Deprecated(since = "4.0.0")
+  MANAGER_FATE_THREADPOOL_SIZE("manager.fate.threadpool.size", "64",
+      PropertyType.FATE_THREADPOOL_SIZE,
+      String.format(
+          "Previously, the number of threads used to run fault-tolerant 
executions (FATE)."
+              + " This is no longer used in 4.0+. %s and %s are the 
replacement and must be"
+              + " set instead.",
+          MANAGER_FATE_USER_CONFIG.getKey(), 
MANAGER_FATE_META_CONFIG.getKey()),
+      "1.4.3"),
   MANAGER_FATE_IDLE_CHECK_INTERVAL("manager.fate.idle.check.interval", "60m",
       PropertyType.TIMEDURATION,
       "The interval at which to check if the number of idle Fate threads has 
consistently been zero."
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 42b30d88b7..87c237f2ad 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -80,7 +80,7 @@ public class Fate<T> {
   private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, 
SUCCESSFUL, UNKNOWN);
   public static final Duration INITIAL_DELAY = Duration.ofSeconds(3);
   private static final Duration DEAD_RES_CLEANUP_DELAY = Duration.ofMinutes(3);
-  private static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(30);
+  public static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(30);
 
   private final AtomicBoolean keepRunning = new AtomicBoolean(true);
   // Visible for FlakyFate test object
@@ -115,8 +115,8 @@ public class Fate<T> {
     TABLE_TABLET_AVAILABILITY(TFateOperation.TABLE_TABLET_AVAILABILITY);
 
     private final TFateOperation top;
-    private static final Set<FateOperation> nonThriftOps = 
Collections.unmodifiableSet(
-        EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT, 
SYSTEM_MERGE));
+    private static final Set<FateOperation> nonThriftOps = 
Arrays.stream(FateOperation.values())
+        .filter(fateOp -> fateOp.top == 
null).collect(Collectors.toUnmodifiableSet());
     private static final Set<FateOperation> allUserFateOps =
         Collections.unmodifiableSet(EnumSet.allOf(FateOperation.class));
     private static final Set<FateOperation> allMetaFateOps =
@@ -171,7 +171,7 @@ public class Fate<T> {
     public void run() {
       // Read from the config here and here only. Must avoid reading the same 
property from the
       // config more than once since it can change at any point in this 
execution
-      final var poolConfigs = getPoolConfigurations(conf);
+      final var poolConfigs = getPoolConfigurations(conf, store.type());
       final var idleCheckIntervalMillis =
           conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL);
 
@@ -188,7 +188,7 @@ public class Fate<T> {
               log.debug(
                   "[{}] The config for {} has changed invalidating {}. 
Gracefully shutting down "
                       + "the FateExecutor.",
-                  store.type(), getFateConfigProp(), fateExecutor);
+                  store.type(), getFateConfigProp(store.type()), fateExecutor);
               fateExecutor.initiateShutdown();
             } else if (fateExecutor.isShutdown() && fateExecutor.isAlive()) {
               log.debug("[{}] {} has been shutdown, but is still actively 
working on transactions.",
@@ -272,7 +272,6 @@ public class Fate<T> {
       ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
     }
     this.deadResCleanerExecutor = deadResCleanerExecutor;
-
   }
 
   /**
@@ -280,9 +279,11 @@ public class Fate<T> {
    * of fate operations and each value is an integer for the number of threads 
assigned to work
    * those fate operations.
    */
-  protected Map<Set<FateOperation>,Integer> 
getPoolConfigurations(AccumuloConfiguration conf) {
+  @VisibleForTesting
+  public static Map<Set<FateOperation>,Integer> 
getPoolConfigurations(AccumuloConfiguration conf,
+      FateInstanceType type) {
     Map<Set<FateOperation>,Integer> poolConfigs = new HashMap<>();
-    final var json = 
JsonParser.parseString(conf.get(getFateConfigProp())).getAsJsonObject();
+    final var json = 
JsonParser.parseString(conf.get(getFateConfigProp(type))).getAsJsonObject();
 
     for (var entry : json.entrySet()) {
       var key = entry.getKey();
@@ -305,19 +306,31 @@ public class Fate<T> {
     return store;
   }
 
-  protected Property getFateConfigProp() {
-    return this.store.type() == FateInstanceType.USER ? 
Property.MANAGER_FATE_USER_CONFIG
+  protected static Property getFateConfigProp(FateInstanceType type) {
+    return type == FateInstanceType.USER ? Property.MANAGER_FATE_USER_CONFIG
         : Property.MANAGER_FATE_META_CONFIG;
   }
 
+  /**
+   * Exists for overrides in test code. Internal access to this field needs to 
be through this
+   * getter
+   */
   public Duration getDeadResCleanupDelay() {
     return DEAD_RES_CLEANUP_DELAY;
   }
 
+  /**
+   * Exists for overrides in test code. Internal access to this field needs to 
be through this
+   * getter
+   */
   public Duration getPoolWatcherDelay() {
     return POOL_WATCHER_DELAY;
   }
 
+  public Set<FateExecutor<T>> getFateExecutors() {
+    return fateExecutors;
+  }
+
   /**
    * Returns the number of TransactionRunners active for the FateExecutor 
assigned to work on the
    * given set of fate operations. "Active" meaning it is waiting for a 
transaction to work on or
@@ -523,9 +536,12 @@ public class Fate<T> {
 
     // interrupt the background threads
     synchronized (fateExecutors) {
-      for (var fateExecutor : fateExecutors) {
+      var fateExecutorsIter = fateExecutors.iterator();
+      while (fateExecutorsIter.hasNext()) {
+        var fateExecutor = fateExecutorsIter.next();
         fateExecutor.shutdownNow();
         fateExecutor.getIdleCountHistory().clear();
+        fateExecutorsIter.remove();
       }
     }
     if (deadResCleanerExecutor != null) {
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
index 234ce34432..9710981b09 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.fate.Fate.TxInfo;
 import org.apache.accumulo.core.fate.FateStore.FateTxStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
@@ -61,7 +62,9 @@ import com.google.common.base.Preconditions;
 
 /**
  * Handles finding and working on FATE work. Only finds/works on fate 
operations that it is assigned
- * to work on defined by 'fateOps'
+ * to work on defined by 'fateOps'. These executors may be stopped and new 
ones started throughout
+ * FATEs life, depending on changes to {@link 
Property#MANAGER_FATE_USER_CONFIG} and
+ * {@link Property#MANAGER_FATE_META_CONFIG}.
  */
 public class FateExecutor<T> {
   private static final Logger log = 
LoggerFactory.getLogger(FateExecutor.class);
@@ -71,19 +74,22 @@ public class FateExecutor<T> {
   private final Fate<T> fate;
   private final Thread workFinder;
   private final TransferQueue<FateId> workQueue;
-  private final AtomicInteger idleWorkerCount = new AtomicInteger(0);
+  private final AtomicInteger idleWorkerCount;
   private final String poolName;
   private final ThreadPoolExecutor transactionExecutor;
   private final Set<TransactionRunner> runningTxRunners;
   private final Set<Fate.FateOperation> fateOps;
   private final ConcurrentLinkedQueue<Integer> idleCountHistory = new 
ConcurrentLinkedQueue<>();
+  private final FateExecutorMetrics<T> fateExecutorMetrics;
 
   public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation> 
fateOps, int poolSize) {
-    final String operatesOn = fate.getStore().type().name().toLowerCase() + "."
-        + fateOps.stream().map(fo -> 
fo.name().toLowerCase()).collect(Collectors.joining("."));
+    final FateInstanceType type = fate.getStore().type();
+    final String typeStr = type.name().toLowerCase();
+    final String operatesOn = fateOps.stream().map(fo -> 
fo.name().toLowerCase()).sorted()
+        .collect(Collectors.joining("."));
     final String transactionRunnerPoolName =
-        ThreadPoolNames.MANAGER_FATE_POOL_PREFIX.poolName + operatesOn;
-    final String workFinderThreadName = "fate.work.finder." + operatesOn;
+        ThreadPoolNames.MANAGER_FATE_POOL_PREFIX.poolName + typeStr + "." + 
operatesOn;
+    final String workFinderThreadName = "fate.work.finder." + typeStr + "." + 
operatesOn;
 
     this.fate = fate;
     this.environment = environment;
@@ -91,9 +97,11 @@ public class FateExecutor<T> {
     this.workQueue = new LinkedTransferQueue<>();
     this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());
     this.poolName = transactionRunnerPoolName;
-    this.transactionExecutor =
-        
ThreadPools.getServerThreadPools().getPoolBuilder(transactionRunnerPoolName)
-            .numCoreThreads(poolSize).enableThreadPoolMetrics().build();
+    this.transactionExecutor = ThreadPools.getServerThreadPools()
+        
.getPoolBuilder(transactionRunnerPoolName).numCoreThreads(poolSize).build();
+    this.idleWorkerCount = new AtomicInteger(0);
+    this.fateExecutorMetrics =
+        new FateExecutorMetrics<>(type, operatesOn, runningTxRunners, 
idleWorkerCount);
 
     this.workFinder = Threads.createCriticalThread(workFinderThreadName, new 
WorkFinder());
     this.workFinder.start();
@@ -159,7 +167,7 @@ public class FateExecutor<T> {
         // split into separate pools.
         final long interval =
             Math.min(60, 
TimeUnit.MILLISECONDS.toMinutes(idleCheckIntervalMillis));
-        var fateConfigProp = fate.getFateConfigProp();
+        var fateConfigProp = Fate.getFateConfigProp(fate.getStore().type());
 
         if (interval == 0) {
           idleCountHistory.clear();
@@ -211,11 +219,16 @@ public class FateExecutor<T> {
     return fateOps;
   }
 
+  public FateExecutorMetrics<T> getFateExecutorMetrics() {
+    return fateExecutorMetrics;
+  }
+
   /**
    * Initiates the shutdown of this FateExecutor. This means the pool 
executing TransactionRunners
    * will no longer accept new TransactionRunners, the currently running 
TransactionRunners will
-   * terminate after they are done with their current transaction, if 
applicable, and the work
-   * finder is shutdown. {@link #isShutdown()} returns true after this is 
called.
+   * terminate after they are done with their current transaction, if 
applicable, the work finder is
+   * shutdown, and the metrics created for this FateExecutor are removed from 
the registry (if
+   * metrics were enabled). {@link #isShutdown()} returns true after this is 
called.
    */
   protected void initiateShutdown() {
     log.debug("Initiated shutdown {}", fateOps);
@@ -223,6 +236,7 @@ public class FateExecutor<T> {
     synchronized (runningTxRunners) {
       runningTxRunners.forEach(TransactionRunner::flagStop);
     }
+    fateExecutorMetrics.clearMetrics();
     // work finder will terminate since this.isShutdown() is true
   }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutorMetrics.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutorMetrics.java
new file mode 100644
index 0000000000..fec087519f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutorMetrics.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.metrics.Metric;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+
+/**
+ * Metrics for a single {@link FateExecutor}: one gauge for the total number of transaction runners
+ * and one gauge for the number of idle workers. Both gauges are tagged with the fate instance type
+ * ({@link #INSTANCE_TYPE_TAG_KEY}) and the operations the executor is assigned to
+ * ({@link #OPS_ASSIGNED_TAG_KEY}).
+ *
+ * <p>
+ * An instance moves through the states UNREGISTERED, REGISTERED and CLEARED, in that order.
+ * Registering is a noop unless the instance is UNREGISTERED and clearing is a noop unless it is
+ * REGISTERED, so once cleared the gauges are never registered again.
+ */
+public class FateExecutorMetrics<T> implements MetricsProducer {
+  private static final Logger log = LoggerFactory.getLogger(FateExecutorMetrics.class);
+  public static final String INSTANCE_TYPE_TAG_KEY = "instanceType";
+  public static final String OPS_ASSIGNED_TAG_KEY = "ops.assigned";
+
+  private final FateInstanceType type;
+  private final String operatesOn;
+  private final Set<FateExecutor<T>.TransactionRunner> runningTxRunners;
+  private final AtomicInteger idleWorkerCount;
+  // the registry the gauges were registered with; only non-null while state is REGISTERED
+  private MeterRegistry registry;
+  private State state;
+
+  /**
+   * @param type the fate instance type, used (lower-cased) as the instance type tag value
+   * @param operatesOn the value for the ops assigned tag
+   * @param runningTxRunners the set whose size backs the total threads gauge
+   * @param idleWorkerCount the counter whose value backs the inactive threads gauge
+   */
+  protected FateExecutorMetrics(FateInstanceType type, String operatesOn,
+      Set<FateExecutor<T>.TransactionRunner> runningTxRunners, AtomicInteger idleWorkerCount) {
+    this.type = type;
+    this.operatesOn = operatesOn;
+    this.runningTxRunners = runningTxRunners;
+    this.state = State.UNREGISTERED;
+    this.idleWorkerCount = idleWorkerCount;
+  }
+
+  /**
+   * Registers both gauges with the given registry and remembers the registry so the gauges can
+   * later be removed by {@link #clearMetrics()}.
+   *
+   * @param registry the registry to add the gauges to
+   */
+  @Override
+  public void registerMetrics(MeterRegistry registry) {
+    // noop if already registered or cleared
+    if (state == State.UNREGISTERED) {
+      Gauge.builder(Metric.FATE_OPS_THREADS_TOTAL.getName(), runningTxRunners::size)
+          .description(Metric.FATE_OPS_THREADS_TOTAL.getDescription())
+          .tag(INSTANCE_TYPE_TAG_KEY, type.name().toLowerCase())
+          .tag(OPS_ASSIGNED_TAG_KEY, operatesOn).register(registry);
+      Gauge.builder(Metric.FATE_OPS_THREADS_INACTIVE.getName(), idleWorkerCount::get)
+          .description(Metric.FATE_OPS_THREADS_INACTIVE.getDescription())
+          .tag(INSTANCE_TYPE_TAG_KEY, type.name().toLowerCase())
+          .tag(OPS_ASSIGNED_TAG_KEY, operatesOn).register(registry);
+
+      registered(registry);
+    }
+  }
+
+  /**
+   * Removes both gauges from the registry they were registered with. A gauge that cannot be found
+   * is logged as an error and skipped.
+   */
+  public void clearMetrics() {
+    // noop if metrics were never registered or have already been cleared
+    if (state == State.REGISTERED) {
+      removeMeter(Metric.FATE_OPS_THREADS_TOTAL);
+      removeMeter(Metric.FATE_OPS_THREADS_INACTIVE);
+
+      cleared();
+    }
+  }
+
+  /**
+   * Finds the meter for the given metric carrying this executor's tags and removes it from the
+   * registry, logging an error naming that same metric if it is not present.
+   */
+  private void removeMeter(Metric metric) {
+    var meter = registry.find(metric.getName())
+        .tags(INSTANCE_TYPE_TAG_KEY, type.name().toLowerCase(), OPS_ASSIGNED_TAG_KEY, operatesOn)
+        .meter();
+    // meter will be null if it could not be found, ignore IDE warning if one is seen
+    if (meter == null) {
+      log.error(
+          "Tried removing meter{name: {} tags: {}={}, {}={}} from the registry, but did "
+              + "not find it.",
+          metric.getName(), INSTANCE_TYPE_TAG_KEY, type.name().toLowerCase(),
+          OPS_ASSIGNED_TAG_KEY, operatesOn);
+    } else {
+      registry.remove(meter);
+    }
+  }
+
+  private void registered(MeterRegistry registry) {
+    this.registry = registry;
+    this.state = State.REGISTERED;
+  }
+
+  private void cleared() {
+    registry = null;
+    state = State.CLEARED;
+  }
+
+  /**
+   * @return true only while the gauges are registered, i.e. after
+   *         {@link #registerMetrics(MeterRegistry)} and before {@link #clearMetrics()}
+   */
+  public boolean isRegistered() {
+    return state == State.REGISTERED;
+  }
+
+  private enum State {
+    UNREGISTERED, REGISTERED, CLEARED
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java 
b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
index 4c6bd8bebd..4876c6a160 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
@@ -21,6 +21,8 @@ package org.apache.accumulo.core.metrics;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.accumulo.core.fate.FateExecutorMetrics;
+
 public enum Metric {
   // General Server Metrics
   SERVER_IDLE("accumulo.server.idle", MetricType.GAUGE,
@@ -104,7 +106,20 @@ public enum Metric {
       "Count of errors that occurred when attempting to gather fate metrics.",
       MetricDocSection.FATE),
   FATE_TX("accumulo.fate.tx", MetricType.GAUGE,
-      "The state is now in a tag (e.g., state=new, state=in.progress, 
state=failed, etc.).",
+      "Count of FATE operations in a certain state. The state is now in a tag "
+          + "(e.g., state=new, state=in.progress, state=failed, etc.).",
+      MetricDocSection.FATE),
+  FATE_OPS_THREADS_INACTIVE("accumulo.fate.ops.threads.inactive", 
MetricType.GAUGE,
+      "Keeps track of the number of idle threads (not working on a fate 
operation) in the thread pool assigned to work on the operations as shown in 
the "
+          + FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY
+          + " tag. The fate instance type can be found in the "
+          + FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY + " tag.",
+      MetricDocSection.FATE),
+  FATE_OPS_THREADS_TOTAL("accumulo.fate.ops.threads.total", MetricType.GAUGE,
+      "Keeps track of the total number of threads in the thread pool assigned 
to work on the operations as shown in the "
+          + FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY
+          + " tag. The fate instance type can be found in the "
+          + FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY + " tag.",
       MetricDocSection.FATE),
 
   // Garbage Collection Metrics
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index cb50ebae84..63fae28c47 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -995,8 +995,8 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
     }
 
     MetricsInfo metricsInfo = getContext().getMetricsInfo();
-    ManagerMetrics managerMetrics = new ManagerMetrics(getConfiguration(), 
this);
-    var producers = managerMetrics.getProducers(getConfiguration(), this);
+    ManagerMetrics managerMetrics = new ManagerMetrics();
+    List<MetricsProducer> producers = new ArrayList<>();
     producers.add(balanceManager.getMetrics());
 
     final TabletGroupWatcher userTableTGW =
@@ -1109,10 +1109,6 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
       throw new IllegalStateException("Upgrade coordinator is unexpectedly not 
complete");
     }
 
-    metricsInfo.addMetricsProducers(producers.toArray(new MetricsProducer[0]));
-    metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), 
getApplicationName(),
-        getAdvertiseAddress(), getResourceGroup()));
-
     balanceManager.startBackGroundTask();
     Threads.createCriticalThread("ScanServer Cleanup Thread", new 
ScanServerZKCleaner()).start();
 
@@ -1135,11 +1131,17 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
         throw new IllegalStateException(
             "Unexpected previous fate reference map already initialized");
       }
+      managerMetrics.configureFateMetrics(getConfiguration(), this, 
fateRefs.get());
       fateReadyLatch.countDown();
     } catch (KeeperException | InterruptedException e) {
       throw new IllegalStateException("Exception setting up FaTE cleanup 
thread", e);
     }
 
+    producers.addAll(managerMetrics.getProducers(this));
+    metricsInfo.addMetricsProducers(producers.toArray(new MetricsProducer[0]));
+    metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), 
getApplicationName(),
+        getAdvertiseAddress(), getResourceGroup()));
+
     ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
         .scheduleWithFixedDelay(() -> 
ScanServerMetadataEntries.clean(context), 10, 10, MINUTES));
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
index 031e2750b6..c6c0a57c36 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
@@ -26,11 +26,14 @@ import static 
org.apache.accumulo.core.metrics.Metric.MANAGER_USER_TGW_ERRORS;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.manager.Manager;
@@ -43,21 +46,24 @@ import io.micrometer.core.instrument.MeterRegistry;
 
 public class ManagerMetrics implements MetricsProducer {
 
-  private final List<FateMetrics<?>> fateMetrics;
+  private List<FateMetrics<?>> fateMetrics;
 
   private final AtomicLong rootTGWErrorsGauge = new AtomicLong(0);
   private final AtomicLong metadataTGWErrorsGauge = new AtomicLong(0);
   private final AtomicLong userTGWErrorsGauge = new AtomicLong(0);
   private final AtomicInteger compactionConfigurationError = new 
AtomicInteger(0);
 
-  public ManagerMetrics(final AccumuloConfiguration conf, final Manager 
manager) {
+  public void configureFateMetrics(final AccumuloConfiguration conf, final 
Manager manager,
+      Map<FateInstanceType,Fate<Manager>> fateRefs) {
     requireNonNull(conf, "AccumuloConfiguration must not be null");
     requireNonNull(conf, "Manager must not be null");
     fateMetrics = List.of(
         new MetaFateMetrics(manager.getContext(),
-            
conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)),
+            
conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL),
+            fateRefs.get(FateInstanceType.META).getFateExecutors()),
         new UserFateMetrics(manager.getContext(),
-            
conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
+            
conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL),
+            fateRefs.get(FateInstanceType.USER).getFateExecutors()));
   }
 
   public void incrementTabletGroupWatcherError(DataLevel level) {
@@ -97,7 +103,7 @@ public class ManagerMetrics implements MetricsProducer {
         
.description(COMPACTION_SVC_ERRORS.getDescription()).register(registry);
   }
 
-  public List<MetricsProducer> getProducers(AccumuloConfiguration conf, 
Manager manager) {
+  public List<MetricsProducer> getProducers(Manager manager) {
     ArrayList<MetricsProducer> producers = new ArrayList<>();
     producers.add(this);
     producers.addAll(fateMetrics);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
index 23cc841e9c..0bbb131e6e 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
@@ -25,15 +25,18 @@ import static 
org.apache.accumulo.core.metrics.Metric.FATE_TYPE_IN_PROGRESS;
 import java.util.EnumMap;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.accumulo.core.fate.FateExecutor;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.server.ServerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,14 +58,18 @@ public abstract class FateMetrics<T extends 
FateMetricValues> implements Metrics
   protected final ServerContext context;
   protected final ReadOnlyFateStore<FateMetrics<T>> readOnlyFateStore;
   protected final long refreshDelay;
+  private final Set<FateExecutor<Manager>> fateExecutors;
+  private MeterRegistry registry;
 
   protected final AtomicLong totalCurrentOpsCount = new AtomicLong(0);
   private final EnumMap<TStatus,AtomicLong> txStatusCounters = new 
EnumMap<>(TStatus.class);
 
-  public FateMetrics(final ServerContext context, final long 
minimumRefreshDelay) {
+  public FateMetrics(final ServerContext context, final long 
minimumRefreshDelay,
+      Set<FateExecutor<Manager>> fateExecutors) {
     this.context = context;
     this.refreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, 
minimumRefreshDelay);
     this.readOnlyFateStore = 
Objects.requireNonNull(buildReadOnlyStore(context));
+    this.fateExecutors = fateExecutors;
 
     for (TStatus status : TStatus.values()) {
       txStatusCounters.put(status, new AtomicLong(0));
@@ -91,10 +98,25 @@ public abstract class FateMetrics<T extends 
FateMetricValues> implements Metrics
 
     metricValues.getOpTypeCounters().forEach((name, count) -> Metrics
         .gauge(FATE_TYPE_IN_PROGRESS.getName(), Tags.of(OP_TYPE_TAG, name), 
count));
+
+    // there may have been new fate executors added, so these need to be 
registered.
+    // fate executors removed will have their metrics removed from the 
registry before they are
+    // removed from the set.
+    if (registry != null) {
+      synchronized (fateExecutors) {
+        fateExecutors.forEach(fe -> {
+          var feMetrics = fe.getFateExecutorMetrics();
+          if (!feMetrics.isRegistered()) {
+            feMetrics.registerMetrics(registry);
+          }
+        });
+      }
+    }
   }
 
   @Override
   public void registerMetrics(final MeterRegistry registry) {
+    this.registry = registry;
     String type = readOnlyFateStore.type().name().toLowerCase();
 
     Gauge.builder(FATE_OPS.getName(), totalCurrentOpsCount, AtomicLong::get)
@@ -104,6 +126,10 @@ public abstract class FateMetrics<T extends 
FateMetricValues> implements Metrics
         .builder(FATE_TX.getName(), counter, 
AtomicLong::get).description(FATE_TX.getDescription())
         .tags("state", status.name().toLowerCase(), "instanceType", 
type).register(registry));
 
+    synchronized (fateExecutors) {
+      fateExecutors.forEach(fe -> 
fe.getFateExecutorMetrics().registerMetrics(registry));
+    }
+
     // get fate status is read only operation - no reason to be nice on 
shutdown.
     ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools()
         .createScheduledExecutorService(1, type + "FateMetricsPoller");
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
index 9d87f9a9cc..44195253ea 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
@@ -21,11 +21,14 @@ package org.apache.accumulo.manager.metrics.fate.meta;
 import static org.apache.accumulo.core.metrics.Metric.FATE_ERRORS;
 import static org.apache.accumulo.core.metrics.Metric.FATE_OPS_ACTIVITY;
 
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.fate.FateExecutor;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
+import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.metrics.fate.FateMetrics;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.zookeeper.KeeperException;
@@ -38,8 +41,9 @@ public class MetaFateMetrics extends 
FateMetrics<MetaFateMetricValues> {
   private final AtomicLong totalOpsGauge = new AtomicLong(0);
   private final AtomicLong fateErrorsGauge = new AtomicLong(0);
 
-  public MetaFateMetrics(ServerContext context, long minimumRefreshDelay) {
-    super(context, minimumRefreshDelay);
+  public MetaFateMetrics(ServerContext context, long minimumRefreshDelay,
+      Set<FateExecutor<Manager>> fateExecutors) {
+    super(context, minimumRefreshDelay, fateExecutors);
   }
 
   @Override
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java
index 4f1df05762..7fadf8ae27 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java
@@ -18,16 +18,21 @@
  */
 package org.apache.accumulo.manager.metrics.fate.user;
 
+import java.util.Set;
+
+import org.apache.accumulo.core.fate.FateExecutor;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 import org.apache.accumulo.core.fate.user.UserFateStore;
 import org.apache.accumulo.core.metadata.SystemTables;
+import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.metrics.fate.FateMetrics;
 import org.apache.accumulo.server.ServerContext;
 
 public class UserFateMetrics extends FateMetrics<UserFateMetricValues> {
 
-  public UserFateMetrics(ServerContext context, long minimumRefreshDelay) {
-    super(context, minimumRefreshDelay);
+  public UserFateMetrics(ServerContext context, long minimumRefreshDelay,
+      Set<FateExecutor<Manager>> fateExecutors) {
+    super(context, minimumRefreshDelay, fateExecutors);
   }
 
   @Override
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
index 2e79fc4729..f022e83f16 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
@@ -39,7 +39,7 @@ public class FlakyFate<T> extends Fate<T> {
   public FlakyFate(T environment, FateStore<T> store, Function<Repo<T>,String> 
toLogStrFunc,
       AccumuloConfiguration conf) {
     super(environment, store, false, toLogStrFunc, conf, new 
ScheduledThreadPoolExecutor(2));
-    for (var poolConfig : getPoolConfigurations(conf).entrySet()) {
+    for (var poolConfig : getPoolConfigurations(conf, 
getStore().type()).entrySet()) {
       fateExecutors.add(
           new FlakyFateExecutor<>(this, environment, poolConfig.getKey(), 
poolConfig.getValue()));
     }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java 
b/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java
new file mode 100644
index 0000000000..b413258ce2
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.function.Function;
+
+import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateExecutor;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Causes the first step in a Split FATE operation to sleep for
+ * {@link SlowFateSplitManager#SLEEP_TIME_MS}. Split was chosen as it can be 
executed on system and
+ * user tables (allowing for testing of meta and user fate ops) and it is easy to 
prevent automatic
+ * splits from occurring (just don't have tables exceed the split threshold in 
testing). This allows
+ * us to sleep for splits we initiate via {@link TableOperation}. This is 
useful when we want a fate
+ * thread to be occupied working on some operation. A potential alternative to 
this is compacting
+ * with a {@link SlowIterator} attached to the table, however, this will 
result in the operation
+ * never being ready ({@link Repo#isReady(FateId, Object)}). So, it would 
exist as an operation to
+ * work on, but a thread will briefly reserve it, see that it is not ready, and 
unreserve it. This class
+ * is useful when we need a fate op reserved by a thread and being worked on 
for a configurable
+ * time, but don't have direct access to the Fate objects/are testing Fate as 
it operates within the
+ * Manager instead of directly working with Fate objects.
+ */
+public class SlowFateSplit<T> extends Fate<T> {
+  private static final Logger log = 
LoggerFactory.getLogger(SlowFateSplit.class);
+  private boolean haveSlept = false;
+
+  public SlowFateSplit(T environment, FateStore<T> store, 
Function<Repo<T>,String> toLogStrFunc,
+      AccumuloConfiguration conf) {
+    super(environment, store, false, toLogStrFunc, conf, new 
ScheduledThreadPoolExecutor(2));
+    for (var poolConfig : getPoolConfigurations(conf, 
getStore().type()).entrySet()) {
+      fateExecutors.add(
+          new SlowFateSplitExecutor(this, environment, poolConfig.getKey(), 
poolConfig.getValue()));
+    }
+  }
+
+  private class SlowFateSplitExecutor extends FateExecutor<T> {
+    private SlowFateSplitExecutor(Fate<T> fate, T environment, 
Set<Fate.FateOperation> fateOps,
+        int poolSize) {
+      super(fate, environment, fateOps, poolSize);
+    }
+
+    @Override
+    protected Repo<T> executeCall(FateId fateId, Repo<T> repo) throws 
Exception {
+      var next = super.executeCall(fateId, repo);
+      var fateOp = (FateOperation) SlowFateSplit.this.getStore().read(fateId)
+          .getTransactionInfo(TxInfo.FATE_OP);
+      if (fateOp == SlowFateSplitManager.SLOW_OP && !haveSlept) {
+        var sleepTime = SlowFateSplitManager.SLEEP_TIME_MS;
+        log.debug("{} sleeping in {} for {}", fateId, 
getClass().getSimpleName(), sleepTime);
+        Thread.sleep(sleepTime);
+        log.debug("{} slept in {} for {}", fateId, getClass().getSimpleName(), 
sleepTime);
+        haveSlept = true;
+      }
+      return next;
+    }
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplitManager.java 
b/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplitManager.java
new file mode 100644
index 0000000000..ba2c96c69c
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplitManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.TraceRepo;
+import org.apache.accumulo.server.ServerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * See {@link SlowFateSplit}
+ */
+public class SlowFateSplitManager extends Manager {
+  private static final Logger log = 
LoggerFactory.getLogger(SlowFateSplitManager.class);
+  // sleep applied once per SlowFateSplit instance, to the first split it executes (10 seconds)
+  public static final long SLEEP_TIME_MS = 10_000;
+  // important that this is an op that can be initiated on some system table 
as well as user tables
+  public static final Fate.FateOperation SLOW_OP = 
Fate.FateOperation.TABLE_SPLIT;
+
+  protected SlowFateSplitManager(ConfigOpts opts, String[] args) throws 
IOException {
+    super(opts, ServerContext::new, args);
+  }
+
+  @Override
+  protected Fate<Manager> initializeFateInstance(ServerContext context, 
FateStore<Manager> store) {
+    log.info("Creating Slow Split Fate for {}", store.type());
+    return new SlowFateSplit<>(this, store, TraceRepo::toLogString, 
getConfiguration());
+  }
+
+  public static void main(String[] args) throws Exception {
+    try (SlowFateSplitManager manager = new SlowFateSplitManager(new 
ConfigOpts(), args)) {
+      manager.runServer();
+    }
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java 
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index a670f58633..ff1859c092 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -23,6 +23,8 @@ import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_FAILED;
 import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_FAILURES_CONSECUTIVE;
 import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_FAILURES_TERMINATION;
 import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_STUCK;
+import static 
org.apache.accumulo.core.metrics.Metric.FATE_OPS_THREADS_INACTIVE;
+import static org.apache.accumulo.core.metrics.Metric.FATE_OPS_THREADS_TOTAL;
 import static org.apache.accumulo.core.metrics.Metric.FATE_TYPE_IN_PROGRESS;
 import static 
org.apache.accumulo.core.metrics.Metric.MANAGER_BALANCER_MIGRATIONS_NEEDED;
 import static org.apache.accumulo.core.metrics.Metric.SCAN_BUSY_TIMEOUT_COUNT;
@@ -50,6 +52,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.cli.ConfigOpts;
 import org.apache.accumulo.core.client.Accumulo;
@@ -62,13 +65,19 @@ import 
org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateExecutorMetrics;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.metadata.SystemTables;
 import org.apache.accumulo.core.metrics.Metric;
 import org.apache.accumulo.core.metrics.MetricsInfo;
 import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory;
+import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.fate.FateTestUtil;
+import org.apache.accumulo.test.fate.SlowFateSplitManager;
 import org.apache.accumulo.test.functional.ConfigurableMacBase;
 import org.apache.accumulo.test.functional.SlowIterator;
 import org.apache.hadoop.conf.Configuration;
@@ -76,12 +85,17 @@ import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import io.micrometer.core.instrument.MeterRegistry;
 
 public class MetricsIT extends ConfigurableMacBase implements MetricsProducer {
-
+  private static final Logger log = LoggerFactory.getLogger(MetricsIT.class);
   private static TestStatsDSink sink;
+  private static final int numFateThreadsPool1 = 5;
+  private static final int numFateThreadsPool2 = 10;
+  private static final int numFateThreadsPool3 = 15;
 
   @Override
   protected Duration defaultTimeout() {
@@ -116,6 +130,16 @@ public class MetricsIT extends ConfigurableMacBase 
implements MetricsProducer {
     Map<String,String> sysProps = 
Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
         TestStatsDRegistryFactory.SERVER_PORT, 
Integer.toString(sink.getPort()));
     cfg.setSystemProperties(sysProps);
+    // custom config for the fate thread pools.
+    // starting FATE config for each FATE type (USER and META) will be:
+    // {<all fate ops>: numFateThreadsPool1} (one pool for all ops of size 
numFateThreadsPool1)
+    var fatePoolsConfig = 
FateTestUtil.createTestFateConfig(numFateThreadsPool1);
+    cfg.setProperty(Property.MANAGER_FATE_USER_CONFIG.getKey(),
+        fatePoolsConfig.get(Property.MANAGER_FATE_USER_CONFIG));
+    cfg.setProperty(Property.MANAGER_FATE_META_CONFIG.getKey(),
+        fatePoolsConfig.get(Property.MANAGER_FATE_META_CONFIG));
+    // Make splits run slowly, used for testing the fate metrics
+    cfg.setServerClass(ServerType.MANAGER, r -> SlowFateSplitManager.class);
   }
 
   @Test
@@ -232,12 +256,228 @@ public class MetricsIT extends ConfigurableMacBase 
implements MetricsProducer {
     cluster.stop();
   }
 
+  @Test
+  public void testFateExecutorMetrics() throws Exception {
+    // Tests metrics for Fate's thread pools. Tests that metrics are seen as 
expected, and config
+    // changes to the thread pools are accurately reflected in the metrics. 
This includes checking
+    // that old thread pool metrics are removed, new ones are created, size 
changes to thread
+    // pools are reflected, and the ops assigned and instance type tags are 
seen as expected
+    final String table = getUniqueNames(1)[0];
+
+    // prevent any system initiated fate operations from running, which may 
interfere with our
+    // metrics gathering
+    getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+    
getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR);
+    try {
+      try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
+        client.tableOperations().create(table);
+
+        SortedSet<Text> splits = new TreeSet<>();
+        splits.add(new Text("foo"));
+        // initiate 1 USER fate op which will execute slowly
+        client.tableOperations().addSplits(table, splits);
+        // initiate 1 META fate op which will execute slowly
+        client.tableOperations().addSplits(SystemTables.METADATA.tableName(), 
splits);
+
+        // let metrics build up
+        Thread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis() * 
3);
+
+        boolean sawExpectedTotalThreadsUserMetric = false;
+        boolean sawExpectedTotalThreadsMetaMetric = false;
+        boolean sawExpectedInactiveUserThreads = false;
+        boolean sawExpectedInactiveMetaThreads = false;
+        // For each FATE instance, should see at least one metric for the 
following:
+        // inactive = configured size - num fate ops initiated = configured 
size - 1
+        // total = configured size
+        for (var line : sink.getLines()) {
+          TestStatsDSink.Metric metric = 
TestStatsDSink.parseStatsDMetric(line);
+          // if the metric is not one of the fate executor metrics...
+          if (!metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+              && !metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+            continue;
+          }
+          var tags = metric.getTags();
+          var instanceType = FateInstanceType
+              
.valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase());
+          var opsAssigned = tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY);
+
+          verifyFateMetricTags(opsAssigned, instanceType);
+
+          if (metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())
+              && numFateThreadsPool1 == Integer.parseInt(metric.getValue())) {
+            if (instanceType == FateInstanceType.USER) {
+              sawExpectedTotalThreadsUserMetric = true;
+            } else if (instanceType == FateInstanceType.META) {
+              sawExpectedTotalThreadsMetaMetric = true;
+            }
+          } else if 
(metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+              && (Integer.parseInt(metric.getValue()) == numFateThreadsPool1 - 
1)) {
+            if (instanceType == FateInstanceType.USER) {
+              sawExpectedInactiveUserThreads = true;
+            } else if (instanceType == FateInstanceType.META) {
+              sawExpectedInactiveMetaThreads = true;
+            }
+          }
+        }
+        assertTrue(sawExpectedInactiveUserThreads);
+        assertTrue(sawExpectedInactiveMetaThreads);
+        assertTrue(sawExpectedTotalThreadsUserMetric);
+        assertTrue(sawExpectedTotalThreadsMetaMetric);
+
+        // Now change the config from:
+        // {<all fate ops>: numFateThreadsPool1}
+        // ->
+        // {<all fate ops except split>: numFateThreadsPool2,
+        // <split operation>: numFateThreadsPool3}
+        changeFateConfig(client, FateInstanceType.USER);
+        changeFateConfig(client, FateInstanceType.META);
+
+        // Allow FATE config changes to be picked up. Will take at most 
POOL_WATCHER_DELAY to
+        // commence, provide a buffer to allow to complete.
+        Thread.sleep(Fate.POOL_WATCHER_DELAY.toMillis() + 5_000);
+        // discard metrics gathered so far; from this point onward we should no longer 
see metrics for the
+        // old pool (pool1), only metrics for new pools (pool2 and pool3)
+        sink.getLines();
+
+        // let metrics build back up
+        Thread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis() * 
3);
+
+        boolean sawAnyMetricPool1 = false;
+
+        boolean sawExpectedTotalThreadsUserMetricPool2 = false;
+        boolean sawExpectedTotalThreadsMetaMetricPool2 = false;
+        boolean sawExpectedInactiveUserThreadsPool2 = false;
+        boolean sawExpectedInactiveMetaThreadsPool2 = false;
+
+        boolean sawExpectedTotalThreadsUserMetricPool3 = false;
+        boolean sawExpectedTotalThreadsMetaMetricPool3 = false;
+        boolean sawExpectedInactiveUserThreadsPool3 = false;
+        boolean sawExpectedInactiveMetaThreadsPool3 = false;
+        for (var line : sink.getLines()) {
+          TestStatsDSink.Metric metric = 
TestStatsDSink.parseStatsDMetric(line);
+          // if the metric is not one of the fate executor metrics...
+          if (!metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+              && !metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+            continue;
+          }
+          var tags = metric.getTags();
+          var instanceType = FateInstanceType
+              
.valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase());
+          var opsAssigned = tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY);
+
+          verifyFateMetricTags(opsAssigned, instanceType);
+
+          Set<Fate.FateOperation> fateOpsFromMetric = 
gatherFateOpsFromTag(opsAssigned);
+
+          if (fateOpsFromMetric.equals(Fate.FateOperation.getAllUserFateOps())
+              || 
fateOpsFromMetric.equals(Fate.FateOperation.getAllMetaFateOps())) {
+            sawAnyMetricPool1 = true;
+          } else if 
(fateOpsFromMetric.equals(Arrays.stream(Fate.FateOperation.values())
+              .filter(fo -> 
!fo.equals(SlowFateSplitManager.SLOW_OP)).collect(Collectors.toSet()))
+              && numFateThreadsPool2 == Integer.parseInt(metric.getValue())) {
+            // pool2
+            // total = inactive = size pool2
+            if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) {
+              if (instanceType == FateInstanceType.USER) {
+                sawExpectedInactiveUserThreadsPool2 = true;
+              } else if (instanceType == FateInstanceType.META) {
+                sawExpectedInactiveMetaThreadsPool2 = true;
+              }
+            } else if 
(metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+              if (instanceType == FateInstanceType.USER) {
+                sawExpectedTotalThreadsUserMetricPool2 = true;
+              } else if (instanceType == FateInstanceType.META) {
+                sawExpectedTotalThreadsMetaMetricPool2 = true;
+              }
+            }
+          } else if 
(fateOpsFromMetric.equals(Set.of(SlowFateSplitManager.SLOW_OP))
+              && numFateThreadsPool3 == Integer.parseInt(metric.getValue())) {
+            // pool3
+            // total = inactive = size pool3
+            if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) {
+              if (instanceType == FateInstanceType.USER) {
+                sawExpectedInactiveUserThreadsPool3 = true;
+              } else if (instanceType == FateInstanceType.META) {
+                sawExpectedInactiveMetaThreadsPool3 = true;
+              }
+            } else if 
(metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+              if (instanceType == FateInstanceType.USER) {
+                sawExpectedTotalThreadsUserMetricPool3 = true;
+              } else if (instanceType == FateInstanceType.META) {
+                sawExpectedTotalThreadsMetaMetricPool3 = true;
+              }
+            }
+          } else {
+            throw new IllegalStateException("Saw unexpected FATE executor 
metric: " + metric);
+          }
+        }
+
+        assertFalse(sawAnyMetricPool1);
+        assertTrue(sawExpectedTotalThreadsUserMetricPool2);
+        assertTrue(sawExpectedTotalThreadsMetaMetricPool2);
+        assertTrue(sawExpectedInactiveUserThreadsPool2);
+        assertTrue(sawExpectedInactiveMetaThreadsPool2);
+        assertTrue(sawExpectedTotalThreadsUserMetricPool3);
+        assertTrue(sawExpectedTotalThreadsMetaMetricPool3);
+        assertTrue(sawExpectedInactiveUserThreadsPool3);
+        assertTrue(sawExpectedInactiveMetaThreadsPool3);
+      }
+    } finally {
+      getCluster().getClusterControl().startAllServers(ServerType.COMPACTOR);
+      
getCluster().getClusterControl().startAllServers(ServerType.GARBAGE_COLLECTOR);
+    }
+  }
+
+  /**
+   * Verifies what should always be true for fate metrics tags: The ops 
assigned tag should include
+   * all the ops that are associated with a pool
+   */
+  private void verifyFateMetricTags(String opsAssignedInMetric, 
FateInstanceType type) {
+    var opsAssignedInConfig =
+        
Fate.getPoolConfigurations(getCluster().getServerContext().getConfiguration(), 
type);
+    assertNotNull(opsAssignedInConfig);
+
+    var fateOpsFromMetric = gatherFateOpsFromTag(opsAssignedInMetric);
+
+    assertNotNull(opsAssignedInConfig.get(fateOpsFromMetric));
+  }
+
+  private void changeFateConfig(AccumuloClient client, FateInstanceType type) 
throws Exception {
+    Set<Fate.FateOperation> allFateOps = null;
+    if (type == FateInstanceType.USER) {
+      allFateOps = new HashSet<>(Fate.FateOperation.getAllUserFateOps());
+    } else if (type == FateInstanceType.META) {
+      allFateOps = new HashSet<>(Fate.FateOperation.getAllMetaFateOps());
+    }
+    assertNotNull(allFateOps);
+    allFateOps.remove(SlowFateSplitManager.SLOW_OP);
+    String newFateConfig =
+        "{'" + 
allFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "': "
+            + numFateThreadsPool2 + ",'" + SlowFateSplitManager.SLOW_OP.name() 
+ "': "
+            + numFateThreadsPool3 + "}";
+    newFateConfig = newFateConfig.replace("'", "\"");
+
+    if (type == FateInstanceType.USER) {
+      
client.instanceOperations().setProperty(Property.MANAGER_FATE_USER_CONFIG.getKey(),
+          newFateConfig);
+    } else if (type == FateInstanceType.META) {
+      
client.instanceOperations().setProperty(Property.MANAGER_FATE_META_CONFIG.getKey(),
+          newFateConfig);
+    }
+  }
+
+  private Set<Fate.FateOperation> gatherFateOpsFromTag(String opsAssigned) {
+    String[] ops = opsAssigned.split("\\.");
+    Set<Fate.FateOperation> fateOpsFromMetric = new HashSet<>();
+    for (var op : ops) {
+      fateOpsFromMetric.add(Fate.FateOperation.valueOf(op.toUpperCase()));
+    }
+    return fateOpsFromMetric;
+  }
+
   static void doWorkToGenerateMetrics(AccumuloClient client, Class<?> 
testClass) throws Exception {
     String tableName = testClass.getSimpleName();
     client.tableOperations().create(tableName);
-    SortedSet<Text> splits = new TreeSet<>(List.of(new Text("5")));
-    client.tableOperations().addSplits(tableName, splits);
-    Thread.sleep(3_000);
     BatchWriterConfig config = new BatchWriterConfig().setMaxMemory(0);
     try (BatchWriter writer = client.createBatchWriter(tableName, config)) {
       Mutation m = new Mutation("row");
@@ -313,7 +553,7 @@ public class MetricsIT extends ConfigurableMacBase 
implements MetricsProducer {
             assertEquals("value2", a.getTags().get("tag2"));
 
             // check the length of the tag value is sane
-            final int MAX_EXPECTED_TAG_LEN = 128;
+            final int MAX_EXPECTED_TAG_LEN = 512;
             a.getTags().forEach((k, v) -> assertTrue(v.length() < 
MAX_EXPECTED_TAG_LEN));
           });
     }

Reply via email to