This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new cb35dc876 IMPALA-13536: Workload Management Tests Failing on Init Check
cb35dc876 is described below
commit cb35dc87690b21a74acc7af599abd492db8cb52f
Author: jasonmfehr <[email protected]>
AuthorDate: Mon Nov 25 13:42:59 2024 -0800
IMPALA-13536: Workload Management Tests Failing on Init Check
Most of the workload management tests verify that the workload
management process has successfully completed. Part of this
verification ensures a catalog update has propagated the workload
management changes to the coordinators by determining the catalog
version, from the catalogd logs, that contains the workload
management table changes and ensuring that version is in the
coordinator logs.
The test flakiness occurs when multiple catalogd versions are
combined into a later version. Specifically, tests were failing
because the coordinator logs were checked for catalog version X but
the actual version in the coordinator logs was X+1.
The fix for the test flakiness is to allow for the expected catalog
version or any later version.
Change-Id: I9f20a149ab1f45ee3506f098f8594965a24a89d3
Reviewed-on: http://gerrit.cloudera.org:8080/22200
Reviewed-by: Jason Fehr <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
tests/common/custom_cluster_test_suite.py | 16 ++++++---
tests/custom_cluster/test_query_live.py | 30 ++++++++++------
tests/custom_cluster/test_query_log.py | 57 ++++++++++++++++++++-----------
3 files changed, 70 insertions(+), 33 deletions(-)
diff --git a/tests/common/custom_cluster_test_suite.py
b/tests/common/custom_cluster_test_suite.py
index 582b2d211..c445d37f4 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -36,6 +36,7 @@ from tests.common.file_utils import cleanup_tmp_test_dir,
make_tmp_test_dir
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.impala_cluster import ImpalaCluster
from tests.util.filesystem_utils import IS_LOCAL
+from tests.util.retry import retry
from time import sleep
LOG = logging.getLogger(__name__)
@@ -385,10 +386,17 @@ class CustomClusterTestSuite(ImpalaTestSuite):
self.assert_catalogd_log_contains("INFO", r'Completed workload management '
r'initialization', timeout_s=timeout_s)
- ret = self.assert_catalogd_log_contains("INFO", r'A catalog update with
\d+ entries '
- r'is assembled. Catalog version: (\d+)', timeout_s=10,
expected_count=-1)
- self.assert_impalad_log_contains("INFO", r'Catalog topic update applied
with '
- r'version: {}'.format(ret.group(1)), timeout_s=30)
+ catalog_log = self.assert_catalogd_log_contains("INFO", r'A catalog update
with \d+ '
+ r'entries is assembled. Catalog version: (\d+)', timeout_s=10,
expected_count=-1)
+
+ def assert_func(last_iteration):
+ coord_log = self.assert_impalad_log_contains("INFO", r'Catalog topic
update '
+ r'applied with version: (\d+)', timeout_s=5, expected_count=-1)
+ return int(coord_log.group(1)) >= int(catalog_log.group(1))
+
+ assert retry(func=assert_func, max_attempts=10, sleep_time_s=3,
backoff=1), \
+ "Expected a catalog topic update with version '{}' or later, but no
such " \
+ "update was found.".format(catalog_log.group(1))
@classmethod
def _stop_impala_cluster(cls):
diff --git a/tests/custom_cluster/test_query_live.py
b/tests/custom_cluster/test_query_live.py
index 17fe3e3e0..134cfb9a1 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -78,7 +78,8 @@ class TestQueryLive(CustomClusterTestSuite):
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
"--cluster_id=test_query_live",
- catalogd_args="--enable_workload_mgmt")
+ catalogd_args="--enable_workload_mgmt",
+ disable_log_buffering=True)
def test_query_live(self):
"""Asserts the query live table shows and allows filtering queries."""
# Use a query that reads data from disk for the 1st one, as more
representative and a
@@ -178,7 +179,8 @@ class TestQueryLive(CustomClusterTestSuite):
catalogd_args="--enable_workload_mgmt "
"--catalog_topic_mode=minimal",
default_query_options=[
- ('default_transactional_type',
'insert_only')])
+ ('default_transactional_type',
'insert_only')],
+ disable_log_buffering=True)
def test_default_transactional(self):
"""Asserts the query live table works when impala is started with
default_transactional_type=insert_only."""
@@ -192,7 +194,8 @@ class TestQueryLive(CustomClusterTestSuite):
"--cluster_id=test_query_live
"
"--use_local_catalog=true",
catalogd_args="--enable_workload_mgmt "
-
"--catalog_topic_mode=minimal")
+
"--catalog_topic_mode=minimal",
+ disable_log_buffering=True)
def test_local_catalog(self):
"""Asserts the query live table works with local catalog mode."""
result = self.client.execute("select * from functional.alltypes",
@@ -240,7 +243,8 @@ class TestQueryLive(CustomClusterTestSuite):
"--cluster_id=test_query_live",
catalogd_args="--enable_workload_mgmt",
cluster_size=3,
- num_exclusive_coordinators=2)
+ num_exclusive_coordinators=2,
+ disable_log_buffering=True)
def test_dedicated_coordinators(self):
"""Asserts scans are performed only on coordinators."""
# Use a query that reads data from disk for the 1st one, as more
representative and a
@@ -273,7 +277,8 @@ class TestQueryLive(CustomClusterTestSuite):
"--cluster_id=test_query_live",
catalogd_args="--enable_workload_mgmt",
cluster_size=3,
- num_exclusive_coordinators=2)
+ num_exclusive_coordinators=2,
+ disable_log_buffering=True)
def test_executor_groups(self):
"""Asserts scans are performed only on coordinators with multiple executor
groups."""
# Add a (non-dedicated) coordinator and executor in a different executor
group.
@@ -291,7 +296,8 @@ class TestQueryLive(CustomClusterTestSuite):
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
"--cluster_id=test_query_live",
- catalogd_args="--enable_workload_mgmt")
+ catalogd_args="--enable_workload_mgmt",
+ disable_log_buffering=True)
def test_query_entries_are_unique(self):
"""Asserts queries in the query live table are unique."""
# Start a query and close it with a delay between CloseClientRequestState
and
@@ -357,7 +363,8 @@ class TestQueryLive(CustomClusterTestSuite):
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
"--cluster_id=test_query_live",
- catalogd_args="--enable_workload_mgmt")
+ catalogd_args="--enable_workload_mgmt",
+ disable_log_buffering=True)
def test_shutdown_coordinator(self):
"""Asserts query fails if a coordinator disappears after scheduling.
Depends on
test config of statestore_heartbeat_frequency_ms=50."""
@@ -381,7 +388,8 @@ class TestQueryLive(CustomClusterTestSuite):
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
"--cluster_id=test_query_live",
- catalogd_args="--enable_workload_mgmt")
+ catalogd_args="--enable_workload_mgmt",
+ disable_log_buffering=True)
def test_graceful_shutdown_coordinator(self):
"""Asserts query succeeds if another coordinator is shutdown gracefully
after
scheduling. Depends on test config of
statestore_heartbeat_frequency_ms=50."""
@@ -415,7 +423,8 @@ class TestQueryLive(CustomClusterTestSuite):
catalogd_args="--enable_workload_mgmt",
impalad_graceful_shutdown=True,
cluster_size=3,
- num_exclusive_coordinators=2)
+ num_exclusive_coordinators=2,
+ disable_log_buffering=True)
def test_multi_table_union(self):
"""Asserts only system table scan fragments are scheduled to
coordinators."""
utc_timestamp = self.execute_query('select utc_timestamp()')
@@ -449,7 +458,8 @@ class TestQueryLive(CustomClusterTestSuite):
"--cluster_id=test_query_live",
catalogd_args="--enable_workload_mgmt",
cluster_size=3,
- num_exclusive_coordinators=2)
+ num_exclusive_coordinators=2,
+ disable_log_buffering=True)
def test_multi_table_join(self, unique_database):
"""Asserts only system table scan fragments are scheduled to
coordinators."""
self.execute_query('create table {}.users (user
string)'.format(unique_database))
diff --git a/tests/custom_cluster/test_query_log.py
b/tests/custom_cluster/test_query_log.py
index 1b56c928b..b42e7f52a 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -91,7 +91,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
"--query_log_max_plan_length={0}"
.format(MAX_SQL_PLAN_LEN),
catalogd_args="--enable_workload_mgmt",
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_lower_max_sql_plan(self, vector):
"""Asserts that length limits on the sql and plan columns in the completed
queries
table are respected."""
@@ -127,7 +128,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
"--query_log_write_interval_s=1 "
"--cluster_id=test_max_select",
catalogd_args="--enable_workload_mgmt",
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_sql_plan_too_long(self, vector):
"""Asserts that very long queries have their corresponding plan and sql
columns
shortened in the completed queries table."""
@@ -164,7 +166,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
"--query_log_size=0 "
"--query_log_size_in_bytes=0",
catalogd_args="--enable_workload_mgmt",
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_no_query_log(self, vector):
"""Asserts queries are written to the completed queries table when the
in-memory
query log queue is turned off."""
@@ -195,7 +198,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
catalogd_args="--enable_workload_mgmt",
impalad_graceful_shutdown=True,
cluster_size=1,
- tmp_dir_placeholders=['query_data_cache'])
+ tmp_dir_placeholders=['query_data_cache'],
+ disable_log_buffering=True)
def test_query_data_cache(self, vector):
"""Asserts the values written to the query log table match the values from
the
query profile. Specifically focuses on the data cache metrics."""
@@ -237,7 +241,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
impala_log_dir=("{" + LOG_DIR_MAX_WRITES +
"}"),
catalogd_args="--enable_workload_mgmt",
impalad_graceful_shutdown=True,
- tmp_dir_placeholders=[LOG_DIR_MAX_WRITES])
+ tmp_dir_placeholders=[LOG_DIR_MAX_WRITES],
+ disable_log_buffering=True)
def test_max_attempts_exceeded(self, vector):
"""Asserts that completed queries are only attempted 3 times to be
inserted into the
completed queries table. This test deletes the completed queries table
thus it must
@@ -290,7 +295,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
catalogd_args="--enable_workload_mgmt",
default_query_options=[
('statement_expression_limit', 1024)],
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_flush_on_queued_count_exceeded(self, vector):
"""Asserts that queries that have completed are written to the query log
table when
the maximum number of queued records is reached. Also verifies that
writing
@@ -352,7 +358,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
cluster_size=3,
num_exclusive_coordinators=2,
catalogd_args="--enable_workload_mgmt",
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_dedicated_coordinator_no_mt_dop(self, vector):
"""Asserts the values written to the query log table match the values from
the
query profile when dedicated coordinators are used."""
@@ -379,7 +386,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
cluster_size=3,
num_exclusive_coordinators=2,
catalogd_args="--enable_workload_mgmt",
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_dedicated_coordinator_with_mt_dop(self, vector):
"""Asserts the values written to the query log table match the values from
the
query profile when dedicated coordinators are used along with an MT_DOP
setting
@@ -426,7 +434,8 @@ class TestQueryLogOtherTable(TestQueryLogTableBase):
"--blacklisted_dbs=information_schema "
"--query_log_table_name={0}"
.format(OTHER_TBL),
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_renamed_log_table(self, vector):
"""Asserts that the completed queries table can be renamed."""
@@ -467,7 +476,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
.format(HS2_OPERATIONS_CLUSTER_ID),
catalogd_args="--enable_workload_mgmt",
cluster_size=2,
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_hs2_metadata_operations(self, vector):
"""Certain HS2 operations appear to Impala as a special kind of query.
Specifically,
these operations have a type of unknown and a normally invalid sql
syntax. This
@@ -594,7 +604,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
"--cluster_id=test_query_hist_mult",
catalogd_args="--enable_workload_mgmt",
cluster_size=2,
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_query_multiple_tables(self, vector):
"""Asserts the values written to the query log table match the values from
the
query profile for a query that reads from multiple tables."""
@@ -622,7 +633,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
"--query_log_write_interval_s=1 "
"--cluster_id=test_query_hist_3",
catalogd_args="--enable_workload_mgmt",
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_insert_select(self, vector, unique_database,
unique_name):
"""Asserts the values written to the query log table match the values from
the
@@ -654,7 +666,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
"--query_log_write_interval_s=15",
catalogd_args="--enable_workload_mgmt",
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_flush_on_interval(self, vector):
"""Asserts that queries that have completed are written to the query log
table
after the specified write interval elapses."""
@@ -767,7 +780,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
"--query_log_write_interval_s=1 "
"--cluster_id=test_query_hist_2",
catalogd_args="--enable_workload_mgmt",
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_ddl(self, vector, unique_database, unique_name):
"""Asserts the values written to the query log table match the values from
the
query profile for a DDL query."""
@@ -793,7 +807,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
"--query_log_write_interval_s=1 "
"--cluster_id=test_query_hist_3",
catalogd_args="--enable_workload_mgmt",
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_dml(self, vector, unique_database, unique_name):
"""Asserts the values written to the query log table match the values from
the
query profile for a DML query."""
@@ -826,7 +841,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
"--query_log_write_interval_s=1 "
"--cluster_id=test_query_hist_2",
catalogd_args="--enable_workload_mgmt",
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_invalid_query(self, vector):
"""Asserts correct values are written to the completed queries table for a
failed
query. The query profile is used as the source of expected values."""
@@ -857,7 +873,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
"--query_log_write_interval_s=1",
catalogd_args="--enable_workload_mgmt",
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_ignored_sqls_not_written(self, vector):
"""Asserts that expected queries are not written to the query log table."""
client = self.get_client(vector.get_value('protocol'))
@@ -949,7 +966,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
"--query_log_write_interval_s=1",
catalogd_args="--enable_workload_mgmt",
- impalad_graceful_shutdown=True)
+ impalad_graceful_shutdown=True,
+ disable_log_buffering=True)
def test_sql_injection_attempts(self, vector):
client = self.get_client(vector.get_value('protocol'))
impalad = self.cluster.get_first_impalad()
@@ -1036,7 +1054,8 @@ class TestQueryLogTableBufferPool(TestQueryLogTableBase):
"--scratch_dirs={scratch_dir}:5G",
catalogd_args="--enable_workload_mgmt",
impalad_graceful_shutdown=True,
- tmp_dir_placeholders=['scratch_dir'])
+ tmp_dir_placeholders=['scratch_dir'],
+ disable_log_buffering=True)
def test_select(self, vector):
"""Asserts the values written to the query log table match the values from
the
query profile. If the buffer_pool_limit parameter is not None, then
this test