This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cca1aa9e IMPALA-13820: add ipv6 support for webui/hs2/hs2-http/beeswax
5cca1aa9e is described below

commit 5cca1aa9e5e1133640fb0e75630c719ebaa63ac1
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Sun Feb 23 23:53:48 2025 +0100

    IMPALA-13820: add ipv6 support for webui/hs2/hs2-http/beeswax
    
    Main changes:
    - added flag external_interface to override hostname for
      beeswax/hs2/hs2-http port to allow testing ipv6 on these
      interfaces without forcing ipv6 on internal communication
    - compile Squeasel with USE_IPV6 to allow ipv6 on webui (webui
      interface can be configured with existing flag webserver_interface)
    - fixed the handling of [<ipv6addr>]:<port> style addresses in
      impala-shell (e.g. [::1]:21050) and the test framework
    - improved handling of custom clusters in the test framework to
      allow webui/ImpalaTestSuite's clients to work with
      non-standard settings (also fixes these clients with SSL)
    
    Using ipv4 vs ipv6 vs dual stack can be configured by setting
    the interface to bind to with the flags webserver_interface and
    external_interface. The Thrift server behind hs2/hs2-http/beeswax
    only accepts a single host name and uses the first address
    returned by getaddrinfo() that it can successfully bind to. This
    means that unless an ipv6 address is used (like ::1) the behavior
    will depend on the order of addresses returned by getaddrinfo():
    
https://github.com/apache/thrift/blob/63b7a263fc669c56fedca5d9a7310902d98df335/lib/cpp/src/thrift/transport/TServerSocket.cpp#L481
    For dual stack, the only way currently is to bind to "::",
    as the Thrift server can only listen on a single socket.
    
    Testing:
    - added custom cluster tests for ipv6 only/dual interface
      with and without SSL
    - manually tested in dual stack environment with client on a
      different host
    - among clients impala-shell and impyla are tested, but not
      JDBC/ODBC
    - no tests yet on a truly ipv6-only environment, as internal
      communication (e.g. krpc) is not ready for ipv6
    
    To test manually the dev cluster can be started with ipv6 support:
    dual mode:
    bin/start-impala-cluster.py --impalad_args="--external_interface=:: 
--webserver_interface=::" --catalogd_args="--webserver_interface=::" 
--state_store_args="--webserver_interface=::"
    
    ipv6 only:
    bin/start-impala-cluster.py --impalad_args="--external_interface=::1 
--webserver_interface=::1" --catalogd_args="--webserver_interface=::1" 
--state_store_args="--webserver_interface=::1"
    
    Change-Id: I51ac66c568cc9bb06f4a3915db07a53c100109b6
    Reviewed-on: http://gerrit.cloudera.org:8080/22527
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/rpc/thrift-server.cc                        |   7 +-
 be/src/rpc/thrift-server.h                         |  13 +-
 be/src/service/impala-server.cc                    |   8 +
 be/src/util/CMakeLists.txt                         |   2 +
 be/src/util/network-util.cc                        |  40 +++-
 be/src/util/network-util.h                         |   8 +-
 be/src/util/webserver.cc                           |  24 +-
 shell/impala_shell/impala_client.py                |  10 +-
 shell/impala_shell/impala_shell.py                 |  31 ++-
 tests/beeswax/impala_beeswax.py                    |  13 +-
 tests/common/custom_cluster_test_suite.py          |   7 +
 tests/common/impala_cluster.py                     |  17 +-
 tests/common/impala_connection.py                  |   5 +-
 tests/common/impala_service.py                     |  75 +++---
 tests/common/impala_test_suite.py                  |  66 ++++--
 tests/common/network.py                            |  48 ++++
 tests/custom_cluster/test_client_ssl.py            |  26 +--
 .../custom_cluster/test_event_processing_error.py  |   6 +-
 tests/custom_cluster/test_ipv6.py                  | 251 +++++++++++++++++++++
 tests/custom_cluster/test_redaction.py             |   4 +
 tests/metadata/test_event_processing.py            |  15 +-
 tests/metadata/test_event_processing_base.py       |  17 +-
 tests/stress/test_acid_stress.py                   |   6 +-
 tests/stress/test_insert_stress.py                 |  12 +-
 tests/stress/test_merge_stress.py                  |  12 +-
 tests/stress/test_update_stress.py                 |  24 +-
 26 files changed, 594 insertions(+), 153 deletions(-)

diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index 181741664..f8cab8e48 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -270,9 +270,10 @@ ThriftServer::ThriftServer(const string& name,
     const std::shared_ptr<TProcessor>& processor, int port, AuthProvider* 
auth_provider,
     MetricGroup* metrics, int max_concurrent_connections, int64_t 
queue_timeout_ms,
     int64_t idle_poll_period_ms, TransportType transport_type,
-    bool is_external_facing)
+    bool is_external_facing, string host)
   : started_(false),
     port_(port),
+    host_(std::move(host)),
     ssl_enabled_(false),
     max_concurrent_connections_(max_concurrent_connections),
     queue_timeout_ms_(queue_timeout_ms),
@@ -342,7 +343,7 @@ Status 
ThriftServer::CreateSocket(std::shared_ptr<TServerSocket>* socket) {
       socket_factory->loadCertificate(certificate_path_.c_str());
       socket_factory->loadPrivateKey(private_key_path_.c_str());
       ImpalaKeepAliveServerSocket<TSSLServerSocket>* server_socket =
-          new ImpalaKeepAliveServerSocket<TSSLServerSocket>(port_, 
socket_factory);
+          new ImpalaKeepAliveServerSocket<TSSLServerSocket>(host_, port_, 
socket_factory);
       server_socket->setKeepAliveOptions(keepalive_probe_period_s_,
           keepalive_retry_period_s_, keepalive_retry_count_);
       socket->reset(server_socket);
@@ -351,7 +352,7 @@ Status 
ThriftServer::CreateSocket(std::shared_ptr<TServerSocket>* socket) {
     }
   } else {
     ImpalaKeepAliveServerSocket<TServerSocket>* server_socket =
-        new ImpalaKeepAliveServerSocket<TServerSocket>(port_);
+        new ImpalaKeepAliveServerSocket<TServerSocket>(host_, port_);
     server_socket->setKeepAliveOptions(keepalive_probe_period_s_,
         keepalive_retry_period_s_, keepalive_retry_count_);
     socket->reset(server_socket);
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index 820526ca1..a6fe327a7 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -309,7 +309,7 @@ class ThriftServer {
       int max_concurrent_connections = 0, int64_t queue_timeout_ms = 0,
       int64_t idle_poll_period_ms = 0,
       TransportType server_transport = TransportType::BINARY,
-      bool is_external_facing = true);
+      bool is_external_facing = true, std::string host = "");
 
   /// Enables secure access over SSL. Must be called before Start(). The first 
three
   /// arguments are the minimum SSL/TLS version, and paths to certificate and 
private key
@@ -345,6 +345,9 @@ class ThriftServer {
   /// replaced with whatever port number the server is listening on.
   int port_;
 
+  /// The host name to bind with.
+  string host_;
+
   /// True if the server socket only accepts SSL connections
   bool ssl_enabled_;
 
@@ -546,6 +549,11 @@ class ThriftServerBuilder {
     return *this;
   }
 
+  ThriftServerBuilder& host(const string& host) {
+    host_ = host;
+    return *this;
+  }
+
   /// Constructs a new ThriftServer and puts it in 'server', if construction 
was
   /// successful, returns an error otherwise. In the error case, 'server' will 
not have
   /// been set and will not need to be freed, otherwise the caller assumes 
ownership of
@@ -554,7 +562,7 @@ class ThriftServerBuilder {
     std::unique_ptr<ThriftServer> ptr(
         new ThriftServer(name_, processor_, port_, auth_provider_, metrics_,
             max_concurrent_connections_, queue_timeout_ms_, 
idle_poll_period_ms_,
-            server_transport_type_, is_external_facing_));
+            server_transport_type_, is_external_facing_, host_));
     if (enable_ssl_) {
       RETURN_IF_ERROR(ptr->EnableSsl(
           version_, certificate_, private_key_, pem_password_cmd_, 
cipher_list_,
@@ -572,6 +580,7 @@ class ThriftServerBuilder {
   int max_concurrent_connections_ = 0;
   std::string name_;
   std::shared_ptr<apache::thrift::TProcessor> processor_;
+  std::string host_;
   int port_ = 0;
   ThriftServer::TransportType server_transport_type_ =
       ThriftServer::TransportType::BINARY;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index a8bfd5ace..9584405ad 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -157,6 +157,9 @@ DEFINE_bool(enable_external_fe_http, false,
     "if true enables http transport for external_fe_port otherwise binary 
transport is "
     "used");
 
+DEFINE_string(external_interface, "",
+    "Host name to bind with in beeswax/hs2/hs2-http. \"::\" allows IPv6 (dual 
stack).");
+
 DEFINE_int32(fe_service_threads, 64,
     "number of threads available to serve client requests");
 DEFINE_string(default_query_options, "", "key=value pair of default query 
options for"
@@ -3238,6 +3241,7 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t 
hs2_port,
           .keepalive(FLAGS_client_keepalive_probe_period_s,
               FLAGS_client_keepalive_retry_period_s,
               FLAGS_client_keepalive_retry_count)
+          .host(FLAGS_external_interface)
           .Build(&server));
       beeswax_server_.reset(server);
       beeswax_server_->SetConnectionHandler(this);
@@ -3270,6 +3274,7 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t 
hs2_port,
           .keepalive(FLAGS_client_keepalive_probe_period_s,
               FLAGS_client_keepalive_retry_period_s,
               FLAGS_client_keepalive_retry_count)
+          .host(FLAGS_external_interface)
           .Build(&server));
       hs2_server_.reset(server);
       hs2_server_->SetConnectionHandler(this);
@@ -3305,6 +3310,8 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t 
hs2_port,
                   FLAGS_client_keepalive_retry_period_s,
                   FLAGS_client_keepalive_retry_count)
               .Build(&server));
+      // FLAGS_external_interface is not passed to external external_fe_port. 
If this is
+      // needed (e.g. for dual stack) then another subtask can be added to 
IMPALA-13819.
       external_fe_server_.reset(server);
       external_fe_server_->SetConnectionHandler(this);
     }
