This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 14bc7bee48 Expand types of servers that respond to ping command (#4958)
14bc7bee48 is described below
commit 14bc7bee48515d335042cea1cdfb95b1840e161b
Author: Dave Marion <[email protected]>
AuthorDate: Tue Oct 15 10:24:54 2024 -0400
Expand types of servers that respond to ping command (#4958)
Closes #4952
---
.../core/client/admin/InstanceOperations.java | 15 +++++++--
.../core/clientImpl/InstanceOperationsImpl.java | 14 +++++---
.../server/client/ClientServiceHandler.java | 2 +-
.../accumulo/shell/commands/PingCommand.java | 27 ++++++++++++----
.../accumulo/shell/commands/PingIterator.java | 12 +++----
.../apache/accumulo/test/InstanceOperationsIT.java | 36 +++++++++++++++++++++
.../apache/accumulo/test/shell/ShellServerIT.java | 37 ++++++++++++++++++----
7 files changed, 115 insertions(+), 28 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
index 110812390a..783b353430 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
@@ -314,14 +314,23 @@ public interface InstanceOperations {
List<ActiveCompaction> getActiveCompactions() throws AccumuloException,
AccumuloSecurityException;
/**
- * Throws an exception if a tablet server can not be contacted.
+ * Check to see if a server process at the host and port is up and responding to RPC requests.
*
- * @param tserver The tablet server address. This should be of the form
- * {@code <ip address>:<port>}
+ * @param tserver The server address. This should be of the form {@code <ip address>:<port>}
+ * @throws AccumuloException if the server cannot be contacted
* @since 1.5.0
*/
void ping(String tserver) throws AccumuloException;
+ /**
+ * Check to see if a server process at the host and port is up and responding to RPC requests.
+ *
+ * @param server ServerId object for the server to be pinged, only the host and port is used.
+ * @throws AccumuloException if the server cannot be contacted
+ * @since 4.0.0
+ */
+ void ping(ServerId server) throws AccumuloException;
+
/**
* Test to see if the instance can load the given class as the given type.
This check does not
* consider per table classpaths, see
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index b5646ce294..1ba125ab0f 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@ -50,6 +50,7 @@ import org.apache.accumulo.core.client.admin.ActiveCompaction;
import org.apache.accumulo.core.client.admin.ActiveScan;
import org.apache.accumulo.core.client.admin.InstanceOperations;
import org.apache.accumulo.core.client.admin.servers.ServerId;
+import org.apache.accumulo.core.clientImpl.thrift.ClientService;
import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType;
import org.apache.accumulo.core.clientImpl.thrift.TVersionedProperties;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
@@ -427,15 +428,20 @@ public class InstanceOperationsImpl implements
InstanceOperations {
}
@Override
- public void ping(String tserver) throws AccumuloException {
- try (TTransport transport =
createTransport(AddressUtil.parseAddress(tserver), context)) {
- Client client = createClient(ThriftClientTypes.TABLET_SERVER, transport);
- client.getTabletServerStatus(TraceUtil.traceInfo(), context.rpcCreds());
+ public void ping(String server) throws AccumuloException {
+ try (TTransport transport =
createTransport(AddressUtil.parseAddress(server), context)) {
+ ClientService.Client client = createClient(ThriftClientTypes.CLIENT,
transport);
+ client.ping(context.rpcCreds());
} catch (TException e) {
throw new AccumuloException(e);
}
}
+ @Override
+ public void ping(ServerId server) throws AccumuloException {
+ ping(server.toHostPortString());
+ }
+
@Override
public void waitForBalance() throws AccumuloException {
try {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
index 61040fc54a..05b62ebe75 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
@@ -110,7 +110,7 @@ public class ClientServiceHandler implements
ClientService.Iface {
@Override
public void ping(TCredentials credentials) {
// anybody can call this; no authentication check
- log.info("Manager reports: I just got pinged!");
+ log.info("I just got pinged!");
}
@Override
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java
index 006a7e3b12..fb73bd34d7 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java
@@ -31,31 +31,37 @@ import org.apache.commons.cli.Options;
public class PingCommand extends Command {
- private Option tserverOption, disablePaginationOpt;
+ private Option serverOption, tserverOption, disablePaginationOpt;
@Override
public String description() {
- return "ping tablet servers";
+ return "ping compactors, scan servers, or tablet servers";
}
@Override
public int execute(final String fullCommand, final CommandLine cl, final
Shell shellState)
throws Exception {
- final List<String> tservers = new ArrayList<>();
+ final List<String> servers = new ArrayList<>();
final InstanceOperations instanceOps =
shellState.getAccumuloClient().instanceOperations();
final boolean paginate = !cl.hasOption(disablePaginationOpt.getOpt());
if (cl.hasOption(tserverOption.getOpt())) {
- tservers.add(cl.getOptionValue(tserverOption.getOpt()));
+ servers.add(cl.getOptionValue(tserverOption.getOpt()));
+ } else if (cl.hasOption(serverOption.getOpt())) {
+ servers.add(cl.getOptionValue(serverOption.getOpt()));
} else {
+ instanceOps.getServers(ServerId.Type.COMPACTOR)
+ .forEach(s -> servers.add(s.toHostPortString()));
+ instanceOps.getServers(ServerId.Type.SCAN_SERVER)
+ .forEach(s -> servers.add(s.toHostPortString()));
instanceOps.getServers(ServerId.Type.TABLET_SERVER)
- .forEach(s -> tservers.add(s.toHostPortString()));
+ .forEach(s -> servers.add(s.toHostPortString()));
}
- shellState.printLines(new PingIterator(tservers, instanceOps), paginate);
+ shellState.printLines(new PingIterator(servers, instanceOps), paginate);
return 0;
}
@@ -69,7 +75,14 @@ public class PingCommand extends Command {
public Options getOptions() {
final Options opts = new Options();
- tserverOption = new Option("ts", "tabletServer", true, "tablet server to
ping");
+ serverOption =
+ new Option("s", "server", true, "compactor, scan server, or tablet
server address to ping");
+ serverOption.setArgName("server address");
+ opts.addOption(serverOption);
+
+ // Leaving here for backwards compatibility
+ tserverOption = new Option("ts", "tabletServer", true,
+ "compactor, scan server, or tablet server address to ping");
tserverOption.setArgName("tablet server");
opts.addOption(tserverOption);
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/PingIterator.java b/shell/src/main/java/org/apache/accumulo/shell/commands/PingIterator.java
index bcd3a8a811..c3d81e7f99 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/PingIterator.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/PingIterator.java
@@ -29,8 +29,8 @@ class PingIterator implements Iterator<String> {
private Iterator<String> iter;
private InstanceOperations instanceOps;
- PingIterator(List<String> tservers, InstanceOperations instanceOps) {
- iter = tservers.iterator();
+ PingIterator(List<String> servers, InstanceOperations instanceOps) {
+ iter = servers.iterator();
this.instanceOps = instanceOps;
}
@@ -41,15 +41,15 @@ class PingIterator implements Iterator<String> {
@Override
public String next() {
- String tserver = iter.next();
+ String server = iter.next();
try {
- instanceOps.ping(tserver);
+ instanceOps.ping(server);
} catch (AccumuloException e) {
- return tserver + " ERROR " + e.getMessage();
+ return server + " ERROR " + e.getMessage();
}
- return tserver + " OK";
+ return server + " OK";
}
@Override
diff --git a/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java b/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java
index 5372bad803..3fd18e353d 100644
--- a/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java
@@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
@@ -37,6 +38,7 @@ import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
@@ -90,6 +92,40 @@ public class InstanceOperationsIT extends
AccumuloClusterHarness {
}
}
+ @Test
+ public void testPing() throws Exception {
+
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.COMPACTOR).size() == 3);
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.SCAN_SERVER).size() == 2);
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() ==
1);
+
+ final InstanceOperations io = client.instanceOperations();
+ Set<ServerId> servers = io.getServers(ServerId.Type.COMPACTOR);
+ for (ServerId sid : servers) {
+ io.ping(sid);
+ }
+
+ servers = io.getServers(ServerId.Type.SCAN_SERVER);
+ for (ServerId sid : servers) {
+ io.ping(sid);
+ }
+
+ servers = io.getServers(ServerId.Type.TABLET_SERVER);
+ for (ServerId sid : servers) {
+ io.ping(sid);
+ }
+
+ ServerId fake = new ServerId(ServerId.Type.COMPACTOR,
Constants.DEFAULT_RESOURCE_GROUP_NAME,
+ "localhost", 1024);
+ assertThrows(AccumuloException.class, () -> io.ping(fake));
+ }
+
+ }
+
private void validateAddresses(Collection<String> e, Set<ServerId>
addresses) {
List<String> actual = new ArrayList<>(addresses.size());
addresses.forEach(a -> actual.add(a.toHostPortString()));
diff --git a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
index 8ced6d6128..3bd99c5015 100644
--- a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
@@ -48,6 +48,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.SortedSet;
import java.util.regex.Pattern;
@@ -1571,16 +1572,38 @@ public class ShellServerIT extends
SharedMiniClusterBase {
@Test
public void ping() throws Exception {
- for (int i = 0; i < 10; i++) {
- ts.exec("ping", true, "OK", true);
- // wait for both tservers to start up
- if (ts.output.get().split("\n").length == 3) {
- break;
+
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.COMPACTOR).size() == 1);
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.SCAN_SERVER).size() == 1);
+ Wait.waitFor(
+ () ->
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() ==
1);
+
+ Set<ServerId> servers =
client.instanceOperations().getServers(ServerId.Type.COMPACTOR);
+ for (ServerId sid : servers) {
+ ts.exec("ping -s " + sid.toHostPortString(), true, "OK", true);
+ ts.exec("ping -ts " + sid.toHostPortString(), true, "OK", true);
+ }
+
+ servers =
client.instanceOperations().getServers(ServerId.Type.SCAN_SERVER);
+ for (ServerId sid : servers) {
+ ts.exec("ping -s " + sid.toHostPortString(), true, "OK", true);
+ ts.exec("ping -ts " + sid.toHostPortString(), true, "OK", true);
+ }
+
+ servers =
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER);
+ for (ServerId sid : servers) {
+ ts.exec("ping -s " + sid.toHostPortString(), true, "OK", true);
+ ts.exec("ping -ts " + sid.toHostPortString(), true, "OK", true);
}
- Thread.sleep(SECONDS.toMillis(1));
+ ts.exec("ping", true, "OK", true);
+ // Should be 3, but there is an extra line with a single apostrophe trailing
+ assertEquals(4, ts.output.get().split("\n").length);
}
- assertEquals(2, ts.output.get().split("\n").length);
+
}
@Test