This is an automated email from the ASF dual-hosted git repository.

arawat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8c2017aa001b9c868017e9a29d89f9d257aa10bb
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Jan 14 16:46:56 2025 -0800

    IMPALA-12937: (part 2) Deflake TestAdmissionControllerStress
    
    TestAdmissionControllerStress::test_mem_limit is flaky again. One
    fragment instance that expected to stay alive until query submission
    loop ends actually finished early, even though clients are only fetching
    1 rows every 0.5 second. This patch attempts to address the flakiness in
    two ways.
    
    First, is lowering batch_size to 10. Lower batch size is expected to
    keep all running fragment instances runnning until the query admission
    loop finishes.
    
    Second, is lowering num_queries from 50 to 40 if exploration_strategy is
    exhaustive. This will shorten the query submission loop, expecially when
    submission_delay_ms is high (150 seconds). This is OK because, based on
    the assertions, the test framework will only retain at most 15 active
    queries and 10 in-queue queries once the query submission loop ends.
    
    This patch also refactors SubmitQueryThread. Set
    long_polling_time_ms=100 for all queries to get faster initial response.
    The lock is removed and replaced with threading.Event to signal the end
    of test. The thread client and query_handle scope is made local within
    run() method for proper cleanup. Set timeout for
    wait_for_admission_control instead of waiting indefinitely.
    
    impala_connection.py is refactored so that BeeswaxConnection has
    matching logging functionality as ImpylaHS2Connection. Changed
    ImpylaHS2Connection._collect_profile_and_log initialization for
    possibillity that experimental Calcite planner may have ability to pull
    query profile and log from Impala backend.
    
    Testing:
    - Run and pass test_mem_limit in both TestAdmissionControllerStress and
      TestAdmissionControllerStressWithACService in exhaustive exploration
      10 times.
    - Run and pass the whole TestAdmissionControllerStress and
      TestAdmissionControllerStressWithACService in exhaustive exploration.
    
    Change-Id: I706e3dedce69e38103a524c64306f39eac82fac3
    Reviewed-on: http://gerrit.cloudera.org:8080/22351
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/beeswax/impala_beeswax.py                   |  37 ++--
 tests/common/impala_connection.py                 |  76 +++++---
 tests/custom_cluster/test_admission_controller.py | 217 +++++++++++++---------
 3 files changed, 197 insertions(+), 133 deletions(-)

diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index d90dd7601..c33dcb2b6 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -44,6 +44,10 @@ from thrift.protocol import TBinaryProtocol
 from thrift.Thrift import TApplicationException
 
 LOG = logging.getLogger('impala_beeswax')
+# time to sleep in seconds before polling again. This uses a fixed
+# 50 millisecond sleep that doesn't vary by elapsed time. This is only used for
+# testing, and there is no reason to sleep longer for test environments.
+DEFAULT_SLEEP_INTERVAL = 0.05
 
 
 # Custom exception wrapper.
@@ -291,16 +295,9 @@ class ImpalaBeeswaxClient(object):
   def close_query(self, handle):
     self.__do_rpc(lambda: self.imp_service.close(handle))
 
-  def _get_sleep_interval(self, start_time):
-    """Returns the time to sleep in seconds before polling again. This uses a 
fixed
-       50 millisecond sleep that doesn't vary by elapsed time. This is only 
used for
-       testing, and there is no reason to sleep longer for test 
environments."""
-    return 0.05
-
   def wait_for_finished(self, query_handle):
     """Given a query handle, polls the coordinator waiting for the query to 
transition to
        'FINISHED' state"""
-    loop_start = time.time()
     while True:
       start_rpc_time = time.time()
       query_state = self.get_state(query_handle)
@@ -315,9 +312,8 @@ class ImpalaBeeswaxClient(object):
           raise ImpalaBeeswaxException(error_log, None)
         finally:
           self.close_query(query_handle)
