This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 1261ec2472 Convert micrometer Counters to FunctionCounters (#4555)
1261ec2472 is described below
commit 1261ec24726f65fca3b7314d00c17c062a8dfd6c
Author: Dom G <[email protected]>
AuthorDate: Fri May 17 14:08:21 2024 -0400
Convert micrometer Counters to FunctionCounters (#4555)
---
.../server/compaction/PausedCompactionMetrics.java | 20 +++---
.../apache/accumulo/tserver/ScanServerMetrics.java | 10 +--
.../accumulo/tserver/ThriftScanClientHandler.java | 16 ++---
.../tserver/metrics/TabletServerScanMetrics.java | 84 +++++++++-------------
.../tserver/metrics/TabletServerUpdateMetrics.java | 27 ++++---
.../apache/accumulo/tserver/tablet/TabletBase.java | 2 +-
6 files changed, 75 insertions(+), 84 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java
index 3d1a07be42..29e6da06d0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java
@@ -18,30 +18,32 @@
*/
package org.apache.accumulo.server.compaction;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.accumulo.core.metrics.MetricsProducer;
-import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.MeterRegistry;
public class PausedCompactionMetrics implements MetricsProducer {
- private Counter majcPauses;
- private Counter mincPauses;
+ private final AtomicLong majcPauseCount = new AtomicLong(0);
+ private final AtomicLong mincPauseCount = new AtomicLong(0);
public void incrementMinCPause() {
- mincPauses.increment();
+ mincPauseCount.incrementAndGet();
}
public void incrementMajCPause() {
- majcPauses.increment();
+ majcPauseCount.incrementAndGet();
}
@Override
public void registerMetrics(MeterRegistry registry) {
- majcPauses = Counter.builder(METRICS_MAJC_PAUSED).description("major
compaction pause count")
- .register(registry);
- mincPauses = Counter.builder(METRICS_MINC_PAUSED).description("minor
compactor pause count")
- .register(registry);
+ FunctionCounter.builder(METRICS_MAJC_PAUSED, majcPauseCount,
AtomicLong::get)
+ .description("major compaction pause count").register(registry);
+ FunctionCounter.builder(METRICS_MINC_PAUSED, mincPauseCount,
AtomicLong::get)
+ .description("minor compactor pause count").register(registry);
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java
index 1a516b597b..37ddfab028 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java
@@ -18,13 +18,15 @@
*/
package org.apache.accumulo.tserver;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metrics.MetricsProducer;
import com.github.benmanes.caffeine.cache.LoadingCache;
-import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
@@ -32,7 +34,7 @@ import
io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
public class ScanServerMetrics implements MetricsProducer {
private Timer reservationTimer;
- private Counter busyTimeoutCount;
+ private final AtomicLong busyTimeoutCount = new AtomicLong(0);
private final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache;
@@ -44,7 +46,7 @@ public class ScanServerMetrics implements MetricsProducer {
public void registerMetrics(MeterRegistry registry) {
reservationTimer =
Timer.builder(MetricsProducer.METRICS_SCAN_RESERVATION_TIMER)
.description("Time to reserve a tablets files for
scan").register(registry);
- busyTimeoutCount = Counter.builder(METRICS_SCAN_BUSY_TIMEOUT_COUNTER)
+ FunctionCounter.builder(METRICS_SCAN_BUSY_TIMEOUT_COUNTER,
busyTimeoutCount, AtomicLong::get)
.description("The number of scans where a busy timeout
happened").register(registry);
CaffeineCacheMetrics.monitor(registry, tabletMetadataCache,
METRICS_SCAN_TABLET_METADATA_CACHE);
}
@@ -54,6 +56,6 @@ public class ScanServerMetrics implements MetricsProducer {
}
public void incrementBusy() {
- busyTimeoutCount.increment();
+ busyTimeoutCount.incrementAndGet();
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
index a89af6617d..71ea20e62d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
@@ -159,7 +159,7 @@ public class ThriftScanClientHandler implements
TabletScanClientService.Iface {
ThriftSecurityException,
org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException,
TSampleNotPresentException, ScanServerBusyException {
- server.getScanMetrics().incrementStartScan(1.0D);
+ server.getScanMetrics().incrementStartScan();
TableId tableId = extent.tableId();
NamespaceId namespaceId;
@@ -250,7 +250,7 @@ public class ThriftScanClientHandler implements
TabletScanClientService.Iface {
org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException,
TSampleNotPresentException,
ScanServerBusyException {
- server.getScanMetrics().incrementContinueScan(1.0D);
+ server.getScanMetrics().incrementContinueScan();
if (scanSession.nextBatchTask == null) {
scanSession.nextBatchTask = new NextBatchTask(server, scanID,
scanSession.interruptFlag);
@@ -284,7 +284,7 @@ public class ThriftScanClientHandler implements
TabletScanClientService.Iface {
server.getSessionManager().removeSession(scanID);
TabletBase tablet =
scanSession.getTabletResolver().getTablet(scanSession.extent);
if (busyTimeout > 0) {
- server.getScanMetrics().incrementBusy(1.0D);
+ server.getScanMetrics().incrementBusy();
throw new ScanServerBusyException();
} else if (tablet == null || tablet.isClosed()) {
throw new NotServingTabletException(scanSession.extent.toThrift());
@@ -326,7 +326,7 @@ public class ThriftScanClientHandler implements
TabletScanClientService.Iface {
@Override
public void closeScan(TInfo tinfo, long scanID) {
- server.getScanMetrics().incrementCloseScan(1.0D);
+ server.getScanMetrics().incrementCloseScan();
final SingleScanSession ss =
(SingleScanSession) server.getSessionManager().removeSession(scanID);
@@ -378,7 +378,7 @@ public class ThriftScanClientHandler implements
TabletScanClientService.Iface {
long busyTimeout)
throws ThriftSecurityException, TSampleNotPresentException,
ScanServerBusyException {
- server.getScanMetrics().incrementStartScan(1.0D);
+ server.getScanMetrics().incrementStartScan();
// find all of the tables that need to be scanned
final HashSet<TableId> tables = new HashSet<>();
@@ -470,7 +470,7 @@ public class ThriftScanClientHandler implements
TabletScanClientService.Iface {
private MultiScanResult continueMultiScan(long scanID, MultiScanSession
session, long busyTimeout)
throws TSampleNotPresentException, ScanServerBusyException {
- server.getScanMetrics().incrementContinueScan(1.0D);
+ server.getScanMetrics().incrementContinueScan();
if (session.lookupTask == null) {
session.lookupTask = new LookupTask(server, scanID);
@@ -495,7 +495,7 @@ public class ThriftScanClientHandler implements
TabletScanClientService.Iface {
} catch (CancellationException ce) {
server.getSessionManager().removeSession(scanID);
if (busyTimeout > 0) {
- server.getScanMetrics().incrementBusy(1.0D);
+ server.getScanMetrics().incrementBusy();
throw new ScanServerBusyException();
} else {
log.warn("Failed to get multiscan result", ce);
@@ -518,7 +518,7 @@ public class ThriftScanClientHandler implements
TabletScanClientService.Iface {
@Override
public void closeMultiScan(TInfo tinfo, long scanID) throws
NoSuchScanIDException {
- server.getScanMetrics().incrementCloseScan(1.0D);
+ server.getScanMetrics().incrementCloseScan();
MultiScanSession session = (MultiScanSession)
server.getSessionManager().removeSession(scanID);
if (session == null) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
index 8cc135369e..5ca15c5c9b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
@@ -20,13 +20,14 @@ package org.apache.accumulo.tserver.metrics;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.server.metrics.NoOpDistributionSummary;
-import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
@@ -37,50 +38,34 @@ public class TabletServerScanMetrics implements
MetricsProducer {
private Timer scans;
private DistributionSummary resultsPerScan = new NoOpDistributionSummary();
private DistributionSummary yields = new NoOpDistributionSummary();
- private Counter startScanCalls;
- private Counter continueScanCalls;
- private Counter closeScanCalls;
- private Counter busyTimeoutCount;
- private Counter pausedForMemory;
- private Counter earlyReturnForMemory;
+ private final AtomicLong startScanCalls = new AtomicLong(0);
+ private final AtomicLong continueScanCalls = new AtomicLong(0);
+ private final AtomicLong closeScanCalls = new AtomicLong(0);
+ private final AtomicLong busyTimeoutCount = new AtomicLong(0);
+ private final AtomicLong pausedForMemory = new AtomicLong(0);
+ private final AtomicLong earlyReturnForMemory = new AtomicLong(0);
private final LongAdder lookupCount = new LongAdder();
private final LongAdder queryResultCount = new LongAdder();
private final LongAdder queryResultBytes = new LongAdder();
private final LongAdder scannedCount = new LongAdder();
- public void incrementLookupCount(long amount) {
- this.lookupCount.add(amount);
- }
-
- public long getLookupCount() {
- return this.lookupCount.sum();
+ public void incrementLookupCount() {
+ this.lookupCount.increment();
}
public void incrementQueryResultCount(long amount) {
this.queryResultCount.add(amount);
}
- public long getQueryResultCount() {
- return this.queryResultCount.sum();
- }
-
public void incrementQueryResultBytes(long amount) {
this.queryResultBytes.add(amount);
}
- public long getQueryByteCount() {
- return this.queryResultBytes.sum();
- }
-
public LongAdder getScannedCounter() {
return this.scannedCount;
}
- public long getScannedCount() {
- return this.scannedCount.sum();
- }
-
public void addScan(long value) {
scans.record(Duration.ofMillis(value));
}
@@ -101,28 +86,28 @@ public class TabletServerScanMetrics implements
MetricsProducer {
openFiles.addAndGet(delta < 0 ? delta : delta * -1);
}
- public void incrementStartScan(double value) {
- startScanCalls.increment(value);
+ public void incrementStartScan() {
+ startScanCalls.incrementAndGet();
}
- public void incrementContinueScan(double value) {
- continueScanCalls.increment(value);
+ public void incrementContinueScan() {
+ continueScanCalls.incrementAndGet();
}
- public void incrementCloseScan(double value) {
- closeScanCalls.increment(value);
+ public void incrementCloseScan() {
+ closeScanCalls.incrementAndGet();
}
- public void incrementBusy(double value) {
- busyTimeoutCount.increment(value);
+ public void incrementBusy() {
+ busyTimeoutCount.incrementAndGet();
}
public void incrementScanPausedForLowMemory() {
- pausedForMemory.increment();
+ pausedForMemory.incrementAndGet();
}
public void incrementEarlyReturnForLowMemory() {
- earlyReturnForMemory.increment();
+ earlyReturnForMemory.incrementAndGet();
}
@Override
@@ -134,31 +119,28 @@ public class TabletServerScanMetrics implements
MetricsProducer {
.description("Results per scan").register(registry);
yields =
DistributionSummary.builder(METRICS_SCAN_YIELDS).description("yields").register(registry);
- startScanCalls = Counter.builder(METRICS_SCAN_START)
+ FunctionCounter.builder(METRICS_SCAN_START, this.startScanCalls,
AtomicLong::get)
.description("calls to start a scan / multiscan").register(registry);
- continueScanCalls = Counter.builder(METRICS_SCAN_CONTINUE)
+ FunctionCounter.builder(METRICS_SCAN_CONTINUE, this.continueScanCalls,
AtomicLong::get)
.description("calls to continue a scan /
multiscan").register(registry);
- closeScanCalls = Counter.builder(METRICS_SCAN_CLOSE)
+ FunctionCounter.builder(METRICS_SCAN_CLOSE, this.closeScanCalls,
AtomicLong::get)
.description("calls to close a scan / multiscan").register(registry);
- busyTimeoutCount = Counter.builder(METRICS_SCAN_BUSY_TIMEOUT_COUNTER)
+ FunctionCounter
+ .builder(METRICS_SCAN_BUSY_TIMEOUT_COUNTER, this.busyTimeoutCount,
AtomicLong::get)
.description("The number of scans where a busy timeout
happened").register(registry);
- Gauge.builder(METRICS_SCAN_QUERIES, this,
TabletServerScanMetrics::getLookupCount)
+ FunctionCounter.builder(METRICS_SCAN_QUERIES, this.lookupCount,
LongAdder::sum)
.description("Number of queries").register(registry);
- Gauge
- .builder(METRICS_SCAN_QUERY_SCAN_RESULTS, this,
- TabletServerScanMetrics::getQueryResultCount)
- .description("Query rate (entries/sec)").register(registry);
- Gauge
- .builder(METRICS_SCAN_QUERY_SCAN_RESULTS_BYTES, this,
- TabletServerScanMetrics::getQueryByteCount)
- .description("Query rate (bytes/sec)").register(registry);
- Gauge.builder(METRICS_SCAN_SCANNED_ENTRIES, this,
TabletServerScanMetrics::getScannedCount)
+ FunctionCounter.builder(METRICS_SCAN_SCANNED_ENTRIES, this.scannedCount,
LongAdder::sum)
.description("Scanned rate").register(registry);
- pausedForMemory = Counter.builder(METRICS_SCAN_PAUSED_FOR_MEM)
+ FunctionCounter.builder(METRICS_SCAN_PAUSED_FOR_MEM, this.pausedForMemory,
AtomicLong::get)
.description("scan paused due to server being low on
memory").register(registry);
- earlyReturnForMemory = Counter.builder(METRICS_SCAN_RETURN_FOR_MEM)
+ FunctionCounter.builder(METRICS_SCAN_RETURN_FOR_MEM,
this.earlyReturnForMemory, AtomicLong::get)
.description("scan returned results early due to server being low on
memory")
.register(registry);
+ Gauge.builder(METRICS_SCAN_QUERY_SCAN_RESULTS, this.queryResultCount,
LongAdder::sum)
+ .description("Query rate (entries/sec)").register(registry);
+ Gauge.builder(METRICS_SCAN_QUERY_SCAN_RESULTS_BYTES,
this.queryResultBytes, LongAdder::sum)
+ .description("Query rate (bytes/sec)").register(registry);
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java
index 09ad197b6d..6dbce396b1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java
@@ -19,35 +19,36 @@
package org.apache.accumulo.tserver.metrics;
import java.time.Duration;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.server.metrics.NoOpDistributionSummary;
-import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
public class TabletServerUpdateMetrics implements MetricsProducer {
- private Counter permissionErrorsCounter;
- private Counter unknownTabletErrorsCounter;
- private Counter constraintViolationsCounter;
+ private final AtomicLong permissionErrorsCount = new AtomicLong();
+ private final AtomicLong unknownTabletErrorsCount = new AtomicLong();
+ private final AtomicLong constraintViolationsCount = new AtomicLong();
private Timer commitPrepStat;
private Timer walogWriteTimeStat;
private Timer commitTimeStat;
private DistributionSummary mutationArraySizeStat = new
NoOpDistributionSummary();
public void addPermissionErrors(long value) {
- permissionErrorsCounter.increment(value);
+ permissionErrorsCount.addAndGet(value);
}
public void addUnknownTabletErrors(long value) {
- unknownTabletErrorsCounter.increment(value);
+ unknownTabletErrorsCount.addAndGet(value);
}
public void addConstraintViolations(long value) {
- constraintViolationsCounter.increment(value);
+ constraintViolationsCount.addAndGet(value);
}
public void addCommitPrep(long value) {
@@ -68,10 +69,14 @@ public class TabletServerUpdateMetrics implements
MetricsProducer {
@Override
public void registerMetrics(MeterRegistry registry) {
- permissionErrorsCounter = registry.counter(METRICS_UPDATE_ERRORS, "type",
"permission");
- unknownTabletErrorsCounter = registry.counter(METRICS_UPDATE_ERRORS,
"type", "unknown.tablet");
- constraintViolationsCounter =
- registry.counter(METRICS_UPDATE_ERRORS, "type",
"constraint.violation");
+ FunctionCounter.builder(METRICS_UPDATE_ERRORS, permissionErrorsCount,
AtomicLong::get)
+ .tags("type", "permission").description("Counts permission
errors").register(registry);
+ FunctionCounter.builder(METRICS_UPDATE_ERRORS, unknownTabletErrorsCount,
AtomicLong::get)
+ .tags("type", "unknown.tablet").description("Counts unknown tablet
errors")
+ .register(registry);
+ FunctionCounter.builder(METRICS_UPDATE_ERRORS, constraintViolationsCount,
AtomicLong::get)
+ .tags("type", "constraint.violation").description("Counts constraint
violations")
+ .register(registry);
commitPrepStat = Timer.builder(METRICS_UPDATE_COMMIT_PREP)
.description("preparing to commit mutations").register(registry);
walogWriteTimeStat = Timer.builder(METRICS_UPDATE_WALOG_WRITE)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
index dd23c5a12b..9b0545a5c7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
@@ -208,7 +208,7 @@ public abstract class TabletBase {
try {
SortedKeyValueIterator<Key,Value> iter = new
SourceSwitchingIterator(dataSource);
this.lookupCount.incrementAndGet();
- this.server.getScanMetrics().incrementLookupCount(1);
+ this.server.getScanMetrics().incrementLookupCount();
result = lookup(iter, ranges, results, scanParams, maxResultSize);
return result;
} catch (IOException | RuntimeException e) {