This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 73de6517a IMPALA-14280: Deflake catalogd HA failover tests
73de6517a is described below
commit 73de6517a4a403edd569f1d79abda79332874fd4
Author: stiga-huang <[email protected]>
AuthorDate: Sun Aug 3 20:49:01 2025 +0800
IMPALA-14280: Deflake catalogd HA failover tests
Several tests on catalogd HA failover have a loop of the following
pattern:
- Do some operations
- Kill the active catalogd
- Verify some results
- Start the killed catalogd
After starting the killed catalogd, the test gets the new active and
standby catalogds and checks their /healthz pages immediately. This could
fail if the web pages are not registered yet. The cause is when starting
catalogd, we just wait for its 'statestore-subscriber.connected' to be
True. This doesn't guarantee that the web pages are initialized. This
patch adds a wait for this, i.e. when getting the web pages hits 404
(Not Found) error, wait and retry.
Another flaky issue of these failover tests is that cleanup of unique_database
could fail due to impalad still using the old active catalogd address
even in RPC failure retries (IMPALA-14228). This patch adds a retry on
the DROP DATABASE statement to work around this.
Sets disable_log_buffering to True so the killed catalogd has complete
logs.
Sets catalog_client_connection_num_retries to 2 to save time in
coordinator retrying RPCs to the killed catalogd. This reduces the
duration of test_warmed_up_metadata_failover_catchup from 100s to 50s.
Tests:
- Ran all (15) failover tests in test_catalogd_ha.py 10 times (each
round takes 450s).
Change-Id: Iad42a55ed7c357ed98d85c69e16ff705a8cae89d
Reviewed-on: http://gerrit.cloudera.org:8080/23235
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Quanlong Huang <[email protected]>
---
tests/common/impala_cluster.py | 2 ++
tests/common/impala_service.py | 18 ++++++++++
tests/conftest.py | 13 +++++--
tests/custom_cluster/test_catalogd_ha.py | 60 +++++++++++++++++++++-----------
4 files changed, 71 insertions(+), 22 deletions(-)
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 96c436735..7f422c2cf 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -740,6 +740,8 @@ class CatalogdProcess(BaseImpalaProcess):
if wait_until_ready:
self.service.wait_for_metric_value('statestore-subscriber.connected',
expected_value=1, timeout=30)
+ # Also wait until web pages are initialized
+ self.service.wait_for_page_ready("healthz")
def set_jvm_log_level(self, class_name, level):
"""Helper method to set JVM log level for certain class name."""
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index d0d10190d..b00a93867 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -163,6 +163,24 @@ class BaseImpalaService(object):
metric_name, expected_value, timeout, value, total_wait))
self.__metric_timeout_assert(metric_name, expected_value, timeout, value)
+ def wait_for_page_ready(self, page_name, timeout=10, interval=1):
+ start_time = time()
+ total_wait = 0
+ while (total_wait < timeout):
+ response = self.open_debug_webpage(page_name)
+ total_wait = time() - start_time
+ if response.status_code == requests.codes.not_found:
+ LOG.info(
+ "Debug webpage {}:{}/{} not yet available. Sleeping {}s before
next retry"
+ .format(self.webserver_interface, self.webserver_port, page_name,
+ interval))
+ sleep(interval)
+ continue
+ LOG.info(
+ "Debug webpage {}:{}/{} is ready. total_wait: {}s"
+ .format(self.webserver_interface, self.webserver_port, page_name,
total_wait))
+ return
+
def __request_minidump(self, pid):
"""
Impala processes (impalad, catalogd, statestored) have a signal handler for
diff --git a/tests/conftest.py b/tests/conftest.py
index 9c3154488..0dc702337 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -389,8 +389,17 @@ def unique_database(request, testid_checksum):
'characters.'.format(db_name))
def cleanup_database(client, db_name, must_exist):
- result = client.execute('DROP DATABASE {0} `{1}` CASCADE'.format(
- "" if must_exist else "IF EXISTS", db_name))
+ for i in range(2):
+ try:
+ result = client.execute('DROP DATABASE {0} `{1}` CASCADE'.format(
+ "" if must_exist else "IF EXISTS", db_name))
+ break
+ except Exception as e:
+ if i == 0:
+ # Retry in case we hit IMPALA-14228.
+ LOG.warn("Ignored cleanup failure once: " + str(e))
+ else:
+ raise e
assert result.success
# The database directory may not be removed if there are external tables
in the
# database when it is dropped. The external locations are not removed by
cascade.
diff --git a/tests/custom_cluster/test_catalogd_ha.py
b/tests/custom_cluster/test_catalogd_ha.py
index a3c3dbe53..e5bda08f3 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -107,9 +107,11 @@ class TestCatalogdHA(CustomClusterTestSuite):
for catalogd in catalogds:
port = catalogd.get_webserver_port()
page = requests.get(self.HEALTHZ_URL.format(port))
- assert page.status_code == requests.codes.ok
+ LOG.info("Status of healthz page at port {}: {}".format(port,
page.status_code))
+ assert page.status_code == requests.codes.ok, "port {} not
ready".format(port)
page = requests.head(self.HEALTHZ_URL.format(port))
- assert page.status_code == requests.codes.ok
+ LOG.info("Status of healthz page at port {}: {}".format(port,
page.status_code))
+ assert page.status_code == requests.codes.ok, "port {} not
ready".format(port)
first_impalad = self.cluster.get_first_impalad()
page =
requests.head(self.HEALTHZ_URL.format(first_impalad.get_webserver_port()))
assert page.status_code == requests.codes.ok
@@ -242,7 +244,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
statestored_args=SS_AUTO_FAILOVER_ARGS,
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--enable_reload_events=true",
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_catalogd_auto_failover(self, unique_database):
"""Tests for Catalog Service auto fail over without failed RPCs."""
self.__test_catalogd_auto_failover(unique_database)
@@ -261,7 +264,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
+ "--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:[email protected]"),
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--enable_reload_events=true",
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_catalogd_auto_failover_with_failed_rpc(self, unique_database):
"""Tests for Catalog Service auto fail over with failed RPCs."""
self.__test_catalogd_auto_failover(unique_database)
@@ -281,7 +285,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
# minicluster has 68 Db when this test is written. So total sleep is ~3.4s.
catalogd_args="--reset_metadata_lock_duration_ms=100 "
"--debug_actions=reset_metadata_loop_locked:SLEEP@50",
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
@UniqueDatabase.parametrize(name_prefix='aaa_test_catalogd_auto_failover_slow_first_db')
def test_catalogd_auto_failover_slow_first_db(self, unique_database):
"""Tests for Catalog Service auto fail over with both slow metadata reset
and slow
@@ -296,7 +301,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
# minicluster has 68 Db when this test is written. So total sleep is ~3.4s.
catalogd_args="--reset_metadata_lock_duration_ms=100 "
"--debug_actions=reset_metadata_loop_locked:SLEEP@50",
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
@UniqueDatabase.parametrize(name_prefix='zzz_test_catalogd_auto_failover_slow_last_db')
def test_catalogd_auto_failover_slow_last_db(self, unique_database):
"""Tests for Catalog Service auto fail over with both slow metadata reset
and slow
@@ -371,7 +377,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
"--statestore_heartbeat_frequency_ms=1000",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--enable_reload_events=true",
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_catalogd_manual_failover(self, unique_database):
"""Tests for Catalog Service manual fail over without failed RPCs."""
self.__test_catalogd_manual_failover(unique_database)
@@ -390,7 +397,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
"--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:[email protected]",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--enable_reload_events=true",
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_catalogd_manual_failover_with_failed_rpc(self, unique_database):
"""Tests for Catalog Service manual fail over with failed RPCs."""
self.__test_catalogd_manual_failover(unique_database)
@@ -407,7 +415,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
statestored_args="--use_subscriber_id_as_catalogd_priority=true "
"--statestore_heartbeat_frequency_ms=1000",
impalad_args="--debug_actions=IGNORE_NEW_ACTIVE_CATALOGD_ADDR:[email protected]",
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_manual_failover_with_coord_ignore_notification(self):
"""Tests for Catalog Service manual failover with coordinators to ignore
failover
notification."""
@@ -476,7 +485,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
@CustomClusterTestSuite.with_args(
catalogd_args="--force_catalogd_active=true",
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_two_catalogd_with_force_active(self, unique_database):
"""The test case for cluster started with catalogd HA enabled and
both catalogds started with 'force_catalogd_active' as true.
@@ -489,7 +499,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
catalogd_args="--debug_actions='catalogd_wait_sync_ddl_version_delay:SLEEP@{0}'"
.format(SYNC_DDL_DELAY_S * 1000),
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_catalogd_failover_with_sync_ddl(self, unique_database):
"""Tests for Catalog Service force fail-over when running DDL with SYNC_DDL
enabled."""
@@ -528,7 +539,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
catalogd_args="--catalogd_ha_reset_metadata_on_failover=true "
"--catalog_topic_mode=minimal",
impalad_args="--use_local_catalog=true",
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_metadata_after_failover(self, unique_database):
self._test_metadata_after_failover(
unique_database, self._create_native_fn, self._verify_native_fn)
@@ -541,7 +553,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
"--catalog_topic_mode=minimal "
"--debug_actions=TRIGGER_RESET_METADATA_DELAY:SLEEP@1000",
impalad_args="--use_local_catalog=true",
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_metadata_after_failover_with_delayed_reset(self, unique_database):
self._test_metadata_after_failover(
unique_database, self._create_native_fn, self._verify_native_fn)
@@ -554,7 +567,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
"--catalog_topic_mode=minimal --enable_reload_events=true "
"--debug_actions=catalogd_event_processing_delay:SLEEP@1000",
impalad_args="--use_local_catalog=true",
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_metadata_after_failover_with_hms_sync(self, unique_database):
self._test_metadata_after_failover(
unique_database, self._create_new_table, self._verify_new_table)
@@ -565,7 +579,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
"--debug_actions=catalogd_event_processing_delay:SLEEP@2000 "
"--enable_reload_events=true --warmup_tables_config_file="
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_warmed_up_metadata_after_failover(self, unique_database):
"""Verify that the metadata is warmed up in the standby catalogd."""
for catalogd in self.__get_catalogds():
@@ -587,7 +602,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
"--catalogd_ha_failover_catchup_timeout_s=2 "
"--enable_reload_events=true --warmup_tables_config_file="
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_failover_catchup_timeout_and_reset(self, unique_database):
self._test_metadata_after_failover(
unique_database, self._create_new_table, self._verify_new_table)
@@ -600,7 +616,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
"--catalogd_ha_reset_metadata_on_failover_catchup_timeout=false "
"--enable_reload_events=true --warmup_tables_config_file="
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
- start_args="--enable_catalogd_ha")
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_failover_catchup_timeout_not_reset(self, unique_database):
# Skip verifying the table existence since it's missing due to catalog not
reset.
latest_catalogd, _ = self._test_metadata_after_failover(
@@ -625,12 +642,15 @@ class TestCatalogdHA(CustomClusterTestSuite):
"--debug_actions=catalogd_event_processing_delay:SLEEP@1000 "
"--enable_reload_events=true --warmup_tables_config_file="
"file://%s/testdata/data/warmup_test_config.txt" %
IMPALA_HOME,
- impalad_args="--use_local_catalog=true",
- start_args="--enable_catalogd_ha")
+ impalad_args="--catalog_client_connection_num_retries=2 "
+ "--use_local_catalog=true",
+ start_args="--enable_catalogd_ha",
+ disable_log_buffering=True)
def test_warmed_up_metadata_failover_catchup(self):
"""All tables under the 'warmup_test_db' will be warmed up based on the
config.
Use local-catalog mode so coordinator needs to fetch metadata from
catalogd after
- each DDL."""
+ each DDL. Use a smaller catalog_client_connection_num_retries since RPC
retries will
+ all fail due to IMPALA-14228. We retry the query instead."""
db = "warmup_test_db"
self.execute_query("create database if not exists " + db)
try: