This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bdd9c7f3 IMPALA-14227: (Addendum) Add more tests for catalogd HA warm 
failover
5bdd9c7f3 is described below

commit 5bdd9c7f392af0273f73ba9be4c4f549dc54af8a
Author: stiga-huang <[email protected]>
AuthorDate: Tue Jul 22 10:55:08 2025 +0800

    IMPALA-14227: (Addendum) Add more tests for catalogd HA warm failover
    
    This adds more tests in test_catalogd_ha.py for warm failover.
    Refactored _test_metadata_after_failover to run in the following way:
     - Run DDL/DML in the active catalogd.
     - Kill the active catalogd and wait until the failover finishes.
     - Verify the DDL/DML results in the new active catalogd.
     - Restart the killed catalogd
    It accepts two methods as parameters: one to perform the DDL/DML and
    one to verify the results. In the last step, the killed catalogd is
    restarted, so we keep having two catalogds and can merge these cases
    into a single test by invoking _test_metadata_after_failover with
    different method pairs. This saves some test time.
    
    The following DDL/DML statements are tested:
     - CreateTable
     - AddPartition
     - REFRESH
     - DropPartition
     - INSERT
     - DropTable
    After each failover, the table is verified to be warmed up (i.e. loaded).
    
    Also validate flags in startup to make sure enable_insert_events and
    enable_reload_events are both set to true when warm failover is enabled,
    i.e. --catalogd_ha_reset_metadata_on_failover=false.
    
    Change-Id: I6b20adeb0bd175592b425e521138c41196347600
    Reviewed-on: http://gerrit.cloudera.org:8080/23206
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Wenzhe Zhou <[email protected]>
---
 be/src/common/init.cc                              |  25 ++-
 bin/rat_exclude_files.txt                          |   1 +
 .../impala/catalog/CatalogServiceCatalog.java      |   2 +-
 testdata/data/warmup_test_config.txt               |   1 +
 tests/custom_cluster/test_catalogd_ha.py           | 223 ++++++++++++++++-----
 tests/custom_cluster/test_ext_data_sources.py      |   3 +-
 6 files changed, 197 insertions(+), 58 deletions(-)

diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index 460ed3cea..ee4ea6dd7 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -87,6 +87,8 @@ DECLARE_int64(thrift_rpc_max_message_size);
 DECLARE_int64(thrift_external_rpc_max_message_size);
 DECLARE_double(hms_event_polling_interval_s);
 DECLARE_bool(catalogd_ha_reset_metadata_on_failover);
+DECLARE_bool(enable_insert_events);
+DECLARE_bool(enable_reload_events);
 
 DEFINE_int32(memory_maintenance_sleep_time_ms, 10000, "Sleep time in 
milliseconds "
     "between memory maintenance iterations");
@@ -564,12 +566,23 @@ void impala::InitCommonRuntime(int argc, char** argv, 
bool init_jvm,
             FLAGS_thrift_external_rpc_max_message_size, 
ThriftDefaultMaxMessageSize()));
   }
 
