This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 6cfdfc242a3 Adds metrics for compaction queue data size (#5275)
6cfdfc242a3 is described below
commit 6cfdfc242a31b36f2aae0c4f9e4c9121e2f57555
Author: Keith Turner <[email protected]>
AuthorDate: Wed Jan 22 11:45:48 2025 -0500
Adds metrics for compaction queue data size (#5275)
Added a metric for compaction queue data size, removed some unused code,
and reworked an existing compaction metric for the compaction job size
changes.
Fixes #5269
---
.../java/org/apache/accumulo/core/metrics/Metric.java | 6 ++++--
.../manager/compaction/coordinator/QueueMetrics.java | 15 ++++++++++++---
.../compaction/queue/CompactionJobPriorityQueue.java | 10 ++++------
.../compaction/queue/CompactionJobPriorityQueueTest.java | 13 -------------
.../compaction/CompactionPriorityQueueMetricsIT.java | 16 +++++++++++++---
.../java/org/apache/accumulo/test/metrics/MetricsIT.java | 10 +++++++---
6 files changed, 40 insertions(+), 30 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
index da61305fc21..e5ddf3be6a4 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
@@ -54,12 +54,14 @@ public enum Metric {
MetricDocSection.COMPACTION),
COMPACTOR_JOB_PRIORITY_QUEUES("accumulo.compaction.queue.count",
MetricType.GAUGE,
"Number of priority queues for compaction jobs.",
MetricDocSection.COMPACTION),
- COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH("accumulo.compaction.queue.length",
MetricType.GAUGE,
- "Length of priority queue.", MetricDocSection.COMPACTION),
+ COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE("accumulo.compaction.queue.max.size",
MetricType.GAUGE,
+ "The maximum size in bytes of all jobs.", MetricDocSection.COMPACTION),
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED("accumulo.compaction.queue.jobs.dequeued",
MetricType.GAUGE, "Count of dequeued jobs.",
MetricDocSection.COMPACTION),
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED("accumulo.compaction.queue.jobs.queued",
MetricType.GAUGE, "Count of queued jobs.", MetricDocSection.COMPACTION),
+
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE("accumulo.compaction.queue.jobs.size",
MetricType.GAUGE,
+ "Size of queued jobs in bytes.", MetricDocSection.COMPACTION),
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED("accumulo.compaction.queue.jobs.rejected",
MetricType.GAUGE, "Count of rejected jobs.",
MetricDocSection.COMPACTION),
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY("accumulo.compaction.queue.jobs.priority",
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
index aa429d6999d..b0771bc2fe2 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java
@@ -27,7 +27,8 @@ import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUE
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY;
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED;
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED;
-import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH;
+import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE;
+import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE;
import static org.apache.accumulo.core.metrics.MetricsUtil.formatString;
import java.util.HashMap;
@@ -59,6 +60,7 @@ public class QueueMetrics implements MetricsProducer {
private static class QueueMeters {
private final Gauge length;
private final Gauge jobsQueued;
+ private final Gauge jobsQueuedSize;
private final Gauge jobsDequeued;
private final Gauge jobsRejected;
private final Gauge jobsLowestPriority;
@@ -72,8 +74,8 @@ public class QueueMetrics implements MetricsProducer {
var queueId = formatString(cgid.canonical());
length =
- Gauge.builder(COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.getName(), queue,
q -> q.getMaxSize())
-
.description(COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.getDescription())
+ Gauge.builder(COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE.getName(),
queue, q -> q.getMaxSize())
+
.description(COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE.getDescription())
.tags(List.of(Tag.of("queue.id",
queueId))).register(meterRegistry);
jobsQueued = Gauge
@@ -82,6 +84,12 @@ public class QueueMetrics implements MetricsProducer {
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getDescription())
.tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
+ jobsQueuedSize = Gauge
+ .builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE.getName(), queue,
+ q -> q.getQueuedJobsSize())
+ .description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE.getDescription())
+ .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
+
jobsDequeued = Gauge
.builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED.getName(), queue,
q -> q.getDequeuedJobs())
@@ -134,6 +142,7 @@ public class QueueMetrics implements MetricsProducer {
registry.remove(jobsMaxAge);
registry.remove(jobsAvgAge);
registry.remove(jobsQueueTimer);
+ registry.remove(jobsQueuedSize);
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index f183b50b86f..9a36f07f561 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -233,12 +233,6 @@ public class CompactionJobPriorityQueue {
return maxSize.get();
}
- public synchronized void setMaxSize(long maxSize) {
- Preconditions.checkArgument(maxSize > 0,
- "Maximum size of the Compaction job priority queue must be greater
than 0");
- this.maxSize.set(maxSize);
- }
-
public long getRejectedJobs() {
return rejectedJobs.get();
}
@@ -251,6 +245,10 @@ public class CompactionJobPriorityQueue {
return jobQueue.entrySize();
}
+ public synchronized long getQueuedJobsSize() {
+ return jobQueue.dataSize();
+ }
+
public synchronized long getLowestPriority() {
if (jobQueue.isEmpty()) {
return 0;
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index 01c18798754..6617ad5cef2 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -20,7 +20,6 @@ package org.apache.accumulo.manager.compaction.queue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
@@ -339,16 +338,4 @@ public class CompactionJobPriorityQueueTest {
< 2 * (CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD +
CANCEL_THRESHOLD));
assertTrue(maxFuturesSize > 2 *
CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD);
}
-
- @Test
- public void testChangeMaxSize() {
- CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
100, mj -> 1);
- assertEquals(100, queue.getMaxSize());
- queue.setMaxSize(50);
- assertEquals(50, queue.getMaxSize());
- assertThrows(IllegalArgumentException.class, () -> queue.setMaxSize(0));
- assertThrows(IllegalArgumentException.class, () -> queue.setMaxSize(-1));
- // Make sure previous value was not changed after invalid setting
- assertEquals(50, queue.getMaxSize());
- }
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
index 2a032565953..3a8a6a601b4 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
@@ -22,7 +22,8 @@ import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUE
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY;
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED;
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED;
-import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH;
+import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE;
+import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -337,7 +338,9 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
}
boolean sawMetricsQ1 = false;
- while (!sawMetricsQ1) {
+ boolean sawMetricsQ1Size = false;
+
+ while (!sawMetricsQ1 || !sawMetricsQ1Size) {
while (!queueMetrics.isEmpty()) {
var qm = queueMetrics.take();
if
(qm.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getName())
@@ -346,7 +349,14 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
sawMetricsQ1 = true;
}
}
+ if
(qm.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE.getName())
+ && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ if (Integer.parseInt(qm.getValue()) > 0) {
+ sawMetricsQ1Size = true;
+ }
+ }
}
+
// If metrics are not found in the queue, sleep until the next poll.
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
}
@@ -366,7 +376,7 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
} else if
(metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.getName())
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
lowestPriority = Math.max(lowestPriority,
Long.parseLong(metric.getValue()));
- } else if
(metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.getName())
+ } else if
(metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE.getName())
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
queueSize = Integer.parseInt(metric.getValue());
} else if
(metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUES.getName())) {
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index a5c777c64d4..e7492968cf5 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -149,9 +149,10 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
final int compactionPriorityQueueDequeuedBit = 2;
final int compactionPriorityQueueRejectedBit = 3;
final int compactionPriorityQueuePriorityBit = 4;
+ final int compactionPriorityQueueSizeBit = 5;
- final BitSet trueSet = new BitSet(5);
- trueSet.set(0, 4, true);
+ final BitSet trueSet = new BitSet(6);
+ trueSet.set(0, 5, true);
final BitSet queueMetricsSeen = new BitSet(5);
@@ -187,7 +188,7 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
seenMetrics.add(metric);
expectedMetrics.remove(metric);
switch (metric) {
- case COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH:
+ case COMPACTOR_JOB_PRIORITY_QUEUE_MAX_SIZE:
queueMetricsSeen.set(compactionPriorityQueueLengthBit, true);
break;
case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED:
@@ -202,6 +203,9 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY:
queueMetricsSeen.set(compactionPriorityQueuePriorityBit,
true);
break;
+ case COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE:
+ queueMetricsSeen.set(compactionPriorityQueueSizeBit, true);
+ break;
default:
break;
}