This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c2c27c68 IMPALA-13159: Fix query cancellation caused by statestore 
failover
9c2c27c68 is described below

commit 9c2c27c68ce27b6a6d227379581ac39a34f8f348
Author: wzhou-code <[email protected]>
AuthorDate: Thu Jun 13 16:55:13 2024 -0700

    IMPALA-13159: Fix query cancellation caused by statestore failover
    
    A momentary inconsistent cluster membership state after statestore
    failover results in query cancellation.
    We already have code to handle inconsistent cluster membership after a
    statestore restart by defining a post-recovery grace period. During
    the grace period, don't update the current cluster membership so that
    the inconsistent membership will not be used to cancel queries on
    coordinators and executors.
    This patch handles inconsistent cluster membership state after
    statestore failover in the same way.
    
    Testing:
     - Added a new test case to verify that inconsistent cluster
       membership after statestore failover will not result in query
       cancellation.
     - Fixed closing client issue for Catalogd HA test case
       test_catalogd_failover_with_sync_ddl when the test fails.
     - Passed core test.
    
    Change-Id: I720bec5199df46475b954558abb0637ca7e6298b
    Reviewed-on: http://gerrit.cloudera.org:8080/21520
    Reviewed-by: Michael Smith <[email protected]>
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/scheduling/cluster-membership-mgr.cc |   4 +-
 be/src/statestore/statestore-subscriber.cc  |  20 +++--
 be/src/statestore/statestore-subscriber.h   |  12 ++-
 tests/custom_cluster/test_catalogd_ha.py    |  48 ++++++------
 tests/custom_cluster/test_statestored_ha.py | 111 +++++++++++++++++++++++++++-
 5 files changed, 162 insertions(+), 33 deletions(-)

diff --git a/be/src/scheduling/cluster-membership-mgr.cc 
b/be/src/scheduling/cluster-membership-mgr.cc
index d01787752..89024ef97 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -213,8 +213,8 @@ void ClusterMembershipMgr::UpdateMembership(
   BeDescSharedPtr local_be_desc = GetLocalBackendDescriptor();
   bool needs_local_be_update = NeedsLocalBackendUpdate(*base_snapshot, 
local_be_desc);
 
-  // We consider the statestore to be recovering from a connection failure 
until its post
-  // recovery grace period has elapsed.
+  // We consider the statestore service to be recovering from a connection 
failure or
+  // fail-over until its post recovery grace period has elapsed.
   bool ss_is_recovering = statestore_subscriber_ != nullptr
       && statestore_subscriber_->IsInPostRecoveryGracePeriod();
 
diff --git a/be/src/statestore/statestore-subscriber.cc 
b/be/src/statestore/statestore-subscriber.cc
index 98cb21cb2..2bf0afac2 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -446,8 +446,10 @@ void StatestoreSubscriber::UpdateStatestoredRole(bool 
is_active,
       StatestoreStub* tmp = active_statestore_;
       active_statestore_ = standby_statestore_;
       standby_statestore_ = tmp;
-      active_statestore_->SetStatestoreActive(is_active, 
active_statestored_version);
-      standby_statestore_->SetStatestoreActive(!is_active, 
active_statestored_version);
+      active_statestore_->SetStatestoreActive(
+          is_active, active_statestored_version, /* has_failover */ true);
+      standby_statestore_->SetStatestoreActive(
+          !is_active, active_statestored_version, /* has_failover */ true);
       LOG(INFO) << "Updated active statestored as " << 
active_statestore_->GetAddress();
     }
 
@@ -1090,10 +1092,13 @@ Status 
StatestoreSubscriber::StatestoreStub::UpdateState(
 }
 
 bool StatestoreSubscriber::StatestoreStub::IsInPostRecoveryGracePeriod() const 
{
-  bool has_failed_before = connection_failure_metric_->GetValue() > 0;
-  bool in_grace_period = MilliSecondsSinceLastRegistration()
+  bool has_disconnect_before = connection_failure_metric_->GetValue() > 0;
+  bool in_disconnect_grace_period = MilliSecondsSinceLastRegistration()
       < FLAGS_statestore_subscriber_recovery_grace_period_ms;
-  return has_failed_before && in_grace_period;
+  bool in_failover_grace_period = MilliSecondsSinceLastFailover()
+      < FLAGS_statestore_subscriber_recovery_grace_period_ms;
+  return (has_disconnect_before && in_disconnect_grace_period)
+      || in_failover_grace_period;
 }
 
 bool StatestoreSubscriber::StatestoreStub::IsRegistered() {
@@ -1102,11 +1107,14 @@ bool 
StatestoreSubscriber::StatestoreStub::IsRegistered() {
 }
 
 void StatestoreSubscriber::StatestoreStub::SetStatestoreActive(
-    bool is_active, int64_t active_statestored_version) {
+    bool is_active, int64_t active_statestored_version, bool has_failover) {
   lock_guard<mutex> l(active_lock_);
   is_active_ = is_active;
   DCHECK(active_statestored_version_ <= active_statestored_version);
   active_statestored_version_ = active_statestored_version;
+  if (has_failover) {
+    last_failover_time_.Store(MonotonicMillis());
+  }
   active_status_metric_->SetValue(is_active);
 }
 
diff --git a/be/src/statestore/statestore-subscriber.h 
b/be/src/statestore/statestore-subscriber.h
index b492f7e2b..18d289a67 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -341,6 +341,12 @@ class StatestoreSubscriber {
       return MonotonicMillis() - last_registration_ms_.Load();
     }
 
+    int64_t MilliSecondsSinceLastFailover() const {
+      int64_t time_ms = MonotonicMillis() - last_failover_time_.Load();
+      DCHECK_GE(time_ms, 0);
+      return time_ms;
+    }
+
     bool IsInPostRecoveryGracePeriod() const;
 
     /// Check if the subscriber is interesting to receive the notification of 
catalogd
@@ -351,7 +357,8 @@ class StatestoreSubscriber {
     bool IsRegistered();
 
     /// Set the active state of the registered statestore instance.
-    void SetStatestoreActive(bool is_active, int64_t 
active_statestored_version);
+    void SetStatestoreActive(bool is_active, int64_t 
active_statestored_version,
+        bool has_failover = false);
 
     /// Return the version of active statestore.
     int64_t GetActiveVersion(bool* is_active);
@@ -381,6 +388,9 @@ class StatestoreSubscriber {
     /// The version of active statestored.
     int64_t active_statestored_version_ = 0;
 
+    /// Monotonic timestamp of the last failover.
+    AtomicInt64 last_failover_time_{0};
+
     /// Protects is_active_ and active_statestored_version_. Must be taken 
after lock_
     /// if both are to be taken together.
     std::mutex active_lock_;
diff --git a/tests/custom_cluster/test_catalogd_ha.py 
b/tests/custom_cluster/test_catalogd_ha.py
index 10c1109a6..675585674 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -463,29 +463,31 @@ class TestCatalogdHA(CustomClusterTestSuite):
     # Run DDL with SYNC_DDL enabled.
     client = self.cluster.impalads[0].service.create_beeswax_client()
     assert client is not None
-    self.execute_query_expect_success(client, "set SYNC_DDL=1")
-    ddl_query = "CREATE TABLE {database}.failover_sync_ddl (c int)"
-    handle = client.execute_async(ddl_query.format(database=unique_database))
-
-    # Restart standby catalogd with force_catalogd_active as true.
-    start_s = time.time()
-    catalogds[1].kill()
-    catalogds[1].start(wait_until_ready=True,
-                       additional_args="--force_catalogd_active=true")
-    # Wait until original active catalogd becomes in-active.
-    catalogd_service_1 = catalogds[0].service
-    catalogd_service_1.wait_for_metric_value(
-        "catalog-server.active-status", expected_value=False, timeout=15)
-    assert(not 
catalogd_service_1.get_metric_value("catalog-server.active-status"))
-    elapsed_s = time.time() - start_s
-    assert elapsed_s < SYNC_DDL_DELAY_S, \
-        "Catalogd failover took %s seconds to complete" % (elapsed_s)
-    LOG.info("Catalogd failover took %s seconds to complete" % 
round(elapsed_s, 1))
-
-    # Verify that the query is failed due to the Catalogd HA fail-over.
-    self.wait_for_state(
-        handle, QueryState.EXCEPTION, SYNC_DDL_DELAY_S * 2 + 10, client=client)
-    client.close()
+    try:
+      self.execute_query_expect_success(client, "set SYNC_DDL=1")
+      ddl_query = "CREATE TABLE {database}.failover_sync_ddl (c int)"
+      handle = client.execute_async(ddl_query.format(database=unique_database))
+
+      # Restart standby catalogd with force_catalogd_active as true.
+      start_s = time.time()
+      catalogds[1].kill()
+      catalogds[1].start(wait_until_ready=True,
+                         additional_args="--force_catalogd_active=true")
+      # Wait until original active catalogd becomes in-active.
+      catalogd_service_1 = catalogds[0].service
+      catalogd_service_1.wait_for_metric_value(
+          "catalog-server.active-status", expected_value=False, timeout=15)
+      assert(not 
catalogd_service_1.get_metric_value("catalog-server.active-status"))
+      elapsed_s = time.time() - start_s
+      assert elapsed_s < SYNC_DDL_DELAY_S, \
+          "Catalogd failover took %s seconds to complete" % (elapsed_s)
+      LOG.info("Catalogd failover took %s seconds to complete" % 
round(elapsed_s, 1))
+
+      # Verify that the query is failed due to the Catalogd HA fail-over.
+      self.wait_for_state(
+          handle, QueryState.EXCEPTION, SYNC_DDL_DELAY_S * 2 + 10, 
client=client)
+    finally:
+      client.close()
 
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
diff --git a/tests/custom_cluster/test_statestored_ha.py 
b/tests/custom_cluster/test_statestored_ha.py
index 1e9ec17f4..6ee77b592 100644
--- a/tests/custom_cluster/test_statestored_ha.py
+++ b/tests/custom_cluster/test_statestored_ha.py
@@ -18,12 +18,15 @@
 from __future__ import absolute_import, division, print_function
 import logging
 import pytest
+import time
 
+from beeswaxd.BeeswaxService import QueryState
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import build_flavor_timeout
 from tests.common.impala_cluster import (
     DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_STATESTORE_SERVICE_PORT)
-from tests.common.skip import SkipIfBuildType
+from tests.common.skip import SkipIfBuildType, SkipIfNotHdfsMinicluster
 from time import sleep
 
 from thrift.protocol import TBinaryProtocol
@@ -633,6 +636,112 @@ class TestStatestoredHA(CustomClusterTestSuite):
         "statestore.in-ha-recovery-mode", expected_value=False, timeout=120)
     assert(not 
statestore_service_0.get_metric_value("statestore.active-status"))
 
+  SUBSCRIBER_TIMEOUT_S = 2
+  SS_PEER_TIMEOUT_S = 2
+  RECOVERY_GRACE_PERIOD_S = 5
+
+  @pytest.mark.execute_serially
+  @SkipIfNotHdfsMinicluster.scheduling
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_network_address_as_statestore_priority=true "
+                     "--statestore_ha_heartbeat_monitoring_frequency_ms=50 "
+                     "--statestore_peer_timeout_seconds={timeout_s} "
+                     "--use_subscriber_id_as_catalogd_priority=true"
+                     .format(timeout_s=SS_PEER_TIMEOUT_S),
+    impalad_args="--statestore_subscriber_timeout_seconds={timeout_s} "
+                 
"--statestore_subscriber_recovery_grace_period_ms={recovery_period_ms}"
+                 .format(timeout_s=SUBSCRIBER_TIMEOUT_S,
+                         recovery_period_ms=(RECOVERY_GRACE_PERIOD_S * 1000)),
+    catalogd_args="--statestore_subscriber_timeout_seconds={timeout_s}"
+                  .format(timeout_s=SUBSCRIBER_TIMEOUT_S),
+    start_args="--enable_statestored_ha --enable_catalogd_ha")
+  def test_statestore_failover_query_resilience(self):
+    """Test that a momentary inconsistent cluster membership state after 
statestore
+    service fail-over will not result in query cancellation. Also make sure 
that query
+    get cancelled if a backend actually went down after recovery grace 
period."""
+    # Verify two statestored instances are created with one in active role.
+    statestoreds = self.cluster.statestoreds()
+    assert (len(statestoreds) == 2)
+    statestore_service_0 = statestoreds[0].service
+    statestore_service_1 = statestoreds[1].service
+    assert 
(statestore_service_0.get_metric_value("statestore.active-status")), \
+        "First statestored must be active"
+    assert (not 
statestore_service_1.get_metric_value("statestore.active-status")), \
+        "Second statestored must not be active"
+
+    slow_query = \
+        "select distinct * from tpch_parquet.lineitem where l_orderkey > 
sleep(1000)"
+    impalad = self.cluster.impalads[0]
+    client = impalad.service.create_beeswax_client()
+    try:
+      # Run a slow query
+      handle = client.execute_async(slow_query)
+      # Make sure query starts running.
+      self.wait_for_state(handle, QueryState.RUNNING, 120, client)
+      profile = client.get_runtime_profile(handle)
+      assert "NumBackends: 3" in profile, profile
+      # Kill active statestored
+      statestoreds[0].kill()
+      # Wait for long enough for the standby statestored to detect the failure 
of active
+      # statestored and assign itself in active role.
+      statestore_service_1.wait_for_metric_value(
+          "statestore.active-status", expected_value=True, timeout=120)
+      assert 
(statestore_service_1.get_metric_value("statestore.active-status")), \
+          "Second statestored must be active now"
+      statestore_service_1.wait_for_live_subscribers(5)
+      # Wait till the grace period ends + some buffer to verify the slow query 
is still
+      # running.
+      sleep(self.RECOVERY_GRACE_PERIOD_S + 1)
+      assert client.get_state(handle) == QueryState.RUNNING, \
+          "Query expected to be in running state"
+      # Now kill a backend, and make sure the query fails.
+      self.cluster.impalads[2].kill()
+      try:
+        client.wait_for_finished_timeout(handle, 100)
+        assert False, "Query expected to fail"
+      except ImpalaBeeswaxException as e:
+        assert "Failed due to unreachable impalad" in str(e), str(e)
+
+      # Restart original active statestored. Verify that the statestored does 
not resume
+      # its active role.
+      statestoreds[0].start(wait_until_ready=True)
+      statestore_service_0.wait_for_metric_value(
+          "statestore.active-status", expected_value=False, timeout=120)
+      assert (not 
statestore_service_0.get_metric_value("statestore.active-status")), \
+          "First statestored must not be active"
+      assert 
(statestore_service_1.get_metric_value("statestore.active-status")), \
+          "Second statestored must be active"
+      # Run a slow query
+      handle = client.execute_async(slow_query)
+      # Make sure query starts running.
+      self.wait_for_state(handle, QueryState.RUNNING, 120, client)
+      profile = client.get_runtime_profile(handle)
+      assert "NumBackends: 2" in profile, profile
+      # Kill current active statestored
+      start_time = time.time()
+      statestoreds[1].kill()
+      # Wait till the standby statestored becomes active.
+      query_state = client.get_state(handle)
+      assert query_state == QueryState.RUNNING
+      statestore_service_0.wait_for_metric_value(
+          "statestore.active-status", expected_value=True, timeout=120)
+      assert 
(statestore_service_0.get_metric_value("statestore.active-status")), \
+          "First statestored must be active now"
+      # Kill one backend
+      self.cluster.impalads[1].kill()
+      # Verify that it has to wait longer than the RECOVERY_GRACE_PERIOD_S for 
the
+      # query to fail. Combine failover time (SS_PEER_TIMEOUT_S) and recovery 
grace
+      # period (RECOVERY_GRACE_PERIOD_S) to avoid flaky test.
+      timeout_s = self.SS_PEER_TIMEOUT_S + self.RECOVERY_GRACE_PERIOD_S * 2
+      self.wait_for_state(handle, QueryState.EXCEPTION, timeout_s, client)
+      client.close_query(handle)
+      elapsed_s = time.time() - start_time
+      assert elapsed_s >= self.SS_PEER_TIMEOUT_S + 
self.RECOVERY_GRACE_PERIOD_S, \
+          ("Query was canceled in %s seconds, less than failover time + 
grace-period"
+           % (elapsed_s))
+    finally:
+      client.close()
+
 
 class TestStatestoredHAStartupDelay(CustomClusterTestSuite):
   """This test injects a real delay in statestored startup. The impalads and 
catalogd are

Reply via email to