This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 9c2c27c68 IMPALA-13159: Fix query cancellation caused by statestore failover
9c2c27c68 is described below
commit 9c2c27c68ce27b6a6d227379581ac39a34f8f348
Author: wzhou-code <[email protected]>
AuthorDate: Thu Jun 13 16:55:13 2024 -0700
IMPALA-13159: Fix query cancellation caused by statestore failover
A momentary inconsistent cluster membership state after a statestore
failover results in query cancellation.
We already have code that handles inconsistent cluster membership after
a statestore restart by defining a post-recovery grace period. During
the grace period, the current cluster membership is not updated, so the
inconsistent membership is not used to cancel queries on coordinators
and executors.
This patch handles inconsistent cluster membership state after a
statestore failover in the same way.
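The heart of the change is the subscriber's post-recovery grace-period
check, which now also covers failover. A minimal sketch of that check,
closely mirroring the code added to statestore-subscriber.cc in the
diff below:

  // Treat the subscriber as still "recovering" if a connection failure
  // or a statestore failover happened within the configured grace period.
  bool StatestoreSubscriber::StatestoreStub::IsInPostRecoveryGracePeriod() const {
    bool has_disconnect_before = connection_failure_metric_->GetValue() > 0;
    bool in_disconnect_grace_period = MilliSecondsSinceLastRegistration()
        < FLAGS_statestore_subscriber_recovery_grace_period_ms;
    bool in_failover_grace_period = MilliSecondsSinceLastFailover()
        < FLAGS_statestore_subscriber_recovery_grace_period_ms;
    return (has_disconnect_before && in_disconnect_grace_period)
        || in_failover_grace_period;
  }

While this returns true, ClusterMembershipMgr::UpdateMembership() keeps
the previous membership snapshot instead of applying the possibly
incomplete one, so coordinators do not cancel queries on backends that
merely appear missing during the failover window.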
Testing:
- Added a new test case to verify that inconsistent cluster
membership after statestore failover will not result in query
cancellation.
- Fixed an issue in the Catalogd HA test case
test_catalogd_failover_with_sync_ddl where the client was not closed
when the test fails.
- Passed core tests.
Change-Id: I720bec5199df46475b954558abb0637ca7e6298b
Reviewed-on: http://gerrit.cloudera.org:8080/21520
Reviewed-by: Michael Smith <[email protected]>
Reviewed-by: Riza Suminto <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/scheduling/cluster-membership-mgr.cc | 4 +-
be/src/statestore/statestore-subscriber.cc | 20 +++--
be/src/statestore/statestore-subscriber.h | 12 ++-
tests/custom_cluster/test_catalogd_ha.py | 48 ++++++------
tests/custom_cluster/test_statestored_ha.py | 111 +++++++++++++++++++++++++++-
5 files changed, 162 insertions(+), 33 deletions(-)
diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc
index d01787752..89024ef97 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -213,8 +213,8 @@ void ClusterMembershipMgr::UpdateMembership(
BeDescSharedPtr local_be_desc = GetLocalBackendDescriptor();
bool needs_local_be_update = NeedsLocalBackendUpdate(*base_snapshot, local_be_desc);
- // We consider the statestore to be recovering from a connection failure until its post
- // recovery grace period has elapsed.
+ // We consider the statestore service to be recovering from a connection failure or
+ // fail-over until its post recovery grace period has elapsed.
bool ss_is_recovering = statestore_subscriber_ != nullptr
&& statestore_subscriber_->IsInPostRecoveryGracePeriod();
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 98cb21cb2..2bf0afac2 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -446,8 +446,10 @@ void StatestoreSubscriber::UpdateStatestoredRole(bool is_active,
StatestoreStub* tmp = active_statestore_;
active_statestore_ = standby_statestore_;
standby_statestore_ = tmp;
- active_statestore_->SetStatestoreActive(is_active, active_statestored_version);
- standby_statestore_->SetStatestoreActive(!is_active, active_statestored_version);
+ active_statestore_->SetStatestoreActive(
+ is_active, active_statestored_version, /* has_failover */ true);
+ standby_statestore_->SetStatestoreActive(
+ !is_active, active_statestored_version, /* has_failover */ true);
LOG(INFO) << "Updated active statestored as " << active_statestore_->GetAddress();
}
@@ -1090,10 +1092,13 @@ Status StatestoreSubscriber::StatestoreStub::UpdateState(
}
bool StatestoreSubscriber::StatestoreStub::IsInPostRecoveryGracePeriod() const {
- bool has_failed_before = connection_failure_metric_->GetValue() > 0;
- bool in_grace_period = MilliSecondsSinceLastRegistration()
+ bool has_disconnect_before = connection_failure_metric_->GetValue() > 0;
+ bool in_disconnect_grace_period = MilliSecondsSinceLastRegistration()
< FLAGS_statestore_subscriber_recovery_grace_period_ms;
- return has_failed_before && in_grace_period;
+ bool in_failover_grace_period = MilliSecondsSinceLastFailover()
+ < FLAGS_statestore_subscriber_recovery_grace_period_ms;
+ return (has_disconnect_before && in_disconnect_grace_period)
+ || in_failover_grace_period;
}
bool StatestoreSubscriber::StatestoreStub::IsRegistered() {
@@ -1102,11 +1107,14 @@ bool StatestoreSubscriber::StatestoreStub::IsRegistered() {
}
void StatestoreSubscriber::StatestoreStub::SetStatestoreActive(
- bool is_active, int64_t active_statestored_version) {
+ bool is_active, int64_t active_statestored_version, bool has_failover) {
lock_guard<mutex> l(active_lock_);
is_active_ = is_active;
DCHECK(active_statestored_version_ <= active_statestored_version);
active_statestored_version_ = active_statestored_version;
+ if (has_failover) {
+ last_failover_time_.Store(MonotonicMillis());
+ }
active_status_metric_->SetValue(is_active);
}
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index b492f7e2b..18d289a67 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -341,6 +341,12 @@ class StatestoreSubscriber {
return MonotonicMillis() - last_registration_ms_.Load();
}
+ int64_t MilliSecondsSinceLastFailover() const {
+ int64_t time_ms = MonotonicMillis() - last_failover_time_.Load();
+ DCHECK_GE(time_ms, 0);
+ return time_ms;
+ }
+
bool IsInPostRecoveryGracePeriod() const;
/// Check if the subscriber is interesting to receive the notification of catalogd
@@ -351,7 +357,8 @@ class StatestoreSubscriber {
bool IsRegistered();
/// Set the active state of the registered statestore instance.
- void SetStatestoreActive(bool is_active, int64_t active_statestored_version);
+ void SetStatestoreActive(bool is_active, int64_t active_statestored_version,
+ bool has_failover = false);
/// Return the version of active statestore.
int64_t GetActiveVersion(bool* is_active);
@@ -381,6 +388,9 @@ class StatestoreSubscriber {
/// The version of active statestored.
int64_t active_statestored_version_ = 0;
+ /// Monotonic timestamp of the last failover.
+ AtomicInt64 last_failover_time_{0};
+
/// Protects is_active_ and active_statestored_version_. Must be taken after lock_
/// if both are to be taken together.
std::mutex active_lock_;
diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py
index 10c1109a6..675585674 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -463,29 +463,31 @@ class TestCatalogdHA(CustomClusterTestSuite):
# Run DDL with SYNC_DDL enabled.
client = self.cluster.impalads[0].service.create_beeswax_client()
assert client is not None
- self.execute_query_expect_success(client, "set SYNC_DDL=1")
- ddl_query = "CREATE TABLE {database}.failover_sync_ddl (c int)"
- handle = client.execute_async(ddl_query.format(database=unique_database))
-
- # Restart standby catalogd with force_catalogd_active as true.
- start_s = time.time()
- catalogds[1].kill()
- catalogds[1].start(wait_until_ready=True,
- additional_args="--force_catalogd_active=true")
- # Wait until original active catalogd becomes in-active.
- catalogd_service_1 = catalogds[0].service
- catalogd_service_1.wait_for_metric_value(
- "catalog-server.active-status", expected_value=False, timeout=15)
- assert(not catalogd_service_1.get_metric_value("catalog-server.active-status"))
- elapsed_s = time.time() - start_s
- assert elapsed_s < SYNC_DDL_DELAY_S, \
- "Catalogd failover took %s seconds to complete" % (elapsed_s)
- LOG.info("Catalogd failover took %s seconds to complete" % round(elapsed_s, 1))
-
- # Verify that the query is failed due to the Catalogd HA fail-over.
- self.wait_for_state(
- handle, QueryState.EXCEPTION, SYNC_DDL_DELAY_S * 2 + 10, client=client)
- client.close()
+ try:
+ self.execute_query_expect_success(client, "set SYNC_DDL=1")
+ ddl_query = "CREATE TABLE {database}.failover_sync_ddl (c int)"
+ handle = client.execute_async(ddl_query.format(database=unique_database))
+
+ # Restart standby catalogd with force_catalogd_active as true.
+ start_s = time.time()
+ catalogds[1].kill()
+ catalogds[1].start(wait_until_ready=True,
+ additional_args="--force_catalogd_active=true")
+ # Wait until original active catalogd becomes in-active.
+ catalogd_service_1 = catalogds[0].service
+ catalogd_service_1.wait_for_metric_value(
+ "catalog-server.active-status", expected_value=False, timeout=15)
+ assert(not catalogd_service_1.get_metric_value("catalog-server.active-status"))
+ elapsed_s = time.time() - start_s
+ assert elapsed_s < SYNC_DDL_DELAY_S, \
+ "Catalogd failover took %s seconds to complete" % (elapsed_s)
+ LOG.info("Catalogd failover took %s seconds to complete" % round(elapsed_s, 1))
+
+ # Verify that the query is failed due to the Catalogd HA fail-over.
+ self.wait_for_state(
+ handle, QueryState.EXCEPTION, SYNC_DDL_DELAY_S * 2 + 10, client=client)
+ finally:
+ client.close()
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
diff --git a/tests/custom_cluster/test_statestored_ha.py b/tests/custom_cluster/test_statestored_ha.py
index 1e9ec17f4..6ee77b592 100644
--- a/tests/custom_cluster/test_statestored_ha.py
+++ b/tests/custom_cluster/test_statestored_ha.py
@@ -18,12 +18,15 @@
from __future__ import absolute_import, division, print_function
import logging
import pytest
+import time
+from beeswaxd.BeeswaxService import QueryState
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.environ import build_flavor_timeout
from tests.common.impala_cluster import (
DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_STATESTORE_SERVICE_PORT)
-from tests.common.skip import SkipIfBuildType
+from tests.common.skip import SkipIfBuildType, SkipIfNotHdfsMinicluster
from time import sleep
from thrift.protocol import TBinaryProtocol
@@ -633,6 +636,112 @@ class TestStatestoredHA(CustomClusterTestSuite):
"statestore.in-ha-recovery-mode", expected_value=False, timeout=120)
assert(not statestore_service_0.get_metric_value("statestore.active-status"))
+ SUBSCRIBER_TIMEOUT_S = 2
+ SS_PEER_TIMEOUT_S = 2
+ RECOVERY_GRACE_PERIOD_S = 5
+
+ @pytest.mark.execute_serially
+ @SkipIfNotHdfsMinicluster.scheduling
+ @CustomClusterTestSuite.with_args(
+ statestored_args="--use_network_address_as_statestore_priority=true "
+ "--statestore_ha_heartbeat_monitoring_frequency_ms=50 "
+ "--statestore_peer_timeout_seconds={timeout_s} "
+ "--use_subscriber_id_as_catalogd_priority=true"
+ .format(timeout_s=SS_PEER_TIMEOUT_S),
+ impalad_args="--statestore_subscriber_timeout_seconds={timeout_s} "
+ "--statestore_subscriber_recovery_grace_period_ms={recovery_period_ms}"
+ .format(timeout_s=SUBSCRIBER_TIMEOUT_S,
+ recovery_period_ms=(RECOVERY_GRACE_PERIOD_S * 1000)),
+ catalogd_args="--statestore_subscriber_timeout_seconds={timeout_s}"
+ .format(timeout_s=SUBSCRIBER_TIMEOUT_S),
+ start_args="--enable_statestored_ha --enable_catalogd_ha")
+ def test_statestore_failover_query_resilience(self):
+ """Test that a momentary inconsistent cluster membership state after
statestore
+ service fail-over will not result in query cancellation. Also make sure
that query
+ get cancelled if a backend actually went down after recovery grace
period."""
+ # Verify two statestored instances are created with one in active role.
+ statestoreds = self.cluster.statestoreds()
+ assert (len(statestoreds) == 2)
+ statestore_service_0 = statestoreds[0].service
+ statestore_service_1 = statestoreds[1].service
+ assert (statestore_service_0.get_metric_value("statestore.active-status")), \
+ "First statestored must be active"
+ assert (not statestore_service_1.get_metric_value("statestore.active-status")), \
+ "Second statestored must not be active"
+
+ slow_query = \
+ "select distinct * from tpch_parquet.lineitem where l_orderkey >
sleep(1000)"
+ impalad = self.cluster.impalads[0]
+ client = impalad.service.create_beeswax_client()
+ try:
+ # Run a slow query
+ handle = client.execute_async(slow_query)
+ # Make sure query starts running.
+ self.wait_for_state(handle, QueryState.RUNNING, 120, client)
+ profile = client.get_runtime_profile(handle)
+ assert "NumBackends: 3" in profile, profile
+ # Kill active statestored
+ statestoreds[0].kill()
+ # Wait for long enough for the standby statestored to detect the failure of active
+ # statestored and assign itself in active role.
+ statestore_service_1.wait_for_metric_value(
+ "statestore.active-status", expected_value=True, timeout=120)
+ assert (statestore_service_1.get_metric_value("statestore.active-status")), \
+ "Second statestored must be active now"
+ statestore_service_1.wait_for_live_subscribers(5)
+ # Wait till the grace period ends + some buffer to verify the slow query is still
+ # running.
+ sleep(self.RECOVERY_GRACE_PERIOD_S + 1)
+ assert client.get_state(handle) == QueryState.RUNNING, \
+ "Query expected to be in running state"
+ # Now kill a backend, and make sure the query fails.
+ self.cluster.impalads[2].kill()
+ try:
+ client.wait_for_finished_timeout(handle, 100)
+ assert False, "Query expected to fail"
+ except ImpalaBeeswaxException as e:
+ assert "Failed due to unreachable impalad" in str(e), str(e)
+
+ # Restart original active statestored. Verify that the statestored does not resume
+ # its active role.
+ statestoreds[0].start(wait_until_ready=True)
+ statestore_service_0.wait_for_metric_value(
+ "statestore.active-status", expected_value=False, timeout=120)
+ assert (not statestore_service_0.get_metric_value("statestore.active-status")), \
+ "First statestored must not be active"
+ assert (statestore_service_1.get_metric_value("statestore.active-status")), \
+ "Second statestored must be active"
+ # Run a slow query
+ handle = client.execute_async(slow_query)
+ # Make sure query starts running.
+ self.wait_for_state(handle, QueryState.RUNNING, 120, client)
+ profile = client.get_runtime_profile(handle)
+ assert "NumBackends: 2" in profile, profile
+ # Kill current active statestored
+ start_time = time.time()
+ statestoreds[1].kill()
+ # Wait till the standby statestored becomes active.
+ query_state = client.get_state(handle)
+ assert query_state == QueryState.RUNNING
+ statestore_service_0.wait_for_metric_value(
+ "statestore.active-status", expected_value=True, timeout=120)
+ assert (statestore_service_0.get_metric_value("statestore.active-status")), \
+ "First statestored must be active now"
+ # Kill one backend
+ self.cluster.impalads[1].kill()
+ # Verify that it has to wait longer than the RECOVERY_GRACE_PERIOD_S for the
+ # query to fail. Combine failover time (SS_PEER_TIMEOUT_S) and recovery grace
+ # period (RECOVERY_GRACE_PERIOD_S) to avoid flaky test.
+ timeout_s = self.SS_PEER_TIMEOUT_S + self.RECOVERY_GRACE_PERIOD_S * 2
+ self.wait_for_state(handle, QueryState.EXCEPTION, timeout_s, client)
+ client.close_query(handle)
+ elapsed_s = time.time() - start_time
+ assert elapsed_s >= self.SS_PEER_TIMEOUT_S + self.RECOVERY_GRACE_PERIOD_S, \
+ ("Query was canceled in %s seconds, less than failover time + grace-period"
+ % (elapsed_s))
+ finally:
+ client.close()
+
class TestStatestoredHAStartupDelay(CustomClusterTestSuite):
"""This test injects a real delay in statestored startup. The impalads and
catalogd are