This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit daaf73a7c28fe992f176261a8adf249750e8b157 Author: Riza Suminto <[email protected]> AuthorDate: Thu Jan 16 19:42:51 2025 -0800 IMPALA-13682: Implement missing capabilities in ImpylaHS2Connection This patch implements 'wait_for_finished_timeout', 'wait_for_admission_control', and 'get_admission_result' for ImpylaHS2Connection. This patch also changes ImpylaHS2Connection to open extra cursors, in addition to self.__cursor, for each 'execute' call that supplies a 'user' argument different from the connection's own user and for each 'execute_async' call, making it possible to issue multiple concurrent queries. Note that each HS2 cursor opens its own HS2 session. Therefore, this patch breaks the assumption that an ImpylaHS2Connection always operates within a single HS2 session (see HIVE-11402 and HIVE-14247 on why concurrent queries sharing one HS2 session are problematic). However, all of these cursors share the same query options stored in self.__query_options. In practice, most Impala tests do not need concurrent queries to run under a single HS2 session; they only require them to use the same query options. The following additions are new for both BeeswaxConnection and ImpylaHS2Connection: - Add method 'log_client' for convenience. - Implement consistent query state mapping and checking through several accessor methods. - Add methods 'wait_for_impala_state' and 'wait_for_any_impala_state' that use the 'get_impala_exec_state' method internally. - Add a 'fetch_profile_after_close' parameter to the 'close_query' method. If True, 'close_query' returns the query profile after closing the query. - Add a 'discard_results' parameter to the 'fetch' method. This can save time parsing results if the test does not care about them. Reuse the existing op_handle_to_query_id and add a new session_handle_to_session_id to parse HS2 TOperationHandle.operationId.guid and TSessionHandle.sessionId.guid, respectively. 
To show that ImpylaHS2Connection is on par with BeeswaxConnection, this patch refactors test_admission_controller.py to test using the HS2 protocol by default. Tests that make raw HS2 RPCs (requiring capabilities from HS2TestSuite) are separated out into a new TestAdmissionControllerRawHS2 class and keep using the beeswax protocol by default. All calls to copy.copy are replaced with copy.deepcopy for safety. Testing: - Pass exhaustive tests. Change-Id: I9ac07732424c16338e060c9392100b54337f11b8 Reviewed-on: http://gerrit.cloudera.org:8080/22362 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/client-request-state.h | 5 + tests/beeswax/impala_beeswax.py | 16 +- tests/common/impala_connection.py | 490 +++++++++++++++++----- tests/common/impala_service.py | 20 + tests/common/impala_test_suite.py | 12 +- tests/custom_cluster/test_admission_controller.py | 430 ++++++++++++--------- tests/custom_cluster/test_session_expiration.py | 2 - tests/util/thrift_util.py | 10 +- 8 files changed, 680 insertions(+), 305 deletions(-) diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index 8a8543295..347f8cc9f 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -82,6 +82,11 @@ class ClientRequestState { ~ClientRequestState(); + /// Enums representing query exection state internal to impala. + /// Mapping to client protocol-specific states are provided by methods such as + /// BeeswaxQueryState() for Beeswax and TOperationState() for HS2. 
+ /// If updating this enum, please also update mappings at + /// tests/common/impala_connection.py enum class ExecState { INITIALIZED, PENDING, RUNNING, FINISHED, ERROR }; enum class RetryState { RETRYING, RETRIED, NOT_RETRIED }; diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py index 5823c00ec..fb74146f2 100644 --- a/tests/beeswax/impala_beeswax.py +++ b/tests/beeswax/impala_beeswax.py @@ -224,11 +224,11 @@ class ImpalaBeeswaxClient(object): if not fetch_profile_after_close: result.runtime_profile = self.get_runtime_profile(handle) - self.close_query(handle) + profile_after_close = self.close_query(handle, fetch_profile_after_close) if fetch_profile_after_close: - # Fetch the profile again after the query has closed and the profile is complete. - result.runtime_profile = self.get_runtime_profile(handle) + # Attach profile that is obtained after query closed. + result.runtime_profile = profile_after_close return result @@ -292,8 +292,11 @@ class ImpalaBeeswaxClient(object): def cancel_query(self, query_id): return self.__do_rpc(lambda: self.imp_service.Cancel(query_id)) - def close_query(self, handle): + def close_query(self, handle, fetch_profile_after_close=False): self.__do_rpc(lambda: self.imp_service.close(handle)) + if fetch_profile_after_close: + return self.get_runtime_profile(handle) + return None def wait_for_finished(self, query_handle): """Given a query handle, polls the coordinator waiting for the query to transition to @@ -371,7 +374,8 @@ class ImpalaBeeswaxClient(object): def get_log(self, query_handle): return self.__do_rpc(lambda: self.imp_service.get_log(query_handle)) - def fetch_results(self, query_string, query_handle, max_rows=-1): + def fetch_results(self, query_string, query_handle, max_rows=-1, + discard_results=False): """Fetches query results given a handle and query type (insert, use, other)""" query_type = self.__get_query_type(query_string) if query_type == 'use': @@ -382,6 +386,8 @@ class 
ImpalaBeeswaxClient(object): # Result fetching for insert is different from other queries. exec_result = None + if discard_results: + return exec_result if query_type == 'insert': exec_result = self.__fetch_insert_results(query_handle) else: diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py index 4b643573c..789809dc1 100644 --- a/tests/common/impala_connection.py +++ b/tests/common/impala_connection.py @@ -21,15 +21,24 @@ from __future__ import absolute_import, division, print_function import abc -import codecs from future.utils import with_metaclass import logging import re +import time +from beeswaxd.BeeswaxService import QueryState import impala.dbapi as impyla +import impala.error as impyla_error import tests.common from RuntimeProfile.ttypes import TRuntimeProfileFormat -from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient +from tests.beeswax.impala_beeswax import ( + DEFAULT_SLEEP_INTERVAL, + ImpalaBeeswaxClient, + ImpalaBeeswaxException) +from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP +from tests.util.thrift_util import ( + op_handle_to_query_id, + session_handle_to_session_id) LOG = logging.getLogger(__name__) @@ -46,6 +55,36 @@ PROGRESS_LOG_RE = re.compile( MAX_SQL_LOGGING_LENGTH = 128 * 1024 +# Tuple of root exception types from different client protocol. +IMPALA_CONNECTION_EXCEPTION = (ImpalaBeeswaxException, impyla_error.Error) + +# String representation of ClientRequestState::ExecState +INITIALIZED = 'INITIALIZED' +PENDING = 'PENDING' +RUNNING = 'RUNNING' +FINISHED = 'FINISHED' +ERROR = 'ERROR' +# ExecState that is final. +EXEC_STATES_FINAL = set([FINISHED, ERROR]) +# Possible ExecState after query passed admission controller. +EXEC_STATES_ADMITTED = set([RUNNING, FINISHED, ERROR]) +# Mapping of a ExecState to a set of possible future ExecState. 
+LEGAL_FUTURE_STATES = { + INITIALIZED: set([PENDING, RUNNING, FINISHED, ERROR]), + PENDING: set([RUNNING, FINISHED, ERROR]), + RUNNING: set([FINISHED, ERROR]), + FINISHED: set([ERROR]), + ERROR: set() +} + + +def has_legal_future_state(impala_state, future_states): + """Return True if 'impala_state' can transition to one of state listed in + 'future_states'.""" + assert impala_state in LEGAL_FUTURE_STATES + expected_impala_states = set(future_states) + return len(LEGAL_FUTURE_STATES[impala_state] & expected_impala_states) > 0 + # test_exprs.py's TestExprLimits executes extremely large SQLs (multiple MBs). It is the # only test that runs SQL larger than 128KB. Logging these SQLs in execute() increases @@ -149,20 +188,65 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)): pass @abc.abstractmethod - def close_query(self, handle): + def close_query(self, handle, fetch_profile_after_close=False): """Closes the query.""" pass @abc.abstractmethod def get_state(self, operation_handle): - """Returns the state of a query""" + """Returns the state of a query. + May raise en error, depending on connection type.""" pass @abc.abstractmethod - def state_is_finished(self, operation_handle): - """Returns whether the state of a query is finished""" + def get_impala_exec_state(self, operation_handle): + """Returns a string translation from client specific state of operation_handle + to Impala's ClientRequestState::ExecState.""" pass + def __is_at_exec_state(self, operation_handle, impala_state): + self.log_handle( + operation_handle, 'checking ' + impala_state + ' state for operation') + return self.get_impala_exec_state(operation_handle) == impala_state + + def state_is_finished(self, operation_handle): + """Returns whether the Impala exec state of a operation_handle is FINISHED. 
+ DEPRECATED: use is_finished() instead.""" + return self.is_finished(operation_handle) + + def is_initialized(self, operation_handle): + """Returns whether the Impala exec state of a operation_handle is INITIALIZED""" + return self.__is_at_exec_state(operation_handle, INITIALIZED) + + def is_pending(self, operation_handle): + """Returns whether the Impala exec state of a operation_handle is PENDING""" + return self.__is_at_exec_state(operation_handle, PENDING) + + def is_running(self, operation_handle): + """Returns whether the Impala exec state of a operation_handle is RUNNING""" + return self.__is_at_exec_state(operation_handle, RUNNING) + + def is_finished(self, operation_handle): + """Returns whether the Impala exec state of a operation_handle is FINISHED""" + return self.__is_at_exec_state(operation_handle, FINISHED) + + def is_error(self, operation_handle): + """Returns whether the Impala exec state of a operation_handle is ERROR. + Internally, it will call get_state(), and any exception thrown by get_state() will + cause this method to return True.""" + return self.__is_at_exec_state(operation_handle, ERROR) + + def is_executing(self, operation_handle): + """Returns whether the state of a operation_handle is executing or will be + executing. Return False if operation_handle has ended, either successful or + with error.""" + return self.get_impala_exec_state(operation_handle) not in EXEC_STATES_FINAL + + def is_admitted(self, operation_handle): + """Returns whether the state of a operation_handle has passed Impala + admission control. 
Return True if handle state is error.""" + return self.get_impala_exec_state(operation_handle) in EXEC_STATES_ADMITTED + @abc.abstractmethod def get_log(self, operation_handle): """Returns the log of an operation as a string, with entries separated by newlines.""" @@ -185,10 +269,19 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)): pass @abc.abstractmethod - def fetch(self, sql_stmt, operation_handle, max_rows=-1): + def fetch(self, sql_stmt, operation_handle, max_rows=-1, discard_results=False): """Fetches query results up to max_rows given a handle and sql statement. If max_rows < 0, all rows are fetched. If max_rows > 0 but the number of - rows returned is less than max_rows, all the rows have been fetched.""" + rows returned is less than max_rows, all the rows have been fetched. + Return None if discard_results is True. + TODO: 'sql_stmt' can be obtained from 'operation_handle'.""" + pass + + @abc.abstractmethod + def get_runtime_profile(self, operation_handle, + profile_format=TRuntimeProfileFormat.STRING): + """Get runtime profile of given 'operation_handle'. + Handle must stay open.""" pass @abc.abstractmethod @@ -198,14 +291,85 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)): Otherwise, return str(operation_handle).""" pass - @abc.abstractmethod def log_handle(self, operation_handle, message): """Log 'message' at INFO level, along with id of 'operation_handle'.""" + handle_id = self.handle_id(operation_handle) + LOG.info("-- {0}: {1}".format(handle_id, message)) + + def log_client(self, message): + """Log 'message' at INFO level.""" + LOG.info("-- {0}".format(message)) + + def wait_for_impala_state(self, operation_handle, expected_impala_state, timeout): + """Waits for the given 'operation_handle' to reach the 'expected_impala_state'. + 'expected_impala_state' must be a string of either 'INITIALIZED', 'PENDING', + 'RUNNING', 'FINISHED', or 'ERROR'. 
If it does not reach the given state within + 'timeout' seconds, the method throws an AssertionError. + """ + self.wait_for_any_impala_state(operation_handle, [expected_impala_state], timeout) + + def wait_for_any_impala_state(self, operation_handle, expected_impala_states, + timeout_s): + """Waits for the given 'operation_handle' to reach one of 'expected_impala_states'. + Each string in 'expected_impala_states' must either be 'INITIALIZED', 'PENDING', + 'RUNNING', 'FINISHED', or 'ERROR'. If it does not reach one of the given states + within 'timeout' seconds, the method throws an AssertionError. + Returns the final state. + """ + start_time = time.time() + timeout_msg = None + while True: + impala_state = self.get_impala_exec_state(operation_handle) + interval = time.time() - start_time + if impala_state in expected_impala_states: + # Reached one of expected_impala_states. + break + elif not has_legal_future_state(impala_state, expected_impala_states): + timeout_msg = ("query '{0}' can not transition from last known state '{1}' to " + "any of the expected states {2}. Stop waiting after {3} " + "seconds.").format( + self.handle_id(operation_handle), impala_state, expected_impala_states, + interval) + break + elif interval >= timeout_s: + timeout_msg = ("query '{0}' did not reach one of the expected states {1}, last " + "known state {2}").format( + self.handle_id(operation_handle), expected_impala_states, impala_state) + break + time.sleep(DEFAULT_SLEEP_INTERVAL) + + if timeout_msg is not None: + raise tests.common.errors.Timeout(timeout_msg) + return impala_state + + @abc.abstractmethod + def wait_for_admission_control(self, operation_handle, timeout_s=60): + """Given an 'operation_handle', polls the coordinator waiting for it to complete + admission control processing of the query. 
+ Return True if query pass admission control after given 'timeout_s'.""" + pass + + @abc.abstractmethod + def get_admission_result(self, operation_handle): + """Given an 'operation_handle', returns the admission result from the query + profile""" pass # Represents a connection to Impala using the Beeswax API. class BeeswaxConnection(ImpalaConnection): + + # This is based on ClientRequestState::BeeswaxQueryState(). + __QUERY_STATE_TO_EXEC_STATE = { + QueryState.CREATED: INITIALIZED, + QueryState.COMPILED: PENDING, + QueryState.RUNNING: RUNNING, + QueryState.FINISHED: FINISHED, + QueryState.EXCEPTION: ERROR, + # These are not official ExecState, but added to complete mapping. + QueryState.INITIALIZED: 'UNIMPLEMENTED_INITIALIZED', + } + def __init__(self, host_port, use_kerberos=False, user=None, password=None, use_ssl=False): self.__beeswax_client = ImpalaBeeswaxClient(host_port, use_kerberos, user=user, @@ -217,7 +381,7 @@ class BeeswaxConnection(ImpalaConnection): self.QUERY_STATES = self.__beeswax_client.query_states def get_test_protocol(self): - return 'beeswax' + return BEESWAX def get_host_port(self): return self.__host_port @@ -250,30 +414,35 @@ class BeeswaxConnection(ImpalaConnection): self.set_configuration_option("client_identifier", tests.common.current_node) def connect(self): - LOG.info("-- connecting to: %s" % self.__host_port) - self.__beeswax_client.connect() + try: + self.__beeswax_client.connect() + self.log_client("connected to %s with beeswax" % self.__host_port) + except Exception as e: + self.log_client("failed connecting to %s with beeswax" % self.__host_port) + raise e # TODO: rename to close_connection def close(self): - LOG.info("-- closing connection to: %s" % self.__host_port) + self.log_client("closing beeswax connection to: %s" % self.__host_port) self.__beeswax_client.close_connection() - def close_query(self, operation_handle): + def close_query(self, operation_handle, fetch_profile_after_close=False): 
self.log_handle(operation_handle, 'closing query for operation') - self.__beeswax_client.close_query(operation_handle.get_handle()) + return self.__beeswax_client.close_query(operation_handle.get_handle(), + fetch_profile_after_close) def close_dml(self, operation_handle): self.log_handle(operation_handle, 'closing DML query') self.__beeswax_client.close_dml(operation_handle.get_handle()) def execute(self, sql_stmt, user=None, fetch_profile_after_close=False): - LOG.info("-- executing against %s\n" % (self.__host_port)) + self.log_client("executing against %s\n" % (self.__host_port)) log_sql_stmt(sql_stmt) return self.__beeswax_client.execute(sql_stmt, user=user, fetch_profile_after_close=fetch_profile_after_close) def execute_async(self, sql_stmt, user=None): - LOG.info("-- executing async: %s\n" % (self.__host_port)) + self.log_client("executing async: %s\n" % (self.__host_port)) log_sql_stmt(sql_stmt) beeswax_handle = self.__beeswax_client.execute_query_async(sql_stmt, user=user) return OperationHandle(beeswax_handle, sql_stmt) @@ -286,15 +455,17 @@ class BeeswaxConnection(ImpalaConnection): self.log_handle(operation_handle, 'getting state') return self.__beeswax_client.get_state(operation_handle.get_handle()) - def state_is_finished(self, operation_handle): - self.log_handle(operation_handle, 'checking finished state for operation') - return self.get_state(operation_handle) == self.QUERY_STATES["FINISHED"] + def get_impala_exec_state(self, operation_handle): + return self.__QUERY_STATE_TO_EXEC_STATE[self.get_state(operation_handle)] def get_exec_summary(self, operation_handle): self.log_handle(operation_handle, 'getting exec summary operation') return self.__beeswax_client.get_exec_summary(operation_handle.get_handle()) - def get_runtime_profile(self, operation_handle): + def get_runtime_profile(self, operation_handle, + profile_format=TRuntimeProfileFormat.STRING): + assert profile_format == TRuntimeProfileFormat.STRING, ( + "Beeswax client only support 
getting runtime profile in STRING format.") self.log_handle(operation_handle, 'getting runtime profile operation') return self.__beeswax_client.get_runtime_profile(operation_handle.get_handle()) @@ -316,11 +487,11 @@ class BeeswaxConnection(ImpalaConnection): self.log_handle(operation_handle, 'getting log for operation') return self.__beeswax_client.get_log(operation_handle.get_handle().log_context) - def fetch(self, sql_stmt, operation_handle, max_rows=-1): + def fetch(self, sql_stmt, operation_handle, max_rows=-1, discard_results=False): self.log_handle(operation_handle, 'fetching {} rows'.format( 'all' if max_rows < 0 else max_rows)) return self.__beeswax_client.fetch_results( - sql_stmt, operation_handle.get_handle(), max_rows) + sql_stmt, operation_handle.get_handle(), max_rows, discard_results) def handle_id(self, operation_handle): query_id = operation_handle.get_handle().id @@ -340,9 +511,23 @@ class ImpylaHS2Connection(ImpalaConnection): plus Impala-specific extensions, e.g. for fetching runtime profiles. TODO: implement support for kerberos, SSL, etc. """ + + # ClientRequestState::TOperationState() + __OPERATION_STATE_TO_EXEC_STATE = { + 'INITIALIZED_STATE': INITIALIZED, + 'PENDING_STATE': PENDING, + 'RUNNING_STATE': RUNNING, + 'FINISHED_STATE': FINISHED, + 'ERROR_STATE': ERROR, + # These are not official ExecState, but added to complete mapping. 
+ 'CANCELED_STATE': 'UNIMPLEMENTED_CANCELLED', + 'CLOSED_STATE': 'UNIMPLEMENTED_CLOSED', + 'UKNOWN_STATE': 'UNIMPLEMENTED_UNKNOWN' + } + def __init__(self, host_port, use_kerberos=False, is_hive=False, use_http_transport=False, http_path="", use_ssl=False, - collect_profile_and_log=True): + collect_profile_and_log=True, user=None): self.__host_port = host_port self.__use_http_transport = use_http_transport self.__http_path = http_path @@ -353,28 +538,37 @@ class ImpylaHS2Connection(ImpalaConnection): # cursor for different operations (as opposed to creating a new cursor per operation) # so that the session is preserved. This means that we can only execute one operation # at a time per connection, which is a limitation also imposed by the Beeswax API. + # However, for ease of async query testing, opening multiple cursors through single + # ImpylaHS2Connection is allowed if executing query through execute_async() or + # execute() with user parameter that is different than self.__user. Do note though + # that they will not share the same session with self.__cursor. self.__impyla_conn = None self.__cursor = None # Default query option obtained from initial connect. # Query option names are in lower case for consistency. self.__default_query_options = {} + # List of all cursors that created through execute_async. + self.__async_cursors = list() # Query options to send along with each query. self.__query_options = {} self._is_hive = is_hive # Some Hive HS2 protocol, such as custom Calcite planner, may be able to collect # profile and log from Impala. self._collect_profile_and_log = collect_profile_and_log + self.__user = user def get_test_protocol(self): if self.__http_path: - return 'hs2-http' + return HS2_HTTP else: - return 'hs2' + return HS2 def get_host_port(self): return self.__host_port def set_configuration_option(self, name, value): + # Only set the option if it's not already set to the same value. + # value must be parsed to string. 
name = name.lower() value = str(value) if self.__query_options.get(name) != value: @@ -389,35 +583,53 @@ class ImpylaHS2Connection(ImpalaConnection): if hasattr(tests.common, "current_node") and not self._is_hive: self.set_configuration_option("client_identifier", tests.common.current_node) + def __open_single_cursor(self, user=None): + return self.__impyla_conn.cursor(user=user, convert_types=False, + close_finished_queries=False) + + def __close_single_cursor(self, cursor): + try: + # Explicitly close the cursor so that it will close the session. + cursor.close() + except Exception: + # The session may no longer be valid if the impalad was restarted during the test. + pass + def connect(self): - LOG.info("-- connecting to {0} with impyla".format(self.__host_port)) host, port = self.__host_port.split(":") conn_kwargs = {} if self._is_hive: conn_kwargs['auth_mechanism'] = 'PLAIN' - self.__impyla_conn = impyla.connect(host=host, port=int(port), - use_http_transport=self.__use_http_transport, - http_path=self.__http_path, - use_ssl=self.__use_ssl, **conn_kwargs) - # Get the default query options for the session before any modifications are made. - self.__cursor = \ - self.__impyla_conn.cursor(convert_types=False, close_finished_queries=False) - self.__default_query_options = {} - if not self._is_hive: - self.__cursor.execute("set all") - for name, val, kind in self.__cursor: - collect_default_query_options(self.__default_query_options, name, val, kind) - self.__cursor.close_operation() + try: + self.__impyla_conn = impyla.connect( + host=host, port=int(port), use_http_transport=self.__use_http_transport, + http_path=self.__http_path, use_ssl=self.__use_ssl, **conn_kwargs) + self.__cursor = self.__open_single_cursor(user=self.__user) + # Get the default query options for the session before any modifications are made. 
+ self.__default_query_options = {} + if not self._is_hive: + self.__cursor.execute("set all") + for name, val, kind in self.__cursor: + collect_default_query_options(self.__default_query_options, name, val, kind) + self.__cursor.close_operation() LOG.debug("Default query options: {0}".format(self.__default_query_options)) + self.log_client("connected to {0} with impyla {1} session {2}".format( + self.__host_port, self.get_test_protocol(), self.__get_session_id(self.__cursor) + )) + except Exception as e: + self.log_client("failed connecting to {0} with impyla {1}".format( + self.__host_port, self.get_test_protocol() + )) + raise e def close(self): - LOG.info("-- closing connection to: {0}".format(self.__host_port)) - try: - # Explicitly close the cursor so that it will close the session. - self.__cursor.close() - except Exception: - # The session may no longer be valid if the impalad was restarted during the test. - pass + self.log_client("closing 1 sync and {0} async {1} connections to: {2}".format( + len(self.__async_cursors), self.get_test_protocol(), self.__host_port)) + self.__close_single_cursor(self.__cursor) + for async_cursor in self.__async_cursors: + self.__close_single_cursor(async_cursor) + # Remove all async cursors. + self.__async_cursors = list() try: self.__impyla_conn.close() except AttributeError as e: @@ -430,59 +642,90 @@ class ImpylaHS2Connection(ImpalaConnection): """Trigger the GetTables() HS2 request on the given database (None means all dbs). Returns a list of (catalogName, dbName, tableName, tableType, tableComment). 
""" - LOG.info("-- getting tables for database: {0}".format(database)) + self.log_client("getting tables for database: {0}".format(database)) self.__cursor.get_tables(database_name=database) return self.__cursor.fetchall() - def close_query(self, operation_handle): + def close_query(self, operation_handle, fetch_profile_after_close=False): self.log_handle(operation_handle, 'closing query for operation') + # close_operation() will wipe out _last_operation. + # Assign it to op_handle so that we can pull the profile after close_operation(). + op_handle = operation_handle.get_handle()._last_operation operation_handle.get_handle().close_operation() + if fetch_profile_after_close: + assert self._collect_profile_and_log, ( + "This connection is not configured to collect profile.") + return op_handle.get_profile(TRuntimeProfileFormat.STRING) + return None + + def __log_execute(self, cursor, user): + self.log_client( + "executing against {0} at {1}. session: {2} main_cursor: {3} user: {4}\n".format( + (self._is_hive and 'Hive' or 'Impala'), self.__host_port, + self.__get_session_id(cursor), (cursor == self.__cursor), user) + ) def execute(self, sql_stmt, user=None, profile_format=TRuntimeProfileFormat.STRING, fetch_profile_after_close=False): - LOG.info("-- executing against {0} at {1}\n".format( - self._is_hive and 'Hive' or 'Impala', self.__host_port)) log_sql_stmt(sql_stmt) - self.__cursor.execute(sql_stmt, configuration=self.__query_options) - handle = OperationHandle(self.__cursor, sql_stmt) + cursor = self.__cursor + result = None + try: + if user != self.__user: + # Must create a new cursor to supply 'user'. 
+ cursor = self.__open_single_cursor(user=user) + self.__log_execute(cursor, user) + cursor.execute(sql_stmt, configuration=self.__query_options) + handle = OperationHandle(cursor, sql_stmt) + self.log_handle(handle, "started query in session {0}".format( + self.__get_session_id(cursor))) + result = self.__fetch_results_and_profile( + handle, profile_format=profile_format, + fetch_profile_after_close=fetch_profile_after_close) + finally: + cursor.close_operation() + if cursor != self.__cursor: + self.__close_single_cursor(cursor) + return result + def __fetch_results_and_profile( + self, operation_handle, profile_format=TRuntimeProfileFormat.STRING, + fetch_profile_after_close=False): r = None try: - r = self.__fetch_results(handle, profile_format=profile_format) + r = self.__fetch_results(operation_handle, profile_format=profile_format) finally: if r is None: # Try to close the query handle but ignore any exceptions not to replace the # original exception raised by '__fetch_results'. try: - self.close_query(handle) + self.close_query(operation_handle) except Exception: pass elif fetch_profile_after_close: - op_handle = handle.get_handle()._last_operation - self.close_query(handle) - # Match ImpalaBeeswaxResult by placing the full profile including end time and # duration into the return object. 
- r.runtime_profile = op_handle.get_profile(profile_format) + r.runtime_profile = self.close_query(operation_handle, fetch_profile_after_close) return r else: - self.close_query(handle) + self.close_query(operation_handle) return r def execute_async(self, sql_stmt, user=None): - LOG.info("-- executing against {0} at {1}\n".format( - self._is_hive and 'Hive' or 'Impala', self.__host_port)) - log_sql_stmt(sql_stmt) - if user is not None: - raise NotImplementedError("Not yet implemented for HS2 - authentication") + async_cursor = None try: - self.__cursor.execute_async(sql_stmt, configuration=self.__query_options) - handle = OperationHandle(self.__cursor, sql_stmt) - LOG.info("Started query {0}".format(self.get_query_id(handle))) + async_cursor = self.__open_single_cursor(user=user) + handle = OperationHandle(async_cursor, sql_stmt) + self.__log_execute(async_cursor, user) + log_sql_stmt(sql_stmt) + async_cursor.execute_async(sql_stmt, configuration=self.__query_options) + self.__async_cursors.append(async_cursor) return handle - except Exception: - self.__cursor.close_operation() - raise + except Exception as e: + if async_cursor: + async_cursor.close_operation() + self.__close_single_cursor(async_cursor) + raise e def cancel(self, operation_handle): self.log_handle(operation_handle, 'canceling operation') @@ -492,34 +735,38 @@ class ImpylaHS2Connection(ImpalaConnection): def get_query_id(self, operation_handle): """Return the string representation of the query id. 
Return empty string if handle is already canceled or closed.""" + id = None last_op = operation_handle.get_handle()._last_operation - if last_op is None: - return "" - guid_bytes = last_op.handle.operationId.guid - # hex_codec works on bytes, so this needs to a decode() to get back to a string - hi_str = codecs.encode(guid_bytes[7::-1], 'hex_codec').decode() - lo_str = codecs.encode(guid_bytes[16:7:-1], 'hex_codec').decode() - return "{0}:{1}".format(hi_str, lo_str) + if last_op is not None: + id = op_handle_to_query_id(last_op.handle) + return "" if id is None else id + + def __get_session_id(self, cursor): + """Return the string representation of the session id. + Return empty string if handle is already canceled or closed.""" + id = None + if cursor.session is not None: + id = session_handle_to_session_id(cursor.session.handle) + return "" if id is None else id def handle_id(self, operation_handle): query_id = self.get_query_id(operation_handle) return query_id if query_id else str(operation_handle) - def log_handle(self, operation_handle, message): - handle_id = self.handle_id(operation_handle) - LOG.info("-- {0}: {1}".format(handle_id, message)) - def get_state(self, operation_handle): self.log_handle(operation_handle, 'getting state') cursor = operation_handle.get_handle() - return cursor.status() - - def state_is_finished(self, operation_handle): - self.log_handle(operation_handle, 'checking finished state for operation') - cursor = operation_handle.get_handle() # cursor.status contains a string representation of one of # TCLIService.TOperationState. 
- return cursor.status() == "FINISHED_STATE" + return cursor.status() + + def get_impala_exec_state(self, operation_handle): + try: + return self.__OPERATION_STATE_TO_EXEC_STATE[self.get_state(operation_handle)] + except impyla_error.Error: + return ERROR + except Exception as e: + raise e def get_exec_summary(self, operation_handle): self.log_handle(operation_handle, 'getting exec summary operation') @@ -527,22 +774,53 @@ class ImpylaHS2Connection(ImpalaConnection): # summary returned is thrift, not string. return cursor.get_summary() - def get_runtime_profile(self, operation_handle, profile_format): + def get_runtime_profile(self, operation_handle, + profile_format=TRuntimeProfileFormat.STRING): self.log_handle(operation_handle, 'getting runtime profile operation') cursor = operation_handle.get_handle() return cursor.get_profile(profile_format=profile_format) def wait_for_finished_timeout(self, operation_handle, timeout): self.log_handle(operation_handle, 'waiting for query to reach FINISHED state') - raise NotImplementedError("Not yet implemented for HS2 - states differ from beeswax") + start_time = time.time() + while time.time() - start_time < timeout: + start_rpc_time = time.time() + impala_state = self.get_impala_exec_state(operation_handle) + rpc_time = time.time() - start_rpc_time + # if the rpc succeeded, the output is the query state + if impala_state == FINISHED: + return True + elif impala_state == ERROR: + try: + error_log = self.__do_rpc( + lambda: self.imp_service.get_log(operation_handle.log_context)) + raise impyla_error.OperationalError(error_log, None) + finally: + self.close_query(operation_handle) + if rpc_time < DEFAULT_SLEEP_INTERVAL: + time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time) + return False - def wait_for_admission_control(self, operation_handle): + def wait_for_admission_control(self, operation_handle, timeout_s=60): self.log_handle(operation_handle, 'waiting for completion of the admission control') - raise NotImplementedError("Not 
yet implemented for HS2 - states differ from beeswax") + start_time = time.time() + while time.time() - start_time < timeout_s: + start_rpc_time = time.time() + if self.is_admitted(operation_handle): + return True + rpc_time = time.time() - start_rpc_time + if rpc_time < DEFAULT_SLEEP_INTERVAL: + time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time) + return False def get_admission_result(self, operation_handle): self.log_handle(operation_handle, 'getting the admission result') - raise NotImplementedError("Not yet implemented for HS2 - states differ from beeswax") + if self.is_admitted(operation_handle): + query_profile = self.get_runtime_profile(operation_handle) + admit_result = re.search(r"Admission result: (.*)", query_profile) + if admit_result: + return admit_result.group(1) + return "" def get_log(self, operation_handle): self.log_handle(operation_handle, 'getting log for operation') @@ -552,12 +830,13 @@ class ImpylaHS2Connection(ImpalaConnection): if not PROGRESS_LOG_RE.match(line)] return '\n'.join(lines) - def fetch(self, sql_stmt, operation_handle, max_rows=-1): + def fetch(self, sql_stmt, operation_handle, max_rows=-1, discard_results=False): self.log_handle(operation_handle, 'fetching {} rows'.format( 'all' if max_rows < 0 else max_rows)) - return self.__fetch_results(operation_handle, max_rows) + return self.__fetch_results(operation_handle, max_rows, discard_results) def __fetch_results(self, handle, max_rows=-1, + discard_results=False, profile_format=TRuntimeProfileFormat.STRING): """Implementation of result fetching from handle.""" cursor = handle.get_handle() @@ -581,10 +860,15 @@ class ImpylaHS2Connection(ImpalaConnection): else: log = None profile = None - return ImpylaHS2ResultSet(success=True, result_tuples=result_tuples, - column_labels=column_labels, column_types=column_types, - query=handle.sql_stmt(), log=log, profile=profile, - query_id=self.get_query_id(handle)) + result = None + + if discard_results: + return result + result = 
ImpylaHS2ResultSet(success=True, result_tuples=result_tuples, + column_labels=column_labels, column_types=column_types, + query=handle.sql_stmt(), log=log, profile=profile, + query_id=self.get_query_id(handle)) + return result class ImpylaHS2ResultSet(object): @@ -628,17 +912,17 @@ class ImpylaHS2ResultSet(object): return str(val) -def create_connection(host_port, use_kerberos=False, protocol='beeswax', +def create_connection(host_port, use_kerberos=False, protocol=BEESWAX, is_hive=False, use_ssl=False, collect_profile_and_log=True): - if protocol == 'beeswax': + if protocol == BEESWAX: c = BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos, use_ssl=use_ssl) - elif protocol == 'hs2': + elif protocol == HS2: c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos, is_hive=is_hive, use_ssl=use_ssl, collect_profile_and_log=collect_profile_and_log) else: - assert protocol == 'hs2-http' + assert protocol == HS2_HTTP c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos, is_hive=is_hive, use_http_transport=True, http_path='cliservice', use_ssl=use_ssl, collect_profile_and_log=collect_profile_and_log) diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py index 53eaf722a..103c5c1d4 100644 --- a/tests/common/impala_service.py +++ b/tests/common/impala_service.py @@ -491,6 +491,26 @@ class ImpaladService(BaseImpalaService): # Only check if the port is open, do not create Thrift transport. return self.is_port_open(self.hs2_http_port) + def create_client(self, protocol): + """Creates a new client connection for given protocol to this impalad""" + port = self.beeswax_port + if protocol == 'hs2': + port = self.hs2_port + elif protocol == 'hs2-http': + port = self.hs2_http_port + client = create_connection('%s:%d' % (self.hostname, port), protocol=protocol) + client.connect() + return client + + def create_client_from_vector(self, vector): + """A shorthand for create_client with test vector as input. 
+ Vector must have 'protocol' and 'exec_option' dimension. + Return a client of specified 'protocol' and with cofiguration 'exec_option' set.""" + client = self.create_client(protocol=vector.get_value('protocol')) + client.set_configuration(vector.get_exec_option_dict()) + return client + + # Allows for interacting with the StateStore service to perform operations such as # accessing the debug webpage. class StateStoredService(BaseImpalaService): diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 866d9639d..2fe09a898 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -345,13 +345,11 @@ class ImpalaTestSuite(BaseTestSuite): cls.hive_transport.close() cls.close_impala_clients() - @classmethod - def setup_method(cls, test_method): + def setup_method(self, test_method): """Setup for all test method.""" - cls.__reset_impala_clients() + self._reset_impala_clients() - @classmethod - def teardown_method(cls, test_method): + def teardown_method(self, test_method): """Teardown for all test method. Currently, it is only here as a placeholder for future use and complement setup_method() declaration.""" @@ -426,7 +424,7 @@ class ImpalaTestSuite(BaseTestSuite): cls.client = cls.default_impala_client(cls.default_test_protocol()) @classmethod - def __reset_impala_clients(cls): + def _reset_impala_clients(cls): if cls.beeswax_client: cls.beeswax_client.clear_configuration() if cls.hs2_client: @@ -1437,6 +1435,7 @@ class ImpalaTestSuite(BaseTestSuite): """Waits for the given 'query_handle' to reach the 'expected_state' using 'client', or with the default connection if 'client' is None. If it does not reach the given state within 'timeout' seconds, the method throws an AssertionError. + DEPRECATED: Use client.wait_for_impala_state() instead. 
""" self.wait_for_any_state(handle, [expected_state], timeout, client) @@ -1445,6 +1444,7 @@ class ImpalaTestSuite(BaseTestSuite): or with the default connection if 'client' is None. If it does not reach one of the given states within 'timeout' seconds, the method throws an AssertionError. Returns the final state. + DEPRECATED: Use client.wait_for_any_impala_state() instead. """ if client is None: client = self.client start_time = time.time() diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 6a94ceaf9..e40214ee5 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -27,12 +27,9 @@ import re import signal import sys import threading -from copy import copy +from copy import deepcopy from time import sleep, time -from beeswaxd.BeeswaxService import QueryState - -from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.cluster_config import ( impalad_admission_ctrl_flags, impalad_admission_ctrl_config_args, @@ -43,9 +40,14 @@ from tests.common.custom_cluster_test_suite import ( START_ARGS, CustomClusterTestSuite) from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperties +from tests.common.impala_connection import ( + RUNNING, FINISHED, ERROR, + IMPALA_CONNECTION_EXCEPTION) from tests.common.resource_pool_config import ResourcePoolConfig from tests.common.skip import SkipIfFS, SkipIfEC, SkipIfNotHdfsMinicluster from tests.common.test_dimensions import ( + HS2, BEESWAX, + add_mandatory_exec_option, create_exec_option_dimension, create_single_exec_option_dimension, create_uncompressed_text_dimension) @@ -163,11 +165,22 @@ def metric_key(pool_name, metric_name): return "admission-controller.%s.%s" % (metric_name, pool_name) +def wait_single_statestore_heartbeat(): + """Wait for state sync across impalads.""" + sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0) + + class 
TestAdmissionControllerBase(CustomClusterTestSuite): + @classmethod def get_workload(self): return 'functional-query' + @classmethod + def default_test_protocol(cls): + # Do not change this. Multiple test method has been hardcoded under this assumption. + return HS2 + @classmethod def add_test_dimensions(cls): super(TestAdmissionControllerBase, cls).add_test_dimensions() @@ -176,14 +189,32 @@ class TestAdmissionControllerBase(CustomClusterTestSuite): cls.ImpalaTestMatrix.add_dimension( create_uncompressed_text_dimension(cls.get_workload())) + def enable_admission_service(self, method): + """Inject argument to enable admission control service. + Must be called at setup_method() and before calling setup_method() of superclass.""" + start_args = "--enable_admission_service" + if START_ARGS in method.__dict__: + start_args = method.__dict__[START_ARGS] + " " + start_args + method.__dict__[START_ARGS] = start_args + if IMPALAD_ARGS in method.__dict__: + method.__dict__[ADMISSIOND_ARGS] = method.__dict__[IMPALAD_ARGS] + + +class TestAdmissionControllerRawHS2(TestAdmissionControllerBase, HS2TestSuite): + + @classmethod + def default_test_protocol(cls): + # HS2TestSuite override self.hs2_client with a raw Impala hs2 thrift client. + # This will set self.client = self.beeswax_client. + # Do not change this. Multiple test method has been hardcoded under this assumption. 
+ return BEESWAX -class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): def __check_pool_rejected(self, client, pool, expected_error_re): try: client.set_configuration({'request_pool': pool}) client.execute("select 1") assert False, "Query should return error" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert re.search(expected_error_re, str(e)) def __check_query_options(self, profile, expected_query_options): @@ -232,34 +263,6 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): HS2TestSuite.check_response(get_profile_resp) self.__check_query_options(get_profile_resp.profile, expected_options) - def _execute_and_collect_profiles(self, queries, timeout_s, config_options={}, - allow_query_failure=False): - """Submit the query statements in 'queries' in parallel to the first impalad in - the cluster. After submission, the results are fetched from the queries in - sequence and their profiles are collected. Wait for up to timeout_s for - each query to finish. If 'allow_query_failure' is True, succeeds if the query - completes successfully or ends up in the EXCEPTION state. Otherwise expects the - queries to complete successfully. 
- Returns the profile strings.""" - client = self.cluster.impalads[0].service.create_beeswax_client() - expected_states = [client.QUERY_STATES['FINISHED']] - if allow_query_failure: - expected_states.append(client.QUERY_STATES['EXCEPTION']) - try: - handles = [] - profiles = [] - client.set_configuration(config_options) - for query in queries: - handles.append(client.execute_async(query)) - for query, handle in zip(queries, handles): - state = self.wait_for_any_state(handle, expected_states, timeout_s) - if state == client.QUERY_STATES['FINISHED']: - self.client.fetch(query, handle) - profiles.append(self.client.get_runtime_profile(handle)) - return profiles - finally: - client.close() - def get_ac_process(self): """Returns the Process that is running the admission control service.""" return self.cluster.impalads[0] @@ -314,7 +317,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # Wait for query to clear admission control and get accounted for client.wait_for_admission_control(handle) self.__check_pool_rejected(client, 'root.queueA', "exceeded timeout") - assert client.get_state(handle) == client.QUERY_STATES['FINISHED'] + assert client.is_finished(handle) # queueA has default query options mem_limit=128m,query_timeout_s=5 self.__check_query_options(client.get_runtime_profile(handle), [queueA_mem_limit, 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA']) @@ -386,7 +389,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): open_session_req = TCLIService.TOpenSessionReq() open_session_req.username = "" open_session_resp = self.hs2_client.OpenSession(open_session_req) - TestAdmissionController.check_response(open_session_resp) + TestAdmissionControllerRawHS2.check_response(open_session_resp) try: execute_statement_req = TCLIService.TExecuteStatementReq() @@ -401,7 +404,110 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): finally: close_req = TCLIService.TCloseSessionReq() 
close_req.sessionHandle = open_session_resp.sessionHandle - TestAdmissionController.check_response(self.hs2_client.CloseSession(close_req)) + TestAdmissionControllerRawHS2.check_response( + self.hs2_client.CloseSession(close_req)) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10, + pool_max_mem=1024 * 1024 * 1024)) + @needs_session() + def test_queuing_status_through_query_log_and_exec_summary(self): + """Test to verify that the HS2 client's GetLog() call and the ExecSummary expose + the query's queuing status, that is, whether the query was queued and what was the + latest queuing reason.""" + # Start a long-running query. + long_query_resp = self.execute_statement("select sleep(10000)") + # Ensure that the query has started executing. + self.wait_for_admission_control(long_query_resp.operationHandle) + # Submit another query. + queued_query_resp = self.execute_statement("select sleep(1)") + # Wait until the query is queued. + self.wait_for_operation_state(queued_query_resp.operationHandle, + TCLIService.TOperationState.PENDING_STATE) + # Check whether the query log message correctly exposes the queuing status. + log = self.wait_for_log_message( + queued_query_resp.operationHandle, "Admission result :") + assert "Admission result : Queued" in log, log + assert "Latest admission queue reason : number of running queries 1 is at or over " + "limit 1" in log, log + # Now check the same for ExecSummary. + summary_req = ImpalaHiveServer2Service.TGetExecSummaryReq() + summary_req.operationHandle = queued_query_resp.operationHandle + summary_req.sessionHandle = self.session_handle + exec_summary_resp = self.hs2_client.GetExecSummary(summary_req) + assert exec_summary_resp.summary.is_queued + assert "number of running queries 1 is at or over limit 1" in \ + exec_summary_resp.summary.queued_reason, \ + exec_summary_resp.summary.queued_reason + # Close the running query. 
+ self.close(long_query_resp.operationHandle) + # Close the queued query. + self.close(queued_query_resp.operationHandle) + + +class TestAdmissionControllerRawHS2WithACService(TestAdmissionControllerRawHS2): + """Runs all of the tests from TestAdmissionControllerRawHS2 but with the second + impalad in the minicluster configured to perform all admission control.""" + + def get_ac_process(self): + return self.cluster.admissiond + + def get_ac_log_name(self): + return "admissiond" + + def setup_method(self, method): + if self.exploration_strategy() != 'exhaustive': + pytest.skip('runs only in exhaustive') + self.enable_admission_service(method) + super(TestAdmissionControllerRawHS2, self).setup_method(method) + + +class TestAdmissionController(TestAdmissionControllerBase): + + def get_ac_process(self): + """Returns the Process that is running the admission control service.""" + return self.cluster.impalads[0] + + def get_ac_log_name(self): + """Returns the prefix of the log files for the admission control process.""" + return "impalad" + + def setup_method(self, method): + """All tests in this class is non-destructive. Therefore, we can afford + resetting clients at every setup_method.""" + super(TestAdmissionController, self).setup_method(method) + self._reset_impala_clients() + + def _execute_and_collect_profiles(self, queries, timeout_s, config_options={}, + allow_query_failure=False): + """Submit the query statements in 'queries' in parallel to the first impalad in + the cluster. After submission, the results are fetched from the queries in + sequence and their profiles are collected. Wait for up to timeout_s for + each query to finish. If 'allow_query_failure' is True, succeeds if the query + completes successfully or ends up in the EXCEPTION state. Otherwise expects the + queries to complete successfully. 
+ Returns the profile strings.""" + client = self.cluster.impalads[0].service.create_hs2_client() + expected_states = [FINISHED] + if allow_query_failure: + expected_states.append(ERROR) + try: + handles = [] + profiles = [] + client.set_configuration(config_options) + for query in queries: + handles.append(client.execute_async(query)) + for query, handle in zip(queries, handles): + state = client.wait_for_any_impala_state(handle, expected_states, timeout_s) + if state == FINISHED: + self.client.fetch(query, handle) + profiles.append(client.get_runtime_profile(handle)) + return profiles + finally: + for handle in handles: + client.close_query(handle) + client.close() @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( @@ -610,7 +716,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): handle = self.client.execute_async(query.format(1)) self.client.wait_for_finished_timeout(handle, 1000) expected_mem_limits = self.__get_mem_limits_admission_debug_page() - actual_mem_limits = self.__get_mem_limits_memz_debug_page(handle.get_handle().id) + actual_mem_limits = self.__get_mem_limits_memz_debug_page( + self.client.handle_id(handle)) mem_admitted =\ get_mem_admitted_backends_debug_page(self.cluster, self.get_ac_process()) debug_string = " expected_mem_limits:" + str( @@ -670,7 +777,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): def test_dedicated_coordinator_planner_estimates(self, vector, unique_database): """Planner tests to add coverage for coordinator estimates when using dedicated coordinators. Also includes coverage for verifying cluster memory admitted.""" - vector_copy = copy(vector) + vector_copy = deepcopy(vector) exec_options = vector_copy.get_value('exec_option') # Remove num_nodes from the options to allow test case runner to set it in one of # the test cases. 
@@ -756,17 +863,17 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): query = "select * from functional.alltypesagg, (select 1) B limit 1" # Successfully run a query with mem limit equal to the lowest process memory among # impalads - exec_options = copy(vector.get_value('exec_option')) + exec_options = deepcopy(vector.get_value('exec_option')) exec_options['mem_limit'] = "2G" self.execute_query_expect_success(self.client, query, exec_options) # Test that a query scheduled to run on a single node and submitted to the impalad # with higher proc mem limit succeeds. - exec_options = copy(vector.get_value('exec_option')) + exec_options = deepcopy(vector.get_value('exec_option')) exec_options['mem_limit'] = "3G" exec_options['num_nodes'] = "1" self.execute_query_expect_success(self.client, query, exec_options) # Exercise rejection checks in admission controller. - exec_options = copy(vector.get_value('exec_option')) + exec_options = deepcopy(vector.get_value('exec_option')) exec_options['mem_limit'] = "3G" ex = self.execute_query_expect_failure(self.client, query, exec_options) assert ("Rejected query from pool default-pool: request memory needed " @@ -777,17 +884,18 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # Wait for previous queries to finish to avoid flakiness. for impalad in self.cluster.impalads: impalad.service.wait_for_metric_value("impala-server.num-fragments-in-flight", 0) - impalad_with_2g_mem = self.cluster.impalads[2].service.create_beeswax_client() + impalad_with_2g_mem = self.cluster.impalads[2].service.create_client_from_vector( + vector) impalad_with_2g_mem.set_configuration_option('mem_limit', '1G') impalad_with_2g_mem.execute_async("select sleep(1000)") # Wait for statestore update to update the mem admitted in each node. 
- sleep(STATESTORE_RPC_FREQUENCY_MS / 1000) - exec_options = copy(vector.get_value('exec_option')) + wait_single_statestore_heartbeat() + exec_options = deepcopy(vector.get_value('exec_option')) exec_options['mem_limit'] = "2G" # Since Queuing is synchronous, and we can't close the previous query till this # returns, we wait for this to timeout instead. self.execute_query(query, exec_options) - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert re.search(r"Queued reason: Not enough memory available on host \S+.Needed " r"2.00 GB but only 1.00 GB out of 2.00 GB was available.", str(e)), str(e) finally: @@ -800,11 +908,11 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): max_queued=1, pool_max_mem=PROC_MEM_TEST_LIMIT), statestored_args=_STATESTORED_ARGS, disable_log_buffering=True) - def test_cancellation(self): + def test_cancellation(self, vector): """ Test to confirm that all Async cancellation windows are hit and are able to successfully cancel the query""" impalad = self.cluster.impalads[0] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_client_from_vector(vector) try: client.set_configuration_option("debug_action", "AC_BEFORE_ADMISSION:SLEEP@2000") client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1) @@ -848,15 +956,16 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): client.set_configuration_option('enable_trivial_query_for_admission', 'false') queued_query_handle = client.execute_async("select 5") sleep(1) - assert client.get_state(queued_query_handle) == QueryState.COMPILED - assert "Admission result: Queued" in client.get_runtime_profile(queued_query_handle) + assert client.is_pending(queued_query_handle) + assert "Admission result: Queued" in client.get_runtime_profile( + queued_query_handle) # Only cancel the queued query, because close will wait till it unregisters, this # gives us a chance to close the 
running query and allow the dequeue thread to # dequeue the queue query client.cancel(queued_query_handle) client.close_query(handle) - client.close_query(queued_query_handle) - queued_profile = client.get_runtime_profile(queued_query_handle) + queued_profile = client.close_query(queued_query_handle, + fetch_profile_after_close=True) assert "Admission result: Cancelled (queued)" in queued_profile, queued_profile self.assert_log_contains( self.get_ac_log_name(), 'INFO', "Dequeued cancelled query=") @@ -866,11 +975,12 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): handle = client.execute_async("select sleep(10000)") queued_query_handle = client.execute_async("select 6") sleep(1) - assert client.get_state(queued_query_handle) == QueryState.COMPILED - assert "Admission result: Queued" in client.get_runtime_profile(queued_query_handle) - client.close_query(queued_query_handle) + assert client.is_pending(queued_query_handle) + assert "Admission result: Queued" in client.get_runtime_profile( + queued_query_handle) + queued_profile = client.close_query(queued_query_handle, + fetch_profile_after_close=True) client.close_query(handle) - queued_profile = client.get_runtime_profile(queued_query_handle) assert "Admission result: Cancelled (queued)" in queued_profile for i in self.cluster.impalads: i.service.wait_for_metric_value( @@ -1000,7 +1110,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # Run a bunch of queries with mem_limit set so that only one can be admitted # immediately. The rest should be queued and dequeued (timeout) due to host memory # pressure. - STMT = "select sleep(100)" + STMT = "select sleep(1000)" TIMEOUT_S = 20 NUM_QUERIES = 5 # IMPALA-9856: Disable query result spooling so that we can run queries with low @@ -1034,7 +1144,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # Run a bunch of queries with mem_limit set so that only one can be admitted # immediately. 
The rest should be queued and dequeued (timeout) due to pool memory # pressure. - STMT = "select sleep(100)" + STMT = "select sleep(1000)" TIMEOUT_S = 20 NUM_QUERIES = 5 # IMPALA-9856: Disable query result spooling so that we can run queries with low @@ -1143,7 +1253,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): self.close_query(handle_running) self.__assert_num_queries_accounted(0) # Case 3: When a query gets rejected - exec_options = copy(vector.get_value('exec_option')) + exec_options = deepcopy(vector.get_value('exec_option')) exec_options['mem_limit'] = "1b" self.execute_query_expect_failure(self.client, query, exec_options) self.__assert_num_queries_accounted(0) @@ -1211,6 +1321,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): pool=pool) query2 = self.execute_async_and_wait_for_running(impalad2, SLOW_QUERY, USER_ROOT, pool=pool) + wait_single_statestore_heartbeat() keys = [ "admission-controller.agg-current-users.root.queueB", "admission-controller.local-current-users.root.queueB", @@ -1294,16 +1405,16 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): query_handles.append(query_handle) # Let state sync across impalads. - sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0) + wait_single_statestore_heartbeat() # Another query should be rejected impalad = self.cluster.impalads[limit % 2] - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() client.set_configuration({'request_pool': pool}) try: client.execute(SLOW_QUERY, user=user) assert False, "query should fail" - except Exception as e: + except IMPALA_CONNECTION_EXCEPTION as e: # Construct the expected error message. 
expected = ("Rejected query from pool {pool}: current per-{type} load {limit} for " "user '{user}'{group_description} is at or above the {err_type} limit " @@ -1328,14 +1439,12 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): def execute_async_and_wait_for_running(self, impalad, query, user, pool): # Execute a query asynchronously, and wait for it to be running. - # Use beeswax client as it allows specifying the user that runs the query. - client = impalad.service.create_beeswax_client() + client = impalad.service.create_hs2_client() client.set_configuration({'request_pool': pool}) handle = client.execute_async(query, user=user) timeout_s = 10 # Make sure the query has been admitted and is running. - self.wait_for_state( - handle, client.QUERY_STATES['RUNNING'], timeout_s, client=client) + client.wait_for_impala_state(handle, RUNNING, timeout_s) return self.ClientAndHandle(client, handle) @pytest.mark.execute_serially @@ -1375,8 +1484,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): self.client.close_query(sleep_query_handle) # Observe that the queued query fails. - self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20), - self.close_query(queued_query_handle) + self.client.wait_for_impala_state(queued_query_handle, ERROR, 20), + self.client.close_query(queued_query_handle) # Change the config back to a valid value config.set_config_value(pool_name, config_str, 0) @@ -1396,8 +1505,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): self.client.close_query(sleep_query_handle) # Observe that the queued query fails. 
- self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20), - self.close_query(queued_query_handle) + self.client.wait_for_impala_state(queued_query_handle, ERROR, 20), + self.client.close_query(queued_query_handle) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( @@ -1405,7 +1514,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): pool_max_mem=1024 * 1024 * 1024), statestored_args=_STATESTORED_ARGS) def test_trivial_query(self): - self.client.execute("set enable_trivial_query_for_admission=false") + self.client.set_configuration_option("enable_trivial_query_for_admission", "false") # Test the second request does need to queue when trivial query is disabled. sleep_query_handle = self.client.execute_async("select sleep(10000)") @@ -1417,7 +1526,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): self.client.close_query(sleep_query_handle) self.client.close_query(trivial_query_handle) - self.client.execute("set enable_trivial_query_for_admission=true") + self.client.set_configuration_option("enable_trivial_query_for_admission", "true") # Test when trivial query is enabled, all trivial queries should be # admitted immediately. sleep_query_handle = self.client.execute_async("select sleep(10000)") @@ -1439,7 +1548,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # Test whether it will fail for a normal query. failed_query_handle = self.client.execute_async( "select * from functional_parquet.alltypes limit 100") - self.wait_for_state(failed_query_handle, QueryState.EXCEPTION, 20) + self.client.wait_for_impala_state(failed_query_handle, ERROR, 20) self.client.close_query(failed_query_handle) # Test it should pass all the trivial queries. 
self._test_trivial_queries_suc() @@ -1463,7 +1572,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): def _test_multi_trivial_query_runs(self): timeout = 10 admit_obj = self.admit_obj - client = admit_obj.cluster.impalads[0].service.create_beeswax_client() + client = admit_obj.cluster.impalads[0].service.create_hs2_client() for i in range(100): handle = client.execute_async(self.sql) if not self.expect_err: @@ -1580,44 +1689,6 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): assert False, "Timed out waiting for change to profile\nSearch " \ "String: {0}\nProfile:\n{1}".format(search_string, str(profile)) - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10, - pool_max_mem=1024 * 1024 * 1024)) - @needs_session() - def test_queuing_status_through_query_log_and_exec_summary(self): - """Test to verify that the HS2 client's GetLog() call and the ExecSummary expose - the query's queuing status, that is, whether the query was queued and what was the - latest queuing reason.""" - # Start a long-running query. - long_query_resp = self.execute_statement("select sleep(10000)") - # Ensure that the query has started executing. - self.wait_for_admission_control(long_query_resp.operationHandle) - # Submit another query. - queued_query_resp = self.execute_statement("select sleep(1)") - # Wait until the query is queued. - self.wait_for_operation_state(queued_query_resp.operationHandle, - TCLIService.TOperationState.PENDING_STATE) - # Check whether the query log message correctly exposes the queuing status. - log = self.wait_for_log_message( - queued_query_resp.operationHandle, "Admission result :") - assert "Admission result : Queued" in log, log - assert "Latest admission queue reason : number of running queries 1 is at or over " - "limit 1" in log, log - # Now check the same for ExecSummary. 
- summary_req = ImpalaHiveServer2Service.TGetExecSummaryReq() - summary_req.operationHandle = queued_query_resp.operationHandle - summary_req.sessionHandle = self.session_handle - exec_summary_resp = self.hs2_client.GetExecSummary(summary_req) - assert exec_summary_resp.summary.is_queued - assert "number of running queries 1 is at or over limit 1" in \ - exec_summary_resp.summary.queued_reason,\ - exec_summary_resp.summary.queued_reason - # Close the running query. - self.close(long_query_resp.operationHandle) - # Close the queued query. - self.close(queued_query_resp.operationHandle) - @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=(impalad_admission_ctrl_flags( @@ -1743,12 +1814,12 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # state because of the 'WAIT' debug action), wait for the 'lineitem' scan to # complete, and then validate that one of the executor backends shutdowns and # releases its admitted memory. - self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], timeout) + self.client.wait_for_impala_state(handle, RUNNING, timeout) # Once the 'lineitem' scan completes, NumCompletedBackends should be 1. 
self.assert_eventually(60, 1, lambda: "NumCompletedBackends: 1 (1)" in self.client.get_runtime_profile(handle)) get_num_completed_backends(self.cluster.impalads[0].service, - handle.get_handle().id) == 1 + self.client.handle_id(handle)) == 1 mem_admitted =\ get_mem_admitted_backends_debug_page(self.cluster, self.get_ac_process()) num_executor_zero_admitted = 0 @@ -2012,13 +2083,7 @@ class TestAdmissionControllerWithACService(TestAdmissionController): def setup_method(self, method): if self.exploration_strategy() != 'exhaustive': pytest.skip('runs only in exhaustive') - - start_args = "--enable_admission_service" - if START_ARGS in method.__dict__: - start_args = method.__dict__[START_ARGS] + " " + start_args - method.__dict__[START_ARGS] = start_args - if IMPALAD_ARGS in method.__dict__: - method.__dict__[ADMISSIOND_ARGS] = method.__dict__[IMPALAD_ARGS] + self.enable_admission_service(method) super(TestAdmissionControllerWithACService, self).setup_method(method) @SkipIfNotHdfsMinicluster.tuned_for_minicluster @@ -2038,8 +2103,7 @@ class TestAdmissionControllerWithACService(TestAdmissionController): timeout_s = 10 # Make sure the query is through admission control before killing the admissiond. It # should be unaffected and finish successfully. 
- self.wait_for_state( - before_kill_handle, self.client.QUERY_STATES['RUNNING'], timeout_s) + self.client.wait_for_impala_state(before_kill_handle, RUNNING, timeout_s) self.cluster.admissiond.kill() result = self.client.fetch(query, before_kill_handle) assert result.data == ["730"] @@ -2059,7 +2123,7 @@ class TestAdmissionControllerWithACService(TestAdmissionController): try: result = self.client.fetch(query, no_restart_handle) assert False, "Query should have failed" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert "Failed to admit query after waiting " in str(e) @SkipIfNotHdfsMinicluster.tuned_for_minicluster @@ -2077,20 +2141,18 @@ class TestAdmissionControllerWithACService(TestAdmissionController): handle1 = self.execute_query_async(query) timeout_s = 10 # Make sure the first query has been admitted. - self.wait_for_state( - handle1, self.client.QUERY_STATES['RUNNING'], timeout_s) + self.client.wait_for_impala_state(handle1, RUNNING, timeout_s) # Run another query. This query should be queued because only 1 query is allowed in # the default pool. - handle2 = self.execute_query_async(query) + handle2 = self.client.execute_async(query) self._wait_for_change_to_profile(handle2, "Admission result: Queued") # Cancel the first query. It's resources should be released and the second query # should be admitted. self.client.cancel(handle1) self.client.close_query(handle1) - self.wait_for_state( - handle2, self.client.QUERY_STATES['RUNNING'], timeout_s) + self.client.wait_for_impala_state(handle2, RUNNING, timeout_s) @SkipIfNotHdfsMinicluster.tuned_for_minicluster @pytest.mark.execute_serially @@ -2108,20 +2170,18 @@ class TestAdmissionControllerWithACService(TestAdmissionController): handle1 = self.execute_query_async(query) timeout_s = 10 # Make sure the first query has been admitted. 
- self.wait_for_state( - handle1, self.client.QUERY_STATES['RUNNING'], timeout_s) + self.client.wait_for_impala_state(handle1, RUNNING, timeout_s) # Run another query. This query should be queued because the executor group only has 1 # slot. - handle2 = self.execute_query_async(query) + handle2 = self.client.execute_async(query) self._wait_for_change_to_profile(handle2, "Admission result: Queued") # Cancel the first query. It's resources should be released and the second query # should be admitted. self.client.cancel(handle1) self.client.close_query(handle1) - self.wait_for_state( - handle2, self.client.QUERY_STATES['RUNNING'], timeout_s) + self.client.wait_for_impala_state(handle2, RUNNING, timeout_s) @SkipIfNotHdfsMinicluster.tuned_for_minicluster @pytest.mark.execute_serially @@ -2133,25 +2193,23 @@ class TestAdmissionControllerWithACService(TestAdmissionController): # Query designed to run for a few minutes. query = "select count(*) from functional.alltypes where int_col = sleep(10000)" impalad1 = self.cluster.impalads[0] - client1 = impalad1.service.create_beeswax_client() + client1 = impalad1.service.create_hs2_client() handle1 = client1.execute_async(query) timeout_s = 10 # Make sure the first query has been admitted. - self.wait_for_state( - handle1, self.client.QUERY_STATES['RUNNING'], timeout_s, client=client1) + client1.wait_for_impala_state(handle1, RUNNING, timeout_s) # Run another query with a different coordinator. This query should be queued because # only 1 query is allowed in the default pool. impalad2 = self.cluster.impalads[1] - client2 = impalad2.service.create_beeswax_client() + client2 = impalad2.service.create_hs2_client() handle2 = client2.execute_async(query) self._wait_for_change_to_profile(handle2, "Admission result: Queued", client=client2) # Kill the coordinator for the first query. The resources for the query should get # cleaned up and the second query should be admitted. 
impalad1.kill() - self.wait_for_state( - handle2, self.client.QUERY_STATES['RUNNING'], timeout_s, client=client2) + client2.wait_for_impala_state(handle2, RUNNING, timeout_s) class TestAdmissionControllerStress(TestAdmissionControllerBase): @@ -2185,15 +2243,22 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): submitting to a single impalad, we know exactly what the values should be, otherwise we just check that they are within reasonable bounds. """ + + BATCH_SIZE = 100 + @classmethod def add_test_dimensions(cls): super(TestAdmissionControllerStress, cls).add_test_dimensions() - # This test really need to run using beeswax client since hs2 client has not - # implemented some methods needed to query admission control status. - cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol', 'beeswax')) # Slow down test query by setting low batch_size. cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension( - cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[10])) + cluster_sizes=[0], disable_codegen_options=[False], + batch_sizes=[cls.BATCH_SIZE])) + # Turning off result spooling allows us to better control query execution by + # controlling the number of rows fetched. This allows us to maintain resource + # usage among backends. + add_mandatory_exec_option(cls, 'spool_query_results', 0) + # Set 100ms long poling time to get faster initial response. 
+ add_mandatory_exec_option(cls, 'long_polling_time_ms', 100) cls.ImpalaTestMatrix.add_dimension( ImpalaTestDimension('round_robin_submission', *ROUND_ROBIN_SUBMISSION)) cls.ImpalaTestMatrix.add_dimension( @@ -2356,7 +2421,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): REQUEST_QUEUE_UPDATE_INTERVAL)['count'] assert (time() - start_time < STRESS_TIMEOUT),\ "Timed out waiting %s seconds for heartbeats" % (STRESS_TIMEOUT,) - sleep(STATESTORE_RPC_FREQUENCY_MS / float(1000)) + wait_single_statestore_heartbeat() LOG.info("Waited %s for %s heartbeats", round(time() - start_time, 1), heartbeats) def wait_for_admitted_threads(self, num_threads): @@ -2410,7 +2475,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): sleep(1) class SubmitQueryThread(threading.Thread): - def __init__(self, impalad, additional_query_options, vector, query_num, + def __init__(self, impalad, vector, query_num, query_end_behavior, executing_threads, exit_signal): """ executing_threads must be provided so that this thread can add itself when the @@ -2418,8 +2483,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): """ super(self.__class__, self).__init__() self.executing_threads = executing_threads - self.vector = vector - self.additional_query_options = additional_query_options + # Make vector local to this thread, because run() will modify it later. + self.vector = deepcopy(vector) self.query_num = query_num self.query_end_behavior = query_end_behavior self.impalad = impalad @@ -2431,6 +2496,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): # Set by the main thread when tearing down self.exit_signal = exit_signal self.setDaemon(True) + # Determine how many rows to fetch per interval. 
+ self.rows_per_fetch = TestAdmissionControllerStress.BATCH_SIZE def thread_should_run(self): return not self.exit_signal.is_set() @@ -2444,20 +2511,12 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): if not self.thread_should_run(): return - exec_options = dict() - exec_options.update(self.vector.get_exec_option_dict()) - exec_options.update(self.additional_query_options) - # Turning off result spooling allows us to better control query execution by - # controlling the number of rows fetched. This allows us to maintain resource - # usage among backends. - exec_options['spool_query_results'] = 0 - exec_options['long_polling_time_ms'] = 100 if self.query_end_behavior == 'QUERY_TIMEOUT': - exec_options['query_timeout_s'] = QUERY_END_TIMEOUT_S + self.vector.set_exec_option('query_timeout_s', QUERY_END_TIMEOUT_S) query = QUERY.format(self.query_num) self.query_state = 'SUBMITTING' - client = self.impalad.service.create_beeswax_client() - client.set_configuration(exec_options) + assert self.vector.get_protocol() == HS2, "Must use hs2 protocol" + client = self.impalad.service.create_client_from_vector(self.vector) LOG.info("Submitting query %s with ending behavior %s", self.query_num, self.query_end_behavior) @@ -2485,7 +2544,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): msg = "Admission result for query {} : {}".format( self.query_num, admission_result) self.log_handle(client, query_handle, msg) - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: LOG.exception(e) raise e @@ -2500,10 +2559,10 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): # query. It needs to wait until the main thread requests it to end its query. while self.thread_should_run() and self.query_state != 'COMPLETED': # The query needs to stay active until the main thread requests it to end. - # Otherwise, the query may get cancelled early. Fetch 1 row every - # FETCH_INTERVAL to keep the query active. 
- fetch_result = client.fetch(query, query_handle, 1) - assert len(fetch_result.data) == 1, str(fetch_result) + # Otherwise, the query may get cancelled early. Fetch self.rows_per_fetch row + # every FETCH_INTERVAL to keep the query active. + fetch_result = client.fetch(query, query_handle, self.rows_per_fetch) + assert len(fetch_result.data) == self.rows_per_fetch, str(fetch_result) self.num_rows_fetched += len(fetch_result.data) if self.query_state == 'REQUEST_QUERY_END': self._end_query(client, query, query_handle) @@ -2537,20 +2596,19 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): # Sleep and wait for the query to be cancelled. The cancellation will # set the state to EXCEPTION. start_time = time() - while self.thread_should_run() and ( - client.get_state(query_handle) != client.QUERY_STATES['EXCEPTION']): + while self.thread_should_run() and not client.is_error(query_handle): assert (time() - start_time < STRESS_TIMEOUT),\ "Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,) sleep(1) - # try fetch and confirm from exception message that query was timed out. try: - client.fetch(query, query_handle) + # try fetch and confirm from exception message that query was timed out. + client.fetch(query, query_handle, discard_results=True) assert False - except Exception as e: + except (Exception, ImpalaHiveServer2Service) as e: assert 'expired due to client inactivity' in str(e) elif self.query_end_behavior == 'EOS': # Fetch all rows so we hit eos. 
- client.fetch(query, query_handle) + client.fetch(query, query_handle, discard_results=True) elif self.query_end_behavior == 'CLIENT_CANCEL': client.cancel(query_handle) else: @@ -2604,8 +2662,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): LOG.info("Found %s queued queries after %s seconds", actual_queued, round(time() - start_time, 1)) - def run_admission_test(self, vector, additional_query_options, - check_user_aggregates=False): + def run_admission_test(self, vector, check_user_aggregates=False): LOG.info("Starting test case with parameters: %s", vector) self.impalads = self.cluster.impalads self.ac_processes = self.get_ac_processes() @@ -2628,8 +2685,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): sleep(submission_delay_ms / 1000.0) impalad = self.impalads[query_num % len(self.impalads)] query_end_behavior = QUERY_END_BEHAVIORS[query_num % len(QUERY_END_BEHAVIORS)] - thread = self.SubmitQueryThread(impalad, additional_query_options, vector, - query_num, query_end_behavior, self.executing_threads, self.exit) + thread = self.SubmitQueryThread(impalad, vector, query_num, query_end_behavior, + self.executing_threads, self.exit) thread.start() self.all_threads.append(thread) @@ -2747,11 +2804,12 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): if self.exploration_strategy() != 'exhaustive': pytest.skip('runs only in exhaustive') self.pool_name = 'default-pool' + vector.set_exec_option('request_pool', self.pool_name) + vector.set_exec_option('mem_limit', sys.maxsize) # The pool has no mem resources set, so submitting queries with huge mem_limits # should be fine. This exercises the code that does the per-pool memory # accounting (see MemTracker::GetPoolMemReserved()) without actually being throttled. 
- self.run_admission_test(vector, {'request_pool': self.pool_name, - 'mem_limit': sys.maxsize}) + self.run_admission_test(vector) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( @@ -2761,7 +2819,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): statestored_args=_STATESTORED_ARGS) def test_admission_controller_with_configs(self, vector): self.pool_name = 'root.queueB' - self.run_admission_test(vector, {'request_pool': self.pool_name}) + vector.set_exec_option('request_pool', self.pool_name) + self.run_admission_test(vector) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( @@ -2778,8 +2837,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): # Save time by running only 1 out of 6 vector combination. pytest.skip('Only run with round_robin_submission=True and submission_delay_ms=0.') self.pool_name = 'root.queueF' - self.run_admission_test(vector, {'request_pool': self.pool_name}, - check_user_aggregates=True) + vector.set_exec_option('request_pool', self.pool_name) + self.run_admission_test(vector, check_user_aggregates=True) def get_proc_limit(self): """Gets the process mem limit as reported by the impalad's mem-tracker metric. @@ -2816,8 +2875,9 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): # the mem limit. 
num_impalads = len(self.cluster.impalads) query_mem_limit = (proc_limit // MAX_NUM_CONCURRENT_QUERIES // num_impalads) - 1 - self.run_admission_test(vector, - {'request_pool': self.pool_name, 'mem_limit': query_mem_limit}) + vector.set_exec_option('request_pool', self.pool_name) + vector.set_exec_option('mem_limit', query_mem_limit) + self.run_admission_test(vector) class TestAdmissionControllerStressWithACService(TestAdmissionControllerStress): @@ -2833,11 +2893,5 @@ class TestAdmissionControllerStressWithACService(TestAdmissionControllerStress): def setup_method(self, method): if self.exploration_strategy() != 'exhaustive': pytest.skip('runs only in exhaustive') - - start_args = "--enable_admission_service" - if START_ARGS in method.__dict__: - start_args = method.__dict__[START_ARGS] + " " + start_args - method.__dict__[START_ARGS] = start_args - if IMPALAD_ARGS in method.__dict__: - method.__dict__[ADMISSIOND_ARGS] = method.__dict__[IMPALAD_ARGS] + self.enable_admission_service(method) super(TestAdmissionControllerStressWithACService, self).setup_method(method) diff --git a/tests/custom_cluster/test_session_expiration.py b/tests/custom_cluster/test_session_expiration.py index a56c877ba..e76f3457d 100644 --- a/tests/custom_cluster/test_session_expiration.py +++ b/tests/custom_cluster/test_session_expiration.py @@ -179,9 +179,7 @@ class TestSessionExpiration(CustomClusterTestSuite): self.close_impala_clients() # Create 2 sessions. client1 = impalad.service.create_hs2_client() - client1.execute_async("select sleep(5000)") client2 = impalad.service.create_hs2_client() - client2.execute_async("select sleep(5000)") try: # Trying to open a third session should fail. 
impalad.service.create_hs2_client() diff --git a/tests/util/thrift_util.py b/tests/util/thrift_util.py index 8baff3334..427769849 100644 --- a/tests/util/thrift_util.py +++ b/tests/util/thrift_util.py @@ -25,6 +25,7 @@ from thrift.transport.TSocket import TSocket from thrift.transport.TTransport import TBufferedTransport from thrift_sasl import TSaslClientTransport + def create_transport(host, port, service, transport_type="buffered", user=None, password=None, use_ssl=False, ssl_cert=None): """ @@ -78,4 +79,11 @@ def op_handle_to_query_id(t_op_handle): if t_op_handle is None or t_op_handle.operationId is None: return None # This should use the same logic as in ImpalaServer::THandleIdentifierToTUniqueId(). - return "%x:%x" % struct.unpack("QQ", t_op_handle.operationId.guid) + return "%016x:%016x" % struct.unpack("QQ", t_op_handle.operationId.guid) + + +def session_handle_to_session_id(t_session_op_handle): + if t_session_op_handle is None or t_session_op_handle.sessionId is None: + return None + # This should use the same logic as in ImpalaServer::THandleIdentifierToTUniqueId(). + return "%016x:%016x" % struct.unpack("QQ", t_session_op_handle.sessionId.guid)
