This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 0c4c31625b Enable user to specify specific server for Thrift client
calls (#4880)
0c4c31625b is described below
commit 0c4c31625b2eb6680eef4818b61b324a1914a794
Author: Dave Marion <[email protected]>
AuthorDate: Thu Oct 3 16:12:00 2024 -0400
Enable user to specify specific server for Thrift client calls (#4880)
Allow the user to set a system property to the address of a server
to use when making calls to the Client Thrift API. Example:
org.apache.accumulo.client.rpc.debug.host="localhost:1234"
Closes #4823
---
.../accumulo/core/rpc/clients/TServerClient.java | 77 ++++++++++++++++------
.../test/functional/DebugClientConnectionIT.java | 71 ++++++++++++++++++++
2 files changed, 127 insertions(+), 21 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
index 4027f4b0c9..c09f46ab00 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
@@ -23,6 +23,8 @@ import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -54,6 +56,8 @@ import com.google.common.net.HostAndPort;
public interface TServerClient<C extends TServiceClient> {
+ static final String DEBUG_HOST = "org.apache.accumulo.client.rpc.debug.host";
+
Pair<String,C> getThriftServerConnection(ClientContext context, boolean
preferCachedConnections)
throws TTransportException;
@@ -62,7 +66,9 @@ public interface TServerClient<C extends TServiceClient> {
ThriftService service) throws TTransportException {
checkArgument(context != null, "context is null");
- if (preferCachedConnections) {
+ final String debugHost = System.getProperty(DEBUG_HOST, null);
+
+ if (preferCachedConnections && debugHost == null) {
Pair<String,TTransport> cachedTransport =
context.getTransportPool().getAnyCachedTransport(type);
if (cachedTransport != null) {
@@ -79,28 +85,40 @@ public interface TServerClient<C extends TServiceClient> {
final ZooCache zc = context.getZooCache();
final List<String> serverPaths = new ArrayList<>();
- zc.getChildren(tserverZooPath).forEach(tserverAddress -> {
- serverPaths.add(tserverZooPath + "/" + tserverAddress);
- });
- if (type == ThriftClientTypes.CLIENT) {
- zc.getChildren(sserverZooPath).forEach(sserverAddress -> {
- serverPaths.add(sserverZooPath + "/" + sserverAddress);
- });
+ if (type == ThriftClientTypes.CLIENT && debugHost != null) {
+ // add all three paths to the set even though they may not be correct.
+ // The entire set will be checked in the code below to validate
+ // that the path is correct and the lock is held and will return the
+ // correct one.
+ serverPaths.add(tserverZooPath + "/" + debugHost);
+ serverPaths.add(sserverZooPath + "/" + debugHost);
zc.getChildren(compactorZooPath).forEach(compactorGroup -> {
- zc.getChildren(compactorZooPath + "/" +
compactorGroup).forEach(compactorAddress -> {
- serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" +
compactorAddress);
- });
+ serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" +
debugHost);
});
- }
-
- if (serverPaths.isEmpty()) {
- if (warned.compareAndSet(false, true)) {
- LOG.warn(
- "There are no servers serving the {} api: check that zookeeper and
accumulo are running.",
- type);
+ } else {
+ zc.getChildren(tserverZooPath).forEach(tserverAddress -> {
+ serverPaths.add(tserverZooPath + "/" + tserverAddress);
+ });
+ if (type == ThriftClientTypes.CLIENT) {
+ zc.getChildren(sserverZooPath).forEach(sserverAddress -> {
+ serverPaths.add(sserverZooPath + "/" + sserverAddress);
+ });
+ zc.getChildren(compactorZooPath).forEach(compactorGroup -> {
+ zc.getChildren(compactorZooPath + "/" +
compactorGroup).forEach(compactorAddress -> {
+ serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" +
compactorAddress);
+ });
+ });
+ }
+ if (serverPaths.isEmpty()) {
+ if (warned.compareAndSet(false, true)) {
+ LOG.warn(
+ "There are no servers serving the {} api: check that zookeeper
and accumulo are running.",
+ type);
+ }
+ throw new TTransportException("There are no servers for type: " +
type);
}
- throw new TTransportException("There are no servers for type: " + type);
}
+
Collections.shuffle(serverPaths, RANDOM.get());
for (String serverPath : serverPaths) {
@@ -113,10 +131,19 @@ public interface TServerClient<C extends TServiceClient> {
TTransport transport =
context.getTransportPool().getTransport(type,
tserverClientAddress, rpcTimeout, context,
preferCachedConnections);
C client = ThriftUtil.createClient(type, transport);
+ if (type == ThriftClientTypes.CLIENT && debugHost != null) {
+ LOG.info("Connecting to debug host: {}", debugHost);
+ }
warned.set(false);
return new Pair<String,C>(tserverClientAddress.toString(), client);
} catch (TTransportException e) {
- LOG.trace("Error creating transport to {}", tserverClientAddress);
+ if (type == ThriftClientTypes.CLIENT && debugHost != null) {
+ LOG.error(
+ "Error creating transport to debug host: {}. If this server
is down, then you will need to remove or change the system property {}.",
+ debugHost, DEBUG_HOST);
+ } else {
+ LOG.trace("Error creating transport to {}",
tserverClientAddress);
+ }
continue;
}
}
@@ -127,7 +154,15 @@ public interface TServerClient<C extends TServiceClient> {
LOG.warn("Failed to find an available server in the list of servers: {}
for API type: {}",
serverPaths, type);
}
- throw new TTransportException("Failed to connect to any server for API
type " + type);
+ // Need to throw a different exception, when a TTransportException is
+ // thrown below, then the operation will be retried endlessly.
+ if (type == ThriftClientTypes.CLIENT && debugHost != null) {
+ throw new UncheckedIOException("Error creating transport to debug host:
" + debugHost
+ + ". If this server is down, then you will need to remove or change
the system property "
+ + DEBUG_HOST + ".", new IOException(""));
+ } else {
+ throw new TTransportException("Failed to connect to any server for API
type " + type);
+ }
}
default <R> R execute(Logger LOG, ClientContext context, Exec<R,C> exec)
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java
new file mode 100644
index 0000000000..9769f4c10e
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.UncheckedIOException;
+import java.util.List;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.rpc.clients.TServerClient;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class DebugClientConnectionIT extends AccumuloClusterHarness {
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
+ cfg.setNumTservers(2);
+ }
+
+ private List<String> tservers = null;
+
+ @BeforeEach
+ public void getTServerAddresses() {
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ tservers = client.instanceOperations().getTabletServers();
+ }
+ assertNotNull(tservers);
+ assertEquals(2, tservers.size());
+ }
+
+ @Test
+ public void testPreferredConnection() throws Exception {
+ System.setProperty(TServerClient.DEBUG_HOST, tservers.get(0));
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ assertNotNull(client.instanceOperations().getSiteConfiguration());
+ }
+ System.setProperty(TServerClient.DEBUG_HOST, tservers.get(1));
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ assertNotNull(client.instanceOperations().getSiteConfiguration());
+ }
+ System.setProperty(TServerClient.DEBUG_HOST, "localhost:1");
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ assertThrows(UncheckedIOException.class,
+ () -> client.instanceOperations().getSiteConfiguration());
+ }
+ }
+}