This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit daaf73a7c28fe992f176261a8adf249750e8b157
Author: Riza Suminto <[email protected]>
AuthorDate: Thu Jan 16 19:42:51 2025 -0800

    IMPALA-13682: Implement missing capabilities in ImpylaHS2Connection
    
    This patch implements 'wait_for_finished_timeout',
    'wait_for_admission_control', and 'get_admission_result' for
    ImpylaHS2Connection.
    
    This patch also changes ImpylaHS2Connection to open extra cursors in
    addition to self.__cursor: one for each 'execute' call that supplies a
    user argument different from the connection's user, and one for each
    'execute_async' call. This makes issuing multiple concurrent queries
    possible. Note that each HS2 cursor opens its own HS2 session.
    Therefore, this patch breaks the assumption that an ImpylaHS2Connection
    is always under a single HS2 session (see HIVE-11402 and HIVE-14247 on
    why concurrent queries with a shared HS2 session are problematic).
    However, all cursors do share the same query options stored at
    self.__query_options. In practice, most Impala tests do not care about
    running concurrent queries under a single HS2 session but only require
    them to use the same query options.
    
    The following additions are new for both BeeswaxConnection and
    ImpylaHS2Connection:
    - Add method 'log_client' for convenience.
    - Implement consistent query state mapping and checking through several
      accessor methods.
    - Add methods 'wait_for_impala_state' and 'wait_for_any_impala_state'
      that use 'get_impala_exec_state' method internally.
    - Add 'fetch_profile_after_close' parameter to 'close_query' method. If
      True, 'close_query' will return the query profile after closing the
      query.
    - Add 'discard_results' parameter for 'fetch' method. This can save time
      parsing results if the test does not care about the result.
    
    Reuse existing op_handle_to_query_id and add new
    session_handle_to_session_id to parse HS2
    TOperationHandle.operationId.guid and TSessionHandle.sessionId.guid
    respectively.
    
    To show that ImpylaHS2Connection is on par with BeeswaxConnection, this
    patch refactors test_admission_controller.py to test using the HS2
    protocol by default. Tests that issue raw HS2 RPCs (which require
    capabilities from HS2TestSuite) are separated out into a new
    TestAdmissionControllerRawHS2 class and keep using the beeswax protocol
    by default. All calls to copy.copy are replaced with copy.deepcopy for
    safety.
    
    Testing:
    - Pass exhaustive tests.
    
    Change-Id: I9ac07732424c16338e060c9392100b54337f11b8
    Reviewed-on: http://gerrit.cloudera.org:8080/22362
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/client-request-state.h             |   5 +
 tests/beeswax/impala_beeswax.py                   |  16 +-
 tests/common/impala_connection.py                 | 490 +++++++++++++++++-----
 tests/common/impala_service.py                    |  20 +
 tests/common/impala_test_suite.py                 |  12 +-
 tests/custom_cluster/test_admission_controller.py | 430 ++++++++++---------
 tests/custom_cluster/test_session_expiration.py   |   2 -
 tests/util/thrift_util.py                         |  10 +-
 8 files changed, 680 insertions(+), 305 deletions(-)

diff --git a/be/src/service/client-request-state.h 
b/be/src/service/client-request-state.h
index 8a8543295..347f8cc9f 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -82,6 +82,11 @@ class ClientRequestState {
 
   ~ClientRequestState();
 
+  /// Enums representing query execution state internal to impala.
+  /// Mapping to client protocol-specific states are provided by methods such 
as
+  /// BeeswaxQueryState() for Beeswax and TOperationState() for HS2.
+  /// If updating this enum, please also update mappings at
+  /// tests/common/impala_connection.py
   enum class ExecState { INITIALIZED, PENDING, RUNNING, FINISHED, ERROR };
 
   enum class RetryState { RETRYING, RETRIED, NOT_RETRIED };
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index 5823c00ec..fb74146f2 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -224,11 +224,11 @@ class ImpalaBeeswaxClient(object):
       if not fetch_profile_after_close:
         result.runtime_profile = self.get_runtime_profile(handle)
 
-      self.close_query(handle)
+      profile_after_close = self.close_query(handle, fetch_profile_after_close)
 
       if fetch_profile_after_close:
-        # Fetch the profile again after the query has closed and the profile 
is complete.
-        result.runtime_profile = self.get_runtime_profile(handle)
+        # Attach profile that is obtained after query closed.
+        result.runtime_profile = profile_after_close
 
     return result
 
@@ -292,8 +292,11 @@ class ImpalaBeeswaxClient(object):
   def cancel_query(self, query_id):
     return self.__do_rpc(lambda: self.imp_service.Cancel(query_id))
 
-  def close_query(self, handle):
+  def close_query(self, handle, fetch_profile_after_close=False):
     self.__do_rpc(lambda: self.imp_service.close(handle))
+    if fetch_profile_after_close:
+      return self.get_runtime_profile(handle)
+    return None
 
   def wait_for_finished(self, query_handle):
     """Given a query handle, polls the coordinator waiting for the query to 
transition to
@@ -371,7 +374,8 @@ class ImpalaBeeswaxClient(object):
   def get_log(self, query_handle):
     return self.__do_rpc(lambda: self.imp_service.get_log(query_handle))
 
-  def fetch_results(self, query_string, query_handle, max_rows=-1):
+  def fetch_results(self, query_string, query_handle, max_rows=-1,
+                    discard_results=False):
     """Fetches query results given a handle and query type (insert, use, 
other)"""
     query_type = self.__get_query_type(query_string)
     if query_type == 'use':
@@ -382,6 +386,8 @@ class ImpalaBeeswaxClient(object):
 
     # Result fetching for insert is different from other queries.
     exec_result = None
+    if discard_results:
+      return exec_result
     if query_type == 'insert':
       exec_result = self.__fetch_insert_results(query_handle)
     else:
diff --git a/tests/common/impala_connection.py 
b/tests/common/impala_connection.py
index 4b643573c..789809dc1 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -21,15 +21,24 @@
 
 from __future__ import absolute_import, division, print_function
 import abc
-import codecs
 from future.utils import with_metaclass
 import logging
 import re
+import time
 
+from beeswaxd.BeeswaxService import QueryState
 import impala.dbapi as impyla
+import impala.error as impyla_error
 import tests.common
 from RuntimeProfile.ttypes import TRuntimeProfileFormat
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
+from tests.beeswax.impala_beeswax import (
+  DEFAULT_SLEEP_INTERVAL,
+  ImpalaBeeswaxClient,
+  ImpalaBeeswaxException)
+from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
+from tests.util.thrift_util import (
+  op_handle_to_query_id,
+  session_handle_to_session_id)
 
 
 LOG = logging.getLogger(__name__)
@@ -46,6 +55,36 @@ PROGRESS_LOG_RE = re.compile(
 
 MAX_SQL_LOGGING_LENGTH = 128 * 1024
 
+# Tuple of root exception types from different client protocols.
+IMPALA_CONNECTION_EXCEPTION = (ImpalaBeeswaxException, impyla_error.Error)
+
+# String representation of ClientRequestState::ExecState
+INITIALIZED = 'INITIALIZED'
+PENDING = 'PENDING'
+RUNNING = 'RUNNING'
+FINISHED = 'FINISHED'
+ERROR = 'ERROR'
+# ExecState that is final.
+EXEC_STATES_FINAL = set([FINISHED, ERROR])
+# Possible ExecState after query passed admission controller.
+EXEC_STATES_ADMITTED = set([RUNNING, FINISHED, ERROR])
+# Mapping of an ExecState to the set of its possible future ExecStates.
+LEGAL_FUTURE_STATES = {
+  INITIALIZED: set([PENDING, RUNNING, FINISHED, ERROR]),
+  PENDING: set([RUNNING, FINISHED, ERROR]),
+  RUNNING: set([FINISHED, ERROR]),
+  FINISHED: set([ERROR]),
+  ERROR: set()
+}
+
+
+def has_legal_future_state(impala_state, future_states):
+  """Return True if 'impala_state' can transition to one of the states listed in
+  'future_states'."""
+  assert impala_state in LEGAL_FUTURE_STATES
+  expected_impala_states = set(future_states)
+  return len(LEGAL_FUTURE_STATES[impala_state] & expected_impala_states) > 0
+
 
 # test_exprs.py's TestExprLimits executes extremely large SQLs (multiple MBs). 
It is the
 # only test that runs SQL larger than 128KB. Logging these SQLs in execute() 
increases
@@ -149,20 +188,65 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, 
object)):
     pass
 
   @abc.abstractmethod
-  def close_query(self, handle):
+  def close_query(self, handle, fetch_profile_after_close=False):
     """Closes the query."""
     pass
 
   @abc.abstractmethod
   def get_state(self, operation_handle):
-    """Returns the state of a query"""
+    """Returns the state of a query.
+    May raise an error, depending on connection type."""
     pass
 
   @abc.abstractmethod
-  def state_is_finished(self, operation_handle):
-    """Returns whether the state of a query is finished"""
+  def get_impala_exec_state(self, operation_handle):
+    """Returns a string translation from client specific state of 
operation_handle
+    to Impala's ClientRequestState::ExecState."""
     pass
 
+  def __is_at_exec_state(self, operation_handle, impala_state):
+    self.log_handle(
+      operation_handle, 'checking ' + impala_state + ' state for operation')
+    return self.get_impala_exec_state(operation_handle) == impala_state
+
+  def state_is_finished(self, operation_handle):
+    """Returns whether the Impala exec state of a operation_handle is FINISHED.
+    DEPRECATED: use is_finished() instead."""
+    return self.is_finished(operation_handle)
+
+  def is_initialized(self, operation_handle):
+    """Returns whether the Impala exec state of a operation_handle is 
INITIALIZED"""
+    return self.__is_at_exec_state(operation_handle, INITIALIZED)
+
+  def is_pending(self, operation_handle):
+    """Returns whether the Impala exec state of a operation_handle is 
PENDING"""
+    return self.__is_at_exec_state(operation_handle, PENDING)
+
+  def is_running(self, operation_handle):
+    """Returns whether the Impala exec state of a operation_handle is 
RUNNING"""
+    return self.__is_at_exec_state(operation_handle, RUNNING)
+
+  def is_finished(self, operation_handle):
+    """Returns whether the Impala exec state of a operation_handle is 
FINISHED"""
+    return self.__is_at_exec_state(operation_handle, FINISHED)
+
+  def is_error(self, operation_handle):
+    """Returns whether the Impala exec state of a operation_handle is ERROR.
+    Internally, it will call get_state(), and any exception thrown by 
get_state() will
+    cause this method to return True."""
+    return self.__is_at_exec_state(operation_handle, ERROR)
+
+  def is_executing(self, operation_handle):
+    """Returns whether the state of a operation_handle is executing or will be
+    executing. Return False if operation_handle has ended, either successful or
+    with error."""
+    return self.get_impala_exec_state(operation_handle) not in 
EXEC_STATES_FINAL
+
+  def is_admitted(self, operation_handle):
+    """Returns whether the state of a operation_handle has passed Impala
+    admission control. Return True if handle state is error."""
+    return self.get_impala_exec_state(operation_handle) in EXEC_STATES_ADMITTED
+
   @abc.abstractmethod
   def get_log(self, operation_handle):
     """Returns the log of an operation as a string, with entries separated by 
newlines."""
@@ -185,10 +269,19 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, 
object)):
     pass
 
   @abc.abstractmethod