-      sleep_time = self._get_sleep_interval(loop_start)
-      if rpc_time < sleep_time:
-        time.sleep(sleep_time - rpc_time)
+      if rpc_time < DEFAULT_SLEEP_INTERVAL:
+        time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time)
 
   def wait_for_finished_timeout(self, query_handle, timeout=10):
     """Given a query handle and a timeout, polls the coordinator waiting for 
the query to
@@ -337,19 +333,24 @@ class ImpalaBeeswaxClient(object):
           raise ImpalaBeeswaxException(error_log, None)
         finally:
           self.close_query(query_handle)
-      sleep_time = self._get_sleep_interval(start_time)
-      if rpc_time < sleep_time:
-        time.sleep(sleep_time - rpc_time)
+      if rpc_time < DEFAULT_SLEEP_INTERVAL:
+        time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time)
     return False
 
-  def wait_for_admission_control(self, query_handle):
+  def wait_for_admission_control(self, query_handle, timeout_s=60):
     """Given a query handle, polls the coordinator waiting for it to complete
-      admission control processing of the query"""
-    while True:
+      admission control processing of the query.
+      Return True if query pass admission control after given timeout_s."""
+    start_time = time.time()
+    while (time.time() - start_time < timeout_s):
+      start_rpc_time = time.time()
       query_state = self.get_state(query_handle)
+      rpc_time = time.time() - start_rpc_time
       if query_state > self.query_states["COMPILED"]:
-        break
-      time.sleep(0.05)
+        return True
+      if rpc_time < DEFAULT_SLEEP_INTERVAL:
+        time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time)
+    return False
 
   def get_admission_result(self, query_handle):
     """Given a query handle, returns the admission result from the query 
profile"""
diff --git a/tests/common/impala_connection.py 
b/tests/common/impala_connection.py
index 0bdf2a8ed..bc81ebb76 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -166,6 +166,18 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, 
object)):
     rows returned is less than max_rows, all the rows have been fetched."""
     pass
 
+  @abc.abstractmethod
+  def handle_id(self, operation_handle):
+    """Return a string id for given operation_handle.
+    Most implementations will return an Impala query id for given handle.
+    Otherwise, return str(operation_handle)."""
+    pass
+
+  @abc.abstractmethod
+  def log_handle(self, operation_handle, message):
+    """Log 'message' at INFO level, along with id of 'operation_handle'."""
+    pass
+
 
 # Represents a connection to Impala using the Beeswax API.
 class BeeswaxConnection(ImpalaConnection):
@@ -204,11 +216,11 @@ class BeeswaxConnection(ImpalaConnection):
     self.__beeswax_client.close_connection()
 
   def close_query(self, operation_handle):
-    LOG.info("-- closing query for operation handle: %s" % operation_handle)
+    self.log_handle(operation_handle, 'closing query for operation')
     self.__beeswax_client.close_query(operation_handle.get_handle())
 
   def close_dml(self, operation_handle):
-    LOG.info("-- closing DML query for operation handle: %s" % 
operation_handle)
+    self.log_handle(operation_handle, 'closing DML query')
     self.__beeswax_client.close_dml(operation_handle.get_handle())
 
   def execute(self, sql_stmt, user=None, fetch_profile_after_close=False):
@@ -224,48 +236,57 @@ class BeeswaxConnection(ImpalaConnection):
     return OperationHandle(beeswax_handle, sql_stmt)
 
   def cancel(self, operation_handle):
-    LOG.info("-- canceling operation: %s" % operation_handle)
+    self.log_handle(operation_handle, 'canceling operation')
     return self.__beeswax_client.cancel_query(operation_handle.get_handle())
 
   def get_state(self, operation_handle):
-    LOG.info("-- getting state for operation: %s" % operation_handle)
+    self.log_handle(operation_handle, 'getting state')
     return self.__beeswax_client.get_state(operation_handle.get_handle())
 
   def state_is_finished(self, operation_handle):
-    LOG.info("-- checking finished state for operation: 
{0}".format(operation_handle))
+    self.log_handle(operation_handle, 'checking finished state for operation')
     return self.get_state(operation_handle) == self.QUERY_STATES["FINISHED"]
 
   def get_exec_summary(self, operation_handle):
-    LOG.info("-- getting exec summary operation: %s" % operation_handle)
+    self.log_handle(operation_handle, 'getting exec summary operation')
     return 
self.__beeswax_client.get_exec_summary(operation_handle.get_handle())
 
   def get_runtime_profile(self, operation_handle):
-    LOG.info("-- getting runtime profile operation: %s" % operation_handle)
+    self.log_handle(operation_handle, 'getting runtime profile operation')
     return 
self.__beeswax_client.get_runtime_profile(operation_handle.get_handle())
 
   def wait_for_finished_timeout(self, operation_handle, timeout):
-    LOG.info("-- waiting for query to reach FINISHED state: %s" % 
operation_handle)
+    self.log_handle(operation_handle, 'waiting for query to reach FINISHED 
state')
     return self.__beeswax_client.wait_for_finished_timeout(
       operation_handle.get_handle(), timeout)
 
-  def wait_for_admission_control(self, operation_handle):
-    LOG.info("-- waiting for completion of the admission control processing of 
the "
-        "query: %s" % operation_handle)
-    return 
self.__beeswax_client.wait_for_admission_control(operation_handle.get_handle())
+  def wait_for_admission_control(self, operation_handle, timeout_s=60):
+    self.log_handle(operation_handle, 'waiting for completion of the admission 
control')
+    return self.__beeswax_client.wait_for_admission_control(
+      operation_handle.get_handle(), timeout_s=timeout_s)
 
   def get_admission_result(self, operation_handle):
-    LOG.info("-- getting the admission result: %s" % operation_handle)
+    self.log_handle(operation_handle, 'getting the admission result')
     return 
self.__beeswax_client.get_admission_result(operation_handle.get_handle())
 
   def get_log(self, operation_handle):
-    LOG.info("-- getting log for operation: %s" % operation_handle)
+    self.log_handle(operation_handle, 'getting log for operation')
     return 
self.__beeswax_client.get_log(operation_handle.get_handle().log_context)
 
   def fetch(self, sql_stmt, operation_handle, max_rows=-1):
-    LOG.info("-- fetching results from: %s" % operation_handle)
+    self.log_handle(operation_handle, 'fetching {} rows'.format(
+      'all' if max_rows < 0 else max_rows))
     return self.__beeswax_client.fetch_results(
         sql_stmt, operation_handle.get_handle(), max_rows)
 
+  def handle_id(self, operation_handle):
+    query_id = operation_handle.get_handle().id
+    return query_id if query_id else str(operation_handle)
+
+  def log_handle(self, operation_handle, message):
+    handle_id = self.handle_id(operation_handle)
+    LOG.info("-- {0}: {1}".format(handle_id, message))
+
 
 class ImpylaHS2Connection(ImpalaConnection):
   """Connection to Impala using the impyla client connecting to HS2 endpoint.
