This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new ffbbca7de6 backport server idle metric to 2.1 (#4716)
ffbbca7de6 is described below
commit ffbbca7de6bf45c84788d185180774cb2b882698
Author: Dom G <[email protected]>
AuthorDate: Tue Jul 9 15:41:13 2024 -0400
backport server idle metric to 2.1 (#4716)
* Backport changes from idle stop PR #4078
* Convert idle metric from Counter to Gauge
* Remove the COMPACTOR_BUSY metric as a duplicate of this one
---------
Co-authored-by: Dave Marion <[email protected]>
---
.../org/apache/accumulo/core/conf/Property.java | 3 +
.../accumulo/core/metrics/MetricsProducer.java | 18 +--
.../org/apache/accumulo/server/AbstractServer.java | 44 ++++++-
.../accumulo/server/metrics/ProcessMetrics.java | 43 +++++++
.../org/apache/accumulo/compactor/Compactor.java | 20 ++--
.../apache/accumulo/compactor/CompactorTest.java | 3 +
.../org/apache/accumulo/tserver/ScanServer.java | 4 +-
.../org/apache/accumulo/tserver/TabletServer.java | 11 +-
.../compaction/ExternalCompactionProgressIT.java | 23 +---
.../test/functional/IdleProcessMetricsIT.java | 130 +++++++++++++++++++++
.../apache/accumulo/test/metrics/MetricsIT.java | 2 +-
11 files changed, 258 insertions(+), 43 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index fe4b8f4c18..629585058a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -294,6 +294,9 @@ public enum Property {
GENERAL_DELEGATION_TOKEN_UPDATE_INTERVAL("general.delegation.token.update.interval",
"1d",
PropertyType.TIMEDURATION, "The length of time between generation of new secret keys.",
"1.7.0"),
+ GENERAL_IDLE_PROCESS_INTERVAL("general.metrics.process.idle", "5m", PropertyType.TIMEDURATION,
+ "Amount of time a process must be idle before it is considered to be idle by the metrics system.",
+ "2.1.3"),
GENERAL_MAX_SCANNER_RETRY_PERIOD("general.max.scanner.retry.period", "5s",
PropertyType.TIMEDURATION,
"The maximum amount of time that a Scanner should wait before retrying a failed RPC.",
diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 3fdb1a4309..8cf2ffc956 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -44,6 +44,13 @@ import io.micrometer.core.instrument.MeterRegistry;
* <th>Micrometer Type</th>
* <th>Notes</th>
* </tr>
+ * <!-- general server metrics -->
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@value #METRICS_SERVER_IDLE}</td>
+ * <td>Gauge</td>
+ * <td>Indicates if the server is idle or not. The value will be 1 when idle and 0 when not idle.
* <!-- compactor -->
* <tr>
* <td>N/A</td>
@@ -66,14 +73,6 @@ import io.micrometer.core.instrument.MeterRegistry;
* <td>FunctionCounter</td>
* <td>Number of entries written by all threads performing compactions</td>
* </tr>
- * <tr>
- * <td>N/A</td>
- * <td>N/A</td>
- * <td>{@value #METRICS_COMPACTOR_BUSY}</td>
- * <td>Gauge</td>
- * <td>Indicates if the compactor is busy or not. The value will be 0 when idle and 1 when
- * busy.</td>
- * </tr>
* <!-- fate -->
* <tr>
* <td>currentFateOps</td>
@@ -605,11 +604,12 @@ public interface MetricsProducer {
Logger LOG = LoggerFactory.getLogger(MetricsProducer.class);
+ String METRICS_SERVER_IDLE = "accumulo.server.idle";
+
String METRICS_COMPACTOR_PREFIX = "accumulo.compactor.";
String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX +
"majc.stuck";
String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX +
"entries.read";
String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX +
"entries.written";
- String METRICS_COMPACTOR_BUSY = METRICS_COMPACTOR_PREFIX + "busy";
String METRICS_FATE_PREFIX = "accumulo.fate.";
String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX +
"ops.in.progress.by.type";
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index ac1bcaab90..af679c13d3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -19,22 +19,32 @@
package org.apache.accumulo.server;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.classloader.ClassLoaderUtil;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.server.metrics.ProcessMetrics;
import org.apache.accumulo.server.security.SecurityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractServer implements AutoCloseable, Runnable {
+import io.micrometer.core.instrument.MeterRegistry;
+
+public abstract class AbstractServer implements AutoCloseable,
MetricsProducer, Runnable {
private final ServerContext context;
protected final String applicationName;
private final String hostname;
private final Logger log;
+ private final ProcessMetrics processMetrics;
+ protected final long idleReportingPeriodNanos;
+ private volatile long idlePeriodStartNanos = 0L;
protected AbstractServer(String appName, ServerOpts opts, String[] args) {
this.log = LoggerFactory.getLogger(getClass().getName());
@@ -53,6 +63,30 @@ public abstract class AbstractServer implements
AutoCloseable, Runnable {
// Server-side "client" check to make sure we're logged in as a user we expect to be
context.enforceKerberosLogin();
}
+ processMetrics = new ProcessMetrics();
+ idleReportingPeriodNanos = TimeUnit.MILLISECONDS.toNanos(
+ context.getConfiguration().getTimeInMillis(Property.GENERAL_IDLE_PROCESS_INTERVAL));
+ }
+
+ protected void idleProcessCheck(Supplier<Boolean> idleCondition) {
+ boolean isIdle = idleCondition.get();
+ boolean shouldResetIdlePeriod = !isIdle || idleReportingPeriodNanos == 0;
+ boolean isIdlePeriodNotStarted = idlePeriodStartNanos == 0;
+ boolean hasExceededIdlePeriod =
+ (System.nanoTime() - idlePeriodStartNanos) > idleReportingPeriodNanos;
+
+ if (shouldResetIdlePeriod) {
+ // Reset idle period and set idle metric to false
+ idlePeriodStartNanos = 0;
+ processMetrics.setIdleValue(false);
+ } else if (isIdlePeriodNotStarted) {
+ // Start tracking idle period
+ idlePeriodStartNanos = System.nanoTime();
+ } else if (hasExceededIdlePeriod) {
+ // Set idle metric to true and reset the start of the idle period
+ processMetrics.setIdleValue(true);
+ idlePeriodStartNanos = 0;
+ }
}
/**
@@ -76,6 +110,14 @@ public abstract class AbstractServer implements
AutoCloseable, Runnable {
}
}
+ @Override
+ public void registerMetrics(MeterRegistry registry) {
+ // makes mocking subclasses easier
+ if (processMetrics != null) {
+ processMetrics.registerMetrics(registry);
+ }
+ }
+
public String getHostname() {
return hostname;
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
new file mode 100644
index 0000000000..4ebbeb22a2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.metrics.MetricsProducer;
+
+import io.micrometer.core.instrument.MeterRegistry;
+
+public class ProcessMetrics implements MetricsProducer {
+
+ private final AtomicInteger isIdle;
+
+ public ProcessMetrics() {
+ this.isIdle = new AtomicInteger(-1);
+ }
+
+ @Override
+ public void registerMetrics(MeterRegistry registry) {
+ registry.gauge(METRICS_SERVER_IDLE, isIdle, AtomicInteger::get);
+ }
+
+ public void setIdleValue(boolean isIdle) {
+ this.isIdle.set(isIdle ? 1 : 0);
+ }
+}
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 63c525fb5a..3039ab9c8a 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
@@ -122,7 +123,6 @@ import com.beust.jcommander.Parameter;
import com.google.common.base.Preconditions;
import io.micrometer.core.instrument.FunctionCounter;
-import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.MeterRegistry;
@@ -188,6 +188,7 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
@Override
public void registerMetrics(MeterRegistry registry) {
+ super.registerMetrics(registry);
FunctionCounter.builder(METRICS_COMPACTOR_ENTRIES_READ, this,
Compactor::getTotalEntriesRead)
.description("Number of entries read by all compactions that have run on this compactor")
.register(registry);
@@ -198,13 +199,6 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK)
.description("Number and duration of stuck major compactions").register(registry);
CompactionWatcher.setTimer(timer);
-
- Gauge
- .builder(METRICS_COMPACTOR_BUSY, this.compactionRunning,
- isRunning -> isRunning.get() ? 1 : 0)
- .description(
- "Indicates if the compactor is busy or not. The value will be 0 when idle and 1 when busy.")
- .register(registry);
}
protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
@@ -708,8 +702,17 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
try {
final AtomicReference<Throwable> err = new AtomicReference<>();
+ final AtomicLong timeSinceLastCompletion = new AtomicLong(0L);
while (!shutdown) {
+
+ idleProcessCheck(() -> {
+ return timeSinceLastCompletion.get() == 0
+ /* Never started a compaction */ ||
(timeSinceLastCompletion.get() > 0
+ && (System.nanoTime() - timeSinceLastCompletion.get())
+ > idleReportingPeriodNanos);
+ });
+
currentCompactionId.set(null);
err.set(null);
JOB_HOLDER.reset();
@@ -858,6 +861,7 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
}
} finally {
currentCompactionId.set(null);
+ timeSinceLastCompletion.set(System.nanoTime());
// In the case where there is an error in the foreground code the background compaction
// may still be running. Must cancel it before starting another iteration of the loop to
// avoid multiple threads updating shared state.
diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index cc3708f6f4..9e89e024a4 100644
--- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -323,6 +323,7 @@ public class CompactorTest {
PowerMock.resetAll();
PowerMock.suppress(PowerMock.methods(Halt.class, "halt"));
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
+ PowerMock.suppress(PowerMock.methods(AbstractServer.class,
"idleProcessCheck"));
ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
@@ -373,6 +374,7 @@ public class CompactorTest {
PowerMock.resetAll();
PowerMock.suppress(PowerMock.methods(Halt.class, "halt"));
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
+ PowerMock.suppress(PowerMock.methods(AbstractServer.class,
"idleProcessCheck"));
ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
@@ -424,6 +426,7 @@ public class CompactorTest {
PowerMock.resetAll();
PowerMock.suppress(PowerMock.methods(Halt.class, "halt"));
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
+ PowerMock.suppress(PowerMock.methods(AbstractServer.class,
"idleProcessCheck"));
ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index d3c61fdd0d..d8027fa968 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -411,7 +411,7 @@ public class ScanServer extends AbstractServer
blockCacheMetrics = new BlockCacheMetrics(resourceManager.getIndexCache(),
resourceManager.getDataCache(), resourceManager.getSummaryCache());
- metricsInfo.addMetricsProducers(scanMetrics, scanServerMetrics,
blockCacheMetrics);
+ metricsInfo.addMetricsProducers(this, scanMetrics, scanServerMetrics,
blockCacheMetrics);
metricsInfo.init();
// We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close
@@ -420,6 +420,8 @@ public class ScanServer extends AbstractServer
try {
while (!serverStopRequested) {
UtilWaitThread.sleep(1000);
+ idleProcessCheck(() -> sessionManager.getActiveScans().isEmpty()
+ && tabletMetadataCache.estimatedSize() == 0);
}
} finally {
LOG.info("Stopping Thrift Servers");
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 85b0872385..cb4e5f534d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -771,8 +771,8 @@ public class TabletServer extends AbstractServer implements
TabletHostingServer
blockCacheMetrics = new
BlockCacheMetrics(this.resourceManager.getIndexCache(),
this.resourceManager.getDataCache(),
this.resourceManager.getSummaryCache());
- metricsInfo.addMetricsProducers(metrics, updateMetrics, scanMetrics,
mincMetrics, ceMetrics,
- blockCacheMetrics);
+ metricsInfo.addMetricsProducers(this, metrics, updateMetrics, scanMetrics,
mincMetrics,
+ ceMetrics, blockCacheMetrics);
metricsInfo.init();
this.compactionManager = new CompactionManager(() -> Iterators
@@ -871,16 +871,20 @@ public class TabletServer extends AbstractServer
implements TabletHostingServer
HostAndPort managerHost;
while (!serverStopRequested) {
+
+ idleProcessCheck(() -> getOnlineTablets().isEmpty());
+
// send all of the pending messages
try {
ManagerMessage mm = null;
ManagerClientService.Client iface = null;
try {
- // wait until a message is ready to send, or a sever stop
+ // wait until a message is ready to send, or a server stop
// was requested
while (mm == null && !serverStopRequested) {
mm = managerMessages.poll(1, TimeUnit.SECONDS);
+ idleProcessCheck(() -> getOnlineTablets().isEmpty());
}
// have a message to send to the manager, so grab a
@@ -908,6 +912,7 @@ public class TabletServer extends AbstractServer implements
TabletHostingServer
// if any messages are immediately available grab em and
// send them
mm = managerMessages.poll();
+ idleProcessCheck(() -> getOnlineTablets().isEmpty());
}
} finally {
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index 89f887251b..535d50b34b 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -19,7 +19,6 @@
package org.apache.accumulo.test.compaction;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.accumulo.core.util.UtilWaitThread.sleep;
import static
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
@@ -39,7 +38,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.compactor.Compactor;
@@ -218,12 +216,10 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
final AtomicLong totalEntriesRead = new AtomicLong(0);
final AtomicLong totalEntriesWritten = new AtomicLong(0);
- final AtomicInteger compactorBusy = new AtomicInteger(-1);
final long expectedEntriesRead = 9216;
final long expectedEntriesWritten = 4096;
- Thread checkerThread =
- getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten,
compactorBusy);
+ Thread checkerThread = getMetricsCheckerThread(totalEntriesRead,
totalEntriesWritten);
try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
@@ -241,13 +237,7 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
EnumSet.of(IteratorUtil.IteratorScope.majc));
log.info("Compacting table");
- Wait.waitFor(() -> compactorBusy.get() == 0, 30_000,
CHECKER_THREAD_SLEEP_MS,
- "Compactor busy metric should be false initially");
-
- compact(client, table, 2, QUEUE1, false);
-
- Wait.waitFor(() -> compactorBusy.get() == 1, 30_000,
CHECKER_THREAD_SLEEP_MS,
- "Compactor busy metric should be true after starting compaction");
+ compact(client, table, 2, QUEUE1, true);
Wait.waitFor(() -> {
if (totalEntriesRead.get() == expectedEntriesRead
@@ -262,9 +252,6 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
}, 30_000, CHECKER_THREAD_SLEEP_MS,
"Entries read and written metrics values did not match expected values");
- Wait.waitFor(() -> compactorBusy.get() == 0, 30_000,
CHECKER_THREAD_SLEEP_MS,
- "Compactor busy metric should be false once compaction completes");
-
log.info("Done Compacting table");
verify(client, table, 2, ROWS);
} finally {
@@ -280,10 +267,9 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
*
* @param totalEntriesRead this is set to the value of the entries read metric
* @param totalEntriesWritten this is set to the value of the entries written metric
- * @param compactorBusy this is set to the value of the compactor busy metric
*/
private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead,
- AtomicLong totalEntriesWritten, AtomicInteger compactorBusy) {
+ AtomicLong totalEntriesWritten) {
return Threads.createThread("metric-tailer", () -> {
log.info("Starting metric tailer");
@@ -308,9 +294,6 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
case MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN:
totalEntriesWritten.addAndGet(value);
break;
- case MetricsProducer.METRICS_COMPACTOR_BUSY:
- compactorBusy.set(value);
- break;
}
}
sleepUninterruptibly(CHECKER_THREAD_SLEEP_MS, TimeUnit.MILLISECONDS);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
new file mode 100644
index 0000000000..6366d16ae9
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.coordinator.CompactionCoordinator;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils;
+import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
+import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IdleProcessMetricsIT extends SharedMiniClusterBase {
+
+ private static final Logger log =
LoggerFactory.getLogger(IdleProcessMetricsIT.class);
+
+ static final Duration idleProcessInterval = Duration.ofSeconds(10);
+
+ public static class IdleStopITConfig implements
MiniClusterConfigurationCallback {
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
coreSite) {
+ ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
+ cfg.setNumCompactors(1);
+ cfg.setNumTservers(1);
+ cfg.setNumScanServers(1);
+
+ cfg.setProperty(Property.GENERAL_IDLE_PROCESS_INTERVAL,
+ idleProcessInterval.toSeconds() + "s");
+
+ // Tell the server processes to use a StatsDMeterRegistry that will be configured
+ // to push all metrics to the sink we started.
+ cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+ cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY,
+ TestStatsDRegistryFactory.class.getName());
+ Map<String,String> sysProps =
Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+ TestStatsDRegistryFactory.SERVER_PORT,
Integer.toString(sink.getPort()));
+ cfg.setSystemProperties(sysProps);
+
+ }
+
+ }
+
+ private static TestStatsDSink sink;
+
+ @BeforeAll
+ public static void before() throws Exception {
+ sink = new TestStatsDSink();
+ SharedMiniClusterBase.startMiniClusterWithConfig(new IdleStopITConfig());
+ }
+
+ @AfterAll
+ public static void after() throws Exception {
+ sink.close();
+ SharedMiniClusterBase.stopMiniCluster();
+ }
+
+ @Test
+ public void testIdleStopMetrics() throws Exception {
+
+ getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
+ getCluster().getClusterControl().startCompactors(Compactor.class, 1,
QUEUE1);
+ getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
"localhost");
+ getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+
+ // should emit the idle metric after the configured duration of GENERAL_IDLE_PROCESS_INTERVAL
+ Thread.sleep(idleProcessInterval.toMillis());
+
+ AtomicBoolean sawCompactor = new AtomicBoolean(false);
+ AtomicBoolean sawSServer = new AtomicBoolean(false);
+ AtomicBoolean sawTServer = new AtomicBoolean(false);
+ Wait.waitFor(() -> {
+ List<String> statsDMetrics = sink.getLines();
+ statsDMetrics.stream().filter(line ->
line.startsWith(MetricsProducer.METRICS_SERVER_IDLE))
+ .peek(log::info).map(TestStatsDSink::parseStatsDMetric).forEach(a ->
{
+ String processName = a.getTags().get("process.name");
+ int value = Integer.parseInt(a.getValue());
+ assertTrue(value == 0 || value == 1 || value == -1, "Unexpected value " + value);
+ if ("tserver".equals(processName) && value == 0) {
+ // Expect tserver to never be idle
+ sawTServer.set(true);
+ } else if ("sserver".equals(processName) && value == 1) {
+ // Expect scan server to be idle
+ sawSServer.set(true);
+ } else if ("compactor".equals(processName) && value == 1) {
+ // Expect compactor to be idle
+ sawCompactor.set(true);
+ }
+
+ });
+ return sawCompactor.get() && sawSServer.get() && sawTServer.get();
+ });
+ }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index 5a46507db1..fc0ecdb881 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -102,13 +102,13 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
// @formatter:off
Set<String> unexpectedMetrics =
Set.of(METRICS_COMPACTOR_MAJC_STUCK,
- METRICS_COMPACTOR_BUSY,
METRICS_REPLICATION_QUEUE,
METRICS_SCAN_YIELDS,
METRICS_UPDATE_ERRORS);
// add sserver as flaky until scan server included in mini tests.
Set<String> flakyMetrics = Set.of(METRICS_FATE_TYPE_IN_PROGRESS,
+ METRICS_SERVER_IDLE,
METRICS_SCAN_BUSY_TIMEOUT_COUNTER,
METRICS_SCAN_RESERVATION_CONFLICT_COUNTER,
METRICS_SCAN_RESERVATION_TOTAL_TIMER,