-  def fetch(self, sql_stmt, operation_handle, max_rows=-1):
+  def fetch(self, sql_stmt, operation_handle, max_rows=-1, 
discard_results=False):
     """Fetches query results up to max_rows given a handle and sql statement.
     If max_rows < 0, all rows are fetched. If max_rows > 0 but the number of
-    rows returned is less than max_rows, all the rows have been fetched."""
+    rows returned is less than max_rows, all the rows have been fetched.
+    Return None if discard_results is True.
+    TODO: 'sql_stmt' can be obtained from 'operation_handle'."""
+    pass
+
+  @abc.abstractmethod
+  def get_runtime_profile(self, operation_handle,
+                          profile_format=TRuntimeProfileFormat.STRING):
+    """Get runtime profile of given 'operation_handle'.
+    Handle must stay open."""
     pass
 
   @abc.abstractmethod
@@ -198,14 +291,85 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, 
object)):
     Otherwise, return str(operation_handle)."""
     pass
 
-  @abc.abstractmethod
   def log_handle(self, operation_handle, message):
     """Log 'message' at INFO level, along with id of 'operation_handle'."""
+    handle_id = self.handle_id(operation_handle)
+    LOG.info("-- {0}: {1}".format(handle_id, message))
+
+  def log_client(self, message):
+    """Log 'message' at INFO level."""
+    LOG.info("-- {0}".format(message))
+
+  def wait_for_impala_state(self, operation_handle, expected_impala_state, 
timeout):
+    """Waits for the given 'operation_handle' to reach the 
'expected_impala_state'.
+    'expected_impala_state' must be a string of either 'INITIALIZED', 
'PENDING',
+    'RUNNING', 'FINISHED', or 'ERROR'. If it does not reach the given state 
within
+    'timeout' seconds, the method raises tests.common.errors.Timeout.
+    """
+    self.wait_for_any_impala_state(operation_handle, [expected_impala_state], 
timeout)
+
+  def wait_for_any_impala_state(self, operation_handle, expected_impala_states,
+                                timeout_s):
+    """Waits for the given 'operation_handle' to reach one of 
'expected_impala_states'.
+    Each string in 'expected_impala_states' must either be 'INITIALIZED', 
'PENDING',
+    'RUNNING', 'FINISHED', or 'ERROR'. If it does not reach one of the given 
states
+    within 'timeout_s' seconds, the method raises tests.common.errors.Timeout.
+    Returns the final state.
+    """
+    start_time = time.time()
+    timeout_msg = None
+    while True:
+      impala_state = self.get_impala_exec_state(operation_handle)
+      interval = time.time() - start_time
+      if impala_state in expected_impala_states:
+        # Reached one of expected_impala_states.
+        break
+      elif not has_legal_future_state(impala_state, expected_impala_states):
+        timeout_msg = ("query '{0}' can not transition from last known state 
'{1}' to "
+                       "any of the expected states {2}. Stop waiting after {3} 
"
+                       "seconds.").format(
+          self.handle_id(operation_handle), impala_state, 
expected_impala_states,
+          interval)
+        break
+      elif interval >= timeout_s:
+        timeout_msg = ("query '{0}' did not reach one of the expected states 
{1}, last "
+                       "known state {2}").format(
+          self.handle_id(operation_handle), expected_impala_states, 
impala_state)
+        break
+      time.sleep(DEFAULT_SLEEP_INTERVAL)
+
+    if timeout_msg is not None:
+      raise tests.common.errors.Timeout(timeout_msg)
+    return impala_state
+
+  @abc.abstractmethod
+  def wait_for_admission_control(self, operation_handle, timeout_s=60):
+    """Given an 'operation_handle', polls the coordinator waiting for it to 
complete
+    admission control processing of the query.
+    Return True if query pass admission control after given 'timeout_s'."""
+    pass
+
+  @abc.abstractmethod
+  def get_admission_result(self, operation_handle):
+    """Given an 'operation_handle', returns the admission result from the query
+    profile"""
     pass
 
 
 # Represents a connection to Impala using the Beeswax API.
 class BeeswaxConnection(ImpalaConnection):
+
+  # This is based on ClientRequestState::BeeswaxQueryState().
+  __QUERY_STATE_TO_EXEC_STATE = {
+    QueryState.CREATED: INITIALIZED,
+    QueryState.COMPILED: PENDING,
+    QueryState.RUNNING: RUNNING,
+    QueryState.FINISHED: FINISHED,
+    QueryState.EXCEPTION: ERROR,
+    # These are not official ExecState, but added to complete mapping.
+    QueryState.INITIALIZED: 'UNIMPLEMENTED_INITIALIZED',
+  }
+
   def __init__(self, host_port, use_kerberos=False, user=None, password=None,
                use_ssl=False):
     self.__beeswax_client = ImpalaBeeswaxClient(host_port, use_kerberos, 
user=user,
@@ -217,7 +381,7 @@ class BeeswaxConnection(ImpalaConnection):
     self.QUERY_STATES = self.__beeswax_client.query_states
 
   def get_test_protocol(self):
-    return 'beeswax'
+    return BEESWAX
 
   def get_host_port(self):
     return self.__host_port
@@ -250,30 +414,35 @@ class BeeswaxConnection(ImpalaConnection):
       self.set_configuration_option("client_identifier", 
tests.common.current_node)
 
   def connect(self):
-    LOG.info("-- connecting to: %s" % self.__host_port)
-    self.__beeswax_client.connect()
+    try:
+      self.__beeswax_client.connect()
+      self.log_client("connected to %s with beeswax" % self.__host_port)
+    except Exception as e:
+      self.log_client("failed connecting to %s with beeswax" % 
self.__host_port)
+      raise e
 
   # TODO: rename to close_connection
   def close(self):
-    LOG.info("-- closing connection to: %s" % self.__host_port)
+    self.log_client("closing beeswax connection to: %s" % self.__host_port)
     self.__beeswax_client.close_connection()
 
-  def close_query(self, operation_handle):
+  def close_query(self, operation_handle, fetch_profile_after_close=False):
     self.log_handle(operation_handle, 'closing query for operation')
-    self.__beeswax_client.close_query(operation_handle.get_handle())
+    return self.__beeswax_client.close_query(operation_handle.get_handle(),
+                                             fetch_profile_after_close)
 
   def close_dml(self, operation_handle):
     self.log_handle(operation_handle, 'closing DML query')
     self.__beeswax_client.close_dml(operation_handle.get_handle())
 
   def execute(self, sql_stmt, user=None, fetch_profile_after_close=False):
-    LOG.info("-- executing against %s\n" % (self.__host_port))
+    self.log_client("executing against %s\n" % (self.__host_port))
     log_sql_stmt(sql_stmt)
     return self.__beeswax_client.execute(sql_stmt, user=user,
         fetch_profile_after_close=fetch_profile_after_close)
 
   def execute_async(self, sql_stmt, user=None):
-    LOG.info("-- executing async: %s\n" % (self.__host_port))
+    self.log_client("executing async: %s\n" % (self.__host_port))
     log_sql_stmt(sql_stmt)
     beeswax_handle = self.__beeswax_client.execute_query_async(sql_stmt, 
user=user)
     return OperationHandle(beeswax_handle, sql_stmt)
@@ -286,15 +455,17 @@ class BeeswaxConnection(ImpalaConnection):
     self.log_handle(operation_handle, 'getting state')
     return self.__beeswax_client.get_state(operation_handle.get_handle())
 
-  def state_is_finished(self, operation_handle):
-    self.log_handle(operation_handle, 'checking finished state for operation')
-    return self.get_state(operation_handle) == self.QUERY_STATES["FINISHED"]
+  def get_impala_exec_state(self, operation_handle):
+    return self.__QUERY_STATE_TO_EXEC_STATE[self.get_state(operation_handle)]
 
   def get_exec_summary(self, operation_handle):
     self.log_handle(operation_handle, 'getting exec summary operation')
     return 
self.__beeswax_client.get_exec_summary(operation_handle.get_handle())
 
-  def get_runtime_profile(self, operation_handle):
+  def get_runtime_profile(self, operation_handle,
+                          profile_format=TRuntimeProfileFormat.STRING):
+    assert profile_format == TRuntimeProfileFormat.STRING, (
+      "Beeswax client only support getting runtime profile in STRING format.")
     self.log_handle(operation_handle, 'getting runtime profile operation')
     return 
self.__beeswax_client.get_runtime_profile(operation_handle.get_handle())
 
@@ -316,11 +487,11 @@ class BeeswaxConnection(ImpalaConnection):
     self.log_handle(operation_handle, 'getting log for operation')
     return 
self.__beeswax_client.get_log(operation_handle.get_handle().log_context)
 
-  def fetch(self, sql_stmt, operation_handle, max_rows=-1):
+  def fetch(self, sql_stmt, operation_handle, max_rows=-1, 
discard_results=False):
     self.log_handle(operation_handle, 'fetching {} rows'.format(
       'all' if max_rows < 0 else max_rows))
     return self.__beeswax_client.fetch_results(
-        sql_stmt, operation_handle.get_handle(), max_rows)
+        sql_stmt, operation_handle.get_handle(), max_rows, discard_results)
 
   def handle_id(self, operation_handle):
     query_id = operation_handle.get_handle().id