@@ -291,7 +312,9 @@ class ImpylaHS2Connection(ImpalaConnection):
     # Query options to send along with each query.
     self.__query_options = {}
     self._is_hive = is_hive
-    self._collect_profile_and_log = not is_hive and collect_profile_and_log
+    # Some Hive HS2 protocol, such as custom Calcite planner, may be able to 
collect
+    # profile and log from Impala.
+    self._collect_profile_and_log = collect_profile_and_log
 
   def set_configuration_option(self, name, value):
     self.__query_options[name] = str(value)
@@ -329,7 +352,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     try:
       # Explicitly close the cursor so that it will close the session.
       self.__cursor.close()
-    except Exception as e:
+    except Exception:
       # The session may no longer be valid if the impalad was restarted during 
the test.
       pass
     try:
@@ -354,6 +377,9 @@ class ImpylaHS2Connection(ImpalaConnection):
 
   def execute(self, sql_stmt, user=None, 
profile_format=TRuntimeProfileFormat.STRING,
       fetch_profile_after_close=False):
+    LOG.info("-- executing against {0} at {1}\n".format(
+      self._is_hive and 'Hive' or 'Impala', self.__host_port))
+    log_sql_stmt(sql_stmt)
     self.__cursor.execute(sql_stmt, configuration=self.__query_options)
     handle = OperationHandle(self.__cursor, sql_stmt)
 
@@ -412,17 +438,16 @@ class ImpylaHS2Connection(ImpalaConnection):
     lo_str = codecs.encode(guid_bytes[16:7:-1], 'hex_codec').decode()
     return "{0}:{1}".format(hi_str, lo_str)
 
