This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 83734a1220bf99261779bbc5037708a7e1d78df0
Author: jasonmfehr <[email protected]>
AuthorDate: Thu Mar 28 10:43:03 2024 -0700

    IMPALA-12944: Fixes Workload Management Test Flakiness
    
    The custom cluster workload management tests are flaky
    because they can run before the completed queries table
    has been fully created by the Impala startup process. The
    CREATE TABLE SQL runs asynchronously during startup and
    can therefore take longer to finish than the custom
    cluster tests take to execute.
    
    This change adds checks at the beginning of each test to
    ensure the CREATE TABLE SQL for the completed queries table
    has finished before any of the test code runs.
    
    Change-Id: I428702a210e024db95808dc2518da497426922f8
    Reviewed-on: http://gerrit.cloudera.org:8080/21221
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/common/impala_test_suite.py      | 16 ++++++++---
 tests/custom_cluster/test_query_log.py | 49 +++++++++++++++++++++-------------
 2 files changed, 43 insertions(+), 22 deletions(-)

diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 72b0e1fa3..40020e3d0 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1267,14 +1267,16 @@ class ImpalaTestSuite(BaseTestSuite):
     """
     Convenience wrapper around assert_log_contains for impalad logs.
     """
-    self.assert_log_contains("impalad", level, line_regex, expected_count, 
timeout_s)
+    return self.assert_log_contains(
+        "impalad", level, line_regex, expected_count, timeout_s)
 
   def assert_catalogd_log_contains(self, level, line_regex, expected_count=1,
       timeout_s=6):
     """
     Convenience wrapper around assert_log_contains for catalogd logs.
     """
-    self.assert_log_contains("catalogd", level, line_regex, expected_count, 
timeout_s)
+    return self.assert_log_contains(
+        "catalogd", level, line_regex, expected_count, timeout_s)
 
   def assert_log_contains(self, daemon, level, line_regex, expected_count=1, 
timeout_s=6):
     """
@@ -1287,6 +1289,9 @@ class ImpalaTestSuite(BaseTestSuite):
     make sure that log buffering has been disabled, for example by adding
     '-logbuflevel=-1' to the daemon startup options or set timeout_s to a 
value higher
     than the log flush interval.
+
+    Returns the result of the very last call to line_regex.search or None if
+    expected_count is 0 or the line_regex did not match any lines.
     """
     pattern = re.compile(line_regex)
     start_time = time.time()
@@ -1300,10 +1305,13 @@ class ImpalaTestSuite(BaseTestSuite):
         log_file_path = os.path.join(log_dir, daemon + "." + level)
         # Resolve symlinks to make finding the file easier.
         log_file_path = os.path.realpath(log_file_path)
+        last_re_result = None
         with open(log_file_path) as log_file:
           for line in log_file:
-            if pattern.search(line):
+            re_result = pattern.search(line)
+            if re_result:
               found += 1
+              last_re_result = re_result
         if expected_count == -1:
           assert found > 0, "Expected at least one line in file %s matching 
regex '%s'"\
             ", but found none." % (log_file_path, line_regex)
@@ -1312,7 +1320,7 @@ class ImpalaTestSuite(BaseTestSuite):
             "Expected %d lines in file %s matching regex '%s', but found %d 
lines. "\
             "Last line was: \n%s" %\
             (expected_count, log_file_path, line_regex, found, line)
-        return
+        return last_re_result
       except AssertionError as e:
         # Re-throw the exception to the caller only when the timeout is 
expired. Otherwise
         # sleep before retrying.
diff --git a/tests/custom_cluster/test_query_log.py b/tests/custom_cluster/test_query_log.py
index e2f9a08fb..e02993217 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -56,9 +56,25 @@ class TestQueryLogTableBase(CustomClusterTestSuite):
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol',
         cls.PROTOCOL_BEESWAX, cls.PROTOCOL_HS2))
 