@@ -340,9 +511,23 @@ class ImpylaHS2Connection(ImpalaConnection):
   plus Impala-specific extensions, e.g. for fetching runtime profiles.
   TODO: implement support for kerberos, SSL, etc.
   """
+
+  # ClientRequestState::TOperationState()
+  __OPERATION_STATE_TO_EXEC_STATE = {
+    'INITIALIZED_STATE': INITIALIZED,
+    'PENDING_STATE': PENDING,
+    'RUNNING_STATE': RUNNING,
+    'FINISHED_STATE': FINISHED,
+    'ERROR_STATE': ERROR,
+    # These are not official ExecState, but added to complete mapping.
+    'CANCELED_STATE': 'UNIMPLEMENTED_CANCELLED',
+    'CLOSED_STATE': 'UNIMPLEMENTED_CLOSED',
+    'UKNOWN_STATE': 'UNIMPLEMENTED_UNKNOWN'
+  }
+
   def __init__(self, host_port, use_kerberos=False, is_hive=False,
                use_http_transport=False, http_path="", use_ssl=False,
-               collect_profile_and_log=True):
+               collect_profile_and_log=True, user=None):
     self.__host_port = host_port
     self.__use_http_transport = use_http_transport
     self.__http_path = http_path
@@ -353,28 +538,37 @@ class ImpylaHS2Connection(ImpalaConnection):
     # cursor for different operations (as opposed to creating a new cursor per 
operation)
     # so that the session is preserved. This means that we can only execute 
one operation
     # at a time per connection, which is a limitation also imposed by the 
Beeswax API.
+    # However, for ease of async query testing, opening multiple cursors 
through single
+    # ImpylaHS2Connection is allowed if executing query through 
execute_async() or
+    # execute() with user parameter that is different than self.__user. Do 
note though
+    # that they will not share the same session with self.__cursor.
     self.__impyla_conn = None
     self.__cursor = None
     # Default query option obtained from initial connect.
     # Query option names are in lower case for consistency.
     self.__default_query_options = {}
+    # List of all cursors that are created through execute_async.
+    self.__async_cursors = list()
     # Query options to send along with each query.
     self.__query_options = {}
     self._is_hive = is_hive
     # Some Hive HS2 protocol, such as custom Calcite planner, may be able to 
collect
     # profile and log from Impala.
     self._collect_profile_and_log = collect_profile_and_log
+    self.__user = user
 
   def get_test_protocol(self):
     if self.__http_path:
-      return 'hs2-http'
+      return HS2_HTTP
     else:
-      return 'hs2'
+      return HS2
 
   def get_host_port(self):
     return self.__host_port
 
   def set_configuration_option(self, name, value):
+    # Only set the option if it's not already set to the same value.
+    # value must be parsed to string.
     name = name.lower()
     value = str(value)
     if self.__query_options.get(name) != value:
@@ -389,35 +583,53 @@ class ImpylaHS2Connection(ImpalaConnection):
     if hasattr(tests.common, "current_node") and not self._is_hive:
       self.set_configuration_option("client_identifier", 
tests.common.current_node)
 
+  def __open_single_cursor(self, user=None):
+    return self.__impyla_conn.cursor(user=user, convert_types=False,
+                                     close_finished_queries=False)
+
+  def __close_single_cursor(self, cursor):
+    try:
+      # Explicitly close the cursor so that it will close the session.
+      cursor.close()
+    except Exception:
+      # The session may no longer be valid if the impalad was restarted during 
the test.
+      pass
+
   def connect(self):
-    LOG.info("-- connecting to {0} with impyla".format(self.__host_port))
     host, port = self.__host_port.split(":")
     conn_kwargs = {}
     if self._is_hive:
       conn_kwargs['auth_mechanism'] = 'PLAIN'
-    self.__impyla_conn = impyla.connect(host=host, port=int(port),
-                                        
use_http_transport=self.__use_http_transport,
-                                        http_path=self.__http_path,
-                                        use_ssl=self.__use_ssl, **conn_kwargs)
-    # Get the default query options for the session before any modifications 
are made.
-    self.__cursor = \
-        self.__impyla_conn.cursor(convert_types=False, 
close_finished_queries=False)
-    self.__default_query_options = {}
-    if not self._is_hive:
-      self.__cursor.execute("set all")
-      for name, val, kind in self.__cursor:
-        collect_default_query_options(self.__default_query_options, name, val, 
kind)
-      self.__cursor.close_operation()
+    try:
+      self.__impyla_conn = impyla.connect(
+        host=host, port=int(port), 
use_http_transport=self.__use_http_transport,
+        http_path=self.__http_path, use_ssl=self.__use_ssl, **conn_kwargs)
+      self.__cursor = self.__open_single_cursor(user=self.__user)
+      # Get the default query options for the session before any modifications 
are made.
+      self.__default_query_options = {}
+      if not self._is_hive:
+        self.__cursor.execute("set all")
+        for name, val, kind in self.__cursor:
+          collect_default_query_options(self.__default_query_options, name, 
val, kind)
+        self.__cursor.close_operation()
       LOG.debug("Default query options: 
{0}".format(self.__default_query_options))
+      self.log_client("connected to {0} with impyla {1} session {2}".format(
+        self.__host_port, self.get_test_protocol(), 
self.__get_session_id(self.__cursor)
+      ))
+    except Exception as e:
+      self.log_client("failed connecting to {0} with impyla {1}".format(
+        self.__host_port, self.get_test_protocol()
+      ))
+      raise e
 
   def close(self):
-    LOG.info("-- closing connection to: {0}".format(self.__host_port))
-    try:
-      # Explicitly close the cursor so that it will close the session.
-      self.__cursor.close()
-    except Exception:
-      # The session may no longer be valid if the impalad was restarted during 
the test.
-      pass
+    self.log_client("closing 1 sync and {0} async {1} connections to: 
{2}".format(
+      len(self.__async_cursors), self.get_test_protocol(), self.__host_port))
+    self.__close_single_cursor(self.__cursor)
+    for async_cursor in self.__async_cursors:
+      self.__close_single_cursor(async_cursor)
+    # Remove all async cursors.
+    self.__async_cursors = list()
     try:
       self.__impyla_conn.close()
     except AttributeError as e:
@@ -430,59 +642,90 @@ class ImpylaHS2Connection(ImpalaConnection):
     """Trigger the GetTables() HS2 request on the given database (None means 
all dbs).
     Returns a list of (catalogName, dbName, tableName, tableType, 
tableComment).
     """
-    LOG.info("-- getting tables for database: {0}".format(database))
+    self.log_client("getting tables for database: {0}".format(database))
     self.__cursor.get_tables(database_name=database)
     return self.__cursor.fetchall()
 
-  def close_query(self, operation_handle):
+  def close_query(self, operation_handle, fetch_profile_after_close=False):
     self.log_handle(operation_handle, 'closing query for operation')
+    # close_operation() will wipe out _last_operation.
+    # Assign it to op_handle so that we can pull the profile after 
close_operation().
+    op_handle = operation_handle.get_handle()._last_operation
     operation_handle.get_handle().close_operation()
+    if fetch_profile_after_close:
+      assert self._collect_profile_and_log, (
+        "This connection is not configured to collect profile.")
+      return op_handle.get_profile(TRuntimeProfileFormat.STRING)
+    return None
+
+  def __log_execute(self, cursor, user):
+    self.log_client(
+      "executing against {0} at {1}. session: {2} main_cursor: {3} user: 
{4}\n".format(
+        (self._is_hive and 'Hive' or 'Impala'), self.__host_port,
+        self.__get_session_id(cursor), (cursor == self.__cursor), user)
+    )
 
   def execute(self, sql_stmt, user=None, 
profile_format=TRuntimeProfileFormat.STRING,
       fetch_profile_after_close=False):
-    LOG.info("-- executing against {0} at {1}\n".format(
-      self._is_hive and 'Hive' or 'Impala', self.__host_port))
     log_sql_stmt(sql_stmt)
-    self.__cursor.execute(sql_stmt, configuration=self.__query_options)
-    handle = OperationHandle(self.__cursor, sql_stmt)
+    cursor = self.__cursor
+    result = None
+    try:
+      if user != self.__user:
+        # Must create a new cursor to supply 'user'.
+        cursor = self.__open_single_cursor(user=user)
+      self.__log_execute(cursor, user)
+      cursor.execute(sql_stmt, configuration=self.__query_options)
+      handle = OperationHandle(cursor, sql_stmt)
+      self.log_handle(handle, "started query in session {0}".format(
+        self.__get_session_id(cursor)))
+      result = self.__fetch_results_and_profile(
+        handle, profile_format=profile_format,
+        fetch_profile_after_close=fetch_profile_after_close)
+    finally:
+      cursor.close_operation()
+      if cursor != self.__cursor:
+        self.__close_single_cursor(cursor)
+    return result
 
+  def __fetch_results_and_profile(
+      self, operation_handle, profile_format=TRuntimeProfileFormat.STRING,
+      fetch_profile_after_close=False):
     r = None
     try:
-      r = self.__fetch_results(handle, profile_format=profile_format)
+      r = self.__fetch_results(operation_handle, profile_format=profile_format)
     finally:
       if r is None:
         # Try to close the query handle but ignore any exceptions not to 
replace the
         # original exception raised by '__fetch_results'.
         try:
-          self.close_query(handle)
+          self.close_query(operation_handle)
         except Exception:
           pass
       elif fetch_profile_after_close:
-        op_handle = handle.get_handle()._last_operation
-        self.close_query(handle)
-
         # Match ImpalaBeeswaxResult by placing the full profile including end 
time and
         # duration into the return object.
-        r.runtime_profile = op_handle.get_profile(profile_format)
+        r.runtime_profile = self.close_query(operation_handle, 
fetch_profile_after_close)
         return r
       else:
-        self.close_query(handle)
+        self.close_query(operation_handle)
         return r
 
   def execute_async(self, sql_stmt, user=None):
-    LOG.info("-- executing against {0} at {1}\n".format(
-        self._is_hive and 'Hive' or 'Impala', self.__host_port))
-    log_sql_stmt(sql_stmt)
-    if user is not None:
-      raise NotImplementedError("Not yet implemented for HS2 - authentication")
+    async_cursor = None
     try:
-      self.__cursor.execute_async(sql_stmt, configuration=self.__query_options)
-      handle = OperationHandle(self.__cursor, sql_stmt)
-      LOG.info("Started query {0}".format(self.get_query_id(handle)))
+      async_cursor = self.__open_single_cursor(user=user)
+      handle = OperationHandle(async_cursor, sql_stmt)
+      self.__log_execute(async_cursor, user)
+      log_sql_stmt(sql_stmt)
+      async_cursor.execute_async(sql_stmt, configuration=self.__query_options)
+      self.__async_cursors.append(async_cursor)
       return handle
-    except Exception:
-      self.__cursor.close_operation()
-      raise
+    except Exception as e:
+      if async_cursor:
+        async_cursor.close_operation()
+        self.__close_single_cursor(async_cursor)
+      raise e
 
   def cancel(self, operation_handle):
     self.log_handle(operation_handle, 'canceling operation')
@@ -492,34 +735,38 @@ class ImpylaHS2Connection(ImpalaConnection):
   def get_query_id(self, operation_handle):
     """Return the string representation of the query id.
     Return empty string if handle is already canceled or closed."""
+    id = None
     last_op = operation_handle.get_handle()._last_operation
-    if last_op is None:
-      return ""
-    guid_bytes = last_op.handle.operationId.guid
-    # hex_codec works on bytes, so this needs to a decode() to get back to a 
string
-    hi_str = codecs.encode(guid_bytes[7::-1], 'hex_codec').decode()
-    lo_str = codecs.encode(guid_bytes[16:7:-1], 'hex_codec').decode()
-    return "{0}:{1}".format(hi_str, lo_str)
+    if last_op is not None:
+      id = op_handle_to_query_id(last_op.handle)
+    return "" if id is None else id
+
+  def __get_session_id(self, cursor):
+    """Return the string representation of the session id.
+    Return empty string if handle is already canceled or closed."""
+    id = None
+    if cursor.session is not None:
+      id = session_handle_to_session_id(cursor.session.handle)
+    return "" if id is None else id
 
   def handle_id(self, operation_handle):
     query_id = self.get_query_id(operation_handle)
     return query_id if query_id else str(operation_handle)
 
-  def log_handle(self, operation_handle, message):
-    handle_id = self.handle_id(operation_handle)
-    LOG.info("-- {0}: {1}".format(handle_id, message))
-
   def get_state(self, operation_handle):
     self.log_handle(operation_handle, 'getting state')
     cursor = operation_handle.get_handle()
-    return cursor.status()
-
-  def state_is_finished(self, operation_handle):
-    self.log_handle(operation_handle, 'checking finished state for operation')
-    cursor = operation_handle.get_handle()
     # cursor.status contains a string representation of one of
     # TCLIService.TOperationState.
-    return cursor.status() == "FINISHED_STATE"
+    return cursor.status()
+
+  def get_impala_exec_state(self, operation_handle):
+    try:
+      return 
self.__OPERATION_STATE_TO_EXEC_STATE[self.get_state(operation_handle)]
+    except impyla_error.Error:
+      return ERROR
+    except Exception as e:
+      raise e
 
   def get_exec_summary(self, operation_handle):
     self.log_handle(operation_handle, 'getting exec summary operation')
@@ -527,22 +774,53 @@ class ImpylaHS2Connection(ImpalaConnection):
     # summary returned is thrift, not string.
     return cursor.get_summary()
 
-  def get_runtime_profile(self, operation_handle, profile_format):
+  def get_runtime_profile(self, operation_handle,
+                          profile_format=TRuntimeProfileFormat.STRING):
     self.log_handle(operation_handle, 'getting runtime profile operation')
     cursor = operation_handle.get_handle()
     return cursor.get_profile(profile_format=profile_format)
 
   def wait_for_finished_timeout(self, operation_handle, timeout):
     self.log_handle(operation_handle, 'waiting for query to reach FINISHED 
state')
-    raise NotImplementedError("Not yet implemented for HS2 - states differ 
from beeswax")
+    start_time = time.time()
+    while time.time() - start_time < timeout:
+      start_rpc_time = time.time()
+      impala_state = self.get_impala_exec_state(operation_handle)
+      rpc_time = time.time() - start_rpc_time
+      # if the rpc succeeded, the output is the query state
+      if impala_state == FINISHED:
+        return True
+      elif impala_state == ERROR:
+        try:
+          error_log = self.__do_rpc(
+            lambda: self.imp_service.get_log(operation_handle.log_context))
+          raise impyla_error.OperationalError(error_log, None)
+        finally:
+          self.close_query(operation_handle)
+      if rpc_time < DEFAULT_SLEEP_INTERVAL:
+        time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time)
+    return False
 
-  def wait_for_admission_control(self, operation_handle):
+  def wait_for_admission_control(self, operation_handle, timeout_s=60):
     self.log_handle(operation_handle, 'waiting for completion of the admission 
control')
-    raise NotImplementedError("Not yet implemented for HS2 - states differ 
from beeswax")
+    start_time = time.time()
+    while time.time() - start_time < timeout_s:
+      start_rpc_time = time.time()
+      if self.is_admitted(operation_handle):
+        return True
+      rpc_time = time.time() - start_rpc_time
+      if rpc_time < DEFAULT_SLEEP_INTERVAL:
+        time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time)
+    return False
 
   def get_admission_result(self, operation_handle):
     self.log_handle(operation_handle, 'getting the admission result')
-    raise NotImplementedError("Not yet implemented for HS2 - states differ 
from beeswax")
+    if self.is_admitted(operation_handle):
+      query_profile = self.get_runtime_profile(operation_handle)
+      admit_result = re.search(r"Admission result: (.*)", query_profile)
+      if admit_result:
+        return admit_result.group(1)
+    return ""
 
   def get_log(self, operation_handle):
     self.log_handle(operation_handle, 'getting log for operation')
@@ -552,12 +830,13 @@ class ImpylaHS2Connection(ImpalaConnection):
              if not PROGRESS_LOG_RE.match(line)]
     return '\n'.join(lines)
 
-  def fetch(self, sql_stmt, operation_handle, max_rows=-1):
+  def fetch(self, sql_stmt, operation_handle, max_rows=-1, 
discard_results=False):
     self.log_handle(operation_handle, 'fetching {} rows'.format(
       'all' if max_rows < 0 else max_rows))
-    return self.__fetch_results(operation_handle, max_rows)
+    return self.__fetch_results(operation_handle, max_rows, discard_results)
 
   def __fetch_results(self, handle, max_rows=-1,
+                      discard_results=False,
                       profile_format=TRuntimeProfileFormat.STRING):
     """Implementation of result fetching from handle."""
     cursor = handle.get_handle()
@@ -581,10 +860,15 @@ class ImpylaHS2Connection(ImpalaConnection):
     else:
       log = None
       profile = None
-    return ImpylaHS2ResultSet(success=True, result_tuples=result_tuples,
-                              column_labels=column_labels, 
column_types=column_types,
-                              query=handle.sql_stmt(), log=log, 
profile=profile,
-                              query_id=self.get_query_id(handle))
+    result = None
+
+    if discard_results:
+      return result
+    result = ImpylaHS2ResultSet(success=True, result_tuples=result_tuples,
+                                column_labels=column_labels, 
column_types=column_types,
+                                query=handle.sql_stmt(), log=log, 
profile=profile,
+                                query_id=self.get_query_id(handle))
+    return result
 
 
 class ImpylaHS2ResultSet(object):
@@ -628,17 +912,17 @@ class ImpylaHS2ResultSet(object):
       return str(val)
 
 
-def create_connection(host_port, use_kerberos=False, protocol='beeswax',
+def create_connection(host_port, use_kerberos=False, protocol=BEESWAX,
     is_hive=False, use_ssl=False, collect_profile_and_log=True):
-  if protocol == 'beeswax':
+  if protocol == BEESWAX:
     c = BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos,
                           use_ssl=use_ssl)
-  elif protocol == 'hs2':
+  elif protocol == HS2:
     c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
         is_hive=is_hive, use_ssl=use_ssl,
         collect_profile_and_log=collect_profile_and_log)
   else:
-    assert protocol == 'hs2-http'
+    assert protocol == HS2_HTTP
     c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
         is_hive=is_hive, use_http_transport=True, http_path='cliservice',
         use_ssl=use_ssl, collect_profile_and_log=collect_profile_and_log)
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 53eaf722a..103c5c1d4 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -491,6 +491,26 @@ class ImpaladService(BaseImpalaService):
     # Only check if the port is open, do not create Thrift transport.
     return self.is_port_open(self.hs2_http_port)
 
+  def create_client(self, protocol):
+    """Creates a new client connection for given protocol to this impalad"""
+    port = self.beeswax_port
+    if protocol == 'hs2':
+      port = self.hs2_port
+    elif protocol == 'hs2-http':
+      port = self.hs2_http_port
+    client = create_connection('%s:%d' % (self.hostname, port), 
protocol=protocol)
+    client.connect()
+    return client
+
+  def create_client_from_vector(self, vector):
+    """A shorthand for create_client with test vector as input.
+    Vector must have 'protocol' and 'exec_option' dimension.
+    Return a client of specified 'protocol' and with configuration 
'exec_option' set."""
+    client = self.create_client(protocol=vector.get_value('protocol'))
+    client.set_configuration(vector.get_exec_option_dict())
+    return client
+
+
 # Allows for interacting with the StateStore service to perform operations 
such as
 # accessing the debug webpage.
 class StateStoredService(BaseImpalaService):
diff --git a/tests/common/impala_test_suite.py 
b/tests/common/impala_test_suite.py
index 866d9639d..2fe09a898 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -345,13 +345,11 @@ class ImpalaTestSuite(BaseTestSuite):
       cls.hive_transport.close()
     cls.close_impala_clients()
 
-  @classmethod
-  def setup_method(cls, test_method):
+  def setup_method(self, test_method):
     """Setup for all test method."""
-    cls.__reset_impala_clients()
+    self._reset_impala_clients()
 
-  @classmethod
-  def teardown_method(cls, test_method):
+  def teardown_method(self, test_method):
     """Teardown for all test method.
     Currently, it is only here as a placeholder for future use and complement
     setup_method() declaration."""
@@ -426,7 +424,7 @@ class ImpalaTestSuite(BaseTestSuite):
     cls.client = cls.default_impala_client(cls.default_test_protocol())
 
   @classmethod
-  def __reset_impala_clients(cls):
+  def _reset_impala_clients(cls):
     if cls.beeswax_client:
       cls.beeswax_client.clear_configuration()
     if cls.hs2_client:
@@ -1437,6 +1435,7 @@ class ImpalaTestSuite(BaseTestSuite):
     """Waits for the given 'query_handle' to reach the 'expected_state' using 
