This is an automated email from the ASF dual-hosted git repository. wzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 6ecb8bfcf4d9e68e0091aea64540e0fb64aeb3e0 Author: wzhou-code <[email protected]> AuthorDate: Fri Jul 28 09:33:17 2023 -0700 IMPALA-12323: DDL hang with SYNC_DDL=1 when CatalogD HA enabled When CatalogD HA is enabled, the standby catalogd does not receive catalog topic updates from the statestore and does not apply catalog updates from the active catalogd, so its min topic version never changes. Statestore::GetMinSubscriberTopicVersion() loops through all subscribers to find the min topic version. The standby catalogd therefore keeps the min topic version from increasing, and the Impala server waits indefinitely in ImpalaServer::WaitForCatalogUpdateTopicPropagation(). This patch fixes the issue by skipping the standby catalogd when finding the min topic version in Statestore::GetMinSubscriberTopicVersion(). Testing: - Added custom-cluster test code for CatalogD HA that runs DDL with SYNC_DDL set to 1. Verified that the test cases hang without the fix and pass with the fix. - Passed test_catalogd_ha.py. Change-Id: Ie559c711078f32171dfb2d2e2fda54773c0927c3 Reviewed-on: http://gerrit.cloudera.org:8080/20280 Reviewed-by: Andrew Sherman <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/statestore/statestore.cc | 6 ++++++ tests/custom_cluster/test_catalogd_ha.py | 8 +++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc index 945d3a088..0ec515f73 100644 --- a/be/src/statestore/statestore.cc +++ b/be/src/statestore/statestore.cc @@ -986,6 +986,12 @@ Statestore::TopicEntry::Version Statestore::GetMinSubscriberTopicVersion( bool found = false; // Find the minimum version processed for this topic across all topic subscribers.
for (const SubscriberMap::value_type& subscriber: subscribers_) { + if (FLAGS_enable_catalogd_ha && subscriber.second->IsCatalogd() + && !catalog_manager_.IsActiveCatalogd(subscriber.second->id())) { + // Skip inactive catalogd since it does not apply catalog updates from the active + // catalogd. + continue; + } auto subscribed_topics = subscriber.second->GetTopicsMapForId(topic_id); if (subscribed_topics->find(topic_id) != subscribed_topics->end()) { found = true; diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py index 3803883f7..97b666f14 100644 --- a/tests/custom_cluster/test_catalogd_ha.py +++ b/tests/custom_cluster/test_catalogd_ha.py @@ -55,8 +55,10 @@ class TestCatalogdHA(CustomClusterTestSuite): _, catalog_service_port = active_catalogd_address.split(":") assert(int(catalog_service_port) == catalogd_service.get_catalog_service_port()) - def __run_simple_queries(self): + def __run_simple_queries(self, sync_ddl=False): try: + if sync_ddl: + self.execute_query_expect_success(self.client, "set SYNC_DDL=1") self.execute_query_expect_success( self.client, "drop table if exists test_catalogd_ha") self.execute_query_expect_success( @@ -90,6 +92,8 @@ class TestCatalogdHA(CustomClusterTestSuite): self.__verify_impalad_active_catalogd_port(2, catalogd_service_1) # Verify simple queries are ran successfully. self.__run_simple_queries() + # Verify simple queries with sync_ddl as 1. + self.__run_simple_queries(sync_ddl=True) # Restart one coordinator. Verify it get active catalogd address from statestore. self.cluster.impalads[0].restart() @@ -154,6 +158,8 @@ class TestCatalogdHA(CustomClusterTestSuite): self.__verify_impalad_active_catalogd_port(2, catalogd_service_2) # Verify simple queries are ran successfully. self.__run_simple_queries() + # Verify simple queries with sync_ddl as 1. 
+ self.__run_simple_queries(sync_ddl=True) end_count_clear_topic_entries = statestore_service.get_metric_value( "statestore.num-clear-topic-entries-requests")
