This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d2cd9b51a03dbd8b2e485ee446bf7530656ab214
Author: wzhou-code <[email protected]>
AuthorDate: Tue Sep 17 22:44:43 2024 -0700

    IMPALA-13388: fix unit-tests of Statestore HA for UBSAN builds
    
    Sometimes in UBSAN builds, unit-tests of Statestore HA failed due to
    Thrift RPC receiving timeout. Standby statestored failed to send
    heartbeats to its subscribers so that failover was not triggered.
    The Thrift RPC failures still happened after increasing TCP timeout
    for Thrift RPCs between statestored and its subscribers.
    
    This patch adds a metric for the number of subscribers which received
    heartbeats from statestored in a monitoring period. Unit-tests of
    Statestored HA for UBSAN builds will be skipped if statestored failed
    to send heartbeats to more than half of the subscribers.
    For other builds, throw an exception with an error message that
    complains about Thrift RPC failure if statestored failed to send
    heartbeats to more than half of the subscribers.
    Also fixed a bug which calls SecondsSinceHeartbeat() but compares
    the returned value with a time value in milliseconds.
    
    Filed follow-up JIRA IMPALA-13399 to track the root cause.
    
    Testing:
     - Looped to run test_statestored_ha.py for 100 times in UBSAN
       build without failed case, but 4 iterations out of 100 have
       skipped test cases.
     - Verified that the issue did not happen for ASAN build by
       running test_statestored_ha.py for 100 times in ASAN build.
     - Passed core test.
    
    Change-Id: Ie59d1e93c635411723f7044da52e4ab19c7d2fac
    Reviewed-on: http://gerrit.cloudera.org:8080/21820
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/statestore/statestore.cc             | 29 ++++++++++-----
 be/src/statestore/statestore.h              | 14 ++++++-
 common/thrift/metrics.json                  | 10 +++++
 tests/custom_cluster/test_statestored_ha.py | 58 +++++++++++++++++++++++------
 4 files changed, 89 insertions(+), 22 deletions(-)

diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index ce78f9119..5b5ec184f 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -180,6 +180,8 @@ const string STATESTORE_ID = "STATESTORE";
 // TODO: Replace 'backend' with 'subscriber' when we can coordinate a change 
with CM
 const string STATESTORE_LIVE_SUBSCRIBERS = "statestore.live-backends";
 const string STATESTORE_LIVE_SUBSCRIBERS_LIST = 
"statestore.live-backends.list";
+const string STATESTORE_SUBSCRIBERS_RECEIVED_HEARTBEAT =
+    "statestore.subscribers-received-heartbeat";
 const string STATESTORE_TOTAL_KEY_SIZE_BYTES = 
"statestore.total-key-size-bytes";
 const string STATESTORE_TOTAL_VALUE_SIZE_BYTES = 
"statestore.total-value-size-bytes";
 const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES = 