-  def get_client(self, protocol):
+  def get_client(self, protocol, query_table_name=""):
     """Retrieves the default Impala client for the specified protocol. This 
client is
-       automatically closed after the test completes."""
+       automatically closed after the test completes. Also ensures the 
completed queries
+       table has been successfully created by checking the logs to verify the 
create
+       table sql has finished."""
+    if query_table_name == "":
+      query_table_name = self.QUERY_TBL
+
+    # These tests run very quickly and can actually complete before Impala has 
finished
+    # creating the completed queries table. Thus, to make these tests more 
robust, this
+    # code checks to make sure the table create has finished before returning.
+    create_re = r'\]\s+(\w+:\w+)\]\s+Analyzing query: CREATE TABLE IF NOT 
EXISTS {}' \
+        .format(query_table_name)
+    create_match = self.assert_impalad_log_contains("INFO", create_re)
+
+    finish_re = r'Query successfully unregistered: query_id={}' \
+        .format(create_match.group(1))
+    self.assert_impalad_log_contains("INFO", finish_re)
+
     if protocol == self.PROTOCOL_BEESWAX:
       return self.client
     elif protocol == self.PROTOCOL_HS2:
@@ -241,9 +257,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=5 "
                                                  "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60 "
-                                                 "--log_dir={0}"
-                                                 .format(LOG_DIR_MAX_WRITES),
+                                                 "--shutdown_deadline_s=60 ",
+                                    impala_log_dir=LOG_DIR_MAX_WRITES,
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
   def test_query_log_max_attempts_exceeded(self, vector):
@@ -365,13 +380,14 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
   def test_query_log_table_different_table(self, vector):
     """Asserts that the completed queries table can be renamed."""
 
-    client = self.get_client(vector.get_value('protocol'))
+    client = self.get_client(vector.get_value('protocol'),
+        "{}.{}".format(self.WM_DB, self.OTHER_TBL))
 
     try:
       res = client.execute("show tables in {0}".format(self.WM_DB))
       assert res.success
       assert len(res.data) > 0, "could not find any tables in database {0}" \
-          .format(self.DB)
+          .format(self.WM_DB)
 
       tbl_found = False
       for tbl in res.data:
@@ -379,7 +395,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
           tbl_found = True
           break
       assert tbl_found, "could not find table '{0}' in database '{1}'" \
-          .format(self.OTHER_TBL, self.DB)
+          .format(self.OTHER_TBL, self.WM_DB)
     finally:
       client.execute("drop table {0}.{1} purge".format(self.WM_DB, 
self.OTHER_TBL))
 
@@ -470,6 +486,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
        these operations have a type of unknown and a normally invalid sql 
syntax. This
        test asserts those queries are not written to the completed queries 
table since
        they are trivial."""
+    client = self.get_client(vector.get_value('protocol'))
+
     host, port = IMPALAD_HS2_HOST_PORT.split(":")
     socket = TSocket(host, port)
     transport = TBufferedTransport(socket)
@@ -568,9 +586,6 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
     finally:
       socket.close()
 
-    # Assert none of the queries were written to the completed queries table.
-    client = self.create_impala_client(protocol=vector.get_value('protocol'))
-
     # Execute a general query and wait for it to appear in the completed 
queries table to
     # ensure there are no false positives caused by the assertion query 
executing before
     # Impala has a chance to write queued queries to the completed queries 
table.
@@ -581,13 +596,11 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
     # Force Impala to process the inserts to the completed queries table.
     client.execute("refresh {}".format(self.QUERY_TBL))
 
-    try:
-      assert_results = client.execute("select count(*) from {} where 
cluster_id='{}'"
-          .format(self.QUERY_TBL, self.HS2_OPERATIONS_CLUSTER_ID))
-      assert assert_results.success
-      assert assert_results.data[0] == "1"
-    finally:
-      client.close()
+    # Assert only the one expected query was written to the completed queries 
table.
+    assert_results = client.execute("select count(*) from {} where 
cluster_id='{}'"
+        .format(self.QUERY_TBL, self.HS2_OPERATIONS_CLUSTER_ID))
+    assert assert_results.success
+    assert assert_results.data[0] == "1"
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "

Reply via email to