This is an automated email from the ASF dual-hosted git repository. wzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit dfe47c639a53fb10feffc596d10ea369e1212c9b Author: wzhou-code <[email protected]> AuthorDate: Fri Jun 23 15:59:05 2023 -0700 IMPALA-12235: Fixed start-impala-cluster failure IMPALA-12150 added statestore_id in the structures of requests for StatestoreSubscriber service, and checks statestore_id when handling requests from statestore. But it's possible the topic update messages are received before receiving the registration response. In that case, statestore_id is not received yet when handling topic update messages. We should handle the incoming messages instead of skipping those messages. Otherwise, subscribers may lose cluster membership messages while the Impala cluster is starting. This patch also changes the default value of the startup flag tolerate_statestore_startup_delay to true. Testing: - Passed the core-tests. Change-Id: Ifa5fe5e644b8c1662e3cc77aa724ed2690f83ae6 Reviewed-on: http://gerrit.cloudera.org:8080/20126 Reviewed-by: Andrew Sherman <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/common/global-flags.cc | 2 +- be/src/statestore/statestore-subscriber.cc | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc index 0b1515f4f..0c8d99e23 100644 --- a/be/src/common/global-flags.cc +++ b/be/src/common/global-flags.cc @@ -378,7 +378,7 @@ DEFINE_bool(pull_table_types_and_comments, false, "catalogd-only flag. Required if users want GET_TABLES requests return correct table " "types or comments."); -DEFINE_bool(tolerate_statestore_startup_delay, false, "If set to true, the subscriber " +DEFINE_bool(tolerate_statestore_startup_delay, true, "If set to true, the subscriber " "is able to tolerate the delay of the statestore's availability. The subscriber's " "process will not exit if it cannot register with the specified statestore on " "startup. 
But instead it enters into Recovery mode, it will loop, sleep and retry " diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc index 909914e6a..134c614f0 100644 --- a/be/src/statestore/statestore-subscriber.cc +++ b/be/src/statestore/statestore-subscriber.cc @@ -268,6 +268,8 @@ void StatestoreSubscriber::Heartbeat( // It's possible the heartbeat is received for previous registration. if (statestore_->IsMatchingStatestoreId(statestore_id)) { statestore_->Heartbeat(registration_id); + } else { + VLOG(3) << "Ignore heartbeat message from unknown statestored: " << statestore_id; } } @@ -277,6 +279,9 @@ void StatestoreSubscriber::UpdateCatalogd( const TUniqueId& statestore_id, int64 sequence) { if (statestore_->IsMatchingStatestoreId(statestore_id)) { statestore_->UpdateCatalogd(catalogd_registration, registration_id, sequence); + } else { + VLOG(3) << "Ignore updating catalogd message from unknown statestored: " + << statestore_id; } } @@ -286,6 +291,8 @@ Status StatestoreSubscriber::UpdateState(const TopicDeltaMap& incoming_topic_del if (statestore_->IsMatchingStatestoreId(statestore_id)) { return statestore_->UpdateState( incoming_topic_deltas, registration_id, subscriber_topic_updates, skipped); + } else { + VLOG(3) << "Ignore topic update message from unknown statestored: " << statestore_id; } return Status::OK(); } @@ -483,7 +490,7 @@ Status StatestoreSubscriber::StatestoreStub::Start(bool* has_active_catalogd, } else { LOG(INFO) << "statestore registration unsuccessful on startup: " << status.GetDetail(); - if (FLAGS_tolerate_statestore_startup_delay) { + if (FLAGS_tolerate_statestore_startup_delay && !TestInfo::is_be_test()) { LOG(INFO) << "Tolerate the delay of the statestore's availability on startup"; status = Status::OK(); } @@ -612,7 +619,11 @@ Status StatestoreSubscriber::StatestoreStub::CheckRegistrationIdAndUpdateCatalog bool StatestoreSubscriber::StatestoreStub::IsMatchingStatestoreId( const TUniqueId 
statestore_id) { lock_guard<mutex> r(id_lock_); - return statestore_id == statestore_id_; + // It's possible the topic update messages are received before receiving the + // registration response. In the case, statestore_id_ and is_registered_ are not set. + // TODO: need to revisit this when supporting statestored HA. + return statestore_id == statestore_id_ || + (!is_registered_ && statestore_id_.hi == 0 && statestore_id_.lo == 0); } void StatestoreSubscriber::StatestoreStub::Heartbeat(
