This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 02193118d4 Improvements to FateMetrics (#4924)
02193118d4 is described below
commit 02193118d4bf39e43bcafd66ff8fa858bfcd0c74
Author: Dom G. <[email protected]>
AuthorDate: Wed Sep 25 15:07:42 2024 -0400
Improvements to FateMetrics (#4924)
* Improvements to FateMetrics
---
.../manager/metrics/fate/FateMetricValues.java | 26 +++---
.../accumulo/manager/metrics/fate/FateMetrics.java | 102 +++++++--------------
2 files changed, 47 insertions(+), 81 deletions(-)
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java
index 57a561aa7b..702eaa7e56 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java
@@ -19,12 +19,14 @@
package org.apache.accumulo.manager.metrics.fate;
import java.util.Collections;
+import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.accumulo.core.fate.AdminUtil;
import org.apache.accumulo.core.fate.ReadOnlyTStore;
+import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
import org.apache.accumulo.server.ServerContext;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@@ -44,12 +46,12 @@ class FateMetricValues {
private final long zkFateChildOpsTotal;
private final long zkConnectionErrors;
- private final Map<String,Long> txStateCounters;
+ private final EnumMap<TStatus,Long> txStateCounters;
private final Map<String,Long> opTypeCounters;
private FateMetricValues(final long updateTime, final long currentFateOps,
final long zkFateChildOpsTotal, final long zkConnectionErrors,
- final Map<String,Long> txStateCounters, final Map<String,Long> opTypeCounters) {
+ final EnumMap<TStatus,Long> txStateCounters, final Map<String,Long> opTypeCounters) {
this.updateTime = updateTime;
this.currentFateOps = currentFateOps;
this.zkFateChildOpsTotal = zkFateChildOpsTotal;
@@ -75,7 +77,7 @@ class FateMetricValues {
*
* @return a map of transaction status counters.
*/
- Map<String,Long> getTxStateCounters() {
+ EnumMap<TStatus,Long> getTxStateCounters() {
return txStateCounters;
}
@@ -115,9 +117,9 @@ class FateMetricValues {
builder.withCurrentFateOps(currFates.size());
// states are enumerated - create new map with counts initialized to 0.
- Map<String,Long> states = new TreeMap<>();
- for (ReadOnlyTStore.TStatus t : ReadOnlyTStore.TStatus.values()) {
- states.put(t.name(), 0L);
+ EnumMap<TStatus,Long> states = new EnumMap<>(TStatus.class);
+ for (TStatus t : TStatus.values()) {
+ states.put(t, 0L);
}
// op types are dynamic, no count initialization needed - clearing prev values will
@@ -126,7 +128,7 @@ class FateMetricValues {
for (AdminUtil.TransactionStatus tx : currFates) {
- String stateName = tx.getStatus().name();
+ TStatus stateName = tx.getStatus();
// incr count for state
states.merge(stateName, 1L, Long::sum);
@@ -182,15 +184,15 @@ class FateMetricValues {
private long zkFateChildOpsTotal = 0;
private long zkConnectionErrors = 0;
- private final Map<String,Long> txStateCounters;
+ private final EnumMap<TStatus,Long> txStateCounters;
private Map<String,Long> opTypeCounters;
Builder() {
// states are enumerated - create new map with counts initialized to 0.
- txStateCounters = new TreeMap<>();
- for (ReadOnlyTStore.TStatus t : ReadOnlyTStore.TStatus.values()) {
- txStateCounters.put(t.name(), 0L);
+ txStateCounters = new EnumMap<>(TStatus.class);
+ for (TStatus t : TStatus.values()) {
+ txStateCounters.put(t, 0L);
}
opTypeCounters = Collections.emptyMap();
@@ -216,7 +218,7 @@ class FateMetricValues {
return this;
}
- Builder withTxStateCounters(final Map<String,Long> txStateCounters) {
+ Builder withTxStateCounters(final EnumMap<TStatus,Long> txStateCounters) {
this.txStateCounters.putAll(txStateCounters);
return this;
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
index ebbbec4316..18e376e48c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
@@ -24,7 +24,7 @@ import static org.apache.accumulo.core.metrics.Metric.FATE_OPS_ACTIVITY;
import static org.apache.accumulo.core.metrics.Metric.FATE_TX;
import static org.apache.accumulo.core.metrics.Metric.FATE_TYPE_IN_PROGRESS;
-import java.util.List;
+import java.util.EnumMap;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.ReadOnlyTStore;
+import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
import org.apache.accumulo.core.fate.ZooStore;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -41,9 +42,9 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
-import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
public class FateMetrics implements MetricsProducer {
@@ -60,16 +61,10 @@ public class FateMetrics implements MetricsProducer {
private final String fateRootPath;
private final long refreshDelay;
- private AtomicLong totalCurrentOpsGauge;
- private AtomicLong totalOpsGauge;
- private AtomicLong fateErrorsGauge;
- private AtomicLong newTxGauge;
- private AtomicLong submittedTxGauge;
- private AtomicLong inProgressTxGauge;
- private AtomicLong failedInProgressTxGauge;
- private AtomicLong failedTxGauge;
- private AtomicLong successfulTxGauge;
- private AtomicLong unknownTxGauge;
+ private final AtomicLong totalCurrentOpsCount = new AtomicLong(0);
+ private final AtomicLong totalOpsCount = new AtomicLong(0);
+ private final AtomicLong fateErrorsCount = new AtomicLong(0);
+ private final EnumMap<TStatus,AtomicLong> txStatusCounters = new EnumMap<>(TStatus.class);
public FateMetrics(final ServerContext context, final long
minimumRefreshDelay) {
@@ -88,6 +83,10 @@ public class FateMetrics implements MetricsProducer {
"FATE Metrics - Interrupt received while initializing zoo store");
}
+ for (TStatus status : TStatus.values()) {
+ txStatusCounters.put(status, new AtomicLong(0));
+ }
+
}
private void update() {
@@ -95,70 +94,35 @@ public class FateMetrics implements MetricsProducer {
FateMetricValues metricValues =
FateMetricValues.getFromZooKeeper(context, fateRootPath, zooStore);
- totalCurrentOpsGauge.set(metricValues.getCurrentFateOps());
- totalOpsGauge.set(metricValues.getZkFateChildOpsTotal());
- fateErrorsGauge.set(metricValues.getZkConnectionErrors());
-
- for (Entry<String,Long> vals : metricValues.getTxStateCounters().entrySet()) {
- switch (ReadOnlyTStore.TStatus.valueOf(vals.getKey())) {
- case NEW:
- newTxGauge.set(vals.getValue());
- break;
- case SUBMITTED:
- submittedTxGauge.set(vals.getValue());
- break;
- case IN_PROGRESS:
- inProgressTxGauge.set(vals.getValue());
- break;
- case FAILED_IN_PROGRESS:
- failedInProgressTxGauge.set(vals.getValue());
- break;
- case FAILED:
- failedTxGauge.set(vals.getValue());
- break;
- case SUCCESSFUL:
- successfulTxGauge.set(vals.getValue());
- break;
- case UNKNOWN:
- unknownTxGauge.set(vals.getValue());
- break;
- default:
- log.warn("Unhandled status type: {}", vals.getKey());
+ totalCurrentOpsCount.set(metricValues.getCurrentFateOps());
+ totalOpsCount.set(metricValues.getZkFateChildOpsTotal());
+ fateErrorsCount.set(metricValues.getZkConnectionErrors());
+
+ for (Entry<TStatus,Long> entry : metricValues.getTxStateCounters().entrySet()) {
+ AtomicLong counter = txStatusCounters.get(entry.getKey());
+ if (counter != null) {
+ counter.set(entry.getValue());
+ } else {
+ log.warn("Unhandled TStatus: {}", entry.getKey());
}
}
- metricValues.getOpTypeCounters().forEach((name, count) -> {
- Metrics.gauge(FATE_TYPE_IN_PROGRESS.getName(), Tags.of(OP_TYPE_TAG, name), count);
- });
+ metricValues.getOpTypeCounters().forEach((name, count) -> Metrics
+ .gauge(FATE_TYPE_IN_PROGRESS.getName(), Tags.of(OP_TYPE_TAG, name), count));
}
@Override
public void registerMetrics(final MeterRegistry registry) {
- totalCurrentOpsGauge = registry.gauge(FATE_OPS.getName(), new AtomicLong(0));
- totalOpsGauge = registry.gauge(FATE_OPS_ACTIVITY.getName(), new AtomicLong(0));
- fateErrorsGauge = registry.gauge(FATE_ERRORS.getName(),
- List.of(Tag.of("type", "zk.connection")), new AtomicLong(0));
- newTxGauge = registry.gauge(FATE_TX.getName(),
- List.of(Tag.of("state", ReadOnlyTStore.TStatus.NEW.name().toLowerCase())),
- new AtomicLong(0));
- submittedTxGauge = registry.gauge(FATE_TX.getName(),
- List.of(Tag.of("state", ReadOnlyTStore.TStatus.SUBMITTED.name().toLowerCase())),
- new AtomicLong(0));
- inProgressTxGauge = registry.gauge(FATE_TX.getName(),
- List.of(Tag.of("state", ReadOnlyTStore.TStatus.IN_PROGRESS.name().toLowerCase())),
- new AtomicLong(0));
- failedInProgressTxGauge = registry.gauge(FATE_TX.getName(),
- List.of(Tag.of("state", ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS.name().toLowerCase())),
- new AtomicLong(0));
- failedTxGauge = registry.gauge(FATE_TX.getName(),
- List.of(Tag.of("state", ReadOnlyTStore.TStatus.FAILED.name().toLowerCase())),
- new AtomicLong(0));
- successfulTxGauge = registry.gauge(FATE_TX.getName(),
- List.of(Tag.of("state", ReadOnlyTStore.TStatus.SUCCESSFUL.name().toLowerCase())),
- new AtomicLong(0));
- unknownTxGauge = registry.gauge(FATE_TX.getName(),
- List.of(Tag.of("state", ReadOnlyTStore.TStatus.UNKNOWN.name().toLowerCase())),
- new AtomicLong(0));
+ Gauge.builder(FATE_OPS.getName(), totalCurrentOpsCount, AtomicLong::get)
+ .description(FATE_OPS.getDescription()).register(registry);
+ Gauge.builder(FATE_OPS_ACTIVITY.getName(), totalOpsCount, AtomicLong::get)
+ .description(FATE_OPS_ACTIVITY.getDescription()).register(registry);
+ Gauge.builder(FATE_ERRORS.getName(), fateErrorsCount, AtomicLong::get)
+ .description(FATE_ERRORS.getDescription()).tags("type", "zk.connection").register(registry);
+
+ txStatusCounters.forEach((status, counter) -> Gauge
+ .builder(FATE_TX.getName(), counter, AtomicLong::get).description(FATE_TX.getDescription())
+ .tags("state", status.name().toLowerCase()).register(registry));
update();