This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new ab5c57eb3c Add metrics to track time compaction jobs are queued (#4980)
ab5c57eb3c is described below
commit ab5c57eb3c25d202133fa8e1e818115e7855accc
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Fri Oct 18 16:39:47 2024 -0400
Add metrics to track time compaction jobs are queued (#4980)
This adds 2 new groups of stats to track information about queued
compaction jobs. The first stat is a timer that keeps track of when jobs
are being polled and give information on how often/fast jobs are
exiting the queue. The second group of stats is a min/max/avg
and is tracking age information about how long jobs are waiting
on the queue.
This closes #4945
---
.../org/apache/accumulo/core/metrics/Metric.java | 12 +++
.../coordinator/CompactionCoordinator.java | 2 +-
.../compaction/coordinator/QueueMetrics.java | 36 +++++++
.../queue/CompactionJobPriorityQueue.java | 106 +++++++++++++++++++--
.../queue/CompactionJobPriorityQueueTest.java | 40 +++++++-
.../compaction/queue/CompactionJobQueuesTest.java | 8 ++
.../compaction/ExternalCompactionMetricsIT.java | 28 +++++-
7 files changed, 219 insertions(+), 13 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
index 92690123c6..7050b73347 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
@@ -51,6 +51,18 @@ public enum Metric {
MetricType.GAUGE, "Count of rejected jobs.", MetricCategory.COMPACTOR),
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY("accumulo.compactor.queue.jobs.priority",
MetricType.GAUGE, "Lowest priority queued job.",
MetricCategory.COMPACTOR),
+
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE("accumulo.compactor.queue.jobs.min.age",
+ MetricType.GAUGE, "Minimum age of currently queued jobs in seconds.",
+ MetricCategory.COMPACTOR),
+
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE("accumulo.compactor.queue.jobs.max.age",
+ MetricType.GAUGE, "Maximum age of currently queued jobs in seconds.",
+ MetricCategory.COMPACTOR),
+
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE("accumulo.compactor.queue.jobs.avg.age",
+ MetricType.GAUGE, "Average age of currently queued jobs in seconds.",
+ MetricCategory.COMPACTOR),
+
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER("accumulo.compactor.queue.jobs.exit.time",
+ MetricType.TIMER, "Tracks time a job spent in the queue before exiting
the queue.",
+ MetricCategory.COMPACTOR),
// Fate Metrics
FATE_TYPE_IN_PROGRESS("accumulo.fate.ops.in.progress.by.type",
MetricType.GAUGE,
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 272395354d..3de05a7578 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -1049,7 +1049,7 @@ public class CompactionCoordinator
// associated priority queue of jobs
CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
if (queue != null) {
- queue.clear();
+ queue.clearIfInactive(Duration.ofMinutes(10));
queue.setMaxSize(this.jobQueueInitialSize);
}
} else {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
index eb2b9800c2..dd2705eb18 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
@@ -19,7 +19,11 @@
package org.apache.accumulo.manager.compaction.coordinator;
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUES;
+import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE;
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED;
+import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE;
+import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE;
+import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER;
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY;
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED;
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED;
@@ -48,6 +52,7 @@ import com.google.common.collect.Sets.SetView;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
+import io.micrometer.core.instrument.Timer;
public class QueueMetrics implements MetricsProducer {
@@ -57,6 +62,10 @@ public class QueueMetrics implements MetricsProducer {
private final Gauge jobsDequeued;
private final Gauge jobsRejected;
private final Gauge jobsLowestPriority;
+ private final Gauge jobsMinAge;
+ private final Gauge jobsMaxAge;
+ private final Gauge jobsAvgAge;
+ private final Timer jobsQueueTimer;
public QueueMeters(MeterRegistry meterRegistry, CompactorGroupId cgid,
CompactionJobPriorityQueue queue) {
@@ -90,6 +99,29 @@ public class QueueMetrics implements MetricsProducer {
q -> q.getLowestPriority())
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.getDescription())
.tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
+
+ jobsMinAge = Gauge
+ .builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE.getName(), queue,
+ q -> q.getJobQueueStats().getMinAge().toSeconds())
+
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE.getDescription())
+ .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
+
+ jobsMaxAge = Gauge
+ .builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE.getName(), queue,
+ q -> q.getJobQueueStats().getMaxAge().toSeconds())
+
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE.getDescription())
+ .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
+
+ jobsAvgAge =
Gauge.builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE.getName(), queue,
+ // Divide by 1000.0 instead of using toSeconds() so we get a double
+ q -> q.getJobQueueStats().getAvgAge().toMillis() / 1000.0)
+
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE.getDescription())
+ .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
+
+ jobsQueueTimer =
Timer.builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER.getName())
+
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER.getDescription())
+ .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
+ queue.setJobQueueTimerCallback(jobsQueueTimer);
}
private void removeMeters(MeterRegistry registry) {
@@ -98,6 +130,10 @@ public class QueueMetrics implements MetricsProducer {
registry.remove(jobsDequeued);
registry.remove(jobsRejected);
registry.remove(jobsLowestPriority);
+ registry.remove(jobsMinAge);
+ registry.remove(jobsMaxAge);
+ registry.remove(jobsAvgAge);
+ registry.remove(jobsQueueTimer);
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index 9e2ddbc96a..2099dbed6d 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.compaction.queue;
import static com.google.common.base.Preconditions.checkState;
+import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -28,23 +29,31 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
+import org.apache.accumulo.core.util.Stat;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
/**
* Priority Queue for {@link CompactionJob}s that supports a maximum size.
When a job is added and
@@ -113,6 +122,9 @@ public class CompactionJobPriorityQueue {
private final AtomicLong dequeuedJobs;
private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>>
futures;
private long futuresAdded = 0;
+ private final Map<KeyExtent,Timer> jobAges;
+ private final Supplier<CompactionJobPriorityQueueStats> jobQueueStats;
+ private final AtomicReference<Optional<io.micrometer.core.instrument.Timer>>
jobQueueTimer;
private static class TabletJobs {
final long generation;
@@ -138,6 +150,10 @@ public class CompactionJobPriorityQueue {
this.rejectedJobs = new AtomicLong(0);
this.dequeuedJobs = new AtomicLong(0);
this.futures = new ArrayDeque<>();
+ this.jobAges = new ConcurrentHashMap<>();
+ this.jobQueueStats = Suppliers.memoizeWithExpiration(
+ () -> new CompactionJobPriorityQueueStats(jobAges), 5,
TimeUnit.SECONDS);
+ this.jobQueueTimer = new AtomicReference<>(Optional.empty());
}
public synchronized void removeOlderGenerations(Ample.DataLevel level, long
currGeneration) {
@@ -154,7 +170,8 @@ public class CompactionJobPriorityQueue {
removals.size(), groupId, level);
}
- removals.forEach(this::removePreviousSubmissions);
+ // Also clears jobAge timer for tablets that do not need compaction anymore
+ removals.forEach(ke -> removePreviousSubmissions(ke, true));
}
/**
@@ -164,7 +181,10 @@ public class CompactionJobPriorityQueue {
long generation) {
Preconditions.checkArgument(jobs.stream().allMatch(job ->
job.getGroup().equals(groupId)));
- removePreviousSubmissions(tabletMetadata.getExtent());
+ // Do not clear jobAge timers, they are cleared later at the end of this
method
+ // if there are no jobs for the extent so we do not reset the timer for an
extent
+ // that had previous jobs and still has jobs
+ removePreviousSubmissions(tabletMetadata.getExtent(), false);
HashSet<CjpqKey> newEntries = new HashSet<>(jobs.size());
@@ -175,9 +195,14 @@ public class CompactionJobPriorityQueue {
// its expected that if futures are present then the queue is empty,
if this is not true
// then there is a bug
Preconditions.checkState(jobQueue.isEmpty());
+ // Queue should be empty so jobAges should be empty
+ Preconditions.checkState(jobAges.isEmpty());
if (future.complete(new CompactionJobQueues.MetaJob(job,
tabletMetadata))) {
// successfully completed a future with this job, so do not need to
queue the job
jobsAdded++;
+ // Record a time of 0 as job as we were able to complete immediately
and there
+ // were no jobs waiting
+ jobQueueTimer.get().ifPresent(jqt -> jqt.record(Duration.ZERO));
continue outer;
} // else the future was canceled or timed out so could not complete it
future = futures.poll();
@@ -197,6 +222,9 @@ public class CompactionJobPriorityQueue {
if (!newEntries.isEmpty()) {
checkState(tabletJobs.put(tabletMetadata.getExtent(), new
TabletJobs(generation, newEntries))
== null);
+ jobAges.computeIfAbsent(tabletMetadata.getExtent(), e ->
Timer.startNew());
+ } else {
+ jobAges.remove(tabletMetadata.getExtent());
}
return jobsAdded;
@@ -235,10 +263,19 @@ public class CompactionJobPriorityQueue {
if (first != null) {
dequeuedJobs.getAndIncrement();
var extent = first.getValue().getTabletMetadata().getExtent();
- Set<CjpqKey> jobs = tabletJobs.get(extent).jobs;
+ var timer = jobAges.get(extent);
+ checkState(timer != null);
+ jobQueueTimer.get().ifPresent(jqt -> jqt.record(timer.elapsed()));
+ log.trace("Compaction job age for {} is {} ms", extent,
timer.elapsed(TimeUnit.MILLISECONDS));
+ Set<CompactionJobPriorityQueue.CjpqKey> jobs =
tabletJobs.get(extent).jobs;
checkState(jobs.remove(first.getKey()));
+ // If there are no more jobs for this extent we can remove the timer,
otherwise
+ // we need to reset it
if (jobs.isEmpty()) {
tabletJobs.remove(extent);
+ jobAges.remove(extent);
+ } else {
+ timer.restart();
}
}
return first == null ? null : first.getValue();
@@ -280,11 +317,15 @@ public class CompactionJobPriorityQueue {
return firstEntry == null ? null : firstEntry.getValue();
}
- private void removePreviousSubmissions(KeyExtent extent) {
- TabletJobs prevJobs = tabletJobs.get(extent);
+ private void removePreviousSubmissions(KeyExtent extent, boolean
removeJobAges) {
+ CompactionJobPriorityQueue.TabletJobs prevJobs = tabletJobs.get(extent);
if (prevJobs != null) {
prevJobs.jobs.forEach(jobQueue::remove);
tabletJobs.remove(extent);
+ if (removeJobAges) {
+ jobAges.remove(extent);
+ log.trace("Removed jobAge timer for tablet {} that no longer needs
compaction", extent);
+ }
}
}
@@ -302,7 +343,6 @@ public class CompactionJobPriorityQueue {
rejectedJobs.getAndIncrement();
}
}
-
}
var key = new CjpqKey(job);
@@ -310,8 +350,56 @@ public class CompactionJobPriorityQueue {
return key;
}
- public synchronized void clear() {
- jobQueue.clear();
- tabletJobs.clear();
+ public synchronized void clearIfInactive(Duration duration) {
+ // IF the minimum age of jobs in the queue is older than the
+ // duration then clear all the maps as this queue is now
+ // considered inactive
+ if (getJobQueueStats().getMinAge().compareTo(duration) > 0) {
+ jobQueue.clear();
+ tabletJobs.clear();
+ jobAges.clear();
+ }
+ }
+
+ public CompactionJobPriorityQueueStats getJobQueueStats() {
+ return jobQueueStats.get();
+ }
+
+ public void setJobQueueTimerCallback(io.micrometer.core.instrument.Timer
jobQueueTimer) {
+ this.jobQueueTimer.set(Optional.of(jobQueueTimer));
+ }
+
+ // Used for unit testing, can return the map as is because
+ // it is a ConcurrentHashMap
+ @VisibleForTesting
+ Map<KeyExtent,Timer> getJobAges() {
+ return jobAges;
+ }
+
+ public static class CompactionJobPriorityQueueStats {
+ private final Duration minAge;
+ private final Duration maxAge;
+ private final Duration avgAge;
+
+ @VisibleForTesting
+ CompactionJobPriorityQueueStats(Map<KeyExtent,Timer> jobAges) {
+ final Stat stats = new Stat();
+ jobAges.values().forEach(t ->
stats.addStat(t.elapsed(TimeUnit.MILLISECONDS)));
+ this.minAge = Duration.ofMillis(stats.min());
+ this.maxAge = Duration.ofMillis(stats.max());
+ this.avgAge = Duration.ofMillis(Math.round(stats.mean()));
+ }
+
+ public Duration getMinAge() {
+ return minAge;
+ }
+
+ public Duration getMaxAge() {
+ return maxAge;
+ }
+
+ public Duration getAvgAge() {
+ return avgAge;
+ }
}
}
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index 05e0b35c55..2592be35aa 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -33,11 +33,13 @@ import
org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.CompactableFileImpl;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
+import
org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue.CompactionJobPriorityQueueStats;
import
org.apache.accumulo.manager.compaction.queue.CompactionJobQueues.MetaJob;
import org.apache.hadoop.io.Text;
import org.easymock.EasyMock;
@@ -193,21 +195,26 @@ public class CompactionJobPriorityQueueTest {
assertEquals(0, queue.getDequeuedJobs());
assertEquals(1, queue.getRejectedJobs());
assertEquals(2, queue.getQueuedJobs());
+ // One tablet was added with jobs
+ assertEquals(1, queue.getJobAges().size());
MetaJob job = queue.poll();
assertEquals(cj1, job.getJob());
assertEquals(tm, job.getTabletMetadata());
assertEquals(1, queue.getDequeuedJobs());
+ // still 1 job left so should still have a timer
+ assertEquals(1, queue.getJobAges().size());
job = queue.poll();
assertEquals(cj2, job.getJob());
assertEquals(tm, job.getTabletMetadata());
assertEquals(2, queue.getDequeuedJobs());
+ // no more jobs so timer should be gone
+ assertTrue(queue.getJobAges().isEmpty());
job = queue.poll();
assertNull(job);
assertEquals(2, queue.getDequeuedJobs());
-
}
private static int counter = 1;
@@ -251,6 +258,14 @@ public class CompactionJobPriorityQueueTest {
assertEquals(100, queue.getMaxSize());
assertEquals(100, queue.getQueuedJobs());
assertEquals(900, queue.getRejectedJobs());
+ // There should be 1000 total job ages even though 900 were rejected
+ // as there were 1000 total tablets added
+ assertEquals(1000, queue.getJobAges().size());
+
+ var stats = queue.getJobQueueStats();
+ assertTrue(stats.getMinAge().toMillis() > 0);
+ assertTrue(stats.getMaxAge().toMillis() > 0);
+ assertTrue(stats.getAvgAge().toMillis() > 0);
// iterate over the expected set and make sure that they next job in the
queue
// matches
@@ -266,6 +281,29 @@ public class CompactionJobPriorityQueueTest {
}
assertEquals(100, matchesSeen);
+ // Should be 900 left as the 100 that were polled would clear as there are
no more
+ // jobs for those tablets. These 900 were rejected so their timers remain
and will
+ // be cleared if there are no computed jobs when jobs are added again or by
+ // the call to removeOlderGenerations()
+ assertEquals(900, queue.getJobAges().size());
+
+ // Create new stats directly vs using queue.getJobQueueStats() because
that method
+ // caches the results for a short period
+ stats = new CompactionJobPriorityQueueStats(queue.getJobAges());
+ assertTrue(stats.getMinAge().toMillis() > 0);
+ assertTrue(stats.getMaxAge().toMillis() > 0);
+ assertTrue(stats.getAvgAge().toMillis() > 0);
+
+ // Verify jobAges cleared when calling removeOlderGenerations()
+ queue.removeOlderGenerations(DataLevel.USER, 2);
+
+ // Stats should be 0 if no jobs
+ var jobAges = queue.getJobAges();
+ assertTrue(jobAges.isEmpty());
+ stats = new CompactionJobPriorityQueueStats(queue.getJobAges());
+ assertEquals(0, stats.getMinAge().toMillis());
+ assertEquals(0, stats.getMaxAge().toMillis());
+ assertEquals(0, stats.getAvgAge().toMillis());
}
/**
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
index 09ae416091..bba27daf67 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -362,12 +362,20 @@ public class CompactionJobQueuesTest {
jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
+ // Futures were immediately completed so nothing should be queued
+ assertTrue(jobQueues.getQueue(cg1).getJobAges().isEmpty());
+
jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1)));
jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1)));
+ // No futures available, so jobAges should exist for 2 tablets
+ assertEquals(2, jobQueues.getQueue(cg1).getJobAges().size());
var future3 = jobQueues.getAsync(cg1);
var future4 = jobQueues.getAsync(cg1);
+ // Should be back to 0 size after futures complete
+ assertTrue(jobQueues.getQueue(cg1).getJobAges().isEmpty());
+
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java
index 494e9fe6ca..3245e6bf24 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java
@@ -18,6 +18,10 @@
*/
package org.apache.accumulo.test.compaction;
+import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE;
+import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE;
+import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE;
+import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3;
@@ -151,7 +155,6 @@ public class ExternalCompactionMetricsIT extends
SharedMiniClusterBase {
boolean sawDCQ1_5 = false;
boolean sawDCQ2_10 = false;
-
// wait until expected number of queued are seen in metrics
while (!sawDCQ1_5 || !sawDCQ2_10) {
Metric qm = queueMetrics.take();
@@ -165,12 +168,22 @@ public class ExternalCompactionMetricsIT extends
SharedMiniClusterBase {
boolean sawDCQ1_0 = false;
boolean sawDCQ2_0 = false;
+ boolean minDCQ1 = false;
+ boolean maxDCQ1 = false;
+ boolean avgDCQ1 = false;
+ boolean timerDCQ1 = false;
// wait until queued goes to zero in metrics
- while (!sawDCQ1_0 || !sawDCQ2_0) {
+ // and verify stats are positive values
+ while (!sawDCQ1_0 || !sawDCQ2_0 || !minDCQ1 || !maxDCQ1 || !avgDCQ1 ||
!timerDCQ1) {
Metric qm = queueMetrics.take();
sawDCQ1_0 |= match(qm, "dcq1", "0");
sawDCQ2_0 |= match(qm, "dcq2", "0");
+ minDCQ1 |= assertMetric(qm, "dcq1",
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE.getName());
+ maxDCQ1 |= assertMetric(qm, "dcq1",
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE.getName());
+ avgDCQ1 |= assertMetric(qm, "dcq1",
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE.getName());
+ timerDCQ1 |=
+ assertMetric(qm, "dcq1",
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER.getName());
}
shutdownTailer.set(true);
@@ -204,4 +217,15 @@ public class ExternalCompactionMetricsIT extends
SharedMiniClusterBase {
return false;
}
+ private static boolean assertMetric(Metric input, String queue, String name)
{
+ if (input.getTags() != null) {
+ String id = input.getTags().get("queue.id");
+ if (id != null && id.equals(queue) && input.getName().equals(name)
+ && Double.parseDouble(input.getValue()) > 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
}