This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new be5e4751f7 Avoids scanning all servers when looking for a single
server (#4987)
be5e4751f7 is described below
commit be5e4751f7200a07f4f2d262d81a994e9822e203
Author: Keith Turner <[email protected]>
AuthorDate: Thu Oct 17 17:47:36 2024 -0400
Avoids scanning all servers when looking for a single server (#4987)
Modified code that finds a server in zookeeper to avoid scanning all
servers when looking for single server.
fixes #4961
---
.../accumulo/core/clientImpl/ClientContext.java | 7 +-
.../core/clientImpl/InstanceOperationsImpl.java | 27 +++--
.../core/clientImpl/ZookeeperLockChecker.java | 8 +-
.../accumulo/core/lock/ServiceLockPaths.java | 81 ++++++++++++---
.../core/metadata/schema/TabletMetadata.java | 5 +-
.../accumulo/core/rpc/clients/TServerClient.java | 17 ++--
.../util/compaction/ExternalCompactionUtil.java | 9 +-
.../core/clientImpl/ZookeeperLockCheckerTest.java | 6 +-
.../accumulo/core/lock/ServiceLockPathsTest.java | 110 +++++++++++++--------
.../miniclusterImpl/MiniAccumuloClusterImpl.java | 7 +-
.../accumulo/server/manager/LiveTServerSet.java | 7 +-
.../server/manager/state/DeadServerList.java | 3 +-
.../accumulo/server/util/AccumuloStatus.java | 3 +-
.../org/apache/accumulo/server/util/Admin.java | 6 +-
.../accumulo/server/util/ServiceStatusCmd.java | 7 +-
.../accumulo/server/util/TabletServerLocks.java | 6 +-
.../org/apache/accumulo/server/util/ZooZap.java | 7 +-
.../org/apache/accumulo/server/util/AdminTest.java | 1 -
.../java/org/apache/accumulo/manager/Manager.java | 3 +-
.../java/org/apache/accumulo/test/RecoveryIT.java | 5 +-
.../test/ScanServerConcurrentTabletScanIT.java | 3 +-
.../test/ScanServerGroupConfigurationIT.java | 13 +--
.../org/apache/accumulo/test/ScanServerIT.java | 3 +-
.../accumulo/test/ScanServerMetadataEntriesIT.java | 3 +-
.../accumulo/test/ScanServerMultipleScansIT.java | 3 +-
.../apache/accumulo/test/ScanServerShutdownIT.java | 5 +-
.../CompactionPriorityQueueMetricsIT.java | 3 +-
.../accumulo/test/fate/FateOpsCommandsIT.java | 3 +-
.../test/functional/MemoryStarvedMajCIT.java | 4 +-
.../test/functional/MemoryStarvedScanIT.java | 3 +-
.../functional/TabletManagementIteratorIT.java | 3 +-
.../functional/TabletResourceGroupBalanceIT.java | 3 +-
.../accumulo/test/lock/ServiceLockPathsIT.java | 70 +++++++------
33 files changed, 278 insertions(+), 166 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 65ad0556a0..f5bc596872 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -86,6 +86,7 @@ import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.schema.Ample;
@@ -196,8 +197,8 @@ public class ClientContext implements AccumuloClient {
@Override
public Supplier<Collection<ScanServerInfo>> getScanServers() {
- return () -> getServerPaths().getScanServer(rg -> true, addr ->
true, true).stream()
- .map(entry -> new ScanServerInfo() {
+ return () -> getServerPaths().getScanServer(rg -> true,
AddressSelector.all(), true)
+ .stream().map(entry -> new ScanServerInfo() {
@Override
public String getAddress() {
return entry.getServer();
@@ -414,7 +415,7 @@ public class ClientContext implements AccumuloClient {
public Map<String,Pair<UUID,String>> getScanServers() {
Map<String,Pair<UUID,String>> liveScanServers = new HashMap<>();
Set<ServiceLockPath> scanServerPaths =
- getServerPaths().getScanServer(rg -> true, addr -> true, true);
+ getServerPaths().getScanServer(rg -> true, AddressSelector.all(),
true);
for (ServiceLockPath path : scanServerPaths) {
try {
ZcStat stat = new ZcStat();
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index 1ba125ab0f..60ccc9145d 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@ -59,7 +59,7 @@ import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
-import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
@@ -237,7 +237,7 @@ public class InstanceOperationsImpl implements
InstanceOperations {
@Deprecated(since = "4.0.0")
public Set<String> getCompactors() {
Set<String> results = new HashSet<>();
- context.getServerPaths().getCompactor(rg -> true, addr -> true, true)
+ context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(),
true)
.forEach(t -> results.add(t.getServer()));
return results;
}
@@ -246,7 +246,7 @@ public class InstanceOperationsImpl implements
InstanceOperations {
@Deprecated(since = "4.0.0")
public Set<String> getScanServers() {
Set<String> results = new HashSet<>();
- context.getServerPaths().getScanServer(rg -> true, addr -> true, true)
+ context.getServerPaths().getScanServer(rg -> true, AddressSelector.all(),
true)
.forEach(t -> results.add(t.getServer()));
return results;
}
@@ -255,7 +255,7 @@ public class InstanceOperationsImpl implements
InstanceOperations {
@Deprecated(since = "4.0.0")
public List<String> getTabletServers() {
List<String> results = new ArrayList<>();
- context.getServerPaths().getTabletServer(rg -> true, addr -> true, true)
+ context.getServerPaths().getTabletServer(rg -> true,
AddressSelector.all(), true)
.forEach(t -> results.add(t.getServer()));
return results;
}
@@ -482,7 +482,7 @@ public class InstanceOperationsImpl implements
InstanceOperations {
final ResourceGroupPredicate rg =
resourceGroup == null ? rgt -> true : rgt -> rgt.equals(resourceGroup);
- final AddressPredicate hp =
AddressPredicate.exact(HostAndPort.fromParts(host, port));
+ final AddressSelector hp =
AddressSelector.exact(HostAndPort.fromParts(host, port));
switch (type) {
case COMPACTOR:
@@ -526,8 +526,7 @@ public class InstanceOperationsImpl implements
InstanceOperations {
@Override
public Set<ServerId> getServers(ServerId.Type type) {
- AddressPredicate addressPredicate = addr -> true;
- return getServers(type, rg -> true, addressPredicate);
+ return getServers(type, rg -> true, AddressSelector.all());
}
@Override
@@ -537,22 +536,22 @@ public class InstanceOperationsImpl implements
InstanceOperations {
Objects.requireNonNull(resourceGroupPredicate, "Resource group predicate
was null");
Objects.requireNonNull(hostPortPredicate, "Host port predicate was null");
- AddressPredicate addressPredicate = addr -> {
+ AddressSelector addressPredicate = AddressSelector.matching(addr -> {
var hp = HostAndPort.fromString(addr);
return hostPortPredicate.test(hp.getHost(), hp.getPort());
- };
+ });
return getServers(type, resourceGroupPredicate, addressPredicate);
}
private Set<ServerId> getServers(ServerId.Type type, Predicate<String>
resourceGroupPredicate,
- AddressPredicate addressPredicate) {
+ AddressSelector addressSelector) {
final Set<ServerId> results = new HashSet<>();
switch (type) {
case COMPACTOR:
- context.getServerPaths().getCompactor(resourceGroupPredicate::test,
addressPredicate, true)
+ context.getServerPaths().getCompactor(resourceGroupPredicate::test,
addressSelector, true)
.forEach(c -> results.add(createServerId(type, c)));
break;
case MANAGER:
@@ -562,7 +561,7 @@ public class InstanceOperationsImpl implements
InstanceOperations {
String location = null;
if (sld.isPresent()) {
location =
sld.orElseThrow().getAddressString(ThriftService.MANAGER);
- if (addressPredicate.test(location)) {
+ if (addressSelector.getPredicate().test(location)) {
HostAndPort hp = HostAndPort.fromString(location);
results.add(new ServerId(type,
Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(),
hp.getPort()));
@@ -571,12 +570,12 @@ public class InstanceOperationsImpl implements
InstanceOperations {
}
break;
case SCAN_SERVER:
- context.getServerPaths().getScanServer(resourceGroupPredicate::test,
addressPredicate, true)
+ context.getServerPaths().getScanServer(resourceGroupPredicate::test,
addressSelector, true)
.forEach(s -> results.add(createServerId(type, s)));
break;
case TABLET_SERVER:
context.getServerPaths()
- .getTabletServer(resourceGroupPredicate::test, addressPredicate,
true)
+ .getTabletServer(resourceGroupPredicate::test, addressSelector,
true)
.forEach(t -> results.add(createServerId(type, t)));
break;
default:
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java
index 2f0367c102..23e8001deb 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java
@@ -22,7 +22,7 @@ import java.util.Set;
import
org.apache.accumulo.core.clientImpl.ClientTabletCacheImpl.TabletServerLockChecker;
import org.apache.accumulo.core.lock.ServiceLock;
-import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import com.google.common.net.HostAndPort;
@@ -39,7 +39,7 @@ public class ZookeeperLockChecker implements
TabletServerLockChecker {
// ServiceLockPaths only returns items that have a lock
var hostAndPort = HostAndPort.fromString(server);
Set<ServiceLockPath> tservers =
- ctx.getServerPaths().getTabletServer(rg -> true,
AddressPredicate.exact(hostAndPort), true);
+ ctx.getServerPaths().getTabletServer(rg -> true,
AddressSelector.exact(hostAndPort), true);
return !tservers.isEmpty();
}
@@ -48,7 +48,7 @@ public class ZookeeperLockChecker implements
TabletServerLockChecker {
// ServiceLockPaths only returns items that have a lock
var hostAndPort = HostAndPort.fromString(server);
Set<ServiceLockPath> tservers =
- ctx.getServerPaths().getTabletServer(rg -> true,
AddressPredicate.exact(hostAndPort), true);
+ ctx.getServerPaths().getTabletServer(rg -> true,
AddressSelector.exact(hostAndPort), true);
for (ServiceLockPath slp : tservers) {
if (ServiceLock.getSessionId(ctx.getZooCache(), slp) ==
Long.parseLong(session, 16)) {
return true;
@@ -60,7 +60,7 @@ public class ZookeeperLockChecker implements
TabletServerLockChecker {
@Override
public void invalidateCache(String tserver) {
var hostAndPort = HostAndPort.fromString(tserver);
- ctx.getServerPaths().getTabletServer(rg -> true,
AddressPredicate.exact(hostAndPort), false)
+ ctx.getServerPaths().getTabletServer(rg -> true,
AddressSelector.exact(hostAndPort), false)
.forEach(slp -> {
ctx.getZooCache().clear(slp.toString());
});
diff --git
a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
index 38cad55da5..fd3d765a3b 100644
--- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
+++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.core.lock;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@@ -285,7 +286,7 @@ public class ServiceLockPaths {
}
public Set<ServiceLockPath> getCompactor(ResourceGroupPredicate
resourceGroupPredicate,
- AddressPredicate address, boolean withLock) {
+ AddressSelector address, boolean withLock) {
return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address,
withLock);
}
@@ -295,7 +296,8 @@ public class ServiceLockPaths {
* the ZooKeeper path.
*/
public ServiceLockPath getGarbageCollector(boolean withLock) {
- Set<ServiceLockPath> results = get(Constants.ZGC_LOCK, rg -> true, addr ->
true, withLock);
+ Set<ServiceLockPath> results =
+ get(Constants.ZGC_LOCK, rg -> true, AddressSelector.all(), withLock);
if (results.isEmpty()) {
return null;
} else {
@@ -309,7 +311,8 @@ public class ServiceLockPaths {
* InstanceOperations.getServers(ServerId.Type.MANAGER) to get the location.
*/
public ServiceLockPath getManager(boolean withLock) {
- Set<ServiceLockPath> results = get(Constants.ZMANAGER_LOCK, rg -> true,
addr -> true, withLock);
+ Set<ServiceLockPath> results =
+ get(Constants.ZMANAGER_LOCK, rg -> true, AddressSelector.all(),
withLock);
if (results.isEmpty()) {
return null;
} else {
@@ -318,7 +321,8 @@ public class ServiceLockPaths {
}
public ServiceLockPath getMonitor(boolean withLock) {
- Set<ServiceLockPath> results = get(Constants.ZMONITOR_LOCK, rg -> true,
addr -> true, withLock);
+ Set<ServiceLockPath> results =
+ get(Constants.ZMONITOR_LOCK, rg -> true, AddressSelector.all(),
withLock);
if (results.isEmpty()) {
return null;
} else {
@@ -327,17 +331,17 @@ public class ServiceLockPaths {
}
public Set<ServiceLockPath> getScanServer(ResourceGroupPredicate
resourceGroupPredicate,
- AddressPredicate address, boolean withLock) {
+ AddressSelector address, boolean withLock) {
return get(Constants.ZSSERVERS, resourceGroupPredicate, address, withLock);
}
public Set<ServiceLockPath> getTabletServer(ResourceGroupPredicate
resourceGroupPredicate,
- AddressPredicate address, boolean withLock) {
+ AddressSelector address, boolean withLock) {
return get(Constants.ZTSERVERS, resourceGroupPredicate, address, withLock);
}
public Set<ServiceLockPath> getDeadTabletServer(ResourceGroupPredicate
resourceGroupPredicate,
- AddressPredicate address, boolean withLock) {
+ AddressSelector address, boolean withLock) {
return get(Constants.ZDEADTSERVERS, resourceGroupPredicate, address,
withLock);
}
@@ -345,11 +349,41 @@ public class ServiceLockPaths {
}
- public interface AddressPredicate extends Predicate<String> {
+ public static class AddressSelector {
+ private final Predicate<String> predicate;
+ private final HostAndPort exactAddress;
- static AddressPredicate exact(HostAndPort hostAndPort) {
- Objects.requireNonNull(hostAndPort);
- AddressPredicate predicate = addr ->
hostAndPort.equals(HostAndPort.fromString(addr));
+ private AddressSelector(Predicate<String> predicate, HostAndPort
exactAddress) {
+ Preconditions.checkArgument((predicate == null && exactAddress != null)
+ || (predicate != null && exactAddress == null));
+ if (predicate == null) {
+ String hp = exactAddress.toString();
+ this.predicate = addr -> addr.equals(hp);
+ } else {
+ this.predicate = predicate;
+ }
+ this.exactAddress = exactAddress;
+ }
+
+ public static AddressSelector exact(HostAndPort hostAndPort) {
+ return new AddressSelector(null, hostAndPort);
+ }
+
+ public static AddressSelector matching(Predicate<String> predicate) {
+ return new AddressSelector(predicate, null);
+ }
+
+ private static AddressSelector ALL = new AddressSelector(s -> true, null);
+
+ public static AddressSelector all() {
+ return ALL;
+ }
+
+ public HostAndPort getExactAddress() {
+ return exactAddress;
+ }
+
+ public Predicate<String> getPredicate() {
return predicate;
}
}
@@ -361,18 +395,18 @@ public class ServiceLockPaths {
* @param serverType type of lock, should be something like
Constants.ZTSERVERS or
* Constants.ZMANAGER_LOCK
* @param resourceGroupPredicate only returns servers in resource groups
that pass this predicate
- * @param addressPredicate only return servers that match this predicate
+ * @param addressSelector only return servers that meet this criteria
* @param withLock supply true if you only want to return servers that have
an active lock. Not
* applicable for types that don't use a lock (e.g. dead tservers)
* @return set of ServiceLockPath objects for the paths found based on the
search criteria
*/
private Set<ServiceLockPath> get(final String serverType,
- ResourceGroupPredicate resourceGroupPredicate, AddressPredicate
addressPredicate,
+ ResourceGroupPredicate resourceGroupPredicate, AddressSelector
addressSelector,
boolean withLock) {
Objects.requireNonNull(serverType);
Objects.requireNonNull(resourceGroupPredicate);
- Objects.requireNonNull(addressPredicate);
+ Objects.requireNonNull(addressSelector);
final Set<ServiceLockPath> results = new HashSet<>();
final String typePath = ctx.getZooKeeperRoot() + serverType;
@@ -395,7 +429,24 @@ public class ServiceLockPaths {
final List<String> resourceGroups = cache.getChildren(typePath);
for (final String group : resourceGroups) {
if (resourceGroupPredicate.test(group)) {
- final List<String> servers = cache.getChildren(typePath + "/" +
group);
+ final Collection<String> servers;
+ final Predicate<String> addressPredicate;
+
+ if (addressSelector.getExactAddress() != null) {
+ var server = addressSelector.getExactAddress().toString();
+ if (withLock || cache.get(typePath + "/" + group + "/" + server)
!= null) {
+ // When withLock is true the server in the list may not exist in
zookeeper, if it does
+ // not exist then no lock will be found later when looking for a
lock in zookeeper.
+ servers = List.of(server);
+ } else {
+ servers = List.of();
+ }
+ addressPredicate = s -> true;
+ } else {
+ servers = cache.getChildren(typePath + "/" + group);
+ addressPredicate = addressSelector.getPredicate();
+ }
+
for (final String server : servers) {
if (addressPredicate.test(server)) {
final ServiceLockPath slp =
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
index c9f9059b10..795ebfafed 100644
---
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
+++
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -633,8 +634,8 @@ public class TabletMetadata {
public static synchronized Set<TServerInstance>
getLiveTServers(ClientContext context) {
final Set<TServerInstance> liveServers = new HashSet<>();
- for (ServiceLockPath slp : context.getServerPaths().getTabletServer(rg ->
true, addr -> true,
- true)) {
+ for (ServiceLockPath slp : context.getServerPaths().getTabletServer(rg ->
true,
+ AddressSelector.all(), true)) {
checkTabletServer(context, slp).ifPresent(liveServers::add);
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
index 213e8ffec3..e7236a1365 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
@@ -39,7 +39,7 @@ import
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
-import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.Exec;
@@ -88,16 +88,19 @@ public interface TServerClient<C extends TServiceClient> {
// correct one.
HostAndPort hp = HostAndPort.fromString(debugHost);
serverPaths.addAll(
- context.getServerPaths().getCompactor(rg -> true,
AddressPredicate.exact(hp), true));
+ context.getServerPaths().getCompactor(rg -> true,
AddressSelector.exact(hp), true));
serverPaths.addAll(
- context.getServerPaths().getScanServer(rg -> true,
AddressPredicate.exact(hp), true));
+ context.getServerPaths().getScanServer(rg -> true,
AddressSelector.exact(hp), true));
serverPaths.addAll(
- context.getServerPaths().getTabletServer(rg -> true,
AddressPredicate.exact(hp), true));
+ context.getServerPaths().getTabletServer(rg -> true,
AddressSelector.exact(hp), true));
} else {
- serverPaths.addAll(context.getServerPaths().getTabletServer(rg -> true,
addr -> true, true));
+ serverPaths.addAll(
+ context.getServerPaths().getTabletServer(rg -> true,
AddressSelector.all(), true));
if (type == ThriftClientTypes.CLIENT) {
- serverPaths.addAll(context.getServerPaths().getCompactor(rg -> true,
addr -> true, true));
- serverPaths.addAll(context.getServerPaths().getScanServer(rg -> true,
addr -> true, true));
+ serverPaths
+ .addAll(context.getServerPaths().getCompactor(rg -> true,
AddressSelector.all(), true));
+ serverPaths.addAll(
+ context.getServerPaths().getScanServer(rg -> true,
AddressSelector.all(), true));
}
if (serverPaths.isEmpty()) {
if (warned.compareAndSet(false, true)) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index 15c3a92019..3d5fc44576 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -40,6 +40,7 @@ import
org.apache.accumulo.core.compaction.thrift.CompactorService;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.rpc.ThriftUtil;
@@ -114,7 +115,7 @@ public class ExternalCompactionUtil {
*/
public static Map<String,Set<HostAndPort>> getCompactorAddrs(ClientContext
context) {
final Map<String,Set<HostAndPort>> groupsAndAddresses = new HashMap<>();
- context.getServerPaths().getCompactor(rg -> true, addr -> true,
true).forEach(slp -> {
+ context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(),
true).forEach(slp -> {
groupsAndAddresses.computeIfAbsent(slp.getResourceGroup(), (k) -> new
HashSet<>())
.add(HostAndPort.fromString(slp.getServer()));
});
@@ -201,7 +202,7 @@ public class ExternalCompactionUtil {
final ExecutorService executor = ThreadPools.getServerThreadPools()
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTIONS_POOL).numCoreThreads(16).build();
- context.getServerPaths().getCompactor(rg -> true, addr -> true,
true).forEach(slp -> {
+ context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(),
true).forEach(slp -> {
final HostAndPort hp = HostAndPort.fromString(slp.getServer());
rcFutures.add(new RunningCompactionFuture(slp,
executor.submit(() -> getRunningCompaction(hp, context))));
@@ -229,7 +230,7 @@ public class ExternalCompactionUtil {
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTION_IDS_POOL).numCoreThreads(16).build();
List<Future<ExternalCompactionId>> futures = new ArrayList<>();
- context.getServerPaths().getCompactor(rg -> true, addr -> true,
true).forEach(slp -> {
+ context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(),
true).forEach(slp -> {
final HostAndPort hp = HostAndPort.fromString(slp.getServer());
futures.add(executor.submit(() -> getRunningCompactionId(hp, context)));
});
@@ -254,7 +255,7 @@ public class ExternalCompactionUtil {
public static int countCompactors(String groupName, ClientContext context) {
var start = Timer.startNew();
int count = context.getServerPaths()
- .getCompactor(rg -> rg.equals(groupName), addr -> true, true).size();
+ .getCompactor(rg -> rg.equals(groupName), AddressSelector.all(),
true).size();
long elapsed = start.elapsed(MILLISECONDS);
if (elapsed > 100) {
LOG.debug("Took {} ms to count {} compactors for {}", elapsed, count,
groupName);
diff --git
a/core/src/test/java/org/apache/accumulo/core/clientImpl/ZookeeperLockCheckerTest.java
b/core/src/test/java/org/apache/accumulo/core/clientImpl/ZookeeperLockCheckerTest.java
index ed2b6aa3e4..e6c8affbd3 100644
---
a/core/src/test/java/org/apache/accumulo/core/clientImpl/ZookeeperLockCheckerTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/clientImpl/ZookeeperLockCheckerTest.java
@@ -53,10 +53,8 @@ public class ZookeeperLockCheckerTest {
.anyTimes();
expect(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS))
.andReturn(List.of(Constants.DEFAULT_RESOURCE_GROUP_NAME)).anyTimes();
- expect(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS +
"/"
- +
Constants.DEFAULT_RESOURCE_GROUP_NAME)).andReturn(List.of("server")).anyTimes();
- expect(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS +
"/"
- + Constants.DEFAULT_RESOURCE_GROUP_NAME +
"/server")).andReturn(List.of()).anyTimes();
+ expect(zc.get(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/"
+ + Constants.DEFAULT_RESOURCE_GROUP_NAME + "/server")).andReturn(new
byte[0]).anyTimes();
zc.clear(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/"
+ Constants.DEFAULT_RESOURCE_GROUP_NAME + "/server");
replay(zc);
diff --git
a/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java
b/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java
index 27e1e795a2..e35cbd616b 100644
--- a/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java
@@ -45,7 +45,7 @@ import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
-import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.easymock.EasyMock;
import org.junit.jupiter.api.Test;
@@ -382,11 +382,12 @@ public class ServiceLockPathsTest {
() -> ctx.getServerPaths().getCompactor(null, null, true));
assertThrows(NullPointerException.class,
() -> ctx.getServerPaths().getCompactor(rg ->
rg.equals(TEST_RESOURCE_GROUP), null, true));
- assertTrue(ctx.getServerPaths().getCompactor(rg -> true, addr -> true,
true).isEmpty());
+ assertTrue(
+ ctx.getServerPaths().getCompactor(rg -> true, AddressSelector.all(),
true).isEmpty());
assertTrue(ctx.getServerPaths()
- .getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true,
true).isEmpty());
+ .getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP),
AddressSelector.all(), true).isEmpty());
assertTrue(ctx.getServerPaths()
- .getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP),
AddressPredicate.exact(hp), true)
+ .getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP),
AddressSelector.exact(hp), true)
.isEmpty());
EasyMock.verify(ctx, zc);
@@ -438,11 +439,16 @@ public class ServiceLockPathsTest {
+ HOSTNAME + "/" + svcLock1), EasyMock.isA(ZcStat.class)))
.andReturn(sld2.serialize()).anyTimes();
EasyMock.expect(ctx.getServerPaths()).andReturn(new
ServiceLockPaths(ctx)).anyTimes();
+ EasyMock
+ .expect(zc.getChildren(ROOT + ZCOMPACTORS + "/" + TEST_RESOURCE_GROUP
+ "/localhost:1234"))
+ .andReturn(null).anyTimes();
+ EasyMock.expect(zc.get(ROOT + ZCOMPACTORS + "/" + TEST_RESOURCE_GROUP +
"/localhost:1234"))
+ .andReturn(null).anyTimes();
EasyMock.replay(ctx, zc);
// query for all
Set<ServiceLockPath> results =
- ctx.getServerPaths().getCompactor(rg -> true, addr -> true, false);
+ ctx.getServerPaths().getCompactor(rg -> true, AddressSelector.all(),
false);
assertEquals(4, results.size());
for (ServiceLockPath path : results) {
assertEquals(ZCOMPACTORS, path.getType());
@@ -459,7 +465,7 @@ public class ServiceLockPathsTest {
}
// query for all with locks
- results = ctx.getServerPaths().getCompactor(rg -> true, addr -> true,
true);
+ results = ctx.getServerPaths().getCompactor(rg -> true,
AddressSelector.all(), true);
assertEquals(2, results.size());
Iterator<ServiceLockPath> iter = results.iterator();
ServiceLockPath slp1 = iter.next();
@@ -489,12 +495,12 @@ public class ServiceLockPathsTest {
// query for all in non-existent resource group
results = ctx.getServerPaths().getCompactor(rg ->
rg.equals("FAKE_RESOURCE_GROUP"),
- addr -> true, true);
+ AddressSelector.all(), true);
assertEquals(0, results.size());
// query for all in test resource group
- results =
- ctx.getServerPaths().getCompactor(rg ->
rg.equals(TEST_RESOURCE_GROUP), addr -> true, true);
+ results = ctx.getServerPaths().getCompactor(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ AddressSelector.all(), true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -505,7 +511,7 @@ public class ServiceLockPathsTest {
// query for a specific server
results = ctx.getServerPaths().getCompactor(rg ->
rg.equals(TEST_RESOURCE_GROUP),
- AddressPredicate.exact(hp), true);
+ AddressSelector.exact(hp), true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -515,9 +521,14 @@ public class ServiceLockPathsTest {
assertEquals(ROOT + ZCOMPACTORS + "/" + TEST_RESOURCE_GROUP + "/" +
HOSTNAME, slp1.toString());
// query for a wrong server
- results = ctx.getServerPaths().getCompactor(rg ->
rg.equals(TEST_RESOURCE_GROUP),
- AddressPredicate.exact(HostAndPort.fromString("localhost:1234")),
true);
- assertEquals(0, results.size());
+ for (boolean withLock : new boolean[] {true, false}) {
+ results = ctx.getServerPaths().getCompactor(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ AddressSelector.exact(HostAndPort.fromString("localhost:1234")),
withLock);
+ assertEquals(0, results.size());
+ results = ctx.getServerPaths().getCompactor(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ AddressSelector.matching(hp -> hp.equals("localhost:1234")),
withLock);
+ assertEquals(0, results.size());
+ }
EasyMock.verify(ctx, zc);
@@ -539,11 +550,13 @@ public class ServiceLockPathsTest {
() -> ctx.getServerPaths().getScanServer(null, null, true));
assertThrows(NullPointerException.class,
() -> ctx.getServerPaths().getScanServer(rg ->
rg.equals(TEST_RESOURCE_GROUP), null, true));
- assertTrue(ctx.getServerPaths().getScanServer(rg -> true, addr -> true,
true).isEmpty());
+ assertTrue(
+ ctx.getServerPaths().getScanServer(rg -> true, AddressSelector.all(),
true).isEmpty());
assertTrue(ctx.getServerPaths()
- .getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true,
true).isEmpty());
+ .getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
AddressSelector.all(), true)
+ .isEmpty());
assertTrue(ctx.getServerPaths()
- .getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
AddressPredicate.exact(hp), true)
+ .getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
AddressSelector.exact(hp), true)
.isEmpty());
EasyMock.verify(ctx, zc);
@@ -594,11 +607,14 @@ public class ServiceLockPathsTest {
ROOT + ZSSERVERS + "/" + DEFAULT_RESOURCE_GROUP_NAME + "/" +
HOSTNAME + "/" + svcLock1),
EasyMock.isA(ZcStat.class))).andReturn(sld2.serialize()).anyTimes();
EasyMock.expect(ctx.getServerPaths()).andReturn(new
ServiceLockPaths(ctx)).anyTimes();
+ EasyMock
+ .expect(zc.getChildren(ROOT + ZSSERVERS + "/" + TEST_RESOURCE_GROUP +
"/localhost:1234"))
+ .andReturn(null).anyTimes();
EasyMock.replay(ctx, zc);
// query for all
Set<ServiceLockPath> results =
- ctx.getServerPaths().getScanServer(rg -> true, addr -> true, false);
+ ctx.getServerPaths().getScanServer(rg -> true, AddressSelector.all(),
false);
assertEquals(4, results.size());
for (ServiceLockPath path : results) {
assertEquals(ZSSERVERS, path.getType());
@@ -615,7 +631,7 @@ public class ServiceLockPathsTest {
}
// query for all with lock
- results = ctx.getServerPaths().getScanServer(rg -> true, addr -> true,
true);
+ results = ctx.getServerPaths().getScanServer(rg -> true,
AddressSelector.all(), true);
assertEquals(2, results.size());
Iterator<ServiceLockPath> iter = results.iterator();
ServiceLockPath slp1 = iter.next();
@@ -643,12 +659,12 @@ public class ServiceLockPathsTest {
// query for all in non-existent resource group
results = ctx.getServerPaths().getScanServer(rg ->
rg.equals("FAKE_RESOURCE_GROUP"),
- addr -> true, true);
+ AddressSelector.all(), true);
assertEquals(0, results.size());
// query for all in test resource group
- results = ctx.getServerPaths().getScanServer(rg ->
rg.equals(TEST_RESOURCE_GROUP), addr -> true,
- true);
+ results = ctx.getServerPaths().getScanServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ AddressSelector.all(), true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -659,7 +675,7 @@ public class ServiceLockPathsTest {
// query for a specific server
results = ctx.getServerPaths().getScanServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
- AddressPredicate.exact(hp), true);
+ AddressSelector.exact(hp), true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -670,7 +686,7 @@ public class ServiceLockPathsTest {
// query for a wrong server
results = ctx.getServerPaths().getScanServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
- AddressPredicate.exact(HostAndPort.fromString("localhost:1234")),
true);
+ AddressSelector.exact(HostAndPort.fromString("localhost:1234")), true);
assertEquals(0, results.size());
EasyMock.verify(ctx, zc);
@@ -693,11 +709,13 @@ public class ServiceLockPathsTest {
() -> ctx.getServerPaths().getTabletServer(null, null, true));
assertThrows(NullPointerException.class, () -> ctx.getServerPaths()
.getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), null, true));
- assertTrue(ctx.getServerPaths().getTabletServer(rg -> true, addr -> true,
true).isEmpty());
+ assertTrue(
+ ctx.getServerPaths().getTabletServer(rg -> true,
AddressSelector.all(), true).isEmpty());
assertTrue(ctx.getServerPaths()
- .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true,
true).isEmpty());
+ .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
AddressSelector.all(), true)
+ .isEmpty());
assertTrue(ctx.getServerPaths()
- .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
AddressPredicate.exact(hp), true)
+ .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
AddressSelector.exact(hp), true)
.isEmpty());
EasyMock.verify(ctx, zc);
@@ -748,11 +766,14 @@ public class ServiceLockPathsTest {
ROOT + ZTSERVERS + "/" + DEFAULT_RESOURCE_GROUP_NAME + "/" +
HOSTNAME + "/" + svcLock1),
EasyMock.isA(ZcStat.class))).andReturn(sld2.serialize()).anyTimes();
EasyMock.expect(ctx.getServerPaths()).andReturn(new
ServiceLockPaths(ctx)).anyTimes();
+ EasyMock
+ .expect(zc.getChildren(ROOT + ZTSERVERS + "/" + TEST_RESOURCE_GROUP +
"/localhost:1234"))
+ .andReturn(null).anyTimes();
EasyMock.replay(ctx, zc);
// query for all
Set<ServiceLockPath> results =
- ctx.getServerPaths().getTabletServer(rg -> true, addr -> true, false);
+ ctx.getServerPaths().getTabletServer(rg -> true,
AddressSelector.all(), false);
assertEquals(4, results.size());
for (ServiceLockPath path : results) {
assertEquals(ZTSERVERS, path.getType());
@@ -769,7 +790,7 @@ public class ServiceLockPathsTest {
}
// query for all with lock
- results = ctx.getServerPaths().getTabletServer(rg -> true, addr -> true,
true);
+ results = ctx.getServerPaths().getTabletServer(rg -> true,
AddressSelector.all(), true);
assertEquals(2, results.size());
Iterator<ServiceLockPath> iter = results.iterator();
ServiceLockPath slp1 = iter.next();
@@ -797,12 +818,12 @@ public class ServiceLockPathsTest {
// query for all in non-existent resource group
results = ctx.getServerPaths().getTabletServer(rg ->
rg.equals("FAKE_RESOURCE_GROUP"),
- addr -> true, true);
+ AddressSelector.all(), true);
assertEquals(0, results.size());
// query for all in test resource group
results = ctx.getServerPaths().getTabletServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
- addr -> true, true);
+ AddressSelector.all(), true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -813,7 +834,7 @@ public class ServiceLockPathsTest {
// query for a specific server
results = ctx.getServerPaths().getTabletServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
- AddressPredicate.exact(hp), true);
+ AddressSelector.exact(hp), true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -824,7 +845,7 @@ public class ServiceLockPathsTest {
// query for a wrong server
results = ctx.getServerPaths().getTabletServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
- AddressPredicate.exact(HostAndPort.fromString("localhost:1234")),
true);
+ AddressSelector.exact(HostAndPort.fromString("localhost:1234")), true);
assertEquals(0, results.size());
EasyMock.verify(ctx, zc);
@@ -847,11 +868,14 @@ public class ServiceLockPathsTest {
() -> ctx.getServerPaths().getDeadTabletServer(null, null, false));
assertThrows(NullPointerException.class, () -> ctx.getServerPaths()
.getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), null,
false));
- assertTrue(ctx.getServerPaths().getDeadTabletServer(rg -> true, addr ->
true, false).isEmpty());
+ assertTrue(ctx.getServerPaths().getDeadTabletServer(rg -> true,
AddressSelector.all(), false)
+ .isEmpty());
assertTrue(ctx.getServerPaths()
- .getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr ->
true, false).isEmpty());
- assertTrue(ctx.getServerPaths().getDeadTabletServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
- AddressPredicate.exact(hp), false).isEmpty());
+ .getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
AddressSelector.all(), false)
+ .isEmpty());
+ assertTrue(ctx.getServerPaths()
+ .getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP),
AddressSelector.exact(hp), false)
+ .isEmpty());
EasyMock.verify(ctx, zc);
@@ -882,6 +906,8 @@ public class ServiceLockPathsTest {
EasyMock
.expect(zc.getChildren(ROOT + ZDEADTSERVERS + "/" +
TEST_RESOURCE_GROUP + "/" + HOSTNAME))
.andReturn(List.of(svcLock1, svcLock2)).anyTimes();
+ EasyMock.expect(zc.get(ROOT + ZDEADTSERVERS + "/" + TEST_RESOURCE_GROUP +
"/" + HOSTNAME))
+ .andReturn(new byte[0]).anyTimes();
EasyMock
.expect(zc
.getChildren(ROOT + ZDEADTSERVERS + "/" +
DEFAULT_RESOURCE_GROUP_NAME + "/" + HOSTNAME))
@@ -895,11 +921,13 @@ public class ServiceLockPathsTest {
+ HOSTNAME + "/" + svcLock1), EasyMock.isA(ZcStat.class)))
.andReturn(sld2.serialize()).anyTimes();
EasyMock.expect(ctx.getServerPaths()).andReturn(new
ServiceLockPaths(ctx)).anyTimes();
+ EasyMock.expect(zc.get(ROOT + ZDEADTSERVERS + "/" + TEST_RESOURCE_GROUP +
"/localhost:1234"))
+ .andReturn(null).anyTimes();
EasyMock.replay(ctx, zc);
// query for all
Set<ServiceLockPath> results =
- ctx.getServerPaths().getDeadTabletServer(rg -> true, addr -> true,
false);
+ ctx.getServerPaths().getDeadTabletServer(rg -> true,
AddressSelector.all(), false);
assertEquals(2, results.size());
Iterator<ServiceLockPath> iter = results.iterator();
ServiceLockPath slp1 = iter.next();
@@ -929,12 +957,12 @@ public class ServiceLockPathsTest {
// query for all in non-existent resource group
results = ctx.getServerPaths().getDeadTabletServer(rg ->
rg.equals("FAKE_RESOURCE_GROUP"),
- addr -> true, false);
+ AddressSelector.all(), false);
assertEquals(0, results.size());
// query for all in test resource group
results = ctx.getServerPaths().getDeadTabletServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
- addr -> true, false);
+ AddressSelector.all(), false);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -946,7 +974,7 @@ public class ServiceLockPathsTest {
// query for a specific server
results = ctx.getServerPaths().getDeadTabletServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
- AddressPredicate.exact(hp), false);
+ AddressSelector.exact(hp), false);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -958,7 +986,7 @@ public class ServiceLockPathsTest {
// query for a wrong server
results = ctx.getServerPaths().getDeadTabletServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
- AddressPredicate.exact(HostAndPort.fromString("localhost:1234")),
false);
+ AddressSelector.exact(HostAndPort.fromString("localhost:1234")),
false);
assertEquals(0, results.size());
EasyMock.verify(ctx, zc);
diff --git
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 5fe5656147..e8a55081c3 100644
---
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -87,6 +87,7 @@ import
org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher;
import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
@@ -862,7 +863,7 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
int tsActualCount = 0;
while (tsActualCount < tsExpectedCount) {
Set<ServiceLockPath> tservers =
- context.getServerPaths().getTabletServer(rg -> true, addr -> true,
true);
+ context.getServerPaths().getTabletServer(rg -> true,
AddressSelector.all(), true);
tsActualCount = tservers.size();
log.info(tsActualCount + " of " + tsExpectedCount + " tablet servers
present in ZooKeeper");
Thread.sleep(500);
@@ -871,7 +872,7 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
int ssActualCount = 0;
while (ssActualCount < ssExpectedCount) {
Set<ServiceLockPath> tservers =
- context.getServerPaths().getScanServer(rg -> true, addr -> true,
true);
+ context.getServerPaths().getScanServer(rg -> true,
AddressSelector.all(), true);
ssActualCount = tservers.size();
log.info(ssActualCount + " of " + ssExpectedCount + " scan servers
present in ZooKeeper");
Thread.sleep(500);
@@ -880,7 +881,7 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
int ecActualCount = 0;
while (ecActualCount < ecExpectedCount) {
Set<ServiceLockPath> compactors =
- context.getServerPaths().getCompactor(rg -> true, addr -> true,
true);
+ context.getServerPaths().getCompactor(rg -> true,
AddressSelector.all(), true);
ecActualCount = compactors.size();
log.info(ecActualCount + " of " + ecExpectedCount + " compactors present
in ZooKeeper");
Thread.sleep(500);
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index 302e1b1161..b6e51b412f 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -40,7 +40,7 @@ import
org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockPaths;
-import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
@@ -232,7 +232,7 @@ public class LiveTServerSet implements Watcher {
final Set<TServerInstance> updates = new HashSet<>();
final Set<TServerInstance> doomed = new HashSet<>();
final Set<ServiceLockPath> tservers =
- context.getServerPaths().getTabletServer(rg -> true, addr -> true,
false);
+ context.getServerPaths().getTabletServer(rg -> true,
AddressSelector.all(), false);
locklessServers.keySet().retainAll(tservers);
@@ -465,7 +465,8 @@ public class LiveTServerSet implements Watcher {
ResourceGroupPredicate rgp = rg2 -> rg.equals(rg2);
return rgp;
}).orElse(rg -> true);
- AddressPredicate addrPredicate =
address.map(AddressPredicate::exact).orElse(addr -> true);
+ AddressSelector addrPredicate =
+ address.map(AddressSelector::exact).orElse(AddressSelector.all());
Set<ServiceLockPath> paths =
context.getServerPaths().getTabletServer(rgPredicate, addrPredicate,
false);
if (paths.isEmpty() || paths.size() > 1) {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java
index c204c04d05..966167d527 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.manager.thrift.DeadServer;
import org.apache.accumulo.server.ServerContext;
@@ -68,7 +69,7 @@ public class DeadServerList {
List<DeadServer> result = new ArrayList<>();
try {
Set<ServiceLockPath> deadServers =
- ctx.getServerPaths().getDeadTabletServer(rg -> true, addr -> true,
false);
+ ctx.getServerPaths().getDeadTabletServer(rg -> true,
AddressSelector.all(), false);
for (ServiceLockPath path : deadServers) {
Stat stat = new Stat();
byte[] data;
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java
b/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java
index 9b3229d4d6..f37fea7d0d 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.server.util;
import java.util.Set;
import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
public class AccumuloStatus {
@@ -32,7 +33,7 @@ public class AccumuloStatus {
*/
public static boolean isAccumuloOffline(ClientContext context) {
Set<ServiceLockPath> tservers =
- context.getServerPaths().getTabletServer(rg -> true, addr -> true,
true);
+ context.getServerPaths().getTabletServer(rg -> true,
AddressSelector.all(), true);
if (!tservers.isEmpty()) {
return false;
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index 66b2601b13..ace5a00dba 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -78,7 +78,7 @@ import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.lock.ServiceLock;
-import org.apache.accumulo.core.lock.ServiceLockPaths;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.manager.thrift.FateService;
import org.apache.accumulo.core.manager.thrift.TFateId;
@@ -681,8 +681,8 @@ public class Admin implements KeywordExecutable {
static String qualifyWithZooKeeperSessionId(ClientContext context, ZooCache
zooCache,
String hostAndPort) {
var hpObj = HostAndPort.fromString(hostAndPort);
- Set<ServiceLockPath> paths = context.getServerPaths().getTabletServer(rg
-> true,
- ServiceLockPaths.AddressPredicate.exact(hpObj), true);
+ Set<ServiceLockPath> paths =
+ context.getServerPaths().getTabletServer(rg -> true,
AddressSelector.exact(hpObj), true);
if (paths.size() != 1) {
return hostAndPort;
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java
b/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java
index 70cf41f254..f950975c80 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.lock.ServiceLockData;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.ServerContext;
@@ -111,7 +112,7 @@ public class ServiceStatusCmd {
final AtomicInteger errors = new AtomicInteger(0);
final Map<String,Set<String>> hostsByGroups = new TreeMap<>();
final Set<ServiceLockPath> compactors =
- context.getServerPaths().getTabletServer(rg -> true, addr -> true,
true);
+ context.getServerPaths().getTabletServer(rg -> true,
AddressSelector.all(), true);
compactors.forEach(c -> hostsByGroups
.computeIfAbsent(c.getResourceGroup(), (k) -> new
TreeSet<>()).add(c.getServer()));
return new StatusSummary(ServiceStatusReport.ReportKey.T_SERVER,
hostsByGroups.keySet(),
@@ -128,7 +129,7 @@ public class ServiceStatusCmd {
final AtomicInteger errors = new AtomicInteger(0);
final Map<String,Set<String>> hostsByGroups = new TreeMap<>();
final Set<ServiceLockPath> scanServers =
- context.getServerPaths().getScanServer(rg -> true, addr -> true, true);
+ context.getServerPaths().getScanServer(rg -> true,
AddressSelector.all(), true);
scanServers.forEach(c -> hostsByGroups
.computeIfAbsent(c.getResourceGroup(), (k) -> new
TreeSet<>()).add(c.getServer()));
return new StatusSummary(ServiceStatusReport.ReportKey.S_SERVER,
hostsByGroups.keySet(),
@@ -155,7 +156,7 @@ public class ServiceStatusCmd {
final AtomicInteger errors = new AtomicInteger(0);
final Map<String,Set<String>> hostsByGroups = new TreeMap<>();
final Set<ServiceLockPath> compactors =
- context.getServerPaths().getCompactor(rg -> true, addr -> true, true);
+ context.getServerPaths().getCompactor(rg -> true,
AddressSelector.all(), true);
compactors.forEach(c -> hostsByGroups
.computeIfAbsent(c.getResourceGroup(), (k) -> new
TreeSet<>()).add(c.getServer()));
return new StatusSummary(ServiceStatusReport.ReportKey.COMPACTOR,
hostsByGroups.keySet(),
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
b/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
index d46ffdb840..369be4bee3 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
@@ -26,7 +26,7 @@ import
org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
-import org.apache.accumulo.core.lock.ServiceLockPaths;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.server.ServerContext;
@@ -43,7 +43,7 @@ public class TabletServerLocks {
if (delete == null) {
Set<ServiceLockPath> tabletServers =
- context.getServerPaths().getTabletServer(rg -> true, addr -> true,
false);
+ context.getServerPaths().getTabletServer(rg -> true,
AddressSelector.all(), false);
if (tabletServers.isEmpty()) {
System.err.println("No tservers found in ZK");
}
@@ -65,7 +65,7 @@ public class TabletServerLocks {
} else {
var hostAndPort = HostAndPort.fromString(lock);
Set<ServiceLockPath> paths =
context.getServerPaths().getTabletServer(rg -> true,
- ServiceLockPaths.AddressPredicate.exact(hostAndPort), true);
+ AddressSelector.exact(hostAndPort), true);
Preconditions.checkArgument(paths.size() == 1,
lock + " does not match a single ZooKeeper TabletServer lock.
matches=" + paths);
ServiceLockPath path = paths.iterator().next();
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
index 26d0e322dd..473e8a1171 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.singletons.SingletonManager;
import org.apache.accumulo.core.singletons.SingletonManager.Mode;
@@ -110,7 +111,7 @@ public class ZooZap implements KeywordExecutable {
if (opts.zapTservers) {
try {
Set<ServiceLockPath> tserverLockPaths =
- context.getServerPaths().getTabletServer(rg -> true, addr ->
true, false);
+ context.getServerPaths().getTabletServer(rg -> true,
AddressSelector.all(), false);
for (ServiceLockPath tserverPath : tserverLockPaths) {
message("Deleting " + tserverPath + " from zookeeper", opts);
@@ -132,7 +133,7 @@ public class ZooZap implements KeywordExecutable {
if (opts.zapCompactors) {
Set<ServiceLockPath> compactorLockPaths =
- context.getServerPaths().getCompactor(rg -> true, addr -> true,
false);
+ context.getServerPaths().getCompactor(rg -> true,
AddressSelector.all(), false);
Set<String> compactorResourceGroupPaths = new HashSet<>();
compactorLockPaths.forEach(p -> compactorResourceGroupPaths
.add(p.toString().substring(0, p.toString().lastIndexOf('/'))));
@@ -150,7 +151,7 @@ public class ZooZap implements KeywordExecutable {
if (opts.zapScanServers) {
try {
Set<ServiceLockPath> sserverLockPaths =
- context.getServerPaths().getScanServer(rg -> true, addr ->
true, false);
+ context.getServerPaths().getScanServer(rg -> true,
AddressSelector.all(), false);
for (ServiceLockPath sserverPath : sserverLockPaths) {
message("Deleting " + sserverPath + " from zookeeper", opts);
if (!zoo.getChildren(sserverPath.toString()).isEmpty()) {
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
index f8ce2fadca..d861bf3781 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
@@ -131,7 +131,6 @@ public class AdminTest {
EasyMock.expect(ctx.getZooKeeperRoot()).andReturn("/accumulo/id").anyTimes();
EasyMock.expect(ctx.getZooCache()).andReturn(zc).anyTimes();
EasyMock.expect(zc.getChildren(type)).andReturn(List.of(Constants.DEFAULT_RESOURCE_GROUP_NAME));
- EasyMock.expect(zc.getChildren(group)).andReturn(List.of(server));
EasyMock.expect(zc.getChildren(serverPath)).andReturn(Collections.emptyList());
EasyMock.expect(ctx.getServerPaths()).andReturn(new
ServiceLockPaths(ctx)).anyTimes();
EasyMock.replay(ctx, zc);
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 8118b2d57c..20e72ca3af 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -88,6 +88,7 @@ import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl;
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
@@ -645,7 +646,7 @@ public class Manager extends AbstractServer
while (stillManager()) {
try {
Set<ServiceLockPath> scanServerPaths =
- getContext().getServerPaths().getScanServer(rg -> true, addr ->
true, false);
+ getContext().getServerPaths().getScanServer(rg -> true,
AddressSelector.all(), false);
for (ServiceLockPath path : scanServerPaths) {
ZcStat stat = new ZcStat();
diff --git a/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
b/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
index 2b245a6ce7..a631cf642a 100644
--- a/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
@@ -38,6 +38,7 @@ import
org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.spi.balancer.TableLoadBalancer;
@@ -143,11 +144,11 @@ public class RecoveryIT extends AccumuloClusterHarness {
// Stop any running Compactors and ScanServers
control.stopAllServers(ServerType.COMPACTOR);
Wait.waitFor(() -> getServerContext().getServerPaths()
- .getCompactor(rg -> true, addr -> true, true).isEmpty(), 60_000);
+ .getCompactor(rg -> true, AddressSelector.all(), true).isEmpty(),
60_000);
control.stopAllServers(ServerType.SCAN_SERVER);
Wait.waitFor(() -> getServerContext().getServerPaths()
- .getScanServer(rg -> true, addr -> true, true).size() == 0, 60_000);
+ .getScanServer(rg -> true, AddressSelector.all(), true).size() == 0,
60_000);
// Kill the TabletServer in resource group that is hosting the table
List<Process> procs = control.getTabletServers(RESOURCE_GROUP);
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
index f80b6364fd..4f1deb88f7 100644
---
a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -92,7 +93,7 @@ public class ScanServerConcurrentTabletScanIT extends
SharedMiniClusterBase {
1, null);
Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths()
- .getScanServer(rg -> true, addr -> true, true).isEmpty());
+ .getScanServer(rg -> true, AddressSelector.all(), true).isEmpty());
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
index 91e6868665..fcbc4a2fe6 100644
---
a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
@@ -29,6 +29,7 @@ import
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.spi.scan.ScanServerSelector;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
@@ -124,7 +125,7 @@ public class ScanServerGroupConfigurationIT extends
SharedMiniClusterBase {
// Ensure no scan servers running
Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
- .getScanServer(rg -> true, addr -> true, true).isEmpty());
+ .getScanServer(rg -> true, AddressSelector.all(), true).isEmpty());
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
final String tableName = getUniqueNames(1)[0];
@@ -146,10 +147,10 @@ public class ScanServerGroupConfigurationIT extends
SharedMiniClusterBase {
// Start a ScanServer. No group specified, should be in the default
group.
getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
"localhost");
Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
- .getScanServer(rg -> true, addr -> true, true).size() == 1,
30_000);
+ .getScanServer(rg -> true, AddressSelector.all(), true).size() ==
1, 30_000);
Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
.getScanServer(rg ->
rg.equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME),
- addr -> true, true)
+ AddressSelector.all(), true)
.size() > 0);
assertEquals(ingestedEntryCount, Iterables.size(scanner),
@@ -165,13 +166,13 @@ public class ScanServerGroupConfigurationIT extends
SharedMiniClusterBase {
.addScanServerResourceGroup("GROUP1", 1);
getCluster().getClusterControl().start(ServerType.SCAN_SERVER);
Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
- .getScanServer(rg -> true, addr -> true, true).size() == 2,
30_000);
+ .getScanServer(rg -> true, AddressSelector.all(), true).size() ==
2, 30_000);
Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
.getScanServer(rg ->
rg.equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME),
- addr -> true, true)
+ AddressSelector.all(), true)
.size() == 1);
Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
- .getScanServer(rg -> rg.equals("GROUP1"), addr -> true,
true).size() == 1);
+ .getScanServer(rg -> rg.equals("GROUP1"), AddressSelector.all(),
true).size() == 1);
scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
assertEquals(ingestedEntryCount + additionalIngest1,
Iterables.size(scanner),
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
index 0bfa121a9e..690ee6d15f 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
@@ -59,6 +59,7 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator;
@@ -115,7 +116,7 @@ public class ScanServerIT extends SharedMiniClusterBase {
"localhost");
Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths()
- .getScanServer(rg -> true, addr -> true, true).isEmpty());
+ .getScanServer(rg -> true, AddressSelector.all(), true).isEmpty());
}
@AfterAll
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
index 176fc7dd66..81e133ad45 100644
---
a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.gc.Reference;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
@@ -91,7 +92,7 @@ public class ScanServerMetadataEntriesIT extends
SharedMiniClusterBase {
"localhost");
Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths()
- .getScanServer(rg -> true, addr -> true, true).isEmpty());
+ .getScanServer(rg -> true, AddressSelector.all(), true).isEmpty());
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java
index 4c2e37a18e..063b9c5529 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -89,7 +90,7 @@ public class ScanServerMultipleScansIT extends
SharedMiniClusterBase {
"localhost");
Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths()
- .getScanServer(rg -> true, addr -> true, true).isEmpty());
+ .getScanServer(rg -> true, AddressSelector.all(), true).isEmpty());
}
@AfterAll
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
index 52a831a502..59fba1b2ad 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -85,8 +86,8 @@ public class ScanServerShutdownIT extends
SharedMiniClusterBase {
ServerContext ctx = getCluster().getServerContext();
- Wait.waitFor(
- () -> !ctx.getServerPaths().getScanServer(rg -> true, addr -> true,
true).isEmpty());
+ Wait.waitFor(() -> !ctx.getServerPaths().getScanServer(rg -> true,
AddressSelector.all(), true)
+ .isEmpty());
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
final String tableName = getUniqueNames(1)[0];
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
index d569c4f511..d7d923cddf 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
@@ -119,7 +120,7 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
public void setupMetricsTest() throws Exception {
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
- .getCompactor(rg -> true, addr -> true, true).isEmpty(), 60_000);
+ .getCompactor(rg -> true, AddressSelector.all(), true).isEmpty(),
60_000);
try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
tableName = getUniqueNames(1)[0];
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
index 7676350790..491fc7a2b4 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
@@ -64,6 +64,7 @@ import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
@@ -101,7 +102,7 @@ public abstract class FateOpsCommandsIT extends
ConfigurableMacBase
// this issue.
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
Wait.waitFor(() -> getServerContext().getServerPaths()
- .getCompactor(rg -> true, addr -> true, true).isEmpty(), 60_000);
+ .getCompactor(rg -> true, AddressSelector.all(), true).isEmpty(),
60_000);
}
@Test
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
index 5464bbcf0e..87cf984f90 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
@@ -139,7 +140,8 @@ public class MemoryStarvedMajCIT extends
SharedMiniClusterBase {
ClientContext ctx = (ClientContext) client;
Wait.waitFor(() -> ctx.getServerPaths()
- .getCompactor(rg ->
rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), addr -> true, true)
+ .getCompactor(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME),
+ AddressSelector.all(), true)
.size() == 1, 60_000);
ServerId csi =
ctx.instanceOperations().getServers(ServerId.Type.COMPACTOR).iterator().next();
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index 6f219be23f..1320a6018f 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@ -54,6 +54,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.iterators.WrappingIterator;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
@@ -200,7 +201,7 @@ public class MemoryStarvedScanIT extends
SharedMiniClusterBase {
final ZooCache zc = context.getZooCache();
Set<ServiceLockPath> servers =
- context.getServerPaths().getTabletServer(rg -> true, addr -> true,
true);
+ context.getServerPaths().getTabletServer(rg -> true,
AddressSelector.all(), true);
for (ServiceLockPath server : servers) {
Optional<ServiceLockData> data = zc.getLockData(server);
if (data != null && data.isPresent()) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
index 02f2ede2b3..3d44ede9e0 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
@@ -68,6 +68,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.manager.state.TabletManagement;
import org.apache.accumulo.core.manager.state.tables.TableState;
@@ -576,7 +577,7 @@ public class TabletManagementIteratorIT extends
AccumuloClusterHarness {
HashSet<TServerInstance> tservers = new HashSet<>();
for (ServiceLockPath tserver : context.getServerPaths().getTabletServer(rg
-> true,
- addr -> true, true)) {
+ AddressSelector.all(), true)) {
try {
long sessionId = ServiceLock.getSessionId(context.getZooCache(),
tserver);
tservers.add(new TServerInstance(tserver.getServer(), sessionId));
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java
index ce8d3fee0f..52ee70ea48 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java
@@ -44,6 +44,7 @@ import
org.apache.accumulo.core.clientImpl.ClientTabletCache.LocationNeed;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.schema.Ample;
@@ -97,7 +98,7 @@ public class TabletResourceGroupBalanceIT extends
SharedMiniClusterBase {
Map<String,String> tservers = new HashMap<>();
for (ServiceLockPath tserver : cluster.getServerContext().getServerPaths()
- .getTabletServer(rg -> true, addr -> true, true)) {
+ .getTabletServer(rg -> true, AddressSelector.all(), true)) {
tservers.put(tserver.getServer(), tserver.getResourceGroup());
}
return tservers;
diff --git
a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java
b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java
index dd2e2d079f..36b5a7af00 100644
--- a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.lock.ServiceLockPaths;
+import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
@@ -51,40 +52,49 @@ public class ServiceLockPathsIT extends
AccumuloClusterHarness {
assertNotNull(paths.getGarbageCollector(true));
assertNotNull(paths.getManager(true));
assertNull(paths.getMonitor(true)); // monitor not started
- assertEquals(2, paths.getTabletServer(rg -> true, addr -> true,
true).size());
- assertEquals(1, paths
- .getTabletServer(rg ->
rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), addr -> true, true)
- .size());
- assertEquals(1, paths.getTabletServer(rg -> rg.equals("TTEST"), addr ->
true, true).size());
- assertEquals(0, paths.getTabletServer(rg -> rg.equals("FAKE"), addr ->
true, true).size());
- assertEquals(0, paths.getTabletServer(rg -> rg.equals("CTEST"), addr ->
true, true).size());
- assertEquals(0, paths.getTabletServer(rg -> rg.equals("STEST"), addr ->
true, true).size());
-
- assertEquals(4, paths.getCompactor(rg -> true, addr -> true, true).size());
- assertEquals(1, paths
- .getCompactor(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME),
addr -> true, true)
- .size());
- assertEquals(3, paths.getCompactor(rg -> rg.equals("CTEST"), addr -> true,
true).size());
- assertEquals(0, paths.getCompactor(rg -> rg.equals("FAKE"), addr -> true,
true).size());
- assertEquals(0, paths.getCompactor(rg -> rg.equals("TTEST"), addr -> true,
true).size());
- assertEquals(0, paths.getCompactor(rg -> rg.equals("STEST"), addr -> true,
true).size());
-
- assertEquals(3, paths.getScanServer(rg -> true, addr -> true,
true).size());
- assertEquals(1, paths
- .getScanServer(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME),
addr -> true, true)
- .size());
- assertEquals(2, paths.getScanServer(rg -> rg.equals("STEST"), addr ->
true, true).size());
- assertEquals(0, paths.getScanServer(rg -> rg.equals("FAKE"), addr -> true,
true).size());
- assertEquals(0, paths.getScanServer(rg -> rg.equals("CTEST"), addr ->
true, true).size());
- assertEquals(0, paths.getScanServer(rg -> rg.equals("TTEST"), addr ->
true, true).size());
+ assertEquals(2, paths.getTabletServer(rg -> true, AddressSelector.all(),
true).size());
+ assertEquals(1, paths.getTabletServer(rg ->
rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME),
+ AddressSelector.all(), true).size());
+ assertEquals(1,
+ paths.getTabletServer(rg -> rg.equals("TTEST"), AddressSelector.all(),
true).size());
+ assertEquals(0,
+ paths.getTabletServer(rg -> rg.equals("FAKE"), AddressSelector.all(),
true).size());
+ assertEquals(0,
+ paths.getTabletServer(rg -> rg.equals("CTEST"), AddressSelector.all(),
true).size());
+ assertEquals(0,
+ paths.getTabletServer(rg -> rg.equals("STEST"), AddressSelector.all(),
true).size());
+
+ assertEquals(4, paths.getCompactor(rg -> true, AddressSelector.all(),
true).size());
+ assertEquals(1, paths.getCompactor(rg ->
rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME),
+ AddressSelector.all(), true).size());
+ assertEquals(3,
+ paths.getCompactor(rg -> rg.equals("CTEST"), AddressSelector.all(),
true).size());
+ assertEquals(0,
+ paths.getCompactor(rg -> rg.equals("FAKE"), AddressSelector.all(),
true).size());
+ assertEquals(0,
+ paths.getCompactor(rg -> rg.equals("TTEST"), AddressSelector.all(),
true).size());
+ assertEquals(0,
+ paths.getCompactor(rg -> rg.equals("STEST"), AddressSelector.all(),
true).size());
+
+ assertEquals(3, paths.getScanServer(rg -> true, AddressSelector.all(),
true).size());
+ assertEquals(1, paths.getScanServer(rg ->
rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME),
+ AddressSelector.all(), true).size());
+ assertEquals(2,
+ paths.getScanServer(rg -> rg.equals("STEST"), AddressSelector.all(),
true).size());
+ assertEquals(0,
+ paths.getScanServer(rg -> rg.equals("FAKE"), AddressSelector.all(),
true).size());
+ assertEquals(0,
+ paths.getScanServer(rg -> rg.equals("CTEST"), AddressSelector.all(),
true).size());
+ assertEquals(0,
+ paths.getScanServer(rg -> rg.equals("TTEST"), AddressSelector.all(),
true).size());
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
- Wait.waitFor(() -> paths.getCompactor(rg -> true, addr -> true,
true).size() == 0);
+ Wait.waitFor(() -> paths.getCompactor(rg -> true, AddressSelector.all(),
true).size() == 0);
getCluster().getClusterControl().stopAllServers(ServerType.SCAN_SERVER);
- Wait.waitFor(() -> paths.getScanServer(rg -> true, addr -> true,
true).size() == 0);
+ Wait.waitFor(() -> paths.getScanServer(rg -> true, AddressSelector.all(),
true).size() == 0);
getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR);
@@ -96,8 +106,8 @@ public class ServiceLockPathsIT extends
AccumuloClusterHarness {
getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
- Wait.waitFor(() -> paths.getTabletServer(rg -> true, addr -> true,
true).size() == 0);
- Wait.waitFor(() -> paths.getTabletServer(rg -> true, addr -> true,
false).size() == 2);
+ Wait.waitFor(() -> paths.getTabletServer(rg -> true,
AddressSelector.all(), true).size() == 0);
+ Wait.waitFor(() -> paths.getTabletServer(rg -> true,
AddressSelector.all(), false).size() == 2);
}