This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 890fc4741a fixes ExternalCompactionProgressIT (#4665)
890fc4741a is described below
commit 890fc4741a7984407f303bfadd8ee858aad0a7f5
Author: Keith Turner <[email protected]>
AuthorDate: Wed Jun 12 12:14:03 2024 -0400
fixes ExternalCompactionProgressIT (#4665)
This test was validating the compactor busy count metric. The test has 9
compactor processes running. When it saw a busy count from any of the 9 it
would set a single shared atomic integer. This made it likely that the 8 of 9
compactors that were not busy would set it to zero, making the test flaky.
Replaced the atomic integer with a concurrent map where each compactor process
has its own entry in the map for its busy count.
---
.../compaction/ExternalCompactionProgressIT.java | 38 +++++++++++++++++-----
1 file changed, 30 insertions(+), 8 deletions(-)
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index 3b3042a11f..a797faa36e 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -34,10 +34,11 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.client.Accumulo;
@@ -123,13 +124,26 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
cfg.setSystemProperties(sysProps);
}
+ private static long computeBusyCount(String resourceGroup,
+ ConcurrentHashMap<String,Long> compactorBusy) {
+ var stats =
+ compactorBusy.entrySet().stream().filter(e ->
e.getKey().startsWith(resourceGroup + ":"))
+ .mapToLong(Map.Entry::getValue).summaryStatistics();
+ if (stats.getCount() == 0) {
+ // signifies nothing was present, this differentiates between the case
where things are
+ // present w/ a zero value
+ return -1;
+ }
+ return stats.getSum();
+ }
+
@Test
public void testProgressViaMetrics() throws Exception {
String table = this.getUniqueNames(1)[0];
final AtomicLong totalEntriesRead = new AtomicLong(0);
final AtomicLong totalEntriesWritten = new AtomicLong(0);
- final AtomicInteger compactorBusy = new AtomicInteger(-1);
+ final ConcurrentHashMap<String,Long> compactorBusy = new
ConcurrentHashMap<>();
final long expectedEntriesRead = 18432;
final long expectedEntriesWritten = 13312;
@@ -149,12 +163,13 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
EnumSet.of(IteratorUtil.IteratorScope.majc));
log.info("Compacting table");
- Wait.waitFor(() -> compactorBusy.get() == 0, 30_000,
CHECKER_THREAD_SLEEP_MS,
- "Compactor busy metric should be false initially");
+ Wait.waitFor(() -> computeBusyCount(GROUP1, compactorBusy) == 0, 30_000,
+ CHECKER_THREAD_SLEEP_MS, "Compactor busy metric should be false
initially");
compact(client, table, 2, GROUP1, false);
- Wait.waitFor(() -> compactorBusy.get() == 1, 30_000,
CHECKER_THREAD_SLEEP_MS,
+ Wait.waitFor(() -> computeBusyCount(GROUP1, compactorBusy) == 1, 30_000,
+ CHECKER_THREAD_SLEEP_MS,
"Compactor busy metric should be true after starting compaction");
Wait.waitFor(() -> {
@@ -170,7 +185,8 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
}, 30_000, CHECKER_THREAD_SLEEP_MS,
"Entries read and written metrics values did not match expected
values");
- Wait.waitFor(() -> compactorBusy.get() == 0, 30_000,
CHECKER_THREAD_SLEEP_MS,
+ Wait.waitFor(() -> computeBusyCount(GROUP1, compactorBusy) == 0, 30_000,
+ CHECKER_THREAD_SLEEP_MS,
"Compactor busy metric should be false once compaction completes");
log.info("Done Compacting table");
@@ -189,7 +205,7 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
* @param compactorBusy this is set to the value of the compactor busy metric
*/
private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead,
- AtomicLong totalEntriesWritten, AtomicInteger compactorBusy) {
+ AtomicLong totalEntriesWritten, ConcurrentHashMap<String,Long>
compactorBusy) {
return Threads.createThread("metric-tailer", () -> {
log.info("Starting metric tailer");
@@ -215,7 +231,13 @@ public class ExternalCompactionProgressIT extends
AccumuloClusterHarness {
totalEntriesWritten.addAndGet(value);
break;
case MetricsProducer.METRICS_COMPACTOR_BUSY:
- compactorBusy.set(value);
+ // expect these tags to be present, so have the test fail w/ NPE
if they are not
+ var host = Objects.requireNonNull(metric.getTags().get("host"));
+ var port = Objects.requireNonNull(metric.getTags().get("port"));
+ var resourceGroup =
Objects.requireNonNull(metric.getTags().get("resource.group"));
+ var key = resourceGroup + ":" + host + ":" + port;
+ log.debug("setting busy count {} {}", key, value);
+ compactorBusy.put(key, (long) value);
break;
}
}