This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new e42a03198f periodically clean up canceled futures in compaction job
prioq (#4727)
e42a03198f is described below
commit e42a03198fea366842403761701585bbfc09a430
Author: Keith Turner <[email protected]>
AuthorDate: Fri Jul 5 11:14:19 2024 -0700
periodically clean up canceled futures in compaction job prioq (#4727)
When nothing is ever added to a compaction job priority queue (prioq)
while futures are continually obtained and canceled, those canceled
futures would keep building up in memory. This commit fixes that by
periodically removing canceled futures from the queue.
---
.../queue/CompactionJobPriorityQueue.java | 22 +++++++++++++
.../queue/CompactionJobPriorityQueueTest.java | 36 ++++++++++++++++++++++
.../compaction/queue/CompactionJobQueuesTest.java | 16 ++++++++--
3 files changed, 71 insertions(+), 3 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index 9909ccb7f9..c91b8becf1 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -42,6 +42,7 @@ import
org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -59,6 +60,9 @@ public class CompactionJobPriorityQueue {
private final CompactorGroupId groupId;
+ @VisibleForTesting
+ static final int FUTURE_CHECK_THRESHOLD = 10_000;
+
private class CjpqKey implements Comparable<CjpqKey> {
private final CompactionJob job;
@@ -107,6 +111,7 @@ public class CompactionJobPriorityQueue {
private final AtomicLong rejectedJobs;
private final AtomicLong dequeuedJobs;
private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>>
futures;
+ private long futuresAdded = 0;
private static class TabletJobs {
final long generation;
@@ -244,9 +249,26 @@ public class CompactionJobPriorityQueue {
// be completed when something does arrive.
CompletableFuture<CompactionJobQueues.MetaJob> future = new
CompletableFuture<>();
futures.add(future);
+ futuresAdded++;
+ // Handle the case where nothing is ever being added to this queue and
futures are constantly
+ // being obtained and cancelled. If nothing is done these canceled futures
would just keep
+ // building up in memory. The following code periodically checks to see if
there are canceled
+ // futures to remove.
+ if (futuresAdded % FUTURE_CHECK_THRESHOLD == 0
+ && futures.size() >= 2 * FUTURE_CHECK_THRESHOLD) {
+ futures.removeIf(CompletableFuture::isDone);
+ // It is not expected that the future we just created would be done, if
it were it would have
+ // been removed.
+ Preconditions.checkState(!future.isDone());
+ }
return future;
}
+ @VisibleForTesting
+ synchronized int futuresSize() {
+ return futures.size();
+ }
+
// exists for tests
synchronized CompactionJobQueues.MetaJob peek() {
var firstEntry = jobQueue.firstEntry();
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index ddf9a3016e..5464b90a33 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -20,11 +20,14 @@ package org.apache.accumulo.manager.compaction.queue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.data.TableId;
@@ -264,4 +267,37 @@ public class CompactionJobPriorityQueueTest {
assertEquals(100, matchesSeen);
}
+
+ /**
+ * Test to ensure that canceled futures do not build up in memory.
+ */
+ @Test
+ public void testAsyncCancelCleanup() {
+ CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
100);
+
+ List<CompletableFuture<MetaJob>> futures = new ArrayList<>();
+
+ int maxFuturesSize = 0;
+
+ // Add 11 below so that cadence of clearing differs from the internal
check cadence
+ final int CANCEL_THRESHOLD =
CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD / 10 + 11;
+ final int ITERATIONS = CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD *
20;
+
+ for (int x = 0; x < ITERATIONS; x++) {
+ futures.add(queue.getAsync());
+
+ maxFuturesSize = Math.max(maxFuturesSize, queue.futuresSize());
+
+ if (futures.size() >= CANCEL_THRESHOLD) {
+ futures.forEach(f -> f.cancel(true));
+ futures.clear();
+ }
+ }
+
+ maxFuturesSize = Math.max(maxFuturesSize, queue.futuresSize());
+
+ assertTrue(maxFuturesSize
+ < 2 * (CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD +
CANCEL_THRESHOLD));
+ assertTrue(maxFuturesSize > 2 *
CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD);
+ }
}
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
index a9f360b4cd..09ae416091 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
@@ -45,6 +46,7 @@ import
org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
+import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;
@@ -376,15 +378,23 @@ public class CompactionJobQueuesTest {
assertEquals(extent4, future3.get().getTabletMetadata().getExtent());
assertEquals(extent3, future4.get().getTabletMetadata().getExtent());
- // test cancelling a future
+ // test cancelling a future and having a future timeout
var future5 = jobQueues.getAsync(cg1);
assertFalse(future5.isDone());
future5.cancel(false);
var future6 = jobQueues.getAsync(cg1);
assertFalse(future6.isDone());
- // since future5 was canceled, this addition should go to future6
+ future6.orTimeout(10, TimeUnit.MILLISECONDS);
+ // sleep for 20 millis, this should cause future6 to be timed out
+ UtilWaitThread.sleep(20);
+ var future7 = jobQueues.getAsync(cg1);
+ assertFalse(future7.isDone());
+ // since future5 was canceled and future6 timed out, this addition should
go to future7
jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+ assertTrue(future7.isDone());
+ assertEquals(extent1, future7.get().getTabletMetadata().getExtent());
+ assertTrue(future5.isDone());
+ assertTrue(future6.isCompletedExceptionally());
assertTrue(future6.isDone());
- assertEquals(extent1, future6.get().getTabletMetadata().getExtent());
}
}