This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new cbcf0a0568 Modifications to make compaction queue size dynamic (#4767)
cbcf0a0568 is described below
commit cbcf0a0568194e39ba0e798b6317ddd916af0f24
Author: Dave Marion <[email protected]>
AuthorDate: Mon Jul 29 17:11:38 2024 -0400
Modifications to make compaction queue size dynamic (#4767)
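Modified the logic in CompactionCoordinator so that, when a
compactor group has no compactors remaining, its priority queue
is cleared and its max size is reset to the configured initial
size; otherwise the queue's max size is set to the number of
remaining compactors multiplied by a user-configurable factor
(manager.compaction.major.service.queue.size.factor).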
Closes #3635
---
.../org/apache/accumulo/core/conf/Property.java | 14 ++++--
.../miniclusterImpl/MiniAccumuloConfigImpl.java | 4 +-
.../java/org/apache/accumulo/manager/Manager.java | 3 +-
.../coordinator/CompactionCoordinator.java | 54 +++++++++++++++-------
.../queue/CompactionJobPriorityQueue.java | 20 ++++++--
.../compaction/CompactionCoordinatorTest.java | 2 +-
.../queue/CompactionJobPriorityQueueTest.java | 9 ++++
.../CompactionPriorityQueueMetricsIT.java | 2 +-
8 files changed, 77 insertions(+), 31 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index ce425a8940..cad729af32 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -445,9 +445,14 @@ public enum Property {
MANAGER_SPLIT_WORKER_THREADS("manager.split.inspection.threadpool.size",
"8", PropertyType.COUNT,
"The number of threads used to inspect tablets files to find split
points.", "4.0.0"),
-
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
- "10000", PropertyType.COUNT,
- "The max size of each resource groups compaction job priority queue.",
"4.0"),
+ MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE(
+ "manager.compaction.major.service.queue.initial.size", "10000",
PropertyType.COUNT,
+ "The initial size of each resource groups compaction job priority
queue.", "4.0.0"),
+ MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR(
+ "manager.compaction.major.service.queue.size.factor", "3.0",
PropertyType.FRACTION,
+ "The dynamic resizing of the compaction job priority queue is based on"
+ + " the number of compactors for the group multiplied by this
factor.",
+ "4.0.0"),
SPLIT_PREFIX("split.", null, PropertyType.PREFIX,
"System wide properties related to splitting tablets.", "3.1.0"),
SPLIT_MAXOPEN("split.files.max", "300", PropertyType.COUNT,
@@ -1433,6 +1438,9 @@ public enum Property {
// max message options
RPC_MAX_MESSAGE_SIZE,
+ // compaction coordinator properties
+ MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE,
+
// block cache options
TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE,
TSERV_SUMMARYCACHE_SIZE, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE,
SSERV_SUMMARYCACHE_SIZE,
diff --git
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index b5c6667519..5b81fac468 100644
---
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -188,8 +188,8 @@ public class MiniAccumuloConfigImpl {
mergeProp(Property.COMPACTOR_PORTSEARCH.getKey(), "true");
-
mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getKey(),
-
Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getDefaultValue());
+
mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getKey(),
+
Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getDefaultValue());
mergeProp(Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getKey(),
Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getDefaultValue());
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 0690e93afa..72c79121e5 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1108,8 +1108,7 @@ public class Manager extends AbstractServer
// Start the Manager's Fate Service
fateServiceHandler = new FateServiceHandler(this);
managerClientHandler = new ManagerClientServiceHandler(this);
- compactionCoordinator =
- new CompactionCoordinator(context, security, fateRefs,
getResourceGroup(), this);
+ compactionCoordinator = new CompactionCoordinator(context, security,
fateRefs, this);
// Start the Manager's Client service
// Ensure that calls before the manager gets the lock fail
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 34e9d9191b..92bc94f4ab 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -117,6 +117,7 @@ import org.apache.accumulo.manager.Manager;
import
org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction;
import
org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData;
import
org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactionFile;
+import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.compaction.CompactionConfigStorage;
@@ -167,7 +168,6 @@ public class CompactionCoordinator
private final ServerContext ctx;
private final SecurityOperation security;
- private final String resourceGroupName;
private final CompactionJobQueues jobQueues;
private final AtomicReference<Map<FateInstanceType,Fate<Manager>>>
fateInstances;
// Exposed for tests
@@ -184,18 +184,19 @@ public class CompactionCoordinator
private final Manager manager;
private final LoadingCache<String,Integer> compactorCounts;
+ private final int jobQueueInitialSize;
public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
- AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances,
- final String resourceGroupName, Manager manager) {
+ AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances,
Manager manager) {
this.ctx = ctx;
this.schedExecutor = this.ctx.getScheduledExecutor();
this.security = security;
- this.resourceGroupName = resourceGroupName;
this.manager = Objects.requireNonNull(manager);
- this.jobQueues = new CompactionJobQueues(
-
ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
+ this.jobQueueInitialSize = ctx.getConfiguration()
+
.getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE);
+
+ this.jobQueues = new CompactionJobQueues(jobQueueInitialSize);
this.queueMetrics = new QueueMetrics(jobQueues);
@@ -1067,27 +1068,46 @@ public class CompactionCoordinator
private void cleanUpCompactors() {
final String compactorQueuesPath = this.ctx.getZooKeeperRoot() +
Constants.ZCOMPACTORS;
- var zoorw = this.ctx.getZooReaderWriter();
+ final var zoorw = this.ctx.getZooReaderWriter();
+ final double queueSizeFactor = ctx.getConfiguration()
+
.getFraction(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR);
try {
var groups = zoorw.getChildren(compactorQueuesPath);
for (String group : groups) {
- String qpath = compactorQueuesPath + "/" + group;
-
- var compactors = zoorw.getChildren(qpath);
+ final String qpath = compactorQueuesPath + "/" + group;
+ final CompactorGroupId cgid = CompactorGroupId.of(group);
+ final var compactors = zoorw.getChildren(qpath);
if (compactors.isEmpty()) {
deleteEmpty(zoorw, qpath);
- }
-
- for (String compactor : compactors) {
- String cpath = compactorQueuesPath + "/" + group + "/" + compactor;
- var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + group
+ "/" + compactor);
- if (lockNodes.isEmpty()) {
- deleteEmpty(zoorw, cpath);
+ // Group has no compactors, we can clear its
+ // associated priority queue of jobs
+ CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
+ if (queue != null) {
+ queue.clear();
+ queue.setMaxSize(this.jobQueueInitialSize);
}
+ } else {
+ int aliveCompactorsForGroup = 0;
+ for (String compactor : compactors) {
+ String cpath = compactorQueuesPath + "/" + group + "/" + compactor;
+ var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" +
group + "/" + compactor);
+ if (lockNodes.isEmpty()) {
+ deleteEmpty(zoorw, cpath);
+ } else {
+ aliveCompactorsForGroup++;
+ }
+ }
+ CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
+ if (queue != null) {
+ queue.setMaxSize(
+ Math.min((int) (aliveCompactorsForGroup * queueSizeFactor),
Integer.MAX_VALUE));
+ }
+
}
+
}
} catch (KeeperException | RuntimeException e) {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index c91b8becf1..9a4c726463 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -31,6 +31,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -107,7 +108,7 @@ public class CompactionJobPriorityQueue {
// efficiently removing entries from anywhere in the queue. Efficient
removal is needed for the
// case where tablets decided to issues different compaction jobs than what
is currently queued.
private final TreeMap<CjpqKey,CompactionJobQueues.MetaJob> jobQueue;
- private final int maxSize;
+ private AtomicInteger maxSize;
private final AtomicLong rejectedJobs;
private final AtomicLong dequeuedJobs;
private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>>
futures;
@@ -131,7 +132,7 @@ public class CompactionJobPriorityQueue {
public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) {
this.jobQueue = new TreeMap<>();
- this.maxSize = maxSize;
+ this.maxSize = new AtomicInteger(maxSize);
this.tabletJobs = new HashMap<>();
this.groupId = groupId;
this.rejectedJobs = new AtomicLong(0);
@@ -201,8 +202,12 @@ public class CompactionJobPriorityQueue {
return jobsAdded;
}
- public long getMaxSize() {
- return maxSize;
+ public synchronized int getMaxSize() {
+ return maxSize.get();
+ }
+
+ public synchronized void setMaxSize(int maxSize) {
+ this.maxSize.set(maxSize);
}
public long getRejectedJobs() {
@@ -284,7 +289,7 @@ public class CompactionJobPriorityQueue {
}
private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob
job) {
- if (jobQueue.size() >= maxSize) {
+ if (jobQueue.size() >= maxSize.get()) {
var lastEntry = jobQueue.lastKey();
if (job.getPriority() <= lastEntry.job.getPriority()) {
// the queue is full and this job has a lower or same priority than
the lowest job in the
@@ -304,4 +309,9 @@ public class CompactionJobPriorityQueue {
jobQueue.put(key, new CompactionJobQueues.MetaJob(job, tabletMetadata));
return key;
}
+
+ public synchronized void clear() {
+ jobQueue.clear();
+ tabletJobs.clear();
+ }
}
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 5c7a913d91..58a592f036 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -130,7 +130,7 @@ public class CompactionCoordinatorTest {
public TestCoordinator(ServerContext ctx, SecurityOperation security,
List<RunningCompaction> runningCompactions, Manager manager) {
- super(ctx, security, fateInstances, "TEST_GROUP", manager);
+ super(ctx, security, fateInstances, manager);
this.runningCompactions = runningCompactions;
}
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index 5464b90a33..05e0b35c55 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -300,4 +300,13 @@ public class CompactionJobPriorityQueueTest {
< 2 * (CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD +
CANCEL_THRESHOLD));
assertTrue(maxFuturesSize > 2 *
CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD);
}
+
+ @Test
+ public void testChangeMaxSize() {
+ CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
100);
+ assertEquals(100, queue.getMaxSize());
+ queue.setMaxSize(50);
+ assertEquals(50, queue.getMaxSize());
+ }
+
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
index 40f6ddc107..c2a86fa6bc 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
@@ -196,7 +196,7 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
Property.COMPACTION_SERVICE_PREFIX.getKey() + QUEUE1_SERVICE +
".planner.opts.groups",
"[{'group':'" + QUEUE1 + "'}]");
- cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE,
"6");
+
cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE,
"6");
cfg.getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 0);
// This test waits for dead compactors to be absent in zookeeper. The
following setting will