-  def handle_id_for_logging(self, operation_handle):
+  def handle_id(self, operation_handle):
     query_id = self.get_query_id(operation_handle)
     return query_id if query_id else str(operation_handle)
 
-  def log_handle(self, handle, message):
-    handle_id = self.handle_id_for_logging(handle)
+  def log_handle(self, operation_handle, message):
+    handle_id = self.handle_id(operation_handle)
     LOG.info("-- {0}: {1}".format(handle_id, message))
 
   def get_state(self, operation_handle):
-    handle_id = self.handle_id_for_logging(operation_handle)
-    LOG.info("-- getting state for operation: {0}".format(handle_id))
+    self.log_handle(operation_handle, 'getting state')
     cursor = operation_handle.get_handle()
     return cursor.status()
 
@@ -464,9 +489,10 @@ class ImpylaHS2Connection(ImpalaConnection):
              if not PROGRESS_LOG_RE.match(line)]
     return '\n'.join(lines)
 
-  def fetch(self, sql_stmt, handle, max_rows=-1):
-    self.log_handle(handle, 'fetching results')
-    return self.__fetch_results(handle, max_rows)
+  def fetch(self, sql_stmt, operation_handle, max_rows=-1):
+    self.log_handle(operation_handle, 'fetching {} rows'.format(
+      'all' if max_rows < 0 else max_rows))
+    return self.__fetch_results(operation_handle, max_rows)
 
   def __fetch_results(self, handle, max_rows=-1,
                       profile_format=TRuntimeProfileFormat.STRING):
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index c746cb221..71a204b6c 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -25,6 +25,7 @@ import os
 import pytest
 import re
 import shutil
+import signal
 import sys
 import threading
 from copy import copy
@@ -43,6 +44,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.resource_pool_config import ResourcePoolConfig
 from tests.common.skip import SkipIfFS, SkipIfEC, SkipIfNotHdfsMinicluster
 from tests.common.test_dimensions import (
+    create_exec_option_dimension,
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
 from tests.common.test_vector import ImpalaTestDimension
@@ -62,6 +64,7 @@ LOG = logging.getLogger('admission_test')
 # the query active and consuming resources by fetching one row at a time. The
 # where clause is for debugging purposes; each thread will insert its id so
 # that running queries can be correlated with the thread that submitted them.
+# This query returns 329970 rows.
 QUERY = " union all ".join(["select * from functional.alltypesagg where id != 
{0}"] * 30)
 
 SLOW_QUERY = "select count(*) from functional.alltypes where int_col = 
sleep(20000)"
@@ -124,6 +127,10 @@ QUERY_END_TIMEOUT_S = 3
 FETCH_INTERVAL = 0.5
 assert FETCH_INTERVAL < QUERY_END_TIMEOUT_S
 
+# How long to wait for admission control status. This assumes a worst case of 
40 queries
+# admitted serially, with a 3s inactivity timeout.
+ADMIT_TIMEOUT_S = 120
+
 # Value used for --admission_control_stale_topic_threshold_ms in tests.
 STALE_TOPIC_THRESHOLD_MS = 500
 
@@ -730,7 +737,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
   @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, 
cluster_size=2,
       impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
           pool_max_mem=2 * 1024 * 1024 * 1024, proc_mem_limit=3 * 1024 * 1024 
* 1024))
-  def test_mem_limit_coordinators(self, vector, unique_database):
+  def test_mem_limit_coordinators(self, vector):
     """Verify that the query option mem_limit_coordinators is only enforced on 
the
     coordinators."""
     ImpalaTestSuite.change_database(self.client, 
vector.get_value('table_format'))
@@ -1946,7 +1953,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between 
submissions
   (parameterized) and the ability to submit to one impalad or many in a 
round-robin
   fashion. Each query is submitted on a separate thread. After admission, the 
query
-  thread will block with the query open and wait for the main thread to notify 
it to
+  thread will fetch rows slowly and wait for the main thread to notify it to
   end its query. The query thread can end its query by fetching to the end, 
cancelling
   itself, closing itself, or waiting for the query timeout to take effect. 
Depending
   on the test parameters a varying number of queries will be admitted, queued, 
and
@@ -1976,29 +1983,42 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
   @classmethod
   def add_test_dimensions(cls):
     super(TestAdmissionControllerStress, cls).add_test_dimensions()
+    # This test really need to run using beeswax client since hs2 client has 
not
+    # implemented some methods needed to query admission control status.
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol', 
'beeswax'))
+    # Slow down test query by setting low batch_size.
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
+      cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[10]))
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('round_robin_submission', *ROUND_ROBIN_SUBMISSION))
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('submission_delay_ms', *SUBMISSION_DELAY_MS))
 
