This is an automated email from the ASF dual-hosted git repository. wzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit d2cd9b51a03dbd8b2e485ee446bf7530656ab214 Author: wzhou-code <[email protected]> AuthorDate: Tue Sep 17 22:44:43 2024 -0700 IMPALA-13388: fix unit-tests of Statestore HA for UBSAN builds Sometimes in UBSAN builds, unit-tests of Statestore HA failed due to Thrift RPC receiving timeout. Standby statestored failed to send heartbeats to its subscribers so that failover was not triggered. The Thrift RPC failures still happened after increasing TCP timeout for Thrift RPCs between statestored and its subscribers. This patch adds a metric for the number of subscribers which received heartbeats from statestored in a monitoring period. Unit-tests of Statestored HA for UBSAN build will be skipped if statestored failed to send heartbeats to more than half of subscribers. For other builds, throw an exception with an error message which complains about Thrift RPC failures if statestored failed to send heartbeats to more than half of subscribers. Also fixed a bug which calls SecondsSinceHeartbeat() but compares the returned value with a time value in milliseconds. Filed follow-up JIRA IMPALA-13399 to track the root cause. Testing: - Looped to run test_statestored_ha.py 100 times in a UBSAN build without a failed case, but 4 iterations out of 100 have skipped test cases. - Verified that the issue did not happen for ASAN builds by running test_statestored_ha.py 100 times in an ASAN build. - Passed core test. 
Change-Id: Ie59d1e93c635411723f7044da52e4ab19c7d2fac Reviewed-on: http://gerrit.cloudera.org:8080/21820 Reviewed-by: Riza Suminto <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/statestore/statestore.cc | 29 ++++++++++----- be/src/statestore/statestore.h | 14 ++++++- common/thrift/metrics.json | 10 +++++ tests/custom_cluster/test_statestored_ha.py | 58 +++++++++++++++++++++++------ 4 files changed, 89 insertions(+), 22 deletions(-) diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc index ce78f9119..5b5ec184f 100644 --- a/be/src/statestore/statestore.cc +++ b/be/src/statestore/statestore.cc @@ -180,6 +180,8 @@ const string STATESTORE_ID = "STATESTORE"; // TODO: Replace 'backend' with 'subscriber' when we can coordinate a change with CM const string STATESTORE_LIVE_SUBSCRIBERS = "statestore.live-backends"; const string STATESTORE_LIVE_SUBSCRIBERS_LIST = "statestore.live-backends.list"; +const string STATESTORE_SUBSCRIBERS_RECEIVED_HEARTBEAT = + "statestore.subscribers-received-heartbeat"; const string STATESTORE_TOTAL_KEY_SIZE_BYTES = "statestore.total-key-size-bytes"; const string STATESTORE_TOTAL_VALUE_SIZE_BYTES = "statestore.total-value-size-bytes"; const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES = "statestore.total-topic-size-bytes"; @@ -566,7 +568,7 @@ Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id, LOG(INFO) << "Subscriber '" << subscriber_id_ << "' with type " << SubscriberTypeToString(subscriber_type_) << " registered (registration id: " << PrintId(registration_id_) << ")"; - RefreshLastHeartbeatTimestamp(); + RefreshLastHeartbeatTimestamp(false); for (const TTopicRegistration& topic : subscribed_topics) { GetTopicsMapForId(topic.topic_name) ->emplace(piecewise_construct, forward_as_tuple(topic.topic_name), @@ -644,9 +646,10 @@ void Statestore::Subscriber::SetLastTopicVersionProcessed(const TopicId& topic_i topic_it->second.last_version.Store(version); } -void 
Statestore::Subscriber::RefreshLastHeartbeatTimestamp() { +void Statestore::Subscriber::RefreshLastHeartbeatTimestamp(bool received_heartbeat) { DCHECK_GE(MonotonicMillis(), last_heartbeat_ts_.Load()); last_heartbeat_ts_.Store(MonotonicMillis()); + if (received_heartbeat) received_heartbeat_.Store(true); } void Statestore::Subscriber::UpdateCatalogInfo( @@ -701,6 +704,8 @@ Statestore::Statestore(MetricGroup* metrics) num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0); subscriber_set_metric_ = SetMetric<string>::CreateAndRegister(metrics, STATESTORE_LIVE_SUBSCRIBERS_LIST, set<string>()); + num_subscribers_received_heartbeat_metric_ = + metrics->AddGauge(STATESTORE_SUBSCRIBERS_RECEIVED_HEARTBEAT, 0); key_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_KEY_SIZE_BYTES, 0); value_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_VALUE_SIZE_BYTES, 0); topic_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_TOPIC_SIZE_BYTES, 0); @@ -1372,7 +1377,7 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id, if (is_heartbeat) { status = SendHeartbeat(subscriber.get()); if (status.ok()) { - subscriber->RefreshLastHeartbeatTimestamp(); + subscriber->RefreshLastHeartbeatTimestamp(true); } else if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) { // Add details to status to make it more useful, while preserving the stack status.AddDetail(Substitute( @@ -1453,28 +1458,32 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id, [[noreturn]] void Statestore::MonitorSubscriberHeartbeat() { while (1) { int num_subscribers; + int num_subscribers_received_heartbeat = 0; vector<SubscriberId> inactive_subscribers; SleepForMs(FLAGS_heartbeat_monitoring_frequency_ms); { lock_guard<mutex> l(subscribers_lock_); num_subscribers = subscribers_.size(); for (const auto& subscriber : subscribers_) { - if (subscriber.second->SecondsSinceHeartbeat() + if (subscriber.second->MilliSecondsSinceHeartbeat() > 
FLAGS_heartbeat_monitoring_frequency_ms) { inactive_subscribers.push_back(subscriber.second->id()); + } else if (subscriber.second->receivedHeartbeat()) { + num_subscribers_received_heartbeat++; } } } + num_subscribers_received_heartbeat_metric_->SetValue( + num_subscribers_received_heartbeat); if (inactive_subscribers.empty()) { - LOG(INFO) << "All " << num_subscribers + LOG(INFO) << num_subscribers_received_heartbeat << "/" << num_subscribers << " subscribers successfully heartbeat in the last " << FLAGS_heartbeat_monitoring_frequency_ms << "ms."; } else { - int num_active_subscribers = num_subscribers - inactive_subscribers.size(); - LOG(WARNING) << num_active_subscribers << "/" << num_subscribers - << " subscribers successfully heartbeat in the last " - << FLAGS_heartbeat_monitoring_frequency_ms << "ms." - << " Slow subscribers: " << boost::join(inactive_subscribers, ", "); + LOG(INFO) << num_subscribers_received_heartbeat << "/" << num_subscribers + << " subscribers successfully heartbeat in the last " + << FLAGS_heartbeat_monitoring_frequency_ms << "ms." + << " Slow subscribers: " << boost::join(inactive_subscribers, ", "); } } } diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h index 949cac7fd..97f2d1cc3 100644 --- a/be/src/statestore/statestore.h +++ b/be/src/statestore/statestore.h @@ -521,6 +521,15 @@ class Statestore : public CacheLineAligned { return (static_cast<double>(MonotonicMillis() - last_heartbeat_ts_.Load())) / 1000.0; } + /// Returns the time elapsed (in milli seconds) since the last heartbeat. + int64_t MilliSecondsSinceHeartbeat() const { + return MonotonicMillis() - last_heartbeat_ts_.Load(); + } + + /// Return true if the subscriber received heartbeat. + bool receivedHeartbeat() const { + return received_heartbeat_.Load(); + } /// Get the Topics map that would be used to store 'topic_id'. 
const Topics& GetTopicsMapForId(const TopicId& topic_id) const { @@ -566,7 +575,7 @@ class Statestore : public CacheLineAligned { TopicEntry::Version version); /// Refresh the subscriber's last heartbeat timestamp to the current monotonic time. - void RefreshLastHeartbeatTimestamp(); + void RefreshLastHeartbeatTimestamp(bool received_heartbeat); /// Check if the subscriber is Catalog daemon. bool IsCatalogd() const { @@ -613,6 +622,8 @@ class Statestore : public CacheLineAligned { /// The timestamp of the last successful heartbeat in milliseconds. A timestamp much /// older than the heartbeat frequency implies an unresponsive subscriber. AtomicInt64 last_heartbeat_ts_{0}; + /// True if the subscriber received heartbeat. + AtomicBool received_heartbeat_{false}; /// Lock held when adding or deleting transient entries. See class comment for lock /// acquisition order. @@ -849,6 +860,7 @@ class Statestore : public CacheLineAligned { /// Metric that track the registered, non-failed subscribers. IntGauge* num_subscribers_metric_; SetMetric<std::string>* subscriber_set_metric_; + IntGauge* num_subscribers_received_heartbeat_metric_; /// Metrics shared across all topics to sum the size in bytes of keys, values and both IntGauge* key_size_metric_; diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json index d77415dfc..1ec740834 100644 --- a/common/thrift/metrics.json +++ b/common/thrift/metrics.json @@ -2248,6 +2248,16 @@ "kind": "SET", "key": "statestore.live-backends.list" }, + { + "description": "The number of Statestore subscribers received heartbeat in last monitoring interval.", + "contexts": [ + "STATESTORE" + ], + "label": "Number of Statestore subscribers received heartbeat", + "units": "NONE", + "kind": "GAUGE", + "key": "statestore.subscribers-received-heartbeat" + }, { "description": "The time (sec) spent sending non-priority topic update RPCs. 
Includes subscriber-side processing time and network transmission time.", "contexts": [ diff --git a/tests/custom_cluster/test_statestored_ha.py b/tests/custom_cluster/test_statestored_ha.py index 6ee77b592..35dcca7f7 100644 --- a/tests/custom_cluster/test_statestored_ha.py +++ b/tests/custom_cluster/test_statestored_ha.py @@ -23,7 +23,7 @@ import time from beeswaxd.BeeswaxService import QueryState from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.common.environ import build_flavor_timeout +from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperties from tests.common.impala_cluster import ( DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_STATESTORE_SERVICE_PORT) from tests.common.skip import SkipIfBuildType, SkipIfNotHdfsMinicluster @@ -138,6 +138,39 @@ class TestStatestoredHA(CustomClusterTestSuite): self.execute_query_expect_success( self.client, "drop table if exists test_statestored_ha") + # Wait for long enough for the standby statestored to detect the failure of active + # statestored and assign itself with active role. 
+ def __wait_statestore_to_be_active(self, statestore_service, statestored_index): + try: + statestore_service.wait_for_metric_value( + "statestore.active-status", expected_value=True, timeout=120) + except Exception as e: + num_subscribers = statestore_service.get_metric_value("statestore.live-backends") + num_subscribers_received_heartbeat = statestore_service.get_metric_value( + "statestore.subscribers-received-heartbeat") + if num_subscribers_received_heartbeat >= num_subscribers / 2: + assert False, str(e) + else: + num_failed = num_subscribers - num_subscribers_received_heartbeat + assert_string = "Standby statestored failed to send heartbeats to " +\ + "{0} out of {1} subscribers due to Thrift RPC failures".format( + num_failed, num_subscribers) + if ImpalaTestClusterProperties.get_instance().is_ubsan(): + # IMPALA-13388: Skip this tests for UBSAN build due to Thrift RPC failures. + # A following up JIRA IMPALA-13399 was filed to track the very root cause. + # Remove this skipping when JIRA IMPALA-13399 is resolved. + if statestored_index == 1: + daemon_name = "statestored_node1" + else: + daemon_name = "statestored" + # Check log file of statestored that it is indeed a Thrift RPC issue by + # verifying existence of the log message with following pattern. + log_regex = "Unable to send heartbeat message to subscriber .*, received " +\ + "error: RPC recv timed out: .*, rpc: N6impala18THeartbeatResponseE.*" + self.assert_log_contains(daemon_name, "INFO", log_regex, expected_count=-1) + pytest.skip("Skip this tests for UBSAN builds since " + assert_string) + assert False, assert_string + @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( statestored_args="--use_network_address_as_statestore_priority=true", @@ -255,8 +288,7 @@ class TestStatestoredHA(CustomClusterTestSuite): # Wait for long enough for the standby statestored to detect the failure of active # statestored and assign itself with active role. 
- statestore_service_1.wait_for_metric_value( - "statestore.active-status", expected_value=True, timeout=120) + self.__wait_statestore_to_be_active(statestore_service_1, 1) assert(statestore_service_1.get_metric_value("statestore.active-status")) sleep(1) @@ -331,6 +363,7 @@ class TestStatestoredHA(CustomClusterTestSuite): @CustomClusterTestSuite.with_args( statestored_args="--use_network_address_as_statestore_priority=true " "--statestore_ha_heartbeat_monitoring_frequency_ms=50 " + "--heartbeat_monitoring_frequency_ms=6000 " "--statestore_peer_timeout_seconds=2", impalad_args="--statestore_subscriber_timeout_seconds=2", catalogd_args="--statestore_subscriber_timeout_seconds=2", @@ -343,6 +376,7 @@ class TestStatestoredHA(CustomClusterTestSuite): @CustomClusterTestSuite.with_args( statestored_args="--use_network_address_as_statestore_priority=true " "--statestore_ha_heartbeat_monitoring_frequency_ms=50 " + "--heartbeat_monitoring_frequency_ms=6000 " "--statestore_peer_timeout_seconds=2 " "--debug_actions=SEND_UPDATE_STATESTORED_RPC_FIRST_ATTEMPT:[email protected]", impalad_args="--statestore_subscriber_timeout_seconds=2", @@ -356,6 +390,7 @@ class TestStatestoredHA(CustomClusterTestSuite): @CustomClusterTestSuite.with_args( statestored_args="--use_network_address_as_statestore_priority=true " "--statestore_ha_heartbeat_monitoring_frequency_ms=50 " + "--heartbeat_monitoring_frequency_ms=6000 " "--statestore_peer_timeout_seconds=2 " "--use_subscriber_id_as_catalogd_priority=true", impalad_args="--statestore_subscriber_timeout_seconds=2", @@ -374,6 +409,7 @@ class TestStatestoredHA(CustomClusterTestSuite): @CustomClusterTestSuite.with_args( statestored_args="--use_network_address_as_statestore_priority=true " "--statestore_ha_heartbeat_monitoring_frequency_ms=50 " + "--heartbeat_monitoring_frequency_ms=6000 " "--statestore_peer_timeout_seconds=2 ", impalad_args="--statestore_subscriber_timeout_seconds=2", catalogd_args="--statestore_subscriber_timeout_seconds=2", @@ 
-410,8 +446,7 @@ class TestStatestoredHA(CustomClusterTestSuite): # Wait for long enough for the standby statestored to detect the failure of active # statestored and assign itself with active role. - statestore_service_1.wait_for_metric_value( - "statestore.active-status", expected_value=True, timeout=120) + self.__wait_statestore_to_be_active(statestore_service_1, 1) assert(statestore_service_1.get_metric_value("statestore.active-status")) sleep(1) @@ -458,8 +493,7 @@ class TestStatestoredHA(CustomClusterTestSuite): self.__disable_statestored_network(disable_network=True) # Wait for long enough for the standby statestored to detect the failure of active # statestored and assign itself with active role. - statestore_service_1.wait_for_metric_value( - "statestore.active-status", expected_value=True, timeout=120) + self.__wait_statestore_to_be_active(statestore_service_1, 1) assert(statestore_service_1.get_metric_value("statestore.active-status")) # Verify that original active statestored is in HA recovery mode and is not active. 
statestore_service_0.wait_for_metric_value( @@ -500,6 +534,7 @@ class TestStatestoredHA(CustomClusterTestSuite): @CustomClusterTestSuite.with_args( statestored_args="--use_network_address_as_statestore_priority=true " "--statestore_ha_heartbeat_monitoring_frequency_ms=100 " + "--heartbeat_monitoring_frequency_ms=6000 " "--statestore_peer_timeout_seconds=2 " "--debug_actions=DISABLE_STATESTORE_NETWORK", impalad_args="--statestore_subscriber_timeout_seconds=2", @@ -513,6 +548,7 @@ class TestStatestoredHA(CustomClusterTestSuite): @CustomClusterTestSuite.with_args( statestored_args="--use_network_address_as_statestore_priority=true " "--statestore_ha_heartbeat_monitoring_frequency_ms=100 " + "--heartbeat_monitoring_frequency_ms=6000 " "--statestore_peer_timeout_seconds=2 " "--debug_actions=SEND_UPDATE_STATESTORED_RPC_FIRST_ATTEMPT:[email protected]", impalad_args="--statestore_subscriber_timeout_seconds=2", @@ -559,6 +595,7 @@ class TestStatestoredHA(CustomClusterTestSuite): @CustomClusterTestSuite.with_args( statestored_args="--use_network_address_as_statestore_priority=true " "--statestore_ha_heartbeat_monitoring_frequency_ms=50 " + "--heartbeat_monitoring_frequency_ms=6000 " "--statestore_peer_timeout_seconds=2 " "--debug_actions=DISABLE_STATESTORE_NETWORK", impalad_args="--statestore_subscriber_timeout_seconds=2", @@ -588,8 +625,7 @@ class TestStatestoredHA(CustomClusterTestSuite): # Wait for long enough for the standby statestored to detect the failure of active # statestored and assign itself with active role. - statestore_service_1.wait_for_metric_value( - "statestore.active-status", expected_value=True, timeout=120) + self.__wait_statestore_to_be_active(statestore_service_1, 1) assert(statestore_service_1.get_metric_value("statestore.active-status")) # Verify that original active statestored is in HA recovery mode. 
statestore_service_0.wait_for_metric_value( @@ -645,6 +681,7 @@ class TestStatestoredHA(CustomClusterTestSuite): @CustomClusterTestSuite.with_args( statestored_args="--use_network_address_as_statestore_priority=true " "--statestore_ha_heartbeat_monitoring_frequency_ms=50 " + "--heartbeat_monitoring_frequency_ms=6000 " "--statestore_peer_timeout_seconds={timeout_s} " "--use_subscriber_id_as_catalogd_priority=true" .format(timeout_s=SS_PEER_TIMEOUT_S), @@ -684,8 +721,7 @@ class TestStatestoredHA(CustomClusterTestSuite): statestoreds[0].kill() # Wait for long enough for the standby statestored to detect the failure of active # statestored and assign itself in active role. - statestore_service_1.wait_for_metric_value( - "statestore.active-status", expected_value=True, timeout=120) + self.__wait_statestore_to_be_active(statestore_service_1, 1) assert (statestore_service_1.get_metric_value("statestore.active-status")), \ "Second statestored must be active now" statestore_service_1.wait_for_live_subscribers(5)
