This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new a3c9bf7800 updated ServiceLockPaths to use Predicates (#4943)
a3c9bf7800 is described below
commit a3c9bf78003259f7363f2392a94d84668a2c0f80
Author: Keith Turner <[email protected]>
AuthorDate: Fri Oct 4 09:37:09 2024 -0400
updated ServiceLockPaths to use Predicates (#4943)
Updated ServiceLockPaths to use Predicates for resource group and
hostname matching. The hope is that this can lead to more advanced and
efficient server filtering in the shell and monitor.
---
.../accumulo/core/clientImpl/ClientContext.java | 2 +-
.../core/clientImpl/InstanceOperationsImpl.java | 3 +-
.../core/clientImpl/ZookeeperLockChecker.java | 15 +--
.../accumulo/core/lock/ServiceLockPaths.java | 55 +++++-----
.../core/metadata/schema/TabletMetadata.java | 4 +-
.../accumulo/core/rpc/clients/TServerClient.java | 9 +-
.../util/compaction/ExternalCompactionUtil.java | 4 +-
.../accumulo/core/lock/ServiceLockPathsTest.java | 116 ++++++++++-----------
.../miniclusterImpl/MiniAccumuloClusterImpl.java | 7 +-
.../accumulo/server/manager/LiveTServerSet.java | 14 ++-
.../server/manager/state/DeadServerList.java | 3 +-
.../accumulo/server/util/AccumuloStatus.java | 3 +-
.../org/apache/accumulo/server/util/Admin.java | 5 +-
.../accumulo/server/util/ServiceStatusCmd.java | 7 +-
.../accumulo/server/util/TabletServerLocks.java | 7 +-
.../org/apache/accumulo/server/util/ZooZap.java | 7 +-
.../java/org/apache/accumulo/manager/Manager.java | 4 +-
.../test/ScanServerConcurrentTabletScanIT.java | 3 +-
.../test/ScanServerGroupConfigurationIT.java | 7 +-
.../org/apache/accumulo/test/ScanServerIT.java | 3 +-
.../accumulo/test/ScanServerMetadataEntriesIT.java | 3 +-
.../accumulo/test/ScanServerMultipleScansIT.java | 3 +-
.../apache/accumulo/test/ScanServerShutdownIT.java | 5 +-
.../test/functional/MemoryStarvedScanIT.java | 2 +-
.../functional/TabletManagementIteratorIT.java | 5 +-
.../functional/TabletResourceGroupBalanceIT.java | 3 +-
.../accumulo/test/lock/ServiceLockPathsIT.java | 47 ++++-----
27 files changed, 171 insertions(+), 175 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 83fdd4a0c9..1369094328 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -406,7 +406,7 @@ public class ClientContext implements AccumuloClient {
public Map<String,Pair<UUID,String>> getScanServers() {
Map<String,Pair<UUID,String>> liveScanServers = new HashMap<>();
Set<ServiceLockPath> scanServerPaths =
- getServerPaths().getScanServer(Optional.empty(), Optional.empty(),
true);
+ getServerPaths().getScanServer(rg -> true, addr -> true, true);
for (ServiceLockPath path : scanServerPaths) {
try {
ZcStat stat = new ZcStat();
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index a6e7d25fbc..8ea74e90bb 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@ -34,7 +34,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -236,7 +235,7 @@ public class InstanceOperationsImpl implements
InstanceOperations {
@Override
public List<String> getTabletServers() {
Set<ServiceLockPath> paths =
- context.getServerPaths().getTabletServer(Optional.empty(),
Optional.empty(), true);
+ context.getServerPaths().getTabletServer(rg -> true, addr -> true,
true);
List<String> results = new ArrayList<>();
paths.forEach(p -> {
if (!p.getServer().equals("manager")) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java
index e91b64ea4b..72e4e15d77 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java
@@ -18,7 +18,6 @@
*/
package org.apache.accumulo.core.clientImpl;
-import java.util.Optional;
import java.util.Set;
import
org.apache.accumulo.core.clientImpl.ClientTabletCacheImpl.TabletServerLockChecker;
@@ -37,16 +36,18 @@ public class ZookeeperLockChecker implements
TabletServerLockChecker {
public boolean doesTabletServerLockExist(String server) {
// ServiceLockPaths only returns items that have a lock
- Set<ServiceLockPath> tservers =
ctx.getServerPaths().getTabletServer(Optional.empty(),
- Optional.of(HostAndPort.fromString(server)), true);
+ var hostAndPort = HostAndPort.fromString(server);
+ Set<ServiceLockPath> tservers =
+ ctx.getServerPaths().getTabletServer(rg -> true, addr ->
addr.equals(hostAndPort), true);
return !tservers.isEmpty();
}
@Override
public boolean isLockHeld(String server, String session) {
// ServiceLockPaths only returns items that have a lock
- Set<ServiceLockPath> tservers =
ctx.getServerPaths().getTabletServer(Optional.empty(),
- Optional.of(HostAndPort.fromString(server)), true);
+ var hostAndPort = HostAndPort.fromString(server);
+ Set<ServiceLockPath> tservers =
+ ctx.getServerPaths().getTabletServer(rg -> true, addr ->
addr.equals(hostAndPort), true);
for (ServiceLockPath slp : tservers) {
if (ServiceLock.getSessionId(ctx.getZooCache(), slp) ==
Long.parseLong(session, 16)) {
return true;
@@ -57,8 +58,8 @@ public class ZookeeperLockChecker implements
TabletServerLockChecker {
@Override
public void invalidateCache(String tserver) {
- ctx.getServerPaths()
- .getTabletServer(Optional.empty(),
Optional.of(HostAndPort.fromString(tserver)), false)
+ var hostAndPort = HostAndPort.fromString(tserver);
+ ctx.getServerPaths().getTabletServer(rg -> true, addr ->
addr.equals(hostAndPort), false)
.forEach(slp -> {
ctx.getZooCache().clear(slp.toString());
});
diff --git
a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
index 62da673356..30c94e9ab4 100644
--- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
+++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Predicate;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -282,14 +283,13 @@ public class ServiceLockPaths {
serverAddress.toString());
}
- public Set<ServiceLockPath> getCompactor(Optional<String> resourceGroup,
- Optional<HostAndPort> address, boolean withLock) {
- return get(Constants.ZCOMPACTORS, resourceGroup, address, withLock);
+ public Set<ServiceLockPath> getCompactor(ResourceGroupPredicate
resourceGroupPredicate,
+ Predicate<HostAndPort> address, boolean withLock) {
+ return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address,
withLock);
}
public ServiceLockPath getGarbageCollector(boolean withLock) {
- Set<ServiceLockPath> results =
- get(Constants.ZGC_LOCK, Optional.empty(), Optional.empty(), withLock);
+ Set<ServiceLockPath> results = get(Constants.ZGC_LOCK, rg -> true, addr ->
true, withLock);
if (results.isEmpty()) {
return null;
} else {
@@ -298,8 +298,7 @@ public class ServiceLockPaths {
}
public ServiceLockPath getManager(boolean withLock) {
- Set<ServiceLockPath> results =
- get(Constants.ZMANAGER_LOCK, Optional.empty(), Optional.empty(),
withLock);
+ Set<ServiceLockPath> results = get(Constants.ZMANAGER_LOCK, rg -> true,
addr -> true, withLock);
if (results.isEmpty()) {
return null;
} else {
@@ -308,8 +307,7 @@ public class ServiceLockPaths {
}
public ServiceLockPath getMonitor(boolean withLock) {
- Set<ServiceLockPath> results =
- get(Constants.ZMONITOR_LOCK, Optional.empty(), Optional.empty(),
withLock);
+ Set<ServiceLockPath> results = get(Constants.ZMONITOR_LOCK, rg -> true,
addr -> true, withLock);
if (results.isEmpty()) {
return null;
} else {
@@ -317,19 +315,23 @@ public class ServiceLockPaths {
}
}
- public Set<ServiceLockPath> getScanServer(Optional<String> resourceGroup,
- Optional<HostAndPort> address, boolean withLock) {
- return get(Constants.ZSSERVERS, resourceGroup, address, withLock);
+ public Set<ServiceLockPath> getScanServer(ResourceGroupPredicate
resourceGroupPredicate,
+ Predicate<HostAndPort> address, boolean withLock) {
+ return get(Constants.ZSSERVERS, resourceGroupPredicate, address, withLock);
}
- public Set<ServiceLockPath> getTabletServer(Optional<String> resourceGroup,
- Optional<HostAndPort> address, boolean withLock) {
- return get(Constants.ZTSERVERS, resourceGroup, address, withLock);
+ public Set<ServiceLockPath> getTabletServer(ResourceGroupPredicate
resourceGroupPredicate,
+ Predicate<HostAndPort> address, boolean withLock) {
+ return get(Constants.ZTSERVERS, resourceGroupPredicate, address, withLock);
}
- public Set<ServiceLockPath> getDeadTabletServer(Optional<String>
resourceGroup,
- Optional<HostAndPort> address, boolean withLock) {
- return get(Constants.ZDEADTSERVERS, resourceGroup, address, withLock);
+ public Set<ServiceLockPath> getDeadTabletServer(ResourceGroupPredicate
resourceGroupPredicate,
+ Predicate<HostAndPort> address, boolean withLock) {
+ return get(Constants.ZDEADTSERVERS, resourceGroupPredicate, address,
withLock);
+ }
+
+ public interface ResourceGroupPredicate extends Predicate<String> {
+
}
/**
@@ -338,18 +340,19 @@ public class ServiceLockPaths {
*
* @param serverType type of lock, should be something like
Constants.ZTSERVERS or
* Constants.ZMANAGER_LOCK
- * @param resourceGroup name of resource group, if empty will return all
resource groups
- * @param address address of server (host:port), if empty will return all
addresses
+ * @param resourceGroupPredicate only returns servers in resource groups
that pass this predicate
+ * @param addressPredicate only return servers that match this predicate
* @param withLock supply true if you only want to return servers that have
an active lock. Not
* applicable for types that don't use a lock (e.g. dead tservers)
* @return set of ServiceLockPath objects for the paths found based on the
search criteria
*/
- private Set<ServiceLockPath> get(final String serverType, Optional<String>
resourceGroup,
- Optional<HostAndPort> address, boolean withLock) {
+ private Set<ServiceLockPath> get(final String serverType,
+ ResourceGroupPredicate resourceGroupPredicate, Predicate<HostAndPort>
addressPredicate,
+ boolean withLock) {
Objects.requireNonNull(serverType);
- Objects.requireNonNull(resourceGroup);
- Objects.requireNonNull(address);
+ Objects.requireNonNull(resourceGroupPredicate);
+ Objects.requireNonNull(addressPredicate);
final Set<ServiceLockPath> results = new HashSet<>();
final String typePath = ctx.getZooKeeperRoot() + serverType;
@@ -371,13 +374,13 @@ public class ServiceLockPaths {
|| serverType.equals(Constants.ZTSERVERS) ||
serverType.equals(Constants.ZDEADTSERVERS)) {
final List<String> resourceGroups = cache.getChildren(typePath);
for (final String group : resourceGroups) {
- if (resourceGroup.isEmpty() ||
resourceGroup.orElseThrow().equals(group)) {
+ if (resourceGroupPredicate.test(group)) {
final List<String> servers = cache.getChildren(typePath + "/" +
group);
for (final String server : servers) {
final ZcStat stat = new ZcStat();
final ServiceLockPath slp =
parse(Optional.of(serverType), typePath + "/" + group + "/" +
server);
- if (address.isEmpty() ||
address.orElseThrow().toString().equals(server)) {
+ if (addressPredicate.test(HostAndPort.fromString(server))) {
if (!withLock || slp.getType().equals(Constants.ZDEADTSERVERS)) {
// Dead TServers don't have lock data
results.add(slp);
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
index f3e61aadd1..c9f9059b10 100644
---
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
+++
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
@@ -633,8 +633,8 @@ public class TabletMetadata {
public static synchronized Set<TServerInstance>
getLiveTServers(ClientContext context) {
final Set<TServerInstance> liveServers = new HashSet<>();
- for (ServiceLockPath slp :
context.getServerPaths().getTabletServer(Optional.empty(),
- Optional.empty(), true)) {
+ for (ServiceLockPath slp : context.getServerPaths().getTabletServer(rg ->
true, addr -> true,
+ true)) {
checkTabletServer(context, slp).ifPresent(liveServers::add);
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
index 2c70796095..20436898fe 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
@@ -74,12 +74,9 @@ public interface TServerClient<C extends TServiceClient> {
final long rpcTimeout = context.getClientTimeoutInMillis();
final ZooCache zc = context.getZooCache();
final List<ServiceLockPath> serverPaths = new ArrayList<>();
- serverPaths
- .addAll(context.getServerPaths().getCompactor(Optional.empty(),
Optional.empty(), true));
- serverPaths
- .addAll(context.getServerPaths().getScanServer(Optional.empty(),
Optional.empty(), true));
- serverPaths
- .addAll(context.getServerPaths().getTabletServer(Optional.empty(),
Optional.empty(), true));
+ serverPaths.addAll(context.getServerPaths().getCompactor(rg -> true, addr
-> true, true));
+ serverPaths.addAll(context.getServerPaths().getScanServer(rg -> true, addr
-> true, true));
+ serverPaths.addAll(context.getServerPaths().getTabletServer(rg -> true,
addr -> true, true));
if (serverPaths.isEmpty()) {
if (warned.compareAndSet(false, true)) {
LOG.warn(
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index 72a70ce106..7dafb38929 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -115,7 +115,7 @@ public class ExternalCompactionUtil {
*/
public static Map<String,Set<HostAndPort>> getCompactorAddrs(ClientContext
context) {
final Map<String,Set<HostAndPort>> groupsAndAddresses = new HashMap<>();
- context.getServerPaths().getCompactor(Optional.empty(), Optional.empty(),
true).forEach(slp -> {
+ context.getServerPaths().getCompactor(rg -> true, addr -> true,
true).forEach(slp -> {
groupsAndAddresses.computeIfAbsent(slp.getResourceGroup(), (k) -> new
HashSet<>())
.add(HostAndPort.fromString(slp.getServer()));
});
@@ -257,7 +257,7 @@ public class ExternalCompactionUtil {
public static int countCompactors(String groupName, ClientContext context) {
var start = Timer.startNew();
int count = context.getServerPaths()
- .getCompactor(Optional.of(groupName), Optional.empty(), true).size();
+ .getCompactor(rg -> rg.equals(groupName), addr -> true, true).size();
long elapsed = start.elapsed(MILLISECONDS);
if (elapsed > 100) {
LOG.debug("Took {} ms to count {} compactors for {}", elapsed, count,
groupName);
diff --git
a/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java
b/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java
index 51da0571d1..3f91c756b7 100644
--- a/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java
@@ -380,13 +380,13 @@ public class ServiceLockPathsTest {
assertThrows(NullPointerException.class,
() -> ctx.getServerPaths().getCompactor(null, null, true));
assertThrows(NullPointerException.class,
- () ->
ctx.getServerPaths().getCompactor(Optional.of(TEST_RESOURCE_GROUP), null,
true));
- assertTrue(
- ctx.getServerPaths().getCompactor(Optional.empty(), Optional.empty(),
true).isEmpty());
+ () -> ctx.getServerPaths().getCompactor(rg ->
rg.equals(TEST_RESOURCE_GROUP), null, true));
+ assertTrue(ctx.getServerPaths().getCompactor(rg -> true, addr -> true,
true).isEmpty());
assertTrue(ctx.getServerPaths()
- .getCompactor(Optional.of(TEST_RESOURCE_GROUP), Optional.empty(),
true).isEmpty());
+ .getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true,
true).isEmpty());
assertTrue(ctx.getServerPaths()
- .getCompactor(Optional.of(TEST_RESOURCE_GROUP), Optional.of(hp),
true).isEmpty());
+ .getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), addr ->
addr.equals(hp), true)
+ .isEmpty());
EasyMock.verify(ctx, zc);
@@ -441,7 +441,7 @@ public class ServiceLockPathsTest {
// query for all
Set<ServiceLockPath> results =
- ctx.getServerPaths().getCompactor(Optional.empty(), Optional.empty(),
false);
+ ctx.getServerPaths().getCompactor(rg -> true, addr -> true, false);
assertEquals(4, results.size());
for (ServiceLockPath path : results) {
assertEquals(ZCOMPACTORS, path.getType());
@@ -458,7 +458,7 @@ public class ServiceLockPathsTest {
}
// query for all with locks
- results = ctx.getServerPaths().getCompactor(Optional.empty(),
Optional.empty(), true);
+ results = ctx.getServerPaths().getCompactor(rg -> true, addr -> true,
true);
assertEquals(2, results.size());
Iterator<ServiceLockPath> iter = results.iterator();
ServiceLockPath slp1 = iter.next();
@@ -487,13 +487,13 @@ public class ServiceLockPathsTest {
}
// query for all in non-existent resource group
- results =
ctx.getServerPaths().getCompactor(Optional.of("FAKE_RESOURCE_GROUP"),
- Optional.empty(), true);
+ results = ctx.getServerPaths().getCompactor(rg ->
rg.equals("FAKE_RESOURCE_GROUP"),
+ addr -> true, true);
assertEquals(0, results.size());
// query for all in test resource group
results =
- ctx.getServerPaths().getCompactor(Optional.of(TEST_RESOURCE_GROUP),
Optional.empty(), true);
+ ctx.getServerPaths().getCompactor(rg ->
rg.equals(TEST_RESOURCE_GROUP), addr -> true, true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -503,8 +503,8 @@ public class ServiceLockPathsTest {
assertEquals(ROOT + ZCOMPACTORS + "/" + TEST_RESOURCE_GROUP + "/" +
HOSTNAME, slp1.toString());
// query for a specific server
- results =
- ctx.getServerPaths().getCompactor(Optional.of(TEST_RESOURCE_GROUP),
Optional.of(hp), true);
+ results = ctx.getServerPaths().getCompactor(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ addr -> addr.equals(hp), true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -514,8 +514,8 @@ public class ServiceLockPathsTest {
assertEquals(ROOT + ZCOMPACTORS + "/" + TEST_RESOURCE_GROUP + "/" +
HOSTNAME, slp1.toString());
// query for a wrong server
- results =
ctx.getServerPaths().getCompactor(Optional.of(TEST_RESOURCE_GROUP),
- Optional.of(HostAndPort.fromString("localhost:1234")), true);
+ results = ctx.getServerPaths().getCompactor(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ addr -> addr.equals(HostAndPort.fromString("localhost:1234")), true);
assertEquals(0, results.size());
EasyMock.verify(ctx, zc);
@@ -537,13 +537,13 @@ public class ServiceLockPathsTest {
assertThrows(NullPointerException.class,
() -> ctx.getServerPaths().getScanServer(null, null, true));
assertThrows(NullPointerException.class,
- () ->
ctx.getServerPaths().getScanServer(Optional.of(TEST_RESOURCE_GROUP), null,
true));
- assertTrue(
- ctx.getServerPaths().getScanServer(Optional.empty(), Optional.empty(),
true).isEmpty());
+ () -> ctx.getServerPaths().getScanServer(rg ->
rg.equals(TEST_RESOURCE_GROUP), null, true));
+ assertTrue(ctx.getServerPaths().getScanServer(rg -> true, addr -> true,
true).isEmpty());
assertTrue(ctx.getServerPaths()
- .getScanServer(Optional.of(TEST_RESOURCE_GROUP), Optional.empty(),
true).isEmpty());
+ .getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true,
true).isEmpty());
assertTrue(ctx.getServerPaths()
- .getScanServer(Optional.of(TEST_RESOURCE_GROUP), Optional.of(hp),
true).isEmpty());
+ .getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr ->
addr.equals(hp), true)
+ .isEmpty());
EasyMock.verify(ctx, zc);
@@ -597,7 +597,7 @@ public class ServiceLockPathsTest {
// query for all
Set<ServiceLockPath> results =
- ctx.getServerPaths().getScanServer(Optional.empty(), Optional.empty(),
false);
+ ctx.getServerPaths().getScanServer(rg -> true, addr -> true, false);
assertEquals(4, results.size());
for (ServiceLockPath path : results) {
assertEquals(ZSSERVERS, path.getType());
@@ -614,7 +614,7 @@ public class ServiceLockPathsTest {
}
// query for all with lock
- results = ctx.getServerPaths().getScanServer(Optional.empty(),
Optional.empty(), true);
+ results = ctx.getServerPaths().getScanServer(rg -> true, addr -> true,
true);
assertEquals(2, results.size());
Iterator<ServiceLockPath> iter = results.iterator();
ServiceLockPath slp1 = iter.next();
@@ -641,12 +641,12 @@ public class ServiceLockPathsTest {
}
// query for all in non-existent resource group
- results =
ctx.getServerPaths().getScanServer(Optional.of("FAKE_RESOURCE_GROUP"),
- Optional.empty(), true);
+ results = ctx.getServerPaths().getScanServer(rg ->
rg.equals("FAKE_RESOURCE_GROUP"),
+ addr -> true, true);
assertEquals(0, results.size());
// query for all in test resource group
- results =
ctx.getServerPaths().getScanServer(Optional.of(TEST_RESOURCE_GROUP),
Optional.empty(),
+ results = ctx.getServerPaths().getScanServer(rg ->
rg.equals(TEST_RESOURCE_GROUP), addr -> true,
true);
assertEquals(1, results.size());
iter = results.iterator();
@@ -657,8 +657,8 @@ public class ServiceLockPathsTest {
assertEquals(ROOT + ZSSERVERS + "/" + TEST_RESOURCE_GROUP + "/" +
HOSTNAME, slp1.toString());
// query for a specific server
- results =
- ctx.getServerPaths().getScanServer(Optional.of(TEST_RESOURCE_GROUP),
Optional.of(hp), true);
+ results = ctx.getServerPaths().getScanServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ addr -> addr.equals(hp), true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -668,8 +668,8 @@ public class ServiceLockPathsTest {
assertEquals(ROOT + ZSSERVERS + "/" + TEST_RESOURCE_GROUP + "/" +
HOSTNAME, slp1.toString());
// query for a wrong server
- results =
ctx.getServerPaths().getScanServer(Optional.of(TEST_RESOURCE_GROUP),
- Optional.of(HostAndPort.fromString("localhost:1234")), true);
+ results = ctx.getServerPaths().getScanServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ addr -> addr.equals(HostAndPort.fromString("localhost:1234")), true);
assertEquals(0, results.size());
EasyMock.verify(ctx, zc);
@@ -690,14 +690,14 @@ public class ServiceLockPathsTest {
assertThrows(NullPointerException.class,
() -> ctx.getServerPaths().getTabletServer(null, null, true));
- assertThrows(NullPointerException.class,
- () ->
ctx.getServerPaths().getTabletServer(Optional.of(TEST_RESOURCE_GROUP), null,
true));
- assertTrue(
- ctx.getServerPaths().getTabletServer(Optional.empty(),
Optional.empty(), true).isEmpty());
+ assertThrows(NullPointerException.class, () -> ctx.getServerPaths()
+ .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), null, true));
+ assertTrue(ctx.getServerPaths().getTabletServer(rg -> true, addr -> true,
true).isEmpty());
assertTrue(ctx.getServerPaths()
- .getTabletServer(Optional.of(TEST_RESOURCE_GROUP), Optional.empty(),
true).isEmpty());
+ .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true,
true).isEmpty());
assertTrue(ctx.getServerPaths()
- .getTabletServer(Optional.of(TEST_RESOURCE_GROUP), Optional.of(hp),
true).isEmpty());
+ .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr ->
addr.equals(hp), true)
+ .isEmpty());
EasyMock.verify(ctx, zc);
@@ -751,7 +751,7 @@ public class ServiceLockPathsTest {
// query for all
Set<ServiceLockPath> results =
- ctx.getServerPaths().getTabletServer(Optional.empty(),
Optional.empty(), false);
+ ctx.getServerPaths().getTabletServer(rg -> true, addr -> true, false);
assertEquals(4, results.size());
for (ServiceLockPath path : results) {
assertEquals(ZTSERVERS, path.getType());
@@ -768,7 +768,7 @@ public class ServiceLockPathsTest {
}
// query for all with lock
- results = ctx.getServerPaths().getTabletServer(Optional.empty(),
Optional.empty(), true);
+ results = ctx.getServerPaths().getTabletServer(rg -> true, addr -> true,
true);
assertEquals(2, results.size());
Iterator<ServiceLockPath> iter = results.iterator();
ServiceLockPath slp1 = iter.next();
@@ -795,13 +795,13 @@ public class ServiceLockPathsTest {
}
// query for all in non-existent resource group
- results =
ctx.getServerPaths().getTabletServer(Optional.of("FAKE_RESOURCE_GROUP"),
- Optional.empty(), true);
+ results = ctx.getServerPaths().getTabletServer(rg ->
rg.equals("FAKE_RESOURCE_GROUP"),
+ addr -> true, true);
assertEquals(0, results.size());
// query for all in test resource group
- results =
ctx.getServerPaths().getTabletServer(Optional.of(TEST_RESOURCE_GROUP),
- Optional.empty(), true);
+ results = ctx.getServerPaths().getTabletServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ addr -> true, true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -811,8 +811,8 @@ public class ServiceLockPathsTest {
assertEquals(ROOT + ZTSERVERS + "/" + TEST_RESOURCE_GROUP + "/" +
HOSTNAME, slp1.toString());
// query for a specific server
- results =
ctx.getServerPaths().getTabletServer(Optional.of(TEST_RESOURCE_GROUP),
- Optional.of(hp), true);
+ results = ctx.getServerPaths().getTabletServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ addr -> addr.equals(hp), true);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -822,8 +822,8 @@ public class ServiceLockPathsTest {
assertEquals(ROOT + ZTSERVERS + "/" + TEST_RESOURCE_GROUP + "/" +
HOSTNAME, slp1.toString());
// query for a wrong server
- results =
ctx.getServerPaths().getTabletServer(Optional.of(TEST_RESOURCE_GROUP),
- Optional.of(HostAndPort.fromString("localhost:1234")), true);
+ results = ctx.getServerPaths().getTabletServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ addr -> addr.equals(HostAndPort.fromString("localhost:1234")), true);
assertEquals(0, results.size());
EasyMock.verify(ctx, zc);
@@ -845,13 +845,13 @@ public class ServiceLockPathsTest {
assertThrows(NullPointerException.class,
() -> ctx.getServerPaths().getDeadTabletServer(null, null, false));
assertThrows(NullPointerException.class, () -> ctx.getServerPaths()
- .getDeadTabletServer(Optional.of(TEST_RESOURCE_GROUP), null, false));
- assertTrue(ctx.getServerPaths().getDeadTabletServer(Optional.empty(),
Optional.empty(), false)
- .isEmpty());
+ .getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), null,
false));
+ assertTrue(ctx.getServerPaths().getDeadTabletServer(rg -> true, addr ->
true, false).isEmpty());
assertTrue(ctx.getServerPaths()
- .getDeadTabletServer(Optional.of(TEST_RESOURCE_GROUP),
Optional.empty(), false).isEmpty());
+ .getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr ->
true, false).isEmpty());
assertTrue(ctx.getServerPaths()
- .getDeadTabletServer(Optional.of(TEST_RESOURCE_GROUP),
Optional.of(hp), false).isEmpty());
+ .getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr ->
addr.equals(hp), false)
+ .isEmpty());
EasyMock.verify(ctx, zc);
@@ -899,7 +899,7 @@ public class ServiceLockPathsTest {
// query for all
Set<ServiceLockPath> results =
- ctx.getServerPaths().getDeadTabletServer(Optional.empty(),
Optional.empty(), false);
+ ctx.getServerPaths().getDeadTabletServer(rg -> true, addr -> true,
false);
assertEquals(2, results.size());
Iterator<ServiceLockPath> iter = results.iterator();
ServiceLockPath slp1 = iter.next();
@@ -928,13 +928,13 @@ public class ServiceLockPathsTest {
}
// query for all in non-existent resource group
- results =
ctx.getServerPaths().getDeadTabletServer(Optional.of("FAKE_RESOURCE_GROUP"),
- Optional.empty(), false);
+ results = ctx.getServerPaths().getDeadTabletServer(rg ->
rg.equals("FAKE_RESOURCE_GROUP"),
+ addr -> true, false);
assertEquals(0, results.size());
// query for all in test resource group
- results =
ctx.getServerPaths().getDeadTabletServer(Optional.of(TEST_RESOURCE_GROUP),
- Optional.empty(), false);
+ results = ctx.getServerPaths().getDeadTabletServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ addr -> true, false);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -945,8 +945,8 @@ public class ServiceLockPathsTest {
slp1.toString());
// query for a specific server
- results =
ctx.getServerPaths().getDeadTabletServer(Optional.of(TEST_RESOURCE_GROUP),
- Optional.of(hp), false);
+ results = ctx.getServerPaths().getDeadTabletServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ addr -> addr.equals(hp), false);
assertEquals(1, results.size());
iter = results.iterator();
slp1 = iter.next();
@@ -957,8 +957,8 @@ public class ServiceLockPathsTest {
slp1.toString());
// query for a wrong server
- results =
ctx.getServerPaths().getDeadTabletServer(Optional.of(TEST_RESOURCE_GROUP),
- Optional.of(HostAndPort.fromString("localhost:1234")), false);
+ results = ctx.getServerPaths().getDeadTabletServer(rg ->
rg.equals(TEST_RESOURCE_GROUP),
+ addr -> addr.equals(HostAndPort.fromString("localhost:1234")), false);
assertEquals(0, results.size());
EasyMock.verify(ctx, zc);
diff --git
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 9970fd387e..5fe5656147 100644
---
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -44,7 +44,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
@@ -863,7 +862,7 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
int tsActualCount = 0;
while (tsActualCount < tsExpectedCount) {
Set<ServiceLockPath> tservers =
- context.getServerPaths().getTabletServer(Optional.empty(),
Optional.empty(), true);
+ context.getServerPaths().getTabletServer(rg -> true, addr -> true,
true);
tsActualCount = tservers.size();
log.info(tsActualCount + " of " + tsExpectedCount + " tablet servers
present in ZooKeeper");
Thread.sleep(500);
@@ -872,7 +871,7 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
int ssActualCount = 0;
while (ssActualCount < ssExpectedCount) {
Set<ServiceLockPath> tservers =
- context.getServerPaths().getScanServer(Optional.empty(),
Optional.empty(), true);
+ context.getServerPaths().getScanServer(rg -> true, addr -> true,
true);
ssActualCount = tservers.size();
log.info(ssActualCount + " of " + ssExpectedCount + " scan servers
present in ZooKeeper");
Thread.sleep(500);
@@ -881,7 +880,7 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
int ecActualCount = 0;
while (ecActualCount < ecExpectedCount) {
Set<ServiceLockPath> compactors =
- context.getServerPaths().getCompactor(Optional.empty(),
Optional.empty(), true);
+ context.getServerPaths().getCompactor(rg -> true, addr -> true,
true);
ecActualCount = compactors.size();
log.info(ecActualCount + " of " + ecExpectedCount + " compactors present
in ZooKeeper");
Thread.sleep(500);
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index 305b277bc2..e5ed27b6c7 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -30,6 +30,7 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
@@ -40,6 +41,7 @@ import
org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockPaths;
+import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.TServerInstance;
@@ -230,7 +232,7 @@ public class LiveTServerSet implements Watcher {
final Set<TServerInstance> updates = new HashSet<>();
final Set<TServerInstance> doomed = new HashSet<>();
final Set<ServiceLockPath> tservers =
- context.getServerPaths().getTabletServer(Optional.empty(),
Optional.empty(), false);
+ context.getServerPaths().getTabletServer(rg -> true, addr -> true,
false);
locklessServers.keySet().retainAll(tservers);
@@ -459,8 +461,16 @@ public class LiveTServerSet implements Watcher {
}
current.remove(address.orElseThrow().toString());
+ ResourceGroupPredicate rgPredicate = resourceGroup.map(rg -> {
+ ResourceGroupPredicate rgp = rg2 -> rg.equals(rg2);
+ return rgp;
+ }).orElse(rg -> true);
+ Predicate<HostAndPort> addrPredicate = address.map(addr -> {
+ Predicate<HostAndPort> ap = addr2 -> addr.equals(addr2);
+ return ap;
+ }).orElse(addr -> true);
Set<ServiceLockPath> paths =
- context.getServerPaths().getTabletServer(resourceGroup, address,
false);
+ context.getServerPaths().getTabletServer(rgPredicate, addrPredicate,
false);
if (paths.isEmpty() || paths.size() > 1) {
log.error("Zero or many zookeeper entries match input arguments.");
} else {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java
index bc26293954..c204c04d05 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java
@@ -22,7 +22,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
import org.apache.accumulo.core.Constants;
@@ -69,7 +68,7 @@ public class DeadServerList {
List<DeadServer> result = new ArrayList<>();
try {
Set<ServiceLockPath> deadServers =
- ctx.getServerPaths().getDeadTabletServer(Optional.empty(),
Optional.empty(), false);
+ ctx.getServerPaths().getDeadTabletServer(rg -> true, addr -> true,
false);
for (ServiceLockPath path : deadServers) {
Stat stat = new Stat();
byte[] data;
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java
b/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java
index 24b75b89bc..9b3229d4d6 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java
@@ -18,7 +18,6 @@
*/
package org.apache.accumulo.server.util;
-import java.util.Optional;
import java.util.Set;
import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -33,7 +32,7 @@ public class AccumuloStatus {
*/
public static boolean isAccumuloOffline(ClientContext context) {
Set<ServiceLockPath> tservers =
- context.getServerPaths().getTabletServer(Optional.empty(),
Optional.empty(), true);
+ context.getServerPaths().getTabletServer(rg -> true, addr -> true,
true);
if (!tservers.isEmpty()) {
return false;
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index 5e26567ac4..f1a6c492d4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -678,8 +678,9 @@ public class Admin implements KeywordExecutable {
*/
static String qualifyWithZooKeeperSessionId(ClientContext context, ZooCache
zooCache,
String hostAndPort) {
- Set<ServiceLockPath> paths =
context.getServerPaths().getTabletServer(Optional.empty(),
- Optional.of(HostAndPort.fromString(hostAndPort)), true);
+ var hpObj = HostAndPort.fromString(hostAndPort);
+ Set<ServiceLockPath> paths =
+ context.getServerPaths().getTabletServer(rg -> true, addr ->
addr.equals(hpObj), true);
if (paths.size() != 1) {
return hostAndPort;
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java
b/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java
index c14dfe42f4..70cf41f254 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.server.util;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -112,7 +111,7 @@ public class ServiceStatusCmd {
final AtomicInteger errors = new AtomicInteger(0);
final Map<String,Set<String>> hostsByGroups = new TreeMap<>();
final Set<ServiceLockPath> compactors =
- context.getServerPaths().getTabletServer(Optional.empty(),
Optional.empty(), true);
+ context.getServerPaths().getTabletServer(rg -> true, addr -> true,
true);
compactors.forEach(c -> hostsByGroups
.computeIfAbsent(c.getResourceGroup(), (k) -> new
TreeSet<>()).add(c.getServer()));
return new StatusSummary(ServiceStatusReport.ReportKey.T_SERVER,
hostsByGroups.keySet(),
@@ -129,7 +128,7 @@ public class ServiceStatusCmd {
final AtomicInteger errors = new AtomicInteger(0);
final Map<String,Set<String>> hostsByGroups = new TreeMap<>();
final Set<ServiceLockPath> scanServers =
- context.getServerPaths().getScanServer(Optional.empty(),
Optional.empty(), true);
+ context.getServerPaths().getScanServer(rg -> true, addr -> true, true);
scanServers.forEach(c -> hostsByGroups
.computeIfAbsent(c.getResourceGroup(), (k) -> new
TreeSet<>()).add(c.getServer()));
return new StatusSummary(ServiceStatusReport.ReportKey.S_SERVER,
hostsByGroups.keySet(),
@@ -156,7 +155,7 @@ public class ServiceStatusCmd {
final AtomicInteger errors = new AtomicInteger(0);
final Map<String,Set<String>> hostsByGroups = new TreeMap<>();
final Set<ServiceLockPath> compactors =
- context.getServerPaths().getCompactor(Optional.empty(),
Optional.empty(), true);
+ context.getServerPaths().getCompactor(rg -> true, addr -> true, true);
compactors.forEach(c -> hostsByGroups
.computeIfAbsent(c.getResourceGroup(), (k) -> new
TreeSet<>()).add(c.getServer()));
return new StatusSummary(ServiceStatusReport.ReportKey.COMPACTOR,
hostsByGroups.keySet(),
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
b/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
index ab18fc13d0..32255b1e19 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
@@ -42,7 +42,7 @@ public class TabletServerLocks {
if (delete == null) {
Set<ServiceLockPath> tabletServers =
- context.getServerPaths().getTabletServer(Optional.empty(),
Optional.empty(), false);
+ context.getServerPaths().getTabletServer(rg -> true, addr -> true,
false);
if (tabletServers.isEmpty()) {
System.err.println("No tservers found in ZK");
}
@@ -62,8 +62,9 @@ public class TabletServerLocks {
if (lock == null) {
printUsage();
} else {
- Set<ServiceLockPath> paths =
context.getServerPaths().getTabletServer(Optional.empty(),
- Optional.of(HostAndPort.fromString(lock)), true);
+ var hostAndPort = HostAndPort.fromString(lock);
+ Set<ServiceLockPath> paths =
context.getServerPaths().getTabletServer(rg -> true,
+ addr -> addr.equals(hostAndPort), true);
Preconditions.checkArgument(paths.size() == 1,
lock + " does not match a single ZooKeeper TabletServer lock.
matches=" + paths);
ServiceLockPath path = paths.iterator().next();
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
index a24b873eb1..26d0e322dd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
@@ -20,7 +20,6 @@ package org.apache.accumulo.server.util;
import java.util.HashSet;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
import org.apache.accumulo.core.cli.Help;
@@ -111,7 +110,7 @@ public class ZooZap implements KeywordExecutable {
if (opts.zapTservers) {
try {
Set<ServiceLockPath> tserverLockPaths =
- context.getServerPaths().getTabletServer(Optional.empty(),
Optional.empty(), false);
+ context.getServerPaths().getTabletServer(rg -> true, addr ->
true, false);
for (ServiceLockPath tserverPath : tserverLockPaths) {
message("Deleting " + tserverPath + " from zookeeper", opts);
@@ -133,7 +132,7 @@ public class ZooZap implements KeywordExecutable {
if (opts.zapCompactors) {
Set<ServiceLockPath> compactorLockPaths =
- context.getServerPaths().getCompactor(Optional.empty(),
Optional.empty(), false);
+ context.getServerPaths().getCompactor(rg -> true, addr -> true,
false);
Set<String> compactorResourceGroupPaths = new HashSet<>();
compactorLockPaths.forEach(p -> compactorResourceGroupPaths
.add(p.toString().substring(0, p.toString().lastIndexOf('/'))));
@@ -151,7 +150,7 @@ public class ZooZap implements KeywordExecutable {
if (opts.zapScanServers) {
try {
Set<ServiceLockPath> sserverLockPaths =
- context.getServerPaths().getScanServer(Optional.empty(),
Optional.empty(), false);
+ context.getServerPaths().getScanServer(rg -> true, addr ->
true, false);
for (ServiceLockPath sserverPath : sserverLockPaths) {
message("Deleting " + sserverPath + " from zookeeper", opts);
if (!zoo.getChildren(sserverPath.toString()).isEmpty()) {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index ae75437225..7fd30d6003 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -644,8 +644,8 @@ public class Manager extends AbstractServer
while (stillManager()) {
try {
- Set<ServiceLockPath> scanServerPaths = getContext().getServerPaths()
- .getScanServer(Optional.empty(), Optional.empty(), false);
+ Set<ServiceLockPath> scanServerPaths =
+ getContext().getServerPaths().getScanServer(rg -> true, addr ->
true, false);
for (ServiceLockPath path : scanServerPaths) {
ZcStat stat = new ZcStat();
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
index 6022f53b62..f80b6364fd 100644
---
a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
@@ -28,7 +28,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Properties;
import org.apache.accumulo.core.client.Accumulo;
@@ -93,7 +92,7 @@ public class ScanServerConcurrentTabletScanIT extends
SharedMiniClusterBase {
1, null);
Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths()
- .getScanServer(Optional.empty(), Optional.empty(), true).isEmpty());
+ .getScanServer(rg -> true, addr -> true, true).isEmpty());
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
index 54a641c7ac..47365f22ef 100644
---
a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.Map;
-import java.util.Optional;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -126,7 +125,7 @@ public class ScanServerGroupConfigurationIT extends
SharedMiniClusterBase {
// Ensure no scan servers running
Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
- .getScanServer(Optional.empty(), Optional.empty(), true).isEmpty());
+ .getScanServer(rg -> true, addr -> true, true).isEmpty());
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
final String tableName = getUniqueNames(1)[0];
@@ -148,7 +147,7 @@ public class ScanServerGroupConfigurationIT extends
SharedMiniClusterBase {
// Start a ScanServer. No group specified, should be in the default
group.
getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
"localhost");
Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
- .getScanServer(Optional.empty(), Optional.empty(), true).size() ==
1, 30_000);
+ .getScanServer(rg -> true, addr -> true, true).size() == 1,
30_000);
Wait.waitFor(() -> ((ClientContext)
client).getScanServers().values().stream().anyMatch(
(p) ->
p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME))
== true);
@@ -166,7 +165,7 @@ public class ScanServerGroupConfigurationIT extends
SharedMiniClusterBase {
.addScanServerResourceGroup("GROUP1", 1);
getCluster().getClusterControl().start(ServerType.SCAN_SERVER);
Wait.waitFor(() -> getCluster().getServerContext().getServerPaths()
- .getScanServer(Optional.empty(), Optional.empty(), true).size() ==
2, 30_000);
+ .getScanServer(rg -> true, addr -> true, true).size() == 2,
30_000);
Wait.waitFor(() -> ((ClientContext)
client).getScanServers().values().stream().anyMatch(
(p) ->
p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME))
== true);
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
index 3b12abaa59..0bfa121a9e 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
@@ -31,7 +31,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
import java.util.Properties;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -116,7 +115,7 @@ public class ScanServerIT extends SharedMiniClusterBase {
"localhost");
Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths()
- .getScanServer(Optional.empty(), Optional.empty(), true).isEmpty());
+ .getScanServer(rg -> true, addr -> true, true).isEmpty());
}
@AfterAll
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
index a6ea8d5648..176fc7dd66 100644
---
a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
@@ -30,7 +30,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -92,7 +91,7 @@ public class ScanServerMetadataEntriesIT extends
SharedMiniClusterBase {
"localhost");
Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths()
- .getScanServer(Optional.empty(), Optional.empty(), true).isEmpty());
+ .getScanServer(rg -> true, addr -> true, true).isEmpty());
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java
index 4df1a4b8ad..4c2e37a18e 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java
@@ -28,7 +28,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
@@ -90,7 +89,7 @@ public class ScanServerMultipleScansIT extends
SharedMiniClusterBase {
"localhost");
Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths()
- .getScanServer(Optional.empty(), Optional.empty(), true).isEmpty());
+ .getScanServer(rg -> true, addr -> true, true).isEmpty());
}
@AfterAll
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
index 7f53579b6d..99694f633a 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
@@ -25,7 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map.Entry;
-import java.util.Optional;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -86,8 +85,8 @@ public class ScanServerShutdownIT extends
SharedMiniClusterBase {
ServerContext ctx = getCluster().getServerContext();
- Wait.waitFor(() -> !ctx.getServerPaths().getScanServer(Optional.empty(),
Optional.empty(), true)
- .isEmpty());
+ Wait.waitFor(
+ () -> !ctx.getServerPaths().getScanServer(rg -> true, addr -> true,
true).isEmpty());
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
final String tableName = getUniqueNames(1)[0];
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index ffef68465d..6f219be23f 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@ -200,7 +200,7 @@ public class MemoryStarvedScanIT extends
SharedMiniClusterBase {
final ZooCache zc = context.getZooCache();
Set<ServiceLockPath> servers =
- context.getServerPaths().getTabletServer(Optional.empty(),
Optional.empty(), true);
+ context.getServerPaths().getTabletServer(rg -> true, addr -> true,
true);
for (ServiceLockPath server : servers) {
Optional<ServiceLockData> data = zc.getLockData(server);
if (data != null && data.isPresent()) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
index 0364bcdcc6..02f2ede2b3 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
@@ -36,7 +36,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -576,8 +575,8 @@ public class TabletManagementIteratorIT extends
AccumuloClusterHarness {
tableId -> context.getTableState(tableId) == TableState.ONLINE);
HashSet<TServerInstance> tservers = new HashSet<>();
- for (ServiceLockPath tserver :
context.getServerPaths().getTabletServer(Optional.empty(),
- Optional.empty(), true)) {
+ for (ServiceLockPath tserver : context.getServerPaths().getTabletServer(rg
-> true,
+ addr -> true, true)) {
try {
long sessionId = ServiceLock.getSessionId(context.getZooCache(),
tserver);
tservers.add(new TServerInstance(tserver.getServer(), sessionId));
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java
index 36bceaa383..ce8d3fee0f 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java
@@ -27,7 +27,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
@@ -98,7 +97,7 @@ public class TabletResourceGroupBalanceIT extends
SharedMiniClusterBase {
Map<String,String> tservers = new HashMap<>();
for (ServiceLockPath tserver : cluster.getServerContext().getServerPaths()
- .getTabletServer(Optional.empty(), Optional.empty(), true)) {
+ .getTabletServer(rg -> true, addr -> true, true)) {
tservers.put(tserver.getServer(), tserver.getResourceGroup());
}
return tservers;
diff --git
a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java
b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java
index a39b85466a..dd2e2d079f 100644
--- a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java
@@ -22,8 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import java.util.Optional;
-
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.lock.ServiceLockPaths;
@@ -53,40 +51,40 @@ public class ServiceLockPathsIT extends
AccumuloClusterHarness {
assertNotNull(paths.getGarbageCollector(true));
assertNotNull(paths.getManager(true));
assertNull(paths.getMonitor(true)); // monitor not started
- assertEquals(2, paths.getTabletServer(Optional.empty(), Optional.empty(),
true).size());
+ assertEquals(2, paths.getTabletServer(rg -> true, addr -> true,
true).size());
assertEquals(1, paths
- .getTabletServer(Optional.of(Constants.DEFAULT_RESOURCE_GROUP_NAME),
Optional.empty(), true)
+ .getTabletServer(rg ->
rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), addr -> true, true)
.size());
- assertEquals(1, paths.getTabletServer(Optional.of("TTEST"),
Optional.empty(), true).size());
- assertEquals(0, paths.getTabletServer(Optional.of("FAKE"),
Optional.empty(), true).size());
- assertEquals(0, paths.getTabletServer(Optional.of("CTEST"),
Optional.empty(), true).size());
- assertEquals(0, paths.getTabletServer(Optional.of("STEST"),
Optional.empty(), true).size());
+ assertEquals(1, paths.getTabletServer(rg -> rg.equals("TTEST"), addr ->
true, true).size());
+ assertEquals(0, paths.getTabletServer(rg -> rg.equals("FAKE"), addr ->
true, true).size());
+ assertEquals(0, paths.getTabletServer(rg -> rg.equals("CTEST"), addr ->
true, true).size());
+ assertEquals(0, paths.getTabletServer(rg -> rg.equals("STEST"), addr ->
true, true).size());
- assertEquals(4, paths.getCompactor(Optional.empty(), Optional.empty(),
true).size());
+ assertEquals(4, paths.getCompactor(rg -> true, addr -> true, true).size());
assertEquals(1, paths
- .getCompactor(Optional.of(Constants.DEFAULT_RESOURCE_GROUP_NAME),
Optional.empty(), true)
+ .getCompactor(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME),
addr -> true, true)
.size());
- assertEquals(3, paths.getCompactor(Optional.of("CTEST"), Optional.empty(),
true).size());
- assertEquals(0, paths.getCompactor(Optional.of("FAKE"), Optional.empty(),
true).size());
- assertEquals(0, paths.getCompactor(Optional.of("TTEST"), Optional.empty(),
true).size());
- assertEquals(0, paths.getCompactor(Optional.of("STEST"), Optional.empty(),
true).size());
+ assertEquals(3, paths.getCompactor(rg -> rg.equals("CTEST"), addr -> true,
true).size());
+ assertEquals(0, paths.getCompactor(rg -> rg.equals("FAKE"), addr -> true,
true).size());
+ assertEquals(0, paths.getCompactor(rg -> rg.equals("TTEST"), addr -> true,
true).size());
+ assertEquals(0, paths.getCompactor(rg -> rg.equals("STEST"), addr -> true,
true).size());
- assertEquals(3, paths.getScanServer(Optional.empty(), Optional.empty(),
true).size());
+ assertEquals(3, paths.getScanServer(rg -> true, addr -> true,
true).size());
assertEquals(1, paths
- .getScanServer(Optional.of(Constants.DEFAULT_RESOURCE_GROUP_NAME),
Optional.empty(), true)
+ .getScanServer(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME),
addr -> true, true)
.size());
- assertEquals(2, paths.getScanServer(Optional.of("STEST"),
Optional.empty(), true).size());
- assertEquals(0, paths.getScanServer(Optional.of("FAKE"), Optional.empty(),
true).size());
- assertEquals(0, paths.getScanServer(Optional.of("CTEST"),
Optional.empty(), true).size());
- assertEquals(0, paths.getScanServer(Optional.of("TTEST"),
Optional.empty(), true).size());
+ assertEquals(2, paths.getScanServer(rg -> rg.equals("STEST"), addr ->
true, true).size());
+ assertEquals(0, paths.getScanServer(rg -> rg.equals("FAKE"), addr -> true,
true).size());
+ assertEquals(0, paths.getScanServer(rg -> rg.equals("CTEST"), addr ->
true, true).size());
+ assertEquals(0, paths.getScanServer(rg -> rg.equals("TTEST"), addr ->
true, true).size());
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
- Wait.waitFor(() -> paths.getCompactor(Optional.empty(), Optional.empty(),
true).size() == 0);
+ Wait.waitFor(() -> paths.getCompactor(rg -> true, addr -> true,
true).size() == 0);
getCluster().getClusterControl().stopAllServers(ServerType.SCAN_SERVER);
- Wait.waitFor(() -> paths.getScanServer(Optional.empty(), Optional.empty(),
true).size() == 0);
+ Wait.waitFor(() -> paths.getScanServer(rg -> true, addr -> true,
true).size() == 0);
getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR);
@@ -98,9 +96,8 @@ public class ServiceLockPathsIT extends
AccumuloClusterHarness {
getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
- Wait.waitFor(() -> paths.getTabletServer(Optional.empty(),
Optional.empty(), true).size() == 0);
- Wait.waitFor(
- () -> paths.getTabletServer(Optional.empty(), Optional.empty(),
false).size() == 2);
+ Wait.waitFor(() -> paths.getTabletServer(rg -> true, addr -> true,
true).size() == 0);
+ Wait.waitFor(() -> paths.getTabletServer(rg -> true, addr -> true,
false).size() == 2);
}