This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 7403d10a5 IMPALA-12550: Fix flaky test 
test_statestored_auto_failover_with_disabling_network
7403d10a5 is described below

commit 7403d10a55397f81784f369aa501b9c072d198b6
Author: wzhou-code <[email protected]>
AuthorDate: Wed Nov 8 12:20:19 2023 -0800

    IMPALA-12550: Fix flaky test 
test_statestored_auto_failover_with_disabling_network
    
    Test test_statestored_auto_failover_with_disabling_network failed
    occasionally due to delay of HA Handshake or HA heartbeat RPCs between
    two statestore instances. Sometimes the active statestore took a few
    minutes to respond to the handshake requests from standby statestore.
    
    This patch fixes the issue by not holding mutex ha_lock_ when sending
    HA handshake and HA heartbeat. Redundant HA heartbeats are handled
    on receiver side. Redundant HA handshakes are harmless.
    
    Testing:
     - Repeatedly ran test_statestored_auto_failover_with_disabling_network
       on Jenkins hundreds of times without failure.
     - Repeatedly ran test_statestored_auto_failover_with_disabling_network
       on a local machine a thousand times without failure.
     - Repeatedly ran all tests in test_statestored_ha.py for over 12 hours
       on Jenkins without failure.
     - Passed core tests.
    
    Change-Id: I515bbaaddfb4bf9bd2a39414cd6e3e4590dfbfb1
    Reviewed-on: http://gerrit.cloudera.org:8080/20689
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/statestore/statestore-subscriber.cc  |  9 ++--
 be/src/statestore/statestore.cc             | 81 ++++++++++++++++++++++++-----
 be/src/statestore/statestore.h              | 10 ++++
 tests/custom_cluster/test_statestored_ha.py |  9 ++--
 4 files changed, 87 insertions(+), 22 deletions(-)

diff --git a/be/src/statestore/statestore-subscriber.cc 
b/be/src/statestore/statestore-subscriber.cc
index e62e89219..24b9e7e59 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -396,7 +396,8 @@ void StatestoreSubscriber::Heartbeat(const RegistrationId& 
registration_id,
       *active_statestore_conn_state = statestore_->GetStatestoreConnState();
     }
   } else {
-    VLOG(3) << "Ignore heartbeat message from unknown statestored: " << 
statestore_id;
+    VLOG(3) << "Ignore heartbeat message from unknown statestored: "
+            << PrintId(statestore_id);
   }
 }
 
@@ -416,7 +417,7 @@ void StatestoreSubscriber::UpdateCatalogd(
     // the future.
     *update_skipped = true;
     LOG(INFO) << "Skipped updating catalogd message from unknown or inactive "
-              << "statestored: " << statestore_id;
+              << "statestored: " << PrintId(statestore_id);
   }
 }
 
@@ -494,7 +495,7 @@ void StatestoreSubscriber::UpdateStatestoredRole(bool 
is_active,
     // update in the future.
     *update_skipped = true;
     LOG(INFO) << "Skipped updating statestored message from unknown 
statestored: "
-              << statestore_id;
+              << PrintId(statestore_id);
   }
 }
 
@@ -523,7 +524,7 @@ Status StatestoreSubscriber::UpdateState(const 
TopicDeltaMap& incoming_topic_del
     // future.
     *skipped = true;
     VLOG(3) << "Skipped topic update message from unknown or inactive 
statestored: "
-            << statestore_id;
+            << PrintId(statestore_id);
     return Status::OK();
   }
 }
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 349ebc091..df2573b2a 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -208,6 +208,13 @@ const string STATESTORE_CONNECTED_PEER = 
"statestore.connected-with-peer-statest
 // an entry with the initial version.
 const Statestore::TopicEntry::Version 
Statestore::Subscriber::TOPIC_INITIAL_VERSION = 0;
 
+// If statestore instance in active state receives more than 10 heartbeats 
from its peer,
+// enter recovery mode to re-negotiate role with its peer.
+// Heartbeat period is set by 
FLAGS_statestore_ha_heartbeat_monitoring_frequency_ms, its
+// default value is 1000 ms. That means statestore instance in active state 
will enter
+// recovery mode in 10 seconds if it repeatedly receives heartbeats from its 
peer.
+#define MAX_NUM_RECEIVED_HEARTBEAT_IN_ACTIVE 10
+
 // Updates or heartbeats that miss their deadline by this much are logged.
 const uint32_t DEADLINE_MISS_THRESHOLD_MS = 2000;
 