'client', or
     with the default connection if 'client' is None. If it does not reach the 
given state
     within 'timeout' seconds, the method throws an AssertionError.
+    DEPRECATED: Use client.wait_for_impala_state() instead.
     """
     self.wait_for_any_state(handle, [expected_state], timeout, client)
 
@@ -1445,6 +1444,7 @@ class ImpalaTestSuite(BaseTestSuite):
     or with the default connection if 'client' is None. If it does not reach 
one of the
     given states within 'timeout' seconds, the method throws an 
AssertionError. Returns
     the final state.
+    DEPRECATED: Use client.wait_for_any_impala_state() instead.
     """
     if client is None: client = self.client
     start_time = time.time()
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index 6a94ceaf9..e40214ee5 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -27,12 +27,9 @@ import re
 import signal
 import sys
 import threading
-from copy import copy
+from copy import deepcopy
 from time import sleep, time
 
-from beeswaxd.BeeswaxService import QueryState
-
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.cluster_config import (
     impalad_admission_ctrl_flags,
     impalad_admission_ctrl_config_args,
@@ -43,9 +40,14 @@ from tests.common.custom_cluster_test_suite import (
     START_ARGS,
     CustomClusterTestSuite)
 from tests.common.environ import build_flavor_timeout, 
ImpalaTestClusterProperties
+from tests.common.impala_connection import (
+    RUNNING, FINISHED, ERROR,
+    IMPALA_CONNECTION_EXCEPTION)
 from tests.common.resource_pool_config import ResourcePoolConfig
 from tests.common.skip import SkipIfFS, SkipIfEC, SkipIfNotHdfsMinicluster
 from tests.common.test_dimensions import (
+    HS2, BEESWAX,
+    add_mandatory_exec_option,
     create_exec_option_dimension,
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
@@ -163,11 +165,22 @@ def metric_key(pool_name, metric_name):
   return "admission-controller.%s.%s" % (metric_name, pool_name)
 
 
+def wait_single_statestore_heartbeat():
+  """Wait for state sync across impalads."""
+  sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0)
+
+
 class TestAdmissionControllerBase(CustomClusterTestSuite):
+
   @classmethod
   def get_workload(self):
     return 'functional-query'
 
+  @classmethod
+  def default_test_protocol(cls):
+    # Do not change this. Multiple test methods are hardcoded under this 
assumption.
+    return HS2
+
   @classmethod
   def add_test_dimensions(cls):
     super(TestAdmissionControllerBase, cls).add_test_dimensions()
@@ -176,14 +189,32 @@ class TestAdmissionControllerBase(CustomClusterTestSuite):
     cls.ImpalaTestMatrix.add_dimension(
       create_uncompressed_text_dimension(cls.get_workload()))
 
+  def enable_admission_service(self, method):
+    """Inject argument to enable admission control service.
+    Must be called at setup_method() and before calling setup_method() of 
superclass."""
+    start_args = "--enable_admission_service"
+    if START_ARGS in method.__dict__:
+      start_args = method.__dict__[START_ARGS] + " " + start_args
+    method.__dict__[START_ARGS] = start_args
+    if IMPALAD_ARGS in method.__dict__:
+      method.__dict__[ADMISSIOND_ARGS] = method.__dict__[IMPALAD_ARGS]
+
+
+class TestAdmissionControllerRawHS2(TestAdmissionControllerBase, HS2TestSuite):
+
+  @classmethod
+  def default_test_protocol(cls):
+    # HS2TestSuite overrides self.hs2_client with a raw Impala hs2 thrift 
client.
+    # This will set self.client = self.beeswax_client.
+    # Do not change this. Multiple test methods are hardcoded under this 
assumption.
+    return BEESWAX
 
-class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
   def __check_pool_rejected(self, client, pool, expected_error_re):
     try:
       client.set_configuration({'request_pool': pool})
       client.execute("select 1")
       assert False, "Query should return error"
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert re.search(expected_error_re, str(e))
 
   def __check_query_options(self, profile, expected_query_options):
@@ -232,34 +263,6 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
     HS2TestSuite.check_response(get_profile_resp)
     self.__check_query_options(get_profile_resp.profile, expected_options)
 
-  def _execute_and_collect_profiles(self, queries, timeout_s, 
config_options={},
-      allow_query_failure=False):
-    """Submit the query statements in 'queries' in parallel to the first 
impalad in
-    the cluster. After submission, the results are fetched from the queries in
-    sequence and their profiles are collected. Wait for up to timeout_s for
-    each query to finish. If 'allow_query_failure' is True, succeeds if the 
query
-    completes successfully or ends up in the EXCEPTION state. Otherwise 
expects the
-    queries to complete successfully.
-    Returns the profile strings."""
-    client = self.cluster.impalads[0].service.create_beeswax_client()
-    expected_states = [client.QUERY_STATES['FINISHED']]
-    if allow_query_failure:
-      expected_states.append(client.QUERY_STATES['EXCEPTION'])
-    try:
-      handles = []
-      profiles = []
-      client.set_configuration(config_options)
-      for query in queries:
-        handles.append(client.execute_async(query))
-      for query, handle in zip(queries, handles):
-        state = self.wait_for_any_state(handle, expected_states, timeout_s)
-        if state == client.QUERY_STATES['FINISHED']:
-          self.client.fetch(query, handle)
-        profiles.append(self.client.get_runtime_profile(handle))
-      return profiles
-    finally:
-      client.close()
-
   def get_ac_process(self):
     """Returns the Process that is running the admission control service."""
     return self.cluster.impalads[0]
@@ -314,7 +317,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
       # Wait for query to clear admission control and get accounted for
       client.wait_for_admission_control(handle)
       self.__check_pool_rejected(client, 'root.queueA', "exceeded timeout")
-      assert client.get_state(handle) == client.QUERY_STATES['FINISHED']
+      assert client.is_finished(handle)
       # queueA has default query options mem_limit=128m,query_timeout_s=5
       self.__check_query_options(client.get_runtime_profile(handle),
           [queueA_mem_limit, 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA'])
@@ -386,7 +389,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
     open_session_req = TCLIService.TOpenSessionReq()
     open_session_req.username = ""
     open_session_resp = self.hs2_client.OpenSession(open_session_req)
-    TestAdmissionController.check_response(open_session_resp)
+    TestAdmissionControllerRawHS2.check_response(open_session_resp)
 
     try:
       execute_statement_req = TCLIService.TExecuteStatementReq()
@@ -401,7 +404,110 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     finally:
       close_req = TCLIService.TCloseSessionReq()
       close_req.sessionHandle = open_session_resp.sessionHandle
-      
TestAdmissionController.check_response(self.hs2_client.CloseSession(close_req))
+      TestAdmissionControllerRawHS2.check_response(
+        self.hs2_client.CloseSession(close_req))
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
+          pool_max_mem=1024 * 1024 * 1024))
+  @needs_session()
+  def test_queuing_status_through_query_log_and_exec_summary(self):
+    """Test to verify that the HS2 client's GetLog() call and the ExecSummary 
expose
+    the query's queuing status, that is, whether the query was queued and what 
was the
+    latest queuing reason."""
+    # Start a long-running query.
+    long_query_resp = self.execute_statement("select sleep(10000)")
+    # Ensure that the query has started executing.
+    self.wait_for_admission_control(long_query_resp.operationHandle)
+    # Submit another query.
+    queued_query_resp = self.execute_statement("select sleep(1)")
+    # Wait until the query is queued.
+    self.wait_for_operation_state(queued_query_resp.operationHandle,
+            TCLIService.TOperationState.PENDING_STATE)
+    # Check whether the query log message correctly exposes the queuing status.
+    log = self.wait_for_log_message(
+        queued_query_resp.operationHandle, "Admission result :")
+    assert "Admission result : Queued" in log, log
+    assert "Latest admission queue reason : number of running queries 1 is at 
or over "
+    "limit 1" in log, log
+    # Now check the same for ExecSummary.
+    summary_req = ImpalaHiveServer2Service.TGetExecSummaryReq()
+    summary_req.operationHandle = queued_query_resp.operationHandle
+    summary_req.sessionHandle = self.session_handle
+    exec_summary_resp = self.hs2_client.GetExecSummary(summary_req)
+    assert exec_summary_resp.summary.is_queued
+    assert "number of running queries 1 is at or over limit 1" in \
+           exec_summary_resp.summary.queued_reason, \
+      exec_summary_resp.summary.queued_reason
+    # Close the running query.
+    self.close(long_query_resp.operationHandle)
+    # Close the queued query.
+    self.close(queued_query_resp.operationHandle)
+
+
+class 
TestAdmissionControllerRawHS2WithACService(TestAdmissionControllerRawHS2):
+  """Runs all of the tests from TestAdmissionControllerRawHS2 but with the 
second
+  impalad in the minicluster configured to perform all admission control."""
+
+  def get_ac_process(self):
+    return self.cluster.admissiond
+
+  def get_ac_log_name(self):
+    return "admissiond"
+
+  def setup_method(self, method):
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    self.enable_admission_service(method)
+    super(TestAdmissionControllerRawHS2, self).setup_method(method)
+
+
+class TestAdmissionController(TestAdmissionControllerBase):
+
+  def get_ac_process(self):
+    """Returns the Process that is running the admission control service."""
+    return self.cluster.impalads[0]
+
+  def get_ac_log_name(self):
+    """Returns the prefix of the log files for the admission control 
process."""
+    return "impalad"
+
+  def setup_method(self, method):
+    """All tests in this class are non-destructive. Therefore, we can afford
+    resetting clients at every setup_method."""
+    super(TestAdmissionController, self).setup_method(method)
+    self._reset_impala_clients()
+
+  def _execute_and_collect_profiles(self, queries, timeout_s, 
config_options={},
+      allow_query_failure=False):
+    """Submit the query statements in 'queries' in parallel to the first 
impalad in
+    the cluster. After submission, the results are fetched from the queries in
+    sequence and their profiles are collected. Wait for up to timeout_s for
+    each query to finish. If 'allow_query_failure' is True, succeeds if the 
query
+    completes successfully or ends up in the ERROR state. Otherwise 
expects the
+    queries to complete successfully.
+    Returns the profile strings."""
+    client = self.cluster.impalads[0].service.create_hs2_client()
+    expected_states = [FINISHED]
+    if allow_query_failure:
+      expected_states.append(ERROR)
+    try:
+      handles = []
+      profiles = []
+      client.set_configuration(config_options)
+      for query in queries:
+        handles.append(client.execute_async(query))
+      for query, handle in zip(queries, handles):
+        state = client.wait_for_any_impala_state(handle, expected_states, 
timeout_s)
+        if state == FINISHED:
+          self.client.fetch(query, handle)
+        profiles.append(client.get_runtime_profile(handle))
+      return profiles
+    finally:
+      for handle in handles:
+        client.close_query(handle)
+      client.close()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -610,7 +716,8 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
       handle = self.client.execute_async(query.format(1))
       self.client.wait_for_finished_timeout(handle, 1000)
       expected_mem_limits = self.__get_mem_limits_admission_debug_page()
-      actual_mem_limits = 
self.__get_mem_limits_memz_debug_page(handle.get_handle().id)
+      actual_mem_limits = self.__get_mem_limits_memz_debug_page(
+        self.client.handle_id(handle))
       mem_admitted =\
           get_mem_admitted_backends_debug_page(self.cluster, 
self.get_ac_process())
       debug_string = " expected_mem_limits:" + str(
@@ -670,7 +777,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
   def test_dedicated_coordinator_planner_estimates(self, vector, 
unique_database):
     """Planner tests to add coverage for coordinator estimates when using 
dedicated
     coordinators. Also includes coverage for verifying cluster memory 
admitted."""
-    vector_copy = copy(vector)
+    vector_copy = deepcopy(vector)
     exec_options = vector_copy.get_value('exec_option')
     # Remove num_nodes from the options to allow test case runner to set it in 
one of
     # the test cases.
@@ -756,17 +863,17 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     query = "select * from functional.alltypesagg, (select 1) B limit 1"
     # Successfully run a query with mem limit equal to the lowest process 
memory among
     # impalads
-    exec_options = copy(vector.get_value('exec_option'))
+    exec_options = deepcopy(vector.get_value('exec_option'))
     exec_options['mem_limit'] = "2G"
     self.execute_query_expect_success(self.client, query, exec_options)
     # Test that a query scheduled to run on a single node and submitted to the 
impalad
     # with higher proc mem limit succeeds.
-    exec_options = copy(vector.get_value('exec_option'))
+    exec_options = deepcopy(vector.get_value('exec_option'))
     exec_options['mem_limit'] = "3G"
     exec_options['num_nodes'] = "1"
     self.execute_query_expect_success(self.client, query, exec_options)
     # Exercise rejection checks in admission controller.
-    exec_options = copy(vector.get_value('exec_option'))
+    exec_options = deepcopy(vector.get_value('exec_option'))
     exec_options['mem_limit'] = "3G"
     ex = self.execute_query_expect_failure(self.client, query, exec_options)
     assert ("Rejected query from pool default-pool: request memory needed "
@@ -777,17 +884,18 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       # Wait for previous queries to finish to avoid flakiness.
       for impalad in self.cluster.impalads:
         
impalad.service.wait_for_metric_value("impala-server.num-fragments-in-flight", 
0)
-      impalad_with_2g_mem = 
self.cluster.impalads[2].service.create_beeswax_client()
+      impalad_with_2g_mem = 
self.cluster.impalads[2].service.create_client_from_vector(
+        vector)
       impalad_with_2g_mem.set_configuration_option('mem_limit', '1G')
       impalad_with_2g_mem.execute_async("select sleep(1000)")
       # Wait for statestore update to update the mem admitted in each node.
-      sleep(STATESTORE_RPC_FREQUENCY_MS / 1000)
-      exec_options = copy(vector.get_value('exec_option'))
+      wait_single_statestore_heartbeat()
+      exec_options = deepcopy(vector.get_value('exec_option'))
       exec_options['mem_limit'] = "2G"
       # Since Queuing is synchronous, and we can't close the previous query 
till this
       # returns, we wait for this to timeout instead.
       self.execute_query(query, exec_options)
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert re.search(r"Queued reason: Not enough memory available on host 
\S+.Needed "
           r"2.00 GB but only 1.00 GB out of 2.00 GB was available.", str(e)), 
str(e)
     finally:
@@ -800,11 +908,11 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
         max_queued=1, pool_max_mem=PROC_MEM_TEST_LIMIT),
     statestored_args=_STATESTORED_ARGS,
     disable_log_buffering=True)
-  def test_cancellation(self):
+  def test_cancellation(self, vector):
     """ Test to confirm that all Async cancellation windows are hit and are 
able to
     successfully cancel the query"""
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_client_from_vector(vector)
     try:
       client.set_configuration_option("debug_action", 
"AC_BEFORE_ADMISSION:SLEEP@2000")
       client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 
1)
@@ -848,15 +956,16 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
       queued_query_handle = client.execute_async("select 5")
       sleep(1)
