This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.4.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a45dd963a94dad20633c75181797baa934cd69e1
Author: wzhou-code <[email protected]>
AuthorDate: Tue Jun 4 21:25:57 2024 -0700

    IMPALA-13134: DDL hang with SYNC_DDL enabled when Catalogd is changed to 
standby status
    
    Catalogd waits for SYNC_DDL version when it processes a DDL with
    SYNC_DDL enabled. If the status of Catalogd is changed from active to
    standby when CatalogServiceCatalog.waitForSyncDdlVersion() is called,
    the standby catalogd does not receive catalog topic updates from
    statestore, hence catalogd thread waits indefinitely.
    
    This patch fixes the issue by regenerating the service id when Catalogd
    is changed to standby status, and by throwing an exception if the service
    id has changed while waiting for the SYNC_DDL version.
    
    Testing:
     - Added unit-test code for CatalogD HA to run DDL with SYNC_DDL enabled
       and inject a delay when waiting for the SYNC_DDL version, then verify
       that the DDL query fails due to catalog failover.
     - Passed test_catalogd_ha.py.
    
    Change-Id: I2dcd628cff3c10d2e7566ba2d9de0b5886a18fc1
    Reviewed-on: http://gerrit.cloudera.org:8080/21480
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/catalog/catalog-server.cc                   |  2 ++
 be/src/util/backend-gflag-util.cc                  |  2 ++
 common/thrift/BackendGflags.thrift                 |  2 ++
 .../impala/catalog/CatalogServiceCatalog.java      | 22 +++++++++++++
 .../org/apache/impala/service/BackendConfig.java   |  4 +++
 .../java/org/apache/impala/util/DebugUtils.java    |  4 +++
 tests/custom_cluster/test_catalogd_ha.py           | 37 ++++++++++++++++++++++
 7 files changed, 73 insertions(+)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 5f318278a..d9d8cd0e6 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -713,6 +713,8 @@ void CatalogServer::UpdateActiveCatalogd(bool 
is_registration_reply,
                 << TNetworkAddressToString(catalogd_registration.address)
                 << ", active_catalogd_version: "
                 << active_catalogd_version;
+      // Regenerate Catalog Service ID.
+      catalog_->RegenerateServiceId();
     }
   }
 }
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index 48bb739dc..d7cf69125 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -128,6 +128,7 @@ DECLARE_string(common_hms_event_types);
 DECLARE_int32(dbcp_max_conn_pool_size);
 DECLARE_int32(dbcp_max_wait_millis_for_conn);
 DECLARE_int32(dbcp_data_source_idle_timeout_s);
+DECLARE_bool(enable_catalogd_ha);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -491,6 +492,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
 #else
   cfg.__set_is_release_build(false);
 #endif
