This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new e7b5f30519 Added Mini method to stop worker processes in parallel 
(#5697)
e7b5f30519 is described below

commit e7b5f3051906cb6bf4431cd12361dc1648e64c39
Author: Dave Marion <[email protected]>
AuthorDate: Thu Jul 3 07:42:00 2025 -0400

    Added Mini method to stop worker processes in parallel (#5697)
    
    Prior to this change stopping the compactor, scan server,
    and tablet server processes would be done serially while
    waiting up to 30 seconds for each process to terminate.
    Added a method to stop all of the processes concurrently.
---
 .../MiniAccumuloClusterControl.java                | 64 ++++------------------
 .../miniclusterImpl/MiniAccumuloClusterImpl.java   | 40 +++++++++++++-
 2 files changed, 49 insertions(+), 55 deletions(-)

diff --git 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
index a7445302c5..286e561f5a 100644
--- 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
+++ 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
@@ -257,16 +257,7 @@ public class MiniAccumuloClusterControl implements 
ClusterControl {
       if (group == null) {
         return;
       }
-      group.forEach(process -> {
-        try {
-          cluster.stopProcessWithTimeout(process, 30, TimeUnit.SECONDS);
-        } catch (ExecutionException | TimeoutException e) {
-          log.warn("Compactor did not fully stop after 30 seconds", e);
-          throw new RuntimeException(e);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-      });
+      cluster.stopProcessesWithTimeout(ServerType.COMPACTOR, group, 30, 
TimeUnit.SECONDS);
       compactorProcesses.remove(compactorResourceGroup);
     }
   }
@@ -277,16 +268,7 @@ public class MiniAccumuloClusterControl implements 
ClusterControl {
       if (group == null) {
         return;
       }
-      group.forEach(process -> {
-        try {
-          cluster.stopProcessWithTimeout(process, 30, TimeUnit.SECONDS);
-        } catch (ExecutionException | TimeoutException e) {
-          log.warn("TabletServer did not fully stop after 30 seconds", e);
-          throw new RuntimeException(e);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-      });
+      cluster.stopProcessesWithTimeout(ServerType.TABLET_SERVER, group, 30, 
TimeUnit.SECONDS);
       tabletServerProcesses.remove(tserverResourceGroup);
     }
   }
@@ -341,17 +323,9 @@ public class MiniAccumuloClusterControl implements 
ClusterControl {
       case TABLET_SERVER:
         synchronized (tabletServerProcesses) {
           try {
-            tabletServerProcesses.values().forEach(list -> {
-              list.forEach(process -> {
-                try {
-                  cluster.stopProcessWithTimeout(process, 30, 
TimeUnit.SECONDS);
-                } catch (ExecutionException | TimeoutException e) {
-                  log.warn("TabletServer did not fully stop after 30 seconds", 
e);
-                } catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-                }
-              });
-            });
+            final List<Process> procs = new ArrayList<>();
+            tabletServerProcesses.values().forEach(procs::addAll);
+            cluster.stopProcessesWithTimeout(ServerType.TABLET_SERVER, procs, 
30, TimeUnit.SECONDS);
           } finally {
             tabletServerProcesses.clear();
           }
@@ -373,17 +347,9 @@ public class MiniAccumuloClusterControl implements 
ClusterControl {
       case SCAN_SERVER:
         synchronized (scanServerProcesses) {
           try {
-            scanServerProcesses.values().forEach(list -> {
-              list.forEach(process -> {
-                try {
-                  cluster.stopProcessWithTimeout(process, 30, 
TimeUnit.SECONDS);
-                } catch (ExecutionException | TimeoutException e) {
-                  log.warn("TabletServer did not fully stop after 30 seconds", 
e);
-                } catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-                }
-              });
-            });
+            final List<Process> procs = new ArrayList<>();
+            scanServerProcesses.values().forEach(procs::addAll);
+            cluster.stopProcessesWithTimeout(ServerType.SCAN_SERVER, procs, 
30, TimeUnit.SECONDS);
           } finally {
             scanServerProcesses.clear();
           }
@@ -392,17 +358,9 @@ public class MiniAccumuloClusterControl implements 
ClusterControl {
       case COMPACTOR:
         synchronized (compactorProcesses) {
           try {
-            compactorProcesses.values().forEach(list -> {
-              list.forEach(process -> {
-                try {
-                  cluster.stopProcessWithTimeout(process, 30, 
TimeUnit.SECONDS);
-                } catch (ExecutionException | TimeoutException e) {
-                  log.warn("TabletServer did not fully stop after 30 seconds", 
e);
-                } catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-                }
-              });
-            });
+            final List<Process> procs = new ArrayList<>();
+            compactorProcesses.values().forEach(procs::addAll);
+            cluster.stopProcessesWithTimeout(ServerType.COMPACTOR, procs, 30, 
TimeUnit.SECONDS);
           } finally {
             compactorProcesses.clear();
           }
diff --git 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index bc43acc973..7f80211d8c 100644
--- 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -49,7 +49,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -98,6 +98,7 @@ import org.apache.accumulo.core.util.ConfigurationImpl;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
 import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
+import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.zookeeper.ZooSession;
 import org.apache.accumulo.manager.state.SetGoalState;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
@@ -633,7 +634,9 @@ public class MiniAccumuloClusterImpl implements 
AccumuloCluster {
     control.start(ServerType.GARBAGE_COLLECTOR);
 
     if (executor == null) {
-      executor = Executors.newSingleThreadExecutor();
+      executor = 
ThreadPools.getServerThreadPools().getPoolBuilder(getClass().getSimpleName())
+          .numCoreThreads(1).numMaxThreads(16).withTimeOut(1, TimeUnit.SECONDS)
+          .enableThreadPoolMetrics(false).build();
     }
 
     Set<String> groups;
@@ -1094,6 +1097,39 @@ public class MiniAccumuloClusterImpl implements 
AccumuloCluster {
     return executor;
   }
 
+  public void stopProcessesWithTimeout(final ServerType type, final 
List<Process> procs,
+      final long timeout, final TimeUnit unit) {
+
+    final List<Future<Integer>> futures = new ArrayList<>();
+    for (Process proc : procs) {
+      futures.add(executor.submit(() -> {
+        proc.destroy();
+        proc.waitFor(timeout, unit);
+        return proc.exitValue();
+      }));
+    }
+
+    while (!futures.isEmpty()) {
+      futures.removeIf(f -> {
+        if (f.isDone()) {
+          try {
+            f.get();
+          } catch (ExecutionException | InterruptedException e) {
+            log.warn("{} did not fully stop after {} seconds", type, 
unit.toSeconds(timeout), e);
+          }
+          return true;
+        }
+        return false;
+      });
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        log.warn("Interrupted while trying to stop " + type + " processes.");
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
   public int stopProcessWithTimeout(final Process proc, long timeout, TimeUnit 
unit)
       throws InterruptedException, ExecutionException, TimeoutException {
     FutureTask<Integer> future = new FutureTask<>(() -> {

Reply via email to