This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 73de6517a IMPALA-14280: Deflake catalogd HA failover tests
73de6517a is described below

commit 73de6517a4a403edd569f1d79abda79332874fd4
Author: stiga-huang <[email protected]>
AuthorDate: Sun Aug 3 20:49:01 2025 +0800

    IMPALA-14280: Deflake catalogd HA failover tests
    
    Several tests on catalogd HA failover have a loop of the following
    pattern:
     - Do some operations
     - Kills the active catalogd
     - Verifies some results
     - Starts the killed catalogd
    After starting the killed catalogd, the test gets the new active and
    standby catalogds and checks their /healthz pages immediately. This could
    fail if the web pages are not registered yet. The cause is when starting
    catalogd, we just wait for its 'statestore-subscriber.connected' to be
    True. This doesn't guarantee that the web pages are initialized. This
    patch adds a wait for this, i.e. when getting the web pages hits 404
    (Not Found) error, wait and retry.
    
    Another flaky issue of these failover tests is that cleanup of
    unique_database could fail due to impalad still using the old active
    catalogd address even in RPC failure retries (IMPALA-14228). This
    patch adds a retry on the DROP DATABASE statement to work around this.
    
    Sets disable_log_buffering to True so the killed catalogd has complete
    logs.
    
    Sets catalog_client_connection_num_retries to 2 to save time in
    coordinator retrying RPCs to the killed catalogd. This reduces the
    duration of test_warmed_up_metadata_failover_catchup from 100s to 50s.
    
    Tests:
     - Ran all (15) failover tests in test_catalogd_ha.py 10 times (each
       round takes 450s).
    
    Change-Id: Iad42a55ed7c357ed98d85c69e16ff705a8cae89d
    Reviewed-on: http://gerrit.cloudera.org:8080/23235
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Quanlong Huang <[email protected]>
---
 tests/common/impala_cluster.py           |  2 ++
 tests/common/impala_service.py           | 18 ++++++++++
 tests/conftest.py                        | 13 +++++--
 tests/custom_cluster/test_catalogd_ha.py | 60 +++++++++++++++++++++-----------
 4 files changed, 71 insertions(+), 22 deletions(-)

diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 96c436735..7f422c2cf 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -740,6 +740,8 @@ class CatalogdProcess(BaseImpalaProcess):
     if wait_until_ready:
       self.service.wait_for_metric_value('statestore-subscriber.connected',
                                          expected_value=1, timeout=30)
+      # Also wait until web pages are initialized
+      self.service.wait_for_page_ready("healthz")
 
   def set_jvm_log_level(self, class_name, level):
     """Helper method to set JVM log level for certain class name."""
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index d0d10190d..b00a93867 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -163,6 +163,24 @@ class BaseImpalaService(object):
                metric_name, expected_value, timeout, value, total_wait))
     self.__metric_timeout_assert(metric_name, expected_value, timeout, value)
 