"statestore.total-topic-size-bytes";
@@ -566,7 +568,7 @@ Statestore::Subscriber::Subscriber(const SubscriberId& 
subscriber_id,
   LOG(INFO) << "Subscriber '" << subscriber_id_
             << "' with type " << SubscriberTypeToString(subscriber_type_)
             << " registered (registration id: " << PrintId(registration_id_) 
<< ")";
-  RefreshLastHeartbeatTimestamp();
+  RefreshLastHeartbeatTimestamp(false);
   for (const TTopicRegistration& topic : subscribed_topics) {
     GetTopicsMapForId(topic.topic_name)
         ->emplace(piecewise_construct, forward_as_tuple(topic.topic_name),
@@ -644,9 +646,10 @@ void 
Statestore::Subscriber::SetLastTopicVersionProcessed(const TopicId& topic_i
   topic_it->second.last_version.Store(version);
 }
 
-void Statestore::Subscriber::RefreshLastHeartbeatTimestamp() {
+void Statestore::Subscriber::RefreshLastHeartbeatTimestamp(bool 
received_heartbeat) {
   DCHECK_GE(MonotonicMillis(), last_heartbeat_ts_.Load());
   last_heartbeat_ts_.Store(MonotonicMillis());
+  if (received_heartbeat) received_heartbeat_.Store(true);
 }
 
 void Statestore::Subscriber::UpdateCatalogInfo(
@@ -701,6 +704,8 @@ Statestore::Statestore(MetricGroup* metrics)
   num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0);
   subscriber_set_metric_ = SetMetric<string>::CreateAndRegister(metrics,
       STATESTORE_LIVE_SUBSCRIBERS_LIST, set<string>());
+  num_subscribers_received_heartbeat_metric_ =
+      metrics->AddGauge(STATESTORE_SUBSCRIBERS_RECEIVED_HEARTBEAT, 0);
   key_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_KEY_SIZE_BYTES, 0);
   value_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_VALUE_SIZE_BYTES, 0);
   topic_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_TOPIC_SIZE_BYTES, 0);
@@ -1372,7 +1377,7 @@ void Statestore::DoSubscriberUpdate(UpdateKind 
update_kind, int thread_id,
   if (is_heartbeat) {
     status = SendHeartbeat(subscriber.get());
     if (status.ok()) {
-      subscriber->RefreshLastHeartbeatTimestamp();
+      subscriber->RefreshLastHeartbeatTimestamp(true);
     } else if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
       // Add details to status to make it more useful, while preserving the 
stack
       status.AddDetail(Substitute(
@@ -1453,28 +1458,32 @@ void Statestore::DoSubscriberUpdate(UpdateKind 
update_kind, int thread_id,
 [[noreturn]] void Statestore::MonitorSubscriberHeartbeat() {
   while (1) {
     int num_subscribers;
+    int num_subscribers_received_heartbeat = 0;
     vector<SubscriberId> inactive_subscribers;
     SleepForMs(FLAGS_heartbeat_monitoring_frequency_ms);
     {
       lock_guard<mutex> l(subscribers_lock_);
       num_subscribers = subscribers_.size();
       for (const auto& subscriber : subscribers_) {
-        if (subscriber.second->SecondsSinceHeartbeat()
+        if (subscriber.second->MilliSecondsSinceHeartbeat()
             > FLAGS_heartbeat_monitoring_frequency_ms) {
           inactive_subscribers.push_back(subscriber.second->id());
+        } else if (subscriber.second->receivedHeartbeat()) {
+          num_subscribers_received_heartbeat++;
         }
       }
     }
+    num_subscribers_received_heartbeat_metric_->SetValue(
+        num_subscribers_received_heartbeat);
     if (inactive_subscribers.empty()) {
-      LOG(INFO) << "All " << num_subscribers
+      LOG(INFO) << num_subscribers_received_heartbeat << "/" << num_subscribers
                 << " subscribers successfully heartbeat in the last "
                 << FLAGS_heartbeat_monitoring_frequency_ms << "ms.";
     } else {
-      int num_active_subscribers = num_subscribers - 
inactive_subscribers.size();
-      LOG(WARNING) << num_active_subscribers << "/" << num_subscribers
-                   << " subscribers successfully heartbeat in the last "
-                   << FLAGS_heartbeat_monitoring_frequency_ms << "ms."
-                   << " Slow subscribers: " << 
boost::join(inactive_subscribers, ", ");
+      LOG(INFO) << num_subscribers_received_heartbeat << "/" << num_subscribers
+                << " subscribers successfully heartbeat in the last "
+                << FLAGS_heartbeat_monitoring_frequency_ms << "ms."
+                << " Slow subscribers: " << boost::join(inactive_subscribers, 
", ");
     }
   }
 }
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 949cac7fd..97f2d1cc3 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -521,6 +521,15 @@ class Statestore : public CacheLineAligned {
       return (static_cast<double>(MonotonicMillis() - 
last_heartbeat_ts_.Load()))
           / 1000.0;
     }
+    /// Returns the time elapsed (in milli seconds) since the last heartbeat.
+    int64_t MilliSecondsSinceHeartbeat() const {
+      return MonotonicMillis() - last_heartbeat_ts_.Load();
+    }
+
+    /// Return true if the subscriber received heartbeat.
+    bool receivedHeartbeat() const {
+      return received_heartbeat_.Load();
+    }
 
     /// Get the Topics map that would be used to store 'topic_id'.
     const Topics& GetTopicsMapForId(const TopicId& topic_id) const {
@@ -566,7 +575,7 @@ class Statestore : public CacheLineAligned {
         TopicEntry::Version version);
 
     /// Refresh the subscriber's last heartbeat timestamp to the current 
monotonic time.
-    void RefreshLastHeartbeatTimestamp();
+    void RefreshLastHeartbeatTimestamp(bool received_heartbeat);
 
     /// Check if the subscriber is Catalog daemon.
     bool IsCatalogd() const {
@@ -613,6 +622,8 @@ class Statestore : public CacheLineAligned {
     /// The timestamp of the last successful heartbeat in milliseconds. A 
timestamp much
     /// older than the heartbeat frequency implies an unresponsive subscriber.
     AtomicInt64 last_heartbeat_ts_{0};
+    /// True if the subscriber received heartbeat.
+    AtomicBool received_heartbeat_{false};
 
     /// Lock held when adding or deleting transient entries. See class comment 
for lock
     /// acquisition order.
@@ -849,6 +860,7 @@ class Statestore : public CacheLineAligned {
   /// Metric that track the registered, non-failed subscribers.
   IntGauge* num_subscribers_metric_;
   SetMetric<std::string>* subscriber_set_metric_;
+  IntGauge* num_subscribers_received_heartbeat_metric_;
 
   /// Metrics shared across all topics to sum the size in bytes of keys, 
values and both
   IntGauge* key_size_metric_;
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index d77415dfc..1ec740834 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2248,6 +2248,16 @@
     "kind": "SET",
     "key": "statestore.live-backends.list"
   },
+  {
+    "description": "The number of Statestore subscribers received heartbeat in 
last monitoring interval.",
+    "contexts": [
+      "STATESTORE"
+    ],
+    "label": "Number of Statestore subscribers received heartbeat",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "statestore.subscribers-received-heartbeat"
+  },
   {
     "description": "The time (sec) spent sending non-priority topic update 
RPCs. Includes subscriber-side processing time and network transmission time.",
     "contexts": [
diff --git a/tests/custom_cluster/test_statestored_ha.py 
b/tests/custom_cluster/test_statestored_ha.py
index 6ee77b592..35dcca7f7 100644
--- a/tests/custom_cluster/test_statestored_ha.py
+++ b/tests/custom_cluster/test_statestored_ha.py
@@ -23,7 +23,7 @@ import time
 from beeswaxd.BeeswaxService import QueryState
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.environ import build_flavor_timeout
+from tests.common.environ import build_flavor_timeout, 
ImpalaTestClusterProperties
 from tests.common.impala_cluster import (
     DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_STATESTORE_SERVICE_PORT)
 from tests.common.skip import SkipIfBuildType, SkipIfNotHdfsMinicluster
@@ -138,6 +138,39 @@ class TestStatestoredHA(CustomClusterTestSuite):
       self.execute_query_expect_success(
           self.client, "drop table if exists test_statestored_ha")
 
+  # Wait for long enough for the standby statestored to detect the failure of 
active
+  # statestored and assign itself with active role.
+  def __wait_statestore_to_be_active(self, statestore_service, 
statestored_index):
+    try:
+      statestore_service.wait_for_metric_value(
+          "statestore.active-status", expected_value=True, timeout=120)
+    except Exception as e:
+      num_subscribers = 
statestore_service.get_metric_value("statestore.live-backends")
+      num_subscribers_received_heartbeat = statestore_service.get_metric_value(
+          "statestore.subscribers-received-heartbeat")
+      if num_subscribers_received_heartbeat >= num_subscribers / 2:
+        assert False, str(e)
+      else:
+        num_failed = num_subscribers - num_subscribers_received_heartbeat
+        assert_string = "Standby statestored failed to send heartbeats to " +\
+            "{0} out of {1} subscribers due to Thrift RPC failures".format(
+                num_failed, num_subscribers)
+        if ImpalaTestClusterProperties.get_instance().is_ubsan():
+          # IMPALA-13388: Skip this tests for UBSAN build due to Thrift RPC 
failures.
+          # A following up JIRA IMPALA-13399 was filed to track the very root 
cause.
+          # Remove this skipping when JIRA IMPALA-13399 is resolved.
+          if statestored_index == 1:
+            daemon_name = "statestored_node1"
+          else:
+            daemon_name = "statestored"
+          # Check log file of statestored that it is indeed a Thrift RPC issue 
by
+          # verifying existence of the log message with following pattern.
+          log_regex = "Unable to send heartbeat message to subscriber .*, 
received " +\
+              "error: RPC recv timed out: .*, rpc: 
N6impala18THeartbeatResponseE.*"
+          self.assert_log_contains(daemon_name, "INFO", log_regex, 
expected_count=-1)
+          pytest.skip("Skip this tests for UBSAN builds since " + 
assert_string)
+        assert False, assert_string
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_network_address_as_statestore_priority=true",
@@ -255,8 +288,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
 
     # Wait for long enough for the standby statestored to detect the failure 
of active
     # statestored and assign itself with active role.
-    statestore_service_1.wait_for_metric_value(
-        "statestore.active-status", expected_value=True, timeout=120)
+    self.__wait_statestore_to_be_active(statestore_service_1, 1)
     assert(statestore_service_1.get_metric_value("statestore.active-status"))
     sleep(1)
 
@@ -331,6 +363,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_network_address_as_statestore_priority=true "
                      "--statestore_ha_heartbeat_monitoring_frequency_ms=50 "
+                     "--heartbeat_monitoring_frequency_ms=6000 "
                      "--statestore_peer_timeout_seconds=2",
     impalad_args="--statestore_subscriber_timeout_seconds=2",
     catalogd_args="--statestore_subscriber_timeout_seconds=2",
@@ -343,6 +376,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_network_address_as_statestore_priority=true "
                      "--statestore_ha_heartbeat_monitoring_frequency_ms=50 "
+                     "--heartbeat_monitoring_frequency_ms=6000 "
                      "--statestore_peer_timeout_seconds=2 "
                      
"--debug_actions=SEND_UPDATE_STATESTORED_RPC_FIRST_ATTEMPT:[email protected]",
     impalad_args="--statestore_subscriber_timeout_seconds=2",
@@ -356,6 +390,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_network_address_as_statestore_priority=true "
                      "--statestore_ha_heartbeat_monitoring_frequency_ms=50 "
+                     "--heartbeat_monitoring_frequency_ms=6000 "
                      "--statestore_peer_timeout_seconds=2 "
                      "--use_subscriber_id_as_catalogd_priority=true",
     impalad_args="--statestore_subscriber_timeout_seconds=2",
@@ -374,6 +409,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_network_address_as_statestore_priority=true "
                      "--statestore_ha_heartbeat_monitoring_frequency_ms=50 "
+                     "--heartbeat_monitoring_frequency_ms=6000 "
                      "--statestore_peer_timeout_seconds=2 ",
     impalad_args="--statestore_subscriber_timeout_seconds=2",
     catalogd_args="--statestore_subscriber_timeout_seconds=2",
@@ -410,8 +446,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
 
     # Wait for long enough for the standby statestored to detect the failure 
of active
     # statestored and assign itself with active role.
-    statestore_service_1.wait_for_metric_value(
-        "statestore.active-status", expected_value=True, timeout=120)
+    self.__wait_statestore_to_be_active(statestore_service_1, 1)
     assert(statestore_service_1.get_metric_value("statestore.active-status"))
     sleep(1)
 
@@ -458,8 +493,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
       self.__disable_statestored_network(disable_network=True)
       # Wait for long enough for the standby statestored to detect the failure 
of active
       # statestored and assign itself with active role.
-      statestore_service_1.wait_for_metric_value(
-          "statestore.active-status", expected_value=True, timeout=120)
+      self.__wait_statestore_to_be_active(statestore_service_1, 1)
       assert(statestore_service_1.get_metric_value("statestore.active-status"))
       # Verify that original active statestored is in HA recovery mode and is 
not active.
       statestore_service_0.wait_for_metric_value(
@@ -500,6 +534,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_network_address_as_statestore_priority=true "
                      "--statestore_ha_heartbeat_monitoring_frequency_ms=100 "
+                     "--heartbeat_monitoring_frequency_ms=6000 "
                      "--statestore_peer_timeout_seconds=2 "
                      "--debug_actions=DISABLE_STATESTORE_NETWORK",
     impalad_args="--statestore_subscriber_timeout_seconds=2",
@@ -513,6 +548,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_network_address_as_statestore_priority=true "
                      "--statestore_ha_heartbeat_monitoring_frequency_ms=100 "
+                     "--heartbeat_monitoring_frequency_ms=6000 "
                      "--statestore_peer_timeout_seconds=2 "
                      
"--debug_actions=SEND_UPDATE_STATESTORED_RPC_FIRST_ATTEMPT:[email protected]",
     impalad_args="--statestore_subscriber_timeout_seconds=2",
@@ -559,6 +595,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_network_address_as_statestore_priority=true "
                      "--statestore_ha_heartbeat_monitoring_frequency_ms=50 "
+                     "--heartbeat_monitoring_frequency_ms=6000 "
                      "--statestore_peer_timeout_seconds=2 "
                      "--debug_actions=DISABLE_STATESTORE_NETWORK",
     impalad_args="--statestore_subscriber_timeout_seconds=2",
@@ -588,8 +625,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
 
     # Wait for long enough for the standby statestored to detect the failure 
of active
     # statestored and assign itself with active role.
-    statestore_service_1.wait_for_metric_value(
-        "statestore.active-status", expected_value=True, timeout=120)
+    self.__wait_statestore_to_be_active(statestore_service_1, 1)
     assert(statestore_service_1.get_metric_value("statestore.active-status"))
     # Verify that original active statestored is in HA recovery mode.
     statestore_service_0.wait_for_metric_value(
@@ -645,6 +681,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_network_address_as_statestore_priority=true "
                      "--statestore_ha_heartbeat_monitoring_frequency_ms=50 "
+                     "--heartbeat_monitoring_frequency_ms=6000 "
                      "--statestore_peer_timeout_seconds={timeout_s} "
                      "--use_subscriber_id_as_catalogd_priority=true"
                      .format(timeout_s=SS_PEER_TIMEOUT_S),
@@ -684,8 +721,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
       statestoreds[0].kill()
       # Wait for long enough for the standby statestored to detect the failure 
of active
       # statestored and assign itself in active role.
-      statestore_service_1.wait_for_metric_value(
-          "statestore.active-status", expected_value=True, timeout=120)
+      self.__wait_statestore_to_be_active(statestore_service_1, 1)
       assert 
(statestore_service_1.get_metric_value("statestore.active-status")), \
           "Second statestored must be active now"
       statestore_service_1.wait_for_live_subscribers(5)

Reply via email to