This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new cb35dc876 IMPALA-13536: Workload Management Tests Failing on Init Check
cb35dc876 is described below

commit cb35dc87690b21a74acc7af599abd492db8cb52f
Author: jasonmfehr <[email protected]>
AuthorDate: Mon Nov 25 13:42:59 2024 -0800

    IMPALA-13536: Workload Management Tests Failing on Init Check
    
    Most of the workload management tests verify that the workload
    management process has successfully completed. Part of this
    verification ensures a catalog update has propagated the workload
    management changes to the coordinators by determining the catalog
    version, from the catalogd logs, that contains the workload
    management table changes and ensuring that version is in the
    coordinator logs.
    
    The test flakiness occurs when multiple catalog versions are
    combined into a later version. Specifically, tests were failing
    because the coordinator logs were checked for catalog version X but
    the actual version in the coordinator logs was X+1.
    
    The fix for the test flakiness is to accept the expected catalog
    version or any later version.
    
    Change-Id: I9f20a149ab1f45ee3506f098f8594965a24a89d3
    Reviewed-on: http://gerrit.cloudera.org:8080/22200
    Reviewed-by: Jason Fehr <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/common/custom_cluster_test_suite.py | 16 ++++++---
 tests/custom_cluster/test_query_live.py   | 30 ++++++++++------
 tests/custom_cluster/test_query_log.py    | 57 ++++++++++++++++++++-----------
 3 files changed, 70 insertions(+), 33 deletions(-)

diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index 582b2d211..c445d37f4 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -36,6 +36,7 @@ from tests.common.file_utils import cleanup_tmp_test_dir, 
make_tmp_test_dir
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.impala_cluster import ImpalaCluster
 from tests.util.filesystem_utils import IS_LOCAL
+from tests.util.retry import retry
 from time import sleep
 
 LOG = logging.getLogger(__name__)
@@ -385,10 +386,17 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     self.assert_catalogd_log_contains("INFO", r'Completed workload management '
         r'initialization', timeout_s=timeout_s)
 
-    ret = self.assert_catalogd_log_contains("INFO", r'A catalog update with 
\d+ entries '
-        r'is assembled. Catalog version: (\d+)', timeout_s=10, 
expected_count=-1)
-    self.assert_impalad_log_contains("INFO", r'Catalog topic update applied 
with '
-        r'version: {}'.format(ret.group(1)), timeout_s=30)
+    catalog_log = self.assert_catalogd_log_contains("INFO", r'A catalog update 
with \d+ '
+        r'entries is assembled. Catalog version: (\d+)', timeout_s=10, 
expected_count=-1)
+
+    def assert_func(last_iteration):
+      coord_log = self.assert_impalad_log_contains("INFO", r'Catalog topic 
update '
+          r'applied with version: (\d+)', timeout_s=5, expected_count=-1)
+      return int(coord_log.group(1)) >= int(catalog_log.group(1))
+
+    assert retry(func=assert_func, max_attempts=10, sleep_time_s=3, 
backoff=1), \
+        "Expected a catalog topic update with version '{}' or later, but no 
such " \
+        "update was found.".format(catalog_log.group(1))
 
   @classmethod
   def _stop_impala_cluster(cls):
diff --git a/tests/custom_cluster/test_query_live.py 
b/tests/custom_cluster/test_query_live.py
index 17fe3e3e0..134cfb9a1 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -78,7 +78,8 @@ class TestQueryLive(CustomClusterTestSuite):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt")
+                                    catalogd_args="--enable_workload_mgmt",
+                                    disable_log_buffering=True)
   def test_query_live(self):
     """Asserts the query live table shows and allows filtering queries."""
     # Use a query that reads data from disk for the 1st one, as more 
representative and a
@@ -178,7 +179,8 @@ class TestQueryLive(CustomClusterTestSuite):
                                     catalogd_args="--enable_workload_mgmt "
                                                   
"--catalog_topic_mode=minimal",
                                     default_query_options=[
-                                      ('default_transactional_type', 
'insert_only')])
+                                      ('default_transactional_type', 
'insert_only')],
+                                    disable_log_buffering=True)
   def test_default_transactional(self):
     """Asserts the query live table works when impala is started with
     default_transactional_type=insert_only."""
@@ -192,7 +194,8 @@ class TestQueryLive(CustomClusterTestSuite):
                                                  "--cluster_id=test_query_live 
"
                                                  "--use_local_catalog=true",
                                     catalogd_args="--enable_workload_mgmt "
