This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 07e781c80f Added runtime check for shared compaction queues (#4800)
07e781c80f is described below
commit 07e781c80f549459ce64cbc1e130dbe615db43b7
Author: Dave Marion <[email protected]>
AuthorDate: Mon Aug 19 07:49:18 2024 -0400
Added runtime check for shared compaction queues (#4800)
Closes #4034
---
.../server/conf/CheckCompactionConfig.java | 28 ++++++++++++++--
.../server/conf/CheckCompactionConfigTest.java | 37 ++++++++++++++++------
.../accumulo/manager/TabletGroupWatcher.java | 13 +++++++-
3 files changed, 65 insertions(+), 13 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
index fb278fbfe5..d9c4ebcc50 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
@@ -22,6 +22,10 @@ import static org.apache.accumulo.core.Constants.DEFAULT_COMPACTION_SERVICE_NAME
import java.io.FileNotFoundException;
import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.core.cli.Help;
@@ -31,6 +35,7 @@ import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
import org.apache.accumulo.core.util.ConfigurationImpl;
import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
@@ -104,6 +109,7 @@ public class CheckCompactionConfig implements KeywordExecutable {
return;
}
+ Map<CompactorGroupId,Set<String>> groupToServices = new HashMap<>();
for (var entry : servicesConfig.getPlanners().entrySet()) {
String serviceId = entry.getKey();
String plannerClassName = entry.getValue();
@@ -120,9 +126,25 @@ public class CheckCompactionConfig implements KeywordExecutable {
planner.init(initParams);
- initParams.getRequestedGroups().forEach(
- (groupId -> log.info("Compaction service '{}' requested with
compactor group '{}'",
- serviceId, groupId)));
+ initParams.getRequestedGroups().forEach(groupId -> {
+ log.info("Compaction service '{}' requested with compactor group
'{}'", serviceId, groupId);
+ groupToServices.computeIfAbsent(groupId, f -> new
HashSet<>()).add(serviceId);
+ });
+ }
+
+ boolean dupesFound = false;
+ for (Entry<CompactorGroupId,Set<String>> e : groupToServices.entrySet()) {
+ if (e.getValue().size() > 1) {
+ log.warn("Compaction services " + e.getValue().toString()
+ + " mapped to the same compactor group: " + e.getKey());
+ dupesFound = true;
+ }
+ }
+
+ if (dupesFound) {
+ throw new IllegalStateException(
+ "Multiple compaction services configured to use the same group. This
could lead"
+ + " to undesired behavior. Please fix the configuration");
}
log.info("Properties file has passed all checks.");
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java
index 9bb7cc163f..7404380a78 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java
@@ -65,12 +65,12 @@ public class CheckCompactionConfigTest extends WithTestNames {
String inputString = ("compaction.service.cs1.planner="
+ "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner
\n"
+ "compaction.service.cs1.planner.opts.groups=\\\n"
- +
"[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n"
- + "{'group':'large'}] \ncompaction.service.cs2.planner="
+ +
"[{'group':'cs1_small','maxSize':'16M'},{'group':'cs1_medium','maxSize':'128M'},\\\n"
+ + "{'group':'cs1_large'}] \ncompaction.service.cs2.planner="
+ "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner
\n"
+ "compaction.service.cs2.planner.opts.groups=\\\n"
- +
"[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n"
- + "{'group':'large'}]").replaceAll("'", "\"");
+ +
"[{'group':'cs2_small','maxSize':'16M'},{'group':'cs2_medium','maxSize':'128M'},\\\n"
+ + "{'group':'cs2_large'}]").replaceAll("'", "\"");
String filePath = writeToFileAndReturnPath(inputString);
@@ -82,15 +82,15 @@ public class CheckCompactionConfigTest extends WithTestNames {
String inputString = ("compaction.service.cs1.planner="
+ "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner
\n"
+ "compaction.service.cs1.planner.opts.groups=\\\n"
- +
"[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n"
- + "{'group':'large'}] \ncompaction.service.cs2.planner="
+ +
"[{'group':'cs1_small','maxSize':'16M'},{'group':'cs1_medium','maxSize':'128M'},\\\n"
+ + "{'group':'cs1_large'}] \ncompaction.service.cs2.planner="
+ "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner
\n"
+ "compaction.service.cs2.planner.opts.groups=\\\n"
- + "[{'group':'small','maxSize':'16M'},
{'group':'medium','maxSize':'128M'},\\\n"
- + "{'group':'large'}] \ncompaction.service.cs3.planner="
+ + "[{'group':'cs2_small','maxSize':'16M'},
{'group':'cs2_medium','maxSize':'128M'},\\\n"
+ + "{'group':'cs2_large'}] \ncompaction.service.cs3.planner="
+ "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner
\n"
+ "compaction.service.cs3.planner.opts.groups=\\\n"
- +
"[{'group':'small','maxSize':'16M'},{'group':'large'}]").replaceAll("'", "\"");
+ +
"[{'group':'cs3_small','maxSize':'16M'},{'group':'cs3_large'}]").replaceAll("'",
"\"");
String filePath = writeToFileAndReturnPath(inputString);
CheckCompactionConfig.main(new String[] {filePath});
@@ -178,4 +178,23 @@ public class CheckCompactionConfigTest extends WithTestNames {
log.info("Wrote to path: {}\nWith string:\n{}", file.getAbsolutePath(),
inputString);
return file.getAbsolutePath();
}
+
+ @Test
+ public void testGroupReuse() throws Exception {
+ String inputString = ("compaction.service.cs1.planner="
+ + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner
\n"
+ + "compaction.service.cs1.planner.opts.groups=\\\n"
+ +
"[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n"
+ + "{'group':'large'}] \ncompaction.service.cs2.planner="
+ + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner
\n"
+ + "compaction.service.cs2.planner.opts.groups=\\\n"
+ +
"[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n"
+ + "{'group':'large'}]").replaceAll("'", "\"");
+
+ String filePath = writeToFileAndReturnPath(inputString);
+
+ assertThrows(IllegalStateException.class,
+ () -> CheckCompactionConfig.main(new String[] {filePath}));
+ }
+
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index d621b65a46..3b77816a1e 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -81,6 +81,7 @@ import org.apache.accumulo.manager.state.TableStats;
import org.apache.accumulo.manager.upgrade.UpgradeCoordinator;
import org.apache.accumulo.server.ServiceEnvironmentImpl;
import org.apache.accumulo.server.compaction.CompactionJobGenerator;
+import org.apache.accumulo.server.conf.CheckCompactionConfig;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.VolumeUtil;
import org.apache.accumulo.server.log.WalStateManager;
@@ -440,6 +441,16 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
new CompactionJobGenerator(new
ServiceEnvironmentImpl(manager.getContext()),
tableMgmtParams.getCompactionHints(),
tableMgmtParams.getSteadyTime());
+ try {
+ CheckCompactionConfig.validate(manager.getConfiguration());
+ } catch (SecurityException | IllegalArgumentException |
IllegalStateException
+ | ReflectiveOperationException e) {
+ LOG.error(
+ "Error validating compaction configuration, all compactions are
paused until the configuration is fixed.",
+ e);
+ compactionGenerator = null;
+ }
+
Set<TServerInstance> filteredServersToShutdown =
new HashSet<>(tableMgmtParams.getServersToShutdown());
@@ -588,7 +599,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
manager.getSplitter().initiateSplit(new SeedSplitTask(manager,
tm.getExtent()));
}
- if (actions.contains(ManagementAction.NEEDS_COMPACTING)) {
+ if (actions.contains(ManagementAction.NEEDS_COMPACTING) &&
compactionGenerator != null) {
var jobs = compactionGenerator.generateJobs(tm,
TabletManagementIterator.determineCompactionKinds(actions));
LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(),
jobs.size());