This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 11d972cd8b Improves FATE metrics (#5798)
11d972cd8b is described below
commit 11d972cd8b9140aafd424f969050142dc68f669f
Author: Kevin Rathbun <[email protected]>
AuthorDate: Thu Aug 28 09:54:40 2025 -0400
Improves FATE metrics (#5798)
* Improves FATE metrics
- Removes FATE metrics which are no longer useful (due to 4.0 FATE changes)
and replaces them with more appropriate metrics.
- These new metrics are: a Gauge to track the total number of
threads per FateExecutor and a Gauge to track the number of idle threads
(not currently working on a fate operation) per FateExecutor.
- Background info/existing functionality:
- Fate's set of fate executors may change as Fate
config is changed. Each FateExecutor has a pool of threads which work on some
subset of fate operations based on the fate configuration
properties. If these properties are changed such
that a FateExecutor is no longer applicable, the FateExecutor is stopped and
new FateExecutor(s) are started to accurately represent
config changes.
- What this means for these metric changes:
- If a FateExecutor is stopped, its metrics will
be removed from the registry. This avoids the registry getting flooded with
metrics that are no longer applicable/no longer give
relevant info on the Fate system.
- Adds testing for these new metrics in MetricsIT and adds two new classes
to aid in testing these metrics: SlowFateSplitManager and SlowFateSplit.
- Some minor misc. changes:
- Improved description for the deprecated
MANAGER_FATE_THREADPOOL_SIZE property
- Improved the set of non thrift ops in the Fate class to gather
the ops at run time
- Improved the FATE_TX metric description
closes #5147
---
.../org/apache/accumulo/core/conf/Property.java | 16 +-
.../java/org/apache/accumulo/core/fate/Fate.java | 38 +++-
.../apache/accumulo/core/fate/FateExecutor.java | 38 +++-
.../accumulo/core/fate/FateExecutorMetrics.java | 121 ++++++++++
.../org/apache/accumulo/core/metrics/Metric.java | 17 +-
.../java/org/apache/accumulo/manager/Manager.java | 14 +-
.../accumulo/manager/metrics/ManagerMetrics.java | 16 +-
.../accumulo/manager/metrics/fate/FateMetrics.java | 28 ++-
.../manager/metrics/fate/meta/MetaFateMetrics.java | 8 +-
.../manager/metrics/fate/user/UserFateMetrics.java | 9 +-
.../org/apache/accumulo/test/fate/FlakyFate.java | 2 +-
.../apache/accumulo/test/fate/SlowFateSplit.java | 84 +++++++
.../accumulo/test/fate/SlowFateSplitManager.java | 57 +++++
.../apache/accumulo/test/metrics/MetricsIT.java | 250 ++++++++++++++++++++-
14 files changed, 645 insertions(+), 53 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b63b6a0d9f..1b21986f5e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -458,13 +458,6 @@ public enum Property {
MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL("manager.fate.metrics.min.update.interval",
"60s",
PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper
to update interval.",
"1.9.3"),
- @Deprecated(since = "4.0.0")
- MANAGER_FATE_THREADPOOL_SIZE("manager.fate.threadpool.size", "64",
- PropertyType.FATE_THREADPOOL_SIZE,
- "Previously, the number of threads used to run fault-tolerant executions
(FATE)."
- + " This is no longer used in 4.0+. MANAGER_FATE_USER_CONFIG and"
- + " MANAGER_FATE_META_CONFIG are the replacement and must be set
instead.",
- "1.4.3"),
MANAGER_FATE_USER_CONFIG("manager.fate.user.config",
"{\"TABLE_CREATE,TABLE_DELETE,TABLE_RENAME,TABLE_ONLINE,TABLE_OFFLINE,NAMESPACE_CREATE,"
+
"NAMESPACE_DELETE,NAMESPACE_RENAME,TABLE_TABLET_AVAILABILITY,SHUTDOWN_TSERVER,"
@@ -491,6 +484,15 @@ public enum Property {
+ "more FATE operations and each value is the number of threads that
will be assigned "
+ "to the pool.",
"4.0.0"),
+ @Deprecated(since = "4.0.0")
+ MANAGER_FATE_THREADPOOL_SIZE("manager.fate.threadpool.size", "64",
+ PropertyType.FATE_THREADPOOL_SIZE,
+ String.format(
+ "Previously, the number of threads used to run fault-tolerant
executions (FATE)."
+ + " This is no longer used in 4.0+. %s and %s are the
replacement and must be"
+ + " set instead.",
+ MANAGER_FATE_USER_CONFIG.getKey(),
MANAGER_FATE_META_CONFIG.getKey()),
+ "1.4.3"),
MANAGER_FATE_IDLE_CHECK_INTERVAL("manager.fate.idle.check.interval", "60m",
PropertyType.TIMEDURATION,
"The interval at which to check if the number of idle Fate threads has
consistently been zero."
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 42b30d88b7..87c237f2ad 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -80,7 +80,7 @@ public class Fate<T> {
private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED,
SUCCESSFUL, UNKNOWN);
public static final Duration INITIAL_DELAY = Duration.ofSeconds(3);
private static final Duration DEAD_RES_CLEANUP_DELAY = Duration.ofMinutes(3);
- private static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(30);
+ public static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(30);
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
// Visible for FlakyFate test object
@@ -115,8 +115,8 @@ public class Fate<T> {
TABLE_TABLET_AVAILABILITY(TFateOperation.TABLE_TABLET_AVAILABILITY);
private final TFateOperation top;
- private static final Set<FateOperation> nonThriftOps =
Collections.unmodifiableSet(
- EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT,
SYSTEM_MERGE));
+ private static final Set<FateOperation> nonThriftOps =
Arrays.stream(FateOperation.values())
+ .filter(fateOp -> fateOp.top ==
null).collect(Collectors.toUnmodifiableSet());
private static final Set<FateOperation> allUserFateOps =
Collections.unmodifiableSet(EnumSet.allOf(FateOperation.class));
private static final Set<FateOperation> allMetaFateOps =
@@ -171,7 +171,7 @@ public class Fate<T> {
public void run() {
// Read from the config here and here only. Must avoid reading the same
property from the
// config more than once since it can change at any point in this
execution
- final var poolConfigs = getPoolConfigurations(conf);
+ final var poolConfigs = getPoolConfigurations(conf, store.type());
final var idleCheckIntervalMillis =
conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL);
@@ -188,7 +188,7 @@ public class Fate<T> {
log.debug(
"[{}] The config for {} has changed invalidating {}.
Gracefully shutting down "
+ "the FateExecutor.",
- store.type(), getFateConfigProp(), fateExecutor);
+ store.type(), getFateConfigProp(store.type()), fateExecutor);
fateExecutor.initiateShutdown();
} else if (fateExecutor.isShutdown() && fateExecutor.isAlive()) {
log.debug("[{}] {} has been shutdown, but is still actively
working on transactions.",
@@ -272,7 +272,6 @@ public class Fate<T> {
ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
}
this.deadResCleanerExecutor = deadResCleanerExecutor;
-
}
/**
@@ -280,9 +279,11 @@ public class Fate<T> {
* of fate operations and each value is an integer for the number of threads
assigned to work
* those fate operations.
*/
- protected Map<Set<FateOperation>,Integer>
getPoolConfigurations(AccumuloConfiguration conf) {
+ @VisibleForTesting
+ public static Map<Set<FateOperation>,Integer>
getPoolConfigurations(AccumuloConfiguration conf,
+ FateInstanceType type) {
Map<Set<FateOperation>,Integer> poolConfigs = new HashMap<>();
- final var json =
JsonParser.parseString(conf.get(getFateConfigProp())).getAsJsonObject();
+ final var json =
JsonParser.parseString(conf.get(getFateConfigProp(type))).getAsJsonObject();
for (var entry : json.entrySet()) {
var key = entry.getKey();
@@ -305,19 +306,31 @@ public class Fate<T> {
return store;
}
- protected Property getFateConfigProp() {
- return this.store.type() == FateInstanceType.USER ?
Property.MANAGER_FATE_USER_CONFIG
+ protected static Property getFateConfigProp(FateInstanceType type) {
+ return type == FateInstanceType.USER ? Property.MANAGER_FATE_USER_CONFIG
: Property.MANAGER_FATE_META_CONFIG;
}
+ /**
+ * Exists for overrides in test code. Internal access to this field needs to
be through this
+ * getter
+ */
public Duration getDeadResCleanupDelay() {
return DEAD_RES_CLEANUP_DELAY;
}
+ /**
+ * Exists for overrides in test code. Internal access to this field needs to
be through this
+ * getter
+ */
public Duration getPoolWatcherDelay() {
return POOL_WATCHER_DELAY;
}
+ public Set<FateExecutor<T>> getFateExecutors() {
+ return fateExecutors;
+ }
+
/**
* Returns the number of TransactionRunners active for the FateExecutor
assigned to work on the
* given set of fate operations. "Active" meaning it is waiting for a
transaction to work on or
@@ -523,9 +536,12 @@ public class Fate<T> {
// interrupt the background threads
synchronized (fateExecutors) {
- for (var fateExecutor : fateExecutors) {
+ var fateExecutorsIter = fateExecutors.iterator();
+ while (fateExecutorsIter.hasNext()) {
+ var fateExecutor = fateExecutorsIter.next();
fateExecutor.shutdownNow();
fateExecutor.getIdleCountHistory().clear();
+ fateExecutorsIter.remove();
}
}
if (deadResCleanerExecutor != null) {
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
index 234ce34432..9710981b09 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
@@ -61,7 +62,9 @@ import com.google.common.base.Preconditions;
/**
* Handles finding and working on FATE work. Only finds/works on fate
operations that it is assigned
- * to work on defined by 'fateOps'
+ * to work on defined by 'fateOps'. These executors may be stopped and new
ones started throughout
+ * FATEs life, depending on changes to {@link
Property#MANAGER_FATE_USER_CONFIG} and
+ * {@link Property#MANAGER_FATE_META_CONFIG}.
*/
public class FateExecutor<T> {
private static final Logger log =
LoggerFactory.getLogger(FateExecutor.class);
@@ -71,19 +74,22 @@ public class FateExecutor<T> {
private final Fate<T> fate;
private final Thread workFinder;
private final TransferQueue<FateId> workQueue;
- private final AtomicInteger idleWorkerCount = new AtomicInteger(0);
+ private final AtomicInteger idleWorkerCount;
private final String poolName;
private final ThreadPoolExecutor transactionExecutor;
private final Set<TransactionRunner> runningTxRunners;
private final Set<Fate.FateOperation> fateOps;
private final ConcurrentLinkedQueue<Integer> idleCountHistory = new
ConcurrentLinkedQueue<>();
+ private final FateExecutorMetrics<T> fateExecutorMetrics;
public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation>
fateOps, int poolSize) {
- final String operatesOn = fate.getStore().type().name().toLowerCase() + "."
- + fateOps.stream().map(fo ->
fo.name().toLowerCase()).collect(Collectors.joining("."));
+ final FateInstanceType type = fate.getStore().type();
+ final String typeStr = type.name().toLowerCase();
+ final String operatesOn = fateOps.stream().map(fo ->
fo.name().toLowerCase()).sorted()
+ .collect(Collectors.joining("."));
final String transactionRunnerPoolName =
- ThreadPoolNames.MANAGER_FATE_POOL_PREFIX.poolName + operatesOn;
- final String workFinderThreadName = "fate.work.finder." + operatesOn;
+ ThreadPoolNames.MANAGER_FATE_POOL_PREFIX.poolName + typeStr + "." +
operatesOn;
+ final String workFinderThreadName = "fate.work.finder." + typeStr + "." +
operatesOn;
this.fate = fate;
this.environment = environment;
@@ -91,9 +97,11 @@ public class FateExecutor<T> {
this.workQueue = new LinkedTransferQueue<>();
this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());
this.poolName = transactionRunnerPoolName;
- this.transactionExecutor =
-
ThreadPools.getServerThreadPools().getPoolBuilder(transactionRunnerPoolName)
- .numCoreThreads(poolSize).enableThreadPoolMetrics().build();
+ this.transactionExecutor = ThreadPools.getServerThreadPools()
+
.getPoolBuilder(transactionRunnerPoolName).numCoreThreads(poolSize).build();
+ this.idleWorkerCount = new AtomicInteger(0);
+ this.fateExecutorMetrics =
+ new FateExecutorMetrics<>(type, operatesOn, runningTxRunners,
idleWorkerCount);
this.workFinder = Threads.createCriticalThread(workFinderThreadName, new
WorkFinder());
this.workFinder.start();
@@ -159,7 +167,7 @@ public class FateExecutor<T> {
// split into separate pools.
final long interval =
Math.min(60,
TimeUnit.MILLISECONDS.toMinutes(idleCheckIntervalMillis));
- var fateConfigProp = fate.getFateConfigProp();
+ var fateConfigProp = Fate.getFateConfigProp(fate.getStore().type());
if (interval == 0) {
idleCountHistory.clear();
@@ -211,11 +219,16 @@ public class FateExecutor<T> {
return fateOps;
}
+ public FateExecutorMetrics<T> getFateExecutorMetrics() {
+ return fateExecutorMetrics;
+ }
+
/**
* Initiates the shutdown of this FateExecutor. This means the pool
executing TransactionRunners
* will no longer accept new TransactionRunners, the currently running
TransactionRunners will
- * terminate after they are done with their current transaction, if
applicable, and the work
- * finder is shutdown. {@link #isShutdown()} returns true after this is
called.
+ * terminate after they are done with their current transaction, if
applicable, the work finder is
+ * shutdown, and the metrics created for this FateExecutor are removed from
the registry (if
+ * metrics were enabled). {@link #isShutdown()} returns true after this is
called.
*/
protected void initiateShutdown() {
log.debug("Initiated shutdown {}", fateOps);
@@ -223,6 +236,7 @@ public class FateExecutor<T> {
synchronized (runningTxRunners) {
runningTxRunners.forEach(TransactionRunner::flagStop);
}
+ fateExecutorMetrics.clearMetrics();
// work finder will terminate since this.isShutdown() is true
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutorMetrics.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutorMetrics.java
new file mode 100644
index 0000000000..fec087519f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutorMetrics.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.metrics.Metric;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+
+/**
+ * Publishes two gauges for a single {@link FateExecutor}: the size of its set of running
+ * transaction runners ({@link Metric#FATE_OPS_THREADS_TOTAL}) and its idle worker count
+ * ({@link Metric#FATE_OPS_THREADS_INACTIVE}). Both gauges are tagged with the fate instance type
+ * ({@link #INSTANCE_TYPE_TAG_KEY}) and the operations the executor works on
+ * ({@link #OPS_ASSIGNED_TAG_KEY}).
+ *
+ * <p>
+ * The object moves one way through three states: UNREGISTERED, then REGISTERED, then CLEARED.
+ * Once cleared, the gauges are never registered again.
+ */
+public class FateExecutorMetrics<T> implements MetricsProducer {
+  private static final Logger log = LoggerFactory.getLogger(FateExecutorMetrics.class);
+
+  public static final String INSTANCE_TYPE_TAG_KEY = "instanceType";
+  public static final String OPS_ASSIGNED_TAG_KEY = "ops.assigned";
+
+  private final FateInstanceType type;
+  private final String operatesOn;
+  private final Set<FateExecutor<T>.TransactionRunner> runningTxRunners;
+  private final AtomicInteger idleWorkerCount;
+  private MeterRegistry registry;
+  private State state;
+
+  /**
+   * @param type the fate instance type, used (lower-cased) as the value of the
+   *        {@link #INSTANCE_TYPE_TAG_KEY} tag
+   * @param operatesOn value of the {@link #OPS_ASSIGNED_TAG_KEY} tag
+   * @param runningTxRunners set whose size backs the total-threads gauge
+   * @param idleWorkerCount counter whose value backs the inactive-threads gauge
+   */
+  protected FateExecutorMetrics(FateInstanceType type, String operatesOn,
+      Set<FateExecutor<T>.TransactionRunner> runningTxRunners, AtomicInteger idleWorkerCount) {
+    this.type = type;
+    this.operatesOn = operatesOn;
+    this.runningTxRunners = runningTxRunners;
+    this.state = State.UNREGISTERED;
+    this.idleWorkerCount = idleWorkerCount;
+  }
+
+  /**
+   * Registers both gauges with the given registry and remembers the registry so the gauges can
+   * later be removed by {@link #clearMetrics()}. This is a noop if the metrics are already
+   * registered or have been cleared.
+   *
+   * @param registry the registry to add the gauges to
+   */
+  @Override
+  public void registerMetrics(MeterRegistry registry) {
+    // noop if already registered or cleared
+    if (state == State.UNREGISTERED) {
+      final String typeTag = type.name().toLowerCase();
+      Gauge.builder(Metric.FATE_OPS_THREADS_TOTAL.getName(), runningTxRunners::size)
+          .description(Metric.FATE_OPS_THREADS_TOTAL.getDescription())
+          .tag(INSTANCE_TYPE_TAG_KEY, typeTag).tag(OPS_ASSIGNED_TAG_KEY, operatesOn)
+          .register(registry);
+      Gauge.builder(Metric.FATE_OPS_THREADS_INACTIVE.getName(), idleWorkerCount::get)
+          .description(Metric.FATE_OPS_THREADS_INACTIVE.getDescription())
+          .tag(INSTANCE_TYPE_TAG_KEY, typeTag).tag(OPS_ASSIGNED_TAG_KEY, operatesOn)
+          .register(registry);
+
+      registered(registry);
+    }
+  }
+
+  /**
+   * Removes both gauges from the registry they were registered with. This is a noop if the
+   * metrics were never registered or have already been cleared.
+   */
+  public void clearMetrics() {
+    // noop if metrics were never registered or have already been cleared
+    if (state == State.REGISTERED) {
+      removeMeter(Metric.FATE_OPS_THREADS_TOTAL);
+      removeMeter(Metric.FATE_OPS_THREADS_INACTIVE);
+
+      cleared();
+    }
+  }
+
+  /**
+   * Looks up the meter for the given metric using this executor's tags and removes it from the
+   * registry. Logs an error naming that metric if no such meter is found.
+   */
+  private void removeMeter(Metric metric) {
+    final String typeTag = type.name().toLowerCase();
+    var meter = registry.find(metric.getName())
+        .tags(INSTANCE_TYPE_TAG_KEY, typeTag, OPS_ASSIGNED_TAG_KEY, operatesOn).meter();
+    // meter will be null if it could not be found, ignore IDE warning if one is seen
+    if (meter == null) {
+      log.error(
+          "Tried removing meter{name: {} tags: {}={}, {}={}} from the registry, but did "
+              + "not find it.",
+          metric.getName(), INSTANCE_TYPE_TAG_KEY, typeTag, OPS_ASSIGNED_TAG_KEY, operatesOn);
+    } else {
+      registry.remove(meter);
+    }
+  }
+
+  private void registered(MeterRegistry registry) {
+    this.registry = registry;
+    this.state = State.REGISTERED;
+  }
+
+  private void cleared() {
+    registry = null;
+    state = State.CLEARED;
+  }
+
+  /**
+   * @return true only while the gauges are registered (false before registration and after
+   *         clearing)
+   */
+  public boolean isRegistered() {
+    return state == State.REGISTERED;
+  }
+
+  private enum State {
+    UNREGISTERED, REGISTERED, CLEARED
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
index 4c6bd8bebd..4876c6a160 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
@@ -21,6 +21,8 @@ package org.apache.accumulo.core.metrics;
import java.util.HashMap;
import java.util.Map;
+import org.apache.accumulo.core.fate.FateExecutorMetrics;
+
public enum Metric {
// General Server Metrics
SERVER_IDLE("accumulo.server.idle", MetricType.GAUGE,
@@ -104,7 +106,20 @@ public enum Metric {
"Count of errors that occurred when attempting to gather fate metrics.",
MetricDocSection.FATE),
FATE_TX("accumulo.fate.tx", MetricType.GAUGE,
- "The state is now in a tag (e.g., state=new, state=in.progress,
state=failed, etc.).",
+ "Count of FATE operations in a certain state. The state is now in a tag "
+ + "(e.g., state=new, state=in.progress, state=failed, etc.).",
+ MetricDocSection.FATE),
+ FATE_OPS_THREADS_INACTIVE("accumulo.fate.ops.threads.inactive",
MetricType.GAUGE,
+ "Keeps track of the number of idle threads (not working on a fate
operation) in the thread pool assigned to work on the operations as shown in
the "
+ + FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY
+ + " tag. The fate instance type can be found in the "
+ + FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY + " tag.",
+ MetricDocSection.FATE),
+ FATE_OPS_THREADS_TOTAL("accumulo.fate.ops.threads.total", MetricType.GAUGE,
+ "Keeps track of the total number of threads in the thread pool assigned
to work on the operations as shown in the "
+ + FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY
+ + " tag. The fate instance type can be found in the "
+ + FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY + " tag.",
MetricDocSection.FATE),
// Garbage Collection Metrics
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index cb50ebae84..63fae28c47 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -995,8 +995,8 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener {
}
MetricsInfo metricsInfo = getContext().getMetricsInfo();
- ManagerMetrics managerMetrics = new ManagerMetrics(getConfiguration(),
this);
- var producers = managerMetrics.getProducers(getConfiguration(), this);
+ ManagerMetrics managerMetrics = new ManagerMetrics();
+ List<MetricsProducer> producers = new ArrayList<>();
producers.add(balanceManager.getMetrics());
final TabletGroupWatcher userTableTGW =
@@ -1109,10 +1109,6 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener {
throw new IllegalStateException("Upgrade coordinator is unexpectedly not
complete");
}
- metricsInfo.addMetricsProducers(producers.toArray(new MetricsProducer[0]));
- metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(),
getApplicationName(),
- getAdvertiseAddress(), getResourceGroup()));
-
balanceManager.startBackGroundTask();
Threads.createCriticalThread("ScanServer Cleanup Thread", new
ScanServerZKCleaner()).start();
@@ -1135,11 +1131,17 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener {
throw new IllegalStateException(
"Unexpected previous fate reference map already initialized");
}
+ managerMetrics.configureFateMetrics(getConfiguration(), this,
fateRefs.get());
fateReadyLatch.countDown();
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException("Exception setting up FaTE cleanup
thread", e);
}
+ producers.addAll(managerMetrics.getProducers(this));
+ metricsInfo.addMetricsProducers(producers.toArray(new MetricsProducer[0]));
+ metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(),
getApplicationName(),
+ getAdvertiseAddress(), getResourceGroup()));
+
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
.scheduleWithFixedDelay(() ->
ScanServerMetadataEntries.clean(context), 10, 10, MINUTES));
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
index 031e2750b6..c6c0a57c36 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
@@ -26,11 +26,14 @@ import static
org.apache.accumulo.core.metrics.Metric.MANAGER_USER_TGW_ERRORS;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.manager.Manager;
@@ -43,21 +46,24 @@ import io.micrometer.core.instrument.MeterRegistry;
public class ManagerMetrics implements MetricsProducer {
- private final List<FateMetrics<?>> fateMetrics;
+ private List<FateMetrics<?>> fateMetrics;
private final AtomicLong rootTGWErrorsGauge = new AtomicLong(0);
private final AtomicLong metadataTGWErrorsGauge = new AtomicLong(0);
private final AtomicLong userTGWErrorsGauge = new AtomicLong(0);
private final AtomicInteger compactionConfigurationError = new
AtomicInteger(0);
- public ManagerMetrics(final AccumuloConfiguration conf, final Manager
manager) {
+ public void configureFateMetrics(final AccumuloConfiguration conf, final
Manager manager,
+ Map<FateInstanceType,Fate<Manager>> fateRefs) {
requireNonNull(conf, "AccumuloConfiguration must not be null");
requireNonNull(conf, "Manager must not be null");
fateMetrics = List.of(
new MetaFateMetrics(manager.getContext(),
-
conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)),
+
conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL),
+ fateRefs.get(FateInstanceType.META).getFateExecutors()),
new UserFateMetrics(manager.getContext(),
-
conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
+
conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL),
+ fateRefs.get(FateInstanceType.USER).getFateExecutors()));
}
public void incrementTabletGroupWatcherError(DataLevel level) {
@@ -97,7 +103,7 @@ public class ManagerMetrics implements MetricsProducer {
.description(COMPACTION_SVC_ERRORS.getDescription()).register(registry);
}
- public List<MetricsProducer> getProducers(AccumuloConfiguration conf,
Manager manager) {
+ public List<MetricsProducer> getProducers(Manager manager) {
ArrayList<MetricsProducer> producers = new ArrayList<>();
producers.add(this);
producers.addAll(fateMetrics);
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
index 23cc841e9c..0bbb131e6e 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
@@ -25,15 +25,18 @@ import static
org.apache.accumulo.core.metrics.Metric.FATE_TYPE_IN_PROGRESS;
import java.util.EnumMap;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.accumulo.core.fate.FateExecutor;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,14 +58,18 @@ public abstract class FateMetrics<T extends
FateMetricValues> implements Metrics
protected final ServerContext context;
protected final ReadOnlyFateStore<FateMetrics<T>> readOnlyFateStore;
protected final long refreshDelay;
+ private final Set<FateExecutor<Manager>> fateExecutors;
+ private MeterRegistry registry;
protected final AtomicLong totalCurrentOpsCount = new AtomicLong(0);
private final EnumMap<TStatus,AtomicLong> txStatusCounters = new
EnumMap<>(TStatus.class);
- public FateMetrics(final ServerContext context, final long
minimumRefreshDelay) {
+ public FateMetrics(final ServerContext context, final long
minimumRefreshDelay,
+ Set<FateExecutor<Manager>> fateExecutors) {
this.context = context;
this.refreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY,
minimumRefreshDelay);
this.readOnlyFateStore =
Objects.requireNonNull(buildReadOnlyStore(context));
+ this.fateExecutors = fateExecutors;
for (TStatus status : TStatus.values()) {
txStatusCounters.put(status, new AtomicLong(0));
@@ -91,10 +98,25 @@ public abstract class FateMetrics<T extends
FateMetricValues> implements Metrics
metricValues.getOpTypeCounters().forEach((name, count) -> Metrics
.gauge(FATE_TYPE_IN_PROGRESS.getName(), Tags.of(OP_TYPE_TAG, name),
count));
+
+ // there may have been new fate executors added, so these need to be
registered.
+ // fate executors removed will have their metrics removed from the
registry before they are
+ // removed from the set.
+ if (registry != null) {
+ synchronized (fateExecutors) {
+ fateExecutors.forEach(fe -> {
+ var feMetrics = fe.getFateExecutorMetrics();
+ if (!feMetrics.isRegistered()) {
+ feMetrics.registerMetrics(registry);
+ }
+ });
+ }
+ }
}
@Override
public void registerMetrics(final MeterRegistry registry) {
+ this.registry = registry;
String type = readOnlyFateStore.type().name().toLowerCase();
Gauge.builder(FATE_OPS.getName(), totalCurrentOpsCount, AtomicLong::get)
@@ -104,6 +126,10 @@ public abstract class FateMetrics<T extends
FateMetricValues> implements Metrics
.builder(FATE_TX.getName(), counter,
AtomicLong::get).description(FATE_TX.getDescription())
.tags("state", status.name().toLowerCase(), "instanceType",
type).register(registry));
+ synchronized (fateExecutors) {
+ fateExecutors.forEach(fe ->
fe.getFateExecutorMetrics().registerMetrics(registry));
+ }
+
// get fate status is read only operation - no reason to be nice on
shutdown.
ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools()
.createScheduledExecutorService(1, type + "FateMetricsPoller");
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
index 9d87f9a9cc..44195253ea 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
@@ -21,11 +21,14 @@ package org.apache.accumulo.manager.metrics.fate.meta;
import static org.apache.accumulo.core.metrics.Metric.FATE_ERRORS;
import static org.apache.accumulo.core.metrics.Metric.FATE_OPS_ACTIVITY;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.fate.FateExecutor;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
+import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.metrics.fate.FateMetrics;
import org.apache.accumulo.server.ServerContext;
import org.apache.zookeeper.KeeperException;
@@ -38,8 +41,9 @@ public class MetaFateMetrics extends
FateMetrics<MetaFateMetricValues> {
private final AtomicLong totalOpsGauge = new AtomicLong(0);
private final AtomicLong fateErrorsGauge = new AtomicLong(0);
- public MetaFateMetrics(ServerContext context, long minimumRefreshDelay) {
- super(context, minimumRefreshDelay);
+ public MetaFateMetrics(ServerContext context, long minimumRefreshDelay,
+ Set<FateExecutor<Manager>> fateExecutors) {
+ super(context, minimumRefreshDelay, fateExecutors);
}
@Override
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java
index 4f1df05762..7fadf8ae27 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java
@@ -18,16 +18,21 @@
*/
package org.apache.accumulo.manager.metrics.fate.user;
+import java.util.Set;
+
+import org.apache.accumulo.core.fate.FateExecutor;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.core.metadata.SystemTables;
+import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.metrics.fate.FateMetrics;
import org.apache.accumulo.server.ServerContext;
public class UserFateMetrics extends FateMetrics<UserFateMetricValues> {
- public UserFateMetrics(ServerContext context, long minimumRefreshDelay) {
- super(context, minimumRefreshDelay);
+ public UserFateMetrics(ServerContext context, long minimumRefreshDelay,
+ Set<FateExecutor<Manager>> fateExecutors) {
+ super(context, minimumRefreshDelay, fateExecutors);
}
@Override
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
index 2e79fc4729..f022e83f16 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
@@ -39,7 +39,7 @@ public class FlakyFate<T> extends Fate<T> {
public FlakyFate(T environment, FateStore<T> store, Function<Repo<T>,String>
toLogStrFunc,
AccumuloConfiguration conf) {
super(environment, store, false, toLogStrFunc, conf, new
ScheduledThreadPoolExecutor(2));
- for (var poolConfig : getPoolConfigurations(conf).entrySet()) {
+ for (var poolConfig : getPoolConfigurations(conf,
getStore().type()).entrySet()) {
fateExecutors.add(
new FlakyFateExecutor<>(this, environment, poolConfig.getKey(),
poolConfig.getValue()));
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java
b/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java
new file mode 100644
index 0000000000..b413258ce2
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.function.Function;
+
+import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateExecutor;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Causes the first step in a Split FATE operation to sleep for
+ * {@link SlowFateSplitManager#SLEEP_TIME_MS}. Split was chosen as it can be
executed on system and
+ * user tables (allowing for testing of meta and user fate ops) and it is easy to
prevent automatic
+ * splits from occurring (just don't have tables exceed the split threshold in
testing). This allows
+ * us to sleep for splits we initiate via {@link TableOperation}. This is
useful when we want a fate
+ * thread to be occupied working on some operation. A potential alternative to
this is compacting
+ * with a {@link SlowIterator} attached to the table, however, this will
result in the operation
+ * never being ready ({@link Repo#isReady(FateId, Object)}). So, it would
exist as an operation to
+ * work on, but a thread will briefly reserve it, see it's not isReady, and
unreserve it. This class
+ * is useful when we need a fate op reserved by a thread and being worked on
for a configurable
+ * time, but don't have direct access to the Fate objects/are testing Fate as
it operates within the
+ * Manager instead of directly working with Fate objects.
+ */
+public class SlowFateSplit<T> extends Fate<T> {
+ private static final Logger log =
LoggerFactory.getLogger(SlowFateSplit.class);
+ private boolean haveSlept = false;
+
+ public SlowFateSplit(T environment, FateStore<T> store,
Function<Repo<T>,String> toLogStrFunc,
+ AccumuloConfiguration conf) {
+ super(environment, store, false, toLogStrFunc, conf, new
ScheduledThreadPoolExecutor(2));
+ for (var poolConfig : getPoolConfigurations(conf,
getStore().type()).entrySet()) {
+ fateExecutors.add(
+ new SlowFateSplitExecutor(this, environment, poolConfig.getKey(),
poolConfig.getValue()));
+ }
+ }
+
+ private class SlowFateSplitExecutor extends FateExecutor<T> {
+ private SlowFateSplitExecutor(Fate<T> fate, T environment,
Set<Fate.FateOperation> fateOps,
+ int poolSize) {
+ super(fate, environment, fateOps, poolSize);
+ }
+
+ @Override
+ protected Repo<T> executeCall(FateId fateId, Repo<T> repo) throws
Exception {
+ var next = super.executeCall(fateId, repo);
+ var fateOp = (FateOperation) SlowFateSplit.this.getStore().read(fateId)
+ .getTransactionInfo(TxInfo.FATE_OP);
+ if (fateOp == SlowFateSplitManager.SLOW_OP && !haveSlept) {
+ var sleepTime = SlowFateSplitManager.SLEEP_TIME_MS;
+ log.debug("{} sleeping in {} for {}", fateId,
getClass().getSimpleName(), sleepTime);
+ Thread.sleep(sleepTime);
+ log.debug("{} slept in {} for {}", fateId, getClass().getSimpleName(),
sleepTime);
+ haveSlept = true;
+ }
+ return next;
+ }
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplitManager.java
b/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplitManager.java
new file mode 100644
index 0000000000..ba2c96c69c
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplitManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.TraceRepo;
+import org.apache.accumulo.server.ServerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * See {@link SlowFateSplit}
+ */
+public class SlowFateSplitManager extends Manager {
+ private static final Logger log =
LoggerFactory.getLogger(SlowFateSplitManager.class);
+ // causes splits to take at least 10 seconds to complete
+ public static final long SLEEP_TIME_MS = 10_000;
+ // important that this is an op that can be initiated on some system table
as well as user tables
+ public static final Fate.FateOperation SLOW_OP =
Fate.FateOperation.TABLE_SPLIT;
+
+ protected SlowFateSplitManager(ConfigOpts opts, String[] args) throws
IOException {
+ super(opts, ServerContext::new, args);
+ }
+
+ @Override
+ protected Fate<Manager> initializeFateInstance(ServerContext context,
FateStore<Manager> store) {
+ log.info("Creating Slow Split Fate for {}", store.type());
+ return new SlowFateSplit<>(this, store, TraceRepo::toLogString,
getConfiguration());
+ }
+
+ public static void main(String[] args) throws Exception {
+ try (SlowFateSplitManager manager = new SlowFateSplitManager(new
ConfigOpts(), args)) {
+ manager.runServer();
+ }
+ }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index a670f58633..ff1859c092 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -23,6 +23,8 @@ import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_FAILED;
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_FAILURES_CONSECUTIVE;
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_FAILURES_TERMINATION;
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_MAJC_STUCK;
+import static
org.apache.accumulo.core.metrics.Metric.FATE_OPS_THREADS_INACTIVE;
+import static org.apache.accumulo.core.metrics.Metric.FATE_OPS_THREADS_TOTAL;
import static org.apache.accumulo.core.metrics.Metric.FATE_TYPE_IN_PROGRESS;
import static
org.apache.accumulo.core.metrics.Metric.MANAGER_BALANCER_MIGRATIONS_NEEDED;
import static org.apache.accumulo.core.metrics.Metric.SCAN_BUSY_TIMEOUT_COUNT;
@@ -50,6 +52,7 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.Accumulo;
@@ -62,13 +65,19 @@ import
org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateExecutorMetrics;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.metadata.SystemTables;
import org.apache.accumulo.core.metrics.Metric;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory;
+import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.fate.FateTestUtil;
+import org.apache.accumulo.test.fate.SlowFateSplitManager;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.hadoop.conf.Configuration;
@@ -76,12 +85,17 @@ import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import io.micrometer.core.instrument.MeterRegistry;
public class MetricsIT extends ConfigurableMacBase implements MetricsProducer {
-
+ private static final Logger log = LoggerFactory.getLogger(MetricsIT.class);
private static TestStatsDSink sink;
+ private static final int numFateThreadsPool1 = 5;
+ private static final int numFateThreadsPool2 = 10;
+ private static final int numFateThreadsPool3 = 15;
@Override
protected Duration defaultTimeout() {
@@ -116,6 +130,16 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
Map<String,String> sysProps =
Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
TestStatsDRegistryFactory.SERVER_PORT,
Integer.toString(sink.getPort()));
cfg.setSystemProperties(sysProps);
+ // custom config for the fate thread pools.
+ // starting FATE config for each FATE type (USER and META) will be:
+ // {<all fate ops>: numFateThreadsPool1} (one pool for all ops of size
numFateThreadsPool1)
+ var fatePoolsConfig =
FateTestUtil.createTestFateConfig(numFateThreadsPool1);
+ cfg.setProperty(Property.MANAGER_FATE_USER_CONFIG.getKey(),
+ fatePoolsConfig.get(Property.MANAGER_FATE_USER_CONFIG));
+ cfg.setProperty(Property.MANAGER_FATE_META_CONFIG.getKey(),
+ fatePoolsConfig.get(Property.MANAGER_FATE_META_CONFIG));
+ // Make splits run slowly, used for testing the fate metrics
+ cfg.setServerClass(ServerType.MANAGER, r -> SlowFateSplitManager.class);
}
@Test
@@ -232,12 +256,228 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
cluster.stop();
}
+ @Test
+ public void testFateExecutorMetrics() throws Exception {
+ // Tests metrics for Fate's thread pools. Tests that metrics are seen as
expected, and config
+ // changes to the thread pools are accurately reflected in the metrics.
This includes checking
+ // that old thread pool metrics are removed, new ones are created, size
changes to thread
+ // pools are reflected, and the ops assigned and instance type tags are
seen as expected
+ final String table = getUniqueNames(1)[0];
+
+ // prevent any system initiated fate operations from running, which may
interfere with our
+ // metrics gathering
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+
getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR);
+ try {
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProperties()).build()) {
+ client.tableOperations().create(table);
+
+ SortedSet<Text> splits = new TreeSet<>();
+ splits.add(new Text("foo"));
+ // initiate 1 USER fate op which will execute slowly
+ client.tableOperations().addSplits(table, splits);
+ // initiate 1 META fate op which will execute slowly
+ client.tableOperations().addSplits(SystemTables.METADATA.tableName(),
splits);
+
+ // let metrics build up
+ Thread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis() *
3);
+
+ boolean sawExpectedTotalThreadsUserMetric = false;
+ boolean sawExpectedTotalThreadsMetaMetric = false;
+ boolean sawExpectedInactiveUserThreads = false;
+ boolean sawExpectedInactiveMetaThreads = false;
+ // For each FATE instance, should see at least one metric for the
following:
+ // inactive = configured size - num fate ops initiated = configured
size - 1
+ // total = configured size
+ for (var line : sink.getLines()) {
+ TestStatsDSink.Metric metric =
TestStatsDSink.parseStatsDMetric(line);
+ // if the metric is not one of the fate executor metrics...
+ if (!metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+ && !metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+ continue;
+ }
+ var tags = metric.getTags();
+ var instanceType = FateInstanceType
+
.valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase());
+ var opsAssigned = tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY);
+
+ verifyFateMetricTags(opsAssigned, instanceType);
+
+ if (metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())
+ && numFateThreadsPool1 == Integer.parseInt(metric.getValue())) {
+ if (instanceType == FateInstanceType.USER) {
+ sawExpectedTotalThreadsUserMetric = true;
+ } else if (instanceType == FateInstanceType.META) {
+ sawExpectedTotalThreadsMetaMetric = true;
+ }
+ } else if
(metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+ && (Integer.parseInt(metric.getValue()) == numFateThreadsPool1 -
1)) {
+ if (instanceType == FateInstanceType.USER) {
+ sawExpectedInactiveUserThreads = true;
+ } else if (instanceType == FateInstanceType.META) {
+ sawExpectedInactiveMetaThreads = true;
+ }
+ }
+ }
+ assertTrue(sawExpectedInactiveUserThreads);
+ assertTrue(sawExpectedInactiveMetaThreads);
+ assertTrue(sawExpectedTotalThreadsUserMetric);
+ assertTrue(sawExpectedTotalThreadsMetaMetric);
+
+ // Now change the config from:
+ // {<all fate ops>: numFateThreadsPool1}
+ // ->
+ // {<all fate ops except split>: numFateThreadsPool2,
+ // <split operation>: numFateThreadsPool3}
+ changeFateConfig(client, FateInstanceType.USER);
+ changeFateConfig(client, FateInstanceType.META);
+
+ // Allow FATE config changes to be picked up. Will take at most
POOL_WATCHER_DELAY to
+ // commence, provide a buffer to allow it to complete.
+ Thread.sleep(Fate.POOL_WATCHER_DELAY.toMillis() + 5_000);
+ // sink metrics, expect from this point onward that we will no longer
see metrics for the
+ // old pool (pool1), only metrics for new pools (pool2 and pool3)
+ sink.getLines();
+
+ // let metrics build back up
+ Thread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis() *
3);
+
+ boolean sawAnyMetricPool1 = false;
+
+ boolean sawExpectedTotalThreadsUserMetricPool2 = false;
+ boolean sawExpectedTotalThreadsMetaMetricPool2 = false;
+ boolean sawExpectedInactiveUserThreadsPool2 = false;
+ boolean sawExpectedInactiveMetaThreadsPool2 = false;
+
+ boolean sawExpectedTotalThreadsUserMetricPool3 = false;
+ boolean sawExpectedTotalThreadsMetaMetricPool3 = false;
+ boolean sawExpectedInactiveUserThreadsPool3 = false;
+ boolean sawExpectedInactiveMetaThreadsPool3 = false;
+ for (var line : sink.getLines()) {
+ TestStatsDSink.Metric metric =
TestStatsDSink.parseStatsDMetric(line);
+ // if the metric is not one of the fate executor metrics...
+ if (!metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+ && !metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+ continue;
+ }
+ var tags = metric.getTags();
+ var instanceType = FateInstanceType
+
.valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase());
+ var opsAssigned = tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY);
+
+ verifyFateMetricTags(opsAssigned, instanceType);
+
+ Set<Fate.FateOperation> fateOpsFromMetric =
gatherFateOpsFromTag(opsAssigned);
+
+ if (fateOpsFromMetric.equals(Fate.FateOperation.getAllUserFateOps())
+ ||
fateOpsFromMetric.equals(Fate.FateOperation.getAllMetaFateOps())) {
+ sawAnyMetricPool1 = true;
+ } else if
(fateOpsFromMetric.equals(Arrays.stream(Fate.FateOperation.values())
+ .filter(fo ->
!fo.equals(SlowFateSplitManager.SLOW_OP)).collect(Collectors.toSet()))
+ && numFateThreadsPool2 == Integer.parseInt(metric.getValue())) {
+ // pool2
+ // total = inactive = size pool2
+ if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) {
+ if (instanceType == FateInstanceType.USER) {
+ sawExpectedInactiveUserThreadsPool2 = true;
+ } else if (instanceType == FateInstanceType.META) {
+ sawExpectedInactiveMetaThreadsPool2 = true;
+ }
+ } else if
(metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+ if (instanceType == FateInstanceType.USER) {
+ sawExpectedTotalThreadsUserMetricPool2 = true;
+ } else if (instanceType == FateInstanceType.META) {
+ sawExpectedTotalThreadsMetaMetricPool2 = true;
+ }
+ }
+ } else if
(fateOpsFromMetric.equals(Set.of(SlowFateSplitManager.SLOW_OP))
+ && numFateThreadsPool3 == Integer.parseInt(metric.getValue())) {
+ // pool3
+ // total = inactive = size pool3
+ if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) {
+ if (instanceType == FateInstanceType.USER) {
+ sawExpectedInactiveUserThreadsPool3 = true;
+ } else if (instanceType == FateInstanceType.META) {
+ sawExpectedInactiveMetaThreadsPool3 = true;
+ }
+ } else if
(metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+ if (instanceType == FateInstanceType.USER) {
+ sawExpectedTotalThreadsUserMetricPool3 = true;
+ } else if (instanceType == FateInstanceType.META) {
+ sawExpectedTotalThreadsMetaMetricPool3 = true;
+ }
+ }
+ } else {
+ throw new IllegalStateException("Saw unexpected FATE executor
metric: " + metric);
+ }
+ }
+
+ assertFalse(sawAnyMetricPool1);
+ assertTrue(sawExpectedTotalThreadsUserMetricPool2);
+ assertTrue(sawExpectedTotalThreadsMetaMetricPool2);
+ assertTrue(sawExpectedInactiveUserThreadsPool2);
+ assertTrue(sawExpectedInactiveMetaThreadsPool2);
+ assertTrue(sawExpectedTotalThreadsUserMetricPool3);
+ assertTrue(sawExpectedTotalThreadsMetaMetricPool3);
+ assertTrue(sawExpectedInactiveUserThreadsPool3);
+ assertTrue(sawExpectedInactiveMetaThreadsPool3);
+ }
+ } finally {
+ getCluster().getClusterControl().startAllServers(ServerType.COMPACTOR);
+
getCluster().getClusterControl().startAllServers(ServerType.GARBAGE_COLLECTOR);
+ }
+ }
+
+ /**
+ * Verifies what should always be true for fate metrics tags: The ops
assigned tag should include
+ * all the ops that are associated with a pool
+ */
+ private void verifyFateMetricTags(String opsAssignedInMetric,
FateInstanceType type) {
+ var opsAssignedInConfig =
+
Fate.getPoolConfigurations(getCluster().getServerContext().getConfiguration(),
type);
+ assertNotNull(opsAssignedInConfig);
+
+ var fateOpsFromMetric = gatherFateOpsFromTag(opsAssignedInMetric);
+
+ assertNotNull(opsAssignedInConfig.get(fateOpsFromMetric));
+ }
+
+ private void changeFateConfig(AccumuloClient client, FateInstanceType type)
throws Exception {
+ Set<Fate.FateOperation> allFateOps = null;
+ if (type == FateInstanceType.USER) {
+ allFateOps = new HashSet<>(Fate.FateOperation.getAllUserFateOps());
+ } else if (type == FateInstanceType.META) {
+ allFateOps = new HashSet<>(Fate.FateOperation.getAllMetaFateOps());
+ }
+ assertNotNull(allFateOps);
+ allFateOps.remove(SlowFateSplitManager.SLOW_OP);
+ String newFateConfig =
+ "{'" +
allFateOps.stream().map(Enum::name).collect(Collectors.joining(",")) + "': "
+ + numFateThreadsPool2 + ",'" + SlowFateSplitManager.SLOW_OP.name()
+ "': "
+ + numFateThreadsPool3 + "}";
+ newFateConfig = newFateConfig.replace("'", "\"");
+
+ if (type == FateInstanceType.USER) {
+
client.instanceOperations().setProperty(Property.MANAGER_FATE_USER_CONFIG.getKey(),
+ newFateConfig);
+ } else if (type == FateInstanceType.META) {
+
client.instanceOperations().setProperty(Property.MANAGER_FATE_META_CONFIG.getKey(),
+ newFateConfig);
+ }
+ }
+
+ private Set<Fate.FateOperation> gatherFateOpsFromTag(String opsAssigned) {
+ String[] ops = opsAssigned.split("\\.");
+ Set<Fate.FateOperation> fateOpsFromMetric = new HashSet<>();
+ for (var op : ops) {
+ fateOpsFromMetric.add(Fate.FateOperation.valueOf(op.toUpperCase()));
+ }
+ return fateOpsFromMetric;
+ }
+
static void doWorkToGenerateMetrics(AccumuloClient client, Class<?>
testClass) throws Exception {
String tableName = testClass.getSimpleName();
client.tableOperations().create(tableName);
- SortedSet<Text> splits = new TreeSet<>(List.of(new Text("5")));
- client.tableOperations().addSplits(tableName, splits);
- Thread.sleep(3_000);
BatchWriterConfig config = new BatchWriterConfig().setMaxMemory(0);
try (BatchWriter writer = client.createBatchWriter(tableName, config)) {
Mutation m = new Mutation("row");
@@ -313,7 +553,7 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
assertEquals("value2", a.getTags().get("tag2"));
// check the length of the tag value is sane
- final int MAX_EXPECTED_TAG_LEN = 128;
+ final int MAX_EXPECTED_TAG_LEN = 512;
a.getTags().forEach((k, v) -> assertTrue(v.length() <
MAX_EXPECTED_TAG_LEN));
});
}