+  def wait_for_page_ready(self, page_name, timeout=10, interval=1):
+    start_time = time()
+    total_wait = 0
+    while (total_wait < timeout):
+      response = self.open_debug_webpage(page_name)
+      total_wait = time() - start_time
+      if response.status_code == requests.codes.not_found:
+        LOG.info(
+            "Debug webpage {}:{}/{} not yet available. Sleeping {}s before 
next retry"
+            .format(self.webserver_interface, self.webserver_port, page_name,
+                    interval))
+        sleep(interval)
+        continue
+      LOG.info(
+          "Debug webpage {}:{}/{} is ready. total_wait: {}s"
+          .format(self.webserver_interface, self.webserver_port, page_name, total_wait))
+      return
+
   def __request_minidump(self, pid):
     """
     Impala processes (impalad, catalogd, statestored) have a signal handler for
diff --git a/tests/conftest.py b/tests/conftest.py
index 9c3154488..0dc702337 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -389,8 +389,17 @@ def unique_database(request, testid_checksum):
                        'characters.'.format(db_name))
 
   def cleanup_database(client, db_name, must_exist):
-    result = client.execute('DROP DATABASE {0} `{1}` CASCADE'.format(
-      "" if must_exist else "IF EXISTS", db_name))
+    for i in range(2):
+      try:
+        result = client.execute('DROP DATABASE {0} `{1}` CASCADE'.format(
+          "" if must_exist else "IF EXISTS", db_name))
+        break
+      except Exception as e:
+        if i == 0:
+          # Retry in case we hit IMPALA-14228.
+          LOG.warn("Ignored cleanup failure once: " + str(e))
+        else:
+          raise e
     assert result.success
     # The database directory may not be removed if there are external tables in the
     # database when it is dropped. The external locations are not removed by cascade.
diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py
index a3c3dbe53..e5bda08f3 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -107,9 +107,11 @@ class TestCatalogdHA(CustomClusterTestSuite):
     for catalogd in catalogds:
       port = catalogd.get_webserver_port()
       page = requests.get(self.HEALTHZ_URL.format(port))
-      assert page.status_code == requests.codes.ok
+      LOG.info("Status of healthz page at port {}: {}".format(port, 
page.status_code))
+      assert page.status_code == requests.codes.ok, "port {} not ready".format(port)
       page = requests.head(self.HEALTHZ_URL.format(port))
-      assert page.status_code == requests.codes.ok
+      LOG.info("Status of healthz page at port {}: {}".format(port, 
page.status_code))
+      assert page.status_code == requests.codes.ok, "port {} not ready".format(port)
     first_impalad = self.cluster.get_first_impalad()
     page = requests.head(self.HEALTHZ_URL.format(first_impalad.get_webserver_port()))
     assert page.status_code == requests.codes.ok
@@ -242,7 +244,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     statestored_args=SS_AUTO_FAILOVER_ARGS,
     catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
                   "--enable_reload_events=true",
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_catalogd_auto_failover(self, unique_database):
     """Tests for Catalog Service auto fail over without failed RPCs."""
     self.__test_catalogd_auto_failover(unique_database)
@@ -261,7 +264,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
         + "--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:[email protected]"),
     catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
                   "--enable_reload_events=true",
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_catalogd_auto_failover_with_failed_rpc(self, unique_database):
     """Tests for Catalog Service auto fail over with failed RPCs."""
     self.__test_catalogd_auto_failover(unique_database)
@@ -281,7 +285,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     # minicluster has 68 Db when this test is written. So total sleep is ~3.4s.
     catalogd_args="--reset_metadata_lock_duration_ms=100 "
                   "--debug_actions=reset_metadata_loop_locked:SLEEP@50",
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   @UniqueDatabase.parametrize(name_prefix='aaa_test_catalogd_auto_failover_slow_first_db')
   def test_catalogd_auto_failover_slow_first_db(self, unique_database):
     """Tests for Catalog Service auto fail over with both slow metadata reset 
and slow
@@ -296,7 +301,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     # minicluster has 68 Db when this test is written. So total sleep is ~3.4s.
     catalogd_args="--reset_metadata_lock_duration_ms=100 "
                   "--debug_actions=reset_metadata_loop_locked:SLEEP@50",
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   @UniqueDatabase.parametrize(name_prefix='zzz_test_catalogd_auto_failover_slow_last_db')
   def test_catalogd_auto_failover_slow_last_db(self, unique_database):
     """Tests for Catalog Service auto fail over with both slow metadata reset 
and slow
@@ -371,7 +377,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
                      "--statestore_heartbeat_frequency_ms=1000",
     catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
                   "--enable_reload_events=true",
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_catalogd_manual_failover(self, unique_database):
     """Tests for Catalog Service manual fail over without failed RPCs."""
     self.__test_catalogd_manual_failover(unique_database)
@@ -390,7 +397,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
                     "--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:[email protected]",
     catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
                   "--enable_reload_events=true",
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_catalogd_manual_failover_with_failed_rpc(self, unique_database):
     """Tests for Catalog Service manual fail over with failed RPCs."""
     self.__test_catalogd_manual_failover(unique_database)
@@ -407,7 +415,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     statestored_args="--use_subscriber_id_as_catalogd_priority=true "
                      "--statestore_heartbeat_frequency_ms=1000",
     impalad_args="--debug_actions=IGNORE_NEW_ACTIVE_CATALOGD_ADDR:[email protected]",
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_manual_failover_with_coord_ignore_notification(self):
     """Tests for Catalog Service manual failover with coordinators to ignore 
