This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 43fbd861420ae24667f4b4162c63c3dc02f351fe Merge: 53e47c13bd 27bbe6a1e7 Author: Keith Turner <[email protected]> AuthorDate: Wed Jul 23 19:17:41 2025 +0000 Merge branch '2.1' .../accumulo/server/manager/LiveTServerSet.java | 19 ---- .../org/apache/accumulo/server/util/Admin.java | 85 +++++++++++++---- .../org/apache/accumulo/server/util/ZooZap.java | 46 +++++++--- .../java/org/apache/accumulo/manager/Manager.java | 9 +- .../manager/ManagerClientServiceHandler.java | 13 ++- .../manager/tableOps/ShutdownTServerTest.java | 6 +- .../org/apache/accumulo/tserver/TabletServer.java | 102 +++++++-------------- .../test/functional/GracefulShutdownIT.java | 10 +- 8 files changed, 157 insertions(+), 133 deletions(-) diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index 8756b5ca46,e6a722dc9c..eea48e07ab --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@@ -27,25 -27,18 +27,24 @@@ import java.util.HashMap import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; import java.util.Set; - import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.clientImpl.thrift.ClientService; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.data.ResourceGroupId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; -import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockPaths; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; +import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate; +import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; @@@ -203,53 -237,33 +202,37 @@@ public class LiveTServerSet implements } } - // The set of active tservers with locks, indexed by their name in zookeeper + // The set of active tservers with locks, indexed by their name in zookeeper. When the contents of + // this map are modified, tServersSnapshot should be set to null. private final Map<String,TServerInfo> current = new HashMap<>(); - // as above, indexed by TServerInstance - private final Map<TServerInstance,TServerInfo> currentInstances = new HashMap<>(); + + private LiveTServersSnapshot tServersSnapshot = null; - private final ConcurrentHashMap<String,TServerInfo> serversShuttingDown = - new ConcurrentHashMap<>(); - // The set of entries in zookeeper without locks, and the first time each was noticed - private final Map<String,Long> locklessServers = new HashMap<>(); + private final Map<ServiceLockPath,Long> locklessServers = new HashMap<>(); - public LiveTServerSet(ServerContext context, Listener cback) { - this.cback = cback; + public LiveTServerSet(ServerContext context) { + this.cback = new AtomicReference<>(null); this.context = context; } - public synchronized ZooCache getZooCache() { - if (zooCache == null) { - zooCache = new ZooCache(context.getZooReader(), this); - } - return zooCache; + private Listener getCback() { + // fail fast if not yet set + return Objects.requireNonNull(cback.get()); } - public synchronized void startListeningForTabletServerChanges() { + public synchronized void startListeningForTabletServerChanges(Listener cback) { scanServers(); - + Objects.requireNonNull(cback); + if (this.cback.compareAndSet(null, cback)) { + this.context.getZooCache().addZooCacheWatcher(this); + } else if (this.cback.get() != cback) { + throw new IllegalStateException("Attempted to set different cback object"); + } ThreadPools.watchCriticalScheduledTask(this.context.getScheduledExecutor() - .scheduleWithFixedDelay(this::scanServers, 0, 5000, TimeUnit.MILLISECONDS)); + .scheduleWithFixedDelay(this::scanServers, 5000, 5000, TimeUnit.MILLISECONDS)); } - public void tabletServerShuttingDown(String server) { - - TServerInfo info = null; - synchronized (this) { - info = current.get(server); - } - if (info != null) { - serversShuttingDown.put(server, info); - } else { - log.info("Tablet Server reported it's shutting down, but not in list of current servers"); - } - } - public synchronized void scanServers() { try { final Set<TServerInstance> updates = new HashSet<>(); @@@ -280,45 -298,34 +263,44 @@@ } private synchronized void checkServer(final Set<TServerInstance> updates, - final Set<TServerInstance> doomed, final String path, final String zPath) + final Set<TServerInstance> doomed, final ServiceLockPath tserverPath) throws InterruptedException, KeeperException { - TServerInfo info = current.get(zPath); + // invalidate the snapshot forcing it to be recomputed the next time its requested + tServersSnapshot = null; + + final TServerInfo info = current.get(tserverPath.getServer()); - final var zLockPath = ServiceLock.path(path + "/" + zPath); ZcStat stat = new ZcStat(); - byte[] lockData = ServiceLock.getLockData(getZooCache(), zLockPath, stat); + Optional<ServiceLockData> sld = + ServiceLock.getLockData(context.getZooCache(), tserverPath, stat); - if (lockData == null) { + if (sld.isEmpty()) { + log.trace("lock does not exist for server: {}", tserverPath.getServer()); if (info != null) { doomed.add(info.instance); - current.remove(zPath); - currentInstances.remove(info.instance); + current.remove(tserverPath.getServer()); - serversShuttingDown.remove(tserverPath.toString()); + log.trace("removed {} from current set and adding to doomed list", tserverPath.getServer()); } - Long firstSeen = locklessServers.get(zPath); + Long firstSeen = locklessServers.get(tserverPath); if (firstSeen == null) { - locklessServers.put(zPath, System.currentTimeMillis()); + locklessServers.put(tserverPath, System.currentTimeMillis()); + log.trace("first seen, added {} to list of lockless servers", tserverPath.getServer()); } else if (System.currentTimeMillis() - firstSeen > MINUTES.toMillis(10)) { - deleteServerNode(path + "/" + zPath); - locklessServers.remove(zPath); + deleteServerNode(tserverPath.toString()); + locklessServers.remove(tserverPath); + log.trace( + "deleted zookeeper node for server: {}, has been without lock for over 10 minutes", + tserverPath.getServer()); } } else { - locklessServers.remove(zPath); - ServerServices services = new ServerServices(new String(lockData, UTF_8)); - HostAndPort client = services.getAddress(ServerServices.Service.TSERV_CLIENT); - TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner()); + log.trace("Lock exists for server: {}, adding to current set", tserverPath.getServer()); + locklessServers.remove(tserverPath); + HostAndPort address = sld.orElseThrow().getAddress(ServiceLockData.ThriftService.TSERV); + ResourceGroupId resourceGroup = + sld.orElseThrow().getGroup(ServiceLockData.ThriftService.TSERV); + TServerInstance instance = new TServerInstance(address, stat.getEphemeralOwner()); if (info == null) { updates.add(instance); @@@ -404,70 -388,9 +386,69 @@@ return tServerInfo.connection; } + public synchronized ResourceGroupId getResourceGroup(TServerInstance server) { + if (server == null) { + return null; + } + TServerInfo tServerInfo = getSnapshot().tserversInfo.get(server); + if (tServerInfo == null) { + return null; + } + return tServerInfo.resourceGroup; + } + + public static class LiveTServersSnapshot { + private final Set<TServerInstance> tservers; + private final Map<ResourceGroupId,Set<TServerInstance>> tserverGroups; + + // TServerInfo is only for internal use, so this field is private w/o a getter. + private final Map<TServerInstance,TServerInfo> tserversInfo; + + @VisibleForTesting + public LiveTServersSnapshot(Set<TServerInstance> currentServers, + Map<ResourceGroupId,Set<TServerInstance>> serverGroups) { + this.tserversInfo = null; + this.tservers = Set.copyOf(currentServers); + Map<ResourceGroupId,Set<TServerInstance>> copy = new HashMap<>(); + serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v))); + this.tserverGroups = Collections.unmodifiableMap(copy); + } + + public LiveTServersSnapshot(Map<TServerInstance,TServerInfo> currentServers, + Map<ResourceGroupId,Set<TServerInstance>> serverGroups) { + this.tserversInfo = Map.copyOf(currentServers); + this.tservers = this.tserversInfo.keySet(); + Map<ResourceGroupId,Set<TServerInstance>> copy = new HashMap<>(); + serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v))); + this.tserverGroups = Collections.unmodifiableMap(copy); + } + + public Set<TServerInstance> getTservers() { + return tservers; + } + + public Map<ResourceGroupId,Set<TServerInstance>> getTserverGroups() { + return tserverGroups; + } + } + + public synchronized LiveTServersSnapshot getSnapshot() { + if (tServersSnapshot == null) { + HashMap<TServerInstance,TServerInfo> tServerInstances = new HashMap<>(); + Map<ResourceGroupId,Set<TServerInstance>> tserversGroups = new HashMap<>(); + current.values().forEach(tServerInfo -> { + tServerInstances.put(tServerInfo.instance, tServerInfo); + tserversGroups.computeIfAbsent(tServerInfo.resourceGroup, rg -> new HashSet<>()) + .add(tServerInfo.instance); + }); + tServersSnapshot = new LiveTServersSnapshot(tServerInstances, tserversGroups); + } + return tServersSnapshot; + } + public synchronized Set<TServerInstance> getCurrentServers() { - Set<TServerInstance> current = currentInstances.keySet(); - return new HashSet<>(current); + Set<TServerInstance> current = new HashSet<>(getSnapshot().getTservers()); - serversShuttingDown.values().forEach(tsi -> current.remove(tsi.instance)); + return current; } public synchronized int size() { diff --cc server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 915e154aee,e2ec6f7fa1..89755b94b8 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@@ -519,10 -385,26 +510,8 @@@ public class Admin implements KeywordEx if (ping(context, pingCommand.args) != 0) { rc = 4; } - } else if (cl.getParsedCommand().equals("checkTablets")) { - System.out.println("\n*** Looking for offline tablets ***\n"); - if (FindOfflineTablets.findOffline(context, checkTabletsCommand.tableName) != 0) { - rc = 5; - } - System.out.println("\n*** Looking for missing files ***\n"); - if (checkTabletsCommand.tableName == null) { - if (RemoveEntriesForMissingFiles.checkAllTables(context, checkTabletsCommand.fixFiles) - != 0) { - rc = 6; - } - } else { - if (RemoveEntriesForMissingFiles.checkTable(context, checkTabletsCommand.tableName, - checkTabletsCommand.fixFiles) != 0) { - rc = 6; - } - } - } else if (cl.getParsedCommand().equals("stop")) { - stopTabletServer(context, stopOpts.args, stopOpts.force); - } else if (cl.getParsedCommand().equals("signalShutdown")) { - signalGracefulShutdown(context, gracefulShutdownCommand.address); + stopServers(context, stopOpts.args, stopOpts.force); } else if (cl.getParsedCommand().equals("dumpConfig")) { printConfig(context, dumpConfigCommand); } else if (cl.getParsedCommand().equals("volumes")) { @@@ -668,11 -552,57 +657,62 @@@ client -> client.shutdown(TraceUtil.traceInfo(), context.rpcCreds(), tabletServersToo)); } - // Visible for tests - public static void signalGracefulShutdown(final ClientContext context, String address) { + private static void stopServers(final ServerContext context, List<String> servers, + final boolean force) + throws AccumuloException, AccumuloSecurityException, InterruptedException, KeeperException { + List<String> hostOnly = new ArrayList<>(); - Set<HostAndPort> hostAndPort = new TreeSet<>(); ++ Set<String> hostAndPort = new TreeSet<>(); + + for (var server : servers) { + if (server.contains(":")) { - hostAndPort.add(HostAndPort.fromString(server)); ++ hostAndPort.add(server); + } else { + hostOnly.add(server); + } + } + + if (!hostOnly.isEmpty()) { + // The old impl of this command with the old behavior + stopTabletServer(context, hostOnly, force); + } - Objects.requireNonNull(address, "address not set"); - final HostAndPort hp = HostAndPort.fromString(address); + if (!hostAndPort.isEmpty()) { + // New behavior for this command when ports are present, supports more than just tservers. Is + // also async. + if (force) { - ZooZap.Opts opts = new ZooZap.Opts(); - var zk = context.getZooReaderWriter(); - var iid = context.getInstanceID(); - - String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS; - ZooZap.removeLocks(zk, tserversPath, hostAndPort::contains, opts); - String compactorsBasepath = Constants.ZROOT + "/" + iid + Constants.ZCOMPACTORS; - ZooZap.removeGroupedLocks(zk, compactorsBasepath, rg -> true, hostAndPort::contains, opts); - String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS; - ZooZap.removeGroupedLocks(zk, sserversPath, rg -> true, hostAndPort::contains, opts); - - String managerLockPath = Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK; - ZooZap.removeSingletonLock(zk, managerLockPath, hostAndPort::contains, opts); - String gcLockPath = Constants.ZROOT + "/" + iid + Constants.ZGC_LOCK; - ZooZap.removeSingletonLock(zk, gcLockPath, hostAndPort::contains, opts); - String monitorLockPath = Constants.ZROOT + "/" + iid + Constants.ZMONITOR_LOCK; - ZooZap.removeSingletonLock(zk, monitorLockPath, hostAndPort::contains, opts); ++ var zoo = context.getZooSession().asReaderWriter(); ++ ++ AddressSelector addresses = AddressSelector.matching(hostAndPort::contains); ++ List<ServiceLockPath> pathsToRemove = new ArrayList<>(); ++ pathsToRemove.addAll(context.getServerPaths().getCompactor(rg -> true, addresses, false)); ++ pathsToRemove.addAll(context.getServerPaths().getScanServer(rg -> true, addresses, false)); ++ pathsToRemove ++ .addAll(context.getServerPaths().getTabletServer(rg -> true, addresses, false)); ++ ZooZap.filterSingleton(context, context.getServerPaths().getManager(false), addresses) ++ .ifPresent(pathsToRemove::add); ++ ZooZap.filterSingleton(context, context.getServerPaths().getGarbageCollector(false), ++ addresses).ifPresent(pathsToRemove::add); ++ ZooZap.filterSingleton(context, context.getServerPaths().getMonitor(false), addresses) ++ .ifPresent(pathsToRemove::add); ++ ++ for (var path : pathsToRemove) { ++ List<String> children = zoo.getChildren(path.toString()); ++ for (String child : children) { ++ log.trace("removing lock {}", path + "/" + child); ++ zoo.recursiveDelete(path + "/" + child, ZooUtil.NodeMissingPolicy.SKIP); ++ } ++ } + } else { + for (var server : hostAndPort) { - signalGracefulShutdown(context, server); ++ signalGracefulShutdown(context, HostAndPort.fromString(server)); + } + } + } + } + + // Visible for tests + public static void signalGracefulShutdown(final ClientContext context, HostAndPort hp) { + Objects.requireNonNull(hp, "address not set"); ServerProcessService.Client client = null; try { client = ThriftClientTypes.SERVER_PROCESS.getServerProcessConnection(context, log, diff --cc server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java index edeb3adfb4,3d548c37cf..8b486468b8 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java @@@ -22,23 -24,27 +22,25 @@@ import java.io.IOException import java.io.UncheckedIOException; import java.nio.file.Files; import java.util.Arrays; +import java.util.HashSet; import java.util.List; -import java.util.function.Predicate; ++import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.Help; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; -import org.apache.accumulo.core.data.InstanceId; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock; +import org.apache.accumulo.core.data.ResourceGroupId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; -import org.apache.accumulo.core.singletons.SingletonManager; -import org.apache.accumulo.core.singletons.SingletonManager.Mode; -import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.core.volume.VolumeConfiguration; -import org.apache.accumulo.server.fs.VolumeManager; ++import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; +import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate; +import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.security.SecurityUtil; import org.apache.accumulo.start.spi.KeywordExecutable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -145,30 -157,38 +147,33 @@@ public class ZooZap implements KeywordE return; } - String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next(); - Path instanceDir = new Path(volDir, "instance_id"); - InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration()); - ZooReaderWriter zoo = new ZooReaderWriter(siteConf); - - if (opts.zapMaster) { - log.warn("The -master option is deprecated. Please use -manager instead."); - } - if (opts.zapManager || opts.zapMaster) { - String managerLockPath = Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK; - + var zrw = context.getZooSession().asReaderWriter(); + if (opts.zapManager) { - ServiceLockPath managerLockPath = context.getServerPaths().createManagerPath(); try { - removeSingletonLock(zrw, managerLockPath, addressSelector, opts); - removeSingletonLock(zoo, managerLockPath, hostPortPredicate, opts); -- } catch (KeeperException | InterruptedException e) { ++ ServiceLockPath managerLockPath = context.getServerPaths().createManagerPath(); ++ filterSingleton(context, managerLockPath, addressSelector) ++ .ifPresent(slp -> removeSingletonLock(zrw, slp, opts)); ++ } catch (RuntimeException e) { log.error("Error deleting manager lock", e); } } if (opts.zapGc) { - ServiceLockPath gcLockPath = context.getServerPaths().createGarbageCollectorPath(); - String gcLockPath = Constants.ZROOT + "/" + iid + Constants.ZGC_LOCK; try { - removeSingletonLock(zrw, gcLockPath, addressSelector, opts); - removeSingletonLock(zoo, gcLockPath, hostPortPredicate, opts); -- } catch (KeeperException | InterruptedException e) { -- log.error("Error deleting manager lock", e); ++ ServiceLockPath gcLockPath = context.getServerPaths().createGarbageCollectorPath(); ++ filterSingleton(context, gcLockPath, addressSelector) ++ .ifPresent(slp -> removeSingletonLock(zrw, slp, opts)); ++ } catch (RuntimeException e) { ++ log.error("Error deleting gc lock", e); } } if (opts.zapMonitor) { - ServiceLockPath monitorLockPath = context.getServerPaths().createMonitorPath(); - String monitorLockPath = Constants.ZROOT + "/" + iid + Constants.ZMONITOR_LOCK; try { - removeSingletonLock(zrw, monitorLockPath, addressSelector, opts); - removeSingletonLock(zoo, monitorLockPath, hostPortPredicate, opts); -- } catch (KeeperException | InterruptedException e) { ++ ServiceLockPath monitorPath = context.getServerPaths().createMonitorPath(); ++ filterSingleton(context, monitorPath, addressSelector) ++ .ifPresent(slp -> removeSingletonLock(zrw, slp, opts)); ++ } catch (RuntimeException e) { log.error("Error deleting monitor lock", e); } } @@@ -235,11 -262,45 +240,26 @@@ } } - private static void removeSingletonLock(ZooReaderWriter zoo, ServiceLockPath path, - AddressSelector addressSelector, Opts ops) throws KeeperException, InterruptedException { - if (addressSelector.getPredicate().test(path.getServer())) { - static void removeGroupedLocks(ZooReaderWriter zoo, String path, Predicate<String> groupPredicate, - Predicate<HostAndPort> hostPortPredicate, Opts opts) - throws KeeperException, InterruptedException { - if (zoo.exists(path)) { - List<String> groups = zoo.getChildren(path); - for (String group : groups) { - if (groupPredicate.test(group)) { - removeLocks(zoo, path + "/" + group, hostPortPredicate, opts); - } - } ++ private static void removeSingletonLock(ZooReaderWriter zoo, ServiceLockPath path, Opts ops) { ++ try { + zapDirectory(zoo, path, ops); ++ } catch (KeeperException | InterruptedException e) { ++ throw new IllegalStateException(e); } } - static void removeLocks(ZooReaderWriter zoo, String path, - Predicate<HostAndPort> hostPortPredicate, Opts opts) - throws KeeperException, InterruptedException { - if (zoo.exists(path)) { - List<String> children = zoo.getChildren(path); - for (String child : children) { - if (hostPortPredicate.test(HostAndPort.fromString(child))) { - message("Deleting " + path + "/" + child + " from zookeeper", opts); - if (!opts.dryRun) { - // TODO not sure this is the correct way to delete this lock.. the code was deleting - // locks in multiple different ways for diff servers types. - zoo.recursiveDelete(path + "/" + child, NodeMissingPolicy.SKIP); - } ++ static Optional<ServiceLockPath> filterSingleton(ServerContext context, ServiceLockPath path, ++ AddressSelector addressSelector) { ++ Optional<ServiceLockData> sld = context.getZooCache().getLockData(path); ++ return sld.filter(lockData -> { ++ for (var service : ServiceLockData.ThriftService.values()) { ++ var address = lockData.getAddress(service); ++ if (address != null) { ++ return addressSelector.getPredicate().test(address.toString()); + } + } - } - } + - static void removeSingletonLock(ZooReaderWriter zoo, String path, - Predicate<HostAndPort> hostPortPredicate, Opts ops) - throws KeeperException, InterruptedException { - var lockData = ServiceLock.getLockData(zoo.getZooKeeper(), ServiceLock.path(path)); - if (lockData != null - && hostPortPredicate.test(HostAndPort.fromString(new String(lockData, UTF_8)))) { - zapDirectory(zoo, path, ops); - } ++ return false; ++ }).map(lockData -> path); + } - } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index 2be39fdfcf,b0f173974b..992d46a282 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@@ -51,8 -52,11 +51,9 @@@ import org.apache.accumulo.core.clientI import org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; -import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.NamespaceId; -import org.apache.accumulo.core.data.Range; ++import org.apache.accumulo.core.data.ResourceGroupId; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.Fate; @@@ -345,9 -349,41 +346,19 @@@ public class ManagerClientServiceHandle SecurityErrorCode.PERMISSION_DENIED); } log.info("Tablet Server {} has reported it's shutting down", tabletServer); - manager.tserverSet.tabletServerShuttingDown(tabletServer); + var tserver = new TServerInstance(tabletServer); + if (manager.shutdownTServer(tserver)) { + // If there is an exception seeding the fate tx this should cause the RPC to fail which should + // cause the tserver to halt. Because of that not making an attempt to handle failure here. - Fate<Manager> fate = manager.fate(); - long tid = fate.startTransaction(); ++ Fate<Manager> fate = manager.fate(FateInstanceType.META); ++ var tid = fate.startTransaction(); + String msg = "Shutdown tserver " + tabletServer; - fate.seedTransaction("ShutdownTServer", tid, - new TraceRepo<>(new ShutdownTServer(tserver, false)), true, msg); ++ // TODO resource group ++ fate.seedTransaction(Fate.FateOperation.SHUTDOWN_TSERVER, tid, ++ new TraceRepo<>(new ShutdownTServer(tserver, ResourceGroupId.DEFAULT, false)), true, msg); + } } - @Override - public void reportSplitExtent(TInfo info, TCredentials credentials, String serverName, - TabletSplit split) throws ThriftSecurityException { - if (!manager.security.canPerformSystemActions(credentials)) { - throw new ThriftSecurityException(credentials.getPrincipal(), - SecurityErrorCode.PERMISSION_DENIED); - } - - KeyExtent oldTablet = KeyExtent.fromThrift(split.oldTablet); - if (manager.migrations.removeExtent(oldTablet) != null) { - Manager.log.info("Canceled migration of {}", split.oldTablet); - } - for (TServerInstance instance : manager.tserverSet.getCurrentServers()) { - if (serverName.equals(instance.getHostPort())) { - manager.nextEvent.event("%s reported split %s, %s", serverName, - KeyExtent.fromThrift(split.newTablets.get(0)), - KeyExtent.fromThrift(split.newTablets.get(1))); - return; - } - } - Manager.log.warn("Got a split from a server we don't recognize: {}", serverName); - } - @Override public void reportTabletStatus(TInfo info, TCredentials credentials, String serverName, TabletLoadState status, TKeyExtent ttablet) throws ThriftSecurityException { diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 95816c5369,2573d6b451..ed81fe48b2 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -51,10 -54,9 +51,8 @@@ import java.util.TreeSet import java.util.UUID; import java.util.concurrent.BlockingDeque; import java.util.concurrent.ConcurrentHashMap; - import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledFuture; --import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@@ -76,36 -73,27 +74,34 @@@ import org.apache.accumulo.core.conf.Si import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher; -import org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.ServiceLockWatcher; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.lock.ServiceLockSupport; +import org.apache.accumulo.core.lock.ServiceLockSupport.ServiceLockWatcher; +import org.apache.accumulo.core.manager.thrift.Compacting; import org.apache.accumulo.core.manager.thrift.ManagerClientService; -import org.apache.accumulo.core.master.thrift.BulkImportState; -import org.apache.accumulo.core.master.thrift.Compacting; -import org.apache.accumulo.core.master.thrift.TableInfo; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.manager.thrift.TableInfo; +import org.apache.accumulo.core.manager.thrift.TabletServerStatus; +import org.apache.accumulo.core.metadata.SystemTables; import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.metadata.schema.Ample; - import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metrics.MetricsInfo; -import org.apache.accumulo.core.process.thrift.ServerProcessService; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment; +import org.apache.accumulo.core.spi.ondemand.OnDemandTabletUnloader; +import org.apache.accumulo.core.spi.ondemand.OnDemandTabletUnloader.UnloaderParams; - import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal; +import org.apache.accumulo.core.tabletserver.UnloaderParamsImpl; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.ComparablePair; @@@ -114,13 -103,15 +110,11 @@@ import org.apache.accumulo.core.util.Ma import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.Retry.RetryFactory; -import org.apache.accumulo.core.util.ServerServices; -import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.UtilWaitThread; - import org.apache.accumulo.core.util.threads.ThreadPoolNames; -import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; - import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.server.AbstractServer; -import org.apache.accumulo.server.GarbageCollectionLogger; import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.ServiceEnvironmentImpl; import org.apache.accumulo.server.TabletLevel; import org.apache.accumulo.server.client.ClientServiceHandler; import org.apache.accumulo.server.compaction.CompactionWatcher; @@@ -675,81 -950,42 +669,37 @@@ public class TabletServer extends Abstr } } - // Tell the Manager we are shutting down so that it doesn't try - // to assign tablets. ManagerClientService.Client iface = managerConnection(getManagerAddress()); try { - iface.tabletServerStopping(TraceUtil.traceInfo(), getContext().rpcCreds(), - advertiseAddressString); - } catch (TException e) { - Halt.halt(-1, "Error informing Manager that we are shutting down, exiting!", e); - } finally { - returnManagerConnection(iface); - } + // Ask the manager to unload our tablets and stop loading new tablets + if (iface == null) { + Halt.halt(-1, "Error informing Manager that we are shutting down, exiting!"); + } else { + iface.tabletServerStopping(TraceUtil.traceInfo(), getContext().rpcCreds(), + getTabletSession().getHostPortSession()); + } - // Best-effort attempt at unloading tablets. - log.debug("Unloading tablets"); - final List<Future<?>> futures = new ArrayList<>(); - final ThreadPoolExecutor tpe = getContext().threadPools() - .getPoolBuilder(ThreadPoolNames.TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL).numCoreThreads(8) - .numMaxThreads(16).build(); + boolean managerDown = false; + while (!getOnlineTablets().isEmpty()) { + log.info("Shutdown requested, waiting for manager to unload {} tablets", + getOnlineTablets().size()); - iface = managerConnection(getManagerAddress()); - boolean managerDown = false; - managerDown = sendManagerMessages(managerDown, iface); ++ managerDown = sendManagerMessages(managerDown, iface, advertiseAddressString); - try { - for (DataLevel level : new DataLevel[] {DataLevel.USER, DataLevel.METADATA, DataLevel.ROOT}) { - getOnlineTablets().keySet().forEach(ke -> { - if (DataLevel.of(ke.tableId()) == level) { - futures.add(tpe.submit(new UnloadTabletHandler(this, ke, TUnloadTabletGoal.UNASSIGNED, - SteadyTime.from(System.currentTimeMillis(), TimeUnit.MILLISECONDS)))); - } - }); - while (!futures.isEmpty()) { - Iterator<Future<?>> unloads = futures.iterator(); - while (unloads.hasNext()) { - Future<?> f = unloads.next(); - if (f.isDone()) { - if (!managerDown) { - ManagerMessage mm = managerMessages.poll(); - try { - mm.send(getContext().rpcCreds(), advertiseAddressString, iface); - } catch (TException e) { - managerDown = true; - log.debug("Error sending message to Manager during tablet unloading, msg: {}", - e.getMessage()); - } - } - unloads.remove(); - } - } - log.debug("Waiting on {} {} tablets to close.", futures.size(), level); - UtilWaitThread.sleep(1000); - } - log.debug("All {} tablets unloaded", level); + UtilWaitThread.sleep(1000); } + - sendManagerMessages(managerDown, iface); ++ sendManagerMessages(managerDown, iface, advertiseAddressString); + + } catch (TException | RuntimeException e) { + Halt.halt(-1, "Error informing Manager that we are shutting down, exiting!", e); } finally { - if (!managerDown) { - try { - ManagerMessage mm = managerMessages.poll(); - do { - if (mm != null) { - mm.send(getContext().rpcCreds(), advertiseAddressString, iface); - } - mm = managerMessages.poll(); - } while (mm != null); - } catch (TException e) { - log.debug("Error sending message to Manager during tablet unloading, msg: {}", - e.getMessage()); - } - } returnManagerConnection(iface); - tpe.shutdown(); } - log.debug("Stopping Replication Server"); - if (this.replServer != null) { - this.replServer.stop(); - } - log.debug("Stopping Thrift Servers"); - if (server != null) { - server.stop(); + if (getThriftServer() != null) { + getThriftServer().stop(); } try { @@@ -770,8 -1006,55 +720,24 @@@ } } - private boolean sendManagerMessages(boolean managerDown, ManagerClientService.Client iface) { ++ private boolean sendManagerMessages(boolean managerDown, ManagerClientService.Client iface, ++ String advertiseAddressString) { + ManagerMessage mm = managerMessages.poll(); + while (mm != null && !managerDown) { + try { - mm.send(getContext().rpcCreds(), getClientAddressString(), iface); ++ mm.send(getContext().rpcCreds(), advertiseAddressString, iface); + mm = managerMessages.poll(); + } catch (TException e) { + managerDown = true; - LOG.debug("Error sending message to Manager during tablet unloading, msg: {}", ++ log.debug("Error sending message to Manager during tablet unloading, msg: {}", + e.getMessage()); + } + } + return managerDown; + } + - @SuppressWarnings("deprecation") - private void setupReplication(AccumuloConfiguration aconf) { - // Start the thrift service listening for incoming replication requests - try { - startReplicationService(); - } catch (UnknownHostException e) { - throw new RuntimeException("Failed to start replication service", e); - } - - // Start the pool to handle outgoing replications - final ThreadPoolExecutor replicationThreadPool = ThreadPools.getServerThreadPools() - .createExecutorService(getConfiguration(), Property.REPLICATION_WORKER_THREADS); - replWorker.setExecutor(replicationThreadPool); - replWorker.run(); - - // Check the configuration value for the size of the pool and, if changed, resize the pool - Runnable replicationWorkThreadPoolResizer = () -> { - ThreadPools.resizePool(replicationThreadPool, aconf, Property.REPLICATION_WORKER_THREADS); - }; - ScheduledFuture<?> future = context.getScheduledExecutor() - .scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10, 30, TimeUnit.SECONDS); - watchNonCriticalScheduledTask(future); - } - - public String getClientAddressString() { - if (clientAddress == null) { - return null; - } - return clientAddress.getHost() + ":" + clientAddress.getPort(); - } - public TServerInstance getTabletSession() { - String address = getClientAddressString(); - if (address == null) { + if (getAdvertiseAddress() == null) { return null; } if (lockSessionId == -1) { diff --cc test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java index 2c2706ecd4,cbdb01b2f2..ada20ee904 --- a/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java @@@ -177,12 -180,10 +177,12 @@@ public class GracefulShutdownIT extend }); // Restart Tablet Server - final List<String> tservers = client.instanceOperations().getTabletServers(); + final Set<ServiceLockPath> tservers = getCluster().getServerContext().getServerPaths() + .getTabletServer((rg) -> rg.equals(ResourceGroupId.DEFAULT), AddressSelector.all(), true); assertEquals(2, tservers.size()); - final HostAndPort tserverAddress = HostAndPort.fromString(tservers.get(0)); + final HostAndPort tserverAddress = + HostAndPort.fromString(tservers.iterator().next().getServer()); - Admin.signalGracefulShutdown(ctx, tserverAddress.toString()); + Admin.signalGracefulShutdown(ctx, tserverAddress); Wait.waitFor(() -> { control.refreshProcesses(ServerType.TABLET_SERVER); return control.getProcesses(ServerType.TABLET_SERVER).size() == 1; @@@ -193,16 -194,10 +193,16 @@@ client.instanceOperations().waitForBalance(); // Restart Manager - final List<String> managers = client.instanceOperations().getManagerLocations(); - assertEquals(1, managers.size()); - final HostAndPort managerAddress = HostAndPort.fromString(managers.get(0)); + final ServiceLockPath manager = + getCluster().getServerContext().getServerPaths().getManager(true); + assertNotNull(manager); + Set<ServerId> managerLocations = + client.instanceOperations().getServers(ServerId.Type.MANAGER); + assertNotNull(managerLocations); + assertEquals(1, managerLocations.size()); + final HostAndPort managerAddress = + HostAndPort.fromString(managerLocations.iterator().next().toHostPortString()); - Admin.signalGracefulShutdown(ctx, managerAddress.toString()); + Admin.signalGracefulShutdown(ctx, managerAddress); Wait.waitFor(() -> { control.refreshProcesses(ServerType.MANAGER); return control.getProcesses(ServerType.MANAGER).isEmpty(); @@@ -235,18 -223,11 +235,18 @@@ final long numFiles2 = getNumFilesForTable(ctx, tid); assertEquals(numFiles2, numFiles); - assertEquals(0, ExternalCompactionTestUtils.getRunningCompactions(ctx).getCompactionsSize()); + Set<ServerId> newManagerLocations = + client.instanceOperations().getServers(ServerId.Type.MANAGER); + assertNotNull(newManagerLocations); + assertEquals(1, newManagerLocations.size()); + final HostAndPort newManagerAddress = + HostAndPort.fromString(newManagerLocations.iterator().next().toHostPortString()); + assertEquals(0, ExternalCompactionTestUtils + .getRunningCompactions(ctx, Optional.of(newManagerAddress)).getCompactionsSize()); client.tableOperations().compact(tableName, cc); - Wait.waitFor( - () -> ExternalCompactionTestUtils.getRunningCompactions(ctx).getCompactionsSize() > 0); + Wait.waitFor(() -> ExternalCompactionTestUtils + .getRunningCompactions(ctx, Optional.of(newManagerAddress)).getCompactionsSize() > 0); - Admin.signalGracefulShutdown(ctx, compactorAddress.toString()); + Admin.signalGracefulShutdown(ctx, compactorAddress); Wait.waitFor(() -> { control.refreshProcesses(ServerType.COMPACTOR); return control.getProcesses(ServerType.COMPACTOR).isEmpty();
