This is an automated email from the ASF dual-hosted git repository. arawat pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 8c2017aa001b9c868017e9a29d89f9d257aa10bb Author: Riza Suminto <[email protected]> AuthorDate: Tue Jan 14 16:46:56 2025 -0800 IMPALA-12937: (part 2) Deflake TestAdmissionControllerStress TestAdmissionControllerStress::test_mem_limit is flaky again. One fragment instance that was expected to stay alive until the query submission loop ends actually finished early, even though clients only fetch 1 row every 0.5 seconds. This patch attempts to address the flakiness in two ways. First, it lowers batch_size to 10. A lower batch size is expected to keep all running fragment instances running until the query submission loop finishes. Second, it lowers num_queries from 50 to 40 if exploration_strategy is exhaustive. This shortens the query submission loop, especially when submission_delay_ms is high (150 milliseconds). This is OK because, based on the assertions, the test framework will only retain at most 15 active queries and 10 in-queue queries once the query submission loop ends. This patch also refactors SubmitQueryThread. It sets long_polling_time_ms=100 for all queries to get a faster initial response. The lock is removed and replaced with a threading.Event to signal the end of the test. The scope of the thread's client and query_handle is made local to the run() method for proper cleanup. A timeout is set for wait_for_admission_control instead of waiting indefinitely. impala_connection.py is refactored so that BeeswaxConnection has logging functionality matching ImpylaHS2Connection. The initialization of ImpylaHS2Connection._collect_profile_and_log is changed to allow for the possibility that the experimental Calcite planner may be able to pull the query profile and log from the Impala backend. Testing: - Ran and passed test_mem_limit in both TestAdmissionControllerStress and TestAdmissionControllerStressWithACService in exhaustive exploration 10 times. - Ran and passed the whole TestAdmissionControllerStress and TestAdmissionControllerStressWithACService in exhaustive exploration.
Change-Id: I706e3dedce69e38103a524c64306f39eac82fac3 Reviewed-on: http://gerrit.cloudera.org:8080/22351 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- tests/beeswax/impala_beeswax.py | 37 ++-- tests/common/impala_connection.py | 76 +++++--- tests/custom_cluster/test_admission_controller.py | 217 +++++++++++++--------- 3 files changed, 197 insertions(+), 133 deletions(-) diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py index d90dd7601..c33dcb2b6 100644 --- a/tests/beeswax/impala_beeswax.py +++ b/tests/beeswax/impala_beeswax.py @@ -44,6 +44,10 @@ from thrift.protocol import TBinaryProtocol from thrift.Thrift import TApplicationException LOG = logging.getLogger('impala_beeswax') +# time to sleep in seconds before polling again. This uses a fixed +# 50 millisecond sleep that doesn't vary by elapsed time. This is only used for +# testing, and there is no reason to sleep longer for test environments. +DEFAULT_SLEEP_INTERVAL = 0.05 # Custom exception wrapper. @@ -291,16 +295,9 @@ class ImpalaBeeswaxClient(object): def close_query(self, handle): self.__do_rpc(lambda: self.imp_service.close(handle)) - def _get_sleep_interval(self, start_time): - """Returns the time to sleep in seconds before polling again. This uses a fixed - 50 millisecond sleep that doesn't vary by elapsed time. 
This is only used for - testing, and there is no reason to sleep longer for test environments.""" - return 0.05 - def wait_for_finished(self, query_handle): """Given a query handle, polls the coordinator waiting for the query to transition to 'FINISHED' state""" - loop_start = time.time() while True: start_rpc_time = time.time() query_state = self.get_state(query_handle) @@ -315,9 +312,8 @@ class ImpalaBeeswaxClient(object): raise ImpalaBeeswaxException(error_log, None) finally: self.close_query(query_handle) - sleep_time = self._get_sleep_interval(loop_start) - if rpc_time < sleep_time: - time.sleep(sleep_time - rpc_time) + if rpc_time < DEFAULT_SLEEP_INTERVAL: + time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time) def wait_for_finished_timeout(self, query_handle, timeout=10): """Given a query handle and a timeout, polls the coordinator waiting for the query to @@ -337,19 +333,24 @@ class ImpalaBeeswaxClient(object): raise ImpalaBeeswaxException(error_log, None) finally: self.close_query(query_handle) - sleep_time = self._get_sleep_interval(start_time) - if rpc_time < sleep_time: - time.sleep(sleep_time - rpc_time) + if rpc_time < DEFAULT_SLEEP_INTERVAL: + time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time) return False - def wait_for_admission_control(self, query_handle): + def wait_for_admission_control(self, query_handle, timeout_s=60): """Given a query handle, polls the coordinator waiting for it to complete - admission control processing of the query""" - while True: + admission control processing of the query. 
+ Return True if query pass admission control after given timeout_s.""" + start_time = time.time() + while (time.time() - start_time < timeout_s): + start_rpc_time = time.time() query_state = self.get_state(query_handle) + rpc_time = time.time() - start_rpc_time if query_state > self.query_states["COMPILED"]: - break - time.sleep(0.05) + return True + if rpc_time < DEFAULT_SLEEP_INTERVAL: + time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time) + return False def get_admission_result(self, query_handle): """Given a query handle, returns the admission result from the query profile""" diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py index 0bdf2a8ed..bc81ebb76 100644 --- a/tests/common/impala_connection.py +++ b/tests/common/impala_connection.py @@ -166,6 +166,18 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)): rows returned is less than max_rows, all the rows have been fetched.""" pass + @abc.abstractmethod + def handle_id(self, operation_handle): + """Return a string id for given operation_handle. + Most implementations will return an Impala query id for given handle. + Otherwise, return str(operation_handle).""" + pass + + @abc.abstractmethod + def log_handle(self, operation_handle, message): + """Log 'message' at INFO level, along with id of 'operation_handle'.""" + pass + # Represents a connection to Impala using the Beeswax API. 
class BeeswaxConnection(ImpalaConnection): @@ -204,11 +216,11 @@ class BeeswaxConnection(ImpalaConnection): self.__beeswax_client.close_connection() def close_query(self, operation_handle): - LOG.info("-- closing query for operation handle: %s" % operation_handle) + self.log_handle(operation_handle, 'closing query for operation') self.__beeswax_client.close_query(operation_handle.get_handle()) def close_dml(self, operation_handle): - LOG.info("-- closing DML query for operation handle: %s" % operation_handle) + self.log_handle(operation_handle, 'closing DML query') self.__beeswax_client.close_dml(operation_handle.get_handle()) def execute(self, sql_stmt, user=None, fetch_profile_after_close=False): @@ -224,48 +236,57 @@ class BeeswaxConnection(ImpalaConnection): return OperationHandle(beeswax_handle, sql_stmt) def cancel(self, operation_handle): - LOG.info("-- canceling operation: %s" % operation_handle) + self.log_handle(operation_handle, 'canceling operation') return self.__beeswax_client.cancel_query(operation_handle.get_handle()) def get_state(self, operation_handle): - LOG.info("-- getting state for operation: %s" % operation_handle) + self.log_handle(operation_handle, 'getting state') return self.__beeswax_client.get_state(operation_handle.get_handle()) def state_is_finished(self, operation_handle): - LOG.info("-- checking finished state for operation: {0}".format(operation_handle)) + self.log_handle(operation_handle, 'checking finished state for operation') return self.get_state(operation_handle) == self.QUERY_STATES["FINISHED"] def get_exec_summary(self, operation_handle): - LOG.info("-- getting exec summary operation: %s" % operation_handle) + self.log_handle(operation_handle, 'getting exec summary operation') return self.__beeswax_client.get_exec_summary(operation_handle.get_handle()) def get_runtime_profile(self, operation_handle): - LOG.info("-- getting runtime profile operation: %s" % operation_handle) + self.log_handle(operation_handle, 'getting 
runtime profile operation') return self.__beeswax_client.get_runtime_profile(operation_handle.get_handle()) def wait_for_finished_timeout(self, operation_handle, timeout): - LOG.info("-- waiting for query to reach FINISHED state: %s" % operation_handle) + self.log_handle(operation_handle, 'waiting for query to reach FINISHED state') return self.__beeswax_client.wait_for_finished_timeout( operation_handle.get_handle(), timeout) - def wait_for_admission_control(self, operation_handle): - LOG.info("-- waiting for completion of the admission control processing of the " - "query: %s" % operation_handle) - return self.__beeswax_client.wait_for_admission_control(operation_handle.get_handle()) + def wait_for_admission_control(self, operation_handle, timeout_s=60): + self.log_handle(operation_handle, 'waiting for completion of the admission control') + return self.__beeswax_client.wait_for_admission_control( + operation_handle.get_handle(), timeout_s=timeout_s) def get_admission_result(self, operation_handle): - LOG.info("-- getting the admission result: %s" % operation_handle) + self.log_handle(operation_handle, 'getting the admission result') return self.__beeswax_client.get_admission_result(operation_handle.get_handle()) def get_log(self, operation_handle): - LOG.info("-- getting log for operation: %s" % operation_handle) + self.log_handle(operation_handle, 'getting log for operation') return self.__beeswax_client.get_log(operation_handle.get_handle().log_context) def fetch(self, sql_stmt, operation_handle, max_rows=-1): - LOG.info("-- fetching results from: %s" % operation_handle) + self.log_handle(operation_handle, 'fetching {} rows'.format( + 'all' if max_rows < 0 else max_rows)) return self.__beeswax_client.fetch_results( sql_stmt, operation_handle.get_handle(), max_rows) + def handle_id(self, operation_handle): + query_id = operation_handle.get_handle().id + return query_id if query_id else str(operation_handle) + + def log_handle(self, operation_handle, message): + 
handle_id = self.handle_id(operation_handle) + LOG.info("-- {0}: {1}".format(handle_id, message)) + class ImpylaHS2Connection(ImpalaConnection): """Connection to Impala using the impyla client connecting to HS2 endpoint. @@ -291,7 +312,9 @@ class ImpylaHS2Connection(ImpalaConnection): # Query options to send along with each query. self.__query_options = {} self._is_hive = is_hive - self._collect_profile_and_log = not is_hive and collect_profile_and_log + # Some Hive HS2 protocol, such as custom Calcite planner, may be able to collect + # profile and log from Impala. + self._collect_profile_and_log = collect_profile_and_log def set_configuration_option(self, name, value): self.__query_options[name] = str(value) @@ -329,7 +352,7 @@ class ImpylaHS2Connection(ImpalaConnection): try: # Explicitly close the cursor so that it will close the session. self.__cursor.close() - except Exception as e: + except Exception: # The session may no longer be valid if the impalad was restarted during the test. 
pass try: @@ -354,6 +377,9 @@ class ImpylaHS2Connection(ImpalaConnection): def execute(self, sql_stmt, user=None, profile_format=TRuntimeProfileFormat.STRING, fetch_profile_after_close=False): + LOG.info("-- executing against {0} at {1}\n".format( + self._is_hive and 'Hive' or 'Impala', self.__host_port)) + log_sql_stmt(sql_stmt) self.__cursor.execute(sql_stmt, configuration=self.__query_options) handle = OperationHandle(self.__cursor, sql_stmt) @@ -412,17 +438,16 @@ class ImpylaHS2Connection(ImpalaConnection): lo_str = codecs.encode(guid_bytes[16:7:-1], 'hex_codec').decode() return "{0}:{1}".format(hi_str, lo_str) - def handle_id_for_logging(self, operation_handle): + def handle_id(self, operation_handle): query_id = self.get_query_id(operation_handle) return query_id if query_id else str(operation_handle) - def log_handle(self, handle, message): - handle_id = self.handle_id_for_logging(handle) + def log_handle(self, operation_handle, message): + handle_id = self.handle_id(operation_handle) LOG.info("-- {0}: {1}".format(handle_id, message)) def get_state(self, operation_handle): - handle_id = self.handle_id_for_logging(operation_handle) - LOG.info("-- getting state for operation: {0}".format(handle_id)) + self.log_handle(operation_handle, 'getting state') cursor = operation_handle.get_handle() return cursor.status() @@ -464,9 +489,10 @@ class ImpylaHS2Connection(ImpalaConnection): if not PROGRESS_LOG_RE.match(line)] return '\n'.join(lines) - def fetch(self, sql_stmt, handle, max_rows=-1): - self.log_handle(handle, 'fetching results') - return self.__fetch_results(handle, max_rows) + def fetch(self, sql_stmt, operation_handle, max_rows=-1): + self.log_handle(operation_handle, 'fetching {} rows'.format( + 'all' if max_rows < 0 else max_rows)) + return self.__fetch_results(operation_handle, max_rows) def __fetch_results(self, handle, max_rows=-1, profile_format=TRuntimeProfileFormat.STRING): diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py index c746cb221..71a204b6c 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -25,6 +25,7 @@ import os import pytest import re import shutil +import signal import sys import threading from copy import copy @@ -43,6 +44,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.resource_pool_config import ResourcePoolConfig from tests.common.skip import SkipIfFS, SkipIfEC, SkipIfNotHdfsMinicluster from tests.common.test_dimensions import ( + create_exec_option_dimension, create_single_exec_option_dimension, create_uncompressed_text_dimension) from tests.common.test_vector import ImpalaTestDimension @@ -62,6 +64,7 @@ LOG = logging.getLogger('admission_test') # the query active and consuming resources by fetching one row at a time. The # where clause is for debugging purposes; each thread will insert its id so # that running queries can be correlated with the thread that submitted them. +# This query returns 329970 rows. QUERY = " union all ".join(["select * from functional.alltypesagg where id != {0}"] * 30) SLOW_QUERY = "select count(*) from functional.alltypes where int_col = sleep(20000)" @@ -124,6 +127,10 @@ QUERY_END_TIMEOUT_S = 3 FETCH_INTERVAL = 0.5 assert FETCH_INTERVAL < QUERY_END_TIMEOUT_S +# How long to wait for admission control status. This assumes a worst case of 40 queries +# admitted serially, with a 3s inactivity timeout. +ADMIT_TIMEOUT_S = 120 + # Value used for --admission_control_stale_topic_threshold_ms in tests. 
STALE_TOPIC_THRESHOLD_MS = 500 @@ -730,7 +737,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=2, impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1, pool_max_mem=2 * 1024 * 1024 * 1024, proc_mem_limit=3 * 1024 * 1024 * 1024)) - def test_mem_limit_coordinators(self, vector, unique_database): + def test_mem_limit_coordinators(self, vector): """Verify that the query option mem_limit_coordinators is only enforced on the coordinators.""" ImpalaTestSuite.change_database(self.client, vector.get_value('table_format')) @@ -1946,7 +1953,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): """Submits a number of queries (parameterized) with some delay between submissions (parameterized) and the ability to submit to one impalad or many in a round-robin fashion. Each query is submitted on a separate thread. After admission, the query - thread will block with the query open and wait for the main thread to notify it to + thread will fetch rows slowly and wait for the main thread to notify it to end its query. The query thread can end its query by fetching to the end, cancelling itself, closing itself, or waiting for the query timeout to take effect. Depending on the test parameters a varying number of queries will be admitted, queued, and @@ -1976,29 +1983,42 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): @classmethod def add_test_dimensions(cls): super(TestAdmissionControllerStress, cls).add_test_dimensions() + # This test really need to run using beeswax client since hs2 client has not + # implemented some methods needed to query admission control status. + cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol', 'beeswax')) + # Slow down test query by setting low batch_size. 
+ cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension( + cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[10])) cls.ImpalaTestMatrix.add_dimension( ImpalaTestDimension('round_robin_submission', *ROUND_ROBIN_SUBMISSION)) cls.ImpalaTestMatrix.add_dimension( ImpalaTestDimension('submission_delay_ms', *SUBMISSION_DELAY_MS)) - # Additional constraints for code coverage jobs and core. - num_queries = 50 + # The number of queries to submit. The test does not support fewer queries than + # MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES to keep some validation logic + # simple. + num_queries = 40 if ImpalaTestClusterProperties.get_instance().has_code_coverage(): # Code coverage builds can't handle the increased concurrency. num_queries = 15 elif cls.exploration_strategy() == 'core': num_queries = 30 + assert num_queries >= MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES + cls.ImpalaTestMatrix.add_dimension( + ImpalaTestDimension('num_queries', num_queries)) + + @classmethod + def add_custom_cluster_constraints(cls): + # Override default constraint from CustomClusterTestSuite + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').file_format == 'text' + and v.get_value('table_format').compression_codec == 'none') + if cls.exploration_strategy() == 'core': cls.ImpalaTestMatrix.add_constraint( lambda v: v.get_value('submission_delay_ms') == 0) cls.ImpalaTestMatrix.add_constraint( lambda v: v.get_value('round_robin_submission')) - # The number of queries to submit. The test does not support fewer queries than - # MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES to keep some validation logic - # simple. - cls.ImpalaTestMatrix.add_dimension( - ImpalaTestDimension('num_queries', num_queries)) - def setup(self): # All threads are stored in this list and it's used just to make sure we clean up # properly in teardown. @@ -2009,30 +2029,32 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): # the list). 
The individual operations on the list are atomic and thread-safe thanks # to the GIL. self.executing_threads = list() + # Exit event to break any sleep/wait. + self.exit = threading.Event() + + def quit(signum, _frame): + signame = signal.Signals(signum).name + LOG.fatal('Signal handler called with signal {} ({}): {}'.format( + signum, signame, _frame)) + self.exit.set() + + signal.signal(signal.SIGTERM, quit) + signal.signal(signal.SIGINT, quit) + signal.signal(signal.SIGHUP, quit) def teardown(self): # Set shutdown for all threads (cancel if needed) - for thread in self.all_threads: - try: - thread.lock.acquire() - thread.shutdown = True - if thread.query_handle is not None: - LOG.debug("Attempt to clean up thread executing query %s (state %s)", - thread.query_num, thread.query_state) - client = thread.impalad.service.create_beeswax_client() - try: - client.cancel(thread.query_handle) - finally: - client.close() - finally: - thread.lock.release() + self.exit.set() # Wait for all threads to exit for thread in self.all_threads: thread.join(5) - LOG.debug("Join thread for query num %s %s", thread.query_num, + LOG.info("Join thread for query num %s %s", thread.query_num, "TIMED OUT" if thread.isAlive() else "") + def should_run(self): + return not self.exit.is_set() + def get_ac_processes(self): """Returns a list of all Processes which may be used to perform admission control. 
If round-robin submission is not being used, only the first Process in this list will @@ -2089,7 +2111,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): log_metrics("wait_for_metric_changes, initial=", initial) current = initial start_time = time() - while True: + while self.should_run(): current = self.get_admission_metrics() log_metrics("wait_for_metric_changes, current=", current) deltas = compute_metric_deltas(current, initial) @@ -2117,7 +2139,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): REQUEST_QUEUE_UPDATE_INTERVAL)['count'] curr[impalad] = init[impalad] - while True: + while self.should_run(): LOG.debug("wait_for_statestore_updates: curr=%s, init=%s, d=%s", list(curr.values()), list(init.values()), [curr[i] - init[i] for i in self.impalads]) @@ -2142,7 +2164,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): # All individual list operations are thread-safe, so we don't need to use a # lock to synchronize before checking the list length (on which another thread # may call append() concurrently). - while len(self.executing_threads) < num_threads: + while self.should_run() and len(self.executing_threads) < num_threads: assert (time() - start_time < STRESS_TIMEOUT), ("Timed out waiting %s seconds for " "%s admitted client rpcs to return. Only %s executing " % ( STRESS_TIMEOUT, num_threads, len(self.executing_threads))) @@ -2162,14 +2184,14 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): for i in range(num_queries): # pop() is thread-safe, it's OK if another thread is appending concurrently. 
thread = self.executing_threads.pop(0) - LOG.info("Cancelling query {}".format(thread.query_num_and_id())) + LOG.info("Ending query {}".format(thread.query_num)) assert thread.query_state == 'ADMITTED' current_executing_queries.append(thread) thread.query_state = 'REQUEST_QUERY_END' # Wait for the queries to end start_time = time() - while True: + while self.should_run(): all_done = True for thread in self.all_threads: if thread.query_state == 'REQUEST_QUERY_END': @@ -2182,7 +2204,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): class SubmitQueryThread(threading.Thread): def __init__(self, impalad, additional_query_options, vector, query_num, - query_end_behavior, executing_threads): + query_end_behavior, executing_threads, exit_signal): """ executing_threads must be provided so that this thread can add itself when the query is admitted and begins execution. @@ -2195,69 +2217,74 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): self.query_end_behavior = query_end_behavior self.impalad = impalad self.error = None + self.num_rows_fetched = 0 # query_state is defined and used only by the test code, not a property exposed by # the server self.query_state = 'NOT_SUBMITTED' - - # lock protects query_handle and shutdown, used by the main thread in teardown() - self.lock = threading.RLock() - self.query_handle = None - self.shutdown = False # Set by the main thread when tearing down + # Set by the main thread when tearing down + self.exit_signal = exit_signal self.setDaemon(True) + def thread_should_run(self): + return not self.exit_signal.is_set() + def run(self): + # Scope of client and query_handle must be local within this run() method. client = None + query_handle = None try: try: - # Take the lock while query_handle is being created to avoid an unlikely race - # condition with teardown() (i.e. if an error occurs on the main thread), and - # check if the test is already shut down. 
- self.lock.acquire() - if self.shutdown: - self.print_termination_log() + if not self.thread_should_run(): return - exec_options = self.vector.get_value('exec_option') + exec_options = dict() + exec_options.update(self.vector.get_exec_option_dict()) exec_options.update(self.additional_query_options) # Turning off result spooling allows us to better control query execution by # controlling the number of rows fetched. This allows us to maintain resource # usage among backends. exec_options['spool_query_results'] = 0 + exec_options['long_polling_time_ms'] = 100 + if self.query_end_behavior == 'QUERY_TIMEOUT': + exec_options['query_timeout_s'] = QUERY_END_TIMEOUT_S query = QUERY.format(self.query_num) self.query_state = 'SUBMITTING' client = self.impalad.service.create_beeswax_client() ImpalaTestSuite.change_database(client, self.vector.get_value('table_format')) client.set_configuration(exec_options) - if self.query_end_behavior == 'QUERY_TIMEOUT': - client.execute("SET QUERY_TIMEOUT_S={0}".format(QUERY_END_TIMEOUT_S)) - LOG.info("Submitting query %s with ending behavior %s", self.query_num, self.query_end_behavior) - self.query_handle = client.execute_async(query) - client.wait_for_admission_control(self.query_handle) - admission_result = client.get_admission_result(self.query_handle) + query_handle = client.execute_async(query) + admitted = client.wait_for_admission_control( + query_handle, timeout_s=ADMIT_TIMEOUT_S) + if not admitted: + msg = "Query {} failed to pass admission control within {} seconds".format( + self.query_num, ADMIT_TIMEOUT_S) + self.log_handle(client, query_handle, msg) + self.query_state = 'ADMIT_TIMEOUT' + return + admission_result = client.get_admission_result(query_handle) assert len(admission_result) > 0 if "Rejected" in admission_result: - LOG.info("Rejected query %s", self.query_num_and_id()) + msg = "Rejected query {}".format(self.query_num) + self.log_handle(client, query_handle, msg) self.query_state = 'REJECTED' - 
self.print_termination_log() - self.query_handle = None return elif "Timed out" in admission_result: - LOG.info("Query %s timed out", self.query_num_and_id()) + msg = "Query {} timed out".format(self.query_num) + self.log_handle(client, query_handle, msg) self.query_state = 'TIMED OUT' - self.print_termination_log() - self.query_handle = None return - LOG.info("Admission result for query %s : %s", - self.query_num_and_id(), admission_result) + msg = "Admission result for query {} : {}".format( + self.query_num, admission_result) + self.log_handle(client, query_handle, msg) except ImpalaBeeswaxException as e: LOG.exception(e) raise e - finally: - self.lock.release() - LOG.info("Admitted query %s", self.query_num_and_id()) + + msg = "Admitted query {}".format(self.query_num) + self.log_handle(client, query_handle, msg) self.query_state = 'ADMITTED' # The thread becomes visible to the main thread when it is added to the # shared list of executing_threads. append() is atomic and thread-safe. @@ -2265,69 +2292,70 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): # Synchronize with the main thread. At this point, the thread is executing a # query. It needs to wait until the main thread requests it to end its query. - while not self.shutdown and self.query_state != 'COMPLETED': + while self.thread_should_run() and self.query_state != 'COMPLETED': # The query needs to stay active until the main thread requests it to end. # Otherwise, the query may get cancelled early. Fetch 1 row every # FETCH_INTERVAL to keep the query active. 
- fetch_result = client.fetch(query, self.query_handle, 1) + fetch_result = client.fetch(query, query_handle, 1) assert len(fetch_result.data) == 1, str(fetch_result) + self.num_rows_fetched += len(fetch_result.data) if self.query_state == 'REQUEST_QUERY_END': - self._end_query(client, query) + self._end_query(client, query, query_handle) # The query has released admission control resources self.query_state = 'COMPLETED' - self.print_termination_log() - self.query_handle = None sleep(FETCH_INTERVAL) except Exception as e: LOG.exception(e) # Unknown errors will be raised later self.error = e self.query_state = 'ERROR' - self.print_termination_log() finally: + self.print_termination_log() if client is not None: - try: - self.lock.acquire() - client.close() - # Closing the client closes the query as well - self.query_handle = None - finally: - self.lock.release() + # Closing the client closes the query as well + client.close() def print_termination_log(self): - LOG.info("Thread for query {} terminating in state {}".format( - self.query_num, self.query_state)) - - def query_num_and_id(self): - return "{} (id={})".format(self.query_num, self.query_handle.get_handle().id) + msg = ("Thread for query {} terminating in state {}. " + "rows_fetched={} end_behavior={}").format( + self.query_num, self.query_state, self.num_rows_fetched, + self.query_end_behavior) + LOG.info(msg) - def _end_query(self, client, query): + def _end_query(self, client, query, query_handle): """Bring the query to the appropriate end state defined by self.query_end_behaviour. Returns once the query has reached that state.""" - LOG.info("Ending query {} by {}".format( - self.query_num_and_id(), self.query_end_behavior)) + msg = "Ending query {} by {}".format(self.query_num, self.query_end_behavior) + self.log_handle(client, query_handle, msg) if self.query_end_behavior == 'QUERY_TIMEOUT': # Sleep and wait for the query to be cancelled. The cancellation will # set the state to EXCEPTION. 
start_time = time() - while (client.get_state(self.query_handle) != client.QUERY_STATES['EXCEPTION']): + while self.thread_should_run() and ( + client.get_state(query_handle) != client.QUERY_STATES['EXCEPTION']): assert (time() - start_time < STRESS_TIMEOUT),\ "Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,) sleep(1) # try fetch and confirm from exception message that query was timed out. try: - client.fetch(query, self.query_handle) + client.fetch(query, query_handle) assert False except Exception as e: assert 'expired due to client inactivity' in str(e) elif self.query_end_behavior == 'EOS': # Fetch all rows so we hit eos. - client.fetch(query, self.query_handle) + client.fetch(query, query_handle) elif self.query_end_behavior == 'CLIENT_CANCEL': - client.cancel(self.query_handle) + client.cancel(query_handle) else: assert self.query_end_behavior == 'CLIENT_CLOSE' - client.close_query(self.query_handle) + client.close_query(query_handle) + + def log_handle(self, client, query_handle, msg): + """Log ourself here rather than using client.log_handle() to display + log timestamp.""" + handle_id = client.handle_id(query_handle) + LOG.info("{}: {}".format(handle_id, msg)) def _check_queries_page_resource_pools(self): """Checks that all queries in the '/queries' webpage json have the correct resource @@ -2360,7 +2388,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): start_time = time() LOG.info("Waiting for %s <= queued queries <= %s" % (min_queued, max_queued)) actual_queued = self._get_queries_page_num_queued() - while actual_queued < min_queued or actual_queued > max_queued: + while self.should_run() and ( + actual_queued < min_queued or actual_queued > max_queued): assert (time() - start_time < STRESS_TIMEOUT), ("Timed out waiting %s seconds for " "%s <= queued queries <= %s, %s currently queued.", STRESS_TIMEOUT, min_queued, max_queued, actual_queued) @@ -2382,17 +2411,21 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase): num_queries = vector.get_value('num_queries') assert num_queries >= MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES - initial_metrics = self.get_admission_metrics() + initial_metrics = self.get_consistent_admission_metrics(0) log_metrics("Initial metrics: ", initial_metrics) + # This is the query submission loop. for query_num in range(num_queries): + if not self.should_run(): + break + if submission_delay_ms > 0: + sleep(submission_delay_ms / 1000.0) impalad = self.impalads[query_num % len(self.impalads)] query_end_behavior = QUERY_END_BEHAVIORS[query_num % len(QUERY_END_BEHAVIORS)] thread = self.SubmitQueryThread(impalad, additional_query_options, vector, - query_num, query_end_behavior, self.executing_threads) + query_num, query_end_behavior, self.executing_threads, self.exit) thread.start() self.all_threads.append(thread) - sleep(submission_delay_ms / 1000.0) # Wait for the admission control to make the initial admission decision for all # the queries. They should either be admitted immediately, queued, or rejected. @@ -2534,6 +2567,10 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): """Run a workload with a variety of outcomes in a pool that has user quotas configured. Note the user quotas will not prevent any queries from running, but this allows verification that metrics about users are consistent after queries end""" + if (not vector.get_value('round_robin_submission') + or not vector.get_value('submission_delay_ms') == 0): + # Save time by running only 1 out of 6 vector combination. + pytest.skip('Only run with round_robin_submission=True and submission_delay_ms=0.') self.pool_name = 'root.queueF' self.run_admission_test(vector, {'request_pool': self.pool_name}, check_user_aggregates=True)