-    # Additional constraints for code coverage jobs and core.
-    num_queries = 50
+    # The number of queries to submit. The test does not support fewer queries 
than
+    # MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES to keep some 
validation logic
+    # simple.
+    num_queries = 40
     if ImpalaTestClusterProperties.get_instance().has_code_coverage():
       # Code coverage builds can't handle the increased concurrency.
       num_queries = 15
     elif cls.exploration_strategy() == 'core':
       num_queries = 30
+    assert num_queries >= MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('num_queries', num_queries))
+
+  @classmethod
+  def add_custom_cluster_constraints(cls):
+    # Override default constraint from CustomClusterTestSuite
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'text'
+        and v.get_value('table_format').compression_codec == 'none')
+    if cls.exploration_strategy() == 'core':
       cls.ImpalaTestMatrix.add_constraint(
           lambda v: v.get_value('submission_delay_ms') == 0)
       cls.ImpalaTestMatrix.add_constraint(
           lambda v: v.get_value('round_robin_submission'))
 
-    # The number of queries to submit. The test does not support fewer queries 
than
-    # MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES to keep some 
validation logic
-    # simple.
-    cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension('num_queries', num_queries))
-
   def setup(self):
     # All threads are stored in this list and it's used just to make sure we 
clean up
     # properly in teardown.
@@ -2009,30 +2029,32 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
     # the list). The individual operations on the list are atomic and 
thread-safe thanks
     # to the GIL.
     self.executing_threads = list()
+    # Exit event to break any sleep/wait.
+    self.exit = threading.Event()
+
+    def quit(signum, _frame):
+      signame = signal.Signals(signum).name
+      LOG.fatal('Signal handler called with signal {} ({}): {}'.format(
+        signum, signame, _frame))
+      self.exit.set()
+
+    signal.signal(signal.SIGTERM, quit)
+    signal.signal(signal.SIGINT, quit)
+    signal.signal(signal.SIGHUP, quit)
 
   def teardown(self):
     # Set shutdown for all threads (cancel if needed)
-    for thread in self.all_threads:
-      try:
-        thread.lock.acquire()
-        thread.shutdown = True
-        if thread.query_handle is not None:
-          LOG.debug("Attempt to clean up thread executing query %s (state %s)",
-              thread.query_num, thread.query_state)
-          client = thread.impalad.service.create_beeswax_client()
-          try:
-            client.cancel(thread.query_handle)
-          finally:
-            client.close()
-      finally:
-        thread.lock.release()
+    self.exit.set()
 
     # Wait for all threads to exit
     for thread in self.all_threads:
       thread.join(5)