-      assert client.get_state(queued_query_handle) == QueryState.COMPILED
-      assert "Admission result: Queued" in 
client.get_runtime_profile(queued_query_handle)
+      assert client.is_pending(queued_query_handle)
+      assert "Admission result: Queued" in client.get_runtime_profile(
+        queued_query_handle)
       # Only cancel the queued query, because close will wait till it 
unregisters, this
       # gives us a chance to close the running query and allow the dequeue 
thread to
       # dequeue the queue query
       client.cancel(queued_query_handle)
       client.close_query(handle)
-      client.close_query(queued_query_handle)
-      queued_profile = client.get_runtime_profile(queued_query_handle)
+      queued_profile = client.close_query(queued_query_handle,
+                                          fetch_profile_after_close=True)
       assert "Admission result: Cancelled (queued)" in queued_profile, 
queued_profile
       self.assert_log_contains(
           self.get_ac_log_name(), 'INFO', "Dequeued cancelled query=")
@@ -866,11 +975,12 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       handle = client.execute_async("select sleep(10000)")
       queued_query_handle = client.execute_async("select 6")
       sleep(1)
-      assert client.get_state(queued_query_handle) == QueryState.COMPILED
-      assert "Admission result: Queued" in 
client.get_runtime_profile(queued_query_handle)
-      client.close_query(queued_query_handle)
+      assert client.is_pending(queued_query_handle)
+      assert "Admission result: Queued" in client.get_runtime_profile(
+        queued_query_handle)
+      queued_profile = client.close_query(queued_query_handle,
+                                          fetch_profile_after_close=True)
       client.close_query(handle)