@@ -3338,6 +3345,7 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t 
hs2_port,
               .keepalive(FLAGS_client_keepalive_probe_period_s,
                   FLAGS_client_keepalive_retry_period_s,
                   FLAGS_client_keepalive_retry_count)
+              .host(FLAGS_external_interface)
               .Build(&http_server));
       hs2_http_server_.reset(http_server);
       hs2_http_server_->SetConnectionHandler(this);
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 4468f3ba7..e254620b4 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -163,6 +163,8 @@ add_dependencies(Util gen-deps)
 # Squeasel requires C99 compatibility to build.
 SET_SOURCE_FILES_PROPERTIES(${SQUEASEL_SRC_DIR}/squeasel.c
   PROPERTIES COMPILE_FLAGS -std=c99)
+SET_SOURCE_FILES_PROPERTIES(${SQUEASEL_SRC_DIR}/squeasel.c
+  PROPERTIES COMPILE_FLAGS -DUSE_IPV6)
 
 # shared library which provides native logging support to JVMs over JNI.
 add_library(loggingsupport SHARED
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index b59cd408b..61b499eab 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -53,6 +53,7 @@ using std::random_device;
 namespace impala {
 
 const string LOCALHOST_IP_STR("127.0.0.1");
+const string LOCALHOST_IP_V6_STR("::1");
 
 Status GetHostname(string* hostname) {
   char name[HOST_NAME_MAX];
@@ -67,33 +68,44 @@ Status GetHostname(string* hostname) {
   return Status::OK();
 }
 
-Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip){
+Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip, bool ipv6){
   // Try to resolve via the operating system.
   vector<IpAddr> addresses;
   addrinfo hints;
   memset(&hints, 0, sizeof(struct addrinfo));
-  hints.ai_family = AF_INET; // IPv4 addresses only
+  hints.ai_family = ipv6 ? AF_INET6 : AF_INET;
   hints.ai_socktype = SOCK_STREAM;
 
   struct addrinfo* addr_info;
   if (getaddrinfo(hostname.c_str(), NULL, &hints, &addr_info) != 0) {
     stringstream ss;
-    ss << "Could not find IPv4 address for: " << hostname;
+    ss << "Could not find IPv" << (ipv6 ? 6 : 4) << " address for: " << 
hostname;
     return Status(ss.str());
   }
 
   addrinfo* it = addr_info;
   while (it != NULL) {
-    char addr_buf[64];
-    const char* result =
-        inet_ntop(AF_INET, &((sockaddr_in*)it->ai_addr)->sin_addr, addr_buf, 
64);
+    char addr_buf[INET6_ADDRSTRLEN];
+    const char* result = nullptr;
+    bool is_ipv6_addr = it->ai_family == AF_INET6;
+    if (is_ipv6_addr) {
+      result = inet_ntop(AF_INET6, &((sockaddr_in6*)it->ai_addr)->sin6_addr,
+          addr_buf, sizeof(addr_buf));
+    } else {
+      result = inet_ntop(AF_INET, &((sockaddr_in*)it->ai_addr)->sin_addr,
+          addr_buf, sizeof(addr_buf));
+    }
+
     if (result == NULL) {
       stringstream ss;
-      ss << "Could not convert IPv4 address for: " << hostname;
+      ss << "Could not convert IPv" << (is_ipv6_addr ? 6: 4)
+          << "address for: " << hostname;
       freeaddrinfo(addr_info);
       return Status(ss.str());
     }
-    addresses.push_back(string(addr_buf));
+    if (is_ipv6_addr == ipv6) {
+      addresses.push_back(string(addr_buf));
+    }
     it = it->ai_next;
   }
 
@@ -101,7 +113,7 @@ Status HostnameToIpAddr(const Hostname& hostname, IpAddr* 
ip){
 
   if (addresses.empty()) {
     stringstream ss;
-    ss << "Could not convert IPv4 address for: " << hostname;
+    ss << "Could not convert IPv" << (ipv6 ? 6 : 4) << " address for: " << 
hostname;
     return Status(ss.str());
   }
 
@@ -130,7 +142,7 @@ bool IsResolvedAddress(const NetworkAddressPB& addr) {
 
 bool FindFirstNonLocalhost(const vector<string>& addresses, string* addr) {
   for (const string& candidate: addresses) {
-    if (candidate != LOCALHOST_IP_STR) {
+    if (candidate != LOCALHOST_IP_STR && candidate != LOCALHOST_IP_V6_STR) {
       *addr = candidate;
       return true;
     }
@@ -226,7 +238,13 @@ bool IsWildcardAddress(const string& ipaddress) {
 
 string TNetworkAddressToString(const TNetworkAddress& address) {
   stringstream ss;
-  ss << address.hostname << ":" << dec << address.port;
+  if (address.hostname.find(':') == string::npos) {
+    // IPv4
+    ss << address.hostname << ":" << dec << address.port;
+  } else {
+    // IPv6
+    ss << "[" << address.hostname << "]:" << dec << address.port;
+  }
   return ss.str();
 }
 
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 0facbb191..88071cc6a 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -42,9 +42,11 @@ bool IsResolvedAddress(const NetworkAddressPB& addr);
 
 /// Looks up all IP addresses associated with a given hostname and returns one 
of them via
 /// 'address'. If the IP addresses of a host don't change, then subsequent 
calls will
-/// always return the same address. Returns an error status if any system call 
failed,
-/// otherwise OK. Even if OK is returned, addresses may still be of zero 
length.
-Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip) 
WARN_UNUSED_RESULT;
+/// always return the same address. Returns an error status if any system call 
failed or
+/// no address was found, otherwise OK.
+/// Returns only ipv6 addresses if 'ipv6' is true, otherwise only ipv4 
addresses.
+Status HostnameToIpAddr(
+    const Hostname& hostname, IpAddr* ip, bool ipv6=false) WARN_UNUSED_RESULT;
 
 /// Finds the first non-localhost IP address in the given list. Returns
 /// true if such an address was found, false otherwise.
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 2d8ac1ac5..be7e3300d 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -408,17 +408,25 @@ bool Webserver::IsSecure() const {
 Status Webserver::Start() {
   LOG(INFO) << "Starting webserver on " << 
TNetworkAddressToString(http_address_);
 
-  IpAddr ip;
-  RETURN_IF_ERROR(HostnameToIpAddr(http_address_.hostname, &ip));
-  stringstream listening_spec;
-  listening_spec << ip << ":" << http_address_.port;
+  IpAddr ipv4, ipv6;
+  Status ip_v6_status = HostnameToIpAddr(http_address_.hostname, &ipv6, true);
+  Status ip_v4_status = HostnameToIpAddr(http_address_.hostname, &ipv4, false);
+  if (!ip_v4_status.ok() && !ip_v6_status.ok()) return ip_v6_status;
 
-  if (IsSecure()) {
-    LOG(INFO) << "Webserver: Enabling HTTPS support";
-    // Squeasel makes sockets with 's' suffixes accept SSL traffic only
-    listening_spec << "s";
+  if (IsSecure()) LOG(INFO) << "Webserver: Enabling HTTPS support";
+  stringstream listening_spec;
+  if (ip_v6_status.ok()) {
+    listening_spec << "[" << ipv6 << "]:" << http_address_.port;
+    if (IsSecure()) listening_spec << "s";
+  }
+  if (ip_v4_status.ok()) {
+    if (ip_v6_status.ok()) listening_spec << ",";
+    listening_spec << ipv4 << ":" << http_address_.port;
+    if (IsSecure()) listening_spec << "s";
   }
   string listening_str = listening_spec.str();
+  LOG(INFO) << "Starting webserver listening to " << listening_str;
+
   vector<const char*> options;
 
   if (!FLAGS_webserver_doc_root.empty() && FLAGS_enable_webserver_doc_root) {
diff --git a/shell/impala_shell/impala_client.py 
b/shell/impala_shell/impala_client.py
index 8e6164746..7fae660bd 100755
--- a/shell/impala_shell/impala_client.py
+++ b/shell/impala_shell/impala_client.py
@@ -429,8 +429,7 @@ class ImpalaClient(object):
     # symptoms in case of a problematic remote endpoint. It's better to have a 
finite
     # timeout so that in case of any connection errors, the client retries 
have a better
     # chance of succeeding.
-
-    host_and_port = "{0}:{1}".format(self.impalad_host, self.impalad_port)
+    host_and_port = self._to_host_port(self.impalad_host, self.impalad_port)
     assert self.http_path
     # ImpalaHttpClient relies on the URI scheme (http vs https) to open an 
appropriate
     # connection to the server.
@@ -582,6 +581,13 @@ class ImpalaClient(object):
       num_deleted_rows = sum([int(k) for k in 
dml_result.rows_deleted.values()])
     return (num_rows, num_deleted_rows, dml_result.num_row_errors)
 
+  @staticmethod
+  def _to_host_port(host, port):
+    # Wrap ipv6 addresses in brackets.
+    is_ipv6_address = ":" in host
+    fmt = "[{0}]:{1}" if is_ipv6_address else "{0}:{1}"
+    return fmt.format(host, port)
+
 
 class ImpalaHS2Client(ImpalaClient):
   """Impala client. Uses the HS2 protocol plus Impala-specific extensions."""
diff --git a/shell/impala_shell/impala_shell.py 
b/shell/impala_shell/impala_shell.py
index 6de925ccb..2445e8629 100755
--- a/shell/impala_shell/impala_shell.py
+++ b/shell/impala_shell/impala_shell.py
@@ -1001,6 +1001,31 @@ class ImpalaShell(cmd.Cmd, object):
     """Exit the impala shell"""
     return self.do_quit(args)
 
+  @staticmethod
+  def __parse_host_port(addr):
+    """Checks if the host name also contains a port and separates the two.
+    Returns either [host] or [host, port]. Detects if host is an ipv6 address 
like "[::]"
+    and removes the brackets from it.
+    """
+    split_by_colon = addr.split(':')
+    ipv6addr = len(split_by_colon) > 2
+    host_port = None
+    if ipv6addr:
+      if addr[0] == "[":
+        parts = addr.split("]")
+        host_port = [parts[0][1:]]
+        has_port = parts[1] != ""
+        if has_port:
+          if parts[1][0] != ":": return None
+          host_port.append(parts[1][1:])
+      else:
+        host_port = [addr]
+    else:
+      host_port = [val for val in split_by_colon if val.strip()]
+      # validate the connection string.
+      if ':' in addr and len(host_port) != 2: return None
+    return host_port
+
   def do_connect(self, args):
     """Connect to an Impalad instance:
     Usage: connect, defaults to the fqdn of the localhost and the protocol's 
default port
@@ -1023,10 +1048,10 @@ class ImpalaShell(cmd.Cmd, object):
 
     if not args: args = socket.getfqdn()
     tokens = args.split(" ")
-    # validate the connection string.
-    host_port = [val for val in tokens[0].split(':') if val.strip()]
+    addr = tokens[0]
+    host_port = self.__parse_host_port(addr)
     protocol = options.protocol.lower()
-    if (':' in tokens[0] and len(host_port) != 2):
+    if not host_port:
       print("Connection string must either be empty, or of the form "
             "<hostname[:port]>", file=sys.stderr)
       return CmdStatus.ERROR
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index ac0eba84c..0ccc78159 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -35,6 +35,7 @@ import sys
 import time
 
 from builtins import filter, map
+
 from thrift.protocol import TBinaryProtocol
 from thrift.Thrift import TApplicationException
 from thrift.transport.TTransport import TTransportException
@@ -42,6 +43,7 @@ from thrift.transport.TTransport import TTransportException
 from impala_thrift_gen.beeswax import BeeswaxService
 from impala_thrift_gen.beeswax.BeeswaxService import QueryState
 from impala_thrift_gen.ImpalaService import ImpalaService
+from tests.common.network import split_host_port
 from tests.util.thrift_util import create_transport
 
 LOG = logging.getLogger('impala_beeswax')
@@ -115,12 +117,9 @@ class ImpalaBeeswaxClient(object):
   def __init__(self, impalad, use_kerberos=False, user=None, password=None,
                use_ssl=False):
     self.connected = False
-    split_impalad = impalad.split(":")
-    assert len(split_impalad) in [1, 2]
-    self.impalad_host = split_impalad[0]
-    self.impalad_port = 21000  # Default beeswax port
-    if len(split_impalad) == 2:
-      self.impalad_port = int(split_impalad[1])
+    host, port = split_host_port(impalad)
+    self.impalad_host = host
+    self.impalad_port = port if port else 21000  # Default beeswax port
     self.imp_service = None
     self.transport = None
     self.use_kerberos = use_kerberos
@@ -170,7 +169,7 @@ class ImpalaBeeswaxClient(object):
 
   def close_connection(self):
     """Close the transport if it's still open"""
-    if self.transport:
+    if self.transport and self.connected:
       self.transport.close()
     self.connected = False
 
diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index f39072050..5cf512c70 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -614,6 +614,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
       # Failure tests expect cluster to be initialised even if 
start-impala-cluster fails.
       cls.cluster = ImpalaCluster.get_e2e_test_cluster()
+      cls.impalad_test_service = cls.create_impala_service()
 
     PREVIOUS_CMD_STR = cmd_str
 
@@ -650,3 +651,9 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       for impalad in cls.cluster.impalads:
         impalad.service.wait_for_num_known_live_backends(expected_num_impalads,
                                                          
timeout=impalad_timeout_s)
+
+  @classmethod
+  def create_impala_service(cls):
+    """Override ImpalaTestSuite to return 1st impalad of custom cluster.
+    Returns None if no impalad was started."""
+    return cls.cluster.impalads[0].service if cls.cluster.impalads else None
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index c6c44bc76..96c436735 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -570,14 +570,21 @@ class BaseImpalaProcess(Process):
 
   def _get_webserver_certificate_file(self):
     # TODO: if this is containerised, the path will likely not be the same on 
the host.
+    # TODO: what we need in the client is the CA, not the server cert
     return self._get_arg_value("webserver_certificate_file", "")
 
+  def _get_ssl_client_ca_certificate(self):
+    return self._get_arg_value("ssl_client_ca_certificate", "")
+
   def _get_hostname(self):
     return self._get_arg_value("hostname", socket.gethostname())
 
   def _get_webserver_interface(self):
     return self._get_arg_value("webserver_interface", socket.gethostname())
 
+  def _get_external_interface(self):
+    return self._get_arg_value("external_interface", socket.gethostname())
+
   def _get_arg_value(self, arg_name, default=None):
     """Gets the argument value for given argument name"""
     for arg in self.cmd:
@@ -600,10 +607,12 @@ class BaseImpalaProcess(Process):
 class ImpaladProcess(BaseImpalaProcess):
   def __init__(self, cmd, container_id=None, port_map=None):
     super(ImpaladProcess, self).__init__(cmd, container_id, port_map)
+    self.external_interface = self._get_external_interface()
     self.service = ImpaladService(self.hostname, self.webserver_interface,
+        self.external_interface,
         self.get_webserver_port(), self.__get_beeswax_port(),
         self.__get_krpc_port(), self.__get_hs2_port(), 
self.__get_hs2_http_port(),
-        self._get_webserver_certificate_file())
+        self._get_webserver_certificate_file(), 
self._get_ssl_client_ca_certificate())
 
   def _get_default_webserver_port(self):
     return DEFAULT_IMPALAD_WEBSERVER_PORT
@@ -685,7 +694,7 @@ class StateStoreProcess(BaseImpalaProcess):
     super(StateStoreProcess, self).__init__(cmd, container_id, port_map)
     self.service = StateStoredService(self.hostname, self.webserver_interface,
         self.get_webserver_port(), self._get_webserver_certificate_file(),
-        self.__get_port())
+        self._get_ssl_client_ca_certificate(), self.__get_port())
 
   def _get_default_webserver_port(self):
     return DEFAULT_STATESTORED_WEBSERVER_PORT
@@ -712,6 +721,7 @@ class CatalogdProcess(BaseImpalaProcess):
     super(CatalogdProcess, self).__init__(cmd, container_id, port_map)
     self.service = CatalogdService(self.hostname, self.webserver_interface,
         self.get_webserver_port(), self._get_webserver_certificate_file(),
+        self._get_ssl_client_ca_certificate(),
         self.__get_port())
 
   def _get_default_webserver_port(self):
@@ -743,7 +753,8 @@ class AdmissiondProcess(BaseImpalaProcess):
   def __init__(self, cmd, container_id=None, port_map=None):
     super(AdmissiondProcess, self).__init__(cmd, container_id, port_map)
     self.service = AdmissiondService(self.hostname, self.webserver_interface,
-        self.get_webserver_port(), self._get_webserver_certificate_file())
+        self.get_webserver_port(), self._get_webserver_certificate_file(),
+        self._get_ssl_client_ca_certificate())
 
   def _get_default_webserver_port(self):
     return DEFAULT_ADMISSIOND_WEBSERVER_PORT
diff --git a/tests/common/impala_connection.py 
b/tests/common/impala_connection.py
index f57c1f3a3..9aad86e66 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -40,6 +40,7 @@ from tests.beeswax.impala_beeswax import (
     ImpalaBeeswaxException,
 )
 import tests.common
+from tests.common.network import split_host_port
 from tests.common.patterns import LOG_FORMAT
 from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
 from tests.util.thrift_util import op_handle_to_query_id, 
session_handle_to_session_id
@@ -610,13 +611,13 @@ class ImpylaHS2Connection(ImpalaConnection):
     return self.__cursor
 
   def connect(self):
-    host, port = self.__host_port.split(":")
+    host, port = split_host_port(self.__host_port)
     conn_kwargs = {}
     if self._is_hive:
       conn_kwargs['auth_mechanism'] = 'PLAIN'
     try:
       self.__impyla_conn = impyla.connect(
-        host=host, port=int(port), 
use_http_transport=self.__use_http_transport,
+        host=host, port=port, use_http_transport=self.__use_http_transport,
         http_path=self.__http_path, use_ssl=self.__use_ssl, **conn_kwargs)
       self.log_client("connected to {0} with impyla {1}".format(
         self.__host_port, self.get_test_protocol()))
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index ae9707ff8..fe808a499 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -35,6 +35,7 @@ from thrift.transport.TSocket import TSocket
 from thrift.transport.TTransport import TBufferedTransport
 
 from tests.common.impala_connection import create_connection, 
create_ldap_connection
+from tests.common.network import to_host_port, CERT_TO_CA_MAP
 from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
 
 LOG = logging.getLogger('impala_service')
@@ -48,7 +49,7 @@ WEBSERVER_PASSWORD = 
os.environ.get('IMPALA_WEBSERVER_PASSWORD', None)
 # TODO: Refactor the retry/timeout logic into a common place.
 class BaseImpalaService(object):
   def __init__(self, hostname, webserver_interface, webserver_port,
-      webserver_certificate_file):
+      webserver_certificate_file, ssl_client_ca_certificate_file):
     self.hostname = hostname
     self.webserver_interface = webserver_interface
     if webserver_interface == "":
@@ -56,6 +57,7 @@ class BaseImpalaService(object):
       self.webserver_interface = hostname
     self.webserver_port = webserver_port
     self.webserver_certificate_file = webserver_certificate_file
+    self.ssl_client_ca_certificate_file = ssl_client_ca_certificate_file
     self.webserver_username_password = None
     if WEBSERVER_USERNAME is not None and WEBSERVER_PASSWORD is not None:
       self.webserver_username_password = (WEBSERVER_USERNAME, 
WEBSERVER_PASSWORD)
@@ -68,10 +70,14 @@ class BaseImpalaService(object):
         protocol = "http"
         if self.webserver_certificate_file != "":
           protocol = "https"
-        url = "%s://%s:%d/%s" % \
-            (protocol, self.webserver_interface, int(self.webserver_port), 
page_name)
-        return requests.get(url, verify=self.webserver_certificate_file,
-            auth=self.webserver_username_password)
+        host_port = to_host_port(self.webserver_interface, self.webserver_port)
+        url = "%s://%s/%s" % (protocol, host_port, page_name)
+        cert = self.webserver_certificate_file
+        # Instead of cert use its CA cert if available.
+        file_part = cert.split("/")[-1]
+        if file_part in CERT_TO_CA_MAP:
+          cert = cert.replace(file_part, CERT_TO_CA_MAP[file_part])
+        return requests.get(url, verify=cert, 
auth=self.webserver_username_password)
       except Exception as e:
         LOG.info("Debug webpage not yet available: %s", str(e))
       sleep(interval)
@@ -260,11 +266,14 @@ class BaseImpalaService(object):
 # Allows for interacting with an Impalad instance to perform operations such 
as creating
 # new connections or accessing the debug webpage.
 class ImpaladService(BaseImpalaService):
-  def __init__(self, hostname, webserver_interface="", webserver_port=25000,
-      beeswax_port=21000, krpc_port=27000, hs2_port=21050,
-      hs2_http_port=28000, webserver_certificate_file=""):
+  def __init__(self, hostname, webserver_interface="", external_interface="",
+      webserver_port=25000, beeswax_port=21000, krpc_port=27000, 
hs2_port=21050,
+      hs2_http_port=28000, webserver_certificate_file="",
+      ssl_client_ca_certificate_file=""):
     super(ImpaladService, self).__init__(
-        hostname, webserver_interface, webserver_port, 
webserver_certificate_file)
+        hostname, webserver_interface, webserver_port, 
webserver_certificate_file,
+        ssl_client_ca_certificate_file)
+    self.external_interface = external_interface if external_interface else 
hostname
     self.beeswax_port = beeswax_port
     self.krpc_port = krpc_port
     self.hs2_port = hs2_port
@@ -444,30 +453,32 @@ class ImpaladService(BaseImpalaService):
       sleep(interval)
     return False
 
-  def is_port_open(self, port):
+  def use_ssl_for_clients(self):
+    return self.ssl_client_ca_certificate_file != ""
+
+  def is_port_open(self, host, port):
     try:
-      sock = socket.create_connection((self.hostname, port), timeout=1)
+      sock = socket.create_connection((host, port), timeout=1)
       sock.close()
       return True
     except Exception:
       return False
 
   def webserver_port_is_open(self):
-    return self.is_port_open(self.webserver_port)
+    return self.is_port_open(self.webserver_interface, self.webserver_port)
 
   def create_beeswax_client(self, use_kerberos=False):
     """Creates a new beeswax client connection to the impalad.
     DEPRECATED: Use create_hs2_client() instead."""
-    LOG.warning('beeswax protocol is deprecated.')
-    client = create_connection('%s:%d' % (self.hostname, self.beeswax_port),
-                               use_kerberos, BEESWAX)
+    client = create_connection(to_host_port(self.external_interface, 
self.beeswax_port),
+                               use_kerberos, BEESWAX, 
use_ssl=self.use_ssl_for_clients())
     client.connect()
     return client
 
   def beeswax_port_is_open(self):
     """Test if the beeswax port is open. Does not need to authenticate."""
     # Check if the port is open first to avoid chatty logging of Thrift 
connection.
-    if not self.is_port_open(self.beeswax_port): return False
+    if not self.is_port_open(self.external_interface, self.beeswax_port): 
return False
 
     try:
       # The beeswax client will connect successfully even if not authenticated.
@@ -475,31 +486,31 @@ class ImpaladService(BaseImpalaService):
       client.close()
       return True
     except Exception as e:
-      LOG.info(e)
       return False
 
   def create_ldap_beeswax_client(self, user, password, use_ssl=False):
-    client = create_ldap_connection('%s:%d' % (self.hostname, 
self.beeswax_port),
+    client = create_ldap_connection(to_host_port(self.hostname, 
self.beeswax_port),
                                     user=user, password=password, 
use_ssl=use_ssl)
     client.connect()
     return client
 
   def create_hs2_client(self, user=None):
     """Creates a new HS2 client connection to the impalad"""
-    client = create_connection('%s:%d' % (self.hostname, self.hs2_port),
-                               protocol=HS2, user=user)
+    client = create_connection('%s:%d' % (self.external_interface, 
self.hs2_port),
+                               protocol=HS2, user=user,
+                               use_ssl=self.use_ssl_for_clients())
     client.connect()
     return client
 
   def hs2_port_is_open(self):
     """Test if the HS2 port is open. Does not need to authenticate."""
     # Check if the port is open first to avoid chatty logging of Thrift 
connection.
-    if not self.is_port_open(self.hs2_port): return False
+    if not self.is_port_open(self.external_interface, self.hs2_port): return 
False
 
     # Impyla will try to authenticate as part of connecting, so preserve 
previous logic
     # that uses the HS2 thrift code directly.
     try:
-      sock = TSocket(self.hostname, self.hs2_port)
+      sock = TSocket(self.external_interface, self.hs2_port)
       transport = TBufferedTransport(sock)
       transport.open()
       transport.close()
@@ -510,7 +521,7 @@ class ImpaladService(BaseImpalaService):
 
   def hs2_http_port_is_open(self):
     # Only check if the port is open, do not create Thrift transport.
-    return self.is_port_open(self.hs2_http_port)
+    return self.is_port_open(self.external_interface, self.hs2_http_port)
 
   def create_client(self, protocol):
     """Creates a new client connection for given protocol to this impalad"""
@@ -520,7 +531,8 @@ class ImpaladService(BaseImpalaService):
     if protocol == BEESWAX:
       LOG.warning('beeswax protocol is deprecated.')
       port = self.beeswax_port
-    client = create_connection('%s:%d' % (self.hostname, port), 
protocol=protocol)
+    client = create_connection(to_host_port(self.external_interface, port),
+                               protocol=protocol)
     client.connect()
     return client
 
@@ -537,9 +549,10 @@ class ImpaladService(BaseImpalaService):
 # accessing the debug webpage.
 class StateStoredService(BaseImpalaService):
   def __init__(self, hostname, webserver_interface, webserver_port,
-      webserver_certificate_file, service_port):
+      webserver_certificate_file, ssl_client_ca_certificate_file, 
service_port):
     super(StateStoredService, self).__init__(
-        hostname, webserver_interface, webserver_port, 
webserver_certificate_file)
+        hostname, webserver_interface, webserver_port, 
webserver_certificate_file,
+        ssl_client_ca_certificate_file)
     self.service_port = service_port
 
   def wait_for_live_subscribers(self, num_subscribers, timeout=15, interval=1):
@@ -554,9 +567,10 @@ class StateStoredService(BaseImpalaService):
 # accessing the debug webpage.
 class CatalogdService(BaseImpalaService):
   def __init__(self, hostname, webserver_interface, webserver_port,
-      webserver_certificate_file, service_port):
+      webserver_certificate_file, ssl_client_ca_certificate_file, 
service_port):
     super(CatalogdService, self).__init__(
-        hostname, webserver_interface, webserver_port, 
webserver_certificate_file)
+        hostname, webserver_interface, webserver_port, 
webserver_certificate_file,
+        ssl_client_ca_certificate_file)
     self.service_port = service_port
 
   def get_catalog_version(self, timeout=10, interval=1):
@@ -579,6 +593,7 @@ class CatalogdService(BaseImpalaService):
 
 class AdmissiondService(BaseImpalaService):
   def __init__(self, hostname, webserver_interface, webserver_port,
-      webserver_certificate_file):
+      webserver_certificate_file, ssl_client_ca_certificate_file):
     super(AdmissiondService, self).__init__(
-        hostname, webserver_interface, webserver_port, 
webserver_certificate_file)
+        hostname, webserver_interface, webserver_port, 
webserver_certificate_file,
+        ssl_client_ca_certificate_file)
diff --git a/tests/common/impala_test_suite.py 
b/tests/common/impala_test_suite.py
index 0ae933bbd..77a5bc253 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -49,6 +49,7 @@ from tests.common.environ import (
 from tests.common.errors import Timeout
 from tests.common.impala_connection import create_connection
 from tests.common.impala_service import ImpaladService
+from tests.common.network import to_host_port
 from tests.common.test_dimensions import (
     ALL_BATCH_SIZES,
     ALL_DISABLE_CODEGEN_OPTIONS,
@@ -265,7 +266,11 @@ class ImpalaTestSuite(BaseTestSuite):
     cls.hs2_http_client = None
     cls.hive_client = None
     cls.hive_transport = None
+
+    # In case of custom cluster tests this returns the 1st impalad or None if nothing
+    # is started.
     cls.impalad_test_service = cls.create_impala_service()
+    ImpalaTestSuite.impalad_test_service = cls.impalad_test_service
 
     # Override the shell history path so that commands run by any tests
     # don't write any history into the developer's file.
@@ -377,10 +382,12 @@ class ImpalaTestSuite(BaseTestSuite):
     if protocol is None:
       protocol = cls.default_test_protocol()
     if host_port is None:
-      host_port = cls.__get_default_host_port(protocol)
+      host = cls.impalad_test_service.external_interface
+      host_port = to_host_port(host, cls._get_default_port(protocol))
+    use_ssl = cls.impalad_test_service.use_ssl_for_clients()
     client = create_connection(
         host_port=host_port, use_kerberos=pytest.config.option.use_kerberos,
-        protocol=protocol, is_hive=is_hive, user=user)
+        protocol=protocol, is_hive=is_hive, user=user, use_ssl=use_ssl)
     client.connect()
     return client
 
@@ -413,7 +420,7 @@ class ImpalaTestSuite(BaseTestSuite):
       host, port = host_port.split(':')
       port = str(int(port) + nth)
       host_port = host + ':' + port
-    return ImpalaTestSuite.create_impala_client(host_port, protocol=protocol)
+    return cls.create_impala_client(host_port, protocol=protocol)
 
   @classmethod
   def create_impala_clients(cls):
@@ -448,6 +455,11 @@ class ImpalaTestSuite(BaseTestSuite):
   @classmethod
   def close_impala_clients(cls):
     """Closes Impala clients created by create_impala_clients()."""
+    # cls.client should equal one of the below, unless a test implicitly overrides it.
+    # Closing twice would lead to error in some clients (impyla+SSL).
+    if cls.client not in (cls.beeswax_client, cls.hs2_client, 
cls.hs2_http_client):
+      cls.client.close()
+    cls.client = None
     if cls.beeswax_client:
       cls.beeswax_client.close()
       cls.beeswax_client = None
@@ -457,11 +469,6 @@ class ImpalaTestSuite(BaseTestSuite):
     if cls.hs2_http_client:
       cls.hs2_http_client.close()
       cls.hs2_http_client = None
-    # cls.client should be equal to one of above, unless test method 
implicitly override.
-    # Closing twice should be OK.
-    if cls.client:
-      cls.client.close()
-      cls.client = None
 
   @classmethod
   def default_impala_client(cls, protocol):
@@ -474,13 +481,13 @@ class ImpalaTestSuite(BaseTestSuite):
     raise Exception("unknown protocol: {0}".format(protocol))
 
   @classmethod
-  def __get_default_host_port(cls, protocol):
+  def _get_default_port(cls, protocol):
     if protocol == BEESWAX:
-      return IMPALAD
+      return IMPALAD_BEESWAX_PORT
     elif protocol == HS2_HTTP:
-      return IMPALAD_HS2_HTTP_HOST_PORT
+      return IMPALAD_HS2_HTTP_PORT
     elif protocol == HS2:
-      return IMPALAD_HS2_HOST_PORT
+      return IMPALAD_HS2_PORT
     else:
       raise NotImplementedError("Not yet implemented: protocol=" + protocol)
 
@@ -497,13 +504,8 @@ class ImpalaTestSuite(BaseTestSuite):
       raise NotImplementedError("Not yet implemented: protocol=" + protocol)
 
   @classmethod
-  def create_impala_service(
-      cls, host_port=IMPALAD, webserver_interface="", webserver_port=25000):
-    host, port = host_port.split(':')
-    if webserver_interface == "":
-      webserver_interface = host
-    return ImpaladService(host, beeswax_port=port,
-        webserver_interface=webserver_interface, webserver_port=webserver_port)
+  def create_impala_service(cls):
+    return ImpaladService(IMPALAD_HOSTNAME)
 
   @classmethod
   def create_hdfs_client(cls):
@@ -773,7 +775,7 @@ class ImpalaTestSuite(BaseTestSuite):
     target_impalad_clients = list()
     if multiple_impalad:
       target_impalad_clients =\
-          [ImpalaTestSuite.create_impala_client(host_port, protocol=protocol)
+          [self.create_impala_client(host_port, protocol=protocol)
            for host_port in self.__get_cluster_host_ports(protocol)]
     else:
       target_impalad_clients = [self.default_impala_client(protocol)]
@@ -822,7 +824,7 @@ class ImpalaTestSuite(BaseTestSuite):
       Helper to execute a query block in Hive. No special handling of query
       options is done, since we use a separate session for each block.
       """
-      h = ImpalaTestSuite.create_impala_client(HIVE_HS2_HOST_PORT, 
protocol=HS2,
+      h = self.create_impala_client(HIVE_HS2_HOST_PORT, protocol=HS2,
               is_hive=True)
       try:
         result = None
@@ -1788,3 +1790,25 @@ class ImpalaTestSuite(BaseTestSuite):
           break
         properties[fields[1].rstrip()] = fields[2].rstrip()
     return properties
+
+  # Checks if an Impala connection is functional.
+  @staticmethod
+  def check_connection(conn):
+    res = conn.execute("select 1 + 1")
+    assert res.data == ["2"]
+
+  # Checks connections for all protocols.
+  def check_connections(cls, expected_count=3):
+    # default client must exist
+    cls.check_connection(cls.client)
+    count = 0
+    if cls.beeswax_client:
+      cls.check_connection(cls.beeswax_client)
+      count += 1
+    if cls.hs2_client:
+      cls.check_connection(cls.hs2_client)
+      count += 1
+    if cls.hs2_http_client:
+      cls.check_connection(cls.hs2_http_client)
+      count += 1
+    assert count == expected_count
diff --git a/tests/common/network.py b/tests/common/network.py
index 55502b4eb..7a2b11814 100644
--- a/tests/common/network.py
+++ b/tests/common/network.py
@@ -19,7 +19,9 @@
 
 from __future__ import absolute_import, division, print_function
 import socket
+import ssl
 
+from tests.common.environ import IS_REDHAT_DERIVATIVE
 
 # Retrieves the host external IP rather than localhost/127.0.0.1 so we have an IP that
 # Impala will consider distinct from storage backends to force remote scheduling.
@@ -30,3 +32,49 @@ def get_external_ip():
   # Timeout=0 means it doesn't need to resolve.
   s.connect(('10.254.254.254', 1))
   return s.getsockname()[0]
+
+
+def split_host_port(host_port):
+  """Checks if the host name also contains a port and separates the two.
+  Returns either (host, None) or (host, port). Detects if host is an ipv6 address
+  like "[::]" and removes the brackets from it.
+  """
+  is_ipv6_address = host_port[0] == "["
+  if is_ipv6_address:
+    parts = host_port[1:].split("]")
+    if len(parts) == 1 or not parts[1]:
+      return (parts[0], None)
+    return (parts[0], int(parts[1][1:]))
+  else:
+    parts = host_port.split(":")
+    if len(parts) == 1:
+      return (parts[0], None)
+    return (parts[0], int(parts[1]))
+
+
+def to_host_port(host, port):
+  is_ipv6_address = ":" in host
+  fmt = "[{0}]:{1}" if is_ipv6_address else "{0}:{1}"
+  return fmt.format(host, port)
+
+
+CERT_TO_CA_MAP = {
+  "wildcard-cert.pem": "wildcardCA.pem",
+  "wildcard-san-cert.pem": "wildcardCA.pem"
+}
+
+REQUIRED_MIN_OPENSSL_VERSION = 0x10001000
+# Python supports TLSv1.2 from 2.7.9 officially but on Red Hat/CentOS Python2.7.5
+# with newer python-libs (eg python-libs-2.7.5-77) supports TLSv1.2 already
+if IS_REDHAT_DERIVATIVE:
+  REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 5)
+else:
+  REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 9)
+_openssl_version_number = getattr(ssl, "OPENSSL_VERSION_NUMBER", None)
+if _openssl_version_number is None:
+  SKIP_SSL_MSG = "Legacy OpenSSL module detected"
+elif _openssl_version_number < REQUIRED_MIN_OPENSSL_VERSION:
+  SKIP_SSL_MSG = "Only have OpenSSL version %X, but test requires %X" % (
+    ssl.OPENSSL_VERSION_NUMBER, REQUIRED_MIN_OPENSSL_VERSION)
+else:
+  SKIP_SSL_MSG = None
diff --git a/tests/custom_cluster/test_client_ssl.py 
b/tests/custom_cluster/test_client_ssl.py
index 832b547ae..09c462d97 100644
--- a/tests/custom_cluster/test_client_ssl.py
+++ b/tests/custom_cluster/test_client_ssl.py
@@ -24,34 +24,20 @@ import pytest
 import re
 import requests
 import signal
-import ssl
 import socket
 import sys
 import time
 
-from tests.common.environ import IS_REDHAT_DERIVATIVE
+
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_service import ImpaladService
+from tests.common.network import SKIP_SSL_MSG, 
REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12
 from tests.common.test_dimensions import create_client_protocol_dimension
 from tests.common.test_vector import BEESWAX
 from tests.shell.util import run_impala_shell_cmd, 
run_impala_shell_cmd_no_expect, \
     ImpalaShell, create_impala_shell_executable_dimension
 
-REQUIRED_MIN_OPENSSL_VERSION = 0x10001000
-# Python supports TLSv1.2 from 2.7.9 officially but on Red Hat/CentOS 
Python2.7.5
-# with newer python-libs (eg python-libs-2.7.5-77) supports TLSv1.2 already
-if IS_REDHAT_DERIVATIVE:
-  REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 5)
-else:
-  REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 9)
-_openssl_version_number = getattr(ssl, "OPENSSL_VERSION_NUMBER", None)
-if _openssl_version_number is None:
-  SKIP_SSL_MSG = "Legacy OpenSSL module detected"
-elif _openssl_version_number < REQUIRED_MIN_OPENSSL_VERSION:
-  SKIP_SSL_MSG = "Only have OpenSSL version %X, but test requires %X" % (
-    ssl.OPENSSL_VERSION_NUMBER, REQUIRED_MIN_OPENSSL_VERSION)
-else:
-  SKIP_SSL_MSG = None
+
 CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME']
 
 
@@ -99,7 +85,6 @@ class TestClientSsl(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(impalad_args=SSL_ARGS, 
statestored_args=SSL_ARGS,
                                     catalogd_args=SSL_ARGS)
   def test_ssl(self, vector):
-
     self._verify_negative_cases(vector)
     # TODO: This is really two different tests, but the custom cluster takes 
too long to
     # start. Make it so that custom clusters can be specified across test 
suites.
@@ -140,6 +125,7 @@ class TestClientSsl(CustomClusterTestSuite):
     print(result.stderr)
     assert "Query Status: Cancelled" in result.stdout
     assert impalad.wait_for_num_in_flight_queries(0)
+    self.check_connections()
 
   WEBSERVER_SSL_ARGS = 
("--webserver_certificate_file=%(cert_dir)s/server-cert.pem "
                         
"--webserver_private_key_file=%(cert_dir)s/server-key.pem "
@@ -217,6 +203,7 @@ class TestClientSsl(CustomClusterTestSuite):
 
     self._validate_positive_cases(vector, "%s/wildcardCA.pem" % CERT_DIR,
                                   host="ip4.impala.test")
+    self.check_connections()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args=SSL_WILDCARD_SAN_ARGS,
@@ -225,7 +212,6 @@ class TestClientSsl(CustomClusterTestSuite):
   @pytest.mark.skipif(SKIP_SSL_MSG is not None, reason=SKIP_SSL_MSG)
   def test_wildcard_san_ssl(self, vector):
     """ Test for IMPALA-3159: Test with a certificate which has a wildcard as 
a SAN. """
-
     # This block of code is the same as _validate_positive_cases() but we want 
to check
     # if retrieving the SAN is supported first.
     args = ["--ssl", "-q", "select 1 + 2", "--ca_cert=%s/wildcardCA.pem" % 
CERT_DIR]
@@ -239,6 +225,8 @@ class TestClientSsl(CustomClusterTestSuite):
 
     self._validate_positive_cases(vector, "%s/wildcardCA.pem" % CERT_DIR,
                                   host="ip4.impala.test")
+    self.check_connections()
+
 
   def _verify_negative_cases(self, vector, host=""):
     # Expect the shell to not start successfully if we point --ca_cert to an 
incorrect
diff --git a/tests/custom_cluster/test_event_processing_error.py 
b/tests/custom_cluster/test_event_processing_error.py
index 49d5e9b10..3942e2698 100644
--- a/tests/custom_cluster/test_event_processing_error.py
+++ b/tests/custom_cluster/test_event_processing_error.py
@@ -352,18 +352,18 @@ class TestEventProcessingError(CustomClusterTestSuite):
     replication tests
     """
     # inserts on transactional tables
-    TestEventProcessingBase._run_test_insert_events_impl(unique_database, True)
+    TestEventProcessingBase._run_test_insert_events_impl(self, 
unique_database, True)
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
     try:
         test_db = unique_database + "_no_transact"
         self.run_stmt_in_hive("""create database {}""".format(test_db))
         # inserts on external tables
-        TestEventProcessingBase._run_test_insert_events_impl(test_db, False)
+        TestEventProcessingBase._run_test_insert_events_impl(self, test_db, 
False)
         assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
     finally:
         self.run_stmt_in_hive("""drop database {} cascade""".format(test_db))
     # replication related tests
-    TestEventProcessingBase._run_event_based_replication_tests_impl(
+    TestEventProcessingBase._run_event_based_replication_tests_impl(self,
         self.filesystem_client)
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
diff --git a/tests/custom_cluster/test_ipv6.py 
b/tests/custom_cluster/test_ipv6.py
new file mode 100644
index 000000000..f4a0a9de5
--- /dev/null
+++ b/tests/custom_cluster/test_ipv6.py
@@ -0,0 +1,251 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import, division, print_function
+import json
+import logging
+import os
+import pytest
+import requests
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.network import SKIP_SSL_MSG
+from tests.common.test_dimensions import create_client_protocol_dimension
+from tests.shell.util import run_impala_shell_cmd, \
+    create_impala_shell_executable_dimension
+from tests.common.impala_connection import create_connection
+
+LOG = logging.getLogger('impala_test_suite')
+
+CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME']
+
+# Use wildcard san cert to be flexible about host name.
+SSL_WILDCARD_SAN_ARGS = ("--ssl_client_ca_certificate={0}/wildcardCA.pem "
+                         "--ssl_server_certificate={0}/wildcard-san-cert.pem "
+                         "--ssl_private_key={0}/wildcard-san-cert.key "
+                         "--hostname={1} "
+                         "--state_store_host={1} "
+                         "--catalog_service_host={1} "
+                         
"--webserver_certificate_file={0}/wildcard-san-cert.pem "
+                         
"--webserver_private_key_file={0}/wildcard-san-cert.key "
+                         ).format(CERT_DIR, "ip4.impala.test")
+
+WILDCARD_CA_CERT_PATH = "%s/wildcardCA.pem" % CERT_DIR
+
+IPV6_ONLY_IP_WEBSERBER_ARG = "--webserver_interface=::1 "
+IPV6_DUAL_IP_WEBSERBER_ARG = "--webserver_interface=:: "
+IPV6_ONLY_IP_QUERY_ARG = "--external_interface=::1 "
+IPV6_DUAL_IP_QUERY_ARG = "--external_interface=:: "
+IPV6_ONLY_IP_COORDINATOR_ARG = \
+    "%s %s" % (IPV6_ONLY_IP_WEBSERBER_ARG, IPV6_ONLY_IP_QUERY_ARG)
+IPV6_DUAL_IP_COORDINATOR_ARG = \
+    "%s %s" % (IPV6_DUAL_IP_WEBSERBER_ARG, IPV6_DUAL_IP_QUERY_ARG)
+
+IPV6_ONLY_HOSTNAME_WEBSERBER_ARG = "--webserver_interface=ip6.impala.test "
+IPV6_DUAL_HOSTNAME_WEBSERBER_ARG = "--webserver_interface=ip46.impala.test "
+IPV6_ONLY_HOSTNAME_QUERY_ARG = "--external_interface=::1 "
+IPV6_DUAL_HOSTNAME_QUERY_ARG = "--external_interface=:: "
+
+WEBUI_PORTS = [25000, 25010, 25020]
+
+# Error text can depend on both protocol and python version.
+CONN_ERR = ["Could not connect", "Connection refused"]
+CERT_ERR = ["doesn't match", "certificate verify failed"]
+WEB_CERT_ERR = "CertificateError"
+
+
+class TestIPv6Base(CustomClusterTestSuite):
+  ca_cert = None
+
+  @classmethod
+  def setup_class(cls):
+    super(TestIPv6Base, cls).setup_class()
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestIPv6Base, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
+    
cls.ImpalaTestMatrix.add_dimension(create_impala_shell_executable_dimension())
+
+  def _smoke(self, host, vector, expected_errors=[]):
+    proto = vector.get_value('protocol')
+    try:
+      port = self._get_default_port(proto)
+      host_port = "%s:%d" % (host, port)
+      use_ssl = self.ca_cert is not None
+      conn = create_connection(host_port, protocol=proto, use_ssl=use_ssl)
+      conn.connect()
+      assert not expected_errors
+      res = conn.execute("select 1")
+      assert res.data == ["1"]
+    except Exception as ex:
+      for err in expected_errors:
+        if err in str(ex): return
+      raise ex
+
+  def _webui_smoke(self, url, err=None):
+    """Checks that 'url' returns JSON with glibc_version, or fails with 'err'."""
+    try:
+      if self.ca_cert:
+        other_info_page = requests.get(url + "/?json", 
verify=self.ca_cert).text
+      else:
+        other_info_page = requests.get(url + "/?json", verify=False).text
+      assert err is None
+      other_info = json.loads(other_info_page)
+      assert "glibc_version" in other_info
+    except Exception as ex:
+      if not err: raise ex
+      assert err in str(ex)
+
+  def _shell_smoke(self, host, vector, expected_errors=[]):
+    proto = vector.get_value('protocol')
+    port = self._get_default_port(proto)
+    host_port = "%s:%d" % (host, port)
+    try:
+      shell_options = ["-i", host_port, "-q", "select 1"]
+      if self.ca_cert:
+        shell_options.append("--ssl")
+        shell_options.append("--ca_cert=" + self.ca_cert)
+      result = run_impala_shell_cmd(vector, shell_options)
+      assert not expected_errors
+      assert "Fetched 1 row" in result.stderr
+    except Exception as ex:
+      for err in expected_errors:
+        if err in str(ex): return
+      raise ex
+
+
+@CustomClusterTestSuite.with_args(impalad_args=IPV6_DUAL_IP_WEBSERBER_ARG
+                                               + IPV6_DUAL_IP_QUERY_ARG,
+                                  statestored_args=IPV6_DUAL_IP_WEBSERBER_ARG,
+                                  catalogd_args=IPV6_DUAL_IP_WEBSERBER_ARG)
+class TestIPv6DualNoSsl(TestIPv6Base):
+
+  def test_ipv6_dual_no_ssl(self, vector):
+    for port in WEBUI_PORTS:
+      self._webui_smoke("http://127.0.0.1:%d" % port)
+      self._webui_smoke("http://[::1]:%d" % port)
+      self._webui_smoke("http://ip4.impala.test:%d" % port)
+      self._webui_smoke("http://ip6.impala.test:%d" % port)
+      self._webui_smoke("http://ip46.impala.test:%d" % port)
+
+    self._smoke("[::1]", vector)
+    self._smoke("127.0.0.1", vector)
+    self._smoke("ip4.impala.test", vector)
+    self._smoke("ip6.impala.test", vector)
+    self._smoke("ip46.impala.test", vector)
+
+    self._shell_smoke("[::1]", vector)
+    self._shell_smoke("127.0.0.1", vector)
+    self._shell_smoke("ip4.impala.test", vector)
+    self._shell_smoke("ip6.impala.test", vector)
+    self._shell_smoke("ip46.impala.test", vector)
+
+
+@CustomClusterTestSuite.with_args(impalad_args=IPV6_ONLY_IP_WEBSERBER_ARG
+                                               + IPV6_ONLY_IP_QUERY_ARG,
+                                  statestored_args=IPV6_ONLY_IP_WEBSERBER_ARG,
+                                  catalogd_args=IPV6_ONLY_IP_WEBSERBER_ARG)
+class TestIPv6OnlyNoSsl(TestIPv6Base):
+
+  def test_ipv6_only_no_ssl(self, vector):
+    self.check_connections()
+    for port in WEBUI_PORTS:
+      self._webui_smoke("http://127.0.0.1:%d" % port, err="Connection refused")
+      self._webui_smoke("http://[::1]:%d" % port)
+      self._webui_smoke("http://ip4.impala.test:%d" % port, err="Connection refused")
+      self._webui_smoke("http://ip6.impala.test:%d" % port)
+      self._webui_smoke("http://ip46.impala.test:%d" % port)
+
+    self._smoke("[::1]", vector)
+    self._smoke("127.0.0.1", vector, CONN_ERR)
+    self._smoke("ip4.impala.test", vector, CONN_ERR)
+    self._smoke("ip6.impala.test", vector)
+    self._smoke("ip46.impala.test", vector)
+
+    self._shell_smoke("[::1]", vector)
+    self._shell_smoke("127.0.0.1", vector, CONN_ERR)
+    self._shell_smoke("ip4.impala.test", vector, CONN_ERR)
+    self._shell_smoke("ip6.impala.test", vector)
+    self._shell_smoke("ip46.impala.test", vector)
+
+
+@CustomClusterTestSuite.with_args(impalad_args=IPV6_DUAL_HOSTNAME_WEBSERBER_ARG
+                                               + IPV6_DUAL_HOSTNAME_QUERY_ARG
+                                               + SSL_WILDCARD_SAN_ARGS,
+                                  
statestored_args=IPV6_DUAL_HOSTNAME_WEBSERBER_ARG
+                                                  + SSL_WILDCARD_SAN_ARGS,
+                                  
catalogd_args=IPV6_DUAL_HOSTNAME_WEBSERBER_ARG
+                                                + SSL_WILDCARD_SAN_ARGS)
+class TestIPv6DualSsl(TestIPv6Base):
+  ca_cert = WILDCARD_CA_CERT_PATH
+
+  @pytest.mark.skipif(SKIP_SSL_MSG is not None, reason=SKIP_SSL_MSG)
+  def test_ipv6_dual_ssl(self, vector):
+    self.check_connections()
+    for port in WEBUI_PORTS:
+      self._webui_smoke("https://127.0.0.1:%d" % port, WEB_CERT_ERR)
+      self._webui_smoke("https://[::1]:%d" % port, WEB_CERT_ERR)
+      self._webui_smoke("https://ip4.impala.test:%d" % port)
+      self._webui_smoke("https://ip6.impala.test:%d" % port)
+      self._webui_smoke("https://ip46.impala.test:%d" % port)
+
+    self._smoke("[::1]", vector, CONN_ERR)
+    self._smoke("127.0.0.1", vector, CONN_ERR)
+    self._smoke("ip4.impala.test", vector)
+    self._smoke("ip6.impala.test", vector)
+    self._smoke("ip46.impala.test", vector)
+
+    self._shell_smoke("[::1]", vector, CERT_ERR)
+    self._shell_smoke("127.0.0.1", vector, CERT_ERR)
+    self._shell_smoke("ip4.impala.test", vector)
+    self._shell_smoke("ip6.impala.test", vector)
+    self._shell_smoke("ip46.impala.test", vector)
+
+
+@CustomClusterTestSuite.with_args(impalad_args=IPV6_ONLY_HOSTNAME_WEBSERBER_ARG
+                                               + IPV6_ONLY_HOSTNAME_QUERY_ARG
+                                               + SSL_WILDCARD_SAN_ARGS,
+                                 
statestored_args=IPV6_ONLY_HOSTNAME_WEBSERBER_ARG
+                                                  + SSL_WILDCARD_SAN_ARGS,
+                                 catalogd_args=IPV6_ONLY_HOSTNAME_WEBSERBER_ARG
+                                               + SSL_WILDCARD_SAN_ARGS)
+class TestIPv6OnlySsl(TestIPv6Base):
+  ca_cert = WILDCARD_CA_CERT_PATH
+
+  @pytest.mark.skipif(SKIP_SSL_MSG is not None, reason=SKIP_SSL_MSG)
+  def test_ipv6_only_ssl(self, vector):
+    self.check_connections()
+    for port in WEBUI_PORTS:
+      self._webui_smoke("https://127.0.0.1:%d" % port, WEB_CERT_ERR)
+      self._webui_smoke("https://[::1]:%d" % port, WEB_CERT_ERR)
+      self._webui_smoke("https://ip4.impala.test:%d" % port, "Connection refused")
+      self._webui_smoke("https://ip6.impala.test:%d" % port)
+      self._webui_smoke("https://ip46.impala.test:%d" % port)
+
+    self._smoke("[::1]", vector, CONN_ERR)
+    self._smoke("127.0.0.1", vector, CONN_ERR)
+    self._smoke("ip4.impala.test", vector, CONN_ERR)
+    self._smoke("ip6.impala.test", vector)
+    self._smoke("ip46.impala.test", vector)
+
+    self._shell_smoke("[::1]", vector, CERT_ERR)
+    self._shell_smoke("127.0.0.1", vector, CONN_ERR)
+    self._shell_smoke("ip4.impala.test", vector, CONN_ERR)
+    self._shell_smoke("ip6.impala.test", vector)
+    self._shell_smoke("ip46.impala.test", vector)
diff --git a/tests/custom_cluster/test_redaction.py 
b/tests/custom_cluster/test_redaction.py
index 7e4b2aad4..0ca333206 100644
--- a/tests/custom_cluster/test_redaction.py
+++ b/tests/custom_cluster/test_redaction.py
@@ -39,6 +39,10 @@ class TestRedaction(CustomClusterTestSuite):
      limited to table data and query text since queries may refer to table 
data.
   '''
 
+  @classmethod
+  def setup_class(cls):
+    super(TestRedaction, cls).setup_class()
+
   @property
   def log_dir(self):
     return os.path.join(self.tmp_dir, "logs")
diff --git a/tests/metadata/test_event_processing.py 
b/tests/metadata/test_event_processing.py
index 1b05c146a..767833304 100644
--- a/tests/metadata/test_event_processing.py
+++ b/tests/metadata/test_event_processing.py
@@ -35,10 +35,14 @@ PROCESSING_TIMEOUT_S = 10
 LOG = logging.getLogger(__name__)
 
 @SkipIfFS.hive
-class TestEventProcessing(ImpalaTestSuite):
+class TestEventProcessing(TestEventProcessingBase):
   """This class contains tests that exercise the event processing mechanism in 
the
   catalog."""
 
+  @classmethod
+  def setup_class(cls):
+    super(TestEventProcessing, cls).setup_class()
+
   @classmethod
   def default_test_protocol(cls):
     return HS2
@@ -47,13 +51,13 @@ class TestEventProcessing(ImpalaTestSuite):
   def test_transactional_insert_events(self, unique_database):
     """Executes 'run_test_insert_events' for transactional tables.
     """
-    TestEventProcessingBase._run_test_insert_events_impl(
+    TestEventProcessingBase._run_test_insert_events_impl(self,
         unique_database, is_transactional=True)
 
   def test_insert_events(self, unique_database):
     """Executes 'run_test_insert_events' for non-transactional tables.
     """
-    TestEventProcessingBase._run_test_insert_events_impl(unique_database)
+    TestEventProcessingBase._run_test_insert_events_impl(self, unique_database)
 
   def test_iceberg_inserts(self):
     """IMPALA-10735: INSERT INTO Iceberg table fails during INSERT event generation
@@ -99,13 +103,12 @@ class TestEventProcessing(ImpalaTestSuite):
     self._run_test_empty_partition_events(unique_database, False)
 
   def test_event_based_replication(self):
-    TestEventProcessingBase._run_event_based_replication_tests_impl(
+    self._run_event_based_replication_tests_impl(self,
         self.filesystem_client)
 
   def _run_test_empty_partition_events(self, unique_database, is_transactional):
     test_tbl = unique_database + ".test_events"
-    TBLPROPERTIES = TestEventProcessingBase._get_transactional_tblproperties(
-      is_transactional)
+    TBLPROPERTIES = self._get_transactional_tblproperties(is_transactional)
     self.run_stmt_in_hive("create table {0} (key string, value string) \
       partitioned by (year int) stored as parquet {1}".format(test_tbl, TBLPROPERTIES))
     self.client.set_configuration({
diff --git a/tests/metadata/test_event_processing_base.py b/tests/metadata/test_event_processing_base.py
index 97725a2c0..baafd5358 100644
--- a/tests/metadata/test_event_processing_base.py
+++ b/tests/metadata/test_event_processing_base.py
@@ -27,17 +27,22 @@ EVENT_SYNC_QUERY_OPTIONS = {
 class TestEventProcessingBase(ImpalaTestSuite):
 
   @classmethod
-  def _run_test_insert_events_impl(cls, unique_database, is_transactional=False):
+  def setup_class(cls):
+    super(TestEventProcessingBase, cls).setup_class()
+
+  @classmethod
+  def _run_test_insert_events_impl(cls, suite, unique_database, is_transactional=False):
     """Test for insert event processing. Events are created in Hive and processed in
     Impala. The following cases are tested :
     Insert into table --> for partitioned and non-partitioned table
     Insert overwrite table --> for partitioned and non-partitioned table
     Insert into partition --> for partitioned table
     """
-    with cls.create_impala_client() as impala_client:
+    # TODO: change into an instance method and remove argument "suite" (IMPALA-14174)
+    with suite.create_impala_client() as impala_client:
       # Test table with no partitions.
       tbl_insert_nopart = 'tbl_insert_nopart'
-      cls.run_stmt_in_hive(
+      suite.run_stmt_in_hive(
         "drop table if exists %s.%s" % (unique_database, tbl_insert_nopart))
       tblproperties = ""
       if is_transactional:
@@ -141,16 +146,18 @@ class TestEventProcessingBase(ImpalaTestSuite):
       assert len(result.data) == 0
 
   @classmethod
-  def _run_event_based_replication_tests_impl(cls, filesystem_client, transactional=True):
+  def _run_event_based_replication_tests_impl(cls, suite,
+                                              filesystem_client, transactional=True):
     """Hive Replication relies on the insert events generated on the tables.
     This test issues some basic replication commands from Hive and makes sure
     that the replicated table has correct data."""
+    # TODO: change into an instance method and remove argument "suite" (IMPALA-14174)
     TBLPROPERTIES = cls._get_transactional_tblproperties(transactional)
     source_db = ImpalaTestSuite.get_random_name("repl_source_")
     target_db = ImpalaTestSuite.get_random_name("repl_target_")
     unpartitioned_tbl = "unpart_tbl"
     partitioned_tbl = "part_tbl"
-    impala_client = cls.create_impala_client()
+    impala_client = suite.create_impala_client()
     try:
       cls.run_stmt_in_hive("create database {0}".format(source_db))
       cls.run_stmt_in_hive(
diff --git a/tests/stress/test_acid_stress.py b/tests/stress/test_acid_stress.py
index 5809cae06..93f9d26a8 100644
--- a/tests/stress/test_acid_stress.py
+++ b/tests/stress/test_acid_stress.py
@@ -93,7 +93,7 @@ class TestAcidInsertsBasic(TestAcidStress):
   def _impala_role_write_inserts(self, tbl_name, partitioned):
     """INSERT INTO/OVERWRITE a table several times from Impala."""
     try:
-      impalad_client = ImpalaTestSuite.create_impala_client()
+      impalad_client = self.create_impala_client()
       part_expr = "partition (p=1)" if partitioned else ""
       for run in range(0, NUM_OVERWRITES + 1):
         OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
@@ -109,7 +109,7 @@ class TestAcidInsertsBasic(TestAcidStress):
   def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds):
     """SELECT from a table many times until the expected final values are found."""
     try:
-      impalad_client = ImpalaTestSuite.create_impala_client()
+      impalad_client = self.create_impala_client()
       expected_result = {"run": -1, "i": 0}
       accept_empty_table = True
       while expected_result["run"] != NUM_OVERWRITES and \
@@ -182,7 +182,7 @@ class TestAcidInsertsBasic(TestAcidStress):
   def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite, sleep_sec):
     insert_op = "OVERWRITE" if is_overwrite else "INTO"
     try:
-      impalad_client = ImpalaTestSuite.create_impala_client()
+      impalad_client = self.create_impala_client()
       impalad_client.execute(
           """insert {op} table {tbl_name} partition({partition})
              select sleep({sleep_ms})""".format(op=insert_op, tbl_name=tbl_name,
diff --git a/tests/stress/test_insert_stress.py b/tests/stress/test_insert_stress.py
index 5653cc9eb..235c92416 100644
--- a/tests/stress/test_insert_stress.py
+++ b/tests/stress/test_insert_stress.py
@@ -36,6 +36,10 @@ class TestInsertStress(ImpalaTestSuite):
   def get_workload(self):
     return 'targeted-stress'
 
+  @classmethod
+  def setup_class(cls):
+    super(TestInsertStress, cls).setup_class()
+
   @classmethod
   def add_test_dimensions(cls):
     super(TestInsertStress, cls).add_test_dimensions()
@@ -46,8 +50,8 @@ class TestInsertStress(ImpalaTestSuite):
   def _impala_role_concurrent_writer(self, tbl_name, wid, num_inserts, counter):
     """Writes ascending numbers up to 'num_inserts' into column 'i'. To column 'wid' it
     writes its identifier passed in parameter 'wid'."""
-    target_impalad = wid % ImpalaTestSuite.get_impalad_cluster_size()
-    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    target_impalad = wid % self.get_impalad_cluster_size()
+    impalad_client = self.create_client_for_nth_impalad(target_impalad)
     try:
       insert_cnt = 0
       while insert_cnt < num_inserts:
@@ -72,8 +76,8 @@ class TestInsertStress(ImpalaTestSuite):
         assert sorted_run == list(range(sorted_run[0], sorted_run[-1] + 1)), \
           "wid: %d" % wid
 
-    target_impalad = cid % ImpalaTestSuite.get_impalad_cluster_size()
-    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    target_impalad = cid % self.get_impalad_cluster_size()
+    impalad_client = self.create_client_for_nth_impalad(target_impalad)
     try:
       while counter.value != writers:
         result = impalad_client.execute("select * from %s" % tbl_name)
diff --git a/tests/stress/test_merge_stress.py b/tests/stress/test_merge_stress.py
index e948aa650..7bd34cb23 100644
--- a/tests/stress/test_merge_stress.py
+++ b/tests/stress/test_merge_stress.py
@@ -41,8 +41,8 @@ class TestIcebergConcurrentMergeStress(ImpalaTestSuite):
 
   def _impala_role_concurrent_updater(self, tbl_name, col, num_writes):
     """Increments values in column 'total' and in the column which is passed in 'col'."""
-    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
-    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
+    impalad_client = self.create_client_for_nth_impalad(target_impalad)
     merge_stmt = """merge into {0} target using
         {0} source on source.total = target.total
         when matched then update set
@@ -61,8 +61,8 @@ class TestIcebergConcurrentMergeStress(ImpalaTestSuite):
 
   def _impala_role_concurrent_writer(self, tbl_name, num_inserts):
     """Adds a new row based on the maximum 'total' value."""
-    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
-    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
+    impalad_client = self.create_client_for_nth_impalad(target_impalad)
     merge_stmt = """merge into {0} target using
         (select total, a, b, c from {0} order by total desc limit 1) source
         on source.total +1 = target.total
@@ -92,8 +92,8 @@ class TestIcebergConcurrentMergeStress(ImpalaTestSuite):
         assert total == a + b + c
       return max_total
 
-    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
-    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
+    impalad_client = self.create_client_for_nth_impalad(target_impalad)
     total = 0
     while total < target_total:
       result = impalad_client.execute("select * from %s" % tbl_name)
diff --git a/tests/stress/test_update_stress.py b/tests/stress/test_update_stress.py
index 06579f7b8..5d76451af 100644
--- a/tests/stress/test_update_stress.py
+++ b/tests/stress/test_update_stress.py
@@ -82,8 +82,8 @@ class TestIcebergConcurrentUpdateStress(ImpalaTestSuite):
 
   def _impala_role_concurrent_writer(self, tbl_name, col, num_updates):
     """Increments values in column 'total' and in the column which is passed in 'col'."""
-    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
-    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
+    impalad_client = self.create_client_for_nth_impalad(target_impalad)
     update_cnt = 0
     while update_cnt < num_updates:
       try:
@@ -107,8 +107,8 @@ class TestIcebergConcurrentUpdateStress(ImpalaTestSuite):
       assert total == a + b + c
       return total
 
-    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
-    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
+    impalad_client = self.create_client_for_nth_impalad(target_impalad)
     total = 0
     while total < target_total:
       result = impalad_client.execute("select * from %s" % tbl_name)
@@ -163,8 +163,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
 
   def _impala_role_concurrent_deleter(self, tbl_name, all_rows_deleted, num_rows):
     """Deletes every row from the table one by one."""
-    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
-    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
+    impalad_client = self.create_client_for_nth_impalad(target_impalad)
     impalad_client.set_configuration_option("SYNC_DDL", "true")
     i = 0
     while i < num_rows:
@@ -181,8 +181,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
 
   def _impala_role_concurrent_writer(self, tbl_name, all_rows_deleted):
     """Updates every row in the table in a loop."""
-    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
-    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
+    impalad_client = self.create_client_for_nth_impalad(target_impalad)
     impalad_client.set_configuration_option("SYNC_DDL", "true")
     while all_rows_deleted.value != 1:
       try:
@@ -196,8 +196,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
 
   def _impala_role_concurrent_optimizer(self, tbl_name, all_rows_deleted):
     """Optimizes the table in a loop."""
-    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
-    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
+    impalad_client = self.create_client_for_nth_impalad(target_impalad)
     impalad_client.set_configuration_option("SYNC_DDL", "true")
     while all_rows_deleted.value != 1:
       try:
@@ -253,8 +253,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
         prev_j = j
       assert prev_id == num_rows - 1
 
-    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
-    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
+    impalad_client = self.create_client_for_nth_impalad(target_impalad)
     while all_rows_deleted.value != 1:
       result = impalad_client.execute("select * from %s order by id" % tbl_name)
       verify_result_set(result)

Reply via email to