+  cfg.__set_enable_catalogd_ha(FLAGS_enable_catalogd_ha);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index 122d6ea5e..234d0f231 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -306,4 +306,6 @@ struct TBackendGflags {
   137: required i32 dbcp_data_source_idle_timeout
 
   138: required bool is_release_build
+
+  139: required bool enable_catalogd_ha
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index b58970ea7..666eb8b4a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -132,9 +132,11 @@ import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TTableType;
 import org.apache.impala.thrift.TTableUsage;
 import org.apache.impala.thrift.TTableUsageMetrics;
+import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateTableUsageRequest;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.CatalogBlacklistUtils;
+import org.apache.impala.util.DebugUtils;
 import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.NoOpEventSequence;
@@ -152,6 +154,7 @@ import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -3499,10 +3502,29 @@ public class CatalogServiceCatalog extends Catalog {
     long numAttempts = 0;
     long begin = System.currentTimeMillis();
     long versionToWaitFor = -1;
+    TUniqueId serviceId = JniCatalog.getServiceId();
     while (versionToWaitFor == -1) {
       if (LOG.isTraceEnabled()) {
         LOG.trace("waitForSyncDdlVersion() attempt: " + numAttempts);
       }
+      if (BackendConfig.INSTANCE.isCatalogdHAEnabled()) {
+        // Catalog serviceId is changed when the HA role of the catalog 
instance is
+        // changed from active to standby, or from standby to active. Inactive 
catalogd
+        // does not receive catalog topic updates from the statestore. To 
avoid waiting
+        // indefinitely, throw exception if its service id has been changed.
+        if (!Strings.isNullOrEmpty(BackendConfig.INSTANCE.debugActions())) {
+          DebugUtils.executeDebugAction(
+              BackendConfig.INSTANCE.debugActions(), 
DebugUtils.WAIT_SYNC_DDL_VER_DELAY);
+        }
+        if (!serviceId.equals(JniCatalog.getServiceId())) {
+          String errorMsg = "Couldn't retrieve the catalog topic update for 
the " +
+              "SYNC_DDL operation since HA role of this catalog instance has 
been " +
+              "changed. The operation has been successfully executed but its 
effects " +
+              "may have not been broadcast to all the coordinators.";
+          LOG.error(errorMsg);
+          throw new CatalogException(errorMsg);
+        }
+      }
       // Examine the topic update log to determine the latest topic update that
       // covers the added/modified/deleted objects in 'result'.
       long topicVersionForUpdates =
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 6325176bc..326ee5ebf 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -525,4 +525,8 @@ public class BackendConfig {
   public boolean isReleaseBuild() {
     return backendCfg_.is_release_build;
   }
+
+  public boolean isCatalogdHAEnabled() {
+    return backendCfg_.enable_catalogd_ha;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java 
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 67b05e4e4..141faf80e 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -86,6 +86,10 @@ public class DebugUtils {
   // debug action to enable eventProcessor
   public static final String ENABLE_EVENT_PROCESSOR = "enable_event_processor";
 
+  // debug action label to inject a delay when waiting SYNC DDL version
+  public static final String WAIT_SYNC_DDL_VER_DELAY =
+      "catalogd_wait_sync_ddl_version_delay";
+
   /**
    * Returns true if the label of action is set in the debugActions
    */
diff --git a/tests/custom_cluster/test_catalogd_ha.py 
b/tests/custom_cluster/test_catalogd_ha.py
index 244b8a58f..ae1f7d2f9 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -19,6 +19,7 @@ from __future__ import absolute_import, division, 
print_function
 import logging
 import re
 
+from beeswaxd.BeeswaxService import QueryState
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import build_flavor_timeout
 from tests.util.filesystem_utils import get_fs_path
@@ -427,6 +428,42 @@ class TestCatalogdHA(CustomClusterTestSuite):
     # Verify simple queries are ran successfully.
     self.__run_simple_queries()
 
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+    
catalogd_args="--debug_actions='catalogd_wait_sync_ddl_version_delay:SLEEP@5000'",
+    start_args="--enable_catalogd_ha")
+  def test_catalogd_failover_with_sync_ddl(self, unique_database):
+    """Tests for Catalog Service force fail-over when running DDL with SYNC_DDL
+    enabled."""
+    # Verify two catalogd instances are created with one as active.
+    catalogds = self.cluster.catalogds()
+    assert(len(catalogds) == 2)
+    catalogd_service_1 = catalogds[0].service
+    catalogd_service_2 = catalogds[1].service
+    assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
+    assert(not 
catalogd_service_2.get_metric_value("catalog-server.active-status"))
+
+    # Run DDL with SYNC_DDL enabled.
+    client = self.cluster.impalads[0].service.create_beeswax_client()
+    assert client is not None
+    self.execute_query_expect_success(client, "set SYNC_DDL=1")
+    ddl_query = "CREATE TABLE {database}.failover_sync_ddl (c int)"
+    handle = client.execute_async(ddl_query.format(database=unique_database))
+
+    # Restart standby catalogd with force_catalogd_active as true.
+    catalogds[1].kill()
+    catalogds[1].start(wait_until_ready=True,
+                       additional_args="--force_catalogd_active=true")
+    # Wait until original active catalogd becomes in-active.
+    catalogd_service_1 = catalogds[0].service
+    catalogd_service_1.wait_for_metric_value(
+        "catalog-server.active-status", expected_value=False, timeout=15)
+    assert(not 
catalogd_service_1.get_metric_value("catalog-server.active-status"))
+
+    # Verify that the query is failed due to the Catalogd HA fail-over.
+    self.wait_for_state(handle, QueryState.EXCEPTION, 30, client=client)
+    client.close()
+
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
     catalogd_args="--catalogd_ha_reset_metadata_on_failover=true",

Reply via email to