-      LOG.debug("Join thread for query num %s %s", thread.query_num,
+      LOG.info("Join thread for query num %s %s", thread.query_num,
           "TIMED OUT" if thread.isAlive() else "")
 
+  def should_run(self):
+    return not self.exit.is_set()
+
   def get_ac_processes(self):
     """Returns a list of all Processes which may be used to perform admission 
control. If
     round-robin submission is not being used, only the first Process in this 
list will
@@ -2089,7 +2111,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
     log_metrics("wait_for_metric_changes, initial=", initial)
     current = initial
     start_time = time()
-    while True:
+    while self.should_run():
       current = self.get_admission_metrics()
       log_metrics("wait_for_metric_changes, current=", current)
       deltas = compute_metric_deltas(current, initial)
@@ -2117,7 +2139,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
           REQUEST_QUEUE_UPDATE_INTERVAL)['count']
       curr[impalad] = init[impalad]
 
-    while True:
+    while self.should_run():
       LOG.debug("wait_for_statestore_updates: curr=%s, init=%s, d=%s",
           list(curr.values()), list(init.values()),
           [curr[i] - init[i] for i in self.impalads])
@@ -2142,7 +2164,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
     # All individual list operations are thread-safe, so we don't need to use a
     # lock to synchronize before checking the list length (on which another 
thread
     # may call append() concurrently).
-    while len(self.executing_threads) < num_threads:
+    while self.should_run() and len(self.executing_threads) < num_threads:
       assert (time() - start_time < STRESS_TIMEOUT), ("Timed out waiting %s 
seconds for "
           "%s admitted client rpcs to return. Only %s executing " % (
             STRESS_TIMEOUT, num_threads, len(self.executing_threads)))
@@ -2162,14 +2184,14 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
     for i in range(num_queries):
       # pop() is thread-safe, it's OK if another thread is appending 
concurrently.
       thread = self.executing_threads.pop(0)
-      LOG.info("Cancelling query {}".format(thread.query_num_and_id()))
+      LOG.info("Ending query {}".format(thread.query_num))
       assert thread.query_state == 'ADMITTED'
       current_executing_queries.append(thread)
       thread.query_state = 'REQUEST_QUERY_END'
 
     # Wait for the queries to end
     start_time = time()
-    while True:
+    while self.should_run():
       all_done = True
       for thread in self.all_threads:
         if thread.query_state == 'REQUEST_QUERY_END':
@@ -2182,7 +2204,7 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
 
   class SubmitQueryThread(threading.Thread):
     def __init__(self, impalad, additional_query_options, vector, query_num,
-        query_end_behavior, executing_threads):
+        query_end_behavior, executing_threads, exit_signal):
       """
       executing_threads must be provided so that this thread can add itself 
when the
       query is admitted and begins execution.
@@ -2195,69 +2217,74 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
       self.query_end_behavior = query_end_behavior
       self.impalad = impalad
       self.error = None
+      self.num_rows_fetched = 0
       # query_state is defined and used only by the test code, not a property 
exposed by
       # the server
       self.query_state = 'NOT_SUBMITTED'
-
-      # lock protects query_handle and shutdown, used by the main thread in 
teardown()
-      self.lock = threading.RLock()
-      self.query_handle = None
-      self.shutdown = False  # Set by the main thread when tearing down
+      # Set by the main thread when tearing down
+      self.exit_signal = exit_signal
       self.setDaemon(True)
 
+    def thread_should_run(self):
+      return not self.exit_signal.is_set()
+
     def run(self):
+      # Scope of client and query_handle must be local within this run() 
method.
       client = None
+      query_handle = None
       try:
         try:
-          # Take the lock while query_handle is being created to avoid an 
unlikely race
-          # condition with teardown() (i.e. if an error occurs on the main 
thread), and
-          # check if the test is already shut down.
-          self.lock.acquire()
-          if self.shutdown:
-            self.print_termination_log()
+          if not self.thread_should_run():
             return
 
-          exec_options = self.vector.get_value('exec_option')
+          exec_options = dict()
+          exec_options.update(self.vector.get_exec_option_dict())
           exec_options.update(self.additional_query_options)
           # Turning off result spooling allows us to better control query 
execution by
           # controlling the number of rows fetched. This allows us to maintain 
resource
           # usage among backends.
           exec_options['spool_query_results'] = 0
+          exec_options['long_polling_time_ms'] = 100
+          if self.query_end_behavior == 'QUERY_TIMEOUT':
+            exec_options['query_timeout_s'] = QUERY_END_TIMEOUT_S
           query = QUERY.format(self.query_num)
           self.query_state = 'SUBMITTING'
           client = self.impalad.service.create_beeswax_client()
           ImpalaTestSuite.change_database(client, 
self.vector.get_value('table_format'))
           client.set_configuration(exec_options)
 
-          if self.query_end_behavior == 'QUERY_TIMEOUT':
-            client.execute("SET 
QUERY_TIMEOUT_S={0}".format(QUERY_END_TIMEOUT_S))
-
           LOG.info("Submitting query %s with ending behavior %s",
                    self.query_num, self.query_end_behavior)
-          self.query_handle = client.execute_async(query)
-          client.wait_for_admission_control(self.query_handle)
-          admission_result = client.get_admission_result(self.query_handle)
+          query_handle = client.execute_async(query)
+          admitted = client.wait_for_admission_control(
+            query_handle, timeout_s=ADMIT_TIMEOUT_S)
+          if not admitted:
+            msg = "Query {} failed to pass admission control within {} 
seconds".format(
+                self.query_num, ADMIT_TIMEOUT_S)
+            self.log_handle(client, query_handle, msg)
+            self.query_state = 'ADMIT_TIMEOUT'
+            return
+          admission_result = client.get_admission_result(query_handle)
           assert len(admission_result) > 0
           if "Rejected" in admission_result:
-            LOG.info("Rejected query %s", self.query_num_and_id())
+            msg = "Rejected query {}".format(self.query_num)
+            self.log_handle(client, query_handle, msg)
             self.query_state = 'REJECTED'
-            self.print_termination_log()
-            self.query_handle = None
             return
           elif "Timed out" in admission_result:
-            LOG.info("Query %s timed out", self.query_num_and_id())
+            msg = "Query {} timed out".format(self.query_num)
+            self.log_handle(client, query_handle, msg)
             self.query_state = 'TIMED OUT'
-            self.print_termination_log()
-            self.query_handle = None
             return
-          LOG.info("Admission result for query %s : %s",
-                   self.query_num_and_id(), admission_result)
+          msg = "Admission result for query {} : {}".format(
+            self.query_num, admission_result)
+          self.log_handle(client, query_handle, msg)
         except ImpalaBeeswaxException as e:
           LOG.exception(e)
           raise e
-        finally:
-          self.lock.release()
-        LOG.info("Admitted query %s", self.query_num_and_id())
+
+        msg = "Admitted query {}".format(self.query_num)
+        self.log_handle(client, query_handle, msg)
         self.query_state = 'ADMITTED'
         # The thread becomes visible to the main thread when it is added to the
         # shared list of executing_threads. append() is atomic and thread-safe.
@@ -2265,69 +2292,70 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
 
         # Synchronize with the main thread. At this point, the thread is 
executing a
         # query. It needs to wait until the main thread requests it to end its 
query.
-        while not self.shutdown and self.query_state != 'COMPLETED':
+        while self.thread_should_run() and self.query_state != 'COMPLETED':
           # The query needs to stay active until the main thread requests it 
to end.
           # Otherwise, the query may get cancelled early. Fetch 1 row every
           # FETCH_INTERVAL to keep the query active.
-          fetch_result = client.fetch(query, self.query_handle, 1)
+          fetch_result = client.fetch(query, query_handle, 1)
           assert len(fetch_result.data) == 1, str(fetch_result)
+          self.num_rows_fetched += len(fetch_result.data)
           if self.query_state == 'REQUEST_QUERY_END':
-            self._end_query(client, query)
+            self._end_query(client, query, query_handle)
             # The query has released admission control resources
             self.query_state = 'COMPLETED'
-            self.print_termination_log()
-            self.query_handle = None
           sleep(FETCH_INTERVAL)
       except Exception as e:
         LOG.exception(e)
         # Unknown errors will be raised later
         self.error = e
         self.query_state = 'ERROR'
-        self.print_termination_log()
       finally:
+        self.print_termination_log()
         if client is not None:
-          try:
-            self.lock.acquire()
-            client.close()
-            # Closing the client closes the query as well
-            self.query_handle = None
-          finally:
-            self.lock.release()
+          # Closing the client closes the query as well
+          client.close()
 
     def print_termination_log(self):
-      LOG.info("Thread for query {} terminating in state {}".format(
-        self.query_num, self.query_state))
-
-    def query_num_and_id(self):
-      return "{} (id={})".format(self.query_num, 
self.query_handle.get_handle().id)
+      msg = ("Thread for query {} terminating in state {}. "
+             "rows_fetched={} end_behavior={}").format(
+          self.query_num, self.query_state, self.num_rows_fetched,
+          self.query_end_behavior)
+      LOG.info(msg)
 
-    def _end_query(self, client, query):
+    def _end_query(self, client, query, query_handle):
       """Bring the query to the appropriate end state defined by 
self.query_end_behaviour.
       Returns once the query has reached that state."""
