This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ac1c11dd8 IMPALA-14460: Keep http connections open in impala-shell
ac1c11dd8 is described below
commit ac1c11dd8256e8a81e138f43663de06610441d41
Author: Michael Smith <[email protected]>
AuthorDate: Tue Jan 6 10:13:32 2026 -0800
IMPALA-14460: Keep http connections open in impala-shell
Leave HS2-HTTP connections open and retry on 401 or EPIPE failures to
re-use connections, greatly reducing the number of client connections
needed with the HS2-HTTP protocol. Adds a 'use_new_http_connection'
impala-shell option to restore the old behavior of using a new
connection for each rpc.
The existing test_shell_interactive_reconnect test verifies that
ImpalaShell - the library implementing the impala-shell CLI - will
automatically establish a new connection with all protocols. Prior to
this patch, after restarting impalad you'd see
2026-01-06 11:13:08 [Warning] close session RPC failed:
<class 'impala_shell.shell_exceptions.RPCException'>
ERROR: Invalid session id: be40a2618203ff7b:beacd4b5d28f7692
Connection lost, reconnecting...
Warning: --connect_timeout_ms is currently ignored with HTTP transport.
Opened TCP connection to localhost:28001
If you instead introduce a load balancer like haproxy and restart the
lb, there's no apparent break because impala-shell would always
establish a new connection.
With this patch, when impalad is restarted we still see the lost session
2026-01-06 11:20:43 [Exception] type=<class 'BrokenPipeError'> in
PingImpalaHS2Service. Num remaining tries: 3 [Errno 32] Broken pipe
Connection closed, reconnecting...
2026-01-06 11:20:43 [Warning] close session RPC failed:
<class 'impala_shell.shell_exceptions.RPCException'>
ERROR: Invalid session id: 6e494c76a9a58278:dbb7016cb5999385
Connection lost, reconnecting...
Warning: --connect_timeout_ms is currently ignored with HTTP transport.
Opened TCP connection to localhost:28000
If the lb is restarted, we now see that the connection is reopened
2026-01-06 11:24:02 [Exception] type=<class 'BrokenPipeError'> in
PingImpalaHS2Service. Num remaining tries: 3 [Errno 32] Broken pipe
Connection closed, reconnecting...
Query: ...
Triggering a retry due to 401 Unauthorized requires Kerberos, since
Basic and Bearer auth always send the Authorization header; it shows
2026-01-06 17:02:27 [Exception] type=
<class 'http.client.RemoteDisconnected'> in ExecuteStatement.
Remote end closed connection without response
2026-01-06 17:02:27 [Exception] type=
<class 'http.client.RemoteDisconnected'> when listing query options.
Num remaining tries: 3 Remote end closed connection without response
2026-01-06 17:02:27 [Exception] type=<class 'ConnectionRefusedError'>
in ExecuteStatement. [Errno 111] Connection refused
2026-01-06 17:02:27 [Exception] type=<class 'ConnectionRefusedError'>
when listing query options. Num remaining tries: 2 [Errno 111]
Connection refused
Connection closed, reconnecting...
Cookies expired, restarting authentication...
Preserving cookies: impala.auth
Connected to localhost:28005
Updates tests that counted RPCs via the number of connections, as re-use
means the two are no longer linked. Tests now rely on the connection
count, which verifies we're re-using connections.
Adds testReconnect to use a proxy where we can interrupt the
existing connection, which will sometimes trigger "Connection closed,
reconnecting..." I didn't find a way to trigger it consistently in this
test environment.
Adds tests using Kerberos authentication to trigger cookie retry and
"Cookies expired, restarting authentication..."
Generated-by: Github Copilot (GPT-4.1)
Change-Id: Iafb3fc39817e93c691cd993902c6d939a7235a03
Reviewed-on: http://gerrit.cloudera.org:8080/23831
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Michael Smith <[email protected]>
---
.../impala/customcluster/LdapImpalaShellTest.java | 93 +++++++++++++---
.../customcluster/LdapKerberosImpalaShellTest.java | 46 ++++++++
.../LdapSearchBindImpalaShellTest.java | 12 ++-
.../LdapSimpleBindImpalaShellTest.java | 11 +-
.../impala/testutil/InterruptibleProxyServer.java | 119 +++++++++++++++++++++
shell/impala_shell/ImpalaHttpClient.py | 62 +++++++++--
shell/impala_shell/impala_client.py | 9 +-
shell/impala_shell/impala_shell.py | 7 +-
shell/impala_shell/option_parser.py | 5 +
tests/custom_cluster/test_hs2_fault_injection.py | 3 +
tests/custom_cluster/test_shell_commandline.py | 24 ++---
tests/custom_cluster/test_shell_jwt_auth.py | 35 +++---
tests/custom_cluster/test_shell_oauth_auth.py | 31 +++---
tests/shell/test_shell_interactive.py | 6 +-
14 files changed, 380 insertions(+), 83 deletions(-)
diff --git
a/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java
b/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java
index c7bfe8ce6..1e62919a5 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java
@@ -32,16 +32,20 @@ import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.directory.server.annotations.CreateLdapServer;
import org.apache.directory.server.annotations.CreateTransport;
import org.apache.directory.server.core.annotations.ApplyLdifFiles;
import org.apache.directory.server.core.integ.CreateLdapServerRule;
+import org.apache.impala.testutil.InterruptibleProxyServer;
import org.apache.impala.testutil.WebClient;
import org.junit.After;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Impala shell connectivity tests for LDAP authentication. This class
contains the common
@@ -51,6 +55,8 @@ import org.junit.rules.TemporaryFolder;
transports = { @CreateTransport(protocol = "LDAP", address = "localhost")
})
@ApplyLdifFiles({"users.ldif"})
public class LdapImpalaShellTest {
+ private final static Logger LOG =
LoggerFactory.getLogger(LdapImpalaShellTest.class);
+
@ClassRule
public static CreateLdapServerRule serverRule = new CreateLdapServerRule();
@@ -110,15 +116,19 @@ public class LdapImpalaShellTest {
return protocolsToTest;
}
+ protected String authMethod() {
+ return "basic";
+ }
+
private void verifyMetrics(Range<Long> expectedBasicSuccess,
Range<Long> expectedBasicFailure, Range<Long> expectedCookieSuccess,
Range<Long> expectedCookieFailure) throws Exception {
- long actualBasicSuccess = (long) client_.getMetric(
-
"impala.thrift-server.hiveserver2-http-frontend.total-basic-auth-success");
+ long actualBasicSuccess = (long) client_.getMetric("impala.thrift-server"
+ + ".hiveserver2-http-frontend.total-" + authMethod() +
"-auth-success");
assertTrue("Expected: " + expectedBasicSuccess + ", Actual: " +
actualBasicSuccess,
expectedBasicSuccess.contains(actualBasicSuccess));
- long actualBasicFailure = (long) client_.getMetric(
-
"impala.thrift-server.hiveserver2-http-frontend.total-basic-auth-failure");
+ long actualBasicFailure = (long) client_.getMetric("impala.thrift-server"
+ + ".hiveserver2-http-frontend.total-" + authMethod() +
"-auth-failure");
assertTrue("Expected: " + expectedBasicFailure + ", Actual: " +
actualBasicFailure,
expectedBasicFailure.contains(actualBasicFailure));
@@ -304,14 +314,16 @@ public class LdapImpalaShellTest {
/**
* Tests cookie rotation during a query does not interrupt the session.
*/
- protected void testCookieRefreshImpl(File keyFile) throws Exception {
+ protected void testCookieRefreshImpl(File keyFile, String[] env) throws
Exception {
String query = "select sleep(3000)";
- String[] command =
- buildCommand(query, "hs2-http", TEST_USER_1, TEST_PASSWORD_1,
"/cliservice");
+ String[] command = ArrayUtils.add(
+ buildCommand(query, "hs2-http", TEST_USER_1, TEST_PASSWORD_1,
"/cliservice"),
+ "--use_new_http_connection");
+
final RunShellCommand.Output[] resultHolder = new
RunShellCommand.Output[1];
Thread thread = new Thread(() -> {
try {
- resultHolder[0] = RunShellCommand.Run(command,
+ resultHolder[0] = RunShellCommand.Run(command, env,
/* shouldSucceed */ true, /* sleep returns true */ "true",
"Starting Impala Shell with LDAP-based authentication");
} catch (Throwable e) {
@@ -328,23 +340,72 @@ public class LdapImpalaShellTest {
// Loaded authentication key from ...
// Invalid cookie provided
// Closed session
- retryUntilSuccess(() -> {
- long success = (long) client_.getMetric(
-
"impala.thrift-server.hiveserver2-http-frontend.total-basic-auth-success");
- if (success < 1L) throw new Exception("Authentication not yet
succeeded.");
- return null;
- }, 20, 100);
+ try {
+ retryUntilSuccess(() -> {
+ long success = (long) client_.getMetric("impala.thrift-server."
+ + "hiveserver2-http-frontend.total-" + authMethod() +
"-auth-success");
+ if (success < 1L) throw new Exception("Authentication not yet
succeeded.");
+ return null;
+ }, 20, 100);
+ writeCookieSecret(keyFile);
+ thread.join(5000);
+ } finally {
+ if (resultHolder[0] != null) LOG.info(resultHolder[0].stderr);
+ }
- writeCookieSecret(keyFile);
- thread.join();
RunShellCommand.Output result = resultHolder[0];
assertTrue(result.stderr,
result.stderr.contains("Preserving cookies: impala.auth"));
+ assertTrue(result.stderr, result.stderr.contains("Fetched 1 row"));
// Cookie auth should fail once due to key change, requiring two basic
auths.
// Cookie auth is expected to succeed at least once, possibly many times.
verifyMetrics(Range.closed(2L, 2L), zero, Range.atLeast(1L), one);
}
+ /**
+ * Tests that an interrupted connection reconnects.
+ */
+ protected void testReconnectImpl(String[] env) throws Exception {
+ // impala-shell now re-uses connections. Add an interruptible proxy to
force new
+ // connections to be created.
+ final RunShellCommand.Output[] resultHolder = new
RunShellCommand.Output[1];
+ try (InterruptibleProxyServer proxy =
+ new InterruptibleProxyServer("localhost", 28000)) {
+ proxy.start();
+
+ String query = "select sleep(3000)";
+ String[] command = ArrayUtils.add(
+ buildCommand(query, "hs2-http", TEST_USER_1, TEST_PASSWORD_1,
"/cliservice"),
+ "--impalad=localhost:" + proxy.getLocalPort());
+
+ Thread thread = new Thread(() -> {
+ try {
+ resultHolder[0] = RunShellCommand.Run(command, env,
+ /* shouldSucceed */ true, /* sleep returns true */ "true",
+ "Starting Impala Shell with LDAP-based authentication");
+ } catch (Throwable e) {
+ resultHolder[0] = new RunShellCommand.Output("", e.getMessage());
+ }
+ });
+ thread.start();
+
+ // Wait until the query is started before closing the connection.
+ retryUntilSuccess(() -> {
+ long success = (long)
client_.getMetric("impala-server.num-queries-registered");
+ if (success < 1L) throw new Exception("Query not yet registered.");
+ return null;
+ }, 20, 100);
+ proxy.closeConnections();
+ thread.join(5000);
+ } finally {
+ if (resultHolder[0] != null) LOG.info(resultHolder[0].stderr);
+ }
+
+ // Query should succeed.
+ RunShellCommand.Output result = resultHolder[0];
+ assertTrue(result.stderr, result.stderr.contains("Fetched 1 row"));
+ }
+
/**
* Tests the LDAP user and group filter configs.
*/
diff --git
a/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTest.java
b/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTest.java
index 3dd27e1c3..b94ed9642 100644
---
a/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTest.java
+++
b/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTest.java
@@ -24,6 +24,7 @@ import
org.apache.directory.server.core.annotations.CreatePartition;
import org.junit.Assume;
import org.junit.Test;
+import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -47,6 +48,18 @@ import static org.junit.Assert.assertEquals;
@ApplyLdifFiles({"users.ldif"})
public class LdapKerberosImpalaShellTest extends
LdapKerberosImpalaShellTestBase {
+ @Override
+ protected String authMethod() {
+ return "negotiate";
+ }
+
+ @Override
+ protected String[] buildCommand(
+ String query, String protocol, String user, String password, String
httpPath) {
+ return new String[] {"impala-shell.sh", "--protocol=" + protocol,
"--kerberos",
+ "--user=" + user, "--query=" + query, "--http_path=" + httpPath};
+ }
+
/**
* Tests Kerberos authentication with custom LDAP user and group filter
configs
* with search bind enabled and group filter check disabled.
@@ -435,6 +448,39 @@ public class LdapKerberosImpalaShellTest extends
LdapKerberosImpalaShellTestBase
/* shouldSucceed */ true);
}
+ /**
+ * Tests cookie rotation during a query does not interrupt the session.
+ */
+ @Test
+ public void testCookieRefresh() throws Exception {
+ File cookieSecretFile = getCookieSecretFile();
+ Map<String, String> flags = mergeFlags(
+ kerberosKdcEnvironment.getKerberosAuthFlags(),
+ ImmutableMap.of("cookie_secret_file",
cookieSecretFile.getCanonicalPath())
+ );
+ int ret = startImpalaCluster(flagsToArgs(flags));
+ assertEquals(ret, 0);
+
+ String credentialsCacheFilePath =
+
kerberosKdcEnvironment.createUserPrincipalAndCredentialsCache(TEST_USER_1);
+ testCookieRefreshImpl(cookieSecretFile,
+ kerberosKdcEnvironment.getImpalaShellEnv(credentialsCacheFilePath));
+ }
+
+ /**
+ * Tests that an interrupted connection reconnects.
+ */
+ @Test
+ public void testReconnect() throws Exception {
+ Map<String, String> flags = kerberosKdcEnvironment.getKerberosAuthFlags();
+ int ret = startImpalaCluster(flagsToArgs(flags));
+ assertEquals(ret, 0);
+
+ String credentialsCacheFilePath =
+
kerberosKdcEnvironment.createUserPrincipalAndCredentialsCache(TEST_USER_1);
+
testReconnectImpl(kerberosKdcEnvironment.getImpalaShellEnv(credentialsCacheFilePath));
+ }
+
/**
* Tests Kerberos authentication with custom LDAP user and group filter
configs
* with simple bind enabled and group filter check disabled.
diff --git
a/fe/src/test/java/org/apache/impala/customcluster/LdapSearchBindImpalaShellTest.java
b/fe/src/test/java/org/apache/impala/customcluster/LdapSearchBindImpalaShellTest.java
index 6e9e0992f..947ea02d0 100644
---
a/fe/src/test/java/org/apache/impala/customcluster/LdapSearchBindImpalaShellTest.java
+++
b/fe/src/test/java/org/apache/impala/customcluster/LdapSearchBindImpalaShellTest.java
@@ -123,7 +123,17 @@ public class LdapSearchBindImpalaShellTest extends
LdapImpalaShellTest {
setUp(String.format("--ldap_user_search_basedn=dc=myorg,dc=com "
+ "--ldap_user_filter=(&(objectClass=person)(cn={0})) "
+ "--cookie_secret_file=%s", cookieSecretFile.getCanonicalPath()));
- testCookieRefreshImpl(cookieSecretFile);
+ testCookieRefreshImpl(cookieSecretFile, null);
+ }
+
+ /**
+ * Tests that an interrupted connection reconnects.
+ */
+ @Test
+ public void testReconnect() throws Exception {
+ setUp(String.format("--ldap_user_search_basedn=dc=myorg,dc=com "
+ + "--ldap_user_filter=(&(objectClass=person)(cn={0}))"));
+ testReconnectImpl(null);
}
/**
diff --git
a/fe/src/test/java/org/apache/impala/customcluster/LdapSimpleBindImpalaShellTest.java
b/fe/src/test/java/org/apache/impala/customcluster/LdapSimpleBindImpalaShellTest.java
index 39a4780fb..47381bb8d 100644
---
a/fe/src/test/java/org/apache/impala/customcluster/LdapSimpleBindImpalaShellTest.java
+++
b/fe/src/test/java/org/apache/impala/customcluster/LdapSimpleBindImpalaShellTest.java
@@ -117,7 +117,16 @@ public class LdapSimpleBindImpalaShellTest extends
LdapImpalaShellTest {
public void testCookieRefresh() throws Exception {
File cookieSecretFile = getCookieSecretFile();
setUp(String.format("--cookie_secret_file=%s",
cookieSecretFile.getCanonicalPath()));
- testCookieRefreshImpl(cookieSecretFile);
+ testCookieRefreshImpl(cookieSecretFile, null);
+ }
+
+ /**
+ * Tests that an interrupted connection reconnects.
+ */
+ @Test
+ public void testReconnect() throws Exception {
+ setUp("");
+ testReconnectImpl(null);
}
/**
diff --git
a/fe/src/test/java/org/apache/impala/testutil/InterruptibleProxyServer.java
b/fe/src/test/java/org/apache/impala/testutil/InterruptibleProxyServer.java
new file mode 100644
index 000000000..dbdb63a2e
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/testutil/InterruptibleProxyServer.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.testutil;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class InterruptibleProxyServer implements AutoCloseable {
+ private final String targetHost;
+ private final int targetPort;
+ private ServerSocket serverSocket;
+ private final Set<Socket> clientSockets = Collections.synchronizedSet(new
HashSet<>());
+ private volatile boolean running = false;
+ private Thread acceptThread;
+
+ public InterruptibleProxyServer(String host, int port) {
+ this.targetHost = host;
+ this.targetPort = port;
+ }
+
+ public void start() throws IOException {
+ serverSocket = new ServerSocket();
+ serverSocket.bind(null);
+ running = true;
+ acceptThread = new Thread(this::acceptLoop,
"InterruptibleProxyServer-Accept");
+ acceptThread.start();
+ }
+
+ public int getLocalPort() {
+ return serverSocket.getLocalPort();
+ }
+
+ private void acceptLoop() {
+ while (running) {
+ try {
+ Socket client = serverSocket.accept();
+ clientSockets.add(client);
+ Thread proxyThread =
+ new Thread(() -> handleClient(client),
"InterruptibleProxyServer-Proxy");
+ proxyThread.start();
+ } catch (IOException e) {
+ if (running)
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void handleClient(Socket client) {
+ try (Socket target = new Socket(targetHost, targetPort)) {
+ clientSockets.add(target);
+ Thread t1 = new Thread(() -> forward(client, target));
+ Thread t2 = new Thread(() -> forward(target, client));
+ t1.start();
+ t2.start();
+ t1.join();
+ t2.join();
+ } catch (Exception e) {
+ // Ignore
+ } finally {
+ clientSockets.remove(client);
+ try {
+ client.close();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+
+ private void forward(Socket in, Socket out) {
+ try {
+ byte[] buf = new byte[4096];
+ int len;
+ while ((len = in.getInputStream().read(buf)) != -1) {
+ out.getOutputStream().write(buf, 0, len);
+ out.getOutputStream().flush();
+ }
+ } catch (IOException ignored) {
+ }
+ }
+
+ public void closeConnections() {
+ synchronized (clientSockets) {
+ for (Socket s : clientSockets) {
+ try {
+ s.close();
+ } catch (IOException ignored) {
+ }
+ }
+ clientSockets.clear();
+ }
+ }
+
+ public void close() throws IOException {
+ running = false;
+ if (serverSocket != null)
+ serverSocket.close();
+ if (acceptThread != null)
+ acceptThread.interrupt();
+ closeConnections();
+ }
+}
\ No newline at end of file
diff --git a/shell/impala_shell/ImpalaHttpClient.py
b/shell/impala_shell/ImpalaHttpClient.py
index 38788a9f4..ae9eda609 100644
--- a/shell/impala_shell/ImpalaHttpClient.py
+++ b/shell/impala_shell/ImpalaHttpClient.py
@@ -21,8 +21,10 @@ import base64
from collections import namedtuple
import datetime
from io import BytesIO
+import errno
import os
import os.path
+import socket
import sys
import six
@@ -54,7 +56,7 @@ class ImpalaHttpClient(TTransportBase):
MIN_REQUEST_SIZE_FOR_EXPECT = 1024
def __init__(self, uri_or_host, ssl_context=None, http_cookie_names=None,
- socket_timeout_s=None, verbose=False):
+ socket_timeout_s=None, verbose=False, reuse_connection=True):
"""To properly authenticate against an HTTPS server, provide an
ssl_context created
with ssl.create_default_context() to validate the server certificate.
@@ -63,6 +65,11 @@ class ImpalaHttpClient(TTransportBase):
these names is returned in an http response by the server or an
intermediate proxy
then it will be included in each subsequent request for the same
connection. If it
is set as wildcards, all cookies in an http response will be preserved.
+
+ If reuse_connection is set to True, the underlying HTTP connection will be
reused
+ for multiple requests; it will retry establishing a connection on socket
error in case
+ the server closed it while idle. If set to False, the connection will be
closed and
+ reopened for each request.
"""
parsed = urllib.parse.urlparse(uri_or_host)
self.scheme = parsed.scheme
@@ -123,6 +130,7 @@ class ImpalaHttpClient(TTransportBase):
self.__kerb_service = None
self.__add_custom_headers_funcs = []
self.__verbose = verbose
+ self.__reuse_connection = reuse_connection
@staticmethod
def basic_proxy_auth_header(proxy):
@@ -343,12 +351,13 @@ class ImpalaHttpClient(TTransportBase):
self.__wbuf.write(buf)
def flush(self):
- # Send HTTP request and receive response.
- # Return True if the client should retry this method.
- def sendRequestRecvResp(data):
- if self.isOpen():
+ # Send HTTP request headers. This is repeatable, so if there's a
connection error
+ # like when the connection has been closed it's safe to retry.
+ def sendRequestHeaders(data_len):
+ if not self.__reuse_connection and self.isOpen():
self.close()
- self.open()
+ if not self.isOpen():
+ self.open()
# HTTP request
if self.using_proxy() and self.scheme == "http":
@@ -360,7 +369,6 @@ class ImpalaHttpClient(TTransportBase):
# Write headers
self.__http.putheader('Content-Type', 'application/x-thrift')
- data_len = len(data)
self.__http.putheader('Content-Length', str(data_len))
if data_len > ImpalaHttpClient.MIN_REQUEST_SIZE_FOR_EXPECT:
# Add the 'Expect' header to large requests. Note that we do not
explicitly wait
@@ -384,6 +392,9 @@ class ImpalaHttpClient(TTransportBase):
self.__http.endheaders()
+ # Complete the request by sending data and getting the response. Return
True if the
+ # client should retry this method due to a '401 Unauthorized' response.
+ def sendDataRecvResp(data):
# Write payload
self.__http.send(data)
@@ -404,12 +415,41 @@ class ImpalaHttpClient(TTransportBase):
# Pull data out of buffer
data = self.__wbuf.getvalue()
+ data_len = len(data)
self.__wbuf = BytesIO()
- retry = sendRequestRecvResp(data)
- if retry:
- # Received "401 Unauthorized" response. Delete HTTP cookies and then
retry.
- sendRequestRecvResp(data)
+ # Send the request headers, retrying once if the connection was closed.
Sending
+ # headers is the earliest point that http_client allows us to detect a
closed
+ # connection.
+ retry_because_disconnected = False
+ try:
+ sendRequestHeaders(data_len)
+ except http_client.CannotSendRequest:
+ retry_because_disconnected = True
+ except socket.error as e:
+ if e.errno not in [errno.EPIPE, errno.ECONNRESET]:
+ raise
+ retry_because_disconnected = True
+
+ if retry_because_disconnected and self.__reuse_connection:
+ if self.__verbose:
+ print('Connection closed, reconnecting...', file=sys.stderr)
+ # The underlying socket is broken. Try to reconnect and then retry.
+ self.close()
+ sendRequestHeaders(data_len)
+
+ # Send the data and receive the response. We no longer retry on socket
errors as the
+ # request may have already been partially processed by the server and we
don't want to
+ # repeat it. We do retry on 401 Unauthorized if we sent cookies as they
may have
+ # expired and we don't send the Authorization header with cookies for
Kerberos.
+ retry_because_401 = sendDataRecvResp(data)
+ if retry_because_401:
+ if self.__verbose:
+ print('Cookies expired, restarting authentication...', file=sys.stderr)
+ # Received "401 Unauthorized" response and cookies have been cleaned.
Retry with
+ # the same connection.
+ sendRequestHeaders(data_len)
+ sendDataRecvResp(data)
if self.code >= 300:
# Report any http response code that is not 1XX (informational response)
or
diff --git a/shell/impala_shell/impala_client.py
b/shell/impala_shell/impala_client.py
index 10bc1b963..ea6de5045 100644
--- a/shell/impala_shell/impala_client.py
+++ b/shell/impala_shell/impala_client.py
@@ -160,7 +160,7 @@ class ImpalaClient(object):
verbose=True, use_http_base_transport=False, http_path=None,
http_cookie_names=None, http_socket_timeout_s=None,
value_converter=None,
connect_max_tries=4, rpc_stdout=False, rpc_file=None,
http_tracing=True,
- jwt=None, oauth=None, hs2_x_forward=None):
+ jwt=None, oauth=None, hs2_x_forward=None,
reuse_http_connection=True):
self.connected = False
self.impalad_host = impalad[0]
self.impalad_port = int(impalad[1])
@@ -198,6 +198,7 @@ class ImpalaClient(object):
self.rpc_file = rpc_file
# In h2s-http clients only, the value of the X-Forwarded-For http header.
self.hs2_x_forward = hs2_x_forward
+ self.reuse_http_connection = reuse_http_connection
def connect(self):
"""Creates a connection to an Impalad instance. Returns a tuple with the
impala
@@ -444,12 +445,14 @@ class ImpalaClient(object):
transport = ImpalaHttpClient(url, ssl_context=ssl_ctx,
http_cookie_names=self.http_cookie_names,
socket_timeout_s=self.http_socket_timeout_s,
- verbose=self.verbose)
+ verbose=self.verbose,
+ reuse_connection=self.reuse_http_connection)
else:
url = "http://{0}/{1}".format(host_and_port, self.http_path)
transport = ImpalaHttpClient(url,
http_cookie_names=self.http_cookie_names,
socket_timeout_s=self.http_socket_timeout_s,
- verbose=self.verbose)
+ verbose=self.verbose,
+ reuse_connection=self.reuse_http_connection)
if self.use_ldap:
# Set the BASIC authorization
diff --git a/shell/impala_shell/impala_shell.py
b/shell/impala_shell/impala_shell.py
index d8ed39fca..93d133f8d 100644
--- a/shell/impala_shell/impala_shell.py
+++ b/shell/impala_shell/impala_shell.py
@@ -315,6 +315,7 @@ class ImpalaShell(cmd.Cmd, object):
self.http_cookie_names = options.http_cookie_names
self.http_tracing = not options.no_http_tracing
self.hs2_x_forward = options.hs2_x_forward
+ self.reuse_http_connection = not options.use_new_http_connection
# Due to a readline bug in centos/rhel7, importing it causes control
characters to be
# printed. This breaks any scripting against the shell in non-interactive
mode. Since
@@ -681,7 +682,8 @@ class ImpalaShell(cmd.Cmd, object):
value_converter=value_converter,
rpc_stdout=self.rpc_stdout,
rpc_file=self.rpc_file,
http_tracing=self.http_tracing,
jwt=self.jwt, oauth=self.oauth,
- hs2_x_forward=self.hs2_x_forward)
+ hs2_x_forward=self.hs2_x_forward,
+ reuse_http_connection=self.reuse_http_connection)
if protocol == 'hs2':
return ImpalaHS2Client(self.impalad, self.fetch_size,
self.kerberos_host_fqdn,
self.use_kerberos, self.kerberos_service_name,
self.use_ssl,
@@ -703,7 +705,8 @@ class ImpalaShell(cmd.Cmd, object):
connect_max_tries=self.connect_max_tries,
rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file,
http_tracing=self.http_tracing, jwt=self.jwt,
oauth=self.oauth,
- hs2_x_forward=self.hs2_x_forward)
+ hs2_x_forward=self.hs2_x_forward,
+ reuse_http_connection=self.reuse_http_connection)
elif protocol == 'beeswax':
return ImpalaBeeswaxClient(self.impalad, self.fetch_size,
self.kerberos_host_fqdn,
self.use_kerberos, self.kerberos_service_name,
self.use_ssl,
diff --git a/shell/impala_shell/option_parser.py
b/shell/impala_shell/option_parser.py
index fbd39db11..e8c918c8b 100644
--- a/shell/impala_shell/option_parser.py
+++ b/shell/impala_shell/option_parser.py
@@ -381,6 +381,11 @@ def get_option_parser(defaults):
"execution, even if query does not expect to fetch any
rows. "
"This is the default behavior when using beeswax protocol.
"
"Default to false for other Impala protocol.")
+ parser.add_option("--use_new_http_connection",
dest="use_new_http_connection",
+ action="store_true",
+ help="If set, a new underlying HTTP connection will be
used for each "
+ "request in hs2-http protocol. By default, the underlying
HTTP "
+ "connection is reused for multiple requests.")
# add default values to the help text
for option in parser.option_list:
diff --git a/tests/custom_cluster/test_hs2_fault_injection.py
b/tests/custom_cluster/test_hs2_fault_injection.py
index 132a1b003..0c4e79162 100644
--- a/tests/custom_cluster/test_hs2_fault_injection.py
+++ b/tests/custom_cluster/test_hs2_fault_injection.py
@@ -76,6 +76,9 @@ class FaultInjectingHttpClient(ImpalaHttpClient, object):
def _check_code(self):
if self.code >= 300:
+ # Read response like in case of an actual >=300 status code to allow
+ # reusing the connection.
+ self.readBody()
# Report any http response code that is not 1XX (informational response)
or
# 2XX (successful).
raise HttpError(self.code, self.message, self.body, self.headers)
diff --git a/tests/custom_cluster/test_shell_commandline.py
b/tests/custom_cluster/test_shell_commandline.py
index ec80dfdd8..77d5d4d8b 100644
--- a/tests/custom_cluster/test_shell_commandline.py
+++ b/tests/custom_cluster/test_shell_commandline.py
@@ -68,8 +68,7 @@ class TestImpalaShellCommandLine(CustomClusterTestSuite):
request_id_base = ""
request_id_serialnum = 0
session_id = ""
- query_id = ""
- last_known_query_id = ""
+ known_query_ids = []
tracing_lines_count = 0
request_id_re =
re.compile("x-request-id=([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-"
@@ -119,17 +118,16 @@ class TestImpalaShellCommandLine(CustomClusterTestSuite):
assert session_id == m.group(1), \
"session id expected '{0}', actual '{1}'".format(session_id,
m.group(1))
- # The query_id is generated by impala and must be the same for the
- # duration of the query.
+ # The query_id is generated by impala and must be the same for the
duration of
+ # the query. With connection re-use, the next query ID may appear
immediately,
+ # so verify each query ID appears contiguously in the log.
m = query_id_re.search(line)
- if m is None:
- query_id = ""
- else:
- if query_id == "":
- query_id = m.group(1)
- last_known_query_id = query_id
+ if m is not None:
+ query_id = m.group(1)
+ if query_id not in known_query_ids:
+ known_query_ids.append(query_id)
else:
- assert query_id == m.group(1), \
+ assert known_query_ids[-1] == query_id, \
"query id expected '{0}', actual '{1}'".format(query_id,
m.group(1))
# Assert that multiple HTTP connection tracing log lines were found.
@@ -141,9 +139,9 @@ class TestImpalaShellCommandLine(CustomClusterTestSuite):
# from the impala query profile.
m = profile_query_id_re.search(result.stdout)
if m is not None:
- assert last_known_query_id == m.group(1), \
+ assert known_query_ids[-1] == m.group(1), \
"impala query profile id, expected '{0}', actual '{1}'" \
- .format(last_known_query_id, m.group(1))
+ .format(known_query_ids[-1], m.group(1))
else:
pytest.fail("did not find Impala query id in shell stdout")
diff --git a/tests/custom_cluster/test_shell_jwt_auth.py
b/tests/custom_cluster/test_shell_jwt_auth.py
index f37a9ab7c..8857b7f9f 100644
--- a/tests/custom_cluster/test_shell_jwt_auth.py
+++ b/tests/custom_cluster/test_shell_jwt_auth.py
@@ -71,7 +71,7 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
def test_jwt_auth_valid(self, vector):
"""Asserts the Impala shell can authenticate to Impala using JWT
authentication.
Also executes a query to ensure the authentication was successful."""
- before_rpc_count = self.__get_rpc_count()
+ self.__assert_success_fail_metric()
# Run a query and wait for it to complete.
args = ['--protocol', vector.get_value('protocol'), '-j', '--jwt_cmd',
@@ -89,11 +89,11 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
# will happen via cookie auth, hence this count will be 1.
self.__assert_success_fail_metric(success_count=1)
- # Total cookie auth success should be 1 less than total rpc_count
- # since after the 1st rpc count, the cookie is set and no more jwt token
- # verification happens.
- query_rpc_count = self.__get_rpc_count() - before_rpc_count
- assert cookie_auth_count == query_rpc_count - 1, "Incorrect Cookie Auth
Count"
+ # Total cookie auth success should be 1 less than total number of RPCs since after
+ # the 1st RPC, the cookie is set and no more jwt token verification happens. However
+ # counting total number of RPCs is not trivial or stable, so ensure we have multiple;
+ # we perform at least 10 RPCs during a query.
+ assert cookie_auth_count > 10, "Incorrect Cookie Auth Count"
# Shut down cluster to ensure logs flush to disk.
self._stop_impala_cluster()
@@ -122,7 +122,8 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
def test_jwt_auth_expired(self, vector):
"""Asserts the Impala shell fails to authenticate when it presents a JWT
that has a
valid signature but is expired."""
- before_rpc_count = self.__get_rpc_count()
+ before_connection_count = self.__get_connection_count()
+ self.__assert_success_fail_metric()
args = ['--protocol', vector.get_value('protocol'), '-j', '--jwt_cmd',
'cat {0}'.format(TestImpalaShellJWTAuth.JWT_EXPIRED_PATH),
@@ -130,10 +131,10 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
result = run_impala_shell_cmd(vector, args, expect_success=False)
# Ensure the Impala coordinator is correctly reporting the jwt auth metrics
- # must be done before the cluster shuts down since it calls to the
coordinator
- self.__wait_for_rpc_count(before_rpc_count + 1)
- query_rpc_count = self.__get_rpc_count() - before_rpc_count
- self.__assert_success_fail_metric(fail_count=query_rpc_count)
+ # must be done before the cluster shuts down since it calls to the coordinator.
+ self.__wait_for_connection_count(before_connection_count + 1)
+ query_connection_count = self.__get_connection_count() -
before_connection_count
+ self.__assert_success_fail_metric(fail_count=query_connection_count)
# Shut down cluster to ensure logs flush to disk.
self._stop_impala_cluster()
@@ -166,7 +167,7 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
def test_jwt_auth_invalid_jwk(self, vector):
"""Asserts the Impala shell fails to authenticate when it presents a JWT
that has a
valid signature but is expired."""
- before_rpc_count = self.__get_rpc_count()
+ before_connection_count = self.__get_connection_count()
args = ['--protocol', vector.get_value('protocol'), '-j', '--jwt_cmd',
'cat {0}'.format(TestImpalaShellJWTAuth.JWT_INVALID_JWK),
@@ -175,9 +176,9 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
# Ensure the Impala coordinator is correctly reporting the jwt auth metrics
# must be done before the cluster shuts down since it calls to the
coordinator
- self.__wait_for_rpc_count(before_rpc_count + 1)
- query_rpc_count = self.__get_rpc_count() - before_rpc_count
- self.__assert_success_fail_metric(fail_count=query_rpc_count)
+ self.__wait_for_connection_count(before_connection_count + 1)
+ query_connection_count = self.__get_connection_count() -
before_connection_count
+ self.__assert_success_fail_metric(fail_count=query_connection_count)
# Shut down cluster to ensure logs flush to disk.
self._stop_impala_cluster()
@@ -213,9 +214,9 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
assert actual[1] == fail_count, "Expected JWT auth failure count to be
'{}' but " \
"was '{}'".format(fail_count, actual[1])
- def __get_rpc_count(self):
+ def __get_connection_count(self):
return
self.cluster.get_first_impalad().service.get_metric_value(self.HS2_HTTP_CONNS)
- def __wait_for_rpc_count(self, expected_count):
+ def __wait_for_connection_count(self, expected_count):
self.cluster.get_first_impalad().service.wait_for_metric_value(self.HS2_HTTP_CONNS,
expected_count, allow_greater=True)
diff --git a/tests/custom_cluster/test_shell_oauth_auth.py
b/tests/custom_cluster/test_shell_oauth_auth.py
index 4f6f788e7..b0a3d67e6 100644
--- a/tests/custom_cluster/test_shell_oauth_auth.py
+++ b/tests/custom_cluster/test_shell_oauth_auth.py
@@ -73,7 +73,6 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
def test_oauth_auth_valid(self, vector):
"""Asserts the Impala shell can authenticate to Impala using OAuth
authentication.
Also executes a query to ensure the authentication was successful."""
- before_rpc_count = self.__get_rpc_count()
# Run a query and wait for it to complete.
args = ['--protocol', vector.get_value('protocol'), '-a', '--oauth_cmd',
@@ -91,11 +90,11 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
# will happen via cookie auth, hence this count will be 1.
self.__assert_success_fail_metric(success_count=1)
- # Total cookie auth success should be 1 less than total rpc_count
- # since after the 1st rpc count, the cookie is set and no more jwt token
- # verification happens.
- query_rpc_count = self.__get_rpc_count() - before_rpc_count
- assert cookie_auth_count == query_rpc_count - 1, "Incorrect Cookie Auth
Count"
+ # Total cookie auth success should be 1 less than total number of RPCs since after
+ # the 1st RPC, the cookie is set and no more jwt token verification happens. However
+ # counting total number of RPCs is not trivial or stable, so ensure we have multiple;
+ # we perform at least 10 RPCs during a query.
+ assert cookie_auth_count > 10, "Incorrect Cookie Auth Count"
# Shut down cluster to ensure logs flush to disk.
self._stop_impala_cluster()
@@ -124,7 +123,7 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
def test_oauth_auth_expired(self, vector):
"""Asserts the Impala shell fails to authenticate when it presents an
OAuth token
that has a valid signature but is expired."""
- before_rpc_count = self.__get_rpc_count()
+ before_connection_count = self.__get_connection_count()
args = ['--protocol', vector.get_value('protocol'), '-a', '--oauth_cmd',
'cat {0}'.format(TestImpalaShellOAuthAuth.OAUTH_EXPIRED_PATH),
@@ -133,9 +132,9 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
# Ensure the Impala coordinator is correctly reporting the OAuth auth
metrics
# must be done before the cluster shuts down since it calls to the
coordinator
- self.__wait_for_rpc_count(before_rpc_count + 1)
- query_rpc_count = self.__get_rpc_count() - before_rpc_count
- self.__assert_success_fail_metric(fail_count=query_rpc_count)
+ self.__wait_for_connection_count(before_connection_count + 1)
+ query_connection_count = self.__get_connection_count() -
before_connection_count
+ self.__assert_success_fail_metric(fail_count=query_connection_count)
# Shut down cluster to ensure logs flush to disk.
self._stop_impala_cluster()
@@ -168,7 +167,7 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
def test_oauth_auth_invalid_jwk(self, vector):
"""Asserts the Impala shell fails to authenticate when it presents an
OAuth token
that has a valid signature but is expired."""
- before_rpc_count = self.__get_rpc_count()
+ before_connection_count = self.__get_connection_count()
args = ['--protocol', vector.get_value('protocol'), '-a', '--oauth_cmd',
'cat {0}'.format(TestImpalaShellOAuthAuth.OAUTH_INVALID_JWK),
@@ -177,9 +176,9 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
# Ensure the Impala coordinator is correctly reporting the OAuth auth
metrics
# must be done before the cluster shuts down since it calls to the
coordinator
- self.__wait_for_rpc_count(before_rpc_count + 1)
- query_rpc_count = self.__get_rpc_count() - before_rpc_count
- self.__assert_success_fail_metric(fail_count=query_rpc_count)
+ self.__wait_for_connection_count(before_connection_count + 1)
+ query_connection_count = self.__get_connection_count() -
before_connection_count
+ self.__assert_success_fail_metric(fail_count=query_connection_count)
# Shut down cluster to ensure logs flush to disk.
self._stop_impala_cluster()
@@ -295,9 +294,9 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
assert actual[1] == fail_count, "Expected OAuth auth failure count to be
'{}' but " \
"was '{}'".format(fail_count, actual[1])
- def __get_rpc_count(self):
+ def __get_connection_count(self):
return
self.cluster.get_first_impalad().service.get_metric_value(self.HS2_HTTP_CONNS)
- def __wait_for_rpc_count(self, expected_count):
+ def __wait_for_connection_count(self, expected_count):
self.cluster.get_first_impalad().service.wait_for_metric_value(self.HS2_HTTP_CONNS,
expected_count, allow_greater=True)
diff --git a/tests/shell/test_shell_interactive.py
b/tests/shell/test_shell_interactive.py
index 66d3e9256..be56147f2 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -499,16 +499,16 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
"""Test that a disconnected shell does not try to reconnect if quitting"""
result = run_impala_shell_interactive(vector, 'quit;',
shell_args=['-ifoo'],
wait_until_connected=False)
- assert "reconnect" not in result.stderr
+ assert "Connection lost, reconnecting" not in result.stderr
result = run_impala_shell_interactive(vector, 'exit;',
shell_args=['-ifoo'],
wait_until_connected=False)
- assert "reconnect" not in result.stderr
+ assert "Connection lost, reconnecting" not in result.stderr
# Null case: This is not quitting, so it will result in an attempt to reconnect.
result = run_impala_shell_interactive(vector, 'show tables;',
shell_args=['-ifoo'],
wait_until_connected=False)
- assert "reconnect" in result.stderr
+ assert "Connection lost, reconnecting" in result.stderr
def test_bash_cmd_timing(self, vector):
"""Test existence of time output in bash commands run from shell"""