This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fc66bfab IMPALA-14220 (part 2): Delay AcceptRequest until catalog is 
stable
5fc66bfab is described below

commit 5fc66bfabc7e5328f024287df7ac5e4aeadb87c5
Author: Riza Suminto <[email protected]>
AuthorDate: Fri Jul 18 15:44:36 2025 -0700

    IMPALA-14220 (part 2): Delay AcceptRequest until catalog is stable
    
    CatalogD availability has improved since reading is_active_ no longer
    requires holding catalog_lock_. However, during a failover, requests
    may slip into a CatalogD that is transitioning from passive to active
    and obtain stale metadata.
    
    This patch improves the situation in two steps. First, it adds a new
    mutex ha_transition_lock_ that must be obtained by AcceptRequest() in HA
    mode. This mutex serializes
    CatalogServer::WaitPendingMetadataResetStarts() and
    CatalogServer::UpdateActiveCatalogd(). WaitPendingMetadataResetStarts()
    only returns to AcceptRequest() once the triggered_pending_reset_ flag
    is true (the pending metadata reset has completed) or the number of
    started catalog resets reaches min_catalog_resets_to_serve_. If only
    the latter holds, the request goes through the Catalog JVM and is then
    blocked by CatalogResetManager.waitOngoingMetadataFetch() until the
    metadata reset has progressed beyond the requested database/table.
    
    Second, it increments numCatalogResetStarts_ on every global reset
    (Invalidate Metadata) initiated by catalog-server.cc.
    CatalogServer::MarkPendingMetadataReset() mirrors this logic: it sets
    min_catalog_resets_to_serve_ to the current number of started resets
    plus one before setting the triggered_pending_reset_ flag to false,
    which lets the TriggerResetMetadata thread start a new reset.
    
    Rename WaitForCatalogReady() to
    WaitCatalogReadinessForWorkloadManagement() since this wait mechanism is
    specific to Workload Management initialization and has stricter
    requirements.
    
    Removed CatalogServer::IsActive(). Its call sites now either read
    is_active_ directly or go through
    CatalogServer::WaitPendingMetadataResetStarts().
    
    Testing:
    Added test_metadata_after_failover_with_delayed_reset and
    test_metadata_after_failover_with_hms_sync.
    
    Change-Id: I370d21319335318e441ec3c3455bac4227803900
    Reviewed-on: http://gerrit.cloudera.org:8080/23194
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/catalog/catalog-server.cc                   | 132 ++++++++++++++-------
 be/src/catalog/catalog-server.h                    |  47 ++++++--
 be/src/catalog/catalog.cc                          |  10 ++
 be/src/catalog/catalog.h                           |   6 +
 be/src/catalog/catalogd-main.cc                    |   2 +-
 be/src/catalog/workload-management-init.cc         |   6 +-
 .../java/org/apache/impala/catalog/Catalog.java    |   3 -
 .../impala/catalog/CatalogServiceCatalog.java      |  32 ++---
 .../apache/impala/service/CatalogOpExecutor.java   |   5 +-
 .../java/org/apache/impala/service/JniCatalog.java |   4 +
 tests/custom_cluster/test_catalogd_ha.py           |  20 ++++
 11 files changed, 192 insertions(+), 75 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index f74e1419b..e4fa671c7 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -377,9 +377,6 @@ const string HADOOP_VARZ_TEMPLATE = "hadoop-varz.tmpl";
 const string HADOOP_VARZ_WEB_PAGE = "/hadoop-varz";
 
 const int REFRESH_METRICS_INTERVAL_MS = 1000;
