This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 632dbc2ed0 limits memory and cpu used by compaction reservation
request (#5185)
632dbc2ed0 is described below
commit 632dbc2ed0d7bfed30a2f9da7053257b07720962
Author: Keith Turner <[email protected]>
AuthorDate: Sun Dec 15 14:27:19 2024 -0500
limits memory and cpu used by compaction reservation request (#5185)
Added thread pools to execute compaction reservation requests in order
to limit the memory and CPU used by executing reservations. Requests queued up
for a pool could still potentially use a lot of memory, so two
things were done to control the memory used by the queue. First, a
compactor process is only allowed to have one reservation processing at a time. Second,
the data related to a reservation request is held via a soft reference, which
should allow it to be garbage collected if memory gets low while it is sitting
in the queue. Once the request starts executing it obtains a strong
reference to the data so it can no longer be garbage collected.
fixes #5177
---
.../org/apache/accumulo/core/conf/Property.java | 12 ++
.../core/util/threads/ThreadPoolNames.java | 3 +
.../accumulo/core/util/threads/ThreadPools.java | 24 +++
.../coordinator/CompactionCoordinator.java | 211 +++++++++++++++------
4 files changed, 190 insertions(+), 60 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 63cbd397ac..77fa9c461a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1189,6 +1189,18 @@ public enum Property {
COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null,
PropertyType.PREFIX,
"Properties in this category affect the behavior of the accumulo
compaction coordinator server.",
"2.1.0"),
+
COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT("compaction.coordinator.reservation.threads.root",
+ "1", PropertyType.COUNT,
+ "The number of threads used to reserve files for compaction in a tablet
for the root tablet.",
+ "4.0.0"),
+
COMPACTION_COORDINATOR_RESERVATION_THREADS_META("compaction.coordinator.reservation.threads.meta",
+ "1", PropertyType.COUNT,
+ "The number of threads used to reserve files for compaction in a tablet
for accumulo.metadata tablets.",
+ "4.0.0"),
+
COMPACTION_COORDINATOR_RESERVATION_THREADS_USER("compaction.coordinator.reservation.threads.user",
+ "64", PropertyType.COUNT,
+ "The number of threads used to reserve files for compaction in a tablet
for user tables.",
+ "4.0.0"),
@Experimental
COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL(
"compaction.coordinator.compactor.dead.check.interval", "5m",
PropertyType.TIMEDURATION,
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
index b360e41b6b..6c025a6815 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
@@ -34,6 +34,9 @@ public enum ThreadPoolNames {
CONDITIONAL_WRITER_CLEANUP_POOL("accumulo.pool.client.context.conditional.writer.cleanup"),
COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"),
COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"),
+
COORDINATOR_RESERVATION_ROOT_POOL("accumulo.pool.compaction.coordinator.reservation.root"),
+
COORDINATOR_RESERVATION_META_POOL("accumulo.pool.compaction.coordinator.reservation.meta"),
+
COORDINATOR_RESERVATION_USER_POOL("accumulo.pool.compaction.coordinator.reservation.user"),
GC_DELETE_POOL("accumulo.pool.gc.threads.delete"),
GENERAL_SERVER_POOL("accumulo.pool.general.server"),
SERVICE_LOCK_POOL("accumulo.pool.service.lock"),
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 12e2567bdf..08d1d82989 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -22,6 +22,9 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_META_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_ROOT_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_USER_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE_POOL;
@@ -369,6 +372,27 @@ public class ThreadPools {
return builder.build();
case GC_DELETE_THREADS:
return
getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
+ case COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT:
+ builder =
getPoolBuilder(COORDINATOR_RESERVATION_ROOT_POOL).numCoreThreads(conf.getCount(p))
+ .withTimeOut(60L, MILLISECONDS);
+ if (emitThreadPoolMetrics) {
+ builder.enableThreadPoolMetrics();
+ }
+ return builder.build();
+ case COMPACTION_COORDINATOR_RESERVATION_THREADS_META:
+ builder =
getPoolBuilder(COORDINATOR_RESERVATION_META_POOL).numCoreThreads(conf.getCount(p))
+ .withTimeOut(60L, MILLISECONDS);
+ if (emitThreadPoolMetrics) {
+ builder.enableThreadPoolMetrics();
+ }
+ return builder.build();
+ case COMPACTION_COORDINATOR_RESERVATION_THREADS_USER:
+ builder =
getPoolBuilder(COORDINATOR_RESERVATION_USER_POOL).numCoreThreads(conf.getCount(p))
+ .withTimeOut(60L, MILLISECONDS);
+ if (emitThreadPoolMetrics) {
+ builder.enableThreadPoolMetrics();
+ }
+ return builder.build();
default:
throw new IllegalArgumentException("Unhandled thread pool property: "
+ p);
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 2640ed4bcc..6f2eca3756 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -31,6 +31,7 @@ import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.lang.ref.SoftReference;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@@ -43,13 +44,17 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.accumulo.core.Constants;
@@ -193,6 +198,9 @@ public class CompactionCoordinator
private volatile long coordinatorStartTime;
+ private final Map<DataLevel,ThreadPoolExecutor> reservationPools;
+ private final Set<String> activeCompactorReservationRequest =
ConcurrentHashMap.newKeySet();
+
public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances,
Manager manager) {
this.ctx = ctx;
@@ -232,6 +240,18 @@ public class CompactionCoordinator
deadCompactionDetector =
new DeadCompactionDetector(this.ctx, this, schedExecutor,
fateInstances);
+ var rootReservationPool =
ThreadPools.getServerThreadPools().createExecutorService(
+ ctx.getConfiguration(),
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT, true);
+
+ var metaReservationPool =
ThreadPools.getServerThreadPools().createExecutorService(
+ ctx.getConfiguration(),
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META, true);
+
+ var userReservationPool =
ThreadPools.getServerThreadPools().createExecutorService(
+ ctx.getConfiguration(),
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER, true);
+
+ reservationPools = Map.of(Ample.DataLevel.ROOT, rootReservationPool,
Ample.DataLevel.METADATA,
+ metaReservationPool, Ample.DataLevel.USER, userReservationPool);
+
compactorCounts =
ctx.getCaches().createNewBuilder(CacheName.COMPACTOR_COUNTS, false)
.expireAfterWrite(30, TimeUnit.SECONDS).build(this::countCompactors);
// At this point the manager does not have its lock so no actions should
be taken yet
@@ -250,6 +270,9 @@ public class CompactionCoordinator
public void shutdown() {
shutdown.countDown();
+
+ reservationPools.values().forEach(ExecutorService::shutdownNow);
+
var localThread = serviceThread;
if (localThread != null) {
try {
@@ -528,82 +551,142 @@ public class CompactionCoordinator
}
- protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob
metaJob,
- String compactorAddress, ExternalCompactionId externalCompactionId) {
+ private class ReserveCompactionTask implements Supplier<CompactionMetadata> {
+
+ // Use a soft reference for this in case free memory gets low while this
is sitting in the queue
+ // waiting to process. This object can contain the tablets list of files
and if there are lots
+ // of tablet with lots of files then that could start to cause memory
problems. This hack could
+ // be removed if #5188 were implemented.
+ private final SoftReference<CompactionJobQueues.MetaJob> metaJobRef;
+ private final String compactorAddress;
+ private final ExternalCompactionId externalCompactionId;
+
+ private ReserveCompactionTask(CompactionJobQueues.MetaJob metaJob, String
compactorAddress,
+ ExternalCompactionId externalCompactionId) {
+ Preconditions.checkArgument(metaJob.getJob().getKind() ==
CompactionKind.SYSTEM
+ || metaJob.getJob().getKind() == CompactionKind.USER);
+ this.metaJobRef = new SoftReference<>(Objects.requireNonNull(metaJob));
+ this.compactorAddress = Objects.requireNonNull(compactorAddress);
+ this.externalCompactionId = Objects.requireNonNull(externalCompactionId);
+
Preconditions.checkState(activeCompactorReservationRequest.add(compactorAddress),
+ "compactor %s already on has a reservation in flight, cannot process
%s",
+ compactorAddress, externalCompactionId);
+ }
- Preconditions.checkArgument(metaJob.getJob().getKind() ==
CompactionKind.SYSTEM
- || metaJob.getJob().getKind() == CompactionKind.USER);
+ @Override
+ public CompactionMetadata get() {
+ try {
+ var metaJob = metaJobRef.get();
+ if (metaJob == null) {
+ LOG.warn("Compaction reservation request for {} {} was garbage
collected.",
+ compactorAddress, externalCompactionId);
+ return null;
+ }
- var tabletMetadata = metaJob.getTabletMetadata();
+ var tabletMetadata = metaJob.getTabletMetadata();
- var jobFiles =
metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
- .collect(Collectors.toSet());
+ var jobFiles = metaJob.getJob().getFiles().stream()
+
.map(CompactableFileImpl::toStoredTabletFile).collect(Collectors.toSet());
- Retry retry =
Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100))
-
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
- .logInterval(Duration.ofMinutes(3)).createRetry();
+ Retry retry =
Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100))
+
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
+ .logInterval(Duration.ofMinutes(3)).createRetry();
- while (retry.canRetry()) {
- try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
- var extent = metaJob.getTabletMetadata().getExtent();
+ while (retry.canRetry()) {
+ try (var tabletsMutator =
ctx.getAmple().conditionallyMutateTablets()) {
+ var extent = metaJob.getTabletMetadata().getExtent();
- if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(),
jobFiles, ctx,
- manager.getSteadyTime())) {
- return null;
- }
+ if (!canReserveCompaction(tabletMetadata,
metaJob.getJob().getKind(), jobFiles, ctx,
+ manager.getSteadyTime())) {
+ return null;
+ }
- var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles,
tabletMetadata,
- compactorAddress, externalCompactionId);
-
- // any data that is read from the tablet to make a decision about if
it can compact or not
- // must be checked for changes in the conditional mutation.
- var tabletMutator =
tabletsMutator.mutateTablet(extent).requireAbsentOperation()
- .requireFiles(jobFiles).requireNotCompacting(jobFiles);
- if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
- // For system compactions the user compaction requested column is
examined when deciding
- // if a compaction can start so need to check for changes to this
column.
- tabletMutator.requireSame(tabletMetadata, SELECTED,
USER_COMPACTION_REQUESTED);
- } else {
- tabletMutator.requireSame(tabletMetadata, SELECTED);
- }
+ var ecm = createExternalCompactionMetadata(metaJob.getJob(),
jobFiles, tabletMetadata,
+ compactorAddress, externalCompactionId);
+
+ // any data that is read from the tablet to make a decision about
if it can compact or
+ // not
+ // must be checked for changes in the conditional mutation.
+ var tabletMutator =
tabletsMutator.mutateTablet(extent).requireAbsentOperation()
+ .requireFiles(jobFiles).requireNotCompacting(jobFiles);
+ if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
+ // For system compactions the user compaction requested column
is examined when
+ // deciding
+ // if a compaction can start so need to check for changes to
this column.
+ tabletMutator.requireSame(tabletMetadata, SELECTED,
USER_COMPACTION_REQUESTED);
+ } else {
+ tabletMutator.requireSame(tabletMetadata, SELECTED);
+ }
- if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
- var selectedFiles = tabletMetadata.getSelectedFiles();
- var reserved = getFilesReservedBySelection(tabletMetadata,
manager.getSteadyTime(), ctx);
-
- // If there is a selectedFiles column, and the reserved set is empty
this means that
- // either no user jobs were completed yet or the selection
expiration time has passed
- // so the column is eligible to be deleted so a system job can run
instead
- if (selectedFiles != null && reserved.isEmpty()
- && !Collections.disjoint(jobFiles, selectedFiles.getFiles())) {
- LOG.debug("Deleting user compaction selected files for {} {}",
extent,
- externalCompactionId);
- tabletMutator.deleteSelectedFiles();
- }
- }
+ if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
+ var selectedFiles = tabletMetadata.getSelectedFiles();
+ var reserved =
+ getFilesReservedBySelection(tabletMetadata,
manager.getSteadyTime(), ctx);
+
+ // If there is a selectedFiles column, and the reserved set is
empty this means that
+ // either no user jobs were completed yet or the selection
expiration time has passed
+ // so the column is eligible to be deleted so a system job can
run instead
+ if (selectedFiles != null && reserved.isEmpty()
+ && !Collections.disjoint(jobFiles,
selectedFiles.getFiles())) {
+ LOG.debug("Deleting user compaction selected files for {} {}",
extent,
+ externalCompactionId);
+ tabletMutator.deleteSelectedFiles();
+ }
+ }
- tabletMutator.putExternalCompaction(externalCompactionId, ecm);
- tabletMutator.submit(tm ->
tm.getExternalCompactions().containsKey(externalCompactionId));
+ tabletMutator.putExternalCompaction(externalCompactionId, ecm);
+ tabletMutator
+ .submit(tm ->
tm.getExternalCompactions().containsKey(externalCompactionId));
- var result = tabletsMutator.process().get(extent);
+ var result = tabletsMutator.process().get(extent);
- if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
- return ecm;
- } else {
- tabletMetadata = result.readMetadata();
+ if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED)
{
+ return ecm;
+ } else {
+ tabletMetadata = result.readMetadata();
+ }
+ }
+
+ retry.useRetry();
+ try {
+ retry.waitForNextAttempt(LOG,
+ "Reserved compaction for " +
metaJob.getTabletMetadata().getExtent());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
- }
- retry.useRetry();
- try {
- retry.waitForNextAttempt(LOG,
- "Reserved compaction for " +
metaJob.getTabletMetadata().getExtent());
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ return null;
+ } finally {
+
Preconditions.checkState(activeCompactorReservationRequest.remove(compactorAddress),
+ "compactorAddress:%s", compactorAddress);
}
}
+ }
+
+ protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob
metaJob,
+ String compactorAddress, ExternalCompactionId externalCompactionId) {
+
+ if (activeCompactorReservationRequest.contains(compactorAddress)) {
+ // In this case the compactor has a previously submitted reservation
request that is still
+ // processing. Do not want to let it queue up another reservation
request. One possible cause
+ // of this is that compactor timed out waiting for its last request to
process and is now
+ // making another request. The previously submitted request can not be
used because the
+ // compactor generates a new uuid for each request it makes. So the best
thing to do is to
+ // return null and wait for this situation to resolve. This will likely
happen when some part
+ // of the distributed system is not working well, so at this point want
to avoid making
+ // problems worse instead of trying to reserve a job.
+ LOG.warn(
+ "Ignoring request from {} to reserve compaction job because it has a
reservation request in progress.",
+ compactorAddress);
+ return null;
+ }
- return null;
+ var dataLevel = DataLevel.of(metaJob.getTabletMetadata().getTableId());
+ var future = CompletableFuture.supplyAsync(
+ new ReserveCompactionTask(metaJob, compactorAddress,
externalCompactionId),
+ reservationPools.get(dataLevel));
+ return future.join();
}
protected TExternalCompactionJob createThriftJob(String externalCompactionId,
@@ -1123,6 +1206,14 @@ public class CompactionCoordinator
// 5. Log compactors with no groups
// 6. Log groups with compactors and queued jos that have not checked in
+ var config = ctx.getConfiguration();
+ ThreadPools.resizePool(reservationPools.get(DataLevel.ROOT), config,
+ Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT);
+ ThreadPools.resizePool(reservationPools.get(DataLevel.METADATA), config,
+ Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META);
+ ThreadPools.resizePool(reservationPools.get(DataLevel.USER), config,
+ Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER);
+
// grab a snapshot of the ids in the set before reading the metadata
table. This is done to
// avoid removing things that are added while reading the metadata.
final Set<ExternalCompactionId> idsSnapshot =
Set.copyOf(RUNNING_CACHE.keySet());