-  if (!FLAGS_catalogd_ha_reset_metadata_on_failover
-      && FLAGS_hms_event_polling_interval_s <= 0) {
-    CLEAN_EXIT_WITH_ERROR(Substitute(
-        "Invalid hms_event_polling_interval_s: $0. It should be larger than 0 
when "
-        "--catalogd_ha_reset_metadata_on_failover is false",
-        FLAGS_hms_event_polling_interval_s));
+  if (!FLAGS_catalogd_ha_reset_metadata_on_failover) {
+    if (FLAGS_hms_event_polling_interval_s <= 0) {
+      CLEAN_EXIT_WITH_ERROR(Substitute(
+          "Invalid hms_event_polling_interval_s: $0. It should be larger than 
0 when "
+          "--catalogd_ha_reset_metadata_on_failover is false",
+          FLAGS_hms_event_polling_interval_s));
+    }
+    if (!FLAGS_enable_insert_events) {
+      CLEAN_EXIT_WITH_ERROR(Substitute(
+          "--enable_insert_events should be true when "
+          "--catalogd_ha_reset_metadata_on_failover is false"));
+    }
+    if (!FLAGS_enable_reload_events) {
+      CLEAN_EXIT_WITH_ERROR(Substitute(
+          "--enable_reload_events should be true when "
+          "--catalogd_ha_reset_metadata_on_failover is false"));
+    }
   }
 
   impala::InitGoogleLoggingSafe(argv[0]);
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index b2d881961..3a9b38861 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -175,6 +175,7 @@ testdata/data/sfs_d2.txt
 testdata/data/sfs_d4.txt
 testdata/data/load_data_with_catalog_v1.txt
 testdata/data/warmup_table_list.txt
+testdata/data/warmup_test_config.txt
 testdata/datasets/functional/functional_schema_template.sql
 testdata/impala-profiles/README
 testdata/impala-profiles/impala_profile_log_tpcds_compute_stats
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index d81008a89..a5fce4204 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -481,7 +481,7 @@ public class CatalogServiceCatalog extends Catalog {
     keepsWarmupTablesLoaded_ = 
BackendConfig.INSTANCE.keepsWarmupTablesLoaded();
     warmupTables_ = FileSystemUtil.loadWarmupTableNames(
         BackendConfig.INSTANCE.getWarmupTablesConfigFile());
-    LOG.info("Loaded {} table names to warmup", warmupTables_.size());
+    LOG.info("Loaded {} table names to warmup:\n{}", warmupTables_.size(), 
warmupTables_);
   }
 
   /**
diff --git a/testdata/data/warmup_test_config.txt 
b/testdata/data/warmup_test_config.txt
new file mode 100644
index 000000000..f7097b9fc
--- /dev/null
+++ b/testdata/data/warmup_test_config.txt
@@ -0,0 +1 @@
+warmup_test_db.*
diff --git a/tests/custom_cluster/test_catalogd_ha.py 
b/tests/custom_cluster/test_catalogd_ha.py
index 8fd02b198..a3c3dbe53 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -23,7 +23,7 @@ import requests
 import time
 
 from builtins import round
-from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite, 
IMPALA_HOME
 from tests.common.environ import build_flavor_timeout
 from tests.common.impala_connection import ERROR
 from tests.common.parametrize import UniqueDatabase
@@ -175,8 +175,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
   def __test_catalogd_auto_failover(self, unique_database):
     """Stop active catalogd and verify standby catalogd becomes active.
     Restart original active catalogd. Verify that statestore does not resume 
its
-    active role. If test_query_fail_during_failover is True, run a query 
during failover
-    and comfirm that it is fail."""
+    active role. Run a query during failover and comfirm that it is fail."""
     (active_catalogd, standby_catalogd) = self.__get_catalogds()
     catalogd_service_1 = active_catalogd.service
     catalogd_service_2 = standby_catalogd.service
@@ -241,7 +240,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
 
   @CustomClusterTestSuite.with_args(
     statestored_args=SS_AUTO_FAILOVER_ARGS,
-    catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
+    catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
+                  "--enable_reload_events=true",
     start_args="--enable_catalogd_ha")
   def test_catalogd_auto_failover(self, unique_database):
     """Tests for Catalog Service auto fail over without failed RPCs."""
@@ -259,7 +259,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     statestored_args=(
         SS_AUTO_FAILOVER_ARGS
         + "--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:[email protected]"),
-    catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
+    catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
+                  "--enable_reload_events=true",
     start_args="--enable_catalogd_ha")
   def test_catalogd_auto_failover_with_failed_rpc(self, unique_database):
     """Tests for Catalog Service auto fail over with failed RPCs."""
@@ -368,7 +369,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true "
                      "--statestore_heartbeat_frequency_ms=1000",
-    catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
+    catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
+                  "--enable_reload_events=true",
     start_args="--enable_catalogd_ha")
   def test_catalogd_manual_failover(self, unique_database):
     """Tests for Catalog Service manual fail over without failed RPCs."""
@@ -386,7 +388,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     statestored_args="--use_subscriber_id_as_catalogd_priority=true "
                      "--statestore_heartbeat_frequency_ms=1000 "
                      
"--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:[email protected]",
-    catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
+    catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
+                  "--enable_reload_events=true",
     start_args="--enable_catalogd_ha")
   def test_catalogd_manual_failover_with_failed_rpc(self, unique_database):
     """Tests for Catalog Service manual fail over with failed RPCs."""
@@ -522,10 +525,15 @@ class TestCatalogdHA(CustomClusterTestSuite):
 
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
-    catalogd_args="--catalogd_ha_reset_metadata_on_failover=true",
+    catalogd_args="--catalogd_ha_reset_metadata_on_failover=true "
+                  "--catalog_topic_mode=minimal",
+    impalad_args="--use_local_catalog=true",
     start_args="--enable_catalogd_ha")
   def test_metadata_after_failover(self, unique_database):
-    self._test_metadata_after_failover(unique_database)
+    self._test_metadata_after_failover(
+        unique_database, self._create_native_fn, self._verify_native_fn)
+    self._test_metadata_after_failover(
+        unique_database, self._create_new_table, self._verify_new_table)
 
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
@@ -535,43 +543,54 @@ class TestCatalogdHA(CustomClusterTestSuite):
     impalad_args="--use_local_catalog=true",
     start_args="--enable_catalogd_ha")
   def test_metadata_after_failover_with_delayed_reset(self, unique_database):
-    self._test_metadata_after_failover(unique_database)
+    self._test_metadata_after_failover(
+        unique_database, self._create_native_fn, self._verify_native_fn)
+    self._test_metadata_after_failover(
+        unique_database, self._create_new_table, self._verify_new_table)
 
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
     catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
-                  "--catalog_topic_mode=minimal "
+                  "--catalog_topic_mode=minimal --enable_reload_events=true "
                   "--debug_actions=catalogd_event_processing_delay:SLEEP@1000",
     impalad_args="--use_local_catalog=true",
     start_args="--enable_catalogd_ha")
   def test_metadata_after_failover_with_hms_sync(self, unique_database):
-    self._test_metadata_after_failover(unique_database, skip_func_test=True)
+    self._test_metadata_after_failover(
+        unique_database, self._create_new_table, self._verify_new_table)
 
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
     catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
                   "--debug_actions=catalogd_event_processing_delay:SLEEP@2000 "
-                  "--warmup_tables_config_file="
+                  "--enable_reload_events=true --warmup_tables_config_file="
                   
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
     start_args="--enable_catalogd_ha")
   def test_warmed_up_metadata_after_failover(self, unique_database):
     """Verify that the metadata is warmed up in the standby catalogd."""
     for catalogd in self.__get_catalogds():
       self._test_warmed_up_tables(catalogd.service)
-    latest_catalogd = self._test_metadata_after_failover(
-        unique_database, skip_func_test=True)
-    self._test_warmed_up_tables(latest_catalogd)
+    # TODO: due to IMPALA-14210 the standby catalogd can't update the native 
function
+    #  list by applying the ALTER_DATABASE event. So not testing native 
functions
+    #  creation until IMPALA-14210 is resolved.
+    active_catalogd, _ = self._test_metadata_after_failover(
+        unique_database, self._create_new_table, self._verify_new_table)
+    self._test_warmed_up_tables(active_catalogd.service)
+    active_catalogd, _ = self._test_metadata_after_failover(
+        unique_database, self._drop_table, self._verify_table_dropped)
+    self._test_warmed_up_tables(active_catalogd.service)
 
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
     catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
                   "--debug_actions=catalogd_event_processing_delay:SLEEP@3000 "
                   "--catalogd_ha_failover_catchup_timeout_s=2 "
-                  "--warmup_tables_config_file="
+                  "--enable_reload_events=true --warmup_tables_config_file="
                   
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
     start_args="--enable_catalogd_ha")
   def test_failover_catchup_timeout_and_reset(self, unique_database):
-    self._test_metadata_after_failover(unique_database, skip_func_test=True)
+    self._test_metadata_after_failover(
+        unique_database, self._create_new_table, self._verify_new_table)
 
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
@@ -579,15 +598,15 @@ class TestCatalogdHA(CustomClusterTestSuite):
                   "--debug_actions=catalogd_event_processing_delay:SLEEP@3000 "
                   "--catalogd_ha_failover_catchup_timeout_s=2 "
                   
"--catalogd_ha_reset_metadata_on_failover_catchup_timeout=false "
-                  "--warmup_tables_config_file="
+                  "--enable_reload_events=true --warmup_tables_config_file="
                   
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
     start_args="--enable_catalogd_ha")
   def test_failover_catchup_timeout_not_reset(self, unique_database):
-    # Use allow_table_not_exists=True since the table is missing due to 
catalog not reset.
-    latest_catalogd = self._test_metadata_after_failover(
-        unique_database, allow_table_not_exists=True, skip_func_test=True)
+    # Skip verifying the table existence since it's missing due to catalog not 
reset.
+    latest_catalogd, _ = self._test_metadata_after_failover(
+        unique_database, self._create_new_table, self._noop_verifier)
     # Verify tables are still loaded
-    self._test_warmed_up_tables(latest_catalogd)
+    self._test_warmed_up_tables(latest_catalogd.service)
     # Run a global IM to bring up 'unique_database' in the new catalogd. 
Otherwise, the
     # cleanup_database step will fail.
     self.execute_query("invalidate metadata")
@@ -599,22 +618,126 @@ class TestCatalogdHA(CustomClusterTestSuite):
       catalogd.verify_table_metadata_loaded(db, table)
     catalogd.verify_table_metadata_loaded(db, "store", expect_loaded=False)
 
-  def _test_metadata_after_failover(self, unique_database,
-                                    allow_table_not_exists=False, 
skip_func_test=False):
-    """Verify that the metadata is correct after failover. Returns the current 
active
-    catalogd"""
-    (active_catalogd, standby_catalogd) = self.__get_catalogds()
-    catalogd_service_2 = standby_catalogd.service
-
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+    catalogd_args="--catalog_topic_mode=minimal "
+                  "--catalogd_ha_reset_metadata_on_failover=false "
+                  "--debug_actions=catalogd_event_processing_delay:SLEEP@1000 "
+                  "--enable_reload_events=true --warmup_tables_config_file="
+                  "file://%s/testdata/data/warmup_test_config.txt" % 
IMPALA_HOME,
+    impalad_args="--use_local_catalog=true",
+    start_args="--enable_catalogd_ha")
+  def test_warmed_up_metadata_failover_catchup(self):
+    """All tables under the 'warmup_test_db' will be warmed up based on the 
config.
+    Use local-catalog mode so coordinator needs to fetch metadata from 
catalogd after
+    each DDL."""
+    db = "warmup_test_db"
+    self.execute_query("create database if not exists " + db)
+    try:
+      self._test_metadata_after_failover(
+          db, self._create_new_table, self._verify_new_table)
+      active_catalogd, _ = self._test_metadata_after_failover(
+          db, self._add_new_partition, self._verify_new_partition)
+      active_catalogd.service.verify_table_metadata_loaded(db, "tbl")
+      active_catalogd, _ = self._test_metadata_after_failover(
+          db, self._refresh_table, self._verify_refresh)
+      active_catalogd.service.verify_table_metadata_loaded(db, "tbl")
+      active_catalogd, _ = self._test_metadata_after_failover(
+        db, self._drop_partition, self._verify_no_partitions)
+      active_catalogd.service.verify_table_metadata_loaded(db, "tbl")
+      active_catalogd, _ = self._test_metadata_after_failover(
+        db, self._insert_table, self._verify_insert)
+      active_catalogd.service.verify_table_metadata_loaded(db, "tbl")
+      self._test_metadata_after_failover(
+          db, self._drop_table, self._verify_table_dropped)
+    finally:
+      for i in range(2):
+        try:
+          self.execute_query("drop database if exists %s cascade" % db)
+          break
+        except Exception as e:
+          # Retry in case we hit IMPALA-14228.
+          LOG.warn("Ignored cleanup failure: " + str(e))
+
+  def _create_native_fn(self, unique_database):
     create_func_impala = ("create function {database}.identity_tmp(bigint) "
                           "returns bigint location '{location}' 
symbol='Identity'")
     self.client.execute(create_func_impala.format(
-        database=unique_database,
-        location=get_fs_path('/test-warehouse/libTestUdfs.so')))
+      database=unique_database,
+      location=get_fs_path('/test-warehouse/libTestUdfs.so')))
+    self.execute_query_expect_success(
+      self.client, "select %s.identity_tmp(10)" % unique_database)
+
+  def _verify_native_fn(self, unique_database):
     self.execute_query_expect_success(
-        self.client, "select %s.identity_tmp(10)" % unique_database)
+      self.client, "select %s.identity_tmp(10)" % unique_database)
+
+  def _create_new_table(self, unique_database):
+    table_location = get_fs_path("/test-warehouse/%s.tbl" % unique_database)
+    self.client.execute("create table %s.tbl like functional.alltypes"
+                        " stored as parquet location '%s'"
+                        % (unique_database, table_location))
+
+  def _verify_new_table(self, unique_database):
+    self.execute_query("describe %s.tbl" % unique_database)
+
+  def _drop_table(self, unique_database):
+    self.client.execute("drop table %s.tbl" % unique_database)
+
+  def _verify_table_dropped(self, unique_database):
+    res = self.client.execute("show tables in " + unique_database)
+    assert len(res.data) == 0
+
+  def _add_new_partition(self, unique_database):
+    self.execute_query("alter table %s.tbl add partition(year=2025, month=1)" %
+                       unique_database)
+
+  def _verify_new_partition(self, unique_database):
+    res = self.execute_query("show partitions %s.tbl" % unique_database)
+    LOG.info("partition result: {}".format(res.data))
+    assert "year=2025/month=1" in res.data[0]
+
+  def _refresh_table(self, unique_database):
+    """Add a new file to the table and refresh it"""
+    table_location = get_fs_path("/test-warehouse/%s.tbl" % unique_database)
+    src_file = 
get_fs_path("/test-warehouse/alltypesagg_parquet/year=2010/month=1/"
+                           "day=9/*.parq")
+    dst_path = "%s/year=2025/month=1/alltypes.parq" % table_location
+    self.filesystem_client.copy(src_file, dst_path, overwrite=True)
+    self.execute_query("refresh %s.tbl partition (year=2025, month=1)" % 
unique_database)
+
+  def _verify_refresh(self, unique_database):
+    res = self.execute_query("select count(*) from %s.tbl where year=2025 and 
month=1"
+                             % unique_database)
+    assert res.data == ["1000"]
+
+  def _drop_partition(self, unique_database):
+    self.execute_query("alter table %s.tbl drop partition(year=2025, month=1)"
+                       % unique_database)
+
+  def _verify_no_partitions(self, unique_database):
+    res = self.execute_query("show partitions %s.tbl" % unique_database)
+    # The result should only have the "Total" line.
+    assert len(res.data) == 1
+
+  def _insert_table(self, unique_database):
+    self.execute_query("insert into %s.tbl partition(year, month)"
+                       " select * from functional.alltypes" % unique_database)
+
+  def _verify_insert(self, unique_database):
+    res = self.execute_scalar("select count(*) from %s.tbl" % unique_database)
+    assert res == '7300'
+
+  def _noop_verifier(self, unique_database):  # noqa: U100
+    pass
+
+  def _test_metadata_after_failover(self, unique_database, metadata_op_fn, 
verifier_fn):
+    """Verify that the metadata is correct after failover. Returns the updated 
tuple of
+    (active_catalogd, standby_catalogd)"""
+    (active_catalogd, standby_catalogd) = self.__get_catalogds()
+    catalogd_service_2 = standby_catalogd.service
 
-    self.client.execute("create table %s.tbl(i int)" % unique_database)
+    metadata_op_fn(unique_database)
 
     # Kill active catalogd
     active_catalogd.kill()
@@ -627,22 +750,22 @@ class TestCatalogdHA(CustomClusterTestSuite):
         "catalog-server.ha-number-active-status-change") > 0
     assert catalogd_service_2.get_metric_value("catalog-server.active-status")
 
-    # TODO: due to IMPALA-14210 the standby catalogd can't update the native 
function
-    #  list by applying the ALTER_DATABASE event. So this will fail as 
function not found.
-    #  Remove this condition after IMPALA-14210 is resolved.
-    if not skip_func_test:
-      self.execute_query_expect_success(
-          self.client, "select %s.identity_tmp(10)" % unique_database)
-
-    # Check if the new active catalogd has the new table in its cache.
-    try:
-      self.execute_query("describe %s.tbl" % unique_database)
-    except Exception as e:
-      if not allow_table_not_exists:
-        # Due to IMPALA-14228, the query could still fail. But it's not due to 
stale
-        # metadata so allow this until we resolve IMPALA-14228.
-        assert "Error making an RPC call to Catalog server" in str(e)
-    return catalogd_service_2
+    for i in range(2):
+      try:
+        verifier_fn(unique_database)
+        break
+      except Exception as e:
+        if i == 0:
+          # Due to IMPALA-14228, we allow retry on connection failure to the 
previous
+          # active catalogd. Example error message:
+          # Couldn't open transport for xxx:26000 (connect() failed: 
Connection refused)
+          assert str(active_catalogd.service.service_port) in str(e)
+          LOG.info("Retry for error " + str(e))
+          continue
+        assert False, str(e)
+
+    active_catalogd.start()
+    return standby_catalogd, active_catalogd
 
   def test_page_with_disable_ha(self):
     self.__test_catalog_ha_info_page()
diff --git a/tests/custom_cluster/test_ext_data_sources.py 
b/tests/custom_cluster/test_ext_data_sources.py
index bfd739c4c..9d1bae9ef 100644
--- a/tests/custom_cluster/test_ext_data_sources.py
+++ b/tests/custom_cluster/test_ext_data_sources.py
@@ -113,7 +113,8 @@ class TestExtDataSources(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true "
                      "--statestore_heartbeat_frequency_ms=1000",
-    catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
+    catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
+                  "--enable_reload_events=true",
     start_args="--enable_catalogd_ha")
   def test_catalogd_ha_failover(self):
     """The test case for cluster started with catalogd HA enabled."""

Reply via email to