-      LOG.info("Ending query {} by {}".format(
-        self.query_num_and_id(), self.query_end_behavior))
+      msg = "Ending query {} by {}".format(self.query_num, 
self.query_end_behavior)
+      self.log_handle(client, query_handle, msg)
       if self.query_end_behavior == 'QUERY_TIMEOUT':
         # Sleep and wait for the query to be cancelled. The cancellation will
         # set the state to EXCEPTION.
         start_time = time()
-        while (client.get_state(self.query_handle) != 
client.QUERY_STATES['EXCEPTION']):
+        while self.thread_should_run() and (
+          client.get_state(query_handle) != client.QUERY_STATES['EXCEPTION']):
           assert (time() - start_time < STRESS_TIMEOUT),\
             "Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,)
           sleep(1)
         # try fetch and confirm from exception message that query was timed 
out.
         try:
-          client.fetch(query, self.query_handle)
+          client.fetch(query, query_handle)
           assert False
         except Exception as e:
           assert 'expired due to client inactivity' in str(e)
       elif self.query_end_behavior == 'EOS':
         # Fetch all rows so we hit eos.
-        client.fetch(query, self.query_handle)
+        client.fetch(query, query_handle)
       elif self.query_end_behavior == 'CLIENT_CANCEL':
-        client.cancel(self.query_handle)
+        client.cancel(query_handle)
       else:
         assert self.query_end_behavior == 'CLIENT_CLOSE'
-        client.close_query(self.query_handle)
+        client.close_query(query_handle)
+
+    def log_handle(self, client, query_handle, msg):
+      """Log ourself here rather than using client.log_handle() to display
+      log timestamp."""
+      handle_id = client.handle_id(query_handle)
+      LOG.info("{}: {}".format(handle_id, msg))
 
   def _check_queries_page_resource_pools(self):
     """Checks that all queries in the '/queries' webpage json have the correct 
resource
@@ -2360,7 +2388,8 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
     start_time = time()
     LOG.info("Waiting for %s <= queued queries <= %s" % (min_queued, 
max_queued))
     actual_queued = self._get_queries_page_num_queued()
-    while actual_queued < min_queued or actual_queued > max_queued:
+    while self.should_run() and (
+      actual_queued < min_queued or actual_queued > max_queued):
       assert (time() - start_time < STRESS_TIMEOUT), ("Timed out waiting %s 
seconds for "
           "%s <= queued queries <= %s, %s currently queued.",
             STRESS_TIMEOUT, min_queued, max_queued, actual_queued)
@@ -2382,17 +2411,21 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
 
     num_queries = vector.get_value('num_queries')
     assert num_queries >= MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES
-    initial_metrics = self.get_admission_metrics()
+    initial_metrics = self.get_consistent_admission_metrics(0)
     log_metrics("Initial metrics: ", initial_metrics)
 
+    # This is the query submission loop.
     for query_num in range(num_queries):
+      if not self.should_run():
+        break
+      if submission_delay_ms > 0:
+        sleep(submission_delay_ms / 1000.0)
       impalad = self.impalads[query_num % len(self.impalads)]
       query_end_behavior = QUERY_END_BEHAVIORS[query_num % 
len(QUERY_END_BEHAVIORS)]
       thread = self.SubmitQueryThread(impalad, additional_query_options, 
vector,
-          query_num, query_end_behavior, self.executing_threads)
+          query_num, query_end_behavior, self.executing_threads, self.exit)
       thread.start()
       self.all_threads.append(thread)
-      sleep(submission_delay_ms / 1000.0)
 
     # Wait for the admission control to make the initial admission decision 
for all
     # the queries. They should either be admitted immediately, queued, or 
rejected.
@@ -2534,6 +2567,10 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
     """Run a workload with a variety of outcomes in a pool that has user quotas
     configured. Note the user quotas will not prevent any queries from 
running, but this
     allows verification that metrics about users are consistent after queries 
end"""
+    if (not vector.get_value('round_robin_submission')
+        or not vector.get_value('submission_delay_ms') == 0):
+      # Save time by running only 1 out of 6 vector combination.
+      pytest.skip('Only run with round_robin_submission=True and 
submission_delay_ms=0.')
     self.pool_name = 'root.queueF'
     self.run_admission_test(vector, {'request_pool': self.pool_name},
                             check_user_aggregates=True)


Reply via email to