This is an automated email from the ASF dual-hosted git repository.
ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new b90ca95323 Adds constants for threadPool Names (#4768)
b90ca95323 is described below
commit b90ca953239ac6def31f9ae508b48e51b610bcf2
Author: Daniel Roberts <[email protected]>
AuthorDate: Sun Jul 28 18:16:48 2024 -0400
Adds constants for threadPool Names (#4768)
- updates pool prefix so that pools report as accumulo.pool....
- change names that had names with spaces to use dot notation.
- removed metrics from pools where no user provided properties are used
- Create constants for thread pool names to help with metric names and
ease of troubleshooting
Adds constants for the following pools:
* Bulk Import Threads
* Compaction Service Planner
* Tablet wal creator
* Scan server tablet metadata cache
* Manager upgrade metadata
* Compaction Coordinator summary gatherer
* Utility check pools
---------
Co-authored-by: Ed Coleman <[email protected]>
Co-authored-by: Ed <[email protected]>
Co-authored-by: Christopher Tubbs <[email protected]>
---
.../accumulo/core/clientImpl/ClientContext.java | 10 +--
.../core/clientImpl/ConditionalWriterImpl.java | 3 +-
.../core/clientImpl/InstanceOperationsImpl.java | 3 +-
.../core/clientImpl/TableOperationsImpl.java | 3 +-
.../core/clientImpl/TabletServerBatchReader.java | 8 ++-
.../core/clientImpl/TabletServerBatchWriter.java | 8 ++-
.../accumulo/core/clientImpl/bulk/BulkImport.java | 12 ++--
.../accumulo/core/file/BloomFilterLayer.java | 3 +-
.../util/compaction/ExternalCompactionUtil.java | 6 +-
.../core/util/threads/ThreadPoolNames.java | 78 +++++++++++++++++++++
.../accumulo/core/util/threads/ThreadPools.java | 79 ++++++++++++++++------
.../core/file/rfile/MultiThreadedRFileTest.java | 2 +-
.../threads/ThreadPoolExecutorBuilderTest.java | 14 ++--
.../accumulo/server/client/BulkImporter.java | 13 ++--
.../conf/store/impl/PropCacheCaffeineImpl.java | 4 +-
.../server/conf/store/impl/PropStoreWatcher.java | 2 +-
.../accumulo/server/problems/ProblemReports.java | 2 +-
.../apache/accumulo/server/rpc/TServerUtils.java | 12 ++--
.../server/util/RemoveEntriesForMissingFiles.java | 6 +-
.../server/util/VerifyTabletAssignments.java | 6 +-
.../server/conf/store/impl/ReadyMonitorTest.java | 2 +-
.../coordinator/CompactionCoordinator.java | 3 +-
.../accumulo/coordinator/CompactionFinalizer.java | 6 +-
.../manager/tableOps/bulkVer2/BulkImportMove.java | 4 +-
.../tableOps/tableImport/MoveExportedFiles.java | 4 +-
.../manager/upgrade/UpgradeCoordinator.java | 6 +-
.../org/apache/accumulo/tserver/ScanServer.java | 5 +-
.../tserver/TabletServerResourceManager.java | 60 +++++++++-------
.../tserver/compactions/CompactionService.java | 6 +-
.../compactions/InternalCompactionExecutor.java | 8 ++-
.../org/apache/accumulo/tserver/log/LogSorter.java | 3 +-
.../accumulo/tserver/log/TabletServerLogger.java | 5 +-
.../accumulo/test/BalanceWithOfflineTableIT.java | 4 +-
.../test/functional/BatchWriterFlushIT.java | 2 +-
34 files changed, 281 insertions(+), 111 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 7c6ba76549..204665d0fc 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -26,6 +26,8 @@ import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.CONDITIONAL_WRITER_CLEANUP_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.SCANNER_READ_AHEAD_POOL;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.URL;
@@ -257,9 +259,9 @@ public class ClientContext implements AccumuloClient {
submitScannerReadAheadTask(Callable<List<KeyValue>> c) {
ensureOpen();
if (scannerReadaheadPool == null) {
- scannerReadaheadPool = clientThreadPools.getPoolBuilder("Accumulo
scanner read ahead thread")
+ scannerReadaheadPool =
clientThreadPools.getPoolBuilder(SCANNER_READ_AHEAD_POOL)
.numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(3L,
SECONDS)
- .withQueue(new
SynchronousQueue<>()).enableThreadPoolMetrics().build();
+ .withQueue(new SynchronousQueue<>()).build();
}
return scannerReadaheadPool.submit(c);
}
@@ -267,8 +269,8 @@ public class ClientContext implements AccumuloClient {
public synchronized void executeCleanupTask(Runnable r) {
ensureOpen();
if (cleanupThreadPool == null) {
- cleanupThreadPool = clientThreadPools.getPoolBuilder("Conditional Writer
Cleanup Thread")
- .numCoreThreads(1).withTimeOut(3L,
SECONDS).enableThreadPoolMetrics().build();
+ cleanupThreadPool =
clientThreadPools.getPoolBuilder(CONDITIONAL_WRITER_CLEANUP_POOL)
+ .numCoreThreads(1).withTimeOut(3L, SECONDS).build();
}
this.cleanupThreadPool.execute(r);
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index cb7675196c..ceaf6901a5 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.CONDITIONAL_WRITER_POOL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -379,7 +380,7 @@ public class ConditionalWriterImpl implements
ConditionalWriter {
this.auths = config.getAuthorizations();
this.ve = new VisibilityEvaluator(config.getAuthorizations());
this.threadPool = context.threadPools().createScheduledExecutorService(
- config.getMaxWriteThreads(), this.getClass().getSimpleName());
+ config.getMaxWriteThreads(), CONDITIONAL_WRITER_POOL.poolName);
this.locator = new SyncingTabletLocator(context, tableId);
this.serverQueues = new HashMap<>();
this.tableId = tableId;
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index 084d59ef11..8c73dab8e5 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@ -28,6 +28,7 @@ import static
org.apache.accumulo.core.rpc.ThriftUtil.createClient;
import static org.apache.accumulo.core.rpc.ThriftUtil.createTransport;
import static org.apache.accumulo.core.rpc.ThriftUtil.getClient;
import static org.apache.accumulo.core.rpc.ThriftUtil.returnClient;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS_COMPACTIONS_FINDER_POOL;
import java.util.ArrayList;
import java.util.Collections;
@@ -301,7 +302,7 @@ public class InstanceOperationsImpl implements
InstanceOperations {
List<String> tservers = getTabletServers();
int numThreads = Math.max(4, Math.min((tservers.size() +
compactors.size()) / 10, 256));
- var executorService =
context.threadPools().getPoolBuilder("getactivecompactions")
+ var executorService =
context.threadPools().getPoolBuilder(INSTANCE_OPS_COMPACTIONS_FINDER_POOL)
.numCoreThreads(numThreads).build();
try {
List<Future<List<ActiveCompaction>>> futures = new ArrayList<>();
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 129cb6a681..c3b51237b6 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -30,6 +30,7 @@ import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
import static
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME;
import static org.apache.accumulo.core.util.Validators.NEW_TABLE_NAME;
+import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
@@ -496,7 +497,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
AtomicReference<Exception> exception = new AtomicReference<>(null);
ExecutorService executor =
-
context.threadPools().getPoolBuilder("addSplits").numCoreThreads(16).build();
+
context.threadPools().getPoolBuilder(SPLIT_POOL).numCoreThreads(16).build();
try {
executor.execute(
new SplitTask(new SplitEnv(tableName, tableId, executor, latch,
exception), splits));
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
index 23f40e9be3..8b149da57f 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.core.clientImpl;
import static com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
import java.lang.ref.Cleaner.Cleanable;
import java.util.ArrayList;
@@ -71,9 +72,10 @@ public class TabletServerBatchReader extends ScannerOptions
implements BatchScan
this.tableName = tableName;
this.numThreads = numQueryThreads;
- queryThreadPool =
- context.threadPools().getPoolBuilder("batch scanner " +
batchReaderInstance + "-")
- .numCoreThreads(numQueryThreads).build();
+ queryThreadPool = context.threadPools()
+ .getPoolBuilder(
+ ACCUMULO_POOL_PREFIX.poolName + ".client.batch.scanner." +
batchReaderInstance)
+ .numCoreThreads(numQueryThreads).build();
// Call shutdown on this thread pool in case the caller does not call
close().
cleanable = CleanerUtil.shutdownThreadPoolExecutor(queryThreadPool,
closed, log);
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index 980ba0408a..a20e3ba9c5 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -24,6 +24,8 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.BATCH_WRITER_BIN_MUTATIONS_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.BATCH_WRITER_SEND_POOL;
import java.io.IOException;
import java.lang.management.CompilationMXBean;
@@ -672,11 +674,11 @@ public class TabletServerBatchWriter implements
AutoCloseable {
public MutationWriter(int numSendThreads) {
serversMutations = new HashMap<>();
queued = new HashSet<>();
- sendThreadPool =
context.threadPools().getPoolBuilder(this.getClass().getName())
+ sendThreadPool =
context.threadPools().getPoolBuilder(BATCH_WRITER_SEND_POOL)
.numCoreThreads(numSendThreads).build();
locators = new HashMap<>();
- binningThreadPool =
context.threadPools().getPoolBuilder("BinMutations").numCoreThreads(1)
- .withQueue(new SynchronousQueue<>()).build();
+ binningThreadPool =
context.threadPools().getPoolBuilder(BATCH_WRITER_BIN_MUTATIONS_POOL)
+ .numCoreThreads(1).withQueue(new SynchronousQueue<>()).build();
binningThreadPool.setRejectedExecutionHandler(new
ThreadPoolExecutor.CallerRunsPolicy());
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
index f13420d006..a85db74a86 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
@@ -24,6 +24,8 @@ import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.groupingBy;
import static
org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.pathToCacheId;
import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.BULK_IMPORT_CLIENT_BULK_THREADS_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.BULK_IMPORT_CLIENT_LOAD_POOL;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -482,12 +484,14 @@ public class BulkImport implements
ImportDestinationArguments, ImportMappingOpti
if (this.executor != null) {
executor = this.executor;
} else if (numThreads > 0) {
- executor = service =
context.threadPools().getPoolBuilder("BulkImportThread")
- .numCoreThreads(numThreads).build();
+ executor = service =
context.threadPools().getPoolBuilder(BULK_IMPORT_CLIENT_LOAD_POOL)
+ .numCoreThreads(numThreads).enableThreadPoolMetrics().build();
} else {
String threads =
context.getConfiguration().get(ClientProperty.BULK_LOAD_THREADS.getKey());
- executor = service =
context.threadPools().getPoolBuilder("BulkImportThread")
-
.numCoreThreads(ConfigurationTypeHelper.getNumThreads(threads)).build();
+ executor =
+ service =
context.threadPools().getPoolBuilder(BULK_IMPORT_CLIENT_BULK_THREADS_POOL)
+ .numCoreThreads(ConfigurationTypeHelper.getNumThreads(threads))
+ .enableThreadPoolMetrics().build();
}
try {
diff --git
a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
index 1620f809b0..0801371d48 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.core.file;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.BLOOM_LOADER_POOL;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -80,7 +81,7 @@ public class BloomFilterLayer {
}
if (maxLoadThreads > 0) {
- loadThreadPool =
ThreadPools.getServerThreadPools().getPoolBuilder("bloom-loader")
+ loadThreadPool =
ThreadPools.getServerThreadPools().getPoolBuilder(BLOOM_LOADER_POOL)
.numCoreThreads(0).numMaxThreads(maxLoadThreads).withTimeOut(60L,
SECONDS).build();
}
return loadThreadPool;
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index c2e5f81892..b3e4a99a7c 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -19,6 +19,8 @@
package org.apache.accumulo.core.util.compaction;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTIONS_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTION_IDS_POOL;
import java.util.ArrayList;
import java.util.Collection;
@@ -224,7 +226,7 @@ public class ExternalCompactionUtil {
public static List<RunningCompaction>
getCompactionsRunningOnCompactors(ClientContext context) {
final List<RunningCompactionFuture> rcFutures = new ArrayList<>();
final ExecutorService executor = ThreadPools.getServerThreadPools()
-
.getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build();
+
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTIONS_POOL).numCoreThreads(16).build();
getCompactorAddrs(context).forEach((q, hp) -> {
hp.forEach(hostAndPort -> {
rcFutures.add(new RunningCompactionFuture(q, hostAndPort,
@@ -251,7 +253,7 @@ public class ExternalCompactionUtil {
public static Collection<ExternalCompactionId>
getCompactionIdsRunningOnCompactors(ClientContext context) {
final ExecutorService executor = ThreadPools.getServerThreadPools()
-
.getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build();
+
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTION_IDS_POOL).numCoreThreads(16).build();
List<Future<ExternalCompactionId>> futures = new ArrayList<>();
getCompactorAddrs(context).forEach((q, hp) -> {
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
new file mode 100644
index 0000000000..bdebd03b2d
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util.threads;
+
+public enum ThreadPoolNames {
+
+ ACCUMULO_POOL_PREFIX("accumulo.pool"),
+ BATCH_WRITER_SEND_POOL("accumulo.pool.batch.writer.send"),
+ BATCH_WRITER_BIN_MUTATIONS_POOL("accumulo.pool.batch.writer.bin.mutations"),
+ BLOOM_LOADER_POOL("accumulo.pool.bloom.loader"),
+ BULK_IMPORT_CLIENT_LOAD_POOL("accumulo.pool.bulk.import.client.bulk.load"),
+
BULK_IMPORT_CLIENT_BULK_THREADS_POOL("accumulo.pool.bulk.import.client.bulk.threads"),
+ BULK_IMPORT_DIR_MOVE_POOL("accumulo.pool.bulk.dir.move"),
+
COMPACTION_COORDINATOR_SUMMARY_POOL("accumulo.pool.compaction.summary.gatherer"),
+
COMPACTION_SERVICE_COMPACTION_PLANNER_POOL("accumulo.pool.compaction.service.compaction.planner"),
+
COMPACTOR_RUNNING_COMPACTIONS_POOL("accumulo.pool.compactor.running.compactions"),
+
COMPACTOR_RUNNING_COMPACTION_IDS_POOL("accumulo.pool.compactor.running.compaction.ids"),
+ CONDITIONAL_WRITER_POOL("accumulo.pool.conditional.writer"),
+
CONDITIONAL_WRITER_CLEANUP_POOL("accumulo.pool.client.context.conditional.writer.cleanup"),
+
COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"),
+
COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"),
+ GC_DELETE_POOL("accumulo.pool.gc.threads.delete"),
+ GENERAL_SERVER_POOL("accumulo.pool.general.server"),
+ GENERAL_SERVER_SIMPLETIMER_POOL("accumulo.pool.general.server.simpletimer"),
+ IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"),
+
INSTANCE_OPS_COMPACTIONS_FINDER_POOL("accumulo.pool.instance.ops.active.compactions.finder"),
+ MANAGER_BULK_IMPORT_POOL("accumulo.pool.manager.bulk.import"),
+ MANAGER_FATE_POOL("accumulo.pool.manager.fate"),
+ MANAGER_RENAME_POOL("accumulo.pool.manager.rename"),
+ MANAGER_STATUS_POOL("accumulo.pool.manager.status"),
+
MANAGER_UPGRADE_COORDINATOR_METADATA_POOL("accumulo.pool.manager.upgrade.metadata"),
+
METADATA_DEFAULT_SPLIT_POOL("accumulo.pool.metadata.tablet.default.splitter"),
+ METADATA_TABLET_MIGRATION_POOL("accumulo.pool.metadata.tablet.migration"),
+ METADATA_TABLET_ASSIGNMENT_POOL("accumulo.pool.metadata.tablet.assignment"),
+ REPLICATION_WORKER_POOL("accumulo.pool.replication.worker"),
+ SCAN_POOL("accumulo.pool.scan"),
+
SCAN_SERVER_TABLET_METADATA_CACHE_POOL("accumulo.pool.scan.server.tablet.metadata.cache"),
+ SCANNER_READ_AHEAD_POOL("accumulo.pool.client.context.scanner.read.ahead"),
+ SCHED_FUTURE_CHECKER_POOL("accumulo.pool.scheduled.future.checker"),
+ SPLIT_POOL("accumulo.pool.table.ops.add.splits"),
+ TABLET_ASSIGNMENT_POOL("accumulo.pool.tablet.assignment.pool"),
+ TSERVER_ASSIGNMENT_POOL("accumulo.pool.tserver.assignment"),
+ TSERVER_COMPACTION_MINOR_POOL("accumulo.pool.tserver.compaction.minor"),
+ TSERVER_MIGRATIONS_POOL("accumulo.pool.tserver.migrations"),
+ TSERVER_MINOR_COMPACTOR_POOL("accumulo.pool.tserver.minor.compactor"),
+
TSERVER_SUMMARY_FILE_RETRIEVER_POOL("accumulo.pool.tserver.summary.file.retriever.pool"),
+ TSERVER_SUMMARY_PARTITION_POOL("accumulo.pool.tserver.summary.partition"),
+ TSERVER_SUMMARY_REMOTE_POOL("accumulo.pool.tserver.summary.remote"),
+ TSERVER_SUMMARY_RETRIEVAL_POOL("accumulo.pool.tserver.summary.retrieval"),
+ TSERVER_TABLET_MIGRATION_POOL("accumulo.pool.tserver.tablet.migration"),
+ TSERVER_WAL_CREATOR_POOL("accumulo.pool.tserver.wal.creator"),
+
TSERVER_WAL_SORT_CONCURRENT_POOL("accumulo.pool.tserver.wal.sort.concurrent"),
+ TSERVER_WORKQ_POOL("accumulo.pool.tserver.workq"),
+ UTILITY_CHECK_FILE_TASKS("accumulo.pool.util.check.file.tasks"),
+ UTILITY_VERIFY_TABLET_ASSIGNMENTS("accumulo.pool.util.check.tablet.servers");
+
+ public final String poolName;
+
+ ThreadPoolNames(String poolName) {
+ this.poolName = poolName;
+ }
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index ea26c563e4..b2b0bc02db 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -21,6 +21,23 @@ package org.apache.accumulo.core.util.threads;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_SIMPLETIMER_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_BULK_IMPORT_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_RENAME_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.REPLICATION_WORKER_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHED_FUTURE_CHECKER_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_REMOTE_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_RETRIEVAL_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WORKQ_POOL;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Iterator;
@@ -83,7 +100,7 @@ public class ThreadPools {
}
private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
- getServerThreadPools().getPoolBuilder("Scheduled Future
Checker").numCoreThreads(1).build();
+
getServerThreadPools().getPoolBuilder(SCHED_FUTURE_CHECKER_POOL).numCoreThreads(1).build();
private static final ConcurrentLinkedQueue<ScheduledFuture<?>>
CRITICAL_RUNNING_TASKS =
new ConcurrentLinkedQueue<>();
@@ -272,31 +289,33 @@ public class ThreadPools {
ThreadPoolExecutorBuilder builder;
switch (p) {
case GENERAL_SIMPLETIMER_THREADPOOL_SIZE:
- return createScheduledExecutorService(conf.getCount(p), "SimpleTimer");
+ return createScheduledExecutorService(conf.getCount(p),
+ GENERAL_SERVER_SIMPLETIMER_POOL.poolName);
case GENERAL_THREADPOOL_SIZE:
- return createScheduledExecutorService(conf.getCount(p),
"GeneralExecutor",
+ return createScheduledExecutorService(conf.getCount(p),
GENERAL_SERVER_POOL.poolName,
emitThreadPoolMetrics);
case MANAGER_BULK_THREADPOOL_SIZE:
- builder = getPoolBuilder("bulk
import").numCoreThreads(conf.getCount(p)).withTimeOut(
- conf.getTimeInMillis(Property.MANAGER_BULK_THREADPOOL_TIMEOUT),
MILLISECONDS);
+ builder =
+
getPoolBuilder(MANAGER_BULK_IMPORT_POOL).numCoreThreads(conf.getCount(p)).withTimeOut(
+
conf.getTimeInMillis(Property.MANAGER_BULK_THREADPOOL_TIMEOUT), MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case MANAGER_RENAME_THREADS:
- builder = getPoolBuilder("bulk move").numCoreThreads(conf.getCount(p));
+ builder =
getPoolBuilder(MANAGER_RENAME_POOL).numCoreThreads(conf.getCount(p));
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case MANAGER_FATE_THREADPOOL_SIZE:
- builder = getPoolBuilder("Repo
Runner").numCoreThreads(conf.getCount(p));
+ builder =
getPoolBuilder(MANAGER_FATE_POOL).numCoreThreads(conf.getCount(p));
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case MANAGER_STATUS_THREAD_POOL_SIZE:
- builder = getPoolBuilder("GatherTableInformation");
+ builder = getPoolBuilder(MANAGER_STATUS_POOL);
int threads = conf.getCount(p);
if (threads == 0) {
builder.numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L,
SECONDS)
@@ -309,57 +328,57 @@ public class ThreadPools {
}
return builder.build();
case TSERV_WORKQ_THREADS:
- builder = getPoolBuilder("distributed work
queue").numCoreThreads(conf.getCount(p));
+ builder =
getPoolBuilder(TSERVER_WORKQ_POOL).numCoreThreads(conf.getCount(p));
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case TSERV_MINC_MAXCONCURRENT:
- builder = getPoolBuilder("minor
compactor").numCoreThreads(conf.getCount(p)).withTimeOut(0L,
- MILLISECONDS);
+ builder =
getPoolBuilder(TSERVER_MINOR_COMPACTOR_POOL).numCoreThreads(conf.getCount(p))
+ .withTimeOut(0L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case TSERV_MIGRATE_MAXCONCURRENT:
- builder = getPoolBuilder("tablet
migration").numCoreThreads(conf.getCount(p))
+ builder =
getPoolBuilder(TSERVER_MIGRATIONS_POOL).numCoreThreads(conf.getCount(p))
.withTimeOut(0L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case TSERV_ASSIGNMENT_MAXCONCURRENT:
- builder = getPoolBuilder("tablet
assignment").numCoreThreads(conf.getCount(p))
+ builder =
getPoolBuilder(TSERVER_ASSIGNMENT_POOL).numCoreThreads(conf.getCount(p))
.withTimeOut(0L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case TSERV_SUMMARY_RETRIEVAL_THREADS:
- builder = getPoolBuilder("summary file
retriever").numCoreThreads(conf.getCount(p))
+ builder =
getPoolBuilder(TSERVER_SUMMARY_RETRIEVAL_POOL).numCoreThreads(conf.getCount(p))
.withTimeOut(60L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case TSERV_SUMMARY_REMOTE_THREADS:
- builder = getPoolBuilder("summary
remote").numCoreThreads(conf.getCount(p)).withTimeOut(60L,
- MILLISECONDS);
+ builder =
getPoolBuilder(TSERVER_SUMMARY_REMOTE_POOL).numCoreThreads(conf.getCount(p))
+ .withTimeOut(60L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case TSERV_SUMMARY_PARTITION_THREADS:
- builder = getPoolBuilder("summary
partition").numCoreThreads(conf.getCount(p))
+ builder =
getPoolBuilder(TSERVER_SUMMARY_PARTITION_POOL).numCoreThreads(conf.getCount(p))
.withTimeOut(60L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case GC_DELETE_THREADS:
- return
getPoolBuilder("deleting").numCoreThreads(conf.getCount(p)).build();
+ return
getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
case REPLICATION_WORKER_THREADS:
- builder = getPoolBuilder("replication
task").numCoreThreads(conf.getCount(p));
+ builder =
getPoolBuilder(REPLICATION_WORKER_POOL).numCoreThreads(conf.getCount(p));
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
@@ -370,8 +389,28 @@ public class ThreadPools {
}
}
+ /**
+ * Fet a fluent-style pool builder.
+ *
+ * @param pool the constant pool name
+ */
+ public ThreadPoolExecutorBuilder getPoolBuilder(@NonNull final
ThreadPoolNames pool) {
+ return new ThreadPoolExecutorBuilder(pool.poolName);
+ }
+
+ /**
+ * Fet a fluent-style pool builder.
+ *
+ * @param name the pool name - the name trimed and prepended with the
ACCUMULO_POOL_PREFIX so that
+ * pool names begin with a consistent prefix.
+ */
public ThreadPoolExecutorBuilder getPoolBuilder(@NonNull final String name) {
- return new ThreadPoolExecutorBuilder(name);
+ String trimmed = name.trim();
+ if (trimmed.startsWith(ACCUMULO_POOL_PREFIX.poolName)) {
+ return new ThreadPoolExecutorBuilder(trimmed);
+ } else {
+ return new ThreadPoolExecutorBuilder(ACCUMULO_POOL_PREFIX.poolName +
trimmed);
+ }
}
public class ThreadPoolExecutorBuilder {
diff --git
a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index ea0c4ceabe..331a050b60 100644
---
a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@ -231,7 +231,7 @@ public class MultiThreadedRFileTest {
// now start up multiple RFile deepcopies
int maxThreads = 10;
- String name = "MultiThreadedRFileTestThread";
+ String name = "test.rfile.multi.thread.pool";
ThreadPoolExecutor pool =
ThreadPools.getServerThreadPools().getPoolBuilder(name).numCoreThreads(maxThreads
+ 1)
.numMaxThreads(maxThreads + 1).withTimeOut(5, MINUTES).build();
diff --git
a/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java
b/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java
index 5146ccf5b2..1d6fae41cc 100644
---
a/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java
@@ -31,7 +31,7 @@ public class ThreadPoolExecutorBuilderTest {
@Test
public void builderDefaultsTest() {
- var p = serverPool.getPoolBuilder("defaults").build();
+ var p = serverPool.getPoolBuilder("defaults.pool").build();
assertEquals(0, p.getCorePoolSize());
assertEquals(1, p.getMaximumPoolSize());
assertEquals(3L, p.getKeepAliveTime(MINUTES));
@@ -40,38 +40,38 @@ public class ThreadPoolExecutorBuilderTest {
@Test
public void builderInvalidNumCoreTest() {
assertThrows(IllegalArgumentException.class,
- () -> serverPool.getPoolBuilder("test1").numCoreThreads(-1).build());
+ () ->
serverPool.getPoolBuilder("test1.pool").numCoreThreads(-1).build());
}
@Test
public void builderInvalidNumMaxThreadsTest() {
// max threads must be > core threads
assertThrows(IllegalArgumentException.class,
- () ->
serverPool.getPoolBuilder("test1").numCoreThreads(2).numMaxThreads(1).build());
+ () ->
serverPool.getPoolBuilder("test1.pool").numCoreThreads(2).numMaxThreads(1).build());
}
@Test
public void builderPoolCoreMaxTest() {
- var p =
serverPool.getPoolBuilder("test1").numCoreThreads(1).numMaxThreads(2).build();
+ var p =
serverPool.getPoolBuilder("test1.pool").numCoreThreads(1).numMaxThreads(2).build();
assertEquals(1, p.getCorePoolSize());
assertEquals(2, p.getMaximumPoolSize());
}
@Test
public void builderFixedPoolTest() {
- var p = serverPool.getPoolBuilder("test1").numCoreThreads(2).build();
+ var p = serverPool.getPoolBuilder("test1.pool").numCoreThreads(2).build();
assertEquals(2, p.getCorePoolSize());
assertEquals(2, p.getMaximumPoolSize());
}
@Test
public void buildeSetTimeoutTest() {
- var p = serverPool.getPoolBuilder("test1").withTimeOut(0L,
MILLISECONDS).build();
+ var p = serverPool.getPoolBuilder("test1.pool").withTimeOut(0L,
MILLISECONDS).build();
assertEquals(0, p.getCorePoolSize());
assertEquals(1, p.getMaximumPoolSize());
assertEquals(0L, p.getKeepAliveTime(MINUTES));
- var p2 = serverPool.getPoolBuilder("test1").withTimeOut(123L,
MILLISECONDS).build();
+ var p2 = serverPool.getPoolBuilder("test1.pool").withTimeOut(123L,
MILLISECONDS).build();
assertEquals(0, p2.getCorePoolSize());
assertEquals(1, p2.getMaximumPoolSize());
assertEquals(123L, p2.getKeepAliveTime(MILLISECONDS));
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index 4de81d7759..679941fafd 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -134,8 +134,9 @@ public class BulkImporter {
Collections.synchronizedSortedMap(new TreeMap<>());
timer.start(Timers.EXAMINE_MAP_FILES);
- ExecutorService threadPool = ThreadPools.getServerThreadPools()
-
.getPoolBuilder("findOverlapping").numCoreThreads(numThreads).build();
+ ExecutorService threadPool =
+
ThreadPools.getServerThreadPools().getPoolBuilder("bulk.import.find.overlapping")
+ .numCoreThreads(numThreads).enableThreadPoolMetrics().build();
for (Path path : paths) {
final Path mapFile = path;
@@ -362,8 +363,8 @@ public class BulkImporter {
final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new
TreeMap<>());
- ExecutorService threadPool =
ThreadPools.getServerThreadPools().getPoolBuilder("estimateSizes")
- .numCoreThreads(numThreads).build();
+ ExecutorService threadPool = ThreadPools.getServerThreadPools()
+
.getPoolBuilder("bulk.import.size.estimate").numCoreThreads(numThreads).build();
for (final Entry<Path,List<TabletLocation>> entry :
assignments.entrySet()) {
if (entry.getValue().size() == 1) {
@@ -552,8 +553,8 @@ public class BulkImporter {
}
});
- ExecutorService threadPool =
ThreadPools.getServerThreadPools().getPoolBuilder("submit")
- .numCoreThreads(numThreads).build();
+ ExecutorService threadPool = ThreadPools.getServerThreadPools()
+
.getPoolBuilder("bulk.import.submit").numCoreThreads(numThreads).build();
for (Entry<String,Map<KeyExtent,List<PathSize>>> entry :
assignmentsPerTabletServer
.entrySet()) {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
index d55c9465b1..c50f896b99 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
@@ -45,8 +45,8 @@ public class PropCacheCaffeineImpl implements PropCache {
public static final int EXPIRE_MIN = 60;
private static final Logger log =
LoggerFactory.getLogger(PropCacheCaffeineImpl.class);
private static final Executor executor =
-
ThreadPools.getServerThreadPools().getPoolBuilder("caffeine-tasks").numCoreThreads(1)
- .numMaxThreads(20).withTimeOut(60L, SECONDS).build();
+
ThreadPools.getServerThreadPools().getPoolBuilder("caffeine.prop.cache.tasks")
+ .numCoreThreads(1).numMaxThreads(20).withTimeOut(60L,
SECONDS).build();
private final LoadingCache<PropStoreKey<?>,VersionedProperties> cache;
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java
index 952409a2bb..d4fdf455f6 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java
@@ -58,7 +58,7 @@ public class PropStoreWatcher implements Watcher {
private static final Logger log =
LoggerFactory.getLogger(PropStoreWatcher.class);
private static final ExecutorService executorService =
ThreadPools.getServerThreadPools()
- .getPoolBuilder("zoo_change_update").numCoreThreads(2).build();
+
.getPoolBuilder("prop.store.zoo.change.update").numCoreThreads(2).build();
private final ReentrantReadWriteLock listenerLock = new
ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock listenerReadLock =
listenerLock.readLock();
private final ReentrantReadWriteLock.WriteLock listenerWriteLock =
listenerLock.writeLock();
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
index 2c04fc9e49..6f8687fd64 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
@@ -68,7 +68,7 @@ public class ProblemReports implements
Iterable<ProblemReport> {
* is reporting lots of problems, but problem reports can not be processed
*/
private final ExecutorService reportExecutor =
ThreadPools.getServerThreadPools()
-
.getPoolBuilder("acu-problem-reporter").numCoreThreads(0).numMaxThreads(1)
+ .getPoolBuilder("problem.reporter").numCoreThreads(0).numMaxThreads(1)
.withTimeOut(60L, SECONDS).withQueue(new
LinkedBlockingQueue<>(500)).build();
private final ServerContext context;
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 87bfb4c0c8..d4220ff6de 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.server.rpc;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
import java.io.IOException;
import java.net.InetAddress;
@@ -310,20 +311,21 @@ public class TServerUtils {
private static ThreadPoolExecutor createSelfResizingThreadPool(final String
serverName,
final int executorThreads, long threadTimeOut, final
AccumuloConfiguration conf,
long timeBetweenThreadChecks) {
- final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools()
- .getPoolBuilder(serverName +
"-ClientPool").numCoreThreads(executorThreads)
- .withTimeOut(threadTimeOut,
MILLISECONDS).enableThreadPoolMetrics().build();
+ String poolName = ACCUMULO_POOL_PREFIX.poolName + serverName.toLowerCase()
+ ".client";
+ final ThreadPoolExecutor pool =
+
ThreadPools.getServerThreadPools().getPoolBuilder(poolName).numCoreThreads(executorThreads)
+ .withTimeOut(threadTimeOut,
MILLISECONDS).enableThreadPoolMetrics().build();
// periodically adjust the number of threads we need by checking how busy
our threads are
ThreadPools.watchCriticalFixedDelay(conf, timeBetweenThreadChecks, () -> {
// there is a minor race condition between sampling the current state of
the thread pool
// and adjusting it however, this isn't really an issue, since it
adjusts periodically
if (pool.getCorePoolSize() <= pool.getActiveCount()) {
int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(),
2);
- ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool");
+ ThreadPools.resizePool(pool, () -> larger, poolName);
} else {
if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
- ThreadPools.resizePool(pool, () -> smaller, serverName +
"-ClientPool");
+ ThreadPools.resizePool(pool, () -> smaller, poolName);
}
}
});
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
index 18de10604c..f9c05cdba8 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
@@ -18,6 +18,8 @@
*/
package org.apache.accumulo.server.util;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.UTILITY_CHECK_FILE_TASKS;
+
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
@@ -121,8 +123,8 @@ public class RemoveEntriesForMissingFiles {
Map<Path,Path> cache = new LRUMap<>(100000);
Set<Path> processing = new HashSet<>();
- ExecutorService threadPool =
ThreadPools.getServerThreadPools().getPoolBuilder("CheckFileTasks")
- .numCoreThreads(16).build();
+ ExecutorService threadPool = ThreadPools.getServerThreadPools()
+ .getPoolBuilder(UTILITY_CHECK_FILE_TASKS).numCoreThreads(16).build();
System.out.printf("Scanning : %s %s\n", tableName, range);
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
index f1201830a3..e14092116b 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
@@ -18,6 +18,8 @@
*/
package org.apache.accumulo.server.util;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.UTILITY_VERIFY_TABLET_ASSIGNMENTS;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -117,8 +119,8 @@ public class VerifyTabletAssignments {
}
}
- ExecutorService tp =
ThreadPools.getServerThreadPools().getPoolBuilder("CheckTabletServer")
- .numCoreThreads(20).build();
+ ExecutorService tp = ThreadPools.getServerThreadPools()
+
.getPoolBuilder(UTILITY_VERIFY_TABLET_ASSIGNMENTS).numCoreThreads(20).build();
for (final Entry<HostAndPort,List<KeyExtent>> entry :
extentsPerServer.entrySet()) {
Runnable r = () -> {
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java
b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java
index e52b39a419..25a94c839c 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java
@@ -60,7 +60,7 @@ public class ReadyMonitorTest {
// these tests wait for workers to signal ready using count down latch.
// size pool so some threads are likely to wait on others to complete.
int numPoolThreads = numWorkerThreads / 2;
- workerPool =
ThreadPools.getServerThreadPools().getPoolBuilder("readyMonitor-test-pool")
+ workerPool =
ThreadPools.getServerThreadPools().getPoolBuilder("test.ready.monitor.pool")
.numCoreThreads(numPoolThreads).build();
}
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 16452b0805..9f03235eae 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.coordinator;
import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTION_COORDINATOR_SUMMARY_POOL;
import java.net.UnknownHostException;
import java.util.HashSet;
@@ -352,7 +353,7 @@ public class CompactionCoordinator extends AbstractServer
private void updateSummaries() {
ExecutorService executor = ThreadPools.getServerThreadPools()
- .getPoolBuilder("Compaction Summary
Gatherer").numCoreThreads(10).build();
+
.getPoolBuilder(COMPACTION_COORDINATOR_SUMMARY_POOL).numCoreThreads(10).build();
try {
Set<String> queuesSeen = new ConcurrentSkipListSet<>();
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 45b6161bab..a341a58582 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -22,6 +22,8 @@ import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_FINALIZER_BACKGROUND_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_FINALIZER_NOTIFIER_POOL;
import java.util.ArrayList;
import java.util.Iterator;
@@ -76,11 +78,11 @@ public class CompactionFinalizer {
.getCount(Property.COMPACTION_COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS);
this.ntfyExecutor = ThreadPools.getServerThreadPools()
- .getPoolBuilder("Compaction Finalizer
Notifier").numCoreThreads(3).numMaxThreads(max)
+
.getPoolBuilder(COORDINATOR_FINALIZER_NOTIFIER_POOL).numCoreThreads(3).numMaxThreads(max)
.withTimeOut(1L, MINUTES).enableThreadPoolMetrics().build();
this.backgroundExecutor =
- ThreadPools.getServerThreadPools().getPoolBuilder("Compaction
Finalizer Background Task")
+
ThreadPools.getServerThreadPools().getPoolBuilder(COORDINATOR_FINALIZER_BACKGROUND_POOL)
.numCoreThreads(1).enableThreadPoolMetrics().build();
backgroundExecutor.execute(() -> {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
index 5ace9ccb6f..72e4923f6d 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
@@ -18,6 +18,8 @@
*/
package org.apache.accumulo.manager.tableOps.bulkVer2;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.BULK_IMPORT_DIR_MOVE_POOL;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -117,7 +119,7 @@ class BulkImportMove extends ManagerRepo {
oldToNewMap.put(originalPath, newPath);
}
try {
- fs.bulkRename(oldToNewMap, workerCount, "bulkDir move", fmtTid);
+ fs.bulkRename(oldToNewMap, workerCount,
BULK_IMPORT_DIR_MOVE_POOL.poolName, fmtTid);
} catch (IOException ioe) {
throw new
AcceptableThriftTableOperationException(bulkInfo.tableId.canonical(), null,
TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER,
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java
index c717527619..1bc30fc73c 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java
@@ -18,6 +18,8 @@
*/
package org.apache.accumulo.manager.tableOps.tableImport;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.IMPORT_TABLE_RENAME_POOL;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
@@ -112,7 +114,7 @@ class MoveExportedFiles extends ManagerRepo {
}
}
try {
- fs.bulkRename(oldToNewPaths, workerCount, "importtable rename", fmtTid);
+ fs.bulkRename(oldToNewPaths, workerCount,
IMPORT_TABLE_RENAME_POOL.poolName, fmtTid);
} catch (IOException ioe) {
throw new
AcceptableThriftTableOperationException(tableInfo.tableId.canonical(), null,
TableOperation.IMPORT, TableOperationExceptionType.OTHER,
ioe.getCause().getMessage());
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index 11cb713b58..5445d6d3da 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.manager.upgrade;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_UPGRADE_COORDINATOR_METADATA_POOL;
import java.io.IOException;
import java.util.Collections;
@@ -194,8 +195,9 @@ public class UpgradeCoordinator {
"Not currently in a suitable state to do metadata upgrade %s", status);
if (currentVersion < AccumuloDataVersion.get()) {
- return
ThreadPools.getServerThreadPools().getPoolBuilder("UpgradeMetadataThreads")
- .numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L,
SECONDS)
+ return ThreadPools.getServerThreadPools()
+
.getPoolBuilder(MANAGER_UPGRADE_COORDINATOR_METADATA_POOL).numCoreThreads(0)
+ .numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, SECONDS)
.withQueue(new SynchronousQueue<>()).build().submit(() -> {
try {
for (int v = currentVersion; v < AccumuloDataVersion.get(); v++)
{
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index ad9c72d0c0..18cb23b37b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.tserver;
import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.SCAN_SERVER_TABLET_METADATA_CACHE_POOL;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -259,8 +260,8 @@ public class ScanServer extends AbstractServer
"Tablet metadata cache refresh percentage is '%s' but must be less
than 1",
cacheRefreshPercentage);
- tmCacheExecutor =
context.threadPools().getPoolBuilder("scanServerTmCache").numCoreThreads(8)
- .enableThreadPoolMetrics().build();
+ tmCacheExecutor =
context.threadPools().getPoolBuilder(SCAN_SERVER_TABLET_METADATA_CACHE_POOL)
+ .numCoreThreads(8).enableThreadPoolMetrics().build();
var builder = Caffeine.newBuilder().expireAfterWrite(cacheExpiration,
TimeUnit.MILLISECONDS)
.scheduler(Scheduler.systemScheduler()).executor(tmCacheExecutor).recordStats();
if (cacheRefreshPercentage > 0) {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index c3ad1fd830..655b54fdbd 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -23,6 +23,17 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toUnmodifiableMap;
import static
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_DEFAULT_SPLIT_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL;
+import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_REMOTE_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_TABLET_MIGRATION_POOL;
import java.io.IOException;
import java.util.ArrayList;
@@ -130,13 +141,13 @@ public class TabletServerResourceManager {
* pool executor
*
* @param maxThreads max threads
- * @param name name of thread pool
+ * @param pool name of thread pool
* @param tp executor
*/
- private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String
name,
+ private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String
pool,
final ThreadPoolExecutor tp) {
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
- () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, SECONDS));
+ () -> ThreadPools.resizePool(tp, maxThreads, pool), 1, 10, SECONDS));
}
private ThreadPoolExecutor createPriorityExecutor(ScanExecutorConfig sec,
@@ -184,12 +195,14 @@ public class TabletServerResourceManager {
}
scanExecQueues.put(sec.name, queue);
-
- ThreadPoolExecutor es =
ThreadPools.getServerThreadPools().getPoolBuilder("scan-" + sec.name)
+ ThreadPoolExecutor es = ThreadPools.getServerThreadPools()
+ .getPoolBuilder(ACCUMULO_POOL_PREFIX.poolName + ".scan." + sec.name)
.numCoreThreads(sec.getCurrentMaxThreads()).numMaxThreads(sec.getCurrentMaxThreads())
.withTimeOut(0L,
MILLISECONDS).withQueue(queue).atPriority(sec.priority)
.enableThreadPoolMetrics(enableMetrics).build();
- modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, "scan-" +
sec.name, es);
+
+ modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads,
+ ACCUMULO_POOL_PREFIX.poolName + ".scan." + sec.name, es);
return es;
}
@@ -306,25 +319,24 @@ public class TabletServerResourceManager {
Property.TSERV_MINC_MAXCONCURRENT, true);
modifyThreadPoolSizesAtRuntime(
() ->
context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT),
- "minor compactor", minorCompactionThreadPool);
+ TSERVER_MINOR_COMPACTOR_POOL.poolName, minorCompactionThreadPool);
- splitThreadPool =
ThreadPools.getServerThreadPools().getPoolBuilder("splitter")
- .numCoreThreads(0).numMaxThreads(1).withTimeOut(1, SECONDS)
- .enableThreadPoolMetrics(enableMetrics).build();
+ splitThreadPool =
ThreadPools.getServerThreadPools().getPoolBuilder(SPLIT_POOL)
+ .numCoreThreads(0).numMaxThreads(1).withTimeOut(1, SECONDS).build();
- defaultSplitThreadPool =
ThreadPools.getServerThreadPools().getPoolBuilder("md splitter")
- .numCoreThreads(0).numMaxThreads(1).withTimeOut(60, SECONDS)
- .enableThreadPoolMetrics(enableMetrics).build();
+ defaultSplitThreadPool =
+
ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_DEFAULT_SPLIT_POOL)
+ .numCoreThreads(0).numMaxThreads(1).withTimeOut(60,
SECONDS).build();
- defaultMigrationPool = ThreadPools.getServerThreadPools()
- .getPoolBuilder("metadata tablet
migration").numCoreThreads(0).numMaxThreads(1)
- .withTimeOut(60,
SECONDS).enableThreadPoolMetrics(enableMetrics).build();
+ defaultMigrationPool =
+
ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_TABLET_MIGRATION_POOL)
+ .numCoreThreads(0).numMaxThreads(1).withTimeOut(60,
SECONDS).build();
migrationPool =
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_MIGRATE_MAXCONCURRENT, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() ->
context.getConfiguration().getCount(Property.TSERV_MIGRATE_MAXCONCURRENT),
- "tablet migration", migrationPool);
+ TSERVER_TABLET_MIGRATION_POOL.poolName, migrationPool);
// not sure if concurrent assignments can run safely... even if they could
there is probably no
// benefit at startup because
@@ -335,11 +347,11 @@ public class TabletServerResourceManager {
Property.TSERV_ASSIGNMENT_MAXCONCURRENT, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() ->
context.getConfiguration().getCount(Property.TSERV_ASSIGNMENT_MAXCONCURRENT),
- "tablet assignment", assignmentPool);
+ TABLET_ASSIGNMENT_POOL.poolName, assignmentPool);
- assignMetaDataPool = ThreadPools.getServerThreadPools()
- .getPoolBuilder("metadata tablet
assignment").numCoreThreads(0).numMaxThreads(1)
- .withTimeOut(60,
SECONDS).enableThreadPoolMetrics(enableMetrics).build();
+ assignMetaDataPool =
+
ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_TABLET_ASSIGNMENT_POOL)
+ .numCoreThreads(0).numMaxThreads(1).withTimeOut(60,
SECONDS).build();
activeAssignments = new ConcurrentHashMap<>();
@@ -347,19 +359,19 @@ public class TabletServerResourceManager {
Property.TSERV_SUMMARY_RETRIEVAL_THREADS, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() ->
context.getConfiguration().getCount(Property.TSERV_SUMMARY_RETRIEVAL_THREADS),
- "summary file retriever", summaryRetrievalPool);
+ TSERVER_SUMMARY_FILE_RETRIEVER_POOL.poolName, summaryRetrievalPool);
summaryRemotePool =
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_SUMMARY_REMOTE_THREADS, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() ->
context.getConfiguration().getCount(Property.TSERV_SUMMARY_REMOTE_THREADS),
- "summary remote", summaryRemotePool);
+ TSERVER_SUMMARY_REMOTE_POOL.poolName, summaryRemotePool);
summaryPartitionPool =
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_SUMMARY_PARTITION_THREADS, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() ->
context.getConfiguration().getCount(Property.TSERV_SUMMARY_PARTITION_THREADS),
- "summary partition", summaryPartitionPool);
+ TSERVER_SUMMARY_PARTITION_POOL.poolName, summaryPartitionPool);
boolean isScanServer = (tserver instanceof ScanServer);
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
index 3054db17ea..85df30494a 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.tserver.compactions;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTION_SERVICE_COMPACTION_PLANNER_POOL;
import java.util.Collection;
import java.util.Collections;
@@ -132,8 +133,9 @@ public class CompactionService {
this.executors = Map.copyOf(tmpExecutors);
- this.planningExecutor =
ThreadPools.getServerThreadPools().getPoolBuilder("CompactionPlanner")
- .numCoreThreads(1).numMaxThreads(1).withTimeOut(0L,
MILLISECONDS).build();
+ this.planningExecutor = ThreadPools.getServerThreadPools()
+
.getPoolBuilder(COMPACTION_SERVICE_COMPACTION_PLANNER_POOL).numCoreThreads(1)
+ .numMaxThreads(1).withTimeOut(0L, MILLISECONDS).build();
this.queuedForPlanning = new EnumMap<>(CompactionKind.class);
for (CompactionKind kind : CompactionKind.values()) {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
index 229f53ca77..d059de7c30 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.tserver.compactions;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
import java.util.ArrayList;
import java.util.Collections;
@@ -173,7 +174,9 @@ public class InternalCompactionExecutor implements
CompactionExecutor {
queue = new PriorityBlockingQueue<>(100, comparator);
- threadPool =
ThreadPools.getServerThreadPools().getPoolBuilder("compaction." + ceid)
+ threadPool = ThreadPools.getServerThreadPools()
+ .getPoolBuilder(
+ ACCUMULO_POOL_PREFIX.poolName +
".compaction.service.internal.compaction." + ceid)
.numCoreThreads(threads).numMaxThreads(threads).withTimeOut(60L,
SECONDS).withQueue(queue)
.build();
metricCloser =
@@ -204,7 +207,8 @@ public class InternalCompactionExecutor implements
CompactionExecutor {
}
public void setThreads(int numThreads) {
- ThreadPools.resizePool(threadPool, () -> numThreads, "compaction." + ceid);
+ ThreadPools.resizePool(threadPool, () -> numThreads,
+ ACCUMULO_POOL_PREFIX.poolName + "accumulo.pool.compaction." + ceid);
}
@Override
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 6fc396e4f4..32a249e7b6 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.tserver.log;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WAL_SORT_CONCURRENT_POOL;
import java.io.DataInputStream;
import java.io.EOFException;
@@ -297,7 +298,7 @@ public class LogSorter {
int threadPoolSize = this.conf.getCount(this.conf
.resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT,
Property.TSERV_RECOVERY_MAX_CONCURRENT));
ThreadPoolExecutor threadPool =
-
ThreadPools.getServerThreadPools().getPoolBuilder(this.getClass().getName())
+
ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_SORT_CONCURRENT_POOL)
.numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build();
new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY,
sortedLogConf,
context).startProcessing(new LogProcessor(), threadPool);
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 346b70166c..6757d276ee 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.tserver.log;
import static java.util.Collections.singletonList;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WAL_CREATOR_POOL;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
@@ -262,8 +263,8 @@ public class TabletServerLogger {
if (nextLogMaker != null) {
return;
}
- nextLogMaker = ThreadPools.getServerThreadPools().getPoolBuilder("WALog
creator")
- .numCoreThreads(1).enableThreadPoolMetrics().build();
+ nextLogMaker =
ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_CREATOR_POOL)
+ .numCoreThreads(1).build();
nextLogMaker.execute(new Runnable() {
@Override
public void run() {
diff --git
a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
index 40c1487e58..c51cd0c5d5 100644
--- a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
@@ -78,8 +78,8 @@ public class BalanceWithOfflineTableIT extends
ConfigurableMacBase {
log.info("Waiting for balance");
- ExecutorService pool =
ThreadPools.getServerThreadPools().getPoolBuilder("waitForBalance")
- .numCoreThreads(1).build();
+ ExecutorService pool = ThreadPools.getServerThreadPools()
+
.getPoolBuilder("test.wait.for.balance.pool").numCoreThreads(1).build();
Future<Boolean> wait = pool.submit(() -> {
c.instanceOperations().waitForBalance();
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
index 101c9fc65e..9ed38c5503 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
@@ -213,7 +213,7 @@ public class BatchWriterFlushIT extends
AccumuloClusterHarness {
}
ThreadPoolExecutor threads = ThreadPools.getServerThreadPools()
- .getPoolBuilder("ClientThreads").numCoreThreads(NUM_THREADS).build();
+
.getPoolBuilder("batch.writer.client.flush").numCoreThreads(NUM_THREADS).build();
threads.allowCoreThreadTimeOut(false);
threads.prestartAllCoreThreads();