-                                                  
"--catalog_topic_mode=minimal")
+                                                  
"--catalog_topic_mode=minimal",
+                                    disable_log_buffering=True)
   def test_local_catalog(self):
     """Asserts the query live table works with local catalog mode."""
     result = self.client.execute("select * from functional.alltypes",
@@ -240,7 +243,8 @@ class TestQueryLive(CustomClusterTestSuite):
                                                  
"--cluster_id=test_query_live",
                                     catalogd_args="--enable_workload_mgmt",
                                     cluster_size=3,
-                                    num_exclusive_coordinators=2)
+                                    num_exclusive_coordinators=2,
+                                    disable_log_buffering=True)
   def test_dedicated_coordinators(self):
     """Asserts scans are performed only on coordinators."""
     # Use a query that reads data from disk for the 1st one, as more 
representative and a
@@ -273,7 +277,8 @@ class TestQueryLive(CustomClusterTestSuite):
                                                  
"--cluster_id=test_query_live",
                                     catalogd_args="--enable_workload_mgmt",
                                     cluster_size=3,
-                                    num_exclusive_coordinators=2)
+                                    num_exclusive_coordinators=2,
+                                    disable_log_buffering=True)
   def test_executor_groups(self):
     """Asserts scans are performed only on coordinators with multiple executor 
groups."""
     # Add a (non-dedicated) coordinator and executor in a different executor 
group.
@@ -291,7 +296,8 @@ class TestQueryLive(CustomClusterTestSuite):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt")
+                                    catalogd_args="--enable_workload_mgmt",
+                                    disable_log_buffering=True)
   def test_query_entries_are_unique(self):
     """Asserts queries in the query live table are unique."""
     # Start a query and close it with a delay between CloseClientRequestState 
and
@@ -357,7 +363,8 @@ class TestQueryLive(CustomClusterTestSuite):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt")
+                                    catalogd_args="--enable_workload_mgmt",
+                                    disable_log_buffering=True)
   def test_shutdown_coordinator(self):
     """Asserts query fails if a coordinator disappears after scheduling. 
Depends on
     test config of statestore_heartbeat_frequency_ms=50."""
@@ -381,7 +388,8 @@ class TestQueryLive(CustomClusterTestSuite):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt")
+                                    catalogd_args="--enable_workload_mgmt",
+                                    disable_log_buffering=True)
   def test_graceful_shutdown_coordinator(self):
     """Asserts query succeeds if another coordinator is shutdown gracefully 
after
     scheduling. Depends on test config of 
statestore_heartbeat_frequency_ms=50."""
@@ -415,7 +423,8 @@ class TestQueryLive(CustomClusterTestSuite):
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True,
                                     cluster_size=3,
-                                    num_exclusive_coordinators=2)
+                                    num_exclusive_coordinators=2,
+                                    disable_log_buffering=True)
   def test_multi_table_union(self):
     """Asserts only system table scan fragments are scheduled to 
coordinators."""
     utc_timestamp = self.execute_query('select utc_timestamp()')
@@ -449,7 +458,8 @@ class TestQueryLive(CustomClusterTestSuite):
                                                  
"--cluster_id=test_query_live",
                                     catalogd_args="--enable_workload_mgmt",
                                     cluster_size=3,
-                                    num_exclusive_coordinators=2)
+                                    num_exclusive_coordinators=2,
+                                    disable_log_buffering=True)
   def test_multi_table_join(self, unique_database):
     """Asserts only system table scan fragments are scheduled to 
coordinators."""
     self.execute_query('create table {}.users (user 
string)'.format(unique_database))
diff --git a/tests/custom_cluster/test_query_log.py 
b/tests/custom_cluster/test_query_log.py
index 1b56c928b..b42e7f52a 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -91,7 +91,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
                                                  
"--query_log_max_plan_length={0}"
                                                  .format(MAX_SQL_PLAN_LEN),
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_lower_max_sql_plan(self, vector):
     """Asserts that length limits on the sql and plan columns in the completed 
queries
        table are respected."""
@@ -127,7 +128,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
                                                  
"--query_log_write_interval_s=1 "
                                                  
"--cluster_id=test_max_select",
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_sql_plan_too_long(self, vector):
     """Asserts that very long queries have their corresponding plan and sql 
columns
        shortened in the completed queries table."""
@@ -164,7 +166,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
                                                  "--query_log_size=0 "
                                                  "--query_log_size_in_bytes=0",
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_no_query_log(self, vector):
     """Asserts queries are written to the completed queries table when the 
in-memory
        query log queue is turned off."""
@@ -195,7 +198,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True,
                                     cluster_size=1,
-                                    tmp_dir_placeholders=['query_data_cache'])
+                                    tmp_dir_placeholders=['query_data_cache'],
+                                    disable_log_buffering=True)
   def test_query_data_cache(self, vector):
     """Asserts the values written to the query log table match the values from 
the
        query profile. Specifically focuses on the data cache metrics."""
@@ -237,7 +241,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
                                     impala_log_dir=("{" + LOG_DIR_MAX_WRITES + 
"}"),
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True,
-                                    tmp_dir_placeholders=[LOG_DIR_MAX_WRITES])
+                                    tmp_dir_placeholders=[LOG_DIR_MAX_WRITES],
+                                    disable_log_buffering=True)
   def test_max_attempts_exceeded(self, vector):
     """Asserts that completed queries are only attempted 3 times to be 
inserted into the
        completed queries table. This test deletes the completed queries table 
thus it must
@@ -290,7 +295,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
                                     catalogd_args="--enable_workload_mgmt",
                                     default_query_options=[
                                       ('statement_expression_limit', 1024)],
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_flush_on_queued_count_exceeded(self, vector):
     """Asserts that queries that have completed are written to the query log 
table when
        the maximum number of queued records is reached. Also verifies that 
writing
@@ -352,7 +358,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
                                     cluster_size=3,
                                     num_exclusive_coordinators=2,
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_dedicated_coordinator_no_mt_dop(self, vector):
     """Asserts the values written to the query log table match the values from 
the
        query profile when dedicated coordinators are used."""
@@ -379,7 +386,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
                                     cluster_size=3,
                                     num_exclusive_coordinators=2,
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_dedicated_coordinator_with_mt_dop(self, vector):
     """Asserts the values written to the query log table match the values from 
the
        query profile when dedicated coordinators are used along with an MT_DOP 
setting
@@ -426,7 +434,8 @@ class TestQueryLogOtherTable(TestQueryLogTableBase):
                                                   
"--blacklisted_dbs=information_schema "
                                                   "--query_log_table_name={0}"
                                                   .format(OTHER_TBL),
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_renamed_log_table(self, vector):
     """Asserts that the completed queries table can be renamed."""
 
@@ -467,7 +476,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
                                                  
.format(HS2_OPERATIONS_CLUSTER_ID),
                                     catalogd_args="--enable_workload_mgmt",
                                     cluster_size=2,
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_hs2_metadata_operations(self, vector):
     """Certain HS2 operations appear to Impala as a special kind of query. 
Specifically,
        these operations have a type of unknown and a normally invalid sql 
syntax. This
@@ -594,7 +604,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
                                                  
"--cluster_id=test_query_hist_mult",
                                     catalogd_args="--enable_workload_mgmt",
                                     cluster_size=2,
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_query_multiple_tables(self, vector):
     """Asserts the values written to the query log table match the values from 
the
        query profile for a query that reads from multiple tables."""
@@ -622,7 +633,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
                                                  
"--query_log_write_interval_s=1 "
                                                  
"--cluster_id=test_query_hist_3",
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_insert_select(self, vector, unique_database,
       unique_name):
     """Asserts the values written to the query log table match the values from 
the
@@ -654,7 +666,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=15",
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_flush_on_interval(self, vector):
     """Asserts that queries that have completed are written to the query log 
table
        after the specified write interval elapses."""
@@ -767,7 +780,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
                                                  
"--query_log_write_interval_s=1 "
                                                  
"--cluster_id=test_query_hist_2",
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_ddl(self, vector, unique_database, unique_name):
     """Asserts the values written to the query log table match the values from 
the
        query profile for a DDL query."""
@@ -793,7 +807,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
                                                  
"--query_log_write_interval_s=1 "
                                                  
"--cluster_id=test_query_hist_3",
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_dml(self, vector, unique_database, unique_name):
     """Asserts the values written to the query log table match the values from 
the
        query profile for a DML query."""
@@ -826,7 +841,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
                                                  
"--query_log_write_interval_s=1 "
                                                  
"--cluster_id=test_query_hist_2",
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_invalid_query(self, vector):
     """Asserts correct values are written to the completed queries table for a 
failed
        query. The query profile is used as the source of expected values."""
@@ -857,7 +873,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1",
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_ignored_sqls_not_written(self, vector):
     """Asserts that expected queries are not written to the query log table."""
     client = self.get_client(vector.get_value('protocol'))
@@ -949,7 +966,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1",
                                     catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True)
+                                    impalad_graceful_shutdown=True,
+                                    disable_log_buffering=True)
   def test_sql_injection_attempts(self, vector):
     client = self.get_client(vector.get_value('protocol'))
     impalad = self.cluster.get_first_impalad()
@@ -1036,7 +1054,8 @@ class TestQueryLogTableBufferPool(TestQueryLogTableBase):
                                                  
"--scratch_dirs={scratch_dir}:5G",
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True,
-                                    tmp_dir_placeholders=['scratch_dir'])
+                                    tmp_dir_placeholders=['scratch_dir'],
+                                    disable_log_buffering=True)
   def test_select(self, vector):
     """Asserts the values written to the query log table match the values from 
the
        query profile. If the buffer_pool_limit parameter is not None, then 
this test

Reply via email to