This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new f06db5b626 refactors filtering getServers API call (#4960)
f06db5b626 is described below
commit f06db5b6265025a41584a426a4ede7a7c1d80a29
Author: Keith Turner <[email protected]>
AuthorDate: Tue Oct 8 19:40:32 2024 -0400
refactors filtering getServers API call (#4960)
Changes filtering in the getServers API call so that it can prune
branches while walking the tree of servers in zookeeper.
---
.../core/client/admin/InstanceOperations.java | 8 ++-
.../core/clientImpl/InstanceOperationsImpl.java | 57 +++++++++++++++-------
.../accumulo/core/lock/ServiceLockPaths.java | 7 +--
3 files changed, 50 insertions(+), 22 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
index 935fcb0283..110812390a 100644
---
a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
+++
b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.core.client.admin;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -241,10 +242,15 @@ public interface InstanceOperations {
/**
* Returns the servers of a given type that match the given criteria
*
+ * @param resourceGroupPredicate only returns servers where the resource
group matches this
+ * predicate. The manager does not have a resource group, so this
parameter is not
+ * used for the manager.
+ * @param hostPortPredicate only returns servers whose host and port
match this predicate.
* @return set of servers of the supplied type matching the supplied test
* @since 4.0.0
*/
- Set<ServerId> getServers(ServerId.Type type, Predicate<ServerId> test);
+ Set<ServerId> getServers(ServerId.Type type, Predicate<String>
resourceGroupPredicate,
+ BiPredicate<String,Integer> hostPortPredicate);
/**
* List the active scans on a tablet server.
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index fa9dd6ba44..b5646ce294 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@ -38,9 +38,9 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Predicate;
-import java.util.stream.Collectors;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
@@ -489,7 +489,7 @@ public class InstanceOperationsImpl implements
InstanceOperations {
throw new IllegalStateException("Multiple servers matching provided
address");
}
case MANAGER:
- Set<ServerId> managers = getServers(type, null);
+ Set<ServerId> managers = getServers(type, rg2 -> true, hp);
if (managers.isEmpty()) {
return null;
} else {
@@ -520,43 +520,64 @@ public class InstanceOperationsImpl implements
InstanceOperations {
@Override
public Set<ServerId> getServers(ServerId.Type type) {
- return getServers(type, null);
+ AddressPredicate addressPredicate = addr -> true;
+ return getServers(type, rg -> true, addressPredicate);
}
@Override
- public Set<ServerId> getServers(ServerId.Type type, Predicate<ServerId>
test) {
+ public Set<ServerId> getServers(ServerId.Type type, Predicate<String>
resourceGroupPredicate,
+ BiPredicate<String,Integer> hostPortPredicate) {
+ Objects.requireNonNull(type, "Server type was null");
+ Objects.requireNonNull(resourceGroupPredicate, "Resource group predicate
was null");
+ Objects.requireNonNull(hostPortPredicate, "Host port predicate was null");
+
+ AddressPredicate addressPredicate = addr -> {
+ var hp = HostAndPort.fromString(addr);
+ return hostPortPredicate.test(hp.getHost(), hp.getPort());
+ };
+
+ return getServers(type, resourceGroupPredicate, addressPredicate);
+ }
+
+ private Set<ServerId> getServers(ServerId.Type type, Predicate<String>
resourceGroupPredicate,
+ AddressPredicate addressPredicate) {
+
final Set<ServerId> results = new HashSet<>();
+
switch (type) {
case COMPACTOR:
- context.getServerPaths().getCompactor(rg -> true, addr -> true, true)
+ context.getServerPaths().getCompactor(resourceGroupPredicate::test,
addressPredicate, true)
.forEach(c -> results.add(createServerId(type, c)));
break;
case MANAGER:
ServiceLockPath m = context.getServerPaths().getManager(true);
- Optional<ServiceLockData> sld = context.getZooCache().getLockData(m);
- String location = null;
- if (sld.isPresent()) {
- location = sld.orElseThrow().getAddressString(ThriftService.MANAGER);
- HostAndPort hp = HostAndPort.fromString(location);
- results.add(new ServerId(type,
Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(),
- hp.getPort()));
+ if (m != null) {
+ Optional<ServiceLockData> sld = context.getZooCache().getLockData(m);
+ String location = null;
+ if (sld.isPresent()) {
+ location =
sld.orElseThrow().getAddressString(ThriftService.MANAGER);
+ if (addressPredicate.test(location)) {
+ HostAndPort hp = HostAndPort.fromString(location);
+ results.add(new ServerId(type,
Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(),
+ hp.getPort()));
+ }
+ }
}
break;
case SCAN_SERVER:
- context.getServerPaths().getScanServer(rg -> true, addr -> true, true)
+ context.getServerPaths().getScanServer(resourceGroupPredicate::test,
addressPredicate, true)
.forEach(s -> results.add(createServerId(type, s)));
break;
case TABLET_SERVER:
- context.getServerPaths().getTabletServer(rg -> true, addr -> true,
true)
+ context.getServerPaths()
+ .getTabletServer(resourceGroupPredicate::test, addressPredicate,
true)
.forEach(t -> results.add(createServerId(type, t)));
break;
default:
break;
}
- if (test == null) {
- return Collections.unmodifiableSet(results);
- }
- return
results.stream().filter(test).collect(Collectors.toUnmodifiableSet());
+
+ return Collections.unmodifiableSet(results);
}
private ServerId createServerId(ServerId.Type type, ServiceLockPath slp) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
index 13752a9ae1..38cad55da5 100644
--- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
+++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.function.Predicate;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
@@ -396,14 +397,14 @@ public class ServiceLockPaths {
if (resourceGroupPredicate.test(group)) {
final List<String> servers = cache.getChildren(typePath + "/" +
group);
for (final String server : servers) {
- final ZcStat stat = new ZcStat();
- final ServiceLockPath slp =
- parse(Optional.of(serverType), typePath + "/" + group + "/" +
server);
if (addressPredicate.test(server)) {
+ final ServiceLockPath slp =
+ parse(Optional.of(serverType), typePath + "/" + group + "/"
+ server);
if (!withLock || slp.getType().equals(Constants.ZDEADTSERVERS)) {
// Dead TServers don't have lock data
results.add(slp);
} else {
+ final ZcStat stat = new ZcStat();
Optional<ServiceLockData> sld = ServiceLock.getLockData(cache,
slp, stat);
if (!sld.isEmpty()) {
results.add(slp);