This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 43fbd861420ae24667f4b4162c63c3dc02f351fe
Merge: 53e47c13bd 27bbe6a1e7
Author: Keith Turner <[email protected]>
AuthorDate: Wed Jul 23 19:17:41 2025 +0000

    Merge branch '2.1'

 .../accumulo/server/manager/LiveTServerSet.java    |  19 ----
 .../org/apache/accumulo/server/util/Admin.java     |  85 +++++++++++++----
 .../org/apache/accumulo/server/util/ZooZap.java    |  46 +++++++---
 .../java/org/apache/accumulo/manager/Manager.java  |   9 +-
 .../manager/ManagerClientServiceHandler.java       |  13 ++-
 .../manager/tableOps/ShutdownTServerTest.java      |   6 +-
 .../org/apache/accumulo/tserver/TabletServer.java  | 102 +++++++--------------
 .../test/functional/GracefulShutdownIT.java        |  10 +-
 8 files changed, 157 insertions(+), 133 deletions(-)

diff --cc 
server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index 8756b5ca46,e6a722dc9c..eea48e07ab
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@@ -27,25 -27,18 +27,24 @@@ import java.util.HashMap
  import java.util.HashSet;
  import java.util.Map;
  import java.util.Map.Entry;
 +import java.util.Objects;
 +import java.util.Optional;
  import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
  
  import org.apache.accumulo.core.Constants;
 -import org.apache.accumulo.core.clientImpl.thrift.ClientService;
  import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.data.ResourceGroupId;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 -import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 -import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
 -import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.lock.ServiceLock;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockPaths;
 +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
 +import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
 +import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
 +import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
  import org.apache.accumulo.core.metadata.TServerInstance;
  import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
@@@ -203,53 -237,33 +202,37 @@@ public class LiveTServerSet implements 
      }
    }
  
 -  // The set of active tservers with locks, indexed by their name in zookeeper
 +  // The set of active tservers with locks, indexed by their name in 
zookeeper. When the contents of
 +  // this map are modified, tServersSnapshot should be set to null.
    private final Map<String,TServerInfo> current = new HashMap<>();
 -  // as above, indexed by TServerInstance
 -  private final Map<TServerInstance,TServerInfo> currentInstances = new 
HashMap<>();
 +
 +  private LiveTServersSnapshot tServersSnapshot = null;
  
-   private final ConcurrentHashMap<String,TServerInfo> serversShuttingDown =
-       new ConcurrentHashMap<>();
- 
    // The set of entries in zookeeper without locks, and the first time each 
