This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new f22e774ef6 Stop tracking last compactor check-in for non-existent
groups (#4403)
f22e774ef6 is described below
commit f22e774ef6544343ef9d467e8620161eae0f3824
Author: Dave Marion <[email protected]>
AuthorDate: Tue Aug 20 16:48:03 2024 -0400
Stop tracking last compactor check-in for non-existent groups (#4403)
Consolidated internal cleanup methods in the Coordinator.
Added logic to stop tracking compactor check-in times for
groups that have been removed.
---
.../coordinator/CompactionCoordinator.java | 220 +++++++++++++++------
.../coordinator/DeadCompactionDetector.java | 3 +-
.../compaction/CompactionCoordinatorTest.java | 16 +-
3 files changed, 169 insertions(+), 70 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 92bc94f4ab..9b31ca33f1 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -70,6 +70,7 @@ import
org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -98,6 +99,8 @@ import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
import org.apache.accumulo.core.tabletserver.thrift.InputFile;
import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
@@ -107,6 +110,8 @@ import
org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.cache.Caches.CacheName;
+import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
+import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.core.util.compaction.RunningCompaction;
import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -120,6 +125,7 @@ import
org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactio
import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServiceEnvironmentImpl;
import org.apache.accumulo.server.compaction.CompactionConfigStorage;
import org.apache.accumulo.server.compaction.CompactionPluginUtils;
import org.apache.accumulo.server.security.SecurityOperation;
@@ -163,7 +169,6 @@ public class CompactionCoordinator
new ConcurrentHashMap<>();
/* Map of group name to last time compactor called to get a compaction job */
- // ELASTICITY_TODO #4403 need to clean out groups that are no longer
configured..
private final Map<CompactorGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new
ConcurrentHashMap<>();
private final ServerContext ctx;
@@ -186,6 +191,8 @@ public class CompactionCoordinator
private final LoadingCache<String,Integer> compactorCounts;
private final int jobQueueInitialSize;
+ private volatile long coordinatorStartTime;
+
public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances,
Manager manager) {
this.ctx = ctx;
@@ -253,44 +260,23 @@ public class CompactionCoordinator
}
}
- protected void startCompactionCleaner(ScheduledThreadPoolExecutor
schedExecutor) {
- ScheduledFuture<?> future =
- schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5,
TimeUnit.MINUTES);
+ protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor
schedExecutor) {
+ ScheduledFuture<?> future = schedExecutor
+ .scheduleWithFixedDelay(this::cleanUpEmptyCompactorPathInZK, 0, 5,
TimeUnit.MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
}
- protected void startRunningCleaner(ScheduledThreadPoolExecutor
schedExecutor) {
+ protected void startInternalStateCleaner(ScheduledThreadPoolExecutor
schedExecutor) {
ScheduledFuture<?> future =
- schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5,
TimeUnit.MINUTES);
- ThreadPools.watchNonCriticalScheduledTask(future);
- }
-
- protected void startIdleCompactionWatcher() {
-
- ScheduledFuture<?> future =
schedExecutor.scheduleWithFixedDelay(this::idleCompactionWarning,
- getTServerCheckInterval(), getTServerCheckInterval(),
TimeUnit.MILLISECONDS);
+ schedExecutor.scheduleWithFixedDelay(this::cleanUpInternalState, 0, 5,
TimeUnit.MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
}
- private void idleCompactionWarning() {
-
- long now = System.currentTimeMillis();
- Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors();
- TIME_COMPACTOR_LAST_CHECKED.forEach((groupName, lastCheckTime) -> {
- if ((now - lastCheckTime) > getMissingCompactorWarningTime()
- && jobQueues.getQueuedJobs(groupName) > 0
- && idleCompactors.containsKey(groupName.canonical())) {
- LOG.warn("No compactors have checked in with coordinator for group {}
in {}ms", groupName,
- getMissingCompactorWarningTime());
- }
- });
-
- }
-
@Override
public void run() {
- startCompactionCleaner(schedExecutor);
- startRunningCleaner(schedExecutor);
+
+ this.coordinatorStartTime = System.currentTimeMillis();
+ startCompactorZKCleaner(schedExecutor);
// On a re-start of the coordinator it's possible that external
compactions are in-progress.
// Attempt to get the running compactions on the compactors and then
resolve which tserver
@@ -312,8 +298,7 @@ public class CompactionCoordinator
}
startDeadCompactionDetector();
-
- startIdleCompactionWatcher();
+ startInternalStateCleaner(schedExecutor);
try {
shutdown.await();
@@ -324,13 +309,14 @@ public class CompactionCoordinator
LOG.info("Shutting down");
}
- private Map<String,Set<HostAndPort>> getIdleCompactors() {
+ private Map<String,Set<HostAndPort>>
+ getIdleCompactors(Map<String,Set<HostAndPort>> runningCompactors) {
- Map<String,Set<HostAndPort>> allCompactors = new HashMap<>();
- ExternalCompactionUtil.getCompactorAddrs(ctx)
+ final Map<String,Set<HostAndPort>> allCompactors = new HashMap<>();
+ runningCompactors
.forEach((group, compactorList) -> allCompactors.put(group, new
HashSet<>(compactorList)));
- Set<String> emptyQueues = new HashSet<>();
+ final Set<String> emptyQueues = new HashSet<>();
// Remove all of the compactors that are running a compaction
RUNNING_CACHE.values().forEach(rc -> {
@@ -939,30 +925,6 @@ public class CompactionCoordinator
}
}
- /**
- * The RUNNING_CACHE set may contain external compactions that are not
actually running. This
- * method periodically cleans those up.
- */
- public void cleanUpRunning() {
-
- // grab a snapshot of the ids in the set before reading the metadata
table. This is done to
- // avoid removing things that are added while reading the metadata.
- Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());
-
- // grab the ids that are listed as running in the metadata table. It
important that this is done
- // after getting the snapshot.
- Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();
-
- var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);
-
- // remove ids that are in the running set but not in the metadata table
- idsToRemove.forEach(this::recordCompletion);
-
- if (idsToRemove.size() > 0) {
- LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
- }
- }
-
/**
* Return information about running compactions
*
@@ -1049,6 +1011,11 @@ public class CompactionCoordinator
return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
}
+ /* Method exists to be overridden in test to hide static method */
+ protected Map<String,Set<HostAndPort>> getRunningCompactors() {
+ return ExternalCompactionUtil.getCompactorAddrs(this.ctx);
+ }
+
/* Method exists to be overridden in test to hide static method */
protected void cancelCompactionOnCompactor(String address, String
externalCompactionId) {
HostAndPort hostPort = HostAndPort.fromString(address);
@@ -1065,7 +1032,7 @@ public class CompactionCoordinator
}
}
- private void cleanUpCompactors() {
+ private void cleanUpEmptyCompactorPathInZK() {
final String compactorQueuesPath = this.ctx.getZooKeeperRoot() +
Constants.ZCOMPACTORS;
final var zoorw = this.ctx.getZooReaderWriter();
@@ -1118,6 +1085,137 @@ public class CompactionCoordinator
}
}
+ private Set<CompactorGroupId> getCompactionServicesConfigurationGroups()
+ throws ReflectiveOperationException, IllegalArgumentException,
SecurityException {
+
+ Set<CompactorGroupId> groups = new HashSet<>();
+ AccumuloConfiguration config = ctx.getConfiguration();
+ CompactionServicesConfig servicesConfig = new
CompactionServicesConfig(config);
+
+ for (var entry : servicesConfig.getPlanners().entrySet()) {
+ String serviceId = entry.getKey();
+ String plannerClassName = entry.getValue();
+
+ Class<? extends CompactionPlanner> plannerClass =
+ Class.forName(plannerClassName).asSubclass(CompactionPlanner.class);
+ CompactionPlanner planner =
plannerClass.getDeclaredConstructor().newInstance();
+
+ var initParams = new
CompactionPlannerInitParams(CompactionServiceId.of(serviceId),
+ servicesConfig.getPlannerPrefix(serviceId),
servicesConfig.getOptions().get(serviceId),
+ new ServiceEnvironmentImpl(ctx));
+
+ planner.init(initParams);
+
+ groups.addAll(initParams.getRequestedGroups());
+ }
+ return groups;
+ }
+
+ public void cleanUpInternalState() {
+
+ // This method does the following:
+ //
+ // 1. Removes entries from RUNNING_CACHE that are not really running
+ // 2. Cancels running compactions for groups that are not in the current
configuration
+ // 3. Remove groups not in configuration from TIME_COMPACTOR_LAST_CHECKED
+ // 4. Log groups with no compactors
+ // 5. Log compactors with no groups
+ // 6. Log groups with compactors and queued jobs that have not checked in
+
+ // grab a snapshot of the ids in the set before reading the metadata
table. This is done to
+ // avoid removing things that are added while reading the metadata.
+ final Set<ExternalCompactionId> idsSnapshot =
Set.copyOf(RUNNING_CACHE.keySet());
+
+ // grab the ids that are listed as running in the metadata table. It is
important that this is done
+ // after getting the snapshot.
+ final Set<ExternalCompactionId> idsInMetadata =
readExternalCompactionIds();
+
+ final Set<ExternalCompactionId> idsToRemove = Sets.difference(idsSnapshot,
idsInMetadata);
+
+ // remove ids that are in the running set but not in the metadata table
+ idsToRemove.forEach(this::recordCompletion);
+ if (idsToRemove.size() > 0) {
+ LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
+ }
+
+ // Get the set of groups being referenced in the current configuration
+ Set<CompactorGroupId> groupsInConfiguration = null;
+ try {
+ groupsInConfiguration = getCompactionServicesConfigurationGroups();
+ } catch (RuntimeException | ReflectiveOperationException e) {
+ LOG.error(
+ "Error getting groups from the compaction services configuration.
Unable to clean up internal state.",
+ e);
+ return;
+ }
+
+ // Compaction jobs are created in the TabletGroupWatcher and added to the
Coordinator
+ // via the addJobs method which adds the job to the CompactionJobQueues
object.
+ final Set<CompactorGroupId> groupsWithJobs = jobQueues.getQueueIds();
+
+ final Set<CompactorGroupId> jobGroupsNotInConfiguration =
+ Sets.difference(groupsWithJobs, groupsInConfiguration);
+
+ if (jobGroupsNotInConfiguration != null &&
!jobGroupsNotInConfiguration.isEmpty()) {
+ RUNNING_CACHE.values().forEach(rc -> {
+ if
(jobGroupsNotInConfiguration.contains(CompactorGroupId.of(rc.getGroupName()))) {
+ LOG.warn(
+ "External compaction {} running in group {} on compactor {},"
+ + " but group not found in current configuration. Failing
compaction...",
+ rc.getJob().getExternalCompactionId(), rc.getGroupName(),
rc.getCompactorAddress());
+ cancelCompactionOnCompactor(rc.getCompactorAddress(),
+ rc.getJob().getExternalCompactionId());
+ }
+ });
+
+ final Set<CompactorGroupId> trackedGroups =
Set.copyOf(TIME_COMPACTOR_LAST_CHECKED.keySet());
+ TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(groupsInConfiguration);
+ LOG.debug("No longer tracking compactor check-in times for groups: {}",
+ Sets.difference(trackedGroups,
TIME_COMPACTOR_LAST_CHECKED.keySet()));
+ }
+
+ final Map<String,Set<HostAndPort>> runningCompactors =
getRunningCompactors();
+
+ final Set<CompactorGroupId> runningCompactorGroups = new HashSet<>();
+ runningCompactors.keySet()
+ .forEach(group ->
runningCompactorGroups.add(CompactorGroupId.of(group)));
+
+ final Set<CompactorGroupId> groupsWithNoCompactors =
+ Sets.difference(groupsInConfiguration, runningCompactorGroups);
+ if (groupsWithNoCompactors != null && !groupsWithNoCompactors.isEmpty()) {
+ for (CompactorGroupId group : groupsWithNoCompactors) {
+ long queuedJobCount = jobQueues.getQueuedJobs(group);
+ if (queuedJobCount > 0) {
+ LOG.warn("Compactor group {} has {} queued compactions but no
running compactors", group,
+ queuedJobCount);
+ }
+ }
+ }
+
+ final Set<CompactorGroupId> compactorsWithNoGroups =
+ Sets.difference(runningCompactorGroups, groupsInConfiguration);
+ if (compactorsWithNoGroups != null && !compactorsWithNoGroups.isEmpty()) {
+ LOG.warn(
+ "The following groups have running compactors, but are not in the
current configuration: {}",
+ compactorsWithNoGroups);
+ }
+
+ final long now = System.currentTimeMillis();
+ final long warningTime = getMissingCompactorWarningTime();
+ Map<String,Set<HostAndPort>> idleCompactors =
getIdleCompactors(runningCompactors);
+ for (CompactorGroupId groupName : groupsInConfiguration) {
+ long lastCheckTime =
+ TIME_COMPACTOR_LAST_CHECKED.getOrDefault(groupName,
coordinatorStartTime);
+ if ((now - lastCheckTime) > warningTime &&
jobQueues.getQueuedJobs(groupName) > 0
+ && idleCompactors.containsKey(groupName.canonical())) {
+ LOG.warn(
+ "The group {} has queued jobs and {} idle compactors, however none
have checked in "
+ + "with coordinator for {}ms",
+ groupName, idleCompactors.get(groupName.canonical()).size(),
warningTime);
+ }
+ }
+ }
+
private static Set<StoredTabletFile>
getFilesReservedBySelection(TabletMetadata tabletMetadata,
SteadyTime steadyTime, ServerContext ctx) {
if (tabletMetadata.getSelectedFiles() == null) {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
index bf702b0db7..7dbf45a3d6 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
@@ -230,8 +230,7 @@ public class DeadCompactionDetector {
this.deadCompactions.keySet().removeAll(toFail);
}
- // Find and delete any known tables that have unreferenced
- // compaction tmp files.
+ // Find and delete compaction tmp files that are unreferenced
if (!tablesWithUnreferencedTmpFiles.isEmpty()) {
Set<TableId> copy = new HashSet<>();
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 58a592f036..90c100aa72 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -148,13 +148,10 @@ public class CompactionCoordinatorTest {
}
@Override
- protected void startCompactionCleaner(ScheduledThreadPoolExecutor
schedExecutor) {}
+ protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor
schedExecutor) {}
@Override
- protected void startRunningCleaner(ScheduledThreadPoolExecutor
schedExecutor) {}
-
- @Override
- protected void startIdleCompactionWatcher() {
+ protected void startInternalStateCleaner(ScheduledThreadPoolExecutor
schedExecutor) {
// This is called from CompactionCoordinator.run(). Counting down
// the latch will exit the run method
this.shutdown.countDown();
@@ -196,6 +193,11 @@ public class CompactionCoordinatorTest {
return runningCompactions;
}
+ @Override
+ protected Map<String,Set<HostAndPort>> getRunningCompactors() {
+ return Map.of();
+ }
+
@Override
protected CompactionMetadata reserveCompaction(MetaJob metaJob, String
compactorAddress,
ExternalCompactionId externalCompactionId) {
@@ -434,13 +436,13 @@ public class CompactionCoordinatorTest {
coordinator.getRunning().put(ecid2, new RunningCompaction(new
TExternalCompaction()));
coordinator.getRunning().put(ecid3, new RunningCompaction(new
TExternalCompaction()));
- coordinator.cleanUpRunning();
+ coordinator.cleanUpInternalState();
assertEquals(Set.of(ecid1, ecid2, ecid3),
coordinator.getRunning().keySet());
coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2));
- coordinator.cleanUpRunning();
+ coordinator.cleanUpInternalState();
assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());