-      queued_profile = client.get_runtime_profile(queued_query_handle)
       assert "Admission result: Cancelled (queued)" in queued_profile
       for i in self.cluster.impalads:
         i.service.wait_for_metric_value(
@@ -1000,7 +1110,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     # Run a bunch of queries with mem_limit set so that only one can be 
admitted
     # immediately. The rest should be queued and dequeued (timeout) due to 
host memory
     # pressure.
-    STMT = "select sleep(100)"
+    STMT = "select sleep(1000)"
     TIMEOUT_S = 20
     NUM_QUERIES = 5
     # IMPALA-9856: Disable query result spooling so that we can run queries 
with low
@@ -1034,7 +1144,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     # Run a bunch of queries with mem_limit set so that only one can be 
admitted
     # immediately. The rest should be queued and dequeued (timeout) due to 
pool memory
     # pressure.
-    STMT = "select sleep(100)"
+    STMT = "select sleep(1000)"
     TIMEOUT_S = 20
     NUM_QUERIES = 5
     # IMPALA-9856: Disable query result spooling so that we can run queries 
with low
@@ -1143,7 +1253,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.close_query(handle_running)
     self.__assert_num_queries_accounted(0)
     # Case 3: When a query gets rejected
-    exec_options = copy(vector.get_value('exec_option'))
+    exec_options = deepcopy(vector.get_value('exec_option'))
     exec_options['mem_limit'] = "1b"
     self.execute_query_expect_failure(self.client, query, exec_options)
     self.__assert_num_queries_accounted(0)
@@ -1211,6 +1321,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
                                                      pool=pool)
     query2 = self.execute_async_and_wait_for_running(impalad2, SLOW_QUERY, 
USER_ROOT,
                                                      pool=pool)
+    wait_single_statestore_heartbeat()
     keys = [
       "admission-controller.agg-current-users.root.queueB",
       "admission-controller.local-current-users.root.queueB",
@@ -1294,16 +1405,16 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       query_handles.append(query_handle)
 
     # Let state sync across impalads.
-    sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0)
+    wait_single_statestore_heartbeat()
 
     # Another query should be rejected
     impalad = self.cluster.impalads[limit % 2]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     client.set_configuration({'request_pool': pool})
     try:
       client.execute(SLOW_QUERY, user=user)
       assert False, "query should fail"
-    except Exception as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       # Construct the expected error message.
       expected = ("Rejected query from pool {pool}: current per-{type} load 
{limit} for "
                   "user '{user}'{group_description} is at or above the 
{err_type} limit "
@@ -1328,14 +1439,12 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
 
   def execute_async_and_wait_for_running(self, impalad, query, user, pool):
     # Execute a query asynchronously, and wait for it to be running.
-    # Use beeswax client as it allows specifying the user that runs the query.
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     client.set_configuration({'request_pool': pool})
     handle = client.execute_async(query, user=user)
     timeout_s = 10
     # Make sure the query has been admitted and is running.
-    self.wait_for_state(
-      handle, client.QUERY_STATES['RUNNING'], timeout_s, client=client)
+    client.wait_for_impala_state(handle, RUNNING, timeout_s)
     return self.ClientAndHandle(client, handle)
 
   @pytest.mark.execute_serially
@@ -1375,8 +1484,8 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.client.close_query(sleep_query_handle)
 
     # Observe that the queued query fails.
-    self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20),
-    self.close_query(queued_query_handle)
+    self.client.wait_for_impala_state(queued_query_handle, ERROR, 20),
+    self.client.close_query(queued_query_handle)
 
     # Change the config back to a valid value
     config.set_config_value(pool_name, config_str, 0)
@@ -1396,8 +1505,8 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.client.close_query(sleep_query_handle)
 
     # Observe that the queued query fails.
-    self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20),
-    self.close_query(queued_query_handle)
+    self.client.wait_for_impala_state(queued_query_handle, ERROR, 20),
+    self.client.close_query(queued_query_handle)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -1405,7 +1514,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       pool_max_mem=1024 * 1024 * 1024),
     statestored_args=_STATESTORED_ARGS)
   def test_trivial_query(self):
-    self.client.execute("set enable_trivial_query_for_admission=false")
+    self.client.set_configuration_option("enable_trivial_query_for_admission", 
"false")
 
     # Test the second request does need to queue when trivial query is 
disabled.
     sleep_query_handle = self.client.execute_async("select sleep(10000)")
@@ -1417,7 +1526,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.client.close_query(sleep_query_handle)
     self.client.close_query(trivial_query_handle)
 
-    self.client.execute("set enable_trivial_query_for_admission=true")
+    self.client.set_configuration_option("enable_trivial_query_for_admission", 
"true")
     # Test when trivial query is enabled, all trivial queries should be
     # admitted immediately.
     sleep_query_handle = self.client.execute_async("select sleep(10000)")