failover
     notification."""
@@ -476,7 +485,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
 
   @CustomClusterTestSuite.with_args(
     catalogd_args="--force_catalogd_active=true",
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_two_catalogd_with_force_active(self, unique_database):
     """The test case for cluster started with catalogd HA enabled and
     both catalogds started with 'force_catalogd_active' as true.
@@ -489,7 +499,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
     
catalogd_args="--debug_actions='catalogd_wait_sync_ddl_version_delay:SLEEP@{0}'"
                   .format(SYNC_DDL_DELAY_S * 1000),
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_catalogd_failover_with_sync_ddl(self, unique_database):
     """Tests for Catalog Service force fail-over when running DDL with SYNC_DDL
     enabled."""
@@ -528,7 +539,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     catalogd_args="--catalogd_ha_reset_metadata_on_failover=true "
                   "--catalog_topic_mode=minimal",
     impalad_args="--use_local_catalog=true",
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_metadata_after_failover(self, unique_database):
     self._test_metadata_after_failover(
         unique_database, self._create_native_fn, self._verify_native_fn)
@@ -541,7 +553,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
                   "--catalog_topic_mode=minimal "
                   "--debug_actions=TRIGGER_RESET_METADATA_DELAY:SLEEP@1000",
     impalad_args="--use_local_catalog=true",
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_metadata_after_failover_with_delayed_reset(self, unique_database):
     self._test_metadata_after_failover(
         unique_database, self._create_native_fn, self._verify_native_fn)
@@ -554,7 +567,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
                   "--catalog_topic_mode=minimal --enable_reload_events=true "
                   "--debug_actions=catalogd_event_processing_delay:SLEEP@1000",
     impalad_args="--use_local_catalog=true",
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_metadata_after_failover_with_hms_sync(self, unique_database):
     self._test_metadata_after_failover(
         unique_database, self._create_new_table, self._verify_new_table)
@@ -565,7 +579,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
                   "--debug_actions=catalogd_event_processing_delay:SLEEP@2000 "
                   "--enable_reload_events=true --warmup_tables_config_file="
                   
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_warmed_up_metadata_after_failover(self, unique_database):
     """Verify that the metadata is warmed up in the standby catalogd."""
     for catalogd in self.__get_catalogds():
@@ -587,7 +602,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
                   "--catalogd_ha_failover_catchup_timeout_s=2 "
                   "--enable_reload_events=true --warmup_tables_config_file="
                   
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_failover_catchup_timeout_and_reset(self, unique_database):
     self._test_metadata_after_failover(
         unique_database, self._create_new_table, self._verify_new_table)
@@ -600,7 +616,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
                  "--catalogd_ha_reset_metadata_on_failover_catchup_timeout=false "
                   "--enable_reload_events=true --warmup_tables_config_file="
                   
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
-    start_args="--enable_catalogd_ha")
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_failover_catchup_timeout_not_reset(self, unique_database):
     # Skip verifying the table existence since it's missing due to catalog not reset.
     latest_catalogd, _ = self._test_metadata_after_failover(
@@ -625,12 +642,15 @@ class TestCatalogdHA(CustomClusterTestSuite):
                   "--debug_actions=catalogd_event_processing_delay:SLEEP@1000 "
                   "--enable_reload_events=true --warmup_tables_config_file="
                   "file://%s/testdata/data/warmup_test_config.txt" % 
IMPALA_HOME,
-    impalad_args="--use_local_catalog=true",
-    start_args="--enable_catalogd_ha")
+    impalad_args="--catalog_client_connection_num_retries=2 "
+                 "--use_local_catalog=true",
+    start_args="--enable_catalogd_ha",
+    disable_log_buffering=True)
   def test_warmed_up_metadata_failover_catchup(self):
     """All tables under the 'warmup_test_db' will be warmed up based on the 
config.
     Use local-catalog mode so coordinator needs to fetch metadata from catalogd after
-    each DDL."""
+    each DDL. Use a smaller catalog_client_connection_num_retries since RPC retries will
+    all fail due to IMPALA-14228. We retry the query instead."""
     db = "warmup_test_db"
     self.execute_query("create database if not exists " + db)
     try:

Reply via email to