was noticed
 -  private final Map<String,Long> locklessServers = new HashMap<>();
 +  private final Map<ServiceLockPath,Long> locklessServers = new HashMap<>();
  
 -  public LiveTServerSet(ServerContext context, Listener cback) {
 -    this.cback = cback;
 +  public LiveTServerSet(ServerContext context) {
 +    this.cback = new AtomicReference<>(null);
      this.context = context;
    }
  
 -  public synchronized ZooCache getZooCache() {
 -    if (zooCache == null) {
 -      zooCache = new ZooCache(context.getZooReader(), this);
 -    }
 -    return zooCache;
 +  private Listener getCback() {
 +    // fail fast if not yet set
 +    return Objects.requireNonNull(cback.get());
    }
  
 -  public synchronized void startListeningForTabletServerChanges() {
 +  public synchronized void startListeningForTabletServerChanges(Listener 
cback) {
      scanServers();
 -
 +    Objects.requireNonNull(cback);
 +    if (this.cback.compareAndSet(null, cback)) {
 +      this.context.getZooCache().addZooCacheWatcher(this);
 +    } else if (this.cback.get() != cback) {
 +      throw new IllegalStateException("Attempted to set different cback 
object");
 +    }
      ThreadPools.watchCriticalScheduledTask(this.context.getScheduledExecutor()
 -        .scheduleWithFixedDelay(this::scanServers, 0, 5000, 
TimeUnit.MILLISECONDS));
 +        .scheduleWithFixedDelay(this::scanServers, 5000, 5000, 
TimeUnit.MILLISECONDS));
    }
  
-   public void tabletServerShuttingDown(String server) {
- 
-     TServerInfo info = null;
-     synchronized (this) {
-       info = current.get(server);
-     }
-     if (info != null) {
-       serversShuttingDown.put(server, info);
-     } else {
-       log.info("Tablet Server reported it's shutting down, but not in list of 
current servers");
-     }
-   }
- 
    public synchronized void scanServers() {
      try {
        final Set<TServerInstance> updates = new HashSet<>();
@@@ -280,45 -298,34 +263,44 @@@
    }
  
    private synchronized void checkServer(final Set<TServerInstance> updates,
 -      final Set<TServerInstance> doomed, final String path, final String 
zPath)
 +      final Set<TServerInstance> doomed, final ServiceLockPath tserverPath)
        throws InterruptedException, KeeperException {
  
 -    TServerInfo info = current.get(zPath);
 +    // invalidate the snapshot forcing it to be recomputed the next time its 
requested
 +    tServersSnapshot = null;
 +
 +    final TServerInfo info = current.get(tserverPath.getServer());
  
 -    final var zLockPath = ServiceLock.path(path + "/" + zPath);
      ZcStat stat = new ZcStat();
 -    byte[] lockData = ServiceLock.getLockData(getZooCache(), zLockPath, stat);
 +    Optional<ServiceLockData> sld =
 +        ServiceLock.getLockData(context.getZooCache(), tserverPath, stat);
  
 -    if (lockData == null) {
 +    if (sld.isEmpty()) {
 +      log.trace("lock does not exist for server: {}", 
tserverPath.getServer());
        if (info != null) {
          doomed.add(info.instance);
 -        current.remove(zPath);
 -        currentInstances.remove(info.instance);
 +        current.remove(tserverPath.getServer());
-         serversShuttingDown.remove(tserverPath.toString());
 +        log.trace("removed {} from current set and adding to doomed list", 
tserverPath.getServer());
        }
  
 -      Long firstSeen = locklessServers.get(zPath);
 +      Long firstSeen = locklessServers.get(tserverPath);
        if (firstSeen == null) {
 -        locklessServers.put(zPath, System.currentTimeMillis());
 +        locklessServers.put(tserverPath, System.currentTimeMillis());
 +        log.trace("first seen, added {} to list of lockless servers", 
tserverPath.getServer());
        } else if (System.currentTimeMillis() - firstSeen > 
MINUTES.toMillis(10)) {
 -        deleteServerNode(path + "/" + zPath);
 -        locklessServers.remove(zPath);
 +        deleteServerNode(tserverPath.toString());
 +        locklessServers.remove(tserverPath);
 +        log.trace(
 +            "deleted zookeeper node for server: {}, has been without lock for 
over 10 minutes",
 +            tserverPath.getServer());
        }
      } else {
 -      locklessServers.remove(zPath);
 -      ServerServices services = new ServerServices(new String(lockData, 
UTF_8));
 -      HostAndPort client = 
services.getAddress(ServerServices.Service.TSERV_CLIENT);
 -      TServerInstance instance = new TServerInstance(client, 
stat.getEphemeralOwner());
 +      log.trace("Lock exists for server: {}, adding to current set", 
tserverPath.getServer());
 +      locklessServers.remove(tserverPath);
 +      HostAndPort address = 
sld.orElseThrow().getAddress(ServiceLockData.ThriftService.TSERV);
 +      ResourceGroupId resourceGroup =
 +          sld.orElseThrow().getGroup(ServiceLockData.ThriftService.TSERV);
 +      TServerInstance instance = new TServerInstance(address, 
stat.getEphemeralOwner());
  
        if (info == null) {
          updates.add(instance);
@@@ -404,70 -388,9 +386,69 @@@
      return tServerInfo.connection;
    }
  
 +  public synchronized ResourceGroupId getResourceGroup(TServerInstance 
server) {
 +    if (server == null) {
 +      return null;
 +    }
 +    TServerInfo tServerInfo = getSnapshot().tserversInfo.get(server);
 +    if (tServerInfo == null) {
 +      return null;
 +    }
 +    return tServerInfo.resourceGroup;
 +  }
 +
 +  public static class LiveTServersSnapshot {
 +    private final Set<TServerInstance> tservers;
 +    private final Map<ResourceGroupId,Set<TServerInstance>> tserverGroups;
 +
 +    // TServerInfo is only for internal use, so this field is private w/o a 
getter.
 +    private final Map<TServerInstance,TServerInfo> tserversInfo;
 +
 +    @VisibleForTesting
 +    public LiveTServersSnapshot(Set<TServerInstance> currentServers,
 +        Map<ResourceGroupId,Set<TServerInstance>> serverGroups) {
 +      this.tserversInfo = null;
 +      this.tservers = Set.copyOf(currentServers);
 +      Map<ResourceGroupId,Set<TServerInstance>> copy = new HashMap<>();
 +      serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v)));
 +      this.tserverGroups = Collections.unmodifiableMap(copy);
 +    }
 +
 +    public LiveTServersSnapshot(Map<TServerInstance,TServerInfo> 
currentServers,
 +        Map<ResourceGroupId,Set<TServerInstance>> serverGroups) {
 +      this.tserversInfo = Map.copyOf(currentServers);
 +      this.tservers = this.tserversInfo.keySet();
 +      Map<ResourceGroupId,Set<TServerInstance>> copy = new HashMap<>();
 +      serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v)));
 +      this.tserverGroups = Collections.unmodifiableMap(copy);
 +    }
 +
 +    public Set<TServerInstance> getTservers() {
 +      return tservers;
 +    }
 +
 +    public Map<ResourceGroupId,Set<TServerInstance>> getTserverGroups() {
 +      return tserverGroups;
 +    }
 +  }
 +
 +  public synchronized LiveTServersSnapshot getSnapshot() {
 +    if (tServersSnapshot == null) {
 +      HashMap<TServerInstance,TServerInfo> tServerInstances = new HashMap<>();
 +      Map<ResourceGroupId,Set<TServerInstance>> tserversGroups = new 
HashMap<>();
 +      current.values().forEach(tServerInfo -> {
 +        tServerInstances.put(tServerInfo.instance, tServerInfo);
 +        tserversGroups.computeIfAbsent(tServerInfo.resourceGroup, rg -> new 
HashSet<>())
 +            .add(tServerInfo.instance);
 +      });
 +      tServersSnapshot = new LiveTServersSnapshot(tServerInstances, 
tserversGroups);
 +    }
 +    return tServersSnapshot;
 +  }
 +
    public synchronized Set<TServerInstance> getCurrentServers() {
 -    Set<TServerInstance> current = currentInstances.keySet();
 -    return new HashSet<>(current);
 +    Set<TServerInstance> current = new HashSet<>(getSnapshot().getTservers());
-     serversShuttingDown.values().forEach(tsi -> current.remove(tsi.instance));
 +    return current;
    }
  
    public synchronized int size() {
diff --cc server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index 915e154aee,e2ec6f7fa1..89755b94b8
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@@ -519,10 -385,26 +510,8 @@@ public class Admin implements KeywordEx
          if (ping(context, pingCommand.args) != 0) {
            rc = 4;
          }
 -      } else if (cl.getParsedCommand().equals("checkTablets")) {
 -        System.out.println("\n*** Looking for offline tablets ***\n");
 -        if (FindOfflineTablets.findOffline(context, 
checkTabletsCommand.tableName) != 0) {
 -          rc = 5;
 -        }
 -        System.out.println("\n*** Looking for missing files ***\n");
 -        if (checkTabletsCommand.tableName == null) {
 -          if (RemoveEntriesForMissingFiles.checkAllTables(context, 
checkTabletsCommand.fixFiles)
 -              != 0) {
 -            rc = 6;
 -          }
 -        } else {
 -          if (RemoveEntriesForMissingFiles.checkTable(context, 
checkTabletsCommand.tableName,
 -              checkTabletsCommand.fixFiles) != 0) {
 -            rc = 6;
 -          }
 -        }
 -
        } else if (cl.getParsedCommand().equals("stop")) {
-         stopTabletServer(context, stopOpts.args, stopOpts.force);
-       } else if (cl.getParsedCommand().equals("signalShutdown")) {
-         signalGracefulShutdown(context, gracefulShutdownCommand.address);
+         stopServers(context, stopOpts.args, stopOpts.force);
        } else if (cl.getParsedCommand().equals("dumpConfig")) {
          printConfig(context, dumpConfigCommand);
        } else if (cl.getParsedCommand().equals("volumes")) {
@@@ -668,11 -552,57 +657,62 @@@
          client -> client.shutdown(TraceUtil.traceInfo(), context.rpcCreds(), 
tabletServersToo));
    }
  
