This is an automated email from the ASF dual-hosted git repository. wzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 44c85e85a51bad4faca95f8771b2c2c5a686ca90 Author: wzhou-code <[email protected]> AuthorDate: Wed Nov 1 20:28:39 2023 -0700 IMPALA-12525: Fix flaky test test_statestored_manual_failover In test_statestored_manual_failover, statestore service failover is sometimes not triggered when the network of the active statestored is disabled after a manually forced failover. During the test, the network of the active statestored could be disabled before all subscribers had re-registered with the restarted statestored. This caused some subscribers to not receive the notification of the active statestored change, so they could not correctly report connection states for the requests from the standby statestored. This patch makes the following changes: 1) Updated the test case test_statestored_manual_failover to disable the network of the active statestored only after all subscribers have re-registered with the restarted statestored. 2) Defined a new mutex active_lock_ in class StatestoreStub to protect is_active_, since the mutex lock_ could be held for a long time if the subscriber loses the connection with statestored and enters recovery mode. 3) Found one case that was not handled on Statestore subscribers. The subscribers could be started before both statestore instances are ready to accept registration requests. This caused impalad to hit a DCHECK. Changed code to handle this case in this patch. Added test cases to inject a real delay in statestored startup and verify that impalads and catalogd are able to tolerate this delay. 4) Updated the address of the active catalogd in the metrics of statestored after statestore service failover. 5) Another test, test_statestored_auto_failover_with_disabling_network, failed occasionally due to a delay of the HA Handshake RPC between the two statestore instances. The issue is tracked with IMPALA-12550. The last two lines of the test are commented out temporarily. Testing: - Repeatedly ran test_statestored_manual_failover on Jenkins hundreds of times. 
- Repeatedly ran test_statestored_manual_failover on local machine for thousand times without failure. - Passed core tests Change-Id: If03bf09d22a2875d2c1eec8a4f62eeefc5d855dc Reviewed-on: http://gerrit.cloudera.org:8080/20657 Reviewed-by: Riza Suminto <[email protected]> Reviewed-by: Michael Smith <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/statestore/statestore-subscriber.cc | 93 +++++++++++++++++++++-------- be/src/statestore/statestore-subscriber.h | 21 ++++--- be/src/statestore/statestore.cc | 12 +++- tests/custom_cluster/test_statestored_ha.py | 64 +++++++++++++++++++- 4 files changed, 150 insertions(+), 40 deletions(-) diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc index 4cb281f2f..e62e89219 100644 --- a/be/src/statestore/statestore-subscriber.cc +++ b/be/src/statestore/statestore-subscriber.cc @@ -381,17 +381,18 @@ void StatestoreSubscriber::Heartbeat(const RegistrationId& registration_id, // is not received. statestore_->Heartbeat(registration_id); // Report connection state with active statestore instance for the request from - // standby statestore. - if (request_active_conn_state && !statestore_->IsStatestoreActive() - && statestore2_ != nullptr) { + // standby statestore. It's possible that the notification of active statestored + // change has not been received. + if (request_active_conn_state && statestore2_ != nullptr) { *active_statestore_conn_state = statestore2_->GetStatestoreConnState(); } } else if (statestore2_ != nullptr && statestore2_->IsMatchingStatestoreId(statestore_id)) { statestore2_->Heartbeat(registration_id); // Report connection state with active statestore instance for the request from - // standby statestore. - if (request_active_conn_state && !statestore2_->IsStatestoreActive()) { + // standby statestore. It's possible that the notification of active statestored + // change has not been received. 
+ if (request_active_conn_state) { *active_statestore_conn_state = statestore_->GetStatestoreConnState(); } } else { @@ -426,9 +427,11 @@ void StatestoreSubscriber::UpdateStatestoredRole(bool is_active, bool* update_skipped) { DCHECK(enable_statestored_ha_); // Accept UpdateStatestoredRole RPC from standby statestored + StatestoreStub* active_statestore = GetActiveStatestore(); StatestoreStub* standby_statestore = GetStandbyStatestore(); if (standby_statestore != nullptr - && standby_statestore_->IsMatchingStatestoreId(statestore_id)) { + && standby_statestore->IsMatchingStatestoreId(statestore_id)) { + LOG(INFO) << "Receive UpdateStatestoredRole message from standby statestored"; // Receive notification of statestore service fail over, switch active and standby // statestoreds. standby_statestore->IncCountForUpdateStatestoredRoleRPC(); @@ -440,14 +443,51 @@ void StatestoreSubscriber::UpdateStatestoredRole(bool is_active, standby_statestore_ = tmp; active_statestore_->SetStatestoreActive(is_active, active_statestored_version); standby_statestore_->SetStatestoreActive(!is_active, active_statestored_version); + LOG(INFO) << "Updated active statestored as " << active_statestore_->GetAddress(); } + if (update_active_catalogd) { - StatestoreStub* active_statestore = GetActiveStatestore(); + active_statestore = GetActiveStatestore(); + active_statestore->UpdateCatalogd(*catalogd_registration, registration_id, + active_catalogd_version, /* statestore_failover */true, update_skipped); + DCHECK(!(*update_skipped)); + } + } else if (active_statestore == nullptr) { + { + lock_guard<mutex> r(statestore_ha_lock_); + if (active_statestore_ == nullptr) { + LOG(INFO) << "Subscriber was started before both statestore instances were " + "ready to accept registration requests."; + DCHECK(standby_statestore_ == nullptr); + // Active/standby statestored are not set. This could happen if statestoreds were + // started after subscribers' registration attemption. 
+ if (statestore_->IsMatchingStatestoreId(statestore_id)) { + active_statestore_ = statestore_; + standby_statestore_ = statestore2_; + } else { + DCHECK(statestore2_->IsMatchingStatestoreId(statestore_id)); + active_statestore_ = statestore2_; + standby_statestore_ = statestore_; + } + active_statestore_->SetStatestoreActive(is_active, active_statestored_version); + standby_statestore_->SetStatestoreActive(!is_active, active_statestored_version); + LOG(INFO) << "Updated active statestored as " << active_statestore_->GetAddress(); + } else { + LOG(INFO) << "Active statestored " << active_statestore_->GetAddress() + << " has been updated."; + } + } + + if (update_active_catalogd) { + active_statestore = GetActiveStatestore(); DCHECK(active_statestore != nullptr); active_statestore->UpdateCatalogd(*catalogd_registration, registration_id, active_catalogd_version, /* statestore_failover */true, update_skipped); DCHECK(!(*update_skipped)); } + } else if (active_statestore->IsMatchingStatestoreId(statestore_id)) { + LOG(INFO) << "statestored " << active_statestore->GetAddress() + << " is in active state."; } else { // It's possible the statestored update RPC is received before the registration // response is received. 
Skip this update so that the statestore will retry this @@ -465,7 +505,6 @@ StatestoreSubscriber::StatestoreStub* StatestoreSubscriber::GetActiveStatestore( StatestoreSubscriber::StatestoreStub* StatestoreSubscriber::GetStandbyStatestore() { lock_guard<mutex> r(statestore_ha_lock_); - DCHECK(standby_statestore_ != nullptr); return standby_statestore_; } @@ -662,6 +701,9 @@ Status StatestoreSubscriber::StatestoreStub::Register(bool* has_active_catalogd, } else { VLOG(1) << "No statestore ID received from statestore"; } + } + { + lock_guard<mutex> l(active_lock_); if (status.ok() && response.__isset.statestore_is_active) { is_active_ = response.statestore_is_active; if (is_active_) { @@ -671,16 +713,16 @@ Status StatestoreSubscriber::StatestoreStub::Register(bool* has_active_catalogd, active_statestored_version_ = response.active_statestored_version; active_status_metric_->SetValue(is_active_); } - if (status.ok() && response.__isset.catalogd_registration) { - VLOG(1) << "Active catalogd address: " - << TNetworkAddressToString(response.catalogd_registration.address); - if (has_active_catalogd != nullptr) *has_active_catalogd = true; - if (active_catalogd_version != nullptr && response.__isset.catalogd_version) { - *active_catalogd_version = response.catalogd_version; - } - if (active_catalogd_registration != nullptr) { - *active_catalogd_registration = response.catalogd_registration; - } + } + if (status.ok() && response.__isset.catalogd_registration) { + VLOG(1) << "Active catalogd address: " + << TNetworkAddressToString(response.catalogd_registration.address); + if (has_active_catalogd != nullptr) *has_active_catalogd = true; + if (active_catalogd_version != nullptr && response.__isset.catalogd_version) { + *active_catalogd_version = response.catalogd_version; + } + if (active_catalogd_registration != nullptr) { + *active_catalogd_registration = response.catalogd_registration; } } heartbeat_interval_timer_.Start(); @@ -1056,27 +1098,22 @@ bool 
StatestoreSubscriber::StatestoreStub::IsRegistered() { void StatestoreSubscriber::StatestoreStub::SetStatestoreActive( bool is_active, int64_t active_statestored_version) { - lock_guard<shared_mutex> exclusive_lock(lock_); + lock_guard<mutex> l(active_lock_); is_active_ = is_active; DCHECK(active_statestored_version_ <= active_statestored_version); active_statestored_version_ = active_statestored_version; active_status_metric_->SetValue(is_active); } -bool StatestoreSubscriber::StatestoreStub::IsStatestoreActive() { - lock_guard<shared_mutex> exclusive_lock(lock_); - return is_active_; -} - int64_t StatestoreSubscriber::StatestoreStub::GetActiveVersion(bool* is_active) { - lock_guard<shared_mutex> exclusive_lock(lock_); + lock_guard<mutex> l(active_lock_); *is_active = is_active_; return active_statestored_version_; } void StatestoreSubscriber::StatestoreStub::GetRegistrationIdAndStatestoreId( RegistrationId* registration_id, TUniqueId* statestore_id) { - lock_guard<shared_mutex> exclusive_lock(lock_); + lock_guard<mutex> r(id_lock_); *registration_id = registration_id_; *statestore_id = statestore_id_; } @@ -1102,4 +1139,8 @@ StatestoreSubscriber::StatestoreStub::GetStatestoreConnState() { } } +std::string StatestoreSubscriber::StatestoreStub::GetAddress() const { + return TNetworkAddressToString(statestore_address_); +} + } diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h index 5f406b567..b492f7e2b 100644 --- a/be/src/statestore/statestore-subscriber.h +++ b/be/src/statestore/statestore-subscriber.h @@ -350,8 +350,7 @@ class StatestoreSubscriber { /// Returns true if the registration with statestore is completed. bool IsRegistered(); - /// Get/set the active state of the registered statestore instance. - bool IsStatestoreActive(); + /// Set the active state of the registered statestore instance. 
void SetStatestoreActive(bool is_active, int64_t active_statestored_version); /// Return the version of active statestore. @@ -367,6 +366,8 @@ class StatestoreSubscriber { /// Get connection state with the registered statestore instance. TStatestoreConnState::type GetStatestoreConnState(); + std::string GetAddress() const; + private: /// Pointer to parent StatestoreSubscriber object StatestoreSubscriber* subscriber_; @@ -374,6 +375,16 @@ class StatestoreSubscriber { /// Address of the statestore TNetworkAddress statestore_address_; + /// True if the registered statestore instance is active. + bool is_active_ = false; + + /// The version of active statestored. + int64_t active_statestored_version_ = 0; + + /// Protects is_active_ and active_statestored_version_. Must be taken after lock_ + /// if both are to be taken together. + std::mutex active_lock_; + /// Object-wide lock that protects the below members. Must be held exclusively when /// modifying the members, except when modifying TopicRegistrations - see /// TopicRegistration::update_lock for details of locking there. Held in shared mode @@ -382,12 +393,6 @@ class StatestoreSubscriber { /// comments. boost::shared_mutex lock_; - /// True if the registered statestore instance is active. - bool is_active_ = false; - - /// The version of active statestored. - int64_t active_statestored_version_ = 0; - /// Failure detector that tracks heartbeat messages from the statestore. 
boost::scoped_ptr<impala::TimeoutFailureDetector> failure_detector_; diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc index 4a598f610..349ebc091 100644 --- a/be/src/statestore/statestore.cc +++ b/be/src/statestore/statestore.cc @@ -1620,6 +1620,10 @@ void Statestore::SendUpdateStatestoredRoleNotification( TCatalogRegistration catalogd_registration = catalog_manager_.GetActiveCatalogRegistration( &has_active_catalogd, &active_catalogd_version); + if (has_active_catalogd) { + active_catalogd_address_metric_->SetValue( + TNetworkAddressToString(catalogd_registration.address)); + } bool resend_rpc = false; if (active_statestored_version > *last_active_statestored_version) { @@ -2032,7 +2036,11 @@ void Statestore::HaHeartbeatRequest(const TUniqueId& dst_statestore_id, << " subscribers lost connections with active statestored."; } continue; - } else if (majority_failed) { + } + + found_peer_ = false; + connected_peer_metric_->SetValue(found_peer_); + if (majority_failed) { // When standby statestored lost connection with active statestored, take over // active role if the majority of subscribers lost connections with active // statestored. @@ -2042,8 +2050,6 @@ void Statestore::HaHeartbeatRequest(const TUniqueId& dst_statestore_id, is_active_ = true; active_status_metric_->SetValue(is_active_); active_version_ = UnixMicros(); - found_peer_ = false; - connected_peer_metric_->SetValue(found_peer_); // Send notification to all subscribers. 
update_statestored_cv_.NotifyAll(); } else if (total_subscribers == 0) { diff --git a/tests/custom_cluster/test_statestored_ha.py b/tests/custom_cluster/test_statestored_ha.py index 1707fd950..3eec88a96 100644 --- a/tests/custom_cluster/test_statestored_ha.py +++ b/tests/custom_cluster/test_statestored_ha.py @@ -23,6 +23,7 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.environ import build_flavor_timeout from tests.common.impala_cluster import ( DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_STATESTORE_SERVICE_PORT) +from tests.common.skip import SkipIfBuildType from time import sleep from thrift.protocol import TBinaryProtocol @@ -445,6 +446,12 @@ class TestStatestoredHA(CustomClusterTestSuite): # Trigger second fail over by disabling the network of active statestored. if second_failover: + # Wait till all subscribers re-registering with the restarted statestored. + wait_time_s = build_flavor_timeout(90, slow_build_timeout=180) + statestore_service_0.wait_for_metric_value('statestore.live-backends', + expected_value=4, timeout=wait_time_s) + + sleep(1) self.__disable_statestored_network(disable_network=True) # Wait for long enough for the standby statestored to detect the failure of active # statestored and assign itself with active role. @@ -622,6 +629,57 @@ class TestStatestoredHA(CustomClusterTestSuite): # Re-enable network for standby statestored. Verify that the statestored exits # HA recovery mode. self.__disable_statestored_network(disable_network=False) - statestore_service_0.wait_for_metric_value( - "statestore.in-ha-recovery-mode", expected_value=False, timeout=120) - assert(not statestore_service_0.get_metric_value("statestore.active-status")) + # IMPALA-12550: sometimes the active statestore takes a few minutes to response + # the HA handshake from standby statestore. Temporarily comment out following + # two lines util the issue is fixed. 
+ # statestore_service_0.wait_for_metric_value( + # "statestore.in-ha-recovery-mode", expected_value=False, timeout=120) + # assert(not statestore_service_0.get_metric_value("statestore.active-status")) + + +class TestStatestoredHAStartupDelay(CustomClusterTestSuite): + """This test injects a real delay in statestored startup. The impalads and catalogd are + expected to be able to tolerate this delay with FLAGS_tolerate_statestore_startup_delay + set as true. This is not testing anything beyond successful startup.""" + + @classmethod + def get_workload(self): + return 'functional-query' + + @classmethod + def setup_class(cls): + if cls.exploration_strategy() != 'exhaustive': + pytest.skip('Statestore startup delay tests only run in exhaustive') + super(TestStatestoredHAStartupDelay, cls).setup_class() + + @SkipIfBuildType.not_dev_build + @CustomClusterTestSuite.with_args( + impalad_args="--tolerate_statestore_startup_delay=true", + catalogd_args="--tolerate_statestore_startup_delay=true", + statestored_args="--stress_statestore_startup_delay_ms=60000 " + "--use_network_address_as_statestore_priority=true", + start_args="--enable_statestored_ha") + def test_subscriber_tolerate_startup_delay(self): + """The impalads and catalogd are expected to be able to tolerate the delay of + statestored startup with starting flags FLAGS_tolerate_statestore_startup_delay + set as true.""" + # The actual test here is successful startup, and we assume nothing about the + # functionality of the impalads before the coordinator and catalogd finish + # starting up. + statestoreds = self.cluster.statestoreds() + assert(len(statestoreds) == 2) + assert(statestoreds[0].service.get_metric_value("statestore.active-status")) + assert(not statestoreds[1].service.get_metric_value("statestore.active-status")) + + # Verify that impalad and catalogd entered recovery mode and tried to re-register + # with statestore. 
+ re_register_attempt = self.cluster.impalads[0].service.get_metric_value( + "statestore-subscriber.num-re-register-attempt") + assert re_register_attempt > 0 + re_register_attempt = self.cluster.catalogd.service.get_metric_value( + "statestore-subscriber.num-re-register-attempt") + assert re_register_attempt > 0 + + # Verify simple queries are ran successfully. + self.execute_query_expect_success( + self.client, "select count(*) from functional.alltypes")