@@ -674,6 +681,7 @@ Statestore::Statestore(MetricGroup* metrics)
         FLAGS_statestore_max_missed_heartbeats,
         FLAGS_statestore_max_missed_heartbeats / 2)) {
   UUIDToTUniqueId(boost::uuids::random_generator()(), &statestore_id_);
+  LOG(INFO) << "Statestore ID: " << PrintId(statestore_id_);
   DCHECK(metrics != NULL);
   metrics_ = metrics;
   num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0);
@@ -712,6 +720,7 @@ Statestore::Statestore(MetricGroup* metrics)
     is_active_ = true;
     active_status_metric_->SetValue(is_active_);
     active_version_ = UnixMicros();
+    num_received_heartbeat_in_active_ = 0;
   } else {
     is_active_ = false;
     active_status_metric_->SetValue(is_active_);
@@ -1823,6 +1832,7 @@ Status Statestore::InitStatestoreHa(
     is_active_ = true;
     active_status_metric_->SetValue(is_active_);
     active_version_ = UnixMicros();
+    num_received_heartbeat_in_active_ = 0;
     LOG(INFO) << "Set Statestore as active since it does not receive handshake 
"
               << "response in HA preemption waiting period";
     found_peer_ = false;
@@ -1855,6 +1865,11 @@ int64_t Statestore::GetActiveVersion(bool* is_active) {
   return active_version_;
 }
 
+bool Statestore::IsInRecoveryMode() {
+  lock_guard<mutex> l(ha_lock_);
+  return in_recovery_mode_;
+}
+
 Status Statestore::SendHaHandshake(TStatestoreHaHandshakeResponse* response) {
   if (disable_network_.Load()) {
     return Status("Don't send HA handshake since network is disabled.");
@@ -1906,6 +1921,7 @@ Status Statestore::ReceiveHaHandshakeRequest(const 
TUniqueId& peer_statestore_id
       is_active_ = true;
       active_status_metric_->SetValue(is_active_);
       active_version_ = UnixMicros();
+      num_received_heartbeat_in_active_ = 0;
       ha_standby_ss_failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
       LOG(INFO) << "Set the statestored as active since it's started with 
force active "
                 << "flag";
@@ -1924,6 +1940,8 @@ Status Statestore::ReceiveHaHandshakeRequest(const 
TUniqueId& peer_statestore_id
       }
       LOG(INFO) << "Set the statestored as " << (is_active_ ? "active" : 
"standby");
     }
+  } else {
+    LOG(INFO) << "Active state of statestored is not changed";
   }
   *statestore_active = is_active_;
   if (!found_peer_) {
@@ -1942,11 +1960,16 @@ void Statestore::HaHeartbeatRequest(const TUniqueId& 
dst_statestore_id,
     // process HA heartbeat from active statestore
     ha_active_ss_failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
   } else {
-    // Receive heartbeat from its peer statestored. That means both 
statestored designate
-    // themselves as active. Enter recovery mode to restart negotiation.
+    num_received_heartbeat_in_active_++;
+    if (num_received_heartbeat_in_active_ <= 
MAX_NUM_RECEIVED_HEARTBEAT_IN_ACTIVE) {
+      return;
+    }
+    // Repeatedly receive heartbeat from its peer statestored. That means both 
statestored
+    // designate themselves as active. Enter recovery mode to restart 
negotiation.
     LOG(WARNING)
         << "Both statestoreds designate themselves as active, restart 
negotiation.";
     in_recovery_mode_ = true;
+    recovery_start_time_ = MonotonicMillis();
     in_ha_recovery_mode_metric_->SetValue(in_recovery_mode_);
     is_active_ = false;
     active_status_metric_->SetValue(is_active_);
@@ -1954,19 +1977,30 @@ void Statestore::HaHeartbeatRequest(const TUniqueId& 
dst_statestore_id,
   }
 }
 
+// TODO: break this function into 3 functions for each branch: recovery-mode, 
active state,
+// and standby state.
 [[noreturn]] void Statestore::MonitorStatestoredHaHeartbeat() {
+  bool sleep_between_processing = true;
   while (1) {
-    SleepForMs(FLAGS_statestore_ha_heartbeat_monitoring_frequency_ms);
-    lock_guard<mutex> l(ha_lock_);
-    if (in_recovery_mode_) {
+    if (sleep_between_processing) {
+      SleepForMs(FLAGS_statestore_ha_heartbeat_monitoring_frequency_ms);
+    } else {
+      sleep_between_processing = true;
+    }
+    if (IsInRecoveryMode()) {
       // Keep sending HA handshake request to its peer periodically until 
receiving
-      // response.
+      // response. Don't hold the ha_lock_ when sending HA handshake.
       TStatestoreHaHandshakeResponse response;
       Status status = SendHaHandshake(&response);
       if (!status.ok()) continue;
       status = Status(response.status);
       DCHECK(status.ok());
 
+      lock_guard<mutex> l(ha_lock_);
+      if (!in_recovery_mode_) {
+        sleep_between_processing = false;
+        continue;
+      }
       // Exit "recovery" mode.
       in_recovery_mode_ = false;
       in_ha_recovery_mode_metric_->SetValue(in_recovery_mode_);
@@ -1975,20 +2009,32 @@ void Statestore::HaHeartbeatRequest(const TUniqueId& 
dst_statestore_id,
       active_status_metric_->SetValue(is_active_);
       found_peer_ = true;
       connected_peer_metric_->SetValue(found_peer_);
-      LOG(INFO) << "Receive Statestore HA handshake response, exit HA recovery 
mode. "
-                << "Set the statestored as " << (is_active_ ? "active" : 
"standby");
+      int64_t elapsed_ms = MonotonicMillis() - recovery_start_time_;
+      LOG(INFO) << "Receive Statestore HA handshake response, exit HA recovery 
mode in "
+                << PrettyPrinter::Print(elapsed_ms, TUnit::TIME_MS)
+                << ". Set the statestored as " << (is_active_ ? "active" : 
"standby");
       if (is_active_) {
         active_version_ = UnixMicros();
         // Send notification to all subscribers.
         update_statestored_cv_.NotifyAll();
       }
-    } else if (is_active_) {
+    } else if (IsActive()) {
       // Statestored in active state
       // Send HA heartbeat to standby statestored.
-      if (found_peer_) {
+      bool send_heartbeat = false;
+      {
+        lock_guard<mutex> l(ha_lock_);
+        if (is_active_ && found_peer_) send_heartbeat = true;
+      }
+      if (send_heartbeat) {
         Status status = SendHaHeartbeat();
         if (status.ok()) continue;
       }
+      lock_guard<mutex> l(ha_lock_);
+      if (!is_active_) {
+        sleep_between_processing = false;
+        continue;
+      }
       // Check if standby statestored is reachable.
       FailureDetector::PeerState state =
           ha_standby_ss_failure_detector_->GetPeerState(STATESTORE_ID);
@@ -2001,12 +2047,13 @@ void Statestore::HaHeartbeatRequest(const TUniqueId& 
dst_statestore_id,
         LOG(INFO) << "Statestored lost connection with peer statestored";
       }
 
-      lock_guard<mutex> l(subscribers_lock_);
+      lock_guard<mutex> l2(subscribers_lock_);
       if (subscribers_.size() == 0) {
         // To avoid race with new active statestored, original active 
statestored enter
         // "recovery" mode if it does not receive heartbeat responses from 
standby
         // statestored and all subscribers.
         in_recovery_mode_ = true;
+        recovery_start_time_ = MonotonicMillis();
         in_ha_recovery_mode_metric_->SetValue(in_recovery_mode_);
         is_active_ = false;
         active_status_metric_->SetValue(is_active_);
@@ -2017,6 +2064,11 @@ void Statestore::HaHeartbeatRequest(const TUniqueId& 
dst_statestore_id,
     } else {
       // Statestored in standby state
       // Monitor connection state with its peer statestored.
+      lock_guard<mutex> l(ha_lock_);
+      if (is_active_) {
+        sleep_between_processing = false;
+        continue;
+      }
       FailureDetector::PeerState state =
           ha_active_ss_failure_detector_->GetPeerState(STATESTORE_ID);
       // Check if the majority of subscribers lost connection with active 
statestored.
@@ -2050,12 +2102,14 @@ void Statestore::HaHeartbeatRequest(const TUniqueId& 
dst_statestore_id,
         is_active_ = true;
         active_status_metric_->SetValue(is_active_);
         active_version_ = UnixMicros();
+        num_received_heartbeat_in_active_ = 0;
         // Send notification to all subscribers.
         update_statestored_cv_.NotifyAll();
       } else if (total_subscribers == 0) {
         // If there is no subscriber, it means this statestored lost 
connection with
         // other nodes in the cluster, enter "recovery" mode.
         in_recovery_mode_ = true;
+        recovery_start_time_ = MonotonicMillis();
         in_ha_recovery_mode_metric_->SetValue(in_recovery_mode_);
         LOG(WARNING) << "Enter HA recovery mode.";
       } else {
@@ -2080,7 +2134,10 @@ Status Statestore::SendHaHeartbeat() {
 
   TStatestoreHaHeartbeatRequest request;
   TStatestoreHaHeartbeatResponse response;
-  request.__set_dst_statestore_id(peer_statestore_id_);
+  {
+    lock_guard<mutex> l(ha_lock_);
+    request.__set_dst_statestore_id(peer_statestore_id_);
+  }
   request.__set_src_statestore_id(statestore_id_);
   status = 
client.DoRpc(&StatestoreHaServiceClientWrapper::StatestoreHaHeartbeat,
       request, &response);
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index c55cc227b..11b0fb6fe 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -794,6 +794,13 @@ class Statestore : public CacheLineAligned {
   /// True if the statestore instance is in recovery mode.
   bool in_recovery_mode_ = false;
 
+  /// Monotonic time (ms) at which this instance entered recovery mode.
+  int64_t recovery_start_time_;
+
+  /// Number of HA heartbeats received while in active state.
+  /// Reset this variable whenever `is_active_` is set to true.
+  int num_received_heartbeat_in_active_ = 0;
+
   /// Disable network if this variable is set as true by statestore service 
API.
   /// This is only used for unit-test.
   AtomicBool disable_network_{false};
@@ -999,6 +1006,9 @@ class Statestore : public CacheLineAligned {
   /// Raw callback to indicate whether the service is ready.
   void HealthzHandler(const Webserver::WebRequest& req, std::stringstream* 
data,
       HttpStatusCode* response);
+
+  // Return true if this statestore instance is in recovery mode.
+  bool IsInRecoveryMode();
 };
 
 } // namespace impala
diff --git a/tests/custom_cluster/test_statestored_ha.py 
b/tests/custom_cluster/test_statestored_ha.py
index 3eec88a96..1e9ec17f4 100644
--- a/tests/custom_cluster/test_statestored_ha.py
+++ b/tests/custom_cluster/test_statestored_ha.py
@@ -629,12 +629,9 @@ class TestStatestoredHA(CustomClusterTestSuite):
     # Re-enable network for standby statestored. Verify that the statestored 
exits
     # HA recovery mode.
     self.__disable_statestored_network(disable_network=False)
-    # IMPALA-12550: sometimes the active statestore takes a few minutes to 
response
-    # the HA handshake from standby statestore. Temporarily comment out 
following
-    # two lines util the issue is fixed.
-    # statestore_service_0.wait_for_metric_value(
-    #    "statestore.in-ha-recovery-mode", expected_value=False, timeout=120)
-    # assert(not 
statestore_service_0.get_metric_value("statestore.active-status"))
+    statestore_service_0.wait_for_metric_value(
+        "statestore.in-ha-recovery-mode", expected_value=False, timeout=120)
+    assert(not 
statestore_service_0.get_metric_value("statestore.active-status"))
 
 
 class TestStatestoredHAStartupDelay(CustomClusterTestSuite):

Reply via email to