@@ -1439,7 +1548,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     # Test whether it will fail for a normal query.
     failed_query_handle = self.client.execute_async(
             "select * from functional_parquet.alltypes limit 100")
-    self.wait_for_state(failed_query_handle, QueryState.EXCEPTION, 20)
+    self.client.wait_for_impala_state(failed_query_handle, ERROR, 20)
     self.client.close_query(failed_query_handle)
     # Test it should pass all the trivial queries.
     self._test_trivial_queries_suc()
@@ -1463,7 +1572,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     def _test_multi_trivial_query_runs(self):
       timeout = 10
       admit_obj = self.admit_obj
-      client = admit_obj.cluster.impalads[0].service.create_beeswax_client()
+      client = admit_obj.cluster.impalads[0].service.create_hs2_client()
       for i in range(100):
         handle = client.execute_async(self.sql)
         if not self.expect_err:
@@ -1580,44 +1689,6 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     assert False, "Timed out waiting for change to profile\nSearch " \
                   "String: {0}\nProfile:\n{1}".format(search_string, 
str(profile))
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-      impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
-          pool_max_mem=1024 * 1024 * 1024))
-  @needs_session()
-  def test_queuing_status_through_query_log_and_exec_summary(self):
-    """Test to verify that the HS2 client's GetLog() call and the ExecSummary 
expose
-    the query's queuing status, that is, whether the query was queued and what 
was the
-    latest queuing reason."""
-    # Start a long-running query.
-    long_query_resp = self.execute_statement("select sleep(10000)")
-    # Ensure that the query has started executing.
-    self.wait_for_admission_control(long_query_resp.operationHandle)
-    # Submit another query.
-    queued_query_resp = self.execute_statement("select sleep(1)")
-    # Wait until the query is queued.
-    self.wait_for_operation_state(queued_query_resp.operationHandle,
-            TCLIService.TOperationState.PENDING_STATE)
-    # Check whether the query log message correctly exposes the queuing status.
-    log = self.wait_for_log_message(
-        queued_query_resp.operationHandle, "Admission result :")
-    assert "Admission result : Queued" in log, log
-    assert "Latest admission queue reason : number of running queries 1 is at 
or over "
-    "limit 1" in log, log
-    # Now check the same for ExecSummary.
-    summary_req = ImpalaHiveServer2Service.TGetExecSummaryReq()
-    summary_req.operationHandle = queued_query_resp.operationHandle
-    summary_req.sessionHandle = self.session_handle
-    exec_summary_resp = self.hs2_client.GetExecSummary(summary_req)
-    assert exec_summary_resp.summary.is_queued
-    assert "number of running queries 1 is at or over limit 1" in \
-           exec_summary_resp.summary.queued_reason,\
-      exec_summary_resp.summary.queued_reason
-    # Close the running query.
-    self.close(long_query_resp.operationHandle)
-    # Close the queued query.
-    self.close(queued_query_resp.operationHandle)
-
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=(impalad_admission_ctrl_flags(
@@ -1743,12 +1814,12 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       # state because of the 'WAIT' debug action), wait for the 'lineitem' 
scan to
       # complete, and then validate that one of the executor backends 
shutdowns and
       # releases its admitted memory.
-      self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], timeout)
+      self.client.wait_for_impala_state(handle, RUNNING, timeout)
       # Once the 'lineitem' scan completes, NumCompletedBackends should be 1.
       self.assert_eventually(60, 1, lambda: "NumCompletedBackends: 1 (1)"
           in self.client.get_runtime_profile(handle))
       get_num_completed_backends(self.cluster.impalads[0].service,
-        handle.get_handle().id) == 1
+        self.client.handle_id(handle)) == 1
       mem_admitted =\
           get_mem_admitted_backends_debug_page(self.cluster, 
self.get_ac_process())
       num_executor_zero_admitted = 0
@@ -2012,13 +2083,7 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
   def setup_method(self, method):
     if self.exploration_strategy() != 'exhaustive':
       pytest.skip('runs only in exhaustive')
-
-    start_args = "--enable_admission_service"
-    if START_ARGS in method.__dict__:
-      start_args = method.__dict__[START_ARGS] + " " + start_args
-    method.__dict__[START_ARGS] = start_args
-    if IMPALAD_ARGS in method.__dict__:
-      method.__dict__[ADMISSIOND_ARGS] = method.__dict__[IMPALAD_ARGS]
+    self.enable_admission_service(method)
     super(TestAdmissionControllerWithACService, self).setup_method(method)
 
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
@@ -2038,8 +2103,7 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
     timeout_s = 10
     # Make sure the query is through admission control before killing the 
admissiond. It
     # should be unaffected and finish successfully.
-    self.wait_for_state(
-        before_kill_handle, self.client.QUERY_STATES['RUNNING'], timeout_s)
+    self.client.wait_for_impala_state(before_kill_handle, RUNNING, timeout_s)
     self.cluster.admissiond.kill()
     result = self.client.fetch(query, before_kill_handle)
     assert result.data == ["730"]
@@ -2059,7 +2123,7 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
     try:
       result = self.client.fetch(query, no_restart_handle)
       assert False, "Query should have failed"
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert "Failed to admit query after waiting " in str(e)
 
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
@@ -2077,20 +2141,18 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
     handle1 = self.execute_query_async(query)
     timeout_s = 10
     # Make sure the first query has been admitted.
-    self.wait_for_state(
-        handle1, self.client.QUERY_STATES['RUNNING'], timeout_s)
+    self.client.wait_for_impala_state(handle1, RUNNING, timeout_s)
 
     # Run another query. This query should be queued because only 1 query is 
allowed in
     # the default pool.
-    handle2 = self.execute_query_async(query)
+    handle2 = self.client.execute_async(query)
     self._wait_for_change_to_profile(handle2, "Admission result: Queued")
 
     # Cancel the first query. It's resources should be released and the second 
query
     # should be admitted.
     self.client.cancel(handle1)
     self.client.close_query(handle1)
-    self.wait_for_state(
-        handle2, self.client.QUERY_STATES['RUNNING'], timeout_s)
+    self.client.wait_for_impala_state(handle2, RUNNING, timeout_s)
 
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   @pytest.mark.execute_serially
@@ -2108,20 +2170,18 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
     handle1 = self.execute_query_async(query)
     timeout_s = 10
     # Make sure the first query has been admitted.
-    self.wait_for_state(
-        handle1, self.client.QUERY_STATES['RUNNING'], timeout_s)
+    self.client.wait_for_impala_state(handle1, RUNNING, timeout_s)
 
     # Run another query. This query should be queued because the executor 
group only has 1
     # slot.
-    handle2 = self.execute_query_async(query)
+    handle2 = self.client.execute_async(query)
     self._wait_for_change_to_profile(handle2, "Admission result: Queued")
 
     # Cancel the first query. It's resources should be released and the second 
query
     # should be admitted.
     self.client.cancel(handle1)
     self.client.close_query(handle1)
-    self.wait_for_state(
-        handle2, self.client.QUERY_STATES['RUNNING'], timeout_s)
+    self.client.wait_for_impala_state(handle2, RUNNING, timeout_s)
 
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   @pytest.mark.execute_serially
@@ -2133,25 +2193,23 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
     # Query designed to run for a few minutes.
     query = "select count(*) from functional.alltypes where int_col = 
sleep(10000)"
     impalad1 = self.cluster.impalads[0]
-    client1 = impalad1.service.create_beeswax_client()
+    client1 = impalad1.service.create_hs2_client()
     handle1 = client1.execute_async(query)
     timeout_s = 10
     # Make sure the first query has been admitted.
-    self.wait_for_state(
-        handle1, self.client.QUERY_STATES['RUNNING'], timeout_s, 
client=client1)
+    client1.wait_for_impala_state(handle1, RUNNING, timeout_s)
 
     # Run another query with a different coordinator. This query should be 
queued because
     # only 1 query is allowed in the default pool.
     impalad2 = self.cluster.impalads[1]
-    client2 = impalad2.service.create_beeswax_client()
+    client2 = impalad2.service.create_hs2_client()
     handle2 = client2.execute_async(query)
     self._wait_for_change_to_profile(handle2, "Admission result: Queued", 
client=client2)
 
     # Kill the coordinator for the first query. The resources for the query 
should get
     # cleaned up and the second query should be admitted.
     impalad1.kill()
-    self.wait_for_state(
-        handle2, self.client.QUERY_STATES['RUNNING'], timeout_s, 
client=client2)
+    client2.wait_for_impala_state(handle2, RUNNING, timeout_s)
 
 
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
@@ -2185,15 +2243,22 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
       submitting to a single impalad, we know exactly what the values should 
be,
       otherwise we just check that they are within reasonable bounds.
   """
+
+  BATCH_SIZE = 100
+
   @classmethod
   def add_test_dimensions(cls):
     super(TestAdmissionControllerStress, cls).add_test_dimensions()
-    # This test really need to run using beeswax client since hs2 client has 
not
-    # implemented some methods needed to query admission control status.
-    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol', 
'beeswax'))
     # Slow down test query by setting low batch_size.
     cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
-      cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[10]))
+      cluster_sizes=[0], disable_codegen_options=[False],
+      batch_sizes=[cls.BATCH_SIZE]))
+    # Turning off result spooling allows us to better control query execution 
by
+    # controlling the number of rows fetched. This allows us to maintain 
resource
+    # usage among backends.
+    add_mandatory_exec_option(cls, 'spool_query_results', 0)
+    # Set 100ms long polling time to get faster initial response.
+    add_mandatory_exec_option(cls, 'long_polling_time_ms', 100)
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('round_robin_submission', *ROUND_ROBIN_SUBMISSION))
     cls.ImpalaTestMatrix.add_dimension(
@@ -2356,7 +2421,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
             REQUEST_QUEUE_UPDATE_INTERVAL)['count']
       assert (time() - start_time < STRESS_TIMEOUT),\
           "Timed out waiting %s seconds for heartbeats" % (STRESS_TIMEOUT,)
-      sleep(STATESTORE_RPC_FREQUENCY_MS / float(1000))
+      wait_single_statestore_heartbeat()
     LOG.info("Waited %s for %s heartbeats", round(time() - start_time, 1), 
heartbeats)
 
   def wait_for_admitted_threads(self, num_threads):
@@ -2410,7 +2475,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
       sleep(1)
 
   class SubmitQueryThread(threading.Thread):
-    def __init__(self, impalad, additional_query_options, vector, query_num,
+    def __init__(self, impalad, vector, query_num,
         query_end_behavior, executing_threads, exit_signal):
       """
       executing_threads must be provided so that this thread can add itself 
