This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 4b5234bd87 Added ZK cleanup thread to Manager for Scan Server nodes (#4562)
4b5234bd87 is described below
commit 4b5234bd87a46bfcd686b3db9bda9adff753f556
Author: Dave Marion <[email protected]>
AuthorDate: Thu May 16 14:54:05 2024 -0400
Added ZK cleanup thread to Manager for Scan Server nodes (#4562)
Closes #4559
---
.../java/org/apache/accumulo/manager/Manager.java | 49 +++++++++++++++++++++-
1 file changed, 47 insertions(+), 2 deletions(-)
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 86a1dd71d3..84e8e68519 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -72,6 +72,7 @@ import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
+import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -174,7 +175,7 @@ public class Manager extends AbstractServer
static final Logger log = LoggerFactory.getLogger(Manager.class);
static final int ONE_SECOND = 1000;
- private static final long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 *
ONE_SECOND;
+ private static final long CLEANUP_INTERVAL_MINUTES = 5;
static final long WAIT_BETWEEN_ERRORS = ONE_SECOND;
private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
private static final int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
@@ -698,7 +699,7 @@ public class Manager extends AbstractServer
log.error("Error cleaning up migrations", ex);
}
}
- sleepUninterruptibly(TIME_BETWEEN_MIGRATION_CLEANUPS, MILLISECONDS);
+ sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES);
}
}
@@ -740,6 +741,48 @@ public class Manager extends AbstractServer
}
}
+  /**
+   * Periodically removes scan server entries from ZooKeeper that no longer hold a service lock.
+   *
+   * <p>
+   * While this process is still the manager, each pass lists the children of the scan server root
+   * node ({@code getZooKeeperRoot() + Constants.ZSSERVERS}) and reads each child's lock data through
+   * the ZooCache. It deletes the child when no lock data is found. If the delete fails because the
+   * node is not empty, the failure is logged at debug level and the node is left alone. Passes are
+   * separated by {@code CLEANUP_INTERVAL_MINUTES} minutes. Errors are logged and retried on the next
+   * pass rather than ending the thread.
+   */
+  private class ScanServerZKCleaner implements Runnable {
+
+    @Override
+    public void run() {
+
+      final ZooReaderWriter zrw = getContext().getZooReaderWriter();
+      final String sserverZNodePath = getContext().getZooKeeperRoot() + Constants.ZSSERVERS;
+
+      while (stillManager()) {
+        try {
+          for (String sserverClientAddress : zrw.getChildren(sserverZNodePath)) {
+
+            final String sServerZPath = sserverZNodePath + "/" + sserverClientAddress;
+            final var zLockPath = ServiceLock.path(sServerZPath);
+            ZcStat stat = new ZcStat();
+            byte[] lockData = ServiceLock.getLockData(getContext().getZooCache(), zLockPath, stat);
+
+            if (lockData == null) {
+              try {
+                log.debug("Deleting empty ScanServer ZK node {}", sServerZPath);
+                zrw.delete(sServerZPath);
+              } catch (KeeperException.NotEmptyException e) {
+                // The node has children even though the cached read found no lock data;
+                // presumably a lock appeared after (or was not yet visible to) the cache. Leave it.
+                log.debug(
+                    "Failed to delete ScanServer ZK node {} its not empty, likely an expected race condition.",
+                    sServerZPath);
+              }
+            }
+          }
+        } catch (KeeperException e) {
+          log.error("Exception trying to delete empty scan server ZNodes, will retry", e);
+        } catch (InterruptedException e) {
+          // Clears the interrupt status (rather than restoring it) so this background loop keeps
+          // running and retries on the next pass.
+          Thread.interrupted();
+          log.error("Interrupted trying to delete empty scan server ZNodes, will retry", e);
+        } catch (RuntimeException e) {
+          // Without this catch an unexpected unchecked exception would escape run() and end this
+          // thread, stopping scan server node cleanup for the rest of this manager's lifetime.
+          log.error("Unexpected error trying to delete empty scan server ZNodes, will retry", e);
+        } finally {
+          // wait CLEANUP_INTERVAL_MINUTES before the next pass
+          sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES);
+        }
+      }
+    }
+
+  }
+
private class StatusThread implements Runnable {
private boolean goodStats() {
@@ -1118,6 +1161,8 @@ public class Manager extends AbstractServer
tserverSet.startListeningForTabletServerChanges();
+ Threads.createThread("ScanServer Cleanup Thread", new
ScanServerZKCleaner()).start();
+
try {
blockForTservers();
} catch (InterruptedException ex) {