This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new e7b5f30519 Added Mini method to stop worker processes in parallel (#5697)
e7b5f30519 is described below
commit e7b5f3051906cb6bf4431cd12361dc1648e64c39
Author: Dave Marion <[email protected]>
AuthorDate: Thu Jul 3 07:42:00 2025 -0400
Added Mini method to stop worker processes in parallel (#5697)
Prior to this change, the compactor, scan server,
and tablet server processes were stopped serially,
waiting up to 30 seconds for each process to terminate.
Added a method to stop all of the processes concurrently.
---
.../MiniAccumuloClusterControl.java | 64 ++++------------------
.../miniclusterImpl/MiniAccumuloClusterImpl.java | 40 +++++++++++++-
2 files changed, 49 insertions(+), 55 deletions(-)
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
index a7445302c5..286e561f5a 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
@@ -257,16 +257,7 @@ public class MiniAccumuloClusterControl implements ClusterControl {
if (group == null) {
return;
}
- group.forEach(process -> {
- try {
- cluster.stopProcessWithTimeout(process, 30, TimeUnit.SECONDS);
- } catch (ExecutionException | TimeoutException e) {
- log.warn("Compactor did not fully stop after 30 seconds", e);
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- });
+ cluster.stopProcessesWithTimeout(ServerType.COMPACTOR, group, 30,
TimeUnit.SECONDS);
compactorProcesses.remove(compactorResourceGroup);
}
}
@@ -277,16 +268,7 @@ public class MiniAccumuloClusterControl implements ClusterControl {
if (group == null) {
return;
}
- group.forEach(process -> {
- try {
- cluster.stopProcessWithTimeout(process, 30, TimeUnit.SECONDS);
- } catch (ExecutionException | TimeoutException e) {
- log.warn("TabletServer did not fully stop after 30 seconds", e);
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- });
+ cluster.stopProcessesWithTimeout(ServerType.TABLET_SERVER, group, 30,
TimeUnit.SECONDS);
tabletServerProcesses.remove(tserverResourceGroup);
}
}
@@ -341,17 +323,9 @@ public class MiniAccumuloClusterControl implements ClusterControl {
case TABLET_SERVER:
synchronized (tabletServerProcesses) {
try {
- tabletServerProcesses.values().forEach(list -> {
- list.forEach(process -> {
- try {
- cluster.stopProcessWithTimeout(process, 30,
TimeUnit.SECONDS);
- } catch (ExecutionException | TimeoutException e) {
- log.warn("TabletServer did not fully stop after 30 seconds",
e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- });
- });
+ final List<Process> procs = new ArrayList<>();
+ tabletServerProcesses.values().forEach(procs::addAll);
+ cluster.stopProcessesWithTimeout(ServerType.TABLET_SERVER, procs,
30, TimeUnit.SECONDS);
} finally {
tabletServerProcesses.clear();
}
@@ -373,17 +347,9 @@ public class MiniAccumuloClusterControl implements ClusterControl {
case SCAN_SERVER:
synchronized (scanServerProcesses) {
try {
- scanServerProcesses.values().forEach(list -> {
- list.forEach(process -> {
- try {
- cluster.stopProcessWithTimeout(process, 30,
TimeUnit.SECONDS);
- } catch (ExecutionException | TimeoutException e) {
- log.warn("TabletServer did not fully stop after 30 seconds",
e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- });
- });
+ final List<Process> procs = new ArrayList<>();
+ scanServerProcesses.values().forEach(procs::addAll);
+ cluster.stopProcessesWithTimeout(ServerType.SCAN_SERVER, procs,
30, TimeUnit.SECONDS);
} finally {
scanServerProcesses.clear();
}
@@ -392,17 +358,9 @@ public class MiniAccumuloClusterControl implements ClusterControl {
case COMPACTOR:
synchronized (compactorProcesses) {
try {
- compactorProcesses.values().forEach(list -> {
- list.forEach(process -> {
- try {
- cluster.stopProcessWithTimeout(process, 30,
TimeUnit.SECONDS);
- } catch (ExecutionException | TimeoutException e) {
- log.warn("TabletServer did not fully stop after 30 seconds",
e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- });
- });
+ final List<Process> procs = new ArrayList<>();
+ compactorProcesses.values().forEach(procs::addAll);
+ cluster.stopProcessesWithTimeout(ServerType.COMPACTOR, procs, 30,
TimeUnit.SECONDS);
} finally {
compactorProcesses.clear();
}
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index bc43acc973..7f80211d8c 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -49,7 +49,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -98,6 +98,7 @@ import org.apache.accumulo.core.util.ConfigurationImpl;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
+import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.accumulo.manager.state.SetGoalState;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
@@ -633,7 +634,9 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
control.start(ServerType.GARBAGE_COLLECTOR);
if (executor == null) {
- executor = Executors.newSingleThreadExecutor();
+ executor =
ThreadPools.getServerThreadPools().getPoolBuilder(getClass().getSimpleName())
+ .numCoreThreads(1).numMaxThreads(16).withTimeOut(1, TimeUnit.SECONDS)
+ .enableThreadPoolMetrics(false).build();
}
Set<String> groups;
@@ -1094,6 +1097,39 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
return executor;
}
+  /**
+   * Stops the given processes concurrently, waiting up to {@code timeout} for all of them to exit.
+   * <p>
+   * {@link Process#destroy()} is called on every process before any waiting starts, so all of the
+   * processes shut down in parallel without depending on the size of a thread pool. The calling
+   * thread then waits on each process against one shared deadline, so the total wait is bounded by
+   * {@code timeout} rather than {@code timeout} per process. A process that has not exited by the
+   * deadline is logged, not thrown, and is not forcibly killed.
+   *
+   * @param type server type of {@code procs}; used only in log messages
+   * @param procs the processes to stop; may be empty
+   * @param timeout maximum time to wait for all processes to exit
+   * @param unit time unit of {@code timeout}
+   */
+  public void stopProcessesWithTimeout(final ServerType type, final List<Process> procs,
+      final long timeout, final TimeUnit unit) {
+
+    // Signal everything up front; destroy() requests termination without waiting for the exit.
+    procs.forEach(Process::destroy);
+
+    final long deadline = System.nanoTime() + unit.toNanos(timeout);
+    for (final Process proc : procs) {
+      try {
+        // A non-positive remaining time makes waitFor report the current state without blocking.
+        if (!proc.waitFor(deadline - System.nanoTime(), TimeUnit.NANOSECONDS)) {
+          log.warn("{} process {} did not fully stop after {} {}", type, proc, timeout, unit);
+        }
+      } catch (InterruptedException e) {
+        // Every process has already been signaled; stop waiting and preserve the interrupt so the
+        // caller can react, instead of spinning on repeated interrupted waits.
+        log.warn("Interrupted while trying to stop {} processes.", type);
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
public int stopProcessWithTimeout(final Process proc, long timeout, TimeUnit
unit)
throws InterruptedException, ExecutionException, TimeoutException {
FutureTask<Integer> future = new FutureTask<>(() -> {