when the
@@ -2418,8 +2483,8 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
       """
       super(self.__class__, self).__init__()
       self.executing_threads = executing_threads
-      self.vector = vector
-      self.additional_query_options = additional_query_options
+      # Make vector local to this thread, because run() will modify it later.
+      self.vector = deepcopy(vector)
       self.query_num = query_num
       self.query_end_behavior = query_end_behavior
       self.impalad = impalad
@@ -2431,6 +2496,8 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
       # Set by the main thread when tearing down
       self.exit_signal = exit_signal
       self.setDaemon(True)
+      # Determine how many rows to fetch per interval.
+      self.rows_per_fetch = TestAdmissionControllerStress.BATCH_SIZE
 
     def thread_should_run(self):
       return not self.exit_signal.is_set()
@@ -2444,20 +2511,12 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
           if not self.thread_should_run():
             return
 
-          exec_options = dict()
-          exec_options.update(self.vector.get_exec_option_dict())
-          exec_options.update(self.additional_query_options)
-          # Turning off result spooling allows us to better control query 
execution by
-          # controlling the number of rows fetched. This allows us to maintain 
resource
-          # usage among backends.
-          exec_options['spool_query_results'] = 0
-          exec_options['long_polling_time_ms'] = 100
           if self.query_end_behavior == 'QUERY_TIMEOUT':
-            exec_options['query_timeout_s'] = QUERY_END_TIMEOUT_S
+            self.vector.set_exec_option('query_timeout_s', QUERY_END_TIMEOUT_S)
           query = QUERY.format(self.query_num)
           self.query_state = 'SUBMITTING'
-          client = self.impalad.service.create_beeswax_client()
-          client.set_configuration(exec_options)
+          assert self.vector.get_protocol() == HS2, "Must use hs2 protocol"
+          client = self.impalad.service.create_client_from_vector(self.vector)
 
           LOG.info("Submitting query %s with ending behavior %s",
                    self.query_num, self.query_end_behavior)
@@ -2485,7 +2544,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
           msg = "Admission result for query {} : {}".format(
             self.query_num, admission_result)
           self.log_handle(client, query_handle, msg)
-        except ImpalaBeeswaxException as e:
+        except IMPALA_CONNECTION_EXCEPTION as e:
           LOG.exception(e)
           raise e
 
@@ -2500,10 +2559,10 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
         # query. It needs to wait until the main thread requests it to end its 
query.
         while self.thread_should_run() and self.query_state != 'COMPLETED':
           # The query needs to stay active until the main thread requests it 
to end.
-          # Otherwise, the query may get cancelled early. Fetch 1 row every
-          # FETCH_INTERVAL to keep the query active.
-          fetch_result = client.fetch(query, query_handle, 1)
-          assert len(fetch_result.data) == 1, str(fetch_result)
+          # Otherwise, the query may get cancelled early. Fetch 
self.rows_per_fetch rows
+          # every FETCH_INTERVAL to keep the query active.
+          fetch_result = client.fetch(query, query_handle, self.rows_per_fetch)
+          assert len(fetch_result.data) == self.rows_per_fetch, 
str(fetch_result)
           self.num_rows_fetched += len(fetch_result.data)
           if self.query_state == 'REQUEST_QUERY_END':
             self._end_query(client, query, query_handle)
@@ -2537,20 +2596,19 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
         # Sleep and wait for the query to be cancelled. The cancellation will
         # set the state to EXCEPTION.
         start_time = time()
-        while self.thread_should_run() and (
-          client.get_state(query_handle) != client.QUERY_STATES['EXCEPTION']):
+        while self.thread_should_run() and not client.is_error(query_handle):
           assert (time() - start_time < STRESS_TIMEOUT),\
             "Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,)
           sleep(1)
-        # try fetch and confirm from exception message that query was timed 
out.
         try:
-          client.fetch(query, query_handle)
+          # try fetch and confirm from exception message that query was timed 
out.
+          client.fetch(query, query_handle, discard_results=True)
           assert False
-        except Exception as e:
+        except (Exception, ImpalaHiveServer2Service) as e:
           assert 'expired due to client inactivity' in str(e)
       elif self.query_end_behavior == 'EOS':
         # Fetch all rows so we hit eos.
-        client.fetch(query, query_handle)
+        client.fetch(query, query_handle, discard_results=True)
       elif self.query_end_behavior == 'CLIENT_CANCEL':
         client.cancel(query_handle)
       else:
@@ -2604,8 +2662,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
     LOG.info("Found %s queued queries after %s seconds", actual_queued,
         round(time() - start_time, 1))
 
-  def run_admission_test(self, vector, additional_query_options,
-                         check_user_aggregates=False):
+  def run_admission_test(self, vector, check_user_aggregates=False):
     LOG.info("Starting test case with parameters: %s", vector)
     self.impalads = self.cluster.impalads
     self.ac_processes = self.get_ac_processes()
@@ -2628,8 +2685,8 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
         sleep(submission_delay_ms / 1000.0)
       impalad = self.impalads[query_num % len(self.impalads)]
       query_end_behavior = QUERY_END_BEHAVIORS[query_num % 
len(QUERY_END_BEHAVIORS)]
-      thread = self.SubmitQueryThread(impalad, additional_query_options, 
vector,
-          query_num, query_end_behavior, self.executing_threads, self.exit)
+      thread = self.SubmitQueryThread(impalad, vector, query_num, 
query_end_behavior,
+                                      self.executing_threads, self.exit)
       thread.start()
       self.all_threads.append(thread)
 
@@ -2747,11 +2804,12 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
     if self.exploration_strategy() != 'exhaustive':
       pytest.skip('runs only in exhaustive')
     self.pool_name = 'default-pool'
+    vector.set_exec_option('request_pool', self.pool_name)
+    vector.set_exec_option('mem_limit', sys.maxsize)
     # The pool has no mem resources set, so submitting queries with huge 
mem_limits
     # should be fine. This exercises the code that does the per-pool memory
     # accounting (see MemTracker::GetPoolMemReserved()) without actually being 
throttled.
-    self.run_admission_test(vector, {'request_pool': self.pool_name,
-      'mem_limit': sys.maxsize})
+    self.run_admission_test(vector)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -2761,7 +2819,8 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
     statestored_args=_STATESTORED_ARGS)
   def test_admission_controller_with_configs(self, vector):
     self.pool_name = 'root.queueB'
-    self.run_admission_test(vector, {'request_pool': self.pool_name})
+    vector.set_exec_option('request_pool', self.pool_name)
+    self.run_admission_test(vector)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -2778,8 +2837,8 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
       # Save time by running only 1 out of 6 vector combination.
       pytest.skip('Only run with round_robin_submission=True and 
submission_delay_ms=0.')
     self.pool_name = 'root.queueF'
-    self.run_admission_test(vector, {'request_pool': self.pool_name},
-                            check_user_aggregates=True)
+    vector.set_exec_option('request_pool', self.pool_name)
+    self.run_admission_test(vector, check_user_aggregates=True)
 
   def get_proc_limit(self):
     """Gets the process mem limit as reported by the impalad's mem-tracker 
metric.
@@ -2816,8 +2875,9 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
     # the mem limit.
     num_impalads = len(self.cluster.impalads)
     query_mem_limit = (proc_limit // MAX_NUM_CONCURRENT_QUERIES // 
num_impalads) - 1
-    self.run_admission_test(vector,
-        {'request_pool': self.pool_name, 'mem_limit': query_mem_limit})
+    vector.set_exec_option('request_pool', self.pool_name)
+    vector.set_exec_option('mem_limit', query_mem_limit)
+    self.run_admission_test(vector)
 
 
 class 
TestAdmissionControllerStressWithACService(TestAdmissionControllerStress):
@@ -2833,11 +2893,5 @@ class 
TestAdmissionControllerStressWithACService(TestAdmissionControllerStress):
   def setup_method(self, method):
     if self.exploration_strategy() != 'exhaustive':
       pytest.skip('runs only in exhaustive')
-
-    start_args = "--enable_admission_service"
-    if START_ARGS in method.__dict__:
-      start_args = method.__dict__[START_ARGS] + " " + start_args
-    method.__dict__[START_ARGS] = start_args
-    if IMPALAD_ARGS in method.__dict__:
-      method.__dict__[ADMISSIOND_ARGS] = method.__dict__[IMPALAD_ARGS]
+    self.enable_admission_service(method)
     super(TestAdmissionControllerStressWithACService, 
self).setup_method(method)
diff --git a/tests/custom_cluster/test_session_expiration.py 
b/tests/custom_cluster/test_session_expiration.py
index a56c877ba..e76f3457d 100644
--- a/tests/custom_cluster/test_session_expiration.py
+++ b/tests/custom_cluster/test_session_expiration.py
@@ -179,9 +179,7 @@ class TestSessionExpiration(CustomClusterTestSuite):
     self.close_impala_clients()
     # Create 2 sessions.
     client1 = impalad.service.create_hs2_client()
-    client1.execute_async("select sleep(5000)")
     client2 = impalad.service.create_hs2_client()
-    client2.execute_async("select sleep(5000)")
     try:
       # Trying to open a third session should fail.
       impalad.service.create_hs2_client()
diff --git a/tests/util/thrift_util.py b/tests/util/thrift_util.py
index 8baff3334..427769849 100644
--- a/tests/util/thrift_util.py
+++ b/tests/util/thrift_util.py
@@ -25,6 +25,7 @@ from thrift.transport.TSocket import TSocket
 from thrift.transport.TTransport import TBufferedTransport
 from thrift_sasl import TSaslClientTransport
 
+
 def create_transport(host, port, service, transport_type="buffered", user=None,
                      password=None, use_ssl=False, ssl_cert=None):
   """
@@ -78,4 +79,11 @@ def op_handle_to_query_id(t_op_handle):
   if t_op_handle is None or t_op_handle.operationId is None:
     return None
   # This should use the same logic as in 
ImpalaServer::THandleIdentifierToTUniqueId().
-  return "%x:%x" % struct.unpack("QQ", t_op_handle.operationId.guid)
+  return "%016x:%016x" % struct.unpack("QQ", t_op_handle.operationId.guid)
+
+
+def session_handle_to_session_id(t_session_op_handle):
+  if t_session_op_handle is None or t_session_op_handle.sessionId is None:
+    return None
+  # This should use the same logic as in 
ImpalaServer::THandleIdentifierToTUniqueId().
+  return "%016x:%016x" % struct.unpack("QQ", 
t_session_op_handle.sessionId.guid)


Reply via email to