-   // Visible for tests
-   public static void signalGracefulShutdown(final ClientContext context, 
String address) {
+   private static void stopServers(final ServerContext context, List<String> 
servers,
+       final boolean force)
+       throws AccumuloException, AccumuloSecurityException, 
InterruptedException, KeeperException {
+     List<String> hostOnly = new ArrayList<>();
 -    Set<HostAndPort> hostAndPort = new TreeSet<>();
++    Set<String> hostAndPort = new TreeSet<>();
+ 
+     for (var server : servers) {
+       if (server.contains(":")) {
 -        hostAndPort.add(HostAndPort.fromString(server));
++        hostAndPort.add(server);
+       } else {
+         hostOnly.add(server);
+       }
+     }
+ 
+     if (!hostOnly.isEmpty()) {
+       // The old impl of this command with the old behavior
+       stopTabletServer(context, hostOnly, force);
+     }
  
-     Objects.requireNonNull(address, "address not set");
-     final HostAndPort hp = HostAndPort.fromString(address);
+     if (!hostAndPort.isEmpty()) {
+       // New behavior for this command when ports are present, supports more 
than just tservers. Is
+       // also async.
+       if (force) {
 -        ZooZap.Opts opts = new ZooZap.Opts();
 -        var zk = context.getZooReaderWriter();
 -        var iid = context.getInstanceID();
 -
 -        String tserversPath = Constants.ZROOT + "/" + iid + 
Constants.ZTSERVERS;
 -        ZooZap.removeLocks(zk, tserversPath, hostAndPort::contains, opts);
 -        String compactorsBasepath = Constants.ZROOT + "/" + iid + 
Constants.ZCOMPACTORS;
 -        ZooZap.removeGroupedLocks(zk, compactorsBasepath, rg -> true, 
hostAndPort::contains, opts);
 -        String sserversPath = Constants.ZROOT + "/" + iid + 
Constants.ZSSERVERS;
 -        ZooZap.removeGroupedLocks(zk, sserversPath, rg -> true, 
hostAndPort::contains, opts);
 -
 -        String managerLockPath = Constants.ZROOT + "/" + iid + 
Constants.ZMANAGER_LOCK;
 -        ZooZap.removeSingletonLock(zk, managerLockPath, 
hostAndPort::contains, opts);
 -        String gcLockPath = Constants.ZROOT + "/" + iid + Constants.ZGC_LOCK;
 -        ZooZap.removeSingletonLock(zk, gcLockPath, hostAndPort::contains, 
opts);
 -        String monitorLockPath = Constants.ZROOT + "/" + iid + 
Constants.ZMONITOR_LOCK;
 -        ZooZap.removeSingletonLock(zk, monitorLockPath, 
hostAndPort::contains, opts);
++        var zoo = context.getZooSession().asReaderWriter();
++
++        AddressSelector addresses = 
AddressSelector.matching(hostAndPort::contains);
++        List<ServiceLockPath> pathsToRemove = new ArrayList<>();
++        pathsToRemove.addAll(context.getServerPaths().getCompactor(rg -> 
true, addresses, false));
++        pathsToRemove.addAll(context.getServerPaths().getScanServer(rg -> 
true, addresses, false));
++        pathsToRemove
++            .addAll(context.getServerPaths().getTabletServer(rg -> true, 
addresses, false));
++        ZooZap.filterSingleton(context, 
context.getServerPaths().getManager(false), addresses)
++            .ifPresent(pathsToRemove::add);
++        ZooZap.filterSingleton(context, 
context.getServerPaths().getGarbageCollector(false),
++            addresses).ifPresent(pathsToRemove::add);
++        ZooZap.filterSingleton(context, 
context.getServerPaths().getMonitor(false), addresses)
++            .ifPresent(pathsToRemove::add);
++
++        for (var path : pathsToRemove) {
++          List<String> children = zoo.getChildren(path.toString());
++          for (String child : children) {
++            log.trace("removing lock {}", path + "/" + child);
++            zoo.recursiveDelete(path + "/" + child, 
ZooUtil.NodeMissingPolicy.SKIP);
++          }
++        }
+       } else {
+         for (var server : hostAndPort) {
 -          signalGracefulShutdown(context, server);
++          signalGracefulShutdown(context, HostAndPort.fromString(server));
+         }
+       }
+     }
+   }
+ 
+   // Visible for tests
+   public static void signalGracefulShutdown(final ClientContext context, 
HostAndPort hp) {
+     Objects.requireNonNull(hp, "address not set");
      ServerProcessService.Client client = null;
      try {
        client = 
ThriftClientTypes.SERVER_PROCESS.getServerProcessConnection(context, log,
diff --cc server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
index edeb3adfb4,3d548c37cf..8b486468b8
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
@@@ -22,23 -24,27 +22,25 @@@ import java.io.IOException
  import java.io.UncheckedIOException;
  import java.nio.file.Files;
  import java.util.Arrays;
 +import java.util.HashSet;
  import java.util.List;
 -import java.util.function.Predicate;
++import java.util.Optional;
 +import java.util.Set;
  import java.util.stream.Collectors;
  
 -import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.cli.Help;
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.conf.SiteConfiguration;
 -import org.apache.accumulo.core.data.InstanceId;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 +import org.apache.accumulo.core.data.ResourceGroupId;
  import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
  import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 -import org.apache.accumulo.core.singletons.SingletonManager;
 -import org.apache.accumulo.core.singletons.SingletonManager.Mode;
 -import org.apache.accumulo.core.util.HostAndPort;
 -import org.apache.accumulo.core.volume.VolumeConfiguration;
 -import org.apache.accumulo.server.fs.VolumeManager;
++import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
 +import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
 +import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
 +import org.apache.accumulo.server.ServerContext;
  import org.apache.accumulo.server.security.SecurityUtil;
  import org.apache.accumulo.start.spi.KeywordExecutable;
 -import org.apache.hadoop.conf.Configuration;
 -import org.apache.hadoop.fs.Path;
  import org.apache.zookeeper.KeeperException;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -145,30 -157,38 +147,33 @@@ public class ZooZap implements KeywordE
        return;
      }
  
 -    String volDir = 
VolumeConfiguration.getVolumeUris(siteConf).iterator().next();
 -    Path instanceDir = new Path(volDir, "instance_id");
 -    InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new 
Configuration());
 -    ZooReaderWriter zoo = new ZooReaderWriter(siteConf);
 -
 -    if (opts.zapMaster) {
 -      log.warn("The -master option is deprecated. Please use -manager 
instead.");
 -    }
 -    if (opts.zapManager || opts.zapMaster) {
 -      String managerLockPath = Constants.ZROOT + "/" + iid + 
Constants.ZMANAGER_LOCK;
 -
 +    var zrw = context.getZooSession().asReaderWriter();
 +    if (opts.zapManager) {
-       ServiceLockPath managerLockPath = 
context.getServerPaths().createManagerPath();
        try {
-         removeSingletonLock(zrw, managerLockPath, addressSelector, opts);
 -        removeSingletonLock(zoo, managerLockPath, hostPortPredicate, opts);
--      } catch (KeeperException | InterruptedException e) {
++        ServiceLockPath managerLockPath = 
context.getServerPaths().createManagerPath();
++        filterSingleton(context, managerLockPath, addressSelector)
++            .ifPresent(slp -> removeSingletonLock(zrw, slp, opts));
++      } catch (RuntimeException e) {
          log.error("Error deleting manager lock", e);
        }
      }
  
      if (opts.zapGc) {
-       ServiceLockPath gcLockPath = 
context.getServerPaths().createGarbageCollectorPath();
 -      String gcLockPath = Constants.ZROOT + "/" + iid + Constants.ZGC_LOCK;
        try {
-         removeSingletonLock(zrw, gcLockPath, addressSelector, opts);
 -        removeSingletonLock(zoo, gcLockPath, hostPortPredicate, opts);
--      } catch (KeeperException | InterruptedException e) {
--        log.error("Error deleting manager lock", e);
++        ServiceLockPath gcLockPath = 
context.getServerPaths().createGarbageCollectorPath();
++        filterSingleton(context, gcLockPath, addressSelector)
++            .ifPresent(slp -> removeSingletonLock(zrw, slp, opts));
++      } catch (RuntimeException e) {
++        log.error("Error deleting gc lock", e);
        }
      }
  
      if (opts.zapMonitor) {
-       ServiceLockPath monitorLockPath = 
context.getServerPaths().createMonitorPath();
 -      String monitorLockPath = Constants.ZROOT + "/" + iid + 
Constants.ZMONITOR_LOCK;
        try {
-         removeSingletonLock(zrw, monitorLockPath, addressSelector, opts);
 -        removeSingletonLock(zoo, monitorLockPath, hostPortPredicate, opts);
--      } catch (KeeperException | InterruptedException e) {
++        ServiceLockPath monitorPath = 
context.getServerPaths().createMonitorPath();
++        filterSingleton(context, monitorPath, addressSelector)
++            .ifPresent(slp -> removeSingletonLock(zrw, slp, opts));
++      } catch (RuntimeException e) {
          log.error("Error deleting monitor lock", e);
        }
      }
@@@ -235,11 -262,45 +240,26 @@@
      }
    }
  
-   private static void removeSingletonLock(ZooReaderWriter zoo, 
ServiceLockPath path,
-       AddressSelector addressSelector, Opts ops) throws KeeperException, 
InterruptedException {
-     if (addressSelector.getPredicate().test(path.getServer())) {
 -  static void removeGroupedLocks(ZooReaderWriter zoo, String path, 
Predicate<String> groupPredicate,
 -      Predicate<HostAndPort> hostPortPredicate, Opts opts)
 -      throws KeeperException, InterruptedException {
 -    if (zoo.exists(path)) {
 -      List<String> groups = zoo.getChildren(path);
 -      for (String group : groups) {
 -        if (groupPredicate.test(group)) {
 -          removeLocks(zoo, path + "/" + group, hostPortPredicate, opts);
 -        }
 -      }
++  private static void removeSingletonLock(ZooReaderWriter zoo, 
ServiceLockPath path, Opts ops) {
++    try {
 +      zapDirectory(zoo, path, ops);
++    } catch (KeeperException | InterruptedException e) {
++      throw new IllegalStateException(e);
      }
    }
  
 -  static void removeLocks(ZooReaderWriter zoo, String path,
 -      Predicate<HostAndPort> hostPortPredicate, Opts opts)
 -      throws KeeperException, InterruptedException {
 -    if (zoo.exists(path)) {
 -      List<String> children = zoo.getChildren(path);
 -      for (String child : children) {
 -        if (hostPortPredicate.test(HostAndPort.fromString(child))) {
 -          message("Deleting " + path + "/" + child + " from zookeeper", opts);
 -          if (!opts.dryRun) {
 -            // TODO not sure this is the correct way to delete this lock.. 
the code was deleting
 -            // locks in multiple different ways for diff servers types.
 -            zoo.recursiveDelete(path + "/" + child, NodeMissingPolicy.SKIP);
 -          }
++  static Optional<ServiceLockPath> filterSingleton(ServerContext context, 
ServiceLockPath path,
++      AddressSelector addressSelector) {
++    Optional<ServiceLockData> sld = context.getZooCache().getLockData(path);
++    return sld.filter(lockData -> {
++      for (var service : ServiceLockData.ThriftService.values()) {
++        var address = lockData.getAddress(service);
++        if (address != null) {
++          return addressSelector.getPredicate().test(address.toString());
+         }
+       }
 -    }
 -  }
+ 
 -  static void removeSingletonLock(ZooReaderWriter zoo, String path,
 -      Predicate<HostAndPort> hostPortPredicate, Opts ops)
 -      throws KeeperException, InterruptedException {
 -    var lockData = ServiceLock.getLockData(zoo.getZooKeeper(), 
ServiceLock.path(path));
 -    if (lockData != null
 -        && hostPortPredicate.test(HostAndPort.fromString(new String(lockData, 
UTF_8)))) {
 -      zapDirectory(zoo, path, ops);
 -    }
++      return false;
++    }).map(lockData -> path);
+   }
 -
  }
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
index 2be39fdfcf,b0f173974b..992d46a282
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
@@@ -51,8 -52,11 +51,9 @@@ import org.apache.accumulo.core.clientI
  import 
org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException;
  import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
  import 
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 -import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.NamespaceId;
 -import org.apache.accumulo.core.data.Range;
++import org.apache.accumulo.core.data.ResourceGroupId;
  import org.apache.accumulo.core.data.TableId;
 -import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
  import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
  import org.apache.accumulo.core.fate.Fate;
@@@ -345,9 -349,41 +346,19 @@@ public class ManagerClientServiceHandle
            SecurityErrorCode.PERMISSION_DENIED);
      }
      log.info("Tablet Server {} has reported it's shutting down", 
tabletServer);
-     manager.tserverSet.tabletServerShuttingDown(tabletServer);
+     var tserver = new TServerInstance(tabletServer);
+     if (manager.shutdownTServer(tserver)) {
+       // If there is an exception seeding the fate tx this should cause the 
RPC to fail which should
+       // cause the tserver to halt. Because of that not making an attempt to 
handle failure here.
 -      Fate<Manager> fate = manager.fate();
 -      long tid = fate.startTransaction();
++      Fate<Manager> fate = manager.fate(FateInstanceType.META);
++      var tid = fate.startTransaction();
+       String msg = "Shutdown tserver " + tabletServer;
 -      fate.seedTransaction("ShutdownTServer", tid,
 -          new TraceRepo<>(new ShutdownTServer(tserver, false)), true, msg);
++      // TODO resource group
++      fate.seedTransaction(Fate.FateOperation.SHUTDOWN_TSERVER, tid,
++          new TraceRepo<>(new ShutdownTServer(tserver, 
ResourceGroupId.DEFAULT, false)), true, msg);
+     }
    }
  
 -  @Override
 -  public void reportSplitExtent(TInfo info, TCredentials credentials, String 
serverName,
 -      TabletSplit split) throws ThriftSecurityException {
 -    if (!manager.security.canPerformSystemActions(credentials)) {
 -      throw new ThriftSecurityException(credentials.getPrincipal(),
 -          SecurityErrorCode.PERMISSION_DENIED);
 -    }
 -
 -    KeyExtent oldTablet = KeyExtent.fromThrift(split.oldTablet);
 -    if (manager.migrations.removeExtent(oldTablet) != null) {
 -      Manager.log.info("Canceled migration of {}", split.oldTablet);
 -    }
 -    for (TServerInstance instance : manager.tserverSet.getCurrentServers()) {
 -      if (serverName.equals(instance.getHostPort())) {
 -        manager.nextEvent.event("%s reported split %s, %s", serverName,
 -            KeyExtent.fromThrift(split.newTablets.get(0)),
 -            KeyExtent.fromThrift(split.newTablets.get(1)));
 -        return;
 -      }
 -    }
 -    Manager.log.warn("Got a split from a server we don't recognize: {}", 
serverName);
 -  }
 -
    @Override
    public void reportTabletStatus(TInfo info, TCredentials credentials, String 
serverName,
        TabletLoadState status, TKeyExtent ttablet) throws 
ThriftSecurityException {
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 95816c5369,2573d6b451..ed81fe48b2
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -51,10 -54,9 +51,8 @@@ import java.util.TreeSet
  import java.util.UUID;
  import java.util.concurrent.BlockingDeque;
  import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.Future;
  import java.util.concurrent.LinkedBlockingDeque;
  import java.util.concurrent.ScheduledFuture;
--import java.util.concurrent.ThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicLong;
@@@ -76,36 -73,27 +74,34 @@@ import org.apache.accumulo.core.conf.Si
  import org.apache.accumulo.core.data.InstanceId;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
 -import 
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher;
 -import org.apache.accumulo.core.fate.zookeeper.ZooCache;
  import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
  import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
  import 
org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
 +import org.apache.accumulo.core.lock.ServiceLock;
 +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
 +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
 +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
 +import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
 +import org.apache.accumulo.core.lock.ServiceLockSupport;
 +import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher;
 +import org.apache.accumulo.core.manager.thrift.Compacting;
  import org.apache.accumulo.core.manager.thrift.ManagerClientService;
 -import org.apache.accumulo.core.master.thrift.BulkImportState;
 -import org.apache.accumulo.core.master.thrift.Compacting;
 -import org.apache.accumulo.core.master.thrift.TableInfo;
 -import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 -import org.apache.accumulo.core.metadata.MetadataTable;
 -import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.manager.thrift.TableInfo;
 +import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.metadata.SystemTables;
  import org.apache.accumulo.core.metadata.TServerInstance;
 -import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 +import org.apache.accumulo.core.metadata.schema.Ample;
- import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
  import org.apache.accumulo.core.metrics.MetricsInfo;
 -import org.apache.accumulo.core.process.thrift.ServerProcessService;
  import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
  import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment;
 +import org.apache.accumulo.core.spi.ondemand.OnDemandTabletUnloader;
 +import 
org.apache.accumulo.core.spi.ondemand.OnDemandTabletUnloader.UnloaderParams;
- import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal;
 +import org.apache.accumulo.core.tabletserver.UnloaderParamsImpl;
  import org.apache.accumulo.core.tabletserver.log.LogEntry;
  import org.apache.accumulo.core.trace.TraceUtil;
  import org.apache.accumulo.core.util.ComparablePair;
@@@ -114,13 -103,15 +110,11 @@@ import org.apache.accumulo.core.util.Ma
  import org.apache.accumulo.core.util.Pair;
  import org.apache.accumulo.core.util.Retry;
  import org.apache.accumulo.core.util.Retry.RetryFactory;
 -import org.apache.accumulo.core.util.ServerServices;
 -import org.apache.accumulo.core.util.ServerServices.Service;
  import org.apache.accumulo.core.util.UtilWaitThread;
- import org.apache.accumulo.core.util.threads.ThreadPoolNames;
 -import org.apache.accumulo.core.util.threads.ThreadPools;
  import org.apache.accumulo.core.util.threads.Threads;
- import org.apache.accumulo.core.util.time.SteadyTime;
  import org.apache.accumulo.server.AbstractServer;
 -import org.apache.accumulo.server.GarbageCollectionLogger;
  import org.apache.accumulo.server.ServerContext;
 -import org.apache.accumulo.server.ServerOpts;
 +import org.apache.accumulo.server.ServiceEnvironmentImpl;
  import org.apache.accumulo.server.TabletLevel;
  import org.apache.accumulo.server.client.ClientServiceHandler;
  import org.apache.accumulo.server.compaction.CompactionWatcher;
@@@ -675,81 -950,42 +669,37 @@@ public class TabletServer extends Abstr
        }
      }
  
-     // Tell the Manager we are shutting down so that it doesn't try
-     // to assign tablets.
      ManagerClientService.Client iface = managerConnection(getManagerAddress());
      try {
-       iface.tabletServerStopping(TraceUtil.traceInfo(), 
getContext().rpcCreds(),
-           advertiseAddressString);
-     } catch (TException e) {
-       Halt.halt(-1, "Error informing Manager that we are shutting down, 
exiting!", e);
-     } finally {
-       returnManagerConnection(iface);
-     }
+       // Ask the manager to unload our tablets and stop loading new tablets
+       if (iface == null) {
+         Halt.halt(-1, "Error informing Manager that we are shutting down, exiting!");
+       } else {
+         iface.tabletServerStopping(TraceUtil.traceInfo(), getContext().rpcCreds(),
+             getTabletSession().getHostPortSession());
+       }
  
-     // Best-effort attempt at unloading tablets.
-     log.debug("Unloading tablets");
-     final List<Future<?>> futures = new ArrayList<>();
-     final ThreadPoolExecutor tpe = getContext().threadPools()
-         
.getPoolBuilder(ThreadPoolNames.TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL).numCoreThreads(8)
-         .numMaxThreads(16).build();
+       boolean managerDown = false;
+       while (!getOnlineTablets().isEmpty()) {
+         log.info("Shutdown requested, waiting for manager to unload {} tablets",
+             getOnlineTablets().size());
  
-     iface = managerConnection(getManagerAddress());
-     boolean managerDown = false;
 -        managerDown = sendManagerMessages(managerDown, iface);
++        managerDown = sendManagerMessages(managerDown, iface, advertiseAddressString);
  
-     try {
-       for (DataLevel level : new DataLevel[] {DataLevel.USER, 
DataLevel.METADATA, DataLevel.ROOT}) {
-         getOnlineTablets().keySet().forEach(ke -> {
-           if (DataLevel.of(ke.tableId()) == level) {
-             futures.add(tpe.submit(new UnloadTabletHandler(this, ke, 
TUnloadTabletGoal.UNASSIGNED,
-                 SteadyTime.from(System.currentTimeMillis(), 
TimeUnit.MILLISECONDS))));
-           }
-         });
-         while (!futures.isEmpty()) {
-           Iterator<Future<?>> unloads = futures.iterator();
-           while (unloads.hasNext()) {
-             Future<?> f = unloads.next();
-             if (f.isDone()) {
-               if (!managerDown) {
-                 ManagerMessage mm = managerMessages.poll();
-                 try {
-                   mm.send(getContext().rpcCreds(), advertiseAddressString, 
iface);
-                 } catch (TException e) {
-                   managerDown = true;
-                   log.debug("Error sending message to Manager during tablet 
unloading, msg: {}",
-                       e.getMessage());
-                 }
-               }
-               unloads.remove();
-             }
-           }
-           log.debug("Waiting on {} {} tablets to close.", futures.size(), 
level);
-           UtilWaitThread.sleep(1000);
-         }
-         log.debug("All {} tablets unloaded", level);
+         UtilWaitThread.sleep(1000);
        }
+ 
 -      sendManagerMessages(managerDown, iface);
++      sendManagerMessages(managerDown, iface, advertiseAddressString);
+ 
+     } catch (TException | RuntimeException e) {
+       Halt.halt(-1, "Error informing Manager that we are shutting down, exiting!", e);
      } finally {
-       if (!managerDown) {
-         try {
-           ManagerMessage mm = managerMessages.poll();
-           do {
-             if (mm != null) {
-               mm.send(getContext().rpcCreds(), advertiseAddressString, iface);
-             }
-             mm = managerMessages.poll();
-           } while (mm != null);
-         } catch (TException e) {
-           log.debug("Error sending message to Manager during tablet 
unloading, msg: {}",
-               e.getMessage());
-         }
-       }
        returnManagerConnection(iface);
-       tpe.shutdown();
      }
  
 -    log.debug("Stopping Replication Server");
 -    if (this.replServer != null) {
 -      this.replServer.stop();
 -    }
 -
      log.debug("Stopping Thrift Servers");
 -    if (server != null) {
 -      server.stop();
 +    if (getThriftServer() != null) {
 +      getThriftServer().stop();
      }
  
      try {
@@@ -770,8 -1006,55 +720,24 @@@
      }
    }
  
 -  private boolean sendManagerMessages(boolean managerDown, ManagerClientService.Client iface) {
++  private boolean sendManagerMessages(boolean managerDown, ManagerClientService.Client iface,
++      String advertiseAddressString) {
+     ManagerMessage mm = managerMessages.poll();
+     while (mm != null && !managerDown) {
+       try {
 -        mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
++        mm.send(getContext().rpcCreds(), advertiseAddressString, iface);
+         mm = managerMessages.poll();
+       } catch (TException e) {
+         managerDown = true;
 -        LOG.debug("Error sending message to Manager during tablet unloading, msg: {}",
++        log.debug("Error sending message to Manager during tablet unloading, msg: {}",
+             e.getMessage());
+       }
+     }
+     return managerDown;
+   }
+ 
 -  @SuppressWarnings("deprecation")
 -  private void setupReplication(AccumuloConfiguration aconf) {
 -    // Start the thrift service listening for incoming replication requests
 -    try {
 -      startReplicationService();
 -    } catch (UnknownHostException e) {
 -      throw new RuntimeException("Failed to start replication service", e);
 -    }
 -
 -    // Start the pool to handle outgoing replications
 -    final ThreadPoolExecutor replicationThreadPool = 
ThreadPools.getServerThreadPools()
 -        .createExecutorService(getConfiguration(), 
Property.REPLICATION_WORKER_THREADS);
 -    replWorker.setExecutor(replicationThreadPool);
 -    replWorker.run();
 -
 -    // Check the configuration value for the size of the pool and, if 
changed, resize the pool
 -    Runnable replicationWorkThreadPoolResizer = () -> {
 -      ThreadPools.resizePool(replicationThreadPool, aconf, 
Property.REPLICATION_WORKER_THREADS);
 -    };
 -    ScheduledFuture<?> future = context.getScheduledExecutor()
 -        .scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10, 30, 
TimeUnit.SECONDS);
 -    watchNonCriticalScheduledTask(future);
 -  }
 -
 -  public String getClientAddressString() {
 -    if (clientAddress == null) {
 -      return null;
 -    }
 -    return clientAddress.getHost() + ":" + clientAddress.getPort();
 -  }
 -
    public TServerInstance getTabletSession() {
 -    String address = getClientAddressString();
 -    if (address == null) {
 +    if (getAdvertiseAddress() == null) {
        return null;
      }
      if (lockSessionId == -1) {
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java
index 2c2706ecd4,cbdb01b2f2..ada20ee904
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java
@@@ -177,12 -180,10 +177,12 @@@ public class GracefulShutdownIT extend
        });
  
        // Restart Tablet Server
 -      final List<String> tservers = 
client.instanceOperations().getTabletServers();
 +      final Set<ServiceLockPath> tservers = 
getCluster().getServerContext().getServerPaths()
 +          .getTabletServer((rg) -> rg.equals(ResourceGroupId.DEFAULT), 
AddressSelector.all(), true);
        assertEquals(2, tservers.size());
 -      final HostAndPort tserverAddress = 
HostAndPort.fromString(tservers.get(0));
 +      final HostAndPort tserverAddress =
 +          HostAndPort.fromString(tservers.iterator().next().getServer());
-       Admin.signalGracefulShutdown(ctx, tserverAddress.toString());
+       Admin.signalGracefulShutdown(ctx, tserverAddress);
        Wait.waitFor(() -> {
          control.refreshProcesses(ServerType.TABLET_SERVER);
          return control.getProcesses(ServerType.TABLET_SERVER).size() == 1;
@@@ -193,16 -194,10 +193,16 @@@
        client.instanceOperations().waitForBalance();
  
        // Restart Manager
 -      final List<String> managers = 
client.instanceOperations().getManagerLocations();
 -      assertEquals(1, managers.size());
 -      final HostAndPort managerAddress = 
HostAndPort.fromString(managers.get(0));
 +      final ServiceLockPath manager =
 +          getCluster().getServerContext().getServerPaths().getManager(true);
 +      assertNotNull(manager);
 +      Set<ServerId> managerLocations =
 +          client.instanceOperations().getServers(ServerId.Type.MANAGER);
 +      assertNotNull(managerLocations);
 +      assertEquals(1, managerLocations.size());
 +      final HostAndPort managerAddress =
 +          
HostAndPort.fromString(managerLocations.iterator().next().toHostPortString());
-       Admin.signalGracefulShutdown(ctx, managerAddress.toString());
+       Admin.signalGracefulShutdown(ctx, managerAddress);
        Wait.waitFor(() -> {
          control.refreshProcesses(ServerType.MANAGER);
          return control.getProcesses(ServerType.MANAGER).isEmpty();
@@@ -235,18 -223,11 +235,18 @@@
  
        final long numFiles2 = getNumFilesForTable(ctx, tid);
        assertEquals(numFiles2, numFiles);
 -      assertEquals(0, 
ExternalCompactionTestUtils.getRunningCompactions(ctx).getCompactionsSize());
 +      Set<ServerId> newManagerLocations =
 +          client.instanceOperations().getServers(ServerId.Type.MANAGER);
 +      assertNotNull(newManagerLocations);
 +      assertEquals(1, newManagerLocations.size());
 +      final HostAndPort newManagerAddress =
 +          
HostAndPort.fromString(newManagerLocations.iterator().next().toHostPortString());
 +      assertEquals(0, ExternalCompactionTestUtils
 +          .getRunningCompactions(ctx, 
Optional.of(newManagerAddress)).getCompactionsSize());
        client.tableOperations().compact(tableName, cc);
 -      Wait.waitFor(
 -          () -> 
ExternalCompactionTestUtils.getRunningCompactions(ctx).getCompactionsSize() > 
0);
 +      Wait.waitFor(() -> ExternalCompactionTestUtils
 +          .getRunningCompactions(ctx, 
Optional.of(newManagerAddress)).getCompactionsSize() > 0);
-       Admin.signalGracefulShutdown(ctx, compactorAddress.toString());
+       Admin.signalGracefulShutdown(ctx, compactorAddress);
        Wait.waitFor(() -> {
          control.refreshProcesses(ServerType.COMPACTOR);
          return control.getProcesses(ServerType.COMPACTOR).isEmpty();


Reply via email to