This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-4.4.1 in repository https://gitbox.apache.org/repos/asf/impala.git
commit a45dd963a94dad20633c75181797baa934cd69e1 Author: wzhou-code <[email protected]> AuthorDate: Tue Jun 4 21:25:57 2024 -0700 IMPALA-13134: DDL hang with SYNC_DDL enabled when Catalogd is changed to standby status Catalogd waits for SYNC_DDL version when it processes a DDL with SYNC_DDL enabled. If the status of Catalogd is changed from active to standby when CatalogServiceCatalog.waitForSyncDdlVersion() is called, the standby catalogd does not receive catalog topic updates from statestore, hence catalogd thread waits indefinitely. This patch fixed the issue by re-generating service id when Catalogd is changed to standby status and throwing exception if its service id has been changed when waiting for SYNC_DDL version. Testing: - Added unit-test code for CatalogD HA to run DDL with SYNC_DDL enabled and injected delay when waiting for the SYNC_DDL version, then verify that DDL query fails due to catalog failover. - Passed test_catalogd_ha.py. Change-Id: I2dcd628cff3c10d2e7566ba2d9de0b5886a18fc1 Reviewed-on: http://gerrit.cloudera.org:8080/21480 Reviewed-by: Riza Suminto <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/catalog/catalog-server.cc | 2 ++ be/src/util/backend-gflag-util.cc | 2 ++ common/thrift/BackendGflags.thrift | 2 ++ .../impala/catalog/CatalogServiceCatalog.java | 22 +++++++++++++ .../org/apache/impala/service/BackendConfig.java | 4 +++ .../java/org/apache/impala/util/DebugUtils.java | 4 +++ tests/custom_cluster/test_catalogd_ha.py | 37 ++++++++++++++++++++++ 7 files changed, 73 insertions(+) diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index 5f318278a..d9d8cd0e6 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -713,6 +713,8 @@ void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply, << TNetworkAddressToString(catalogd_registration.address) << ", active_catalogd_version: " << active_catalogd_version; + // Regenerate Catalog Service ID. 
+ catalog_->RegenerateServiceId(); } } } diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index 48bb739dc..d7cf69125 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -128,6 +128,7 @@ DECLARE_string(common_hms_event_types); DECLARE_int32(dbcp_max_conn_pool_size); DECLARE_int32(dbcp_max_wait_millis_for_conn); DECLARE_int32(dbcp_data_source_idle_timeout_s); +DECLARE_bool(enable_catalogd_ha); // HS2 SAML2.0 configuration // Defined here because TAG_FLAG caused issues in global-flags.cc @@ -491,6 +492,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) { #else cfg.__set_is_release_build(false); #endif + cfg.__set_enable_catalogd_ha(FLAGS_enable_catalogd_ha); return Status::OK(); } diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index 122d6ea5e..234d0f231 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -306,4 +306,6 @@ struct TBackendGflags { 137: required i32 dbcp_data_source_idle_timeout 138: required bool is_release_build + + 139: required bool enable_catalogd_ha } diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index b58970ea7..666eb8b4a 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -132,9 +132,11 @@ import org.apache.impala.thrift.TTableName; import org.apache.impala.thrift.TTableType; import org.apache.impala.thrift.TTableUsage; import org.apache.impala.thrift.TTableUsageMetrics; +import org.apache.impala.thrift.TUniqueId; import org.apache.impala.thrift.TUpdateTableUsageRequest; import org.apache.impala.util.AcidUtils; import org.apache.impala.util.CatalogBlacklistUtils; +import org.apache.impala.util.DebugUtils; import org.apache.impala.util.EventSequence; import 
org.apache.impala.util.FunctionUtils; import org.apache.impala.util.NoOpEventSequence; @@ -152,6 +154,7 @@ import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -3499,10 +3502,29 @@ public class CatalogServiceCatalog extends Catalog { long numAttempts = 0; long begin = System.currentTimeMillis(); long versionToWaitFor = -1; + TUniqueId serviceId = JniCatalog.getServiceId(); while (versionToWaitFor == -1) { if (LOG.isTraceEnabled()) { LOG.trace("waitForSyncDdlVersion() attempt: " + numAttempts); } + if (BackendConfig.INSTANCE.isCatalogdHAEnabled()) { + // Catalog serviceId is changed when the HA role of the catalog instance is + // changed from active to standby, or from standby to active. Inactive catalogd + // does not receive catalog topic updates from the statestore. To avoid waiting + // indefinitely, throw exception if its service id has been changed. + if (!Strings.isNullOrEmpty(BackendConfig.INSTANCE.debugActions())) { + DebugUtils.executeDebugAction( + BackendConfig.INSTANCE.debugActions(), DebugUtils.WAIT_SYNC_DDL_VER_DELAY); + } + if (!serviceId.equals(JniCatalog.getServiceId())) { + String errorMsg = "Couldn't retrieve the catalog topic update for the " + + "SYNC_DDL operation since HA role of this catalog instance has been " + + "changed. The operation has been successfully executed but its effects " + + "may have not been broadcast to all the coordinators."; + LOG.error(errorMsg); + throw new CatalogException(errorMsg); + } + } // Examine the topic update log to determine the latest topic update that // covers the added/modified/deleted objects in 'result'. 
long topicVersionForUpdates = diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index 6325176bc..326ee5ebf 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -525,4 +525,8 @@ public class BackendConfig { public boolean isReleaseBuild() { return backendCfg_.is_release_build; } + + public boolean isCatalogdHAEnabled() { + return backendCfg_.enable_catalogd_ha; + } } diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java index 67b05e4e4..141faf80e 100644 --- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java +++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java @@ -86,6 +86,10 @@ public class DebugUtils { // debug action to enable eventProcessor public static final String ENABLE_EVENT_PROCESSOR = "enable_event_processor"; + // debug action label to inject a delay when waiting SYNC DDL version + public static final String WAIT_SYNC_DDL_VER_DELAY = + "catalogd_wait_sync_ddl_version_delay"; + /** * Returns true if the label of action is set in the debugActions */ diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py index 244b8a58f..ae1f7d2f9 100644 --- a/tests/custom_cluster/test_catalogd_ha.py +++ b/tests/custom_cluster/test_catalogd_ha.py @@ -19,6 +19,7 @@ from __future__ import absolute_import, division, print_function import logging import re +from beeswaxd.BeeswaxService import QueryState from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.environ import build_flavor_timeout from tests.util.filesystem_utils import get_fs_path @@ -427,6 +428,42 @@ class TestCatalogdHA(CustomClusterTestSuite): # Verify simple queries are ran successfully. 
self.__run_simple_queries() + @CustomClusterTestSuite.with_args( + statestored_args="--use_subscriber_id_as_catalogd_priority=true", + catalogd_args="--debug_actions='catalogd_wait_sync_ddl_version_delay:SLEEP@5000'", + start_args="--enable_catalogd_ha") + def test_catalogd_failover_with_sync_ddl(self, unique_database): + """Tests for Catalog Service force fail-over when running DDL with SYNC_DDL + enabled.""" + # Verify two catalogd instances are created with one as active. + catalogds = self.cluster.catalogds() + assert(len(catalogds) == 2) + catalogd_service_1 = catalogds[0].service + catalogd_service_2 = catalogds[1].service + assert(catalogd_service_1.get_metric_value("catalog-server.active-status")) + assert(not catalogd_service_2.get_metric_value("catalog-server.active-status")) + + # Run DDL with SYNC_DDL enabled. + client = self.cluster.impalads[0].service.create_beeswax_client() + assert client is not None + self.execute_query_expect_success(client, "set SYNC_DDL=1") + ddl_query = "CREATE TABLE {database}.failover_sync_ddl (c int)" + handle = client.execute_async(ddl_query.format(database=unique_database)) + + # Restart standby catalogd with force_catalogd_active as true. + catalogds[1].kill() + catalogds[1].start(wait_until_ready=True, + additional_args="--force_catalogd_active=true") + # Wait until original active catalogd becomes in-active. + catalogd_service_1 = catalogds[0].service + catalogd_service_1.wait_for_metric_value( + "catalog-server.active-status", expected_value=False, timeout=15) + assert(not catalogd_service_1.get_metric_value("catalog-server.active-status")) + + # Verify that the query is failed due to the Catalogd HA fail-over. + self.wait_for_state(handle, QueryState.EXCEPTION, 30, client=client) + client.close() + @CustomClusterTestSuite.with_args( statestored_args="--use_subscriber_id_as_catalogd_priority=true", catalogd_args="--catalogd_ha_reset_metadata_on_failover=true",
