This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new bf16a1d302 speeds up checking zoo locks for lots of server processes
(#5058)
bf16a1d302 is described below
commit bf16a1d302197477a21b643e5e653c8dc6dd34b8
Author: Keith Turner <[email protected]>
AuthorDate: Thu Nov 14 17:01:40 2024 -0500
speeds up checking zoo locks for lots of server processes (#5058)
Many operations, such as listcompactions, listscans, and admin
serviceStatus, check the ZooKeeper locks of server processes. These lock
checks are currently done serially, and each requires an RPC to ZooKeeper.
This change parallelizes the lock checks.
---
.../accumulo/core/lock/ServiceLockPaths.java | 61 +++++++++++++++++-----
1 file changed, 48 insertions(+), 13 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
index 6b577f7e06..1df14549de 100644
--- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
+++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
@@ -18,12 +18,17 @@
*/
package org.apache.accumulo.core.lock;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.function.Predicate;
import org.apache.accumulo.core.Constants;
@@ -412,7 +417,7 @@ public class ServiceLockPaths {
Objects.requireNonNull(resourceGroupPredicate);
Objects.requireNonNull(addressSelector);
- final Set<ServiceLockPath> results = new HashSet<>();
+ final Set<ServiceLockPath> results = ConcurrentHashMap.newKeySet();
final String typePath = ctx.getZooKeeperRoot() + serverType;
final ZooCache cache = ctx.getZooCache();
@@ -451,21 +456,51 @@ public class ServiceLockPaths {
addressPredicate = addressSelector.getPredicate();
}
- for (final String server : servers) {
- if (addressPredicate.test(server)) {
- final ServiceLockPath slp =
- parse(Optional.of(serverType), typePath + "/" + group + "/"
+ server);
- if (!withLock || slp.getType().equals(Constants.ZDEADTSERVERS)) {
- // Dead TServers don't have lock data
- results.add(slp);
- } else {
- final ZcStat stat = new ZcStat();
- Optional<ServiceLockData> sld = ServiceLock.getLockData(cache,
slp, stat);
- if (!sld.isEmpty()) {
+ ExecutorService executor = null;
+ try {
+ if (withLock) {
+ int numThreads = Math.max(1, Math.min(servers.size() / 1000,
16));
+ executor = Executors.newFixedThreadPool(numThreads);
+ }
+
+ List<Future<?>> futures = new ArrayList<>();
+
+ for (final String server : servers) {
+ if (addressPredicate.test(server)) {
+ final ServiceLockPath slp =
+ parse(Optional.of(serverType), typePath + "/" + group +
"/" + server);
+ if (!withLock ||
slp.getType().equals(Constants.ZDEADTSERVERS)) {
+ // Dead TServers don't have lock data
results.add(slp);
+ } else {
+ // Execute reads to zookeeper to get lock info in parallel.
The zookeeper client
+ // has a single shared connection to a server so this will
not create lots of
+ // connections, it will place multiple outgoing request on
that single zookeeper
+ // connection at the same time though.
+ futures.add(executor.submit(() -> {
+ final ZcStat stat = new ZcStat();
+ Optional<ServiceLockData> sld =
ServiceLock.getLockData(cache, slp, stat);
+ if (sld.isPresent()) {
+ results.add(slp);
+ }
+ return null;
+ }));
}
}
}
+
+ // wait for futures to complete and check for errors
+ for (var future : futures) {
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ } finally {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
}
}
}