This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new d6eb1df8ee Add metrics for entries read and written during compactions (#4572)
d6eb1df8ee is described below
commit d6eb1df8ee3155dfbf1fe03548433d6a61566f8f
Author: Dom G <[email protected]>
AuthorDate: Wed May 29 14:35:03 2024 -0400
Add metrics for entries read and written during compactions (#4572)
* Add metrics for compaction entries read and written on compactors and
tservers
* Add IT to make sure these metrics behave as intended.
---------
Co-authored-by: Keith Turner <[email protected]>
---
.../accumulo/core/metrics/MetricsProducer.java | 2 +
.../accumulo/server/compaction/FileCompactor.java | 87 +++++++++++++--
.../org/apache/accumulo/compactor/Compactor.java | 26 ++++-
.../tserver/metrics/TabletServerMetrics.java | 19 ++++
.../compaction/ExternalCompactionProgressIT.java | 122 ++++++++++++++++++++-
5 files changed, 237 insertions(+), 19 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index dd4489b87c..228baa2165 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -583,6 +583,8 @@ public interface MetricsProducer {
String METRICS_COMPACTOR_PREFIX = "accumulo.compactor.";
String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX +
"majc.stuck";
+ String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX +
"entries.read";
+ String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX +
"entries.written";
String METRICS_FATE_PREFIX = "accumulo.fate.";
String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX +
"ops.in.progress.by.type";
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index df59be89a3..77ec9f1696 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.server.compaction;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -32,6 +33,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -120,8 +122,19 @@ public class FileCompactor implements Callable<CompactionStats> {
private String currentLocalityGroup = "";
private final long startTime;
- private final AtomicLong entriesRead = new AtomicLong(0);
- private final AtomicLong entriesWritten = new AtomicLong(0);
+ private final AtomicLong currentEntriesRead = new AtomicLong(0);
+ private final AtomicLong currentEntriesWritten = new AtomicLong(0);
+
+ // These track the cumulative count of entries (read and written) that has
been recorded in
+ // the global counts. Their purpose is to avoid double counting of metrics
during the update of
+ // global statistics.
+ private final AtomicLong lastRecordedEntriesRead = new AtomicLong(0);
+ private final AtomicLong lastRecordedEntriesWritten = new AtomicLong(0);
+
+ private static final LongAdder totalEntriesRead = new LongAdder();
+ private static final LongAdder totalEntriesWritten = new LongAdder();
+ private static volatile long lastUpdateTime = 0;
+
private final DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd
HH:mm:ss.SSS");
// a unique id to identify a compactor
@@ -141,9 +154,61 @@ public class FileCompactor implements Callable<CompactionStats> {
return currentLocalityGroup;
}
- private void clearStats() {
- entriesRead.set(0);
- entriesWritten.set(0);
+ private void clearCurrentEntryCounts() {
+ currentEntriesRead.set(0);
+ currentEntriesWritten.set(0);
+ }
+
+ private void updateGlobalEntryCounts() {
+ updateTotalEntries(currentEntriesRead, lastRecordedEntriesRead,
totalEntriesRead);
+ updateTotalEntries(currentEntriesWritten, lastRecordedEntriesWritten,
totalEntriesWritten);
+ }
+
+ /**
+ * Updates the total count of entries by adding the difference between the
current count and the
+ * last recorded count to the total.
+ *
+ * @param current The current count of entries
+ * @param recorded The last recorded count of entries
+ * @param total The total count to add the difference to
+ */
+ private void updateTotalEntries(AtomicLong current, AtomicLong recorded,
LongAdder total) {
+ long currentCount = current.get();
+ long lastRecorded =
+ recorded.getAndUpdate(recordedValue -> Math.max(recordedValue,
currentCount));
+ if (lastRecorded < currentCount) {
+ total.add(currentCount - lastRecorded);
+ }
+ }
+
+ /**
+ * @return the total entries written by compactions over the lifetime of
this process.
+ */
+ public static long getTotalEntriesWritten() {
+ updateTotalEntries();
+ return totalEntriesWritten.sum();
+ }
+
+ /**
+ * @return the total entries read by compactions over the lifetime of this
process.
+ */
+ public static long getTotalEntriesRead() {
+ updateTotalEntries();
+ return totalEntriesRead.sum();
+ }
+
+ /**
+ * Updates total entries read and written for all currently running
compactions. Compactions will
+ * update the global stats when they finish. This can be called to update
them sooner. This method
+ * is rate limited, so it will not cause issues if called too frequently.
+ */
+ private static void updateTotalEntries() {
+ long currentTime = System.nanoTime();
+ if (currentTime - lastUpdateTime < Duration.ofMillis(100).toNanos()) {
+ return;
+ }
+ runningCompactions.forEach(FileCompactor::updateGlobalEntryCounts);
+ lastUpdateTime = currentTime;
}
protected static final Set<FileCompactor> runningCompactions =
@@ -211,7 +276,7 @@ public class FileCompactor implements Callable<CompactionStats> {
String threadStartDate = dateFormatter.format(new Date());
- clearStats();
+ clearCurrentEntryCounts();
String oldThreadName = Thread.currentThread().getName();
String newThreadName =
@@ -298,6 +363,8 @@ public class FileCompactor implements Callable<CompactionStats> {
runningCompactions.remove(this);
}
+ updateGlobalEntryCounts();
+
try {
if (mfw != null) {
// compaction must not have finished successfully, so close its
output file
@@ -397,7 +464,7 @@ public class FileCompactor implements Callable<CompactionStats> {
}
CountingIterator citr =
- new CountingIterator(new MultiIterator(iters, extent.toDataRange()),
entriesRead);
+ new CountingIterator(new MultiIterator(iters, extent.toDataRange()),
currentEntriesRead);
SortedKeyValueIterator<Key,Value> delIter =
DeletingIterator.wrap(citr, propagateDeletes,
DeletingIterator.getBehavior(acuTableConf));
ColumnFamilySkippingIterator cfsi = new
ColumnFamilySkippingIterator(delIter);
@@ -425,7 +492,7 @@ public class FileCompactor implements Callable<CompactionStats> {
if (entriesCompacted % 1024 == 0) {
// Periodically update stats, do not want to do this too often
since its volatile
- entriesWritten.addAndGet(1024);
+ currentEntriesWritten.addAndGet(1024);
}
}
@@ -480,11 +547,11 @@ public class FileCompactor implements Callable<CompactionStats> {
}
long getEntriesRead() {
- return entriesRead.get();
+ return currentEntriesRead.get();
}
long getEntriesWritten() {
- return entriesWritten.get();
+ return currentEntriesWritten.get();
}
long getStartTime() {
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 2b85401516..e5d0c186f3 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -119,6 +119,7 @@ import org.slf4j.LoggerFactory;
import com.beust.jcommander.Parameter;
import com.google.common.base.Preconditions;
+import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.MeterRegistry;
@@ -162,8 +163,23 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
queueName = opts.getQueueName();
}
+ private long getTotalEntriesRead() {
+ return FileCompactor.getTotalEntriesRead();
+ }
+
+ private long getTotalEntriesWritten() {
+ return FileCompactor.getTotalEntriesWritten();
+ }
+
@Override
public void registerMetrics(MeterRegistry registry) {
+ FunctionCounter.builder(METRICS_COMPACTOR_ENTRIES_READ, this,
Compactor::getTotalEntriesRead)
+ .description("Number of entries read by all compactions that have run
on this compactor")
+ .register(registry);
+ FunctionCounter
+ .builder(METRICS_COMPACTOR_ENTRIES_WRITTEN, this,
Compactor::getTotalEntriesWritten)
+ .description("Number of entries written by all compactions that have
run on this compactor")
+ .register(registry);
LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK)
.description("Number and duration of stuck major
compactions").register(registry);
CompactionWatcher.setTimer(timer);
@@ -690,20 +706,20 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
// Compaction has started. There should only be one in the list
CompactionInfo info = running.get(0);
if (info != null) {
+ final long entriesRead = info.getEntriesRead();
+ final long entriesWritten = info.getEntriesWritten();
if (inputEntries > 0) {
- percentComplete =
- Float.toString((info.getEntriesRead() / (float)
inputEntries) * 100);
+ percentComplete = Float.toString((entriesRead / (float)
inputEntries) * 100);
}
String message = String.format(
"Compaction in progress, read %d of %d input entries ( %s
%s ), written %d entries",
- info.getEntriesRead(), inputEntries, percentComplete, "%",
- info.getEntriesWritten());
+ entriesRead, inputEntries, percentComplete, "%",
entriesWritten);
watcher.run();
try {
LOG.debug("Updating coordinator with compaction progress:
{}.", message);
TCompactionStatusUpdate update =
new
TCompactionStatusUpdate(TCompactionState.IN_PROGRESS, message,
- inputEntries, info.getEntriesRead(),
info.getEntriesWritten());
+ inputEntries, entriesRead, entriesWritten);
updateCompactionState(job, update);
} catch (RetriesExceededException e) {
LOG.warn("Error updating coordinator with compaction
progress, error: {}",
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
index 0695181c87..70b0c4980b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
@@ -20,8 +20,10 @@ package org.apache.accumulo.tserver.metrics;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.server.compaction.CompactionWatcher;
+import org.apache.accumulo.server.compaction.FileCompactor;
import org.apache.accumulo.tserver.TabletServer;
+import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.MeterRegistry;
@@ -34,8 +36,25 @@ public class TabletServerMetrics implements MetricsProducer {
util = new TabletServerMetricsUtil(tserver);
}
+ private long getTotalEntriesRead() {
+ return FileCompactor.getTotalEntriesRead();
+ }
+
+ private long getTotalEntriesWritten() {
+ return FileCompactor.getTotalEntriesWritten();
+ }
+
@Override
public void registerMetrics(MeterRegistry registry) {
+ FunctionCounter
+ .builder(METRICS_COMPACTOR_ENTRIES_READ, this,
TabletServerMetrics::getTotalEntriesRead)
+ .description("Number of entries read by all compactions that have run
on this tserver")
+ .register(registry);
+ FunctionCounter
+ .builder(METRICS_COMPACTOR_ENTRIES_WRITTEN, this,
+ TabletServerMetrics::getTotalEntriesWritten)
+ .description("Number of entries written by all compactions that have
run on this tserver")
+ .register(registry);
LongTaskTimer timer = LongTaskTimer.builder(METRICS_TSERVER_MAJC_STUCK)
.description("Number and duration of stuck major
compactions").register(registry);
CompactionWatcher.setTimer(timer);
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index a9afc4fd43..0cbbc3a97a 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -36,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.compactor.Compactor;
import org.apache.accumulo.coordinator.CompactionCoordinator;
@@ -49,14 +50,21 @@ import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
+import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,11 +87,117 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
Map<String,RunningCompactionInfo> runningMap = new HashMap<>();
List<EC_PROGRESS> progressList = new ArrayList<>();
- private final AtomicBoolean compactionFinished = new AtomicBoolean(false);
+ private static final AtomicBoolean stopCheckerThread = new
AtomicBoolean(false);
+ private static TestStatsDSink sink;
+
+ @BeforeAll
+ public static void before() throws Exception {
+ sink = new TestStatsDSink();
+ }
+
+ @AfterAll
+ public static void after() throws Exception {
+ if (sink != null) {
+ sink.close();
+ }
+ }
+
+ @BeforeEach
+ public void setup() {
+ stopCheckerThread.set(false);
+ }
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
coreSite) {
ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
+ cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+ cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY,
TestStatsDRegistryFactory.class.getName());
+ Map<String,String> sysProps =
Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+ TestStatsDRegistryFactory.SERVER_PORT,
Integer.toString(sink.getPort()));
+ cfg.setSystemProperties(sysProps);
+ }
+
+ @Test
+ public void testProgressViaMetrics() throws Exception {
+ String table = this.getUniqueNames(1)[0];
+
+ try (AccumuloClient client =
+ Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
+ createTable(client, table, "cs1");
+ writeData(client, table, ROWS);
+
+ cluster.getClusterControl().startCompactors(Compactor.class, 1, QUEUE1);
+ cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);
+
+ final AtomicLong expectedEntriesRead = new AtomicLong(9216);
+ final AtomicLong expectedEntriesWritten = new AtomicLong(4096);
+ final AtomicLong totalEntriesRead = new AtomicLong(0);
+ final AtomicLong totalEntriesWritten = new AtomicLong(0);
+
+ Thread checkerThread = getMetricsCheckerThread(totalEntriesRead,
totalEntriesWritten);
+ checkerThread.start();
+
+ IteratorSetting setting = new IteratorSetting(50, "Slow",
SlowIterator.class);
+ SlowIterator.setSleepTime(setting, 1);
+ client.tableOperations().attachIterator(table, setting,
+ EnumSet.of(IteratorUtil.IteratorScope.majc));
+ log.info("Compacting table");
+ compact(client, table, 2, QUEUE1, true);
+ log.info("Done Compacting table");
+ verify(client, table, 2, ROWS);
+
+ Wait.waitFor(() -> {
+ if (totalEntriesRead.get() == expectedEntriesRead.get()
+ && totalEntriesWritten.get() == expectedEntriesWritten.get()) {
+ return true;
+ }
+ log.info(
+ "Waiting for entries read to be {} (currently {}) and entries
written to be {} (currently {})",
+ expectedEntriesRead.get(), totalEntriesRead.get(),
expectedEntriesWritten.get(),
+ totalEntriesWritten.get());
+ return false;
+ }, 30000, 3000, "Entries read and written metrics values did not match
expected values");
+
+ stopCheckerThread.set(true);
+ checkerThread.join();
+ }
+ }
+
+ /**
+ * Get a thread that checks the metrics for entries read and written.
+ *
+ * @param totalEntriesRead this is set to the value of the entries read
metric
+ * @param totalEntriesWritten this is set to the value of the entries
written metric
+ */
+ private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead,
+ AtomicLong totalEntriesWritten) {
+ return Threads.createThread("metric-tailer", () -> {
+ log.info("Starting metric tailer");
+
+ sink.getLines().clear();
+
+ while (!stopCheckerThread.get()) {
+ List<String> statsDMetrics = sink.getLines();
+ for (String s : statsDMetrics) {
+ if (stopCheckerThread.get()) {
+ break;
+ }
+ if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_READ)) {
+ TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s);
+ int value = Integer.parseInt(e.getValue());
+ totalEntriesRead.addAndGet(value);
+ log.info("Found entries.read metric: {} with value: {}",
e.getName(), value);
+ } else if
(s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN)) {
+ TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s);
+ int value = Integer.parseInt(e.getValue());
+ totalEntriesWritten.addAndGet(value);
+ log.info("Found entries.written metric: {} with value: {}",
e.getName(), value);
+ }
+ }
+ sleepUninterruptibly(3000, TimeUnit.MILLISECONDS);
+ }
+ log.info("Metric tailer thread finished");
+ });
}
@Test
@@ -109,7 +223,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
verify(client, table1, 2, ROWS);
log.info("Done Compacting table");
- compactionFinished.set(true);
+ stopCheckerThread.set(true);
checkerThread.join();
verifyProgress();
@@ -164,7 +278,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
client.tableOperations().compact(tableName2,
new CompactionConfig().setWait(true).setIterators(List.of(setting)));
log.info("Finished compacting table " + tableName2);
- compactionFinished.set(true);
+ stopCheckerThread.set(true);
log.info("Waiting on progress checker thread");
checkerThread.join();
@@ -194,7 +308,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
public Thread startChecker() {
return Threads.createThread("RC checker", () -> {
try {
- while (!compactionFinished.get()) {
+ while (!stopCheckerThread.get()) {
checkRunning();
sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
}