-// Catalog version that signal that the first metadata reset has begun.
-// This should match Catalog.CATALOG_VERSION_AFTER_FIRST_RESET
-const int MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST = 100;
 
 // Implementation for the CatalogService thrift interface.
 class CatalogServiceThriftIf : public CatalogServiceIf {
@@ -598,35 +595,17 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
  private:
   CatalogServer* catalog_server_;
   string server_address_;
-  AtomicBool has_initiated_first_reset_{false};
 
   // Check if catalog protocols are compatible between client and catalog 
server.
   // Return Status::OK() if the protocols are compatible and catalog server is 
active.
   Status AcceptRequest(CatalogServiceVersion::type client_version) {
     Status status = Status::OK();
     if (client_version < catalog_server_->GetProtocolVersion()) {
-      status = Status(TErrorCode::CATALOG_INCOMPATIBLE_PROTOCOL, 
client_version + 1,
+      return Status(TErrorCode::CATALOG_INCOMPATIBLE_PROTOCOL, client_version 
+ 1,
           catalog_server_->GetProtocolVersion() + 1);
-    } else if (FLAGS_enable_catalogd_ha && !catalog_server_->IsActive()) {
-      status = Status(Substitute("Request for Catalog service is rejected 
since "
-          "catalogd $0 is in standby mode", server_address_));
     }
-    if (!status.ok()) return status;
-
-    while (!has_initiated_first_reset_.Load()) {
-      long current_catalog_version = 0;
-      status = 
catalog_server_->catalog()->GetCatalogVersion(&current_catalog_version);
-      if (!status.ok()) break;
-      if (current_catalog_version >= MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST) {
-        has_initiated_first_reset_.Store(true);
-      } else {
-        VLOG(1) << "Catalog is not initialized yet. Waiting for catalog 
version ("
-                << current_catalog_version << ") to be >= "
-                << MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST;
-        SleepForMs(100);
-      }
-    }
-    return status;
+
+    return catalog_server_->WaitPendingMetadataResetStarts(server_address_);
   }
 };
 
@@ -635,11 +614,10 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
     thrift_iface_(new CatalogServiceThriftIf(this)),
     thrift_serializer_(FLAGS_compact_catalog_topic),
     metrics_(metrics),
-    is_active_{!FLAGS_enable_catalogd_ha},
+    is_active_(!FLAGS_enable_catalogd_ha),
     is_ha_determined_(!FLAGS_enable_catalogd_ha),
     topic_updates_ready_(false),
     last_sent_catalog_version_(0L),
-    triggered_first_reset_(false),
     catalog_objects_max_version_(0L) {
   topic_processing_time_metric_ = 
StatsMetric<double>::CreateAndRegister(metrics,
       CATALOG_SERVER_TOPIC_PROCESSING_TIMES);
@@ -721,7 +699,7 @@ Status CatalogServer::Start() {
   }
 
   RETURN_IF_ERROR(statestore_subscriber_->Start());
-  if (FLAGS_force_catalogd_active && !IsActive()) {
+  if (FLAGS_force_catalogd_active && !is_active_.Load()) {
     // If both catalogd are started with 'force_catalogd_active' as true in 
short time,
     // the second election overwrite the first election. The one which 
registering with
     // statestore first will be inactive.
@@ -813,8 +791,55 @@ void CatalogServer::UpdateCatalogTopicCallback(
   catalog_update_cv_.NotifyOne();
 }
 
+Status CatalogServer::WaitPendingMetadataResetStarts(const string& 
server_address) {
+  long current_num_reset_starts = 0;
+  bool has_waited = false;
+
+  if (!FLAGS_enable_catalogd_ha) {
+    while (!triggered_pending_reset_.Load()) {
+      
RETURN_IF_ERROR(catalog_->GetNumCatalogResetStarts(&current_num_reset_starts));
+      if (current_num_reset_starts > 0) break;
+      if (!has_waited) {
+        has_waited = true;
+        VLOG(1) << "Catalog is not initialized yet. Waiting for catalog reset 
to start";
+      }
+      SleepForMs(100);
+    }
+    return Status::OK();
+  }
+
+  lock_guard<mutex> meta_lock(ha_transition_lock_);
+  while (true) {
+    // Return error if this catalogd is not active.
+    if (!is_active_.Load()) {
+      return Status(Substitute("Request for Catalog service is rejected since "
+                               "catalogd $0 is in standby mode",
+          server_address));
+    }
+
+    // Early return if first reset has completed.
+    // triggered_pending_reset_=true means initial reset has fully completed.
+    if (triggered_pending_reset_.Load()) return Status::OK();
+
+    // Initial reset maybe ongoing.
+    
RETURN_IF_ERROR(catalog_->GetNumCatalogResetStarts(&current_num_reset_starts));
+    int64_t min_catalog_resets_to_serve = min_catalog_resets_to_serve_.Load();
+    if (current_num_reset_starts >= min_catalog_resets_to_serve) break;
+    // Wait a bit until min_catalog_resets_to_serve_ is reached.
+    if (!has_waited) {
+      has_waited = true;
+      VLOG(1) << "Catalog is not initialized yet. Waiting for num of resets ("
+              << current_num_reset_starts << ") to be >= " << 
min_catalog_resets_to_serve;
+    }
+    SleepForMs(100);
+  }
+
+  return Status::OK();
+}
+
 void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply,
     int64_t active_catalogd_version, const TCatalogRegistration& 
catalogd_registration) {
+  lock_guard<mutex> meta_lock(ha_transition_lock_);
   unique_lock<mutex> l(catalog_lock_);
   if (!active_catalogd_version_checker_->CheckActiveCatalogdVersion(
           is_registration_reply, active_catalogd_version)) {
@@ -841,8 +866,9 @@ void CatalogServer::UpdateActiveCatalogd(bool 
is_registration_reply,
       catalog_->RegenerateServiceId();
       // Clear pending topic updates.
       pending_topic_updates_.clear();
+
       if (FLAGS_catalogd_ha_reset_metadata_on_failover) {
-        triggered_first_reset_ = false;
+        MarkPendingMetadataReset(l);
       } else {
         // Refresh DataSource objects when the catalogd becomes active.
         Status status = catalog_->RefreshDataSources();
@@ -894,7 +920,7 @@ void CatalogServer::WaitUntilHmsEventsSynced(const 
unique_lock<std::mutex>& lock
   Status status = catalog_->GetEventProcessorSummary(req, &response);
   DCHECK(status.ok());
   if (response.__isset.error_msg) {
-    triggered_first_reset_ = false;
+    MarkPendingMetadataReset(lock);
     LOG(ERROR) << "EventProcessor is in ERROR state. Resetting all metadata in 
failover";
     return;
   }
@@ -911,7 +937,7 @@ void CatalogServer::WaitUntilHmsEventsSynced(const 
unique_lock<std::mutex>& lock
     latest_hms_event_id = response.progress.latest_event_id;
   }
   if (latest_hms_event_id < 0) {
-    triggered_first_reset_ = false;
+    MarkPendingMetadataReset(lock);
     LOG(ERROR) << "Timed out getting the latest event id from HMS. Resetting 
all metadata"
                   "in failover";
     return;
@@ -931,7 +957,7 @@ void CatalogServer::WaitUntilHmsEventsSynced(const 
unique_lock<std::mutex>& lock
     status = catalog_->GetEventProcessorSummary(req, &response);
     DCHECK(status.ok());
     if (response.__isset.error_msg) {
-      triggered_first_reset_ = false;
+      MarkPendingMetadataReset(lock);
       LOG(ERROR) << "EventProcessor is in ERROR state. Resetting all metadata 
in "
                     "failover";
       return;
@@ -942,7 +968,7 @@ void CatalogServer::WaitUntilHmsEventsSynced(const 
unique_lock<std::mutex>& lock
   if (last_synced_hms_event_id < latest_hms_event_id) {
     LOG(WARNING) << "Timed out waiting for catching up HMS events.";
     if (FLAGS_catalogd_ha_reset_metadata_on_failover_catchup_timeout) {
-      triggered_first_reset_ = false;
+      MarkPendingMetadataReset(lock);
       LOG(INFO) << "Fallback to resetting all metadata.";
     } else {
       LOG(WARNING) << "Continue with the current cache. Metadata might be 
stale.";
@@ -953,6 +979,30 @@ void CatalogServer::WaitUntilHmsEventsSynced(const 
unique_lock<std::mutex>& lock
       << latest_hms_event_id;
 }
 
+void CatalogServer::MarkPendingMetadataReset(const unique_lock<std::mutex>& 
lock) {
+  DCHECK(lock.mutex() == &catalog_lock_ && lock.owns_lock())
+      << "Must hold catalog_lock_ to avoid concurrency with 
TriggerResetMetadata thread";
+  long current_catalog_resets;
+  Status status = catalog_->GetNumCatalogResetStarts(&current_catalog_resets);
+
+  if (status.ok()) {
+    min_catalog_resets_to_serve_.Store(current_catalog_resets + 1);
+    LOG(INFO) << "Marking pending metadata reset. Min number of catalog resets 
to serve: "
+              << min_catalog_resets_to_serve_.Load();
+  } else {
+    /// If somehow we failed to get the number, sets a conservative value.
+    min_catalog_resets_to_serve_.Add(1);
+    LOG(ERROR) << "Failed to get current catalog version: " << 
status.GetDetail()
+               << "Some requests might run on stale metadata.";
+  }
+  // Sets this after we updated min_catalog_resets_to_serve_ since the 
consumers check
+  // this before read/write on num of catalog resets. The consumers are
+  //  - Requests in WaitPendingMetadataResetStarts(). Read this and num of 
catalog resets
+  //    in JVM.
+  //  - TriggerResetMetadata thread. Reads this and updates num of catalog 
resets in JVM.
+  triggered_pending_reset_.Store(false);
+}
+
 [[noreturn]] void CatalogServer::TriggerResetMetadata() {
   while (true) {
     bool must_reset = false;
@@ -961,14 +1011,15 @@ void CatalogServer::WaitUntilHmsEventsSynced(const 
unique_lock<std::mutex>& lock
       while (!must_reset) {
         catalog_update_cv_.NotifyOne();
         catalog_update_cv_.Wait(unique_lock);
-        must_reset = is_ha_determined_ && !triggered_first_reset_;
+        must_reset = is_ha_determined_ && !triggered_pending_reset_.Load();
       }
     }
 
     // Run ResetMetadata without holding 'catalog_lock_' so that it does not 
block
     // gathering thread from starting. Note that gathering thread will still 
compete
     // for CatalogServiceCatalog.versionLock_.writeLock() in JVM.
-    VLOG(1) << "Triggering first catalog invalidation.";
+    DebugActionNoFail(FLAGS_debug_actions, "TRIGGER_RESET_METADATA_DELAY");
+    VLOG(1) << "(Re)Triggering first catalog invalidation.";
     TResetMetadataRequest req;
     TResetMetadataResponse resp;
     req.__set_header(TCatalogServiceRequestHeader());
@@ -983,16 +1034,12 @@ void CatalogServer::WaitUntilHmsEventsSynced(const 
unique_lock<std::mutex>& lock
     {
       // Mark to true, regardless of status.
       unique_lock<mutex> unique_lock(catalog_lock_);
-      triggered_first_reset_ = true;
+      triggered_pending_reset_.Store(true);
       catalog_update_cv_.NotifyOne();
     }
   }
 }
 
-bool CatalogServer::IsActive() {
-  return is_active_.Load();
-}
-
 [[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
   // catalog_topic_mode=minimal does not require initial reset to happen ahead 
of catalog
   // update gathering because coordinator will request metadata on-demand.
@@ -1005,8 +1052,9 @@ bool CatalogServer::IsActive() {
     // processing a heartbeat.
     // If require_initial_reset is True, this thread need to let 
TriggerResetMetadata
     // thread to proceed first.
-    while (topic_updates_ready_ || (require_initial_reset && 
!triggered_first_reset_)) {
-      if (!triggered_first_reset_) {
+    while (topic_updates_ready_
+        || (require_initial_reset && !triggered_pending_reset_.Load())) {
+      if (!triggered_pending_reset_.Load()) {
         // Wake up TriggerResetMetadata thread.
         catalog_update_cv_.NotifyOne();
       }
@@ -1040,7 +1088,7 @@ bool CatalogServer::IsActive() {
 
     topic_processing_time_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 
* 1000.0));
     topic_updates_ready_ = true;
-    if (!triggered_first_reset_) catalog_update_cv_.NotifyOne();
+    if (!triggered_pending_reset_.Load()) catalog_update_cv_.NotifyOne();
   }
 }
 
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 5a01d7966..cb6b637a8 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -141,7 +141,7 @@ class CatalogServer {
   ///
   /// Returns `true` or `false` indicating if this catalogd is the active 
catalogd.
   /// If catalog HA is not enabled, returns `true`.
-  bool WaitForCatalogReady();
+  bool WaitCatalogReadinessForWorkloadManagement();
 
   // Initializes workload management by creating or upgrading the necessary 
database and
   // tables. Does not check if the current catalogd is active or if workload 
management is
@@ -219,14 +219,30 @@ class CatalogServer {
   /// Thread that periodically wakes up and refreshes certain Catalog metrics.
   std::unique_ptr<Thread> catalog_metrics_refresh_thread_;
 
-  /// Protects is_active_, active_catalogd_version_checker_,
-  /// catalog_update_cv_, pending_topic_updates_, 
catalog_objects_to/from_version_,
-  /// last_sent_catalog_version, and is_ha_determined_.
-  std::mutex catalog_lock_;
+  /// Min number of started catalog resets that we are safe to serve requests. 
Set on
+  /// each call of MarkPendingMetadataReset. Inits to 1 to account for the 
initial reset
+  /// done in startup. Only used when catalogd HA is enabled.
+  AtomicInt64 min_catalog_resets_to_serve_ = 1;
 
   /// Set to true if this catalog instance is active.
   AtomicBool is_active_;
 
+  /// Mark if the pending metadata reset required in startup or HA transition 
has been
+  /// triggered.
+  AtomicBool triggered_pending_reset_ = false;
+
+  /// Protect metadata state change during HA transition.
+  /// If operation needs to obtain both catalog_lock_ and this, this lock must 
be
+  /// obtained first.
+  std::mutex ha_transition_lock_;
+
+  /// Protects several field members below.
+  std::mutex catalog_lock_;
+
+  /// 
-----------------------------------------------------------------------------
+  /// BEGIN: Members that must be protected by catalog_lock_.
+  /// 
-----------------------------------------------------------------------------
+
   /// Set to true after active catalog has been determined. Will be true if 
catalog ha
   /// is not enabled.
   bool is_ha_determined_;
@@ -256,15 +272,15 @@ class CatalogServer {
   /// Set in UpdateCatalogTopicCallback() and protected by the catalog_lock_.
   int64_t last_sent_catalog_version_;
 
-  /// Mark if the first metadata reset has been triggered.
-  /// Protected by the catalog_lock_.
-  bool triggered_first_reset_;
-
   /// The max catalog version in pending_topic_updates_. Set by the
   /// catalog_update_gathering_thread_ and protected by catalog_lock_.
   /// Value -1 means catalog_update_gathering_thread_ has not set it.
   int64_t catalog_objects_max_version_;
 
+  /// 
-----------------------------------------------------------------------------
+  /// END: Members that must be protected by catalog_lock_.
+  /// 
-----------------------------------------------------------------------------
+
   /// Called during each Statestore heartbeat and is responsible for updating 
the current
   /// set of catalog objects in the IMPALA_CATALOG_TOPIC. Responds to each 
heartbeat with a
   /// delta update containing the set of changes since the last heartbeat. 
This function
@@ -295,8 +311,17 @@ class CatalogServer {
   /// catalog_lock_.
   void WaitUntilHmsEventsSynced(const std::unique_lock<std::mutex>& lock);
 
-  /// Returns the current active status of the catalogd.
-  bool IsActive();
+  /// Request pending full metadata reset.
+  /// The actual reset will be performed by the TriggerResetMetadata thread 
later.
+  /// Set min_catalog_resets_to_serve_ to the current catalog resets + 1 to 
delay
+  /// AcceptRequest() until the reset operation begin in JVM.
+  void MarkPendingMetadataReset(const std::unique_lock<std::mutex>& lock);
+
+  /// If there is a pending metadata reset going to start, e.g. in the initial 
startup or
+  /// marked by MarkPendingMetadataReset(), wait until it actually starts. 
This means the
+  /// metadata reset thread has started and acquired the catalog version lock 
in JVM, the
+  /// request will proceed to wait inside JVM.
+  Status WaitPendingMetadataResetStarts(const std::string& server_address);
 
   /// Executed by the catalog_update_gathering_thread_. Calls into JniCatalog
   /// to get the latest set of catalog objects that exist, along with some 
metadata on
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index c6af237ec..2993215eb 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -69,6 +69,7 @@ Catalog::Catalog() {
     {"getCatalogUsage", "()[B", &get_catalog_usage_id_},
     {"getOperationUsage", "()[B", &get_operation_usage_id_},
     {"getCatalogVersion", "()J", &get_catalog_version_id_},
+    {"getNumCatalogResetStarts", "()J", &get_num_catalog_reset_starts_id_},
     {"getCatalogServerMetrics", "()[B", &get_catalog_server_metrics_},
     {"getEventProcessorSummary", "([B)[B", &get_event_processor_summary_},
     {"prioritizeLoad", "([B)V", &prioritize_load_id_},
@@ -128,6 +129,15 @@ Status Catalog::GetCatalogVersion(long* version) {
   return Status::OK();
 }
 
+Status Catalog::GetNumCatalogResetStarts(long* num) {
+  JNIEnv* jni_env = JniUtil::GetJNIEnv();
+  JniLocalFrame jni_frame;
+  RETURN_IF_ERROR(jni_frame.push(jni_env));
+  *num = jni_env->CallLongMethod(catalog_, get_num_catalog_reset_starts_id_);
+  RETURN_ERROR_IF_EXC(jni_env);
+  return Status::OK();
+}
+
 Status Catalog::GetCatalogDelta(CatalogServer* caller, int64_t from_version,
     TGetCatalogDeltaResponse* resp) {
   TGetCatalogDeltaRequest request;
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 47fb8d619..6a4a912b2 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -58,6 +58,11 @@ class Catalog {
   /// Status object with information on the error will be returned.
   Status GetCatalogVersion(long* version);
 
+  /// Queries the catalog to get the number of reset() has been starts. 
Returns OK if the
+  /// operation was successful, otherwise a Status object with information on 
the error
+  /// will be returned.
+  Status GetNumCatalogResetStarts(long* num);
+
   /// Retrieves the catalog objects that were added/modified/deleted since 
version
   /// 'from_version'. Returns OK if the operation was successful, otherwise a 
Status
   /// object with information on the error will be returned. 'caller' is a 
pointer to
@@ -176,6 +181,7 @@ class Catalog {
   jmethodID get_partial_catalog_object_id_;  // 
JniCatalog.getPartialCatalogObject()
   jmethodID get_catalog_delta_id_;  // JniCatalog.getCatalogDelta()
   jmethodID get_catalog_version_id_;  // JniCatalog.getCatalogVersion()
+  jmethodID get_num_catalog_reset_starts_id_;  // 
JniCatalog.getNumCatalogResetStarts()
   jmethodID get_catalog_usage_id_; // JniCatalog.getCatalogUsage()
   jmethodID get_operation_usage_id_; // JniCatalog.getOperationUsage()
   jmethodID get_catalog_server_metrics_; // 
JniCatalog.getCatalogServerMetrics()
diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc
index 260fddcbd..598f305bb 100644
--- a/be/src/catalog/catalogd-main.cc
+++ b/be/src/catalog/catalogd-main.cc
@@ -104,7 +104,7 @@ int CatalogdMain(int argc, char** argv) {
   LOG(INFO) << "CatalogService started on port: " << 
FLAGS_catalog_service_port;
 
   if (FLAGS_enable_workload_mgmt) {
-    if (catalog_server.WaitForCatalogReady()) {
+    if (catalog_server.WaitCatalogReadinessForWorkloadManagement()) {
       ABORT_IF_ERROR(catalog_server.InitWorkloadManagement());
     } else {
       LOG(INFO) << "Skipping workload management initialization since catalogd 
HA is "
diff --git a/be/src/catalog/workload-management-init.cc 
b/be/src/catalog/workload-management-init.cc
index 8810cff57..df08b6f9a 100644
--- a/be/src/catalog/workload-management-init.cc
+++ b/be/src/catalog/workload-management-init.cc
@@ -507,14 +507,14 @@ inline bool CatalogServer::IsCatalogInitialized() {
   return last_sent_catalog_version_ > 0 || (is_ha_determined_ && 
!is_active_.Load());
 } // CatalogServer::IsCatalogInitialized
 
-bool CatalogServer::WaitForCatalogReady() {
+bool CatalogServer::WaitCatalogReadinessForWorkloadManagement() {
   while (!IsCatalogInitialized()) {
     LOG(INFO) << "Waiting for first catalog update";
     SleepForMs(WM_INIT_CHECK_SLEEP_MS);
   }
 
-  return IsActive();
-} // function CatalogServer::WaitForCatalogReady
+  return is_active_.Load();
+} // function CatalogServer::WaitCatalogReadinessForWorkloadManagement
 
 Status CatalogServer::InitWorkloadManagement() {
   DCHECK_NE(nullptr, thrift_iface_.get());
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java 
b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index df07b63aa..b75d041b1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -78,9 +78,6 @@ import com.google.common.base.Preconditions;
 public abstract class Catalog implements AutoCloseable {
   // Initial catalog version and ID.
   public final static long INITIAL_CATALOG_VERSION = 0L;
-  // Catalog version that signal that the first metadata reset has begun.
-  // This should match MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST.
-  public final static long CATALOG_VERSION_AFTER_FIRST_RESET = 100L;
   public static final TUniqueId INITIAL_CATALOG_SERVICE_ID = new TUniqueId(0L, 
0L);
   public static final String DEFAULT_DB = "default";
 
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 73f89ddff..d81008a89 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -302,6 +302,10 @@ public class CatalogServiceCatalog extends Catalog {
   // The catalog version when we ran reset() last time. Protected by 
versionLock_.
   private long lastResetStartVersion_ = INITIAL_CATALOG_VERSION;
 
+  // Number of reset() triggered from BE (catalog-server.cc) that has been 
started.
+  // It's increased everytime reset() starts holding versionLock_.
+  private AtomicLong numCatalogResetStarts_ = new AtomicLong(0);
+
   // Manages the scheduling of background table loading.
   private final TableLoadingMgr tableLoadingMgr_;
 
@@ -2153,7 +2157,7 @@ public class CatalogServiceCatalog extends Catalog {
     }
     long duration = System.currentTimeMillis() - startTime;
     if (!nativeFuncs.isEmpty() || !javaFuncs.isEmpty()) {
-      LOG.info("Loaded {} native {} and {} java functions for database {} in 
{} ms.",
+      LOG.info("Loaded {} native and {} java functions for database {} in {} 
ms.",
           nativeFuncs.size(), javaFuncs.size(), db.getName(), duration);
     }
   }
@@ -2383,7 +2387,7 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   public long reset(EventSequence catalogTimeline) throws CatalogException {
-    return reset(catalogTimeline, false);
+    return reset(catalogTimeline, false, false);
   }
 
   /**
@@ -2392,10 +2396,11 @@ public class CatalogServiceCatalog extends Catalog {
    * requesting impalad will use that version to determine when the
    * effects of reset have been applied to its local catalog cache.
    */
-  public long reset(EventSequence catalogTimeline, boolean isSyncDdl)
-      throws CatalogException {
+  public long reset(EventSequence catalogTimeline, boolean isSyncDdl,
+      boolean isCatalogServerRequest) throws CatalogException {
     long startVersion = getCatalogVersion();
-    LOG.info("Invalidating all metadata. Version: " + startVersion);
+    LOG.info("Invalidating all metadata. Version: " + startVersion
+        + ", IsCatalogServerRequest: " + isCatalogServerRequest);
     Stopwatch resetTimer = Stopwatch.createStarted();
     Stopwatch unlockedTimer = Stopwatch.createStarted();
     // First update the policy metadata.
@@ -2434,19 +2439,14 @@ public class CatalogServiceCatalog extends Catalog {
       LOG.error("Couldn't identify the default FS. Cache Pool reader will be 
disabled.");
     }
     versionLock_.writeLock().lock();
+    // The BE only cares about resets triggered from itself. So not increasing 
this for
+    // user requests.
+    if (isCatalogServerRequest) numCatalogResetStarts_.incrementAndGet();
     try {
       resetManager_.waitFullMetadataFetch();
       unlockedTimer.stop();
       catalogTimeline.markEvent(GOT_CATALOG_VERSION_WRITE_LOCK);
-      // In case of an empty new catalog, the version should still change to 
reflect the
-      // reset operation itself and to unblock impalads by making the catalog 
version >
-      // INITIAL_CATALOG_VERSION. See Frontend.waitForCatalog()
-      if (catalogVersion_ < Catalog.CATALOG_VERSION_AFTER_FIRST_RESET) {
-        catalogVersion_ = Catalog.CATALOG_VERSION_AFTER_FIRST_RESET;
-        LOG.info("First reset initiated. Version: " + catalogVersion_);
-      } else {
-        ++catalogVersion_;
-      }
+      catalogVersion_++;
 
       // Update data source, db and table metadata.
       // First, refresh DataSource objects from HMS and assign new versions.
@@ -3725,6 +3725,10 @@ public class CatalogServiceCatalog extends Catalog {
     }
   }
 
+  public long getNumCatalogResetStarts() {
+    return numCatalogResetStarts_.get();
+  }
+
   private void acquireVersionReadLock(EventSequence catalogTimeline) {
     versionLock_.readLock().lock();
     catalogTimeline.markEvent(GOT_CATALOG_VERSION_READ_LOCK);
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 76040125f..adb8fe360 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -7287,7 +7287,10 @@ public class CatalogOpExecutor {
     } else {
       // Invalidate the entire catalog if no table name is provided.
       Preconditions.checkArgument(!req.isIs_refresh());
-      resp.getResult().setVersion(catalog_.reset(catalogTimeline, 
req.isSync_ddl()));
+      boolean isCatalogServerRequest =
+          !req.header.isSetRequesting_user() && 
!req.header.isSetCoordinator_hostname();
+      resp.getResult().setVersion(
+          catalog_.reset(catalogTimeline, req.isSync_ddl(), 
isCatalogServerRequest));
       resp.getResult().setIs_invalidate(true);
     }
     catalogTimeline.markEvent("Finished resetMetadata request");
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java 
b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index 8248f2788..aae49245d 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -289,6 +289,10 @@ public class JniCatalog {
     return catalog_.getCatalogVersion();
   }
 
+  public long getNumCatalogResetStarts() {
+    return catalog_.getNumCatalogResetStarts();
+  }
+
   public byte[] getCatalogDelta(byte[] thriftGetCatalogDeltaReq)
       throws ImpalaException, TException {
     TGetCatalogDeltaRequest params = new TGetCatalogDeltaRequest();
diff --git a/tests/custom_cluster/test_catalogd_ha.py 
b/tests/custom_cluster/test_catalogd_ha.py
index 289d8ec4f..8fd02b198 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -527,6 +527,26 @@ class TestCatalogdHA(CustomClusterTestSuite):
   def test_metadata_after_failover(self, unique_database):
     self._test_metadata_after_failover(unique_database)
 
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+    catalogd_args="--catalogd_ha_reset_metadata_on_failover=true "
+                  "--catalog_topic_mode=minimal "
+                  "--debug_actions=TRIGGER_RESET_METADATA_DELAY:SLEEP@1000",
+    impalad_args="--use_local_catalog=true",
+    start_args="--enable_catalogd_ha")
+  def test_metadata_after_failover_with_delayed_reset(self, unique_database):
+    self._test_metadata_after_failover(unique_database)
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+    catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
+                  "--catalog_topic_mode=minimal "
+                  "--debug_actions=catalogd_event_processing_delay:SLEEP@1000",
+    impalad_args="--use_local_catalog=true",
+    start_args="--enable_catalogd_ha")
+  def test_metadata_after_failover_with_hms_sync(self, unique_database):
+    self._test_metadata_after_failover(unique_database, skip_func_test=True)
+
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
     catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "

Reply via email to