This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-4.4.1 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 3a9b604275d1926594b522148851008f5af3dc21 Author: wzhou-code <[email protected]> AuthorDate: Thu Jun 13 16:55:13 2024 -0700 IMPALA-13159: Fix query cancellation caused by statestore failover A momentarily inconsistent cluster membership state after statestore failover results in query cancellation. We already have code to handle inconsistent cluster membership after a statestore restart by defining a post-recovery grace period. During the grace period, don't update the current cluster membership so that the inconsistent membership will not be used to cancel queries on coordinators and executors. This patch handles inconsistent cluster membership state after statestore failover in the same way. Testing: - Added a new test case to verify that inconsistent cluster membership after statestore failover will not result in query cancellation. - Fixed the Catalogd HA test case test_catalogd_failover_with_sync_ddl so that its client is closed even when the test fails. - Passed core tests. Change-Id: I720bec5199df46475b954558abb0637ca7e6298b Reviewed-on: http://gerrit.cloudera.org:8080/21520 Reviewed-by: Michael Smith <[email protected]> Reviewed-by: Riza Suminto <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/scheduling/cluster-membership-mgr.cc | 4 +- be/src/statestore/statestore-subscriber.cc | 20 +++-- be/src/statestore/statestore-subscriber.h | 12 ++- tests/custom_cluster/test_catalogd_ha.py | 48 ++++++------ tests/custom_cluster/test_statestored_ha.py | 111 +++++++++++++++++++++++++++- 5 files changed, 162 insertions(+), 33 deletions(-) diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc index a296b2f35..72e82ef76 100644 --- a/be/src/scheduling/cluster-membership-mgr.cc +++ b/be/src/scheduling/cluster-membership-mgr.cc @@ -213,8 +213,8 @@ void ClusterMembershipMgr::UpdateMembership( BeDescSharedPtr local_be_desc = GetLocalBackendDescriptor(); bool needs_local_be_update = 
NeedsLocalBackendUpdate(*base_snapshot, local_be_desc); - // We consider the statestore to be recovering from a connection failure until its post - // recovery grace period has elapsed. + // We consider the statestore service to be recovering from a connection failure or + // fail-over until its post recovery grace period has elapsed. bool ss_is_recovering = statestore_subscriber_ != nullptr && statestore_subscriber_->IsInPostRecoveryGracePeriod(); diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc index 98cb21cb2..2bf0afac2 100644 --- a/be/src/statestore/statestore-subscriber.cc +++ b/be/src/statestore/statestore-subscriber.cc @@ -446,8 +446,10 @@ void StatestoreSubscriber::UpdateStatestoredRole(bool is_active, StatestoreStub* tmp = active_statestore_; active_statestore_ = standby_statestore_; standby_statestore_ = tmp; - active_statestore_->SetStatestoreActive(is_active, active_statestored_version); - standby_statestore_->SetStatestoreActive(!is_active, active_statestored_version); + active_statestore_->SetStatestoreActive( + is_active, active_statestored_version, /* has_failover */ true); + standby_statestore_->SetStatestoreActive( + !is_active, active_statestored_version, /* has_failover */ true); LOG(INFO) << "Updated active statestored as " << active_statestore_->GetAddress(); } @@ -1090,10 +1092,13 @@ Status StatestoreSubscriber::StatestoreStub::UpdateState( } bool StatestoreSubscriber::StatestoreStub::IsInPostRecoveryGracePeriod() const { - bool has_failed_before = connection_failure_metric_->GetValue() > 0; - bool in_grace_period = MilliSecondsSinceLastRegistration() + bool has_disconnect_before = connection_failure_metric_->GetValue() > 0; + bool in_disconnect_grace_period = MilliSecondsSinceLastRegistration() < FLAGS_statestore_subscriber_recovery_grace_period_ms; - return has_failed_before && in_grace_period; + bool in_failover_grace_period = MilliSecondsSinceLastFailover() + < 
FLAGS_statestore_subscriber_recovery_grace_period_ms; + return (has_disconnect_before && in_disconnect_grace_period) + || in_failover_grace_period; } bool StatestoreSubscriber::StatestoreStub::IsRegistered() { @@ -1102,11 +1107,14 @@ bool StatestoreSubscriber::StatestoreStub::IsRegistered() { } void StatestoreSubscriber::StatestoreStub::SetStatestoreActive( - bool is_active, int64_t active_statestored_version) { + bool is_active, int64_t active_statestored_version, bool has_failover) { lock_guard<mutex> l(active_lock_); is_active_ = is_active; DCHECK(active_statestored_version_ <= active_statestored_version); active_statestored_version_ = active_statestored_version; + if (has_failover) { + last_failover_time_.Store(MonotonicMillis()); + } active_status_metric_->SetValue(is_active); } diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h index b492f7e2b..18d289a67 100644 --- a/be/src/statestore/statestore-subscriber.h +++ b/be/src/statestore/statestore-subscriber.h @@ -341,6 +341,12 @@ class StatestoreSubscriber { return MonotonicMillis() - last_registration_ms_.Load(); } + int64_t MilliSecondsSinceLastFailover() const { + int64_t time_ms = MonotonicMillis() - last_failover_time_.Load(); + DCHECK_GE(time_ms, 0); + return time_ms; + } + bool IsInPostRecoveryGracePeriod() const; /// Check if the subscriber is interesting to receive the notification of catalogd @@ -351,7 +357,8 @@ class StatestoreSubscriber { bool IsRegistered(); /// Set the active state of the registered statestore instance. - void SetStatestoreActive(bool is_active, int64_t active_statestored_version); + void SetStatestoreActive(bool is_active, int64_t active_statestored_version, + bool has_failover = false); /// Return the version of active statestore. int64_t GetActiveVersion(bool* is_active); @@ -381,6 +388,9 @@ class StatestoreSubscriber { /// The version of active statestored. 
int64_t active_statestored_version_ = 0; + /// Monotonic timestamp of the last failover. + AtomicInt64 last_failover_time_{0}; + /// Protects is_active_ and active_statestored_version_. Must be taken after lock_ /// if both are to be taken together. std::mutex active_lock_; diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py index 37dee5603..da9430458 100644 --- a/tests/custom_cluster/test_catalogd_ha.py +++ b/tests/custom_cluster/test_catalogd_ha.py @@ -455,29 +455,31 @@ class TestCatalogdHA(CustomClusterTestSuite): # Run DDL with SYNC_DDL enabled. client = self.cluster.impalads[0].service.create_beeswax_client() assert client is not None - self.execute_query_expect_success(client, "set SYNC_DDL=1") - ddl_query = "CREATE TABLE {database}.failover_sync_ddl (c int)" - handle = client.execute_async(ddl_query.format(database=unique_database)) - - # Restart standby catalogd with force_catalogd_active as true. - start_s = time.time() - catalogds[1].kill() - catalogds[1].start(wait_until_ready=True, - additional_args="--force_catalogd_active=true") - # Wait until original active catalogd becomes in-active. - catalogd_service_1 = catalogds[0].service - catalogd_service_1.wait_for_metric_value( - "catalog-server.active-status", expected_value=False, timeout=15) - assert(not catalogd_service_1.get_metric_value("catalog-server.active-status")) - elapsed_s = time.time() - start_s - assert elapsed_s < SYNC_DDL_DELAY_S, \ - "Catalogd failover took %s seconds to complete" % (elapsed_s) - LOG.info("Catalogd failover took %s seconds to complete" % round(elapsed_s, 1)) - - # Verify that the query is failed due to the Catalogd HA fail-over. 
- self.wait_for_state( - handle, QueryState.EXCEPTION, SYNC_DDL_DELAY_S * 2 + 10, client=client) - client.close() + try: + self.execute_query_expect_success(client, "set SYNC_DDL=1") + ddl_query = "CREATE TABLE {database}.failover_sync_ddl (c int)" + handle = client.execute_async(ddl_query.format(database=unique_database)) + + # Restart standby catalogd with force_catalogd_active as true. + start_s = time.time() + catalogds[1].kill() + catalogds[1].start(wait_until_ready=True, + additional_args="--force_catalogd_active=true") + # Wait until original active catalogd becomes in-active. + catalogd_service_1 = catalogds[0].service + catalogd_service_1.wait_for_metric_value( + "catalog-server.active-status", expected_value=False, timeout=15) + assert(not catalogd_service_1.get_metric_value("catalog-server.active-status")) + elapsed_s = time.time() - start_s + assert elapsed_s < SYNC_DDL_DELAY_S, \ + "Catalogd failover took %s seconds to complete" % (elapsed_s) + LOG.info("Catalogd failover took %s seconds to complete" % round(elapsed_s, 1)) + + # Verify that the query is failed due to the Catalogd HA fail-over. 
+ self.wait_for_state( + handle, QueryState.EXCEPTION, SYNC_DDL_DELAY_S * 2 + 10, client=client) + finally: + client.close() @CustomClusterTestSuite.with_args( statestored_args="--use_subscriber_id_as_catalogd_priority=true", diff --git a/tests/custom_cluster/test_statestored_ha.py b/tests/custom_cluster/test_statestored_ha.py index 1e9ec17f4..6ee77b592 100644 --- a/tests/custom_cluster/test_statestored_ha.py +++ b/tests/custom_cluster/test_statestored_ha.py @@ -18,12 +18,15 @@ from __future__ import absolute_import, division, print_function import logging import pytest +import time +from beeswaxd.BeeswaxService import QueryState +from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.environ import build_flavor_timeout from tests.common.impala_cluster import ( DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_STATESTORE_SERVICE_PORT) -from tests.common.skip import SkipIfBuildType +from tests.common.skip import SkipIfBuildType, SkipIfNotHdfsMinicluster from time import sleep from thrift.protocol import TBinaryProtocol @@ -633,6 +636,112 @@ class TestStatestoredHA(CustomClusterTestSuite): "statestore.in-ha-recovery-mode", expected_value=False, timeout=120) assert(not statestore_service_0.get_metric_value("statestore.active-status")) + SUBSCRIBER_TIMEOUT_S = 2 + SS_PEER_TIMEOUT_S = 2 + RECOVERY_GRACE_PERIOD_S = 5 + + @pytest.mark.execute_serially + @SkipIfNotHdfsMinicluster.scheduling + @CustomClusterTestSuite.with_args( + statestored_args="--use_network_address_as_statestore_priority=true " + "--statestore_ha_heartbeat_monitoring_frequency_ms=50 " + "--statestore_peer_timeout_seconds={timeout_s} " + "--use_subscriber_id_as_catalogd_priority=true" + .format(timeout_s=SS_PEER_TIMEOUT_S), + impalad_args="--statestore_subscriber_timeout_seconds={timeout_s} " + "--statestore_subscriber_recovery_grace_period_ms={recovery_period_ms}" + .format(timeout_s=SUBSCRIBER_TIMEOUT_S, + 
recovery_period_ms=(RECOVERY_GRACE_PERIOD_S * 1000)), + catalogd_args="--statestore_subscriber_timeout_seconds={timeout_s}" + .format(timeout_s=SUBSCRIBER_TIMEOUT_S), + start_args="--enable_statestored_ha --enable_catalogd_ha") + def test_statestore_failover_query_resilience(self): + """Test that a momentary inconsistent cluster membership state after statestore + service fail-over will not result in query cancellation. Also make sure that query + get cancelled if a backend actually went down after recovery grace period.""" + # Verify two statestored instances are created with one in active role. + statestoreds = self.cluster.statestoreds() + assert (len(statestoreds) == 2) + statestore_service_0 = statestoreds[0].service + statestore_service_1 = statestoreds[1].service + assert (statestore_service_0.get_metric_value("statestore.active-status")), \ + "First statestored must be active" + assert (not statestore_service_1.get_metric_value("statestore.active-status")), \ + "Second statestored must not be active" + + slow_query = \ + "select distinct * from tpch_parquet.lineitem where l_orderkey > sleep(1000)" + impalad = self.cluster.impalads[0] + client = impalad.service.create_beeswax_client() + try: + # Run a slow query + handle = client.execute_async(slow_query) + # Make sure query starts running. + self.wait_for_state(handle, QueryState.RUNNING, 120, client) + profile = client.get_runtime_profile(handle) + assert "NumBackends: 3" in profile, profile + # Kill active statestored + statestoreds[0].kill() + # Wait for long enough for the standby statestored to detect the failure of active + # statestored and assign itself in active role. 
+ statestore_service_1.wait_for_metric_value( + "statestore.active-status", expected_value=True, timeout=120) + assert (statestore_service_1.get_metric_value("statestore.active-status")), \ + "Second statestored must be active now" + statestore_service_1.wait_for_live_subscribers(5) + # Wait till the grace period ends + some buffer to verify the slow query is still + # running. + sleep(self.RECOVERY_GRACE_PERIOD_S + 1) + assert client.get_state(handle) == QueryState.RUNNING, \ + "Query expected to be in running state" + # Now kill a backend, and make sure the query fails. + self.cluster.impalads[2].kill() + try: + client.wait_for_finished_timeout(handle, 100) + assert False, "Query expected to fail" + except ImpalaBeeswaxException as e: + assert "Failed due to unreachable impalad" in str(e), str(e) + + # Restart original active statestored. Verify that the statestored does not resume + # its active role. + statestoreds[0].start(wait_until_ready=True) + statestore_service_0.wait_for_metric_value( + "statestore.active-status", expected_value=False, timeout=120) + assert (not statestore_service_0.get_metric_value("statestore.active-status")), \ + "First statestored must not be active" + assert (statestore_service_1.get_metric_value("statestore.active-status")), \ + "Second statestored must be active" + # Run a slow query + handle = client.execute_async(slow_query) + # Make sure query starts running. + self.wait_for_state(handle, QueryState.RUNNING, 120, client) + profile = client.get_runtime_profile(handle) + assert "NumBackends: 2" in profile, profile + # Kill current active statestored + start_time = time.time() + statestoreds[1].kill() + # Wait till the standby statestored becomes active. 
+ query_state = client.get_state(handle) + assert query_state == QueryState.RUNNING + statestore_service_0.wait_for_metric_value( + "statestore.active-status", expected_value=True, timeout=120) + assert (statestore_service_0.get_metric_value("statestore.active-status")), \ + "First statestored must be active now" + # Kill one backend + self.cluster.impalads[1].kill() + # Verify that it has to wait longer than the RECOVERY_GRACE_PERIOD_S for the + # query to fail. Combine failover time (SS_PEER_TIMEOUT_S) and recovery grace + # period (RECOVERY_GRACE_PERIOD_S) to avoid flaky test. + timeout_s = self.SS_PEER_TIMEOUT_S + self.RECOVERY_GRACE_PERIOD_S * 2 + self.wait_for_state(handle, QueryState.EXCEPTION, timeout_s, client) + client.close_query(handle) + elapsed_s = time.time() - start_time + assert elapsed_s >= self.SS_PEER_TIMEOUT_S + self.RECOVERY_GRACE_PERIOD_S, \ + ("Query was canceled in %s seconds, less than failover time + grace-period" + % (elapsed_s)) + finally: + client.close() + class TestStatestoredHAStartupDelay(CustomClusterTestSuite): """This test injects a real delay in statestored startup. The impalads and catalogd are
