This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 5fc66bfab IMPALA-14220 (part 2): Delay AcceptRequest until catalog is
stable
5fc66bfab is described below
commit 5fc66bfabc7e5328f024287df7ac5e4aeadb87c5
Author: Riza Suminto <[email protected]>
AuthorDate: Fri Jul 18 15:44:36 2025 -0700
IMPALA-14220 (part 2): Delay AcceptRequest until catalog is stable
CatalogD availability has improved since reading is_active_ no longer
requires holding catalog_lock_. However, during a failover scenario,
requests may slip into the CatalogD that is transitioning from passive to
active and obtain stale metadata.
This patch improves the situation in two steps. First, it adds a new
mutex ha_transition_lock_ that must be obtained by AcceptRequest() in HA
mode. This mutex protects both CatalogServer::WaitPendingMetadataResetStarts()
and CatalogServer::UpdateActiveCatalogd(). WaitPendingMetadataResetStarts()
returns an error if this catalogd is in standby mode. Otherwise it only
exits and returns to AcceptRequest() after the triggered_pending_reset_
flag is true (the pending metadata reset has completed) or
min_catalog_resets_to_serve_ is met. If only the latter happens, the
request will go through the Catalog JVM and subsequently be blocked by
CatalogResetManager.waitOngoingMetadataFetch() until the metadata reset
has progressed beyond the requested database/table.
Second, it increments numCatalogResetStarts_ on every global reset
(Invalidate Metadata) initiated by catalog-server.cc.
CatalogServer::MarkPendingMetadataReset() matches this logic by setting
min_catalog_resets_to_serve_ to the current number of reset starts + 1
before setting the triggered_pending_reset_ flag to false (which lets the
TriggerResetMetadata thread start a new reset).
Renamed WaitForCatalogReady() to
WaitCatalogReadinessForWorkloadManagement() since this wait mechanism is
specific to Workload Management initialization and has stricter
requirements.
Removed CatalogServer::IsActive() since its call sites are replaced with
direct reads of is_active_ or with
CatalogServer::WaitPendingMetadataResetStarts().
Testing:
Added test_metadata_after_failover_with_delayed_reset and
test_metadata_after_failover_with_hms_sync.
Change-Id: I370d21319335318e441ec3c3455bac4227803900
Reviewed-on: http://gerrit.cloudera.org:8080/23194
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/catalog/catalog-server.cc | 132 ++++++++++++++-------
be/src/catalog/catalog-server.h | 47 ++++++--
be/src/catalog/catalog.cc | 10 ++
be/src/catalog/catalog.h | 6 +
be/src/catalog/catalogd-main.cc | 2 +-
be/src/catalog/workload-management-init.cc | 6 +-
.../java/org/apache/impala/catalog/Catalog.java | 3 -
.../impala/catalog/CatalogServiceCatalog.java | 32 ++---
.../apache/impala/service/CatalogOpExecutor.java | 5 +-
.../java/org/apache/impala/service/JniCatalog.java | 4 +
tests/custom_cluster/test_catalogd_ha.py | 20 ++++
11 files changed, 192 insertions(+), 75 deletions(-)
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index f74e1419b..e4fa671c7 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -377,9 +377,6 @@ const string HADOOP_VARZ_TEMPLATE = "hadoop-varz.tmpl";
const string HADOOP_VARZ_WEB_PAGE = "/hadoop-varz";
const int REFRESH_METRICS_INTERVAL_MS = 1000;
-// Catalog version that signal that the first metadata reset has begun.
-// This should match Catalog.CATALOG_VERSION_AFTER_FIRST_RESET
-const int MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST = 100;
// Implementation for the CatalogService thrift interface.
class CatalogServiceThriftIf : public CatalogServiceIf {
@@ -598,35 +595,17 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
private:
CatalogServer* catalog_server_;
string server_address_;
- AtomicBool has_initiated_first_reset_{false};
// Check if catalog protocols are compatible between client and catalog
server.
// Return Status::OK() if the protocols are compatible and catalog server is
active.
Status AcceptRequest(CatalogServiceVersion::type client_version) {
Status status = Status::OK();
if (client_version < catalog_server_->GetProtocolVersion()) {
- status = Status(TErrorCode::CATALOG_INCOMPATIBLE_PROTOCOL,
client_version + 1,
+ return Status(TErrorCode::CATALOG_INCOMPATIBLE_PROTOCOL, client_version
+ 1,
catalog_server_->GetProtocolVersion() + 1);
- } else if (FLAGS_enable_catalogd_ha && !catalog_server_->IsActive()) {
- status = Status(Substitute("Request for Catalog service is rejected
since "
- "catalogd $0 is in standby mode", server_address_));
}
- if (!status.ok()) return status;
-
- while (!has_initiated_first_reset_.Load()) {
- long current_catalog_version = 0;
- status =
catalog_server_->catalog()->GetCatalogVersion(¤t_catalog_version);
- if (!status.ok()) break;
- if (current_catalog_version >= MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST) {
- has_initiated_first_reset_.Store(true);
- } else {
- VLOG(1) << "Catalog is not initialized yet. Waiting for catalog
version ("
- << current_catalog_version << ") to be >= "
- << MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST;
- SleepForMs(100);
- }
- }
- return status;
+
+ return catalog_server_->WaitPendingMetadataResetStarts(server_address_);
}
};
@@ -635,11 +614,10 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
thrift_iface_(new CatalogServiceThriftIf(this)),
thrift_serializer_(FLAGS_compact_catalog_topic),
metrics_(metrics),
- is_active_{!FLAGS_enable_catalogd_ha},
+ is_active_(!FLAGS_enable_catalogd_ha),
is_ha_determined_(!FLAGS_enable_catalogd_ha),
topic_updates_ready_(false),
last_sent_catalog_version_(0L),
- triggered_first_reset_(false),
catalog_objects_max_version_(0L) {
topic_processing_time_metric_ =
StatsMetric<double>::CreateAndRegister(metrics,
CATALOG_SERVER_TOPIC_PROCESSING_TIMES);
@@ -721,7 +699,7 @@ Status CatalogServer::Start() {
}
RETURN_IF_ERROR(statestore_subscriber_->Start());
- if (FLAGS_force_catalogd_active && !IsActive()) {
+ if (FLAGS_force_catalogd_active && !is_active_.Load()) {
// If both catalogd are started with 'force_catalogd_active' as true in
short time,
// the second election overwrite the first election. The one which
registering with
// statestore first will be inactive.
@@ -813,8 +791,55 @@ void CatalogServer::UpdateCatalogTopicCallback(
catalog_update_cv_.NotifyOne();
}
+Status CatalogServer::WaitPendingMetadataResetStarts(const string&
server_address) {
+ long current_num_reset_starts = 0;
+ bool has_waited = false;
+
+ if (!FLAGS_enable_catalogd_ha) {
+ while (!triggered_pending_reset_.Load()) {
+
RETURN_IF_ERROR(catalog_->GetNumCatalogResetStarts(¤t_num_reset_starts));
+ if (current_num_reset_starts > 0) break;
+ if (!has_waited) {
+ has_waited = true;
+ VLOG(1) << "Catalog is not initialized yet. Waiting for catalog reset
to start";
+ }
+ SleepForMs(100);
+ }
+ return Status::OK();
+ }
+
+ lock_guard<mutex> meta_lock(ha_transition_lock_);
+ while (true) {
+ // Return error if this catalogd is not active.
+ if (!is_active_.Load()) {
+ return Status(Substitute("Request for Catalog service is rejected since "
+ "catalogd $0 is in standby mode",
+ server_address));
+ }
+
+ // Early return if first reset has completed.
+ // triggered_pending_reset_=true means initial reset has fully completed.
+ if (triggered_pending_reset_.Load()) return Status::OK();
+
+ // Initial reset maybe ongoing.
+
RETURN_IF_ERROR(catalog_->GetNumCatalogResetStarts(¤t_num_reset_starts));
+ int64_t min_catalog_resets_to_serve = min_catalog_resets_to_serve_.Load();
+ if (current_num_reset_starts >= min_catalog_resets_to_serve) break;
+ // Wait a bit until min_catalog_resets_to_serve_ is reached.
+ if (!has_waited) {
+ has_waited = true;
+ VLOG(1) << "Catalog is not initialized yet. Waiting for num of resets ("
+ << current_num_reset_starts << ") to be >= " <<
min_catalog_resets_to_serve;
+ }
+ SleepForMs(100);
+ }
+
+ return Status::OK();
+}
+
void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply,
int64_t active_catalogd_version, const TCatalogRegistration&
catalogd_registration) {
+ lock_guard<mutex> meta_lock(ha_transition_lock_);
unique_lock<mutex> l(catalog_lock_);
if (!active_catalogd_version_checker_->CheckActiveCatalogdVersion(
is_registration_reply, active_catalogd_version)) {
@@ -841,8 +866,9 @@ void CatalogServer::UpdateActiveCatalogd(bool
is_registration_reply,
catalog_->RegenerateServiceId();
// Clear pending topic updates.
pending_topic_updates_.clear();
+
if (FLAGS_catalogd_ha_reset_metadata_on_failover) {
- triggered_first_reset_ = false;
+ MarkPendingMetadataReset(l);
} else {
// Refresh DataSource objects when the catalogd becomes active.
Status status = catalog_->RefreshDataSources();
@@ -894,7 +920,7 @@ void CatalogServer::WaitUntilHmsEventsSynced(const
unique_lock<std::mutex>& lock
Status status = catalog_->GetEventProcessorSummary(req, &response);
DCHECK(status.ok());
if (response.__isset.error_msg) {
- triggered_first_reset_ = false;
+ MarkPendingMetadataReset(lock);
LOG(ERROR) << "EventProcessor is in ERROR state. Resetting all metadata in
failover";
return;
}
@@ -911,7 +937,7 @@ void CatalogServer::WaitUntilHmsEventsSynced(const
unique_lock<std::mutex>& lock
latest_hms_event_id = response.progress.latest_event_id;
}
if (latest_hms_event_id < 0) {
- triggered_first_reset_ = false;
+ MarkPendingMetadataReset(lock);
LOG(ERROR) << "Timed out getting the latest event id from HMS. Resetting
all metadata"
"in failover";
return;
@@ -931,7 +957,7 @@ void CatalogServer::WaitUntilHmsEventsSynced(const
unique_lock<std::mutex>& lock
status = catalog_->GetEventProcessorSummary(req, &response);
DCHECK(status.ok());
if (response.__isset.error_msg) {
- triggered_first_reset_ = false;
+ MarkPendingMetadataReset(lock);
LOG(ERROR) << "EventProcessor is in ERROR state. Resetting all metadata
in "
"failover";
return;
@@ -942,7 +968,7 @@ void CatalogServer::WaitUntilHmsEventsSynced(const
unique_lock<std::mutex>& lock
if (last_synced_hms_event_id < latest_hms_event_id) {
LOG(WARNING) << "Timed out waiting for catching up HMS events.";
if (FLAGS_catalogd_ha_reset_metadata_on_failover_catchup_timeout) {
- triggered_first_reset_ = false;
+ MarkPendingMetadataReset(lock);
LOG(INFO) << "Fallback to resetting all metadata.";
} else {
LOG(WARNING) << "Continue with the current cache. Metadata might be
stale.";
@@ -953,6 +979,30 @@ void CatalogServer::WaitUntilHmsEventsSynced(const
unique_lock<std::mutex>& lock
<< latest_hms_event_id;
}
+void CatalogServer::MarkPendingMetadataReset(const unique_lock<std::mutex>&
lock) {
+ DCHECK(lock.mutex() == &catalog_lock_ && lock.owns_lock())
+ << "Must hold catalog_lock_ to avoid concurrency with
TriggerResetMetadata thread";
+ long current_catalog_resets;
+ Status status = catalog_->GetNumCatalogResetStarts(¤t_catalog_resets);
+
+ if (status.ok()) {
+ min_catalog_resets_to_serve_.Store(current_catalog_resets + 1);
+ LOG(INFO) << "Marking pending metadata reset. Min number of catalog resets
to serve: "
+ << min_catalog_resets_to_serve_.Load();
+ } else {
+ /// If somehow we failed to get the number, sets a conservative value.
+ min_catalog_resets_to_serve_.Add(1);
+ LOG(ERROR) << "Failed to get current catalog version: " <<
status.GetDetail()
+ << "Some requests might run on stale metadata.";
+ }
+ // Sets this after we updated min_catalog_resets_to_serve_ since the
consumers check
+ // this before read/write on num of catalog resets. The consumers are
+ // - Requests in WaitPendingMetadataResetStarts(). Read this and num of
catalog resets
+ // in JVM.
+ // - TriggerResetMetadata thread. Reads this and updates num of catalog
resets in JVM.
+ triggered_pending_reset_.Store(false);
+}
+
[[noreturn]] void CatalogServer::TriggerResetMetadata() {
while (true) {
bool must_reset = false;
@@ -961,14 +1011,15 @@ void CatalogServer::WaitUntilHmsEventsSynced(const
unique_lock<std::mutex>& lock
while (!must_reset) {
catalog_update_cv_.NotifyOne();
catalog_update_cv_.Wait(unique_lock);
- must_reset = is_ha_determined_ && !triggered_first_reset_;
+ must_reset = is_ha_determined_ && !triggered_pending_reset_.Load();
}
}
// Run ResetMetadata without holding 'catalog_lock_' so that it does not
block
// gathering thread from starting. Note that gathering thread will still
compete
// for CatalogServiceCatalog.versionLock_.writeLock() in JVM.
- VLOG(1) << "Triggering first catalog invalidation.";
+ DebugActionNoFail(FLAGS_debug_actions, "TRIGGER_RESET_METADATA_DELAY");
+ VLOG(1) << "(Re)Triggering first catalog invalidation.";
TResetMetadataRequest req;
TResetMetadataResponse resp;
req.__set_header(TCatalogServiceRequestHeader());
@@ -983,16 +1034,12 @@ void CatalogServer::WaitUntilHmsEventsSynced(const
unique_lock<std::mutex>& lock
{
// Mark to true, regardless of status.
unique_lock<mutex> unique_lock(catalog_lock_);
- triggered_first_reset_ = true;
+ triggered_pending_reset_.Store(true);
catalog_update_cv_.NotifyOne();
}
}
}
-bool CatalogServer::IsActive() {
- return is_active_.Load();
-}
-
[[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
// catalog_topic_mode=minimal does not require initial reset to happen ahead
of catalog
// update gathering because coordinator will request metadata on-demand.
@@ -1005,8 +1052,9 @@ bool CatalogServer::IsActive() {
// processing a heartbeat.
// If require_initial_reset is True, this thread need to let
TriggerResetMetadata
// thread to proceed first.
- while (topic_updates_ready_ || (require_initial_reset &&
!triggered_first_reset_)) {
- if (!triggered_first_reset_) {
+ while (topic_updates_ready_
+ || (require_initial_reset && !triggered_pending_reset_.Load())) {
+ if (!triggered_pending_reset_.Load()) {
// Wake up TriggerResetMetadata thread.
catalog_update_cv_.NotifyOne();
}
@@ -1040,7 +1088,7 @@ bool CatalogServer::IsActive() {
topic_processing_time_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0
* 1000.0));
topic_updates_ready_ = true;
- if (!triggered_first_reset_) catalog_update_cv_.NotifyOne();
+ if (!triggered_pending_reset_.Load()) catalog_update_cv_.NotifyOne();
}
}
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 5a01d7966..cb6b637a8 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -141,7 +141,7 @@ class CatalogServer {
///
/// Returns `true` or `false` indicating if this catalogd is the active
catalogd.
/// If catalog HA is not enabled, returns `true`.
- bool WaitForCatalogReady();
+ bool WaitCatalogReadinessForWorkloadManagement();
// Initializes workload management by creating or upgrading the necessary
database and
// tables. Does not check if the current catalogd is active or if workload
management is
@@ -219,14 +219,30 @@ class CatalogServer {
/// Thread that periodically wakes up and refreshes certain Catalog metrics.
std::unique_ptr<Thread> catalog_metrics_refresh_thread_;
- /// Protects is_active_, active_catalogd_version_checker_,
- /// catalog_update_cv_, pending_topic_updates_,
catalog_objects_to/from_version_,
- /// last_sent_catalog_version, and is_ha_determined_.
- std::mutex catalog_lock_;
+ /// Min number of started catalog resets that we are safe to serve requests.
Set on
+ /// each call of MarkPendingMetadataReset. Inits to 1 to account for the
initial reset
+ /// done in startup. Only used when catalogd HA is enabled.
+ AtomicInt64 min_catalog_resets_to_serve_ = 1;
/// Set to true if this catalog instance is active.
AtomicBool is_active_;
+ /// Mark if the pending metadata reset required in startup or HA transition
has been
+ /// triggered.
+ AtomicBool triggered_pending_reset_ = false;
+
+ /// Protect metadata state change during HA transition.
+ /// If operation needs to obtain both catalog_lock_ and this, this lock must
be
+ /// obtained first.
+ std::mutex ha_transition_lock_;
+
+ /// Protects several field members below.
+ std::mutex catalog_lock_;
+
+ ///
-----------------------------------------------------------------------------
+ /// BEGIN: Members that must be protected by catalog_lock_.
+ ///
-----------------------------------------------------------------------------
+
/// Set to true after active catalog has been determined. Will be true if
catalog ha
/// is not enabled.
bool is_ha_determined_;
@@ -256,15 +272,15 @@ class CatalogServer {
/// Set in UpdateCatalogTopicCallback() and protected by the catalog_lock_.
int64_t last_sent_catalog_version_;
- /// Mark if the first metadata reset has been triggered.
- /// Protected by the catalog_lock_.
- bool triggered_first_reset_;
-
/// The max catalog version in pending_topic_updates_. Set by the
/// catalog_update_gathering_thread_ and protected by catalog_lock_.
/// Value -1 means catalog_update_gathering_thread_ has not set it.
int64_t catalog_objects_max_version_;
+ ///
-----------------------------------------------------------------------------
+ /// END: Members that must be protected by catalog_lock_.
+ ///
-----------------------------------------------------------------------------
+
/// Called during each Statestore heartbeat and is responsible for updating
the current
/// set of catalog objects in the IMPALA_CATALOG_TOPIC. Responds to each
heartbeat with a
/// delta update containing the set of changes since the last heartbeat.
This function
@@ -295,8 +311,17 @@ class CatalogServer {
/// catalog_lock_.
void WaitUntilHmsEventsSynced(const std::unique_lock<std::mutex>& lock);
- /// Returns the current active status of the catalogd.
- bool IsActive();
+ /// Request pending full metadata reset.
+ /// The actual reset will be performed by the TriggerResetMetadata thread
later.
+ /// Set min_catalog_resets_to_serve_ to the current catalog resets + 1 to
delay
+ /// AcceptRequest() until the reset operation begin in JVM.
+ void MarkPendingMetadataReset(const std::unique_lock<std::mutex>& lock);
+
+ /// If there is a pending metadata reset going to start, e.g. in the initial
startup or
+ /// marked by MarkPendingMetadataReset(), wait until it actually starts.
This means the
+ /// metadata reset thread has started and acquired the catalog version lock
in JVM, the
+ /// request will proceed to wait inside JVM.
+ Status WaitPendingMetadataResetStarts(const std::string& server_address);
/// Executed by the catalog_update_gathering_thread_. Calls into JniCatalog
/// to get the latest set of catalog objects that exist, along with some
metadata on
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index c6af237ec..2993215eb 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -69,6 +69,7 @@ Catalog::Catalog() {
{"getCatalogUsage", "()[B", &get_catalog_usage_id_},
{"getOperationUsage", "()[B", &get_operation_usage_id_},
{"getCatalogVersion", "()J", &get_catalog_version_id_},
+ {"getNumCatalogResetStarts", "()J", &get_num_catalog_reset_starts_id_},
{"getCatalogServerMetrics", "()[B", &get_catalog_server_metrics_},
{"getEventProcessorSummary", "([B)[B", &get_event_processor_summary_},
{"prioritizeLoad", "([B)V", &prioritize_load_id_},
@@ -128,6 +129,15 @@ Status Catalog::GetCatalogVersion(long* version) {
return Status::OK();
}
+Status Catalog::GetNumCatalogResetStarts(long* num) {
+ JNIEnv* jni_env = JniUtil::GetJNIEnv();
+ JniLocalFrame jni_frame;
+ RETURN_IF_ERROR(jni_frame.push(jni_env));
+ *num = jni_env->CallLongMethod(catalog_, get_num_catalog_reset_starts_id_);
+ RETURN_ERROR_IF_EXC(jni_env);
+ return Status::OK();
+}
+
Status Catalog::GetCatalogDelta(CatalogServer* caller, int64_t from_version,
TGetCatalogDeltaResponse* resp) {
TGetCatalogDeltaRequest request;
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 47fb8d619..6a4a912b2 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -58,6 +58,11 @@ class Catalog {
/// Status object with information on the error will be returned.
Status GetCatalogVersion(long* version);
+ /// Queries the catalog to get the number of reset() has been starts.
Returns OK if the
+ /// operation was successful, otherwise a Status object with information on
the error
+ /// will be returned.
+ Status GetNumCatalogResetStarts(long* num);
+
/// Retrieves the catalog objects that were added/modified/deleted since
version
/// 'from_version'. Returns OK if the operation was successful, otherwise a
Status
/// object with information on the error will be returned. 'caller' is a
pointer to
@@ -176,6 +181,7 @@ class Catalog {
jmethodID get_partial_catalog_object_id_; //
JniCatalog.getPartialCatalogObject()
jmethodID get_catalog_delta_id_; // JniCatalog.getCatalogDelta()
jmethodID get_catalog_version_id_; // JniCatalog.getCatalogVersion()
+ jmethodID get_num_catalog_reset_starts_id_; //
JniCatalog.getNumCatalogResetStarts()
jmethodID get_catalog_usage_id_; // JniCatalog.getCatalogUsage()
jmethodID get_operation_usage_id_; // JniCatalog.getOperationUsage()
jmethodID get_catalog_server_metrics_; //
JniCatalog.getCatalogServerMetrics()
diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc
index 260fddcbd..598f305bb 100644
--- a/be/src/catalog/catalogd-main.cc
+++ b/be/src/catalog/catalogd-main.cc
@@ -104,7 +104,7 @@ int CatalogdMain(int argc, char** argv) {
LOG(INFO) << "CatalogService started on port: " <<
FLAGS_catalog_service_port;
if (FLAGS_enable_workload_mgmt) {
- if (catalog_server.WaitForCatalogReady()) {
+ if (catalog_server.WaitCatalogReadinessForWorkloadManagement()) {
ABORT_IF_ERROR(catalog_server.InitWorkloadManagement());
} else {
LOG(INFO) << "Skipping workload management initialization since catalogd
HA is "
diff --git a/be/src/catalog/workload-management-init.cc
b/be/src/catalog/workload-management-init.cc
index 8810cff57..df08b6f9a 100644
--- a/be/src/catalog/workload-management-init.cc
+++ b/be/src/catalog/workload-management-init.cc
@@ -507,14 +507,14 @@ inline bool CatalogServer::IsCatalogInitialized() {
return last_sent_catalog_version_ > 0 || (is_ha_determined_ &&
!is_active_.Load());
} // CatalogServer::IsCatalogInitialized
-bool CatalogServer::WaitForCatalogReady() {
+bool CatalogServer::WaitCatalogReadinessForWorkloadManagement() {
while (!IsCatalogInitialized()) {
LOG(INFO) << "Waiting for first catalog update";
SleepForMs(WM_INIT_CHECK_SLEEP_MS);
}
- return IsActive();
-} // function CatalogServer::WaitForCatalogReady
+ return is_active_.Load();
+} // function CatalogServer::WaitCatalogReadinessForWorkloadManagement
Status CatalogServer::InitWorkloadManagement() {
DCHECK_NE(nullptr, thrift_iface_.get());
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index df07b63aa..b75d041b1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -78,9 +78,6 @@ import com.google.common.base.Preconditions;
public abstract class Catalog implements AutoCloseable {
// Initial catalog version and ID.
public final static long INITIAL_CATALOG_VERSION = 0L;
- // Catalog version that signal that the first metadata reset has begun.
- // This should match MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST.
- public final static long CATALOG_VERSION_AFTER_FIRST_RESET = 100L;
public static final TUniqueId INITIAL_CATALOG_SERVICE_ID = new TUniqueId(0L,
0L);
public static final String DEFAULT_DB = "default";
diff --git
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 73f89ddff..d81008a89 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -302,6 +302,10 @@ public class CatalogServiceCatalog extends Catalog {
// The catalog version when we ran reset() last time. Protected by
versionLock_.
private long lastResetStartVersion_ = INITIAL_CATALOG_VERSION;
+ // Number of reset() triggered from BE (catalog-server.cc) that has been
started.
+ // It's increased everytime reset() starts holding versionLock_.
+ private AtomicLong numCatalogResetStarts_ = new AtomicLong(0);
+
// Manages the scheduling of background table loading.
private final TableLoadingMgr tableLoadingMgr_;
@@ -2153,7 +2157,7 @@ public class CatalogServiceCatalog extends Catalog {
}
long duration = System.currentTimeMillis() - startTime;
if (!nativeFuncs.isEmpty() || !javaFuncs.isEmpty()) {
- LOG.info("Loaded {} native {} and {} java functions for database {} in
{} ms.",
+ LOG.info("Loaded {} native and {} java functions for database {} in {}
ms.",
nativeFuncs.size(), javaFuncs.size(), db.getName(), duration);
}
}
@@ -2383,7 +2387,7 @@ public class CatalogServiceCatalog extends Catalog {
}
public long reset(EventSequence catalogTimeline) throws CatalogException {
- return reset(catalogTimeline, false);
+ return reset(catalogTimeline, false, false);
}
/**
@@ -2392,10 +2396,11 @@ public class CatalogServiceCatalog extends Catalog {
* requesting impalad will use that version to determine when the
* effects of reset have been applied to its local catalog cache.
*/
- public long reset(EventSequence catalogTimeline, boolean isSyncDdl)
- throws CatalogException {
+ public long reset(EventSequence catalogTimeline, boolean isSyncDdl,
+ boolean isCatalogServerRequest) throws CatalogException {
long startVersion = getCatalogVersion();
- LOG.info("Invalidating all metadata. Version: " + startVersion);
+ LOG.info("Invalidating all metadata. Version: " + startVersion
+ + ", IsCatalogServerRequest: " + isCatalogServerRequest);
Stopwatch resetTimer = Stopwatch.createStarted();
Stopwatch unlockedTimer = Stopwatch.createStarted();
// First update the policy metadata.
@@ -2434,19 +2439,14 @@ public class CatalogServiceCatalog extends Catalog {
LOG.error("Couldn't identify the default FS. Cache Pool reader will be
disabled.");
}
versionLock_.writeLock().lock();
+ // The BE only cares about resets triggered from itself. So not increasing
this for
+ // user requests.
+ if (isCatalogServerRequest) numCatalogResetStarts_.incrementAndGet();
try {
resetManager_.waitFullMetadataFetch();
unlockedTimer.stop();
catalogTimeline.markEvent(GOT_CATALOG_VERSION_WRITE_LOCK);
- // In case of an empty new catalog, the version should still change to
reflect the
- // reset operation itself and to unblock impalads by making the catalog
version >
- // INITIAL_CATALOG_VERSION. See Frontend.waitForCatalog()
- if (catalogVersion_ < Catalog.CATALOG_VERSION_AFTER_FIRST_RESET) {
- catalogVersion_ = Catalog.CATALOG_VERSION_AFTER_FIRST_RESET;
- LOG.info("First reset initiated. Version: " + catalogVersion_);
- } else {
- ++catalogVersion_;
- }
+ catalogVersion_++;
// Update data source, db and table metadata.
// First, refresh DataSource objects from HMS and assign new versions.
@@ -3725,6 +3725,10 @@ public class CatalogServiceCatalog extends Catalog {
}
}
+ public long getNumCatalogResetStarts() {
+ return numCatalogResetStarts_.get();
+ }
+
private void acquireVersionReadLock(EventSequence catalogTimeline) {
versionLock_.readLock().lock();
catalogTimeline.markEvent(GOT_CATALOG_VERSION_READ_LOCK);
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 76040125f..adb8fe360 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -7287,7 +7287,10 @@ public class CatalogOpExecutor {
} else {
// Invalidate the entire catalog if no table name is provided.
Preconditions.checkArgument(!req.isIs_refresh());
- resp.getResult().setVersion(catalog_.reset(catalogTimeline,
req.isSync_ddl()));
+ boolean isCatalogServerRequest =
+ !req.header.isSetRequesting_user() &&
!req.header.isSetCoordinator_hostname();
+ resp.getResult().setVersion(
+ catalog_.reset(catalogTimeline, req.isSync_ddl(),
isCatalogServerRequest));
resp.getResult().setIs_invalidate(true);
}
catalogTimeline.markEvent("Finished resetMetadata request");
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index 8248f2788..aae49245d 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -289,6 +289,10 @@ public class JniCatalog {
return catalog_.getCatalogVersion();
}
+ public long getNumCatalogResetStarts() {
+ return catalog_.getNumCatalogResetStarts();
+ }
+
public byte[] getCatalogDelta(byte[] thriftGetCatalogDeltaReq)
throws ImpalaException, TException {
TGetCatalogDeltaRequest params = new TGetCatalogDeltaRequest();
diff --git a/tests/custom_cluster/test_catalogd_ha.py
b/tests/custom_cluster/test_catalogd_ha.py
index 289d8ec4f..8fd02b198 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -527,6 +527,26 @@ class TestCatalogdHA(CustomClusterTestSuite):
def test_metadata_after_failover(self, unique_database):
self._test_metadata_after_failover(unique_database)
+ @CustomClusterTestSuite.with_args(
+ statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+ catalogd_args="--catalogd_ha_reset_metadata_on_failover=true "
+ "--catalog_topic_mode=minimal "
+ "--debug_actions=TRIGGER_RESET_METADATA_DELAY:SLEEP@1000",
+ impalad_args="--use_local_catalog=true",
+ start_args="--enable_catalogd_ha")
+ def test_metadata_after_failover_with_delayed_reset(self, unique_database):
+ self._test_metadata_after_failover(unique_database)
+
+ @CustomClusterTestSuite.with_args(
+ statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+ catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
+ "--catalog_topic_mode=minimal "
+ "--debug_actions=catalogd_event_processing_delay:SLEEP@1000",
+ impalad_args="--use_local_catalog=true",
+ start_args="--enable_catalogd_ha")
+ def test_metadata_after_failover_with_hms_sync(self, unique_database):
+ self._test_metadata_after_failover(unique_database, skip_func_test=True)
+
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "