This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new f13ad00e06 Adds completable futures to compaction queue (#4726)
f13ad00e06 is described below
commit f13ad00e06c88eb76bfb1d966f837025d8ddfbca
Author: Keith Turner <[email protected]>
AuthorDate: Fri Jul 5 07:37:00 2024 -0700
Adds completable futures to compaction queue (#4726)
Adds completable futures to the queue of compaction jobs. This allows for
async notification when something is added to the queue.
The compaction queues code would drop queues that became empty. The concept
of a queue being empty became more complex with this change: a queue would
only be considered empty when it had no pending futures and no queued jobs.
This more complex definition of empty would have made the code for dropping
empty queues more complex. Instead of increasing the complexity of that code,
this change stops removing empty queues. This means that if a compaction
group is used and then no longer used, it will leave a small empty data
structure sitting around in a map for the process lifetime. That is unlikely
to cause memory issues. Therefore, the increased complexity was judged not
worthwhile, given that it was unlikely to cause memory problems.
---
.../queue/CompactionJobPriorityQueue.java | 55 ++++++++++++----------
.../compaction/queue/CompactionJobQueues.java | 40 +++++++---------
.../queue/CompactionJobPriorityQueueTest.java | 54 ---------------------
.../compaction/queue/CompactionJobQueuesTest.java | 55 ++++++++++++++++++++++
4 files changed, 103 insertions(+), 101 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index 4dfd6868ad..9909ccb7f9 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.compaction.queue;
import static com.google.common.base.Preconditions.checkState;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -29,7 +30,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -105,6 +106,7 @@ public class CompactionJobPriorityQueue {
private final int maxSize;
private final AtomicLong rejectedJobs;
private final AtomicLong dequeuedJobs;
+ private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>>
futures;
private static class TabletJobs {
final long generation;
@@ -122,8 +124,6 @@ public class CompactionJobPriorityQueue {
private final AtomicLong nextSeq = new AtomicLong(0);
- private final AtomicBoolean closed = new AtomicBoolean(false);
-
public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) {
this.jobQueue = new TreeMap<>();
this.maxSize = maxSize;
@@ -131,13 +131,10 @@ public class CompactionJobPriorityQueue {
this.groupId = groupId;
this.rejectedJobs = new AtomicLong(0);
this.dequeuedJobs = new AtomicLong(0);
+ this.futures = new ArrayDeque<>();
}
public synchronized void removeOlderGenerations(Ample.DataLevel level, long
currGeneration) {
- if (closed.get()) {
- return;
- }
-
List<KeyExtent> removals = new ArrayList<>();
tabletJobs.forEach((extent, jobs) -> {
@@ -160,16 +157,26 @@ public class CompactionJobPriorityQueue {
public synchronized int add(TabletMetadata tabletMetadata,
Collection<CompactionJob> jobs,
long generation) {
Preconditions.checkArgument(jobs.stream().allMatch(job ->
job.getGroup().equals(groupId)));
- if (closed.get()) {
- return -1;
- }
removePreviousSubmissions(tabletMetadata.getExtent());
HashSet<CjpqKey> newEntries = new HashSet<>(jobs.size());
int jobsAdded = 0;
- for (CompactionJob job : jobs) {
+ outer: for (CompactionJob job : jobs) {
+ var future = futures.poll();
+ while (future != null) {
+ // its expected that if futures are present then the queue is empty,
if this is not true
+ // then there is a bug
+ Preconditions.checkState(jobQueue.isEmpty());
+ if (future.complete(new CompactionJobQueues.MetaJob(job,
tabletMetadata))) {
+ // successfully completed a future with this job, so do not need to
queue the job
+ jobsAdded++;
+ continue outer;
+ } // else the future was canceled or timed out so could not complete it
+ future = futures.poll();
+ }
+
CjpqKey cjqpKey = addJobToQueue(tabletMetadata, job);
if (cjqpKey != null) {
checkState(newEntries.add(cjqpKey));
@@ -227,25 +234,25 @@ public class CompactionJobPriorityQueue {
return first == null ? null : first.getValue();
}
+ public synchronized CompletableFuture<CompactionJobQueues.MetaJob>
getAsync() {
+ var job = jobQueue.pollFirstEntry();
+ if (job != null) {
+ return CompletableFuture.completedFuture(job.getValue());
+ }
+
+ // There is currently nothing in the queue, so create an uncompleted
future and queue it up to
+ // be completed when something does arrive.
+ CompletableFuture<CompactionJobQueues.MetaJob> future = new
CompletableFuture<>();
+ futures.add(future);
+ return future;
+ }
+
// exists for tests
synchronized CompactionJobQueues.MetaJob peek() {
var firstEntry = jobQueue.firstEntry();
return firstEntry == null ? null : firstEntry.getValue();
}
- public boolean isClosed() {
- return closed.get();
- }
-
- public synchronized boolean closeIfEmpty() {
- if (jobQueue.isEmpty()) {
- closed.set(true);
- return true;
- }
-
- return false;
- }
-
private void removePreviousSubmissions(KeyExtent extent) {
TabletJobs prevJobs = tabletJobs.get(extent);
if (prevJobs != null) {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
index 8c46227357..b9fe1ed424 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.atomic.AtomicLong;
@@ -34,8 +35,6 @@ import
org.apache.accumulo.core.spi.compaction.CompactorGroupId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
public class CompactionJobQueues {
private static final Logger log =
LoggerFactory.getLogger(CompactionJobQueues.class);
@@ -157,23 +156,25 @@ public class CompactionJobQueues {
}
}
+ /**
+ * Asynchronously get a compaction job from the queue. If the queue
currently has jobs then a
+ * completed future will be returned containing the highest priority job in
the queue. If the
+ * queue is currently empty, then an uncompleted future will be returned and
later when something
+ * is added to the queue the future will be completed.
+ */
+ public CompletableFuture<MetaJob> getAsync(CompactorGroupId groupId) {
+ var pq = priorityQueues.computeIfAbsent(groupId,
+ gid -> new CompactionJobPriorityQueue(gid, queueSize));
+ return pq.getAsync();
+ }
+
public MetaJob poll(CompactorGroupId groupId) {
var prioQ = priorityQueues.get(groupId);
if (prioQ == null) {
return null;
}
- MetaJob mj = prioQ.poll();
-
- if (mj == null) {
- priorityQueues.computeIfPresent(groupId, (eid, pq) -> {
- if (pq.closeIfEmpty()) {
- return null;
- } else {
- return pq;
- }
- });
- }
- return mj;
+
+ return prioQ.poll();
}
private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId,
@@ -187,14 +188,7 @@ public class CompactionJobQueues {
var pq = priorityQueues.computeIfAbsent(groupId,
gid -> new CompactionJobPriorityQueue(gid, queueSize));
- while (pq.add(tabletMetadata, jobs,
-
currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()) < 0) {
- // When entering this loop its expected the queue is closed
- Preconditions.checkState(pq.isClosed());
- // This loop handles race condition where poll() closes empty priority
queues. The queue could
- // be closed after its obtained from the map and before add is called.
- pq = priorityQueues.computeIfAbsent(groupId,
- gid -> new CompactionJobPriorityQueue(gid, queueSize));
- }
+ pq.add(tabletMetadata, jobs,
+
currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get());
}
}
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index 2e090c32a2..ddf9a3016e 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -19,9 +19,7 @@
package org.apache.accumulo.manager.compaction.queue;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.HashSet;
import java.util.List;
@@ -209,58 +207,6 @@ public class CompactionJobPriorityQueueTest {
}
- @Test
- public void testAddAfterClose() {
-
- CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class);
- CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class);
- CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class);
- CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class);
-
- KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new
Text("a"));
- TabletMetadata tm = EasyMock.createMock(TabletMetadata.class);
- EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes();
-
- CompactionJob cj1 = EasyMock.createMock(CompactionJob.class);
- EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes();
- EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes();
- EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes();
-
- CompactionJob cj2 = EasyMock.createMock(CompactionJob.class);
- EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes();
- EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes();
- EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3,
file4)).anyTimes();
-
- EasyMock.replay(tm, cj1, cj2);
-
- CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
2);
- assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L));
-
- assertFalse(queue.closeIfEmpty());
-
- EasyMock.verify(tm, cj1, cj2);
-
- assertEquals(5L, queue.getLowestPriority());
- assertEquals(2, queue.getMaxSize());
- assertEquals(0, queue.getDequeuedJobs());
- assertEquals(0, queue.getRejectedJobs());
- assertEquals(2, queue.getQueuedJobs());
- MetaJob job = queue.poll();
- assertEquals(cj1, job.getJob());
- assertEquals(tm, job.getTabletMetadata());
- assertEquals(1, queue.getDequeuedJobs());
-
- MetaJob job2 = queue.poll();
- assertEquals(cj2, job2.getJob());
- assertEquals(tm, job2.getTabletMetadata());
- assertEquals(2, queue.getDequeuedJobs());
-
- assertTrue(queue.closeIfEmpty());
-
- assertEquals(-1, queue.add(tm, List.of(cj1, cj2), 1L));
-
- }
-
private static int counter = 1;
private Pair<TabletMetadata,CompactionJob> createJob() {
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
index 73aa404295..a9f360b4cd 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -19,7 +19,9 @@
package org.apache.accumulo.manager.compaction.queue;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.net.URI;
import java.net.URISyntaxException;
@@ -332,4 +334,57 @@ public class CompactionJobQueuesTest {
// The background threads should have seen every job that was added
assertEquals(numToAdd, totalSeen);
}
+
+ @Test
+ public void testGetAsync() throws Exception {
+ CompactionJobQueues jobQueues = new CompactionJobQueues(100);
+
+ var tid = TableId.of("1");
+ var extent1 = new KeyExtent(tid, new Text("z"), new Text("q"));
+ var extent2 = new KeyExtent(tid, new Text("q"), new Text("l"));
+ var extent3 = new KeyExtent(tid, new Text("l"), new Text("c"));
+ var extent4 = new KeyExtent(tid, new Text("c"), new Text("a"));
+
+ var tm1 = TabletMetadata.builder(extent1).build();
+ var tm2 = TabletMetadata.builder(extent2).build();
+ var tm3 = TabletMetadata.builder(extent3).build();
+ var tm4 = TabletMetadata.builder(extent4).build();
+
+ var cg1 = CompactorGroupId.of("CG1");
+
+ var future1 = jobQueues.getAsync(cg1);
+ var future2 = jobQueues.getAsync(cg1);
+
+ assertFalse(future1.isDone());
+ assertFalse(future2.isDone());
+
+ jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+ jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
+ jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1)));
+ jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1)));
+
+ var future3 = jobQueues.getAsync(cg1);
+ var future4 = jobQueues.getAsync(cg1);
+
+ assertTrue(future1.isDone());
+ assertTrue(future2.isDone());
+ assertTrue(future3.isDone());
+ assertTrue(future4.isDone());
+
+ assertEquals(extent1, future1.get().getTabletMetadata().getExtent());
+ assertEquals(extent2, future2.get().getTabletMetadata().getExtent());
+ assertEquals(extent4, future3.get().getTabletMetadata().getExtent());
+ assertEquals(extent3, future4.get().getTabletMetadata().getExtent());
+
+ // test cancelling a future
+ var future5 = jobQueues.getAsync(cg1);
+ assertFalse(future5.isDone());
+ future5.cancel(false);
+ var future6 = jobQueues.getAsync(cg1);
+ assertFalse(future6.isDone());
+ // since future5 was canceled, this addition should go to future6
+ jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+ assertTrue(future6.isDone());
+ assertEquals(extent1, future6.get().getTabletMetadata().getExtent());
+ }
}