This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new c133b44133 Changes to public API to expose resource groups (#4851)
c133b44133 is described below
commit c133b44133a59ee2ee62d0040f9a8f9e6a19e14e
Author: Dave Marion <[email protected]>
AuthorDate: Tue Oct 8 06:46:37 2024 -0400
Changes to public API to expose resource groups (#4851)
Added methods to InstanceOperations that return server information (host,
port, resource group, and type) using a new ServerId type. Deprecated older
methods on InstanceOperations that just returned the address of the servers
as Strings. Modified existing code to use the new methods and added a new
test class, InstanceOperationsIT. This work leverages the new
ServiceLockPaths
class that was added in #4861 to get information about the servers from
their
respective paths in ZooKeeper.
Closes #4849
---
.../core/client/admin/ActiveCompaction.java | 2 +
.../core/client/admin/InstanceOperations.java | 69 ++++++
.../core/client/admin/servers/ServerId.java | 122 +++++++++++
.../core/clientImpl/ActiveCompactionImpl.java | 15 +-
.../accumulo/core/clientImpl/ClientContext.java | 61 +-----
.../core/clientImpl/InstanceOperationsImpl.java | 242 +++++++++++++++++----
.../accumulo/core/lock/ServiceLockPaths.java | 10 +
.../accumulo/core/rpc/clients/ManagerClient.java | 14 +-
.../org/apache/accumulo/core/summary/Gatherer.java | 8 +-
.../util/compaction/ExternalCompactionUtil.java | 23 +-
.../org/apache/accumulo/server/util/Admin.java | 9 +-
.../org/apache/accumulo/server/util/ECAdmin.java | 17 +-
.../java/org/apache/accumulo/server/util/Info.java | 11 +-
.../coordinator/CompactionCoordinator.java | 19 +-
.../compaction/CompactionCoordinatorTest.java | 5 +-
.../java/org/apache/accumulo/monitor/Monitor.java | 62 +++---
.../rest/compactions/external/Compactors.java | 5 +-
.../rest/compactions/external/CoordinatorInfo.java | 10 +-
.../external/ExternalCompactionInfo.java | 11 +-
.../monitor/rest/manager/ManagerResource.java | 13 +-
.../monitor/rest/status/StatusResource.java | 7 +-
.../org/apache/accumulo/tserver/TabletServer.java | 7 +-
.../shell/commands/ActiveCompactionHelper.java | 43 ++--
.../shell/commands/ActiveScanIterator.java | 26 ++-
.../accumulo/shell/commands/ListScansCommand.java | 25 ++-
.../accumulo/shell/commands/PingCommand.java | 7 +-
.../apache/accumulo/test/ComprehensiveBaseIT.java | 4 +-
.../apache/accumulo/test/InstanceOperationsIT.java | 102 +++++++++
.../accumulo/test/InterruptibleScannersIT.java | 4 +-
.../java/org/apache/accumulo/test/LocatorIT.java | 5 +-
.../java/org/apache/accumulo/test/RecoveryIT.java | 7 +-
.../test/ScanServerGroupConfigurationIT.java | 19 +-
.../accumulo/test/ScanServerMaxLatencyIT.java | 4 +-
.../apache/accumulo/test/ScanServerShutdownIT.java | 5 +-
.../accumulo/test/TabletServerGivesUpIT.java | 10 +-
.../accumulo/test/TabletServerHdfsRestartIT.java | 3 +-
.../org/apache/accumulo/test/TotalQueuedIT.java | 8 +-
.../apache/accumulo/test/TransportCachingIT.java | 19 +-
.../org/apache/accumulo/test/ZombieScanIT.java | 22 +-
.../CompactionPriorityQueueMetricsIT.java | 5 +-
.../accumulo/test/fate/FateOpsCommandsIT.java | 4 +-
.../BalanceInPresenceOfOfflineTableIT.java | 8 +-
.../accumulo/test/functional/CompactionIT.java | 21 +-
.../test/functional/DebugClientConnectionIT.java | 13 +-
.../test/functional/HalfDeadTServerIT.java | 5 +-
.../test/functional/ManagerAssignmentIT.java | 21 +-
.../test/functional/MemoryStarvedMajCIT.java | 16 +-
.../apache/accumulo/test/functional/ScanIdIT.java | 5 +-
.../apache/accumulo/test/functional/ScannerIT.java | 14 +-
.../test/functional/SessionBlockVerifyIT.java | 4 +-
.../accumulo/test/functional/ShutdownIT.java | 14 +-
.../test/functional/SimpleBalancerFairnessIT.java | 4 +-
.../accumulo/test/functional/TabletMetadataIT.java | 7 +-
.../accumulo/test/manager/SuspendedTabletsIT.java | 4 +-
.../apache/accumulo/test/shell/ShellServerIT.java | 12 +-
55 files changed, 871 insertions(+), 341 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java
b/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java
index 2ca9f69fb5..8aa35ae498 100644
---
a/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java
+++
b/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java
@@ -145,6 +145,8 @@ public abstract class ActiveCompaction {
String getAddress();
int getPort();
+
+ String getResourceGroup();
}
/**
diff --git
a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
index 9906dd4d36..935fcb0283 100644
---
a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
+++
b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
@@ -22,9 +22,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.data.InstanceId;
public interface InstanceOperations {
@@ -181,7 +183,9 @@ public interface InstanceOperations {
*
* @return a list of locations in <code>hostname:port</code> form.
* @since 2.1.0
+ * @deprecated see {@link #getServers(ServerId.Type)}
*/
+ @Deprecated(since = "4.0.0")
List<String> getManagerLocations();
/**
@@ -189,34 +193,83 @@ public interface InstanceOperations {
*
* @return A set of currently active compactors.
* @since 2.1.4
+ * @deprecated see {@link #getServers(ServerId.Type)}
*/
+ @Deprecated(since = "4.0.0")
Set<String> getCompactors();
/**
* Returns the locations of the active scan servers
*
* @return A set of currently active scan servers.
+ * @deprecated see {@link #getServers(ServerId.Type)}
* @since 2.1.0
*/
+ @Deprecated(since = "4.0.0")
Set<String> getScanServers();
/**
* List the currently active tablet servers participating in the accumulo
instance
*
* @return A list of currently active tablet servers.
+ * @deprecated see {@link #getServers(ServerId.Type)}
*/
+ @Deprecated(since = "4.0.0")
List<String> getTabletServers();
+ /**
+ * Resolve the server of the given type and address to a ServerId
+ *
+ * @param type type of server
+ * @param resourceGroup group of server, can be null
+ * @param host host name, cannot be null
+ * @param port host port
+ * @return ServerId if found, else null
+ * @since 4.0.0
+ */
+ ServerId getServer(ServerId.Type type, String resourceGroup, String host,
int port);
+
+ /**
+ * Returns all servers of the given types. For the Manager, the result will
contain only one
+ * element for the current active Manager.
+ *
+ * @return set of servers of the supplied type
+ * @since 4.0.0
+ */
+ Set<ServerId> getServers(ServerId.Type type);
+
+ /**
+ * Returns the servers of a given type that match the given criteria
+ *
+ * @return set of servers of the supplied type matching the supplied test
+ * @since 4.0.0
+ */
+ Set<ServerId> getServers(ServerId.Type type, Predicate<ServerId> test);
+
/**
* List the active scans on a tablet server.
*
* @param tserver The tablet server address. This should be of the form
* {@code <ip address>:<port>}
* @return A list of active scans on tablet server.
+ * @deprecated see {@link #getActiveScans(ServerId)}
*/
+ @Deprecated(since = "4.0.0")
List<ActiveScan> getActiveScans(String tserver)
throws AccumuloException, AccumuloSecurityException;
+ /**
+ * List the active scans on a server.
+ *
+ * @param server server type and address
+ * @return A stream of active scans on server.
+ * @throws IllegalArgumentException when the type of the server is not
TABLET_SERVER or
+ * SCAN_SERVER
+ * @since 4.0.0
+ */
+ List<ActiveScan> getActiveScans(ServerId server)
+ throws AccumuloException, AccumuloSecurityException;
+
/**
* List the active compaction running on a TabletServer or Compactor. The
server address can be
* retrieved using {@link #getCompactors()} or {@link #getTabletServers()}.
Use
@@ -226,10 +279,26 @@ public interface InstanceOperations {
* @param tserver The server address. This should be of the form {@code <ip
address>:<port>}
* @return the list of active compactions
* @since 1.5.0
+ * @deprecated see {@link #getActiveCompactions(ServerId server)}
*/
+ @Deprecated(since = "4.0.0")
List<ActiveCompaction> getActiveCompactions(String tserver)
throws AccumuloException, AccumuloSecurityException;
+ /**
+ * List the active compaction running on a TabletServer or Compactor. The
server address can be
+ * retrieved using {@link #getCompactors()} or {@link #getTabletServers()}.
Use
+ * {@link #getActiveCompactions()} to get a list of all compactions running
on tservers and
+ * compactors.
+ *
+ * @param server The ServerId object
+ * @return the list of active compactions
+ * @throws IllegalArgumentException when the type of the server is not
TABLET_SERVER or COMPACTOR
+ * @since 4.0.0
+ */
+ List<ActiveCompaction> getActiveCompactions(ServerId server)
+ throws AccumuloException, AccumuloSecurityException;
+
/**
* List all internal and external compactions running in Accumulo.
*
diff --git
a/core/src/main/java/org/apache/accumulo/core/client/admin/servers/ServerId.java
b/core/src/main/java/org/apache/accumulo/core/client/admin/servers/ServerId.java
new file mode 100644
index 0000000000..b1c7a44d2b
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/client/admin/servers/ServerId.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client.admin.servers;
+
+import java.util.Objects;
+
+import org.apache.accumulo.core.conf.PropertyType.PortRange;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Object representing the type, resource group, and address of a server
process.
+ *
+ * @since 4.0.0
+ */
+public final class ServerId implements Comparable<ServerId> {
+
+ /**
+ * Server process type names that a client can be expected to interact with.
Clients are not
+ * expected to interact directly with the GarbageCollector or Monitor
processes.
+ *
+ * @since 4.0.0
+ */
+ public enum Type {
+ MANAGER, COMPACTOR, SCAN_SERVER, TABLET_SERVER;
+ }
+
+ private final Type type;
+ private final String resourceGroup;
+ private final String host;
+ private final int port;
+
+ public ServerId(Type type, String resourceGroup, String host, int port) {
+ super();
+ Preconditions.checkArgument(port == 0 ||
PortRange.VALID_RANGE.contains(port),
+ "invalid server port value: " + port);
+ this.type = Objects.requireNonNull(type);
+ this.resourceGroup = Objects.requireNonNull(resourceGroup);
+ this.host = Objects.requireNonNull(host);
+ this.port = port;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public String getResourceGroup() {
+ return this.resourceGroup;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public int compareTo(ServerId other) {
+ if (this == other) {
+ return 0;
+ }
+ int result = this.getType().compareTo(other.getType());
+ if (result == 0) {
+ result = this.getResourceGroup().compareTo(other.getResourceGroup());
+ if (result == 0) {
+ result = this.getHost().compareTo(other.getHost());
+ if (result == 0) {
+ result = Integer.compare(this.getPort(), other.getPort());
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(host, port, type, resourceGroup);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ ServerId other = (ServerId) obj;
+ return 0 == compareTo(other);
+ }
+
+ @Override
+ public String toString() {
+ return "Server [type= " + type + ", resource group= " + resourceGroup + ",
host= " + host
+ + ", port= " + port + "]";
+ }
+
+ public String toHostPortString() {
+ return host + ":" + port;
+ }
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
index d744789fe8..c6c89e6f47 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
@@ -26,6 +26,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.ActiveCompaction;
import
org.apache.accumulo.core.client.admin.ActiveCompaction.CompactionHost.Type;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
@@ -42,14 +43,15 @@ public class ActiveCompactionImpl extends ActiveCompaction {
private final ClientContext context;
private final HostAndPort hostport;
private final Type type;
+ private final String resourceGroup;
ActiveCompactionImpl(ClientContext context,
- org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction tac,
HostAndPort hostport,
- CompactionHost.Type type) {
+ org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction tac,
ServerId server) {
this.tac = tac;
this.context = context;
- this.hostport = hostport;
- this.type = type;
+ this.hostport = HostAndPort.fromParts(server.getHost(), server.getPort());
+ this.type = server.getType() == ServerId.Type.COMPACTOR ? Type.COMPACTOR :
Type.TSERVER;
+ this.resourceGroup = server.getResourceGroup();
}
@Override
@@ -140,6 +142,11 @@ public class ActiveCompactionImpl extends ActiveCompaction
{
public int getPort() {
return hostport.getPort();
}
+
+ @Override
+ public String getResourceGroup() {
+ return resourceGroup;
+ }
};
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 1369094328..0017809259 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -100,7 +100,6 @@ import
org.apache.accumulo.core.spi.common.ServiceEnvironment;
import org.apache.accumulo.core.spi.scan.ScanServerInfo;
import org.apache.accumulo.core.spi.scan.ScanServerSelector;
import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.cache.Caches;
import org.apache.accumulo.core.util.tables.TableZooHelper;
import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -197,16 +196,16 @@ public class ClientContext implements AccumuloClient {
@Override
public Supplier<Collection<ScanServerInfo>> getScanServers() {
- return () -> ClientContext.this.getScanServers().entrySet().stream()
+ return () -> getServerPaths().getScanServer(rg -> true, addr ->
true, true).stream()
.map(entry -> new ScanServerInfo() {
@Override
public String getAddress() {
- return entry.getKey();
+ return entry.getServer();
}
@Override
public String getGroup() {
- return entry.getValue().getSecond();
+ return entry.getResourceGroup();
}
}).collect(Collectors.toSet());
}
@@ -400,6 +399,15 @@ public class ClientContext implements AccumuloClient {
return batchWriterConfig;
}
+ /**
+ * @return the scan server selector implementation used for determining
which scan servers will be
+ * used when performing an eventually consistent scan
+ */
+ public ScanServerSelector getScanServerSelector() {
+ ensureOpen();
+ return scanServerSelectorSupplier.get();
+ }
+
/**
* @return map of live scan server addresses to lock uuids.
*/
@@ -425,15 +433,6 @@ public class ClientContext implements AccumuloClient {
return liveScanServers;
}
- /**
- * @return the scan server selector implementation used for determining
which scan servers will be
- * used when performing an eventually consistent scan
- */
- public ScanServerSelector getScanServerSelector() {
- ensureOpen();
- return scanServerSelectorSupplier.get();
- }
-
static ConditionalWriterConfig getConditionalWriterConfig(Properties props) {
ConditionalWriterConfig conditionalWriterConfig = new
ConditionalWriterConfig();
@@ -476,42 +475,6 @@ public class ClientContext implements AccumuloClient {
return rpcCreds;
}
- /**
- * Returns the location(s) of the accumulo manager and any redundant servers.
- *
- * @return a list of locations in "hostname:port" form
- */
- public List<String> getManagerLocations() {
- ensureOpen();
- var zLockManagerPath = getServerPaths().getManager(true);
-
- Timer timer = null;
-
- if (log.isTraceEnabled()) {
- log.trace("tid={} Looking up manager location in zookeeper at {}.",
- Thread.currentThread().getId(), zLockManagerPath);
- timer = Timer.startNew();
- }
-
- Optional<ServiceLockData> sld = zooCache.getLockData(zLockManagerPath);
- String location = null;
- if (sld.isPresent()) {
- location = sld.orElseThrow().getAddressString(ThriftService.MANAGER);
- }
-
- if (timer != null) {
- log.trace("tid={} Found manager at {} in {}",
Thread.currentThread().getId(),
- (location == null ? "null" : location),
- String.format("%.3f secs", timer.elapsed(MILLISECONDS) / 1000.0));
- }
-
- if (location == null) {
- return Collections.emptyList();
- }
-
- return Collections.singletonList(location);
- }
-
/**
* Returns a unique string that identifies this instance of accumulo.
*
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index 8ea74e90bb..fa9dd6ba44 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@ -20,7 +20,6 @@ package org.apache.accumulo.core.clientImpl;
import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.stream.Collectors.toList;
import static org.apache.accumulo.core.rpc.ThriftUtil.createClient;
import static org.apache.accumulo.core.rpc.ThriftUtil.createTransport;
import static org.apache.accumulo.core.rpc.ThriftUtil.getClient;
@@ -29,31 +28,38 @@ import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.ActiveCompaction;
-import org.apache.accumulo.core.client.admin.ActiveCompaction.CompactionHost;
import org.apache.accumulo.core.client.admin.ActiveScan;
import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType;
import org.apache.accumulo.core.clientImpl.thrift.TVersionedProperties;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.conf.DeprecatedPropertyUtil;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
+import org.apache.accumulo.core.lock.ServiceLockData;
+import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate;
+import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService;
@@ -68,6 +74,7 @@ import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
/**
@@ -214,38 +221,46 @@ public class InstanceOperationsImpl implements
InstanceOperations {
}
@Override
+ @Deprecated(since = "4.0.0")
public List<String> getManagerLocations() {
- return context.getManagerLocations();
+
+ Set<ServerId> managers = getServers(ServerId.Type.MANAGER);
+ if (managers == null || managers.isEmpty()) {
+ return List.of();
+ } else {
+ return List.of(managers.iterator().next().toHostPortString());
+ }
}
@Override
+ @Deprecated(since = "4.0.0")
public Set<String> getCompactors() {
- Set<String> compactors = new HashSet<>();
- ExternalCompactionUtil.getCompactorAddrs(context).values().forEach(addrs
-> {
- addrs.forEach(hp -> compactors.add(hp.toString()));
- });
- return compactors;
+ Set<String> results = new HashSet<>();
+ context.getServerPaths().getCompactor(rg -> true, addr -> true, true)
+ .forEach(t -> results.add(t.getServer()));
+ return results;
}
@Override
+ @Deprecated(since = "4.0.0")
public Set<String> getScanServers() {
- return Set.copyOf(context.getScanServers().keySet());
+ Set<String> results = new HashSet<>();
+ context.getServerPaths().getScanServer(rg -> true, addr -> true, true)
+ .forEach(t -> results.add(t.getServer()));
+ return results;
}
@Override
+ @Deprecated(since = "4.0.0")
public List<String> getTabletServers() {
- Set<ServiceLockPath> paths =
- context.getServerPaths().getTabletServer(rg -> true, addr -> true,
true);
List<String> results = new ArrayList<>();
- paths.forEach(p -> {
- if (!p.getServer().equals("manager")) {
- results.add(p.getServer());
- }
- });
+ context.getServerPaths().getTabletServer(rg -> true, addr -> true, true)
+ .forEach(t -> results.add(t.getServer()));
return results;
}
@Override
+ @Deprecated(since = "4.0.0")
public List<ActiveScan> getActiveScans(String tserver)
throws AccumuloException, AccumuloSecurityException {
final var parsedTserver = HostAndPort.fromString(tserver);
@@ -273,6 +288,42 @@ public class InstanceOperationsImpl implements
InstanceOperations {
}
}
+ @Override
+ public List<ActiveScan> getActiveScans(ServerId server)
+ throws AccumuloException, AccumuloSecurityException {
+
+ Objects.requireNonNull(server);
+ Preconditions.checkArgument(
+ server.getType() == ServerId.Type.SCAN_SERVER
+ || server.getType() == ServerId.Type.TABLET_SERVER,
+ "Server type %s is not %s or %s.", server.getType(),
ServerId.Type.SCAN_SERVER,
+ ServerId.Type.TABLET_SERVER);
+
+ final var parsedTserver = HostAndPort.fromParts(server.getHost(),
server.getPort());
+ TabletScanClientService.Client rpcClient = null;
+ try {
+ rpcClient = getClient(ThriftClientTypes.TABLET_SCAN, parsedTserver,
context);
+
+ List<ActiveScan> as = new ArrayList<>();
+ for (var activeScan : rpcClient.getActiveScans(TraceUtil.traceInfo(),
context.rpcCreds())) {
+ try {
+ as.add(new ActiveScanImpl(context, activeScan));
+ } catch (TableNotFoundException e) {
+ throw new AccumuloException(e);
+ }
+ }
+ return as;
+ } catch (ThriftSecurityException e) {
+ throw new AccumuloSecurityException(e.user, e.code, e);
+ } catch (TException e) {
+ throw new AccumuloException(e);
+ } finally {
+ if (rpcClient != null) {
+ returnClient(rpcClient, context);
+ }
+ }
+ }
+
@Override
public boolean testClassLoad(final String className, final String asTypeName)
throws AccumuloException, AccumuloSecurityException {
@@ -281,19 +332,42 @@ public class InstanceOperationsImpl implements
InstanceOperations {
}
@Override
+ @Deprecated
public List<ActiveCompaction> getActiveCompactions(String server)
throws AccumuloException, AccumuloSecurityException {
- final var serverHostAndPort = HostAndPort.fromString(server);
+ HostAndPort hp = HostAndPort.fromString(server);
+
+ ServerId si = getServer(ServerId.Type.COMPACTOR, null, hp.getHost(),
hp.getPort());
+ if (si == null) {
+ si = getServer(ServerId.Type.TABLET_SERVER, null, hp.getHost(),
hp.getPort());
+ }
+ if (si == null) {
+ return List.of();
+ }
+ return getActiveCompactions(si);
+ }
+
+ @Override
+ public List<ActiveCompaction> getActiveCompactions(ServerId server)
+ throws AccumuloException, AccumuloSecurityException {
+
+ Objects.requireNonNull(server);
+ Preconditions.checkArgument(
+ server.getType() == ServerId.Type.COMPACTOR
+ || server.getType() == ServerId.Type.TABLET_SERVER,
+ "Server type %s is not %s or %s.", server.getType(),
ServerId.Type.COMPACTOR,
+ ServerId.Type.TABLET_SERVER);
+
+ final HostAndPort serverHostAndPort =
HostAndPort.fromParts(server.getHost(), server.getPort());
final List<ActiveCompaction> as = new ArrayList<>();
try {
- if (context.getTServerLockChecker().doesTabletServerLockExist(server)) {
+ if (server.getType() == ServerId.Type.TABLET_SERVER) {
Client client = null;
try {
client = getClient(ThriftClientTypes.TABLET_SERVER,
serverHostAndPort, context);
for (var tac : client.getActiveCompactions(TraceUtil.traceInfo(),
context.rpcCreds())) {
- as.add(new ActiveCompactionImpl(context, tac, serverHostAndPort,
- CompactionHost.Type.TSERVER));
+ as.add(new ActiveCompactionImpl(context, tac, server));
}
} finally {
if (client != null) {
@@ -303,8 +377,7 @@ public class InstanceOperationsImpl implements
InstanceOperations {
} else {
// if not a TabletServer address, maybe it's a Compactor
for (var tac :
ExternalCompactionUtil.getActiveCompaction(serverHostAndPort, context)) {
- as.add(new ActiveCompactionImpl(context, tac, serverHostAndPort,
- CompactionHost.Type.COMPACTOR));
+ as.add(new ActiveCompactionImpl(context, tac, server));
}
}
return as;
@@ -319,31 +392,20 @@ public class InstanceOperationsImpl implements
InstanceOperations {
public List<ActiveCompaction> getActiveCompactions()
throws AccumuloException, AccumuloSecurityException {
- Map<String,Set<HostAndPort>> compactors =
ExternalCompactionUtil.getCompactorAddrs(context);
- List<String> tservers = getTabletServers();
+ Set<ServerId> compactionServers = new HashSet<>();
+ compactionServers.addAll(getServers(ServerId.Type.COMPACTOR));
+ compactionServers.addAll(getServers(ServerId.Type.TABLET_SERVER));
- int numThreads = Math.max(4, Math.min((tservers.size() +
compactors.size()) / 10, 256));
+ int numThreads = Math.max(4, Math.min((compactionServers.size()) / 10,
256));
var executorService =
context.threadPools().getPoolBuilder(INSTANCE_OPS_COMPACTIONS_FINDER_POOL)
.numCoreThreads(numThreads).build();
try {
List<Future<List<ActiveCompaction>>> futures = new ArrayList<>();
- for (String tserver : tservers) {
- futures.add(executorService.submit(() ->
getActiveCompactions(tserver)));
+ for (ServerId server : compactionServers) {
+ futures.add(executorService.submit(() ->
getActiveCompactions(server)));
}
- compactors.values().forEach(compactorList -> {
- for (HostAndPort compactorAddr : compactorList) {
- Callable<List<ActiveCompaction>> task =
- () -> ExternalCompactionUtil.getActiveCompaction(compactorAddr,
context).stream()
- .map(tac -> new ActiveCompactionImpl(context, tac,
compactorAddr,
- CompactionHost.Type.COMPACTOR))
- .collect(toList());
-
- futures.add(executorService.submit(task));
- }
- });
-
List<ActiveCompaction> ret = new ArrayList<>();
for (Future<List<ActiveCompaction>> future : futures) {
try {
@@ -407,4 +469,104 @@ public class InstanceOperationsImpl implements
InstanceOperations {
return context.getInstanceID();
}
+ @Override
+ public ServerId getServer(ServerId.Type type, String resourceGroup, String
host, int port) {
+ Objects.requireNonNull(type, "type parameter cannot be null");
+ Objects.requireNonNull(host, "host parameter cannot be null");
+
+ final ResourceGroupPredicate rg =
+ resourceGroup == null ? rgt -> true : rgt -> rgt.equals(resourceGroup);
+ final AddressPredicate hp =
AddressPredicate.exact(HostAndPort.fromParts(host, port));
+
+ switch (type) {
+ case COMPACTOR:
+ Set<ServiceLockPath> compactors =
context.getServerPaths().getCompactor(rg, hp, true);
+ if (compactors.isEmpty()) {
+ return null;
+ } else if (compactors.size() == 1) {
+ return createServerId(type, compactors.iterator().next());
+ } else {
+ throw new IllegalStateException("Multiple servers matching provided
address");
+ }
+ case MANAGER:
+ Set<ServerId> managers = getServers(type, null);
+ if (managers.isEmpty()) {
+ return null;
+ } else {
+ return managers.iterator().next();
+ }
+ case SCAN_SERVER:
+ Set<ServiceLockPath> sservers =
context.getServerPaths().getScanServer(rg, hp, true);
+ if (sservers.isEmpty()) {
+ return null;
+ } else if (sservers.size() == 1) {
+ return createServerId(type, sservers.iterator().next());
+ } else {
+ throw new IllegalStateException("Multiple servers matching provided
address");
+ }
+ case TABLET_SERVER:
+ Set<ServiceLockPath> tservers =
context.getServerPaths().getScanServer(rg, hp, true);
+ if (tservers.isEmpty()) {
+ return null;
+ } else if (tservers.size() == 1) {
+ return createServerId(type, tservers.iterator().next());
+ } else {
+ throw new IllegalStateException("Multiple servers matching provided
address");
+ }
+ default:
+ throw new IllegalArgumentException("Unhandled server type: " + type);
+ }
+ }
+
+ @Override
+ public Set<ServerId> getServers(ServerId.Type type) {
+ return getServers(type, null);
+ }
+
+ @Override
+ public Set<ServerId> getServers(ServerId.Type type, Predicate<ServerId>
test) {
+ final Set<ServerId> results = new HashSet<>();
+ switch (type) {
+ case COMPACTOR:
+ context.getServerPaths().getCompactor(rg -> true, addr -> true, true)
+ .forEach(c -> results.add(createServerId(type, c)));
+ break;
+ case MANAGER:
+ ServiceLockPath m = context.getServerPaths().getManager(true);
+ Optional<ServiceLockData> sld = context.getZooCache().getLockData(m);
+ String location = null;
+ if (sld.isPresent()) {
+ location = sld.orElseThrow().getAddressString(ThriftService.MANAGER);
+ HostAndPort hp = HostAndPort.fromString(location);
+ results.add(new ServerId(type,
Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(),
+ hp.getPort()));
+ }
+ break;
+ case SCAN_SERVER:
+ context.getServerPaths().getScanServer(rg -> true, addr -> true, true)
+ .forEach(s -> results.add(createServerId(type, s)));
+ break;
+ case TABLET_SERVER:
+ context.getServerPaths().getTabletServer(rg -> true, addr -> true,
true)
+ .forEach(t -> results.add(createServerId(type, t)));
+ break;
+ default:
+ break;
+ }
+ if (test == null) {
+ return Collections.unmodifiableSet(results);
+ }
+ return
results.stream().filter(test).collect(Collectors.toUnmodifiableSet());
+ }
+
+ private ServerId createServerId(ServerId.Type type, ServiceLockPath slp) {
+ Objects.requireNonNull(type);
+ Objects.requireNonNull(slp);
+ String resourceGroup = Objects.requireNonNull(slp.getResourceGroup());
+ HostAndPort hp =
HostAndPort.fromString(Objects.requireNonNull(slp.getServer()));
+ String host = hp.getHost();
+ int port = hp.getPort();
+ return new ServerId(type, resourceGroup, host, port);
+ }
+
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
index b60712e92b..13752a9ae1 100644
--- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
+++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
@@ -288,6 +288,11 @@ public class ServiceLockPaths {
return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address,
withLock);
}
+ /**
+ * Note that the ServiceLockPath object returned by this method does not
populate the server
+ * attribute. To get the location of the GarbageCollector you will need to
parse the lock data at
+ * the ZooKeeper path.
+ */
public ServiceLockPath getGarbageCollector(boolean withLock) {
Set<ServiceLockPath> results = get(Constants.ZGC_LOCK, rg -> true, addr ->
true, withLock);
if (results.isEmpty()) {
@@ -297,6 +302,11 @@ public class ServiceLockPaths {
}
}
+ /**
+ * Note that the ServiceLockPath object returned by this method does not
populate the server
+ * attribute. The location of the Manager is not in the ZooKeeper path.
Instead, use
+ * InstanceOperations.getServers(ServerId.Type.MANAGER) to get the location.
+ */
public ServiceLockPath getManager(boolean withLock) {
Set<ServiceLockPath> results = get(Constants.ZMANAGER_LOCK, rg -> true,
addr -> true, withLock);
if (results.isEmpty()) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java
index d0076e69f1..29057cf3cb 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java
@@ -21,8 +21,9 @@ package org.apache.accumulo.core.rpc.clients;
import static com.google.common.base.Preconditions.checkArgument;
import java.net.UnknownHostException;
-import java.util.List;
+import java.util.Set;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.thrift.TServiceClient;
@@ -36,18 +37,14 @@ public interface ManagerClient<C extends TServiceClient> {
default C getManagerConnection(Logger log, ThriftClientTypes<C> type,
ClientContext context) {
checkArgument(context != null, "context is null");
- List<String> locations = context.getManagerLocations();
+ Set<ServerId> managers =
context.instanceOperations().getServers(ServerId.Type.MANAGER);
- if (locations.isEmpty()) {
+ if (managers == null || managers.isEmpty()) {
log.debug("No managers...");
return null;
}
- HostAndPort manager = HostAndPort.fromString(locations.get(0));
- if (manager.getPort() == 0) {
- return null;
- }
-
+ HostAndPort manager =
HostAndPort.fromString(managers.iterator().next().toHostPortString());
try {
// Manager requests can take a long time: don't ever time out
return ThriftUtil.getClientNoTimeout(type, manager, context);
@@ -60,7 +57,6 @@ public interface ManagerClient<C extends TServiceClient> {
log.debug("Failed to connect to manager=" + manager + ", will retry...
", tte);
return null;
}
-
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index d775d61a0a..d9258a1130 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -49,6 +49,7 @@ import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
@@ -185,7 +186,7 @@ public class Gatherer {
Map<String,Map<StoredTabletFile,List<TRowRange>>> locations = new
HashMap<>();
- List<String> tservers = null;
+ List<ServerId> tservers = null;
for (Entry<StoredTabletFile,List<TabletMetadata>> entry :
files.entrySet()) {
@@ -203,7 +204,8 @@ public class Gatherer {
if (location == null) {
if (tservers == null) {
- tservers = ctx.instanceOperations().getTabletServers();
+ tservers =
+ new
ArrayList<>(ctx.instanceOperations().getServers(ServerId.Type.TABLET_SERVER));
Collections.sort(tservers);
}
@@ -211,7 +213,7 @@ public class Gatherer {
// same file (as long as the set of tservers is stable).
int idx = Math.abs(Hashing.murmur3_32_fixed()
.hashString(entry.getKey().getNormalizedPathStr(), UTF_8).asInt())
% tservers.size();
- location = tservers.get(idx);
+ location = tservers.get(idx).toHostPortString();
}
// merge contiguous ranges
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index 7dafb38929..15c3a92019 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -62,10 +62,9 @@ public class ExternalCompactionUtil {
private final HostAndPort compactor;
private final Future<TExternalCompactionJob> future;
- public RunningCompactionFuture(String group, HostAndPort compactor,
- Future<TExternalCompactionJob> future) {
- this.group = group;
- this.compactor = compactor;
+ public RunningCompactionFuture(ServiceLockPath slp,
Future<TExternalCompactionJob> future) {
+ this.group = slp.getResourceGroup();
+ this.compactor = HostAndPort.fromString(slp.getServer());
this.future = future;
}
@@ -202,11 +201,10 @@ public class ExternalCompactionUtil {
final ExecutorService executor = ThreadPools.getServerThreadPools()
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTIONS_POOL).numCoreThreads(16).build();
- getCompactorAddrs(context).forEach((group, hp) -> {
- hp.forEach(hostAndPort -> {
- rcFutures.add(new RunningCompactionFuture(group, hostAndPort,
- executor.submit(() -> getRunningCompaction(hostAndPort,
context))));
- });
+ context.getServerPaths().getCompactor(rg -> true, addr -> true,
true).forEach(slp -> {
+ final HostAndPort hp = HostAndPort.fromString(slp.getServer());
+ rcFutures.add(new RunningCompactionFuture(slp,
+ executor.submit(() -> getRunningCompaction(hp, context))));
});
executor.shutdown();
@@ -231,10 +229,9 @@ public class ExternalCompactionUtil {
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTION_IDS_POOL).numCoreThreads(16).build();
List<Future<ExternalCompactionId>> futures = new ArrayList<>();
- getCompactorAddrs(context).forEach((q, hp) -> {
- hp.forEach(hostAndPort -> {
- futures.add(executor.submit(() -> getRunningCompactionId(hostAndPort,
context)));
- });
+ context.getServerPaths().getCompactor(rg -> true, addr -> true,
true).forEach(slp -> {
+ final HostAndPort hp = HostAndPort.fromString(slp.getServer());
+ futures.add(executor.submit(() -> getRunningCompactionId(hp, context)));
});
executor.shutdown();
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index 7290a53669..66b2601b13 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -61,6 +61,7 @@ import
org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.NamespaceNotFoundException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
@@ -550,7 +551,7 @@ public class Admin implements KeywordExecutable {
InstanceOperations io = context.instanceOperations();
if (args.isEmpty()) {
- args = io.getTabletServers();
+ io.getServers(ServerId.Type.TABLET_SERVER).forEach(t ->
args.add(t.toHostPortString()));
}
int unreachable = 0;
@@ -631,7 +632,7 @@ public class Admin implements KeywordExecutable {
private static void stopTabletServer(final ClientContext context,
List<String> servers,
final boolean force) throws AccumuloException, AccumuloSecurityException
{
- if (context.getManagerLocations().isEmpty()) {
+ if
(context.instanceOperations().getServers(ServerId.Type.MANAGER).isEmpty()) {
log.info("No managers running. Not attempting safe unload of tserver.");
return;
}
@@ -641,10 +642,10 @@ public class Admin implements KeywordExecutable {
}
final ZooCache zc = context.getZooCache();
- List<String> runningServers;
+ Set<ServerId> runningServers;
for (String server : servers) {
- runningServers = context.instanceOperations().getTabletServers();
+ runningServers =
context.instanceOperations().getServers(ServerId.Type.TABLET_SERVER);
if (runningServers.size() == 1 && !force) {
log.info("Only 1 tablet server running. Not attempting shutdown of
{}", server);
return;
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
b/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
index 416bbf39a2..b55ad71114 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
@@ -18,6 +18,13 @@
*/
package org.apache.accumulo.server.util;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -145,11 +152,15 @@ public class ECAdmin implements KeywordExecutable {
}
private void listCompactorsByQueue(ServerContext context) {
- var groupToCompactorsMap =
ExternalCompactionUtil.getCompactorAddrs(context);
- if (groupToCompactorsMap.isEmpty()) {
+ Set<ServerId> compactors =
context.instanceOperations().getServers(ServerId.Type.COMPACTOR);
+ if (compactors.isEmpty()) {
System.out.println("No Compactors found.");
} else {
- groupToCompactorsMap.forEach((q, compactors) -> System.out.println(q +
": " + compactors));
+ Map<String,List<ServerId>> m = new TreeMap<>();
+ compactors.forEach(csi -> {
+ m.putIfAbsent(csi.getResourceGroup(), new ArrayList<>()).add(csi);
+ });
+ m.forEach((q, c) -> System.out.println(q + ": " + c));
}
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/Info.java
b/server/base/src/main/java/org/apache/accumulo/server/util/Info.java
index b2645a7688..e1f3129106 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Info.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Info.java
@@ -18,6 +18,9 @@
*/
package org.apache.accumulo.server.util;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.util.MonitorUtil;
import org.apache.accumulo.server.ServerContext;
@@ -47,8 +50,14 @@ public class Info implements KeywordExecutable {
@Override
public void execute(final String[] args) throws KeeperException,
InterruptedException {
var context = new ServerContext(SiteConfiguration.auto());
+ Set<ServerId> managers =
context.instanceOperations().getServers(ServerId.Type.MANAGER);
+ String manager = null;
+ if (managers != null && !managers.isEmpty()) {
+ manager = managers.iterator().next().getHost();
+ }
+
System.out.println("monitor: " + MonitorUtil.getLocation(context));
- System.out.println("managers: " + context.getManagerLocations());
+ System.out.println("managers: " + manager);
System.out.println("zookeepers: " + context.getZooKeepers());
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 722e467120..3dae538065 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -60,6 +60,7 @@ import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
@@ -311,12 +312,12 @@ public class CompactionCoordinator
LOG.info("Shutting down");
}
- private Map<String,Set<HostAndPort>>
- getIdleCompactors(Map<String,Set<HostAndPort>> runningCompactors) {
+ private Map<String,Set<HostAndPort>> getIdleCompactors(Set<ServerId>
runningCompactors) {
final Map<String,Set<HostAndPort>> allCompactors = new HashMap<>();
- runningCompactors
- .forEach((group, compactorList) -> allCompactors.put(group, new
HashSet<>(compactorList)));
+ runningCompactors.forEach(
+ (csi) -> allCompactors.computeIfAbsent(csi.getResourceGroup(), (k) ->
new HashSet<>())
+ .add(HostAndPort.fromParts(csi.getHost(), csi.getPort())));
final Set<String> emptyQueues = new HashSet<>();
@@ -1009,8 +1010,8 @@ public class CompactionCoordinator
}
/* Method exists to be overridden in test to hide static method */
- protected Map<String,Set<HostAndPort>> getRunningCompactors() {
- return ExternalCompactionUtil.getCompactorAddrs(this.ctx);
+ protected Set<ServerId> getRunningCompactors() {
+ return ctx.instanceOperations().getServers(ServerId.Type.COMPACTOR);
}
/* Method exists to be overridden in test to hide static method */
@@ -1171,11 +1172,11 @@ public class CompactionCoordinator
Sets.difference(trackedGroups,
TIME_COMPACTOR_LAST_CHECKED.keySet()));
}
- final Map<String,Set<HostAndPort>> runningCompactors =
getRunningCompactors();
+ final Set<ServerId> runningCompactors = getRunningCompactors();
final Set<CompactorGroupId> runningCompactorGroups = new HashSet<>();
- runningCompactors.keySet()
- .forEach(group ->
runningCompactorGroups.add(CompactorGroupId.of(group)));
+ runningCompactors
+ .forEach(c ->
runningCompactorGroups.add(CompactorGroupId.of(c.getResourceGroup())));
final Set<CompactorGroupId> groupsWithNoCompactors =
Sets.difference(groupsInConfiguration, runningCompactorGroups);
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index ae2cd37223..9a4237e619 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
@@ -190,8 +191,8 @@ public class CompactionCoordinatorTest {
}
@Override
- protected Map<String,Set<HostAndPort>> getRunningCompactors() {
- return Map.of();
+ protected Set<ServerId> getRunningCompactors() {
+ return Set.of();
}
@Override
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 1f1cbbc4ce..70a8ba2229 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -47,7 +47,9 @@ import jakarta.inject.Singleton;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
+import org.apache.accumulo.core.compaction.thrift.CompactorService;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.conf.Property;
@@ -668,8 +670,8 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
}
if (System.nanoTime() - ecInfoFetchedNanos > fetchTimeNanos) {
log.info("User initiated fetch of External Compaction info");
- Map<String,Set<HostAndPort>> compactors =
- ExternalCompactionUtil.getCompactorAddrs(getContext());
+ Set<ServerId> compactors =
+
getContext().instanceOperations().getServers(ServerId.Type.COMPACTOR);
log.debug("Found compactors: " + compactors);
ecInfo.setFetchedTimeMillis(System.currentTimeMillis());
ecInfo.setCompactors(compactors);
@@ -734,11 +736,15 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
}
private void fetchScans() {
- ServerContext context = getContext();
- for (String server : context.instanceOperations().getTabletServers()) {
- final HostAndPort parsedServer = HostAndPort.fromString(server);
+ final ServerContext context = getContext();
+ final Set<ServerId> servers = new HashSet<>();
+
servers.addAll(context.instanceOperations().getServers(ServerId.Type.SCAN_SERVER));
+
servers.addAll(context.instanceOperations().getServers(ServerId.Type.TABLET_SERVER));
+
+ for (ServerId server : servers) {
TabletScanClientService.Client tserver = null;
try {
+ HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(),
server.getPort());
tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN,
parsedServer, context);
List<ActiveScan> scans = tserver.getActiveScans(null,
context.rpcCreds());
tserverScans.put(parsedServer, new ScanStats(scans));
@@ -759,38 +765,13 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
tserverIter.remove();
}
}
- // Scan Servers
- for (String server : context.instanceOperations().getScanServers()) {
- final HostAndPort parsedServer = HostAndPort.fromString(server);
- TabletScanClientService.Client sserver = null;
- try {
- sserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN,
parsedServer, context);
- List<ActiveScan> scans = sserver.getActiveScans(null,
context.rpcCreds());
- sserverScans.put(parsedServer, new ScanStats(scans));
- scansFetchedNanos = System.nanoTime();
- } catch (Exception ex) {
- log.error("Failed to get active scans from {}", server, ex);
- } finally {
- ThriftUtil.returnClient(sserver, context);
- }
- }
- // Age off old scan information
- Iterator<Entry<HostAndPort,ScanStats>> sserverIter =
sserverScans.entrySet().iterator();
- // clock time used for fetched for date friendly display
- now = System.currentTimeMillis();
- while (sserverIter.hasNext()) {
- Entry<HostAndPort,ScanStats> entry = sserverIter.next();
- if (now - entry.getValue().fetched > ageOffEntriesMillis) {
- sserverIter.remove();
- }
- }
}
private void fetchCompactions() {
- ServerContext context = getContext();
+ final ServerContext context = getContext();
- for (String server : context.instanceOperations().getTabletServers()) {
- final HostAndPort parsedServer = HostAndPort.fromString(server);
+ for (ServerId server :
context.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)) {
+ final HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(),
server.getPort());
Client tserver = null;
try {
tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER,
parsedServer, context);
@@ -803,6 +784,21 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
ThriftUtil.returnClient(tserver, context);
}
}
+ for (ServerId server :
context.instanceOperations().getServers(ServerId.Type.COMPACTOR)) {
+ final HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(),
server.getPort());
+ CompactorService.Client compactor = null;
+ try {
+ compactor = ThriftUtil.getClient(ThriftClientTypes.COMPACTOR,
parsedServer, context);
+ var compacts = compactor.getActiveCompactions(null,
context.rpcCreds());
+ allCompactions.put(parsedServer, new CompactionStats(compacts));
+ compactsFetchedNanos = System.nanoTime();
+ } catch (Exception ex) {
+ log.debug("Failed to get active compactions from {}", server, ex);
+ } finally {
+ ThriftUtil.returnClient(compactor, context);
+ }
+ }
+
// Age off old compaction information
var entryIter = allCompactions.entrySet().iterator();
// clock time used for fetched for date friendly display
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java
index 87a899e6ff..e364d1eb5b 100644
---
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java
@@ -31,9 +31,10 @@ public class Compactors {
public final List<CompactorInfo> compactors = new ArrayList<>();
public Compactors(ExternalCompactionInfo ecInfo) {
- ecInfo.getCompactors().forEach((q, c) -> {
+ ecInfo.getCompactors().forEach(csi -> {
var fetchedTime = ecInfo.getFetchedTimeMillis();
- c.forEach(hp -> compactors.add(new CompactorInfo(fetchedTime, q,
hp.toString())));
+ compactors
+ .add(new CompactorInfo(fetchedTime, csi.getResourceGroup(),
csi.toHostPortString()));
});
numCompactors = compactors.size();
}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
index eccda4569e..059347e3f1 100644
---
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
@@ -21,6 +21,8 @@ package org.apache.accumulo.monitor.rest.compactions.external;
import java.util.Optional;
import java.util.Set;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
+
import com.google.common.net.HostAndPort;
public class CoordinatorInfo {
@@ -28,14 +30,14 @@ public class CoordinatorInfo {
// Variable names become JSON keys
public long lastContact;
public String server;
- public int numQueues;
+ public long numQueues;
public int numCompactors;
public CoordinatorInfo(Optional<HostAndPort> serverOpt,
ExternalCompactionInfo ecInfo) {
server = serverOpt.map(HostAndPort::toString).orElse("none");
- var groupToCompactors = ecInfo.getCompactors();
- numQueues = groupToCompactors.size();
- numCompactors =
groupToCompactors.values().stream().mapToInt(Set::size).sum();
+ Set<ServerId> compactors = ecInfo.getCompactors();
+ numQueues = compactors.stream().map(csi ->
csi.getResourceGroup()).distinct().count();
+ numCompactors = compactors.size();
lastContact = System.currentTimeMillis() - ecInfo.getFetchedTimeMillis();
}
}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java
index 251eb16a32..6d0e9f798a 100644
---
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java
@@ -18,11 +18,12 @@
*/
package org.apache.accumulo.monitor.rest.compactions.external;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
+
import com.google.common.net.HostAndPort;
/**
@@ -31,7 +32,7 @@ import com.google.common.net.HostAndPort;
public class ExternalCompactionInfo {
private Optional<HostAndPort> coordinatorHost;
- private Map<String,Set<HostAndPort>> compactors = new HashMap<>();
+ private Set<ServerId> compactors = new HashSet<>();
private long fetchedTimeMillis;
public void setCoordinatorHost(Optional<HostAndPort> coordinatorHost) {
@@ -42,11 +43,11 @@ public class ExternalCompactionInfo {
return coordinatorHost;
}
- public Map<String,Set<HostAndPort>> getCompactors() {
+ public Set<ServerId> getCompactors() {
return compactors;
}
- public void setCompactors(Map<String,Set<HostAndPort>> compactors) {
+ public void setCompactors(Set<ServerId> compactors) {
this.compactors = compactors;
}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/manager/ManagerResource.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/manager/ManagerResource.java
index c2c20cc3e3..6abfc6e2fb 100644
---
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/manager/ManagerResource.java
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/manager/ManagerResource.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
@@ -30,11 +31,11 @@ import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.manager.thrift.DeadServer;
import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
-import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.monitor.Monitor;
import org.apache.accumulo.monitor.rest.logs.DeadLoggerInformation;
import org.apache.accumulo.monitor.rest.logs.DeadLoggerList;
@@ -99,10 +100,14 @@ public class ManagerResource {
for (DeadServer down : mmi.deadTabletServers) {
tservers.add(down.server);
}
- List<String> managers = monitor.getContext().getManagerLocations();
- String manager =
- managers.isEmpty() ? "Down" :
AddressUtil.parseAddress(managers.get(0)).getHost();
+ Set<ServerId> managers =
+
monitor.getContext().instanceOperations().getServers(ServerId.Type.MANAGER);
+ String manager = "Down";
+ if (managers != null && !managers.isEmpty()) {
+ manager = managers.iterator().next().getHost();
+ }
+
int onlineTabletServers = mmi.tServerInfo.size();
int totalTabletServers = tservers.size();
int tablets = monitor.getTotalTabletCount();
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/status/StatusResource.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/status/StatusResource.java
index 775f612a61..1b0140af47 100644
---
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/status/StatusResource.java
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/status/StatusResource.java
@@ -18,14 +18,13 @@
*/
package org.apache.accumulo.monitor.rest.status;
-import java.util.List;
-
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
+import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
import org.apache.accumulo.monitor.Monitor;
@@ -67,8 +66,8 @@ public class StatusResource {
gcStatus = Status.ERROR;
}
- List<String> managers = monitor.getContext().getManagerLocations();
- managerStatus = managers.isEmpty() ? Status.ERROR : Status.OK;
+ ServiceLockPath slp =
monitor.getContext().getServerPaths().getManager(true);
+ managerStatus = slp == null ? Status.ERROR : Status.OK;
int tServerUp = mmi.getTServerInfoSize();
int tServerDown = mmi.getDeadTabletServersSize();
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index cb12d06359..5cb59315fe 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -65,6 +65,7 @@ import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.classloader.ClassLoaderUtil;
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.client.Durability;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientTabletCache;
import org.apache.accumulo.core.clientImpl.DurabilityImpl;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -414,11 +415,11 @@ public class TabletServer extends AbstractServer
implements TabletHostingServer
private HostAndPort getManagerAddress() {
try {
- List<String> locations = getContext().getManagerLocations();
- if (locations.isEmpty()) {
+ Set<ServerId> managers =
getContext().instanceOperations().getServers(ServerId.Type.MANAGER);
+ if (managers == null || managers.isEmpty()) {
return null;
}
- return HostAndPort.fromString(locations.get(0));
+ return
HostAndPort.fromString(managers.iterator().next().toHostPortString());
} catch (Exception e) {
log.warn("Failed to obtain manager host " + e);
}
diff --git
a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java
b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java
index 62dd93f269..044973dd2d 100644
---
a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java
+++
b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java
@@ -31,11 +31,16 @@ import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.ActiveCompaction;
import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.util.DurationFormat;
-import org.apache.accumulo.shell.Shell;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.net.HostAndPort;
class ActiveCompactionHelper {
+ private static final Logger LOG =
LoggerFactory.getLogger(ActiveCompactionHelper.class);
private static final Comparator<ActiveCompaction> COMPACTION_AGE_DESCENDING =
Comparator.comparingLong(ActiveCompaction::getAge).reversed();
@@ -94,10 +99,10 @@ class ActiveCompactionHelper {
try {
var dur = new DurationFormat(ac.getAge(), "");
return String.format(
- "%21s | %9s | %5s | %6s | %5s | %5s | %15s | %-40s | %5s | %35s |
%9s | %s", host, dur,
- ac.getType(), ac.getReason(), shortenCount(ac.getEntriesRead()),
- shortenCount(ac.getEntriesWritten()), ac.getTable(), ac.getTablet(),
- ac.getInputFiles().size(), output, iterList, iterOpts);
+ "%21s | %21s | %9s | %5s | %6s | %5s | %5s | %15s | %-40s | %5s |
%35s | %9s | %s",
+ ac.getHost().getResourceGroup(), host, dur, ac.getType(),
ac.getReason(),
+ shortenCount(ac.getEntriesRead()),
shortenCount(ac.getEntriesWritten()), ac.getTable(),
+ ac.getTablet(), ac.getInputFiles().size(), output, iterList,
iterOpts);
} catch (TableNotFoundException e) {
return "ERROR " + e.getMessage();
}
@@ -105,20 +110,30 @@ class ActiveCompactionHelper {
public static Stream<String> appendHeader(Stream<String> stream) {
Stream<String> header = Stream.of(String.format(
- " %-21s| %-9s | %-5s | %-6s | %-5s | %-5s | %-15s | %-40s | %-5s |
%-35s | %-9s | %s",
- "SERVER", "AGE", "TYPE", "REASON", "READ", "WROTE", "TABLE", "TABLET",
"INPUT", "OUTPUT",
- "ITERATORS", "ITERATOR OPTIONS"));
+ " %-21s| %-21s| %-9s | %-5s | %-6s | %-5s | %-5s | %-15s | %-40s |
%-5s | %-35s | %-9s | %s",
+ "GROUP", "SERVER", "AGE", "TYPE", "REASON", "READ", "WROTE", "TABLE",
"TABLET", "INPUT",
+ "OUTPUT", "ITERATORS", "ITERATOR OPTIONS"));
return Stream.concat(header, stream);
}
public static Stream<String> activeCompactionsForServer(String tserver,
InstanceOperations instanceOps) {
- try {
- return
instanceOps.getActiveCompactions(tserver).stream().sorted(COMPACTION_AGE_DESCENDING)
- .map(ActiveCompactionHelper::formatActiveCompactionLine);
- } catch (Exception e) {
- Shell.log.debug("Failed to list active compactions for server {}",
tserver, e);
- return Stream.of(tserver + " ERROR " + e.getMessage());
+ final HostAndPort hp = HostAndPort.fromString(tserver);
+ ServerId server =
+ instanceOps.getServer(ServerId.Type.COMPACTOR, null, hp.getHost(),
hp.getPort());
+ if (server == null) {
+ server = instanceOps.getServer(ServerId.Type.TABLET_SERVER, null,
hp.getHost(), hp.getPort());
+ }
+ if (server == null) {
+ return Stream.of();
+ } else {
+ try {
+ return
instanceOps.getActiveCompactions(server).stream().sorted(COMPACTION_AGE_DESCENDING)
+ .map(ActiveCompactionHelper::formatActiveCompactionLine);
+ } catch (Exception e) {
+ LOG.debug("Failed to list active compactions for server {}", tserver,
e);
+ return Stream.of(tserver + " ERROR " + e.getMessage());
+ }
}
}
diff --git
a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveScanIterator.java
b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveScanIterator.java
index 77288369ce..6eb6a076d8 100644
---
a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveScanIterator.java
+++
b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveScanIterator.java
@@ -22,16 +22,18 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.accumulo.core.client.admin.ActiveScan;
import org.apache.accumulo.core.client.admin.InstanceOperations;
import org.apache.accumulo.core.client.admin.ScanType;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.util.DurationFormat;
class ActiveScanIterator implements Iterator<String> {
private InstanceOperations instanceOps;
- private Iterator<String> tsIter;
+ private Iterator<ServerId> tsIter;
private Iterator<String> scansIter;
private void readNext() {
@@ -39,22 +41,22 @@ class ActiveScanIterator implements Iterator<String> {
while (tsIter.hasNext()) {
- final String tserver = tsIter.next();
+ final ServerId server = tsIter.next();
try {
- final List<ActiveScan> asl = instanceOps.getActiveScans(tserver);
+ final List<ActiveScan> asl = instanceOps.getActiveScans(server);
for (ActiveScan as : asl) {
var dur = new DurationFormat(as.getAge(), "");
var dur2 = new DurationFormat(as.getLastContactTime(), "");
scans.add(String.format(
- "%21s |%21s |%9s |%9s |%7s |%6s |%8s |%8s |%10s |%20s |%10s
|%20s |%10s | %s",
- tserver, as.getClient(), dur, dur2, as.getState(), as.getType(),
as.getUser(),
- as.getTable(), as.getColumns(), as.getAuthorizations(),
- (as.getType() == ScanType.SINGLE ? as.getTablet() : "N/A"),
as.getScanid(),
- as.getSsiList(), as.getSsio()));
+ "%21s |%21s |%21s |%9s |%9s |%7s |%6s |%8s |%8s |%10s |%20s
|%10s |%20s |%10s | %s",
+ server.getResourceGroup(), server.toHostPortString(),
as.getClient(), dur, dur2,
+ as.getState(), as.getType(), as.getUser(), as.getTable(),
as.getColumns(),
+ as.getAuthorizations(), (as.getType() == ScanType.SINGLE ?
as.getTablet() : "N/A"),
+ as.getScanid(), as.getSsiList(), as.getSsio()));
}
} catch (Exception e) {
- scans.add(tserver + " ERROR " + e.getMessage());
+ scans.add(server + " ERROR " + e.getMessage());
}
if (!scans.isEmpty()) {
@@ -65,14 +67,14 @@ class ActiveScanIterator implements Iterator<String> {
scansIter = scans.iterator();
}
- ActiveScanIterator(List<String> tservers, InstanceOperations instanceOps) {
+ ActiveScanIterator(Set<ServerId> tservers, InstanceOperations instanceOps) {
this.instanceOps = instanceOps;
this.tsIter = tservers.iterator();
final String header = String.format(
- " %-21s| %-21s| %-9s| %-9s| %-7s| %-6s|"
+ " %-21s| %-21s| %-21s| %-9s| %-9s| %-7s| %-6s|"
+ " %-8s| %-8s| %-10s| %-20s| %-10s| %-10s | %-20s | %s",
- "TABLET SERVER", "CLIENT", "AGE", "LAST", "STATE", "TYPE", "USER",
"TABLE", "COLUMNS",
+ "GROUP", "SERVER", "CLIENT", "AGE", "LAST", "STATE", "TYPE", "USER",
"TABLE", "COLUMNS",
"AUTHORIZATIONS", "TABLET", "SCAN ID", "ITERATORS", "ITERATOR
OPTIONS");
scansIter = Collections.singletonList(header).iterator();
diff --git
a/shell/src/main/java/org/apache/accumulo/shell/commands/ListScansCommand.java
b/shell/src/main/java/org/apache/accumulo/shell/commands/ListScansCommand.java
index acca3aa838..54ec60055b 100644
---
a/shell/src/main/java/org/apache/accumulo/shell/commands/ListScansCommand.java
+++
b/shell/src/main/java/org/apache/accumulo/shell/commands/ListScansCommand.java
@@ -18,16 +18,19 @@
*/
package org.apache.accumulo.shell.commands;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.shell.Shell;
import org.apache.accumulo.shell.Shell.Command;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
+import com.google.common.net.HostAndPort;
+
public class ListScansCommand extends Command {
private Option tserverOption, disablePaginationOpt;
@@ -42,21 +45,23 @@ public class ListScansCommand extends Command {
public int execute(final String fullCommand, final CommandLine cl, final
Shell shellState)
throws Exception {
- List<String> tservers;
-
final InstanceOperations instanceOps =
shellState.getAccumuloClient().instanceOperations();
-
final boolean paginate = !cl.hasOption(disablePaginationOpt.getOpt());
+ final Set<ServerId> servers = new HashSet<>();
if (cl.hasOption(tserverOption.getOpt())) {
- tservers = new ArrayList<>();
- tservers.add(cl.getOptionValue(tserverOption.getOpt()));
+ String serverAddress = cl.getOptionValue(tserverOption.getOpt());
+ final HostAndPort hp = HostAndPort.fromString(serverAddress);
+ servers
+ .add(instanceOps.getServer(ServerId.Type.SCAN_SERVER, null,
hp.getHost(), hp.getPort()));
+ servers.add(
+ instanceOps.getServer(ServerId.Type.TABLET_SERVER, null,
hp.getHost(), hp.getPort()));
} else {
- tservers = instanceOps.getTabletServers();
- tservers.addAll(instanceOps.getScanServers());
+ servers.addAll(instanceOps.getServers(ServerId.Type.SCAN_SERVER));
+ servers.addAll(instanceOps.getServers(ServerId.Type.TABLET_SERVER));
}
- shellState.printLines(new ActiveScanIterator(tservers, instanceOps),
paginate);
+ shellState.printLines(new ActiveScanIterator(servers, instanceOps),
paginate);
return 0;
}
diff --git
a/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java
b/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java
index f9fdaa7b7b..006a7e3b12 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.shell.Shell;
import org.apache.accumulo.shell.Shell.Command;
import org.apache.commons.cli.CommandLine;
@@ -41,17 +42,17 @@ public class PingCommand extends Command {
public int execute(final String fullCommand, final CommandLine cl, final
Shell shellState)
throws Exception {
- List<String> tservers;
+ final List<String> tservers = new ArrayList<>();
final InstanceOperations instanceOps =
shellState.getAccumuloClient().instanceOperations();
final boolean paginate = !cl.hasOption(disablePaginationOpt.getOpt());
if (cl.hasOption(tserverOption.getOpt())) {
- tservers = new ArrayList<>();
tservers.add(cl.getOptionValue(tserverOption.getOpt()));
} else {
- tservers = instanceOps.getTabletServers();
+ instanceOps.getServers(ServerId.Type.TABLET_SERVER)
+ .forEach(s -> tservers.add(s.toHostPortString()));
}
shellState.printLines(new PingIterator(tservers, instanceOps), paginate);
diff --git
a/test/src/main/java/org/apache/accumulo/test/ComprehensiveBaseIT.java
b/test/src/main/java/org/apache/accumulo/test/ComprehensiveBaseIT.java
index 1dc1c2c0f9..3513e1e90e 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveBaseIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveBaseIT.java
@@ -69,6 +69,7 @@ import
org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.sample.Sampler;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
@@ -475,7 +476,8 @@ public abstract class ComprehensiveBaseIT extends
SharedMiniClusterBase {
public void invalidInstanceName() {
try (var client = Accumulo.newClient().to("fake_instance_name",
getCluster().getZooKeepers())
.as(getAdminPrincipal(), getToken()).build()) {
- assertThrows(RuntimeException.class, () ->
client.instanceOperations().getTabletServers());
+ assertThrows(RuntimeException.class,
+ () ->
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER));
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java
b/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java
new file mode 100644
index 0000000000..5372bad803
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+public class InstanceOperationsIT extends AccumuloClusterHarness {
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s");
+ cfg.getClusterServerConfiguration().setNumDefaultCompactors(3);
+ cfg.getClusterServerConfiguration().setNumDefaultScanServers(2);
+ cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testGetServers() throws AccumuloException,
AccumuloSecurityException {
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ InstanceOperations iops = client.instanceOperations();
+
+ assertEquals(3, iops.getServers(ServerId.Type.COMPACTOR).size());
+ assertEquals(3, iops.getCompactors().size());
+ validateAddresses(iops.getCompactors(),
iops.getServers(ServerId.Type.COMPACTOR));
+
+ assertEquals(2, iops.getServers(ServerId.Type.SCAN_SERVER).size());
+ assertEquals(2, iops.getScanServers().size());
+ validateAddresses(iops.getScanServers(),
iops.getServers(ServerId.Type.SCAN_SERVER));
+
+ assertEquals(1, iops.getServers(ServerId.Type.TABLET_SERVER).size());
+ assertEquals(1, iops.getTabletServers().size());
+ validateAddresses(iops.getTabletServers(),
iops.getServers(ServerId.Type.TABLET_SERVER));
+
+ assertEquals(1, iops.getServers(ServerId.Type.MANAGER).size());
+ assertEquals(1, iops.getManagerLocations().size());
+ validateAddresses(iops.getManagerLocations(),
iops.getServers(ServerId.Type.MANAGER));
+
+ for (ServerId compactor : iops.getServers(ServerId.Type.COMPACTOR)) {
+ assertNotNull(iops.getActiveCompactions(compactor));
+ assertThrows(IllegalArgumentException.class, () ->
iops.getActiveScans(compactor));
+ }
+
+ for (ServerId tserver : iops.getServers(ServerId.Type.TABLET_SERVER)) {
+ assertNotNull(iops.getActiveCompactions(tserver));
+ assertNotNull(iops.getActiveScans(tserver));
+ }
+
+ for (ServerId sserver : iops.getServers(ServerId.Type.SCAN_SERVER)) {
+ assertThrows(IllegalArgumentException.class, () ->
iops.getActiveCompactions(sserver));
+ assertNotNull(iops.getActiveScans(sserver));
+ }
+
+ }
+ }
+
+ private void validateAddresses(Collection<String> e, Set<ServerId>
addresses) {
+ List<String> actual = new ArrayList<>(addresses.size());
+ addresses.forEach(a -> actual.add(a.toHostPortString()));
+ List<String> expected = new ArrayList<>(e);
+ Collections.sort(expected);
+ Collections.sort(actual);
+ assertEquals(actual, expected);
+ }
+
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java
b/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java
index f93014febf..eced2eb8b4 100644
--- a/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.ActiveScan;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
@@ -69,7 +70,8 @@ public class InterruptibleScannersIT extends
AccumuloClusterHarness {
Thread thread = new Thread(() -> {
try {
// ensure the scan is running: not perfect, the metadata tables
could be scanned, too.
- String tserver =
client.instanceOperations().getTabletServers().iterator().next();
+ ServerId tserver =
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)
+ .iterator().next();
do {
ArrayList<ActiveScan> scans =
new
ArrayList<>(client.instanceOperations().getActiveScans(tserver));
diff --git a/test/src/main/java/org/apache/accumulo/test/LocatorIT.java
b/test/src/main/java/org/apache/accumulo/test/LocatorIT.java
index 175e342f5e..ff89e81b92 100644
--- a/test/src/main/java/org/apache/accumulo/test/LocatorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/LocatorIT.java
@@ -42,6 +42,7 @@ import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.admin.Locations;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
@@ -115,7 +116,9 @@ public class LocatorIT extends AccumuloClusterHarness {
ArrayList<Range> ranges = new ArrayList<>();
- HashSet<String> tservers = new
HashSet<>(client.instanceOperations().getTabletServers());
+ HashSet<String> tservers = new HashSet<>();
+ client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)
+ .forEach((s) -> tservers.add(s.toHostPortString()));
// locate won't find any locations, tablets are not hosted
ranges.add(r1);
diff --git a/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
b/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
index 6da4cd9d77..2b245a6ce7 100644
--- a/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
@@ -18,7 +18,6 @@
*/
package org.apache.accumulo.test;
-import static
org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs;
import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@@ -143,10 +142,12 @@ public class RecoveryIT extends AccumuloClusterHarness {
// Stop any running Compactors and ScanServers
control.stopAllServers(ServerType.COMPACTOR);
- Wait.waitFor(() ->
getCompactorAddrs(getCluster().getServerContext()).size() == 0, 60_000);
+ Wait.waitFor(() -> getServerContext().getServerPaths()
+ .getCompactor(rg -> true, addr -> true, true).isEmpty(), 60_000);
control.stopAllServers(ServerType.SCAN_SERVER);
- Wait.waitFor(() -> ((ClientContext) c).getScanServers().size() == 0,
60_000);
+ Wait.waitFor(() -> getServerContext().getServerPaths()
+ .getScanServer(rg -> true, addr -> true, true).size() == 0, 60_000);
// Kill the TabletServer in resource group that is hosting the table
List<Process> procs = control.getTabletServers(RESOURCE_GROUP);
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
index 47365f22ef..91e6868665 100644
---
a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
@@ -26,7 +26,6 @@ import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
-import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
@@ -148,9 +147,10 @@ public class ScanServerGroupConfigurationIT extends
SharedMiniClusterBase {
getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
"localhost");
Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
.getScanServer(rg -> true, addr -> true, true).size() == 1,
30_000);
- Wait.waitFor(() -> ((ClientContext)
client).getScanServers().values().stream().anyMatch(
- (p) ->
p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME))
- == true);
+ Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
+ .getScanServer(rg ->
rg.equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME),
+ addr -> true, true)
+ .size() > 0);
assertEquals(ingestedEntryCount, Iterables.size(scanner),
"The scan server scanner should have seen all ingested and flushed
entries");
@@ -166,11 +166,12 @@ public class ScanServerGroupConfigurationIT extends
SharedMiniClusterBase {
getCluster().getClusterControl().start(ServerType.SCAN_SERVER);
Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
.getScanServer(rg -> true, addr -> true, true).size() == 2,
30_000);
- Wait.waitFor(() -> ((ClientContext)
client).getScanServers().values().stream().anyMatch(
- (p) ->
p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME))
- == true);
- Wait.waitFor(() -> ((ClientContext)
client).getScanServers().values().stream()
- .anyMatch((p) -> p.getSecond().equals("GROUP1")) == true);
+ Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
+ .getScanServer(rg ->
rg.equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME),
+ addr -> true, true)
+ .size() == 1);
+ Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
+ .getScanServer(rg -> rg.equals("GROUP1"), addr -> true,
true).size() == 1);
scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
assertEquals(ingestedEntryCount + additionalIngest1,
Iterables.size(scanner),
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java
index 8e2fb7b796..2feb238fd2 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java
@@ -40,6 +40,7 @@ import
org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.util.Timer;
@@ -67,7 +68,8 @@ public class ScanServerMaxLatencyIT extends
ConfigurableMacBase {
ExecutorService executor = Executors.newCachedThreadPool();
try (var client =
Accumulo.newClient().from(getClientProperties()).build()) {
- Wait.waitFor(() ->
!client.instanceOperations().getScanServers().isEmpty());
+ Wait.waitFor(
+ () ->
!client.instanceOperations().getServers(ServerId.Type.SCAN_SERVER).isEmpty());
var ntc = new NewTableConfiguration();
ntc.setProperties(Map.of(Property.TABLE_MINC_COMPACT_MAXAGE.getKey(),
"2s"));
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
index 99694f633a..52a831a502 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
@@ -30,7 +30,7 @@ import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
-import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
@@ -115,7 +115,8 @@ public class ScanServerShutdownIT extends
SharedMiniClusterBase {
}
// ScanServer should stop after the 3rd batch scan closes
- Wait.waitFor(() -> ((ClientContext) client).getScanServers().size() ==
0);
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.SCAN_SERVER).isEmpty());
// The ScanServer should clean up the references on normal shutdown
Wait.waitFor(() -> ctx.getAmple().scanServerRefs().list().count() == 0);
diff --git
a/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
b/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
index ae4009ee07..faa1584d06 100644
--- a/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
@@ -25,10 +25,12 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
@@ -53,10 +55,8 @@ public class TabletServerGivesUpIT extends
ConfigurableMacBase {
@Test
public void test() throws Exception {
try (AccumuloClient client =
Accumulo.newClient().from(getClientProperties()).build()) {
- while (client.instanceOperations().getTabletServers().isEmpty()) {
- // Wait until at least one tablet server is up
- Thread.sleep(100);
- }
+ Wait.waitFor(
+ () ->
!client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty());
final String tableName = getUniqueNames(1)[0];
client.tableOperations().create(tableName);
@@ -90,7 +90,7 @@ public class TabletServerGivesUpIT extends
ConfigurableMacBase {
});
backgroundWriter.start();
// wait for the tserver to give up on writing to the WAL
- while (client.instanceOperations().getTabletServers().size() == 1) {
+ while
(client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() ==
1) {
Thread.sleep(SECONDS.toMillis(1));
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java
b/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java
index 782a9f8bec..de7fccefc5 100644
--- a/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java
@@ -25,6 +25,7 @@ import java.time.Duration;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.Authorizations;
@@ -55,7 +56,7 @@ public class TabletServerHdfsRestartIT extends
ConfigurableMacBase {
public void test() throws Exception {
try (AccumuloClient client =
Accumulo.newClient().from(getClientProperties()).build()) {
// wait until a tablet server is up
- while (client.instanceOperations().getTabletServers().isEmpty()) {
+ while
(client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty())
{
Thread.sleep(50);
}
final String tableName = getUniqueNames(1)[0];
diff --git a/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
b/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
index 0dd6f24f82..85afbdf2eb 100644
--- a/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
@@ -127,9 +128,10 @@ public class TotalQueuedIT extends ConfigurableMacBase {
private long getSyncs(AccumuloClient c) throws Exception {
ServerContext context = getServerContext();
- for (String address : c.instanceOperations().getTabletServers()) {
- TabletServerClientService.Client client = ThriftUtil
- .getClient(ThriftClientTypes.TABLET_SERVER,
HostAndPort.fromString(address), context);
+ for (ServerId tserver :
c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)) {
+ TabletServerClientService.Client client =
+ ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER,
+ HostAndPort.fromParts(tserver.getHost(), tserver.getPort()),
context);
TabletServerStatus status = client.getTabletServerStatus(null,
context.rpcCreds());
return status.syncs;
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
index 5396db64da..b5e41e4139 100644
--- a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
@@ -27,6 +27,7 @@ import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ThriftTransportKey;
import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
@@ -35,6 +36,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.util.Wait;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.junit.jupiter.api.Test;
@@ -53,21 +55,18 @@ public class TransportCachingIT extends
AccumuloClusterHarness {
public void testCachedTransport() throws InterruptedException {
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
- List<String> tservers;
-
- while ((tservers =
client.instanceOperations().getTabletServers()).isEmpty()) {
- // sleep until a tablet server is up
- Thread.sleep(50);
- }
+ Wait.waitFor(
+ () ->
!client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty());
ClientContext context = (ClientContext) client;
long rpcTimeout =
ConfigurationTypeHelper.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT.getDefaultValue());
- List<ThriftTransportKey> servers = tservers.stream().map(serverStr -> {
- return new ThriftTransportKey(ThriftClientTypes.CLIENT,
HostAndPort.fromString(serverStr),
- rpcTimeout, context);
- }).collect(Collectors.toList());
+ List<ThriftTransportKey> servers =
+
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).stream().map(tsi
-> {
+ return new ThriftTransportKey(ThriftClientTypes.CLIENT,
+ HostAndPort.fromParts(tsi.getHost(), tsi.getPort()),
rpcTimeout, context);
+ }).collect(Collectors.toList());
// only want to use one server for all subsequent test
ThriftTransportKey ttk = servers.get(0);
diff --git a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
index 60482e020c..7fc9d5c797 100644
--- a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
@@ -48,6 +48,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.ScanType;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
@@ -201,7 +202,7 @@ public class ZombieScanIT extends ConfigurableMacBase {
// should eventually see the four zombie scans running against four
tablets
Wait.waitFor(() -> countDistinctTabletsScans(table, c) == 4);
- assertEquals(1, c.instanceOperations().getTabletServers().size());
+ assertEquals(1,
c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size());
// Start 3 new tablet servers, this should cause the table to balance
and the tablets with
// zombie scans to unload. The Zombie scans should not prevent the table
from unloading. The
@@ -210,7 +211,8 @@ public class ZombieScanIT extends ConfigurableMacBase {
getCluster().getClusterControl().start(ServerType.TABLET_SERVER,
Map.of(), 4);
// Wait for all tablets servers
- Wait.waitFor(() -> c.instanceOperations().getTabletServers().size() ==
4);
+ Wait.waitFor(
+ () ->
c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 4);
// The table should eventually balance across the 4 tablet servers
Wait.waitFor(() -> countLocations(table, c) == 4);
@@ -234,15 +236,15 @@ public class ZombieScanIT extends ConfigurableMacBase {
// The zombie scans should migrate with the tablets, taking up more scan
threads in the
// system.
- Set<String> tabletSeversWithZombieScans = new HashSet<>();
- for (String tserver : c.instanceOperations().getTabletServers()) {
+ Set<ServerId> tabletServersWithZombieScans = new HashSet<>();
+ for (ServerId tserver :
c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)) {
if (c.instanceOperations().getActiveScans(tserver).stream()
.flatMap(activeScan -> activeScan.getSsiList().stream())
.anyMatch(scanIters ->
scanIters.contains(ZombieIterator.class.getName()))) {
- tabletSeversWithZombieScans.add(tserver);
+ tabletServersWithZombieScans.add(tserver);
}
}
- assertEquals(4, tabletSeversWithZombieScans.size());
+ assertEquals(4, tabletServersWithZombieScans.size());
// This check may be outside the scope of this test but works nicely for
this check and is
// simple enough to include
@@ -274,7 +276,7 @@ public class ZombieScanIT extends ConfigurableMacBase {
if (serverType == SCAN_SERVER) {
// Scans will fall back to tablet servers when no scan servers are
present. So wait for scan
// servers to show up in zookeeper. Can remove this in 3.1.
- Wait.waitFor(() -> !c.instanceOperations().getScanServers().isEmpty());
+ Wait.waitFor(() ->
!c.instanceOperations().getServers(ServerId.Type.SCAN_SERVER).isEmpty());
}
c.tableOperations().create(table);
@@ -352,9 +354,9 @@ public class ZombieScanIT extends ConfigurableMacBase {
private static long countDistinctTabletsScans(String table, AccumuloClient
client)
throws Exception {
- var tservers = client.instanceOperations().getTabletServers();
+ Set<ServerId> tservers =
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER);
long count = 0;
- for (String tserver : tservers) {
+ for (ServerId tserver : tservers) {
count += client.instanceOperations().getActiveScans(tserver).stream()
.filter(activeScan -> activeScan.getTable().equals(table))
.map(activeScan -> activeScan.getTablet()).distinct().count();
@@ -418,7 +420,7 @@ public class ZombieScanIT extends ConfigurableMacBase {
throws AccumuloException, AccumuloSecurityException {
Set<Long> scanIds = new HashSet<>();
Set<ScanType> scanTypes = new HashSet<>();
- for (String tserver : c.instanceOperations().getTabletServers()) {
+ for (ServerId tserver :
c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)) {
c.instanceOperations().getActiveScans(tserver).forEach(activeScan -> {
scanIds.add(activeScan.getScanid());
scanTypes.add(activeScan.getType());
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
index b976aab38e..d569c4f511 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
@@ -23,7 +23,6 @@ import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUE
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED;
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED;
import static
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH;
-import static
org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -119,7 +118,9 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
@BeforeEach
public void setupMetricsTest() throws Exception {
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
- Wait.waitFor(() ->
getCompactorAddrs(getCluster().getServerContext()).isEmpty());
+ Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
+ .getCompactor(rg -> true, addr -> true, true).isEmpty(), 60_000);
+
try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
tableName = getUniqueNames(1)[0];
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
index 0db83c044a..7676350790 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
@@ -19,7 +19,6 @@
package org.apache.accumulo.test.fate;
import static
org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
-import static
org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
@@ -101,7 +100,8 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
// initiated on starting the manager, causing the test to fail. Stopping
the compactor fixes
// this issue.
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
- Wait.waitFor(() ->
getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000);
+ Wait.waitFor(() -> getServerContext().getServerPaths()
+ .getCompactor(rg -> true, addr -> true, true).isEmpty(), 60_000);
}
@Test
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
index f38a6d8bff..84dc9cd773 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.Credentials;
import org.apache.accumulo.core.conf.Property;
@@ -93,9 +94,12 @@ public class BalanceInPresenceOfOfflineTableIT extends
AccumuloClusterHarness {
accumuloClient = Accumulo.newClient().from(getClientProps()).build();
// Need at least two tservers -- wait for them to start before failing
- Wait.waitFor(() ->
accumuloClient.instanceOperations().getTabletServers().size() >= 2);
+ Wait.waitFor(
+ () ->
accumuloClient.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size()
+ >= 2);
- assumeTrue(accumuloClient.instanceOperations().getTabletServers().size()
>= 2,
+ assumeTrue(
+
accumuloClient.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size()
>= 2,
"Not enough tservers to run test");
// set up splits
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 3f9d151a8c..687c233345 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -71,6 +71,7 @@ import
org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.PluginConfig;
import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;
import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
@@ -1176,16 +1177,16 @@ public class CompactionIT extends CompactionBaseIT {
compactions.clear();
do {
- HostAndPort hp = HostAndPort.fromParts(host.getAddress(),
host.getPort());
-
client.instanceOperations().getActiveCompactions(hp.toString()).forEach((ac) ->
{
- try {
- if (ac.getTable().equals(table1)) {
- compactions.add(ac);
- }
- } catch (TableNotFoundException e1) {
- fail("Table was deleted during test, should not happen");
- }
- });
+ client.instanceOperations().getActiveCompactions(new
ServerId(ServerId.Type.COMPACTOR,
+ host.getResourceGroup(), host.getAddress(),
host.getPort())).forEach((ac) -> {
+ try {
+ if (ac.getTable().equals(table1)) {
+ compactions.add(ac);
+ }
+ } catch (TableNotFoundException e1) {
+ fail("Table was deleted during test, should not happen");
+ }
+ });
Thread.sleep(1000);
} while (compactions.isEmpty());
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java
index 5bf6004fe1..84699dde5e 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java
@@ -23,10 +23,12 @@ import static
org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.UncheckedIOException;
-import java.util.List;
+import java.util.Iterator;
+import java.util.Set;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.rpc.clients.TServerClient;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
@@ -41,12 +43,12 @@ public class DebugClientConnectionIT extends
AccumuloClusterHarness {
cfg.getClusterServerConfiguration().setNumDefaultTabletServers(2);
}
- private List<String> tservers = null;
+ private Set<ServerId> tservers = null;
@BeforeEach
public void getTServerAddresses() {
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
- tservers = client.instanceOperations().getTabletServers();
+ tservers =
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER);
}
assertNotNull(tservers);
assertEquals(2, tservers.size());
@@ -54,11 +56,12 @@ public class DebugClientConnectionIT extends
AccumuloClusterHarness {
@Test
public void testPreferredConnection() throws Exception {
- System.setProperty(TServerClient.DEBUG_HOST, tservers.get(0));
+ Iterator<ServerId> tsi = tservers.iterator();
+ System.setProperty(TServerClient.DEBUG_HOST,
tsi.next().toHostPortString());
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
assertNotNull(client.instanceOperations().getSiteConfiguration());
}
- System.setProperty(TServerClient.DEBUG_HOST, tservers.get(1));
+ System.setProperty(TServerClient.DEBUG_HOST,
tsi.next().toHostPortString());
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
assertNotNull(client.instanceOperations().getSiteConfiguration());
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java
index ad06b85d37..64b75d9f2e 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
@@ -156,7 +157,7 @@ public class HalfDeadTServerIT extends ConfigurableMacBase {
public String test(int seconds, boolean expectTserverDied) throws Exception {
assumeTrue(sharedLibBuilt.get(), "Shared library did not build");
try (AccumuloClient client =
Accumulo.newClient().from(getClientProperties()).build()) {
- while (client.instanceOperations().getTabletServers().isEmpty()) {
+ while
(client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty())
{
// wait until the tserver that we need to kill is running
Thread.sleep(50);
}
@@ -191,7 +192,7 @@ public class HalfDeadTServerIT extends ConfigurableMacBase {
cluster.getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
Thread.sleep(SECONDS.toMillis(1));
client.tableOperations().create("test_ingest");
- assertEquals(1, client.instanceOperations().getTabletServers().size());
+ assertEquals(1,
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size());
int rows = 100_000;
ingest =
cluster.exec(TestIngest.class, "-c", cluster.getClientPropsPath(),
"--rows", rows + "")
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java
index 75b7f6447d..5ed4913f8b 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java
@@ -55,6 +55,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.Locations;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientTabletCache;
import org.apache.accumulo.core.conf.Property;
@@ -476,7 +477,8 @@ public class ManagerAssignmentIT extends
SharedMiniClusterBase {
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
- Wait.waitFor(() -> client.instanceOperations().getTabletServers().size()
== 1,
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 1,
SECONDS.toMillis(60), SECONDS.toMillis(2));
client.tableOperations().create(tableName);
@@ -552,14 +554,17 @@ public class ManagerAssignmentIT extends
SharedMiniClusterBase {
});
- Wait.waitFor(() -> client.instanceOperations().getTabletServers().size()
== 0);
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() ==
0);
// restart the tablet server for the other tests. Need to call
stopAllServers
// to clear out the process list because we shutdown the TabletServer
outside
// of MAC control.
getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
- Wait.waitFor(() -> client.instanceOperations().getTabletServers().size()
== 1, 60_000);
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 1,
+ 60_000);
}
}
@@ -568,7 +573,8 @@ public class ManagerAssignmentIT extends
SharedMiniClusterBase {
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
- Wait.waitFor(() -> client.instanceOperations().getTabletServers().size()
== 1,
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 1,
SECONDS.toMillis(60), SECONDS.toMillis(2));
client.instanceOperations().waitForBalance();
@@ -601,14 +607,17 @@ public class ManagerAssignmentIT extends
SharedMiniClusterBase {
}
});
- Wait.waitFor(() -> client.instanceOperations().getTabletServers().size()
== 0);
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() ==
0);
// restart the tablet server for the other tests. Need to call
stopAllServers
// to clear out the process list because we shutdown the TabletServer
outside
// of MAC control.
getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
- Wait.waitFor(() -> client.instanceOperations().getTabletServers().size()
== 1, 60_000);
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 1,
+ 60_000);
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
index 8f9c38af3c..5464bbcf0e 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
@@ -24,10 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.DoubleAdder;
@@ -36,6 +34,7 @@ import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.UtilWaitThread;
@@ -139,15 +138,12 @@ public class MemoryStarvedMajCIT extends
SharedMiniClusterBase {
ClientContext ctx = (ClientContext) client;
- Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx).size()
== 1, 60_000);
- Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx)
- .get(Constants.DEFAULT_RESOURCE_GROUP_NAME).size() == 1, 60_000);
+ Wait.waitFor(() -> ctx.getServerPaths()
+ .getCompactor(rg ->
rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), addr -> true, true)
+ .size() == 1, 60_000);
- Map<String,Set<HostAndPort>> groupedCompactors =
- ExternalCompactionUtil.getCompactorAddrs(ctx);
- List<HostAndPort> compactorAddresses =
- new
ArrayList<>(groupedCompactors.get(Constants.DEFAULT_RESOURCE_GROUP_NAME));
- HostAndPort compactorAddr = compactorAddresses.get(0);
+ ServerId csi =
ctx.instanceOperations().getServers(ServerId.Type.COMPACTOR).iterator().next();
+ HostAndPort compactorAddr = HostAndPort.fromParts(csi.getHost(),
csi.getPort());
TableOperations to = client.tableOperations();
to.create(table);
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
index 8ab1f4da36..02519e10af 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@ -52,6 +52,7 @@ import
org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.ActiveScan;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.Namespace;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -179,11 +180,11 @@ public class ScanIdIT extends AccumuloClusterHarness {
// all scanner have reported at least 1 result, so check for unique scan
ids.
Set<Long> scanIds = new HashSet<>();
- List<String> tservers = client.instanceOperations().getTabletServers();
+ Set<ServerId> tservers =
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER);
log.debug("tablet servers {}", tservers);
- for (String tserver : tservers) {
+ for (ServerId tserver : tservers) {
List<ActiveScan> activeScans = null;
for (int i = 0; i < 10; i++) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
index ef5089c961..1a19e0cae6 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -139,7 +140,8 @@ public class ScannerIT extends ConfigurableMacBase {
if (serverType == SCAN_SERVER) {
// Scans will fall back to tablet servers when no scan servers are
present. So wait for scan
// servers to show up in zookeeper. Can remove this in 3.1.
- Wait.waitFor(() ->
!accumuloClient.instanceOperations().getScanServers().isEmpty());
+ Wait.waitFor(() -> !accumuloClient.instanceOperations()
+ .getServers(ServerId.Type.SCAN_SERVER).isEmpty());
}
accumuloClient.tableOperations().create(tableName);
@@ -215,18 +217,18 @@ public class ScannerIT extends ConfigurableMacBase {
public static long countActiveScans(AccumuloClient c, ServerType serverType,
String tableName)
throws Exception {
- final Collection<String> servers;
+ final Collection<ServerId> servers;
if (serverType == TABLET_SERVER) {
- servers = c.instanceOperations().getTabletServers();
+ servers = c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER);
} else if (serverType == SCAN_SERVER) {
- servers = c.instanceOperations().getScanServers();
+ servers = c.instanceOperations().getServers(ServerId.Type.SCAN_SERVER);
} else {
throw new IllegalArgumentException("Unsupported server type " +
serverType);
}
long count = 0;
- for (String server : servers) {
- count += c.instanceOperations().getActiveScans(server).stream()
+ for (ServerId tserver : servers) {
+ count += c.instanceOperations().getActiveScans(tserver).stream()
.filter(activeScan ->
activeScan.getTable().equals(tableName)).count();
}
return count;
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java
index efd3962b8c..2f3288433b 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.ActiveScan;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -145,7 +146,8 @@ public class SessionBlockVerifyIT extends
ScanSessionTimeOutIT {
int sessionsFound = 0;
// we have configured 1 tserver, so we can grab the one and only
- String tserver =
getOnlyElement(c.instanceOperations().getTabletServers());
+ ServerId tserver =
+
getOnlyElement(c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER));
final List<ActiveScan> scans =
c.instanceOperations().getActiveScans(tserver);
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
index 60ab7dd662..966c0a82ca 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
@@ -23,11 +23,12 @@ import static
org.junit.jupiter.api.Assertions.assertNotEquals;
import java.io.IOException;
import java.time.Duration;
-import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.accumulo.server.util.Admin;
import org.apache.accumulo.test.TestIngest;
@@ -117,13 +118,14 @@ public class ShutdownIT extends ConfigurableMacBase {
int x = cluster.exec(TestIngest.class, "-c", cluster.getClientPropsPath(),
"--createTable")
.getProcess().waitFor();
assertEquals(0, x);
- List<String> tabletServers = c.instanceOperations().getTabletServers();
+ Set<ServerId> tabletServers =
c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER);
assertEquals(2, tabletServers.size());
- String doomed = tabletServers.get(0);
+ ServerId doomed = tabletServers.iterator().next();
log.info("Stopping " + doomed);
- assertEquals(0, cluster.exec(Admin.class, "stop",
doomed).getProcess().waitFor());
- tabletServers = c.instanceOperations().getTabletServers();
+ assertEquals(0,
+ cluster.exec(Admin.class, "stop",
doomed.toHostPortString()).getProcess().waitFor());
+ tabletServers =
c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER);
assertEquals(1, tabletServers.size());
- assertNotEquals(tabletServers.get(0), doomed);
+ assertNotEquals(tabletServers.iterator().next(), doomed);
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
index 6d8adf332f..c4bd7f1877 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
@@ -24,10 +24,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.TreeSet;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.Credentials;
@@ -65,7 +67,7 @@ public class SimpleBalancerFairnessIT extends
ConfigurableMacBase {
TreeSet<Text> splits = TestIngest.getSplitPoints(0, 10000000,
NUM_SPLITS);
log.info("Creating {} splits", splits.size());
c.tableOperations().addSplits("unused", splits);
- List<String> tservers = c.instanceOperations().getTabletServers();
+ Set<ServerId> tservers =
c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER);
TestIngest.IngestParams params = new
TestIngest.IngestParams(getClientProperties());
params.rows = 5000;
TestIngest.ingest(c, params);
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/TabletMetadataIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/TabletMetadataIT.java
index 7f6d07abbb..4e9f9a3c35 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/TabletMetadataIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/TabletMetadataIT.java
@@ -27,6 +27,7 @@ import java.util.Set;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -56,7 +57,8 @@ public class TabletMetadataIT extends ConfigurableMacBase {
@Test
public void getLiveTServersTest() throws Exception {
try (AccumuloClient c =
Accumulo.newClient().from(getClientProperties()).build()) {
- while (c.instanceOperations().getTabletServers().size() != NUM_TSERVERS)
{
+ while
(c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size()
+ != NUM_TSERVERS) {
log.info("Waiting for tservers to start up...");
Thread.sleep(SECONDS.toMillis(5));
}
@@ -67,7 +69,8 @@ public class TabletMetadataIT extends ConfigurableMacBase {
getCluster().killProcess(TABLET_SERVER,
getCluster().getProcesses().get(TABLET_SERVER).iterator().next());
- while (c.instanceOperations().getTabletServers().size() == NUM_TSERVERS)
{
+ while
(c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size()
+ == NUM_TSERVERS) {
log.info("Waiting for a tserver to die...");
Thread.sleep(SECONDS.toMillis(5));
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
index e43d0e621b..7e44e6d57a 100644
---
a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
@@ -47,6 +47,7 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
@@ -316,7 +317,8 @@ public class SuspendedTabletsIT extends
AccumuloClusterHarness {
try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
- Wait.waitFor(() ->
client.instanceOperations().getTabletServers().size() == 1);
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() ==
1);
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
index d6506c320b..8ced6d6128 100644
--- a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
@@ -60,6 +60,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.IteratorSetting.Column;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.client.sample.RowColumnSampler;
import org.apache.accumulo.core.client.sample.RowSampler;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
@@ -1530,7 +1531,7 @@ public class ShellServerIT extends SharedMiniClusterBase {
String[] lines = ts.output.get().split("\n");
String last = lines[lines.length - 1];
String[] parts = last.split("\\|");
- assertEquals(12, parts.length);
+ assertEquals(13, parts.length);
ts.exec("deletetable -f " + table, true);
}
@@ -1669,18 +1670,19 @@ public class ShellServerIT extends
SharedMiniClusterBase {
continue;
}
String[] parts = scan.split("\\|");
- assertEquals(14, parts.length, "Expected 14 colums, but found " +
parts.length
+ assertEquals(15, parts.length, "Expected 15 colums, but found " +
parts.length
+ " instead for '" + Arrays.toString(parts) + "'");
- String tserver = parts[0].trim();
+ String tserver = parts[1].trim();
// TODO: any way to tell if the client address is accurate? could be
local IP, host,
// loopback...?
String hostPortPattern = ".+:\\d+";
assertMatches(tserver, hostPortPattern);
-
assertTrue(accumuloClient.instanceOperations().getTabletServers().contains(tserver));
+
assertTrue(accumuloClient.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)
+ .stream().anyMatch((srv) ->
srv.toHostPortString().equals(tserver)));
String client = parts[1].trim();
assertMatches(client, hostPortPattern);
// Scan ID should be a long (throwing an exception if it fails to
parse)
- Long r = Long.parseLong(parts[11].trim());
+ Long r = Long.parseLong(parts[12].trim());
assertNotNull(r);
}
}