This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b410a90b4707be2e08dcb6c02b9473aa9ade8d96
Author: stiga-huang <[email protected]>
AuthorDate: Wed Jun 28 16:42:38 2023 +0800

    IMPALA-12152: Add query options to wait for HMS events sync up
    
    It's a common scenario to run Impala queries after the dependent
    external changes are done. E.g. running COMPUTE STATS on a table after
    Hive/Spark jobs ingest some new data to it. Currently, it's unsafe to
    run the Impala queries immediately after the Hive/Spark jobs finish
    since EventProcessor might have a long lag in applying the HMS events.
    Note that running REFRESH/INVALIDATE on the table can also solve the
    problem, but one of the motivations of EventProcessor is to get rid of
    such Impala-specific commands.
    
    This patch adds a mechanism to let query planning wait until the
    metadata is synced up. Two new query options are added:
     - SYNC_HMS_EVENTS_WAIT_TIME_S configures the timeout in seconds for
       waiting. It's 0 by default, which disables the waiting mechanism.
     - SYNC_HMS_EVENTS_STRICT_MODE controls the behavior when the metadata
       can't be synced up, e.g. when the waiting times out or
       EventProcessor is in the ERROR state. It defaults to false
       (non-strict mode). In strict mode, the coordinator fails the query.
       In non-strict mode, the coordinator starts planning and adds a
       warning message to the profile (and to client output if the client
       consumes the get_log results, e.g. impala-shell).
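    The strict/non-strict decision can be summarized in a small sketch
    (illustrative Python, not the actual coordinator code; the function
    name and messages are made up for the example):

```python
import warnings

def handle_sync_failure(strict_mode, reason):
    """Mimic the strict/non-strict handling described above."""
    message = "Failed to sync HMS events: " + reason
    if strict_mode:
        # Strict mode: fail the query before planning starts.
        raise RuntimeError(message)
    # Non-strict mode: planning continues; the warning surfaces in the
    # profile and in client output.
    warnings.warn(message)
    return "continue planning"
```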
    
    Example usage - query the table after inserting into dynamic partitions
    in Hive. We don't know which partitions are modified, so running REFRESH
    in Impala is inefficient since it reloads all partitions.
      hive> insert into tbl partition(p) select * from tbl2;
      impala> set sync_hms_events_wait_time_s=300;
      impala> select * from tbl;
    With this new feature, catalogd reloads only the updated partitions
    based on HMS events, which is more efficient than REFRESH. The wait
    time can be set to the largest event-processing lag that has been
    observed in the cluster. Note the lag is shown as the "Lag time" on
    the /events page of the catalogd WebUI and as
    "events-processor.lag-time" on the /metrics page. Users can monitor it
    to get a sense of the lag.
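    As a rough illustration of sizing the option from the observed lag
    (Python sketch; the flat metric dict and the helper are invented for
    the example - the real /metrics payload needs parsing first, and the
    unit of the lag metric should be verified):

```python
def pick_wait_time(metrics, safety_factor=2, floor_s=10):
    """Derive a SYNC_HMS_EVENTS_WAIT_TIME_S value from an observed lag.

    `metrics` is assumed to be a flat dict of metric name -> lag in
    seconds, distilled from the catalogd /metrics page. Apply a safety
    factor and a floor so short spikes don't cause spurious timeouts.
    """
    lag_s = metrics.get("events-processor.lag-time", 0)
    return max(int(lag_s * safety_factor), floor_s)
```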
    
    Some timeline items are added in query profile for this waiting
    mechanism, e.g.
    A successful wait:
        Query Compilation: 937.279ms
           - Synced events from Metastore: 909.162ms (909.162ms)
           - Metadata of all 1 tables cached: 911.005ms (1.843ms)
           - Analysis finished: 919.600ms (8.595ms)
    
    A failed wait:
        Query Compilation: 1s321ms
           - Continuing without syncing Metastore events: 40.883ms (40.883ms)
           - Metadata load started: 41.618ms (735.633us)
    
    Added a histogram metric, impala-server.wait-for-hms-event-durations-ms,
    to track the duration of this waiting.
    
    --------
    Implementation
    
    A new catalogd RPC, WaitForHmsEvent, is added to the CatalogService API
    so that the coordinator can wait until catalogd has processed the
    latest event seen when the RPC arrives. Query planning starts or fails
    after this RPC returns. The RPC request contains the potential
    dbs/tables required by the query. Catalogd records the latest event id
    when it receives the RPC. Once the last synced event id reaches that
    id, catalogd returns the catalog updates to the coordinator in the RPC
    response. Until then, the RPC thread sits in a waiting loop, sleeping
    for a configurable interval controlled by a hidden flag,
    hms_event_sync_sleep_interval_ms (defaults to 100).
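    The waiting loop can be sketched like this (illustrative Python, not
    the actual Java implementation; the function shape and names are made
    up for the example):

```python
import time

def wait_for_hms_event(last_synced_id, target_event_id, timeout_s,
                       sleep_interval_ms=100):
    """Poll until the last synced event id catches up to target_event_id.

    `last_synced_id` is a callable returning the current synced id.
    Returns True on success, False on timeout; the caller then applies
    strict/non-strict handling.
    """
    deadline = time.monotonic() + timeout_s
    while last_synced_id() < target_event_id:
        if time.monotonic() >= deadline:
            return False
        time.sleep(sleep_interval_ms / 1000.0)
    return True
```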
    
    Entry-point functions
     - Frontend#waitForHmsEvents()
     - CatalogServiceCatalog#waitForHmsEvent()
    
    Some statements don't need to wait for HMS events, e.g. CREATE/DROP
    ROLE statements. This patch adds an overridable method,
    requiresHmsMetadata(), to each Statement to mark whether it can skip
    the HMS event sync.
    
    Test side changes:
     - Some test code uses EventProcessorUtils.wait_for_event_processing()
       to wait for HMS events to be synced up before running a query. Such
       tests are updated to just use the new query options in the query.
     - Note that we still need wait_for_event_processing() in test code
       that verifies metrics after HMS events are synced up.
    
    --------
    Limitation
    
    Currently, UPDATE_TBL_COL_STAT_EVENT, UPDATE_PART_COL_STAT_EVENT and
    OPEN_TXN events are ignored by the event processor. If the latest
    event happens to be one of these types and no further events arrive,
    the last synced event id can never reach the latest event id. We need
    to fix the last synced event id to also account for ignored events
    (IMPALA-13623).
    
    The current implementation waits for the latest event id as of when
    the WaitForHmsEvent RPC is received on the catalogd side. We can
    improve it by leveraging HIVE-27499 to efficiently detect whether the
    given dbs/tables have unsynced events and wait only for the *largest*
    id among them. Dbs/tables without unsynced events don't need to block
    query planning. However, this only works for non-transactional tables.
    Transactional tables might be modified by COMMIT_TXN or ABORT_TXN
    events, which don't carry table names. So even with HIVE-27499, we
    can't determine whether a transactional table has pending events.
    IMPALA-13684 tracks this improvement for non-transactional tables.
    
    Tests
     - Add a test to verify planning waits until catalogd is synced with
       HMS changes.
     - Add a test for the error handling when HMS event processing is
       disabled.
    
    Change-Id: I36ac941bb2c2217b09fcfa2eb567b011b38efa2a
    Reviewed-on: http://gerrit.cloudera.org:8080/20131
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/catalog/catalog-server.cc                   |  23 ++
 be/src/catalog/catalog-service-client-wrapper.h    |   8 +
 be/src/catalog/catalog.cc                          |   9 +
 be/src/catalog/catalog.h                           |   7 +
 be/src/exec/catalog-op-executor.cc                 |  20 ++
 be/src/exec/catalog-op-executor.h                  |   4 +
 be/src/service/fe-support.cc                       |  34 +++
 be/src/service/impala-server.cc                    |  37 ++--
 be/src/service/query-options.cc                    |  11 +
 be/src/service/query-options.h                     |   8 +-
 be/src/util/backend-gflag-util.cc                  |   2 +
 be/src/util/impalad-metrics.cc                     |   6 +
 be/src/util/impalad-metrics.h                      |   4 +
 common/thrift/BackendGflags.thrift                 |   2 +
 common/thrift/CatalogService.thrift                |  32 +++
 common/thrift/ImpalaService.thrift                 |  12 ++
 common/thrift/Query.thrift                         |   6 +
 common/thrift/metrics.json                         |  10 +
 .../org/apache/impala/analysis/AdminFnStmt.java    |   3 +
 .../org/apache/impala/analysis/AlterDbStmt.java    |   3 +
 .../java/org/apache/impala/analysis/Analyzer.java  |   9 +
 .../org/apache/impala/analysis/CreateDbStmt.java   |   2 +
 .../apache/impala/analysis/CreateDropRoleStmt.java |   3 +
 .../org/apache/impala/analysis/DescribeDbStmt.java |   2 +
 .../org/apache/impala/analysis/DropDbStmt.java     |   2 +
 .../impala/analysis/GrantRevokeRoleStmt.java       |   3 +
 .../java/org/apache/impala/analysis/SetStmt.java   |   3 +
 .../apache/impala/analysis/ShowDataSrcsStmt.java   |   3 +
 .../apache/impala/analysis/ShowFunctionsStmt.java  |   2 +
 .../impala/analysis/ShowGrantPrincipalStmt.java    |   4 +-
 .../org/apache/impala/analysis/ShowRolesStmt.java  |   3 +
 .../impala/analysis/ShowTablesOrViewsStmt.java     |   1 +
 .../org/apache/impala/analysis/StatementBase.java  |  13 ++
 .../java/org/apache/impala/catalog/Catalog.java    |   5 +
 .../org/apache/impala/catalog/CatalogDeltaLog.java |  51 ++++-
 .../impala/catalog/CatalogServiceCatalog.java      | 189 ++++++++++++++++
 .../org/apache/impala/catalog/ImpaladCatalog.java  |   3 +-
 .../catalog/events/ExternalEventsProcessor.java    |   5 +
 .../catalog/events/MetastoreEventsProcessor.java   |  68 +++++-
 .../java/org/apache/impala/service/FeSupport.java  |  22 ++
 .../java/org/apache/impala/service/Frontend.java   | 146 +++++++++++--
 .../java/org/apache/impala/service/JniCatalog.java |  13 ++
 tests/common/impala_test_suite.py                  |  64 +++---
 .../custom_cluster/test_event_processing_error.py  |  10 +-
 tests/custom_cluster/test_events_custom_configs.py | 107 ++++++++-
 .../test_hive_parquet_codec_interop.py             |   3 +-
 tests/custom_cluster/test_kudu.py                  |   9 +-
 tests/metadata/test_event_processing.py            | 150 +++++++++++--
 tests/metadata/test_event_processing_base.py       | 239 ++++++++++-----------
 tests/metadata/test_metadata_query_statements.py   |   8 +-
 tests/query_test/test_hive_codec_interop.py        |   2 +-
 51 files changed, 1158 insertions(+), 227 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 3c64be3f3..41615005a 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -99,6 +99,10 @@ DEFINE_int32(max_wait_time_for_sync_ddl_s, 0, "Maximum time (in seconds) until "
      "before throwing an error indicating that not all the "
      "coordinators might have applied the changes caused due to the ddl.");
 
+DEFINE_int32_hidden(hms_event_sync_sleep_interval_ms, 100, "Sleep interval (in ms) "
+     "used in the thread of catalogd processing the WaitForHmsEvent RPC. The thread "
+     "sleeps for such an interval when checking for HMS events to be synced.");
+
 DECLARE_string(debug_actions);
 DEFINE_bool(start_hms_server, false, "When set to true catalog server starts a HMS "
     "server at a port specified by hms_port flag");
@@ -465,6 +469,25 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
     VLOG_RPC << "SetEventProcessorStatus(): response=" << ThriftDebugStringNoThrow(resp);
   }
 
+  void WaitForHmsEvent(TWaitForHmsEventResponse& resp,
+      const TWaitForHmsEventRequest& req) override {
+    VLOG_RPC << "WaitForHmsEvent(): request=" << ThriftDebugString(req);
+    Status status = AcceptRequest(req.protocol_version);
+    if (status.ok()) {
+      status = catalog_server_->catalog()->WaitForHmsEvent(req, &resp);
+    }
+    if (!status.ok()) {
+      LOG(WARNING) << status.GetDetail();
+      // Status in response is not set if the error is due to an exception.
+      if (resp.status.status_code == TErrorCode::OK) {
+        TStatus thrift_status;
+        status.ToThrift(&thrift_status);
+        resp.__set_status(thrift_status);
+      }
+    }
+    VLOG_RPC << "WaitForHmsEvent(): response.status=" << resp.status;
+  }
+
  private:
   CatalogServer* catalog_server_;
   string server_address_;
diff --git a/be/src/catalog/catalog-service-client-wrapper.h b/be/src/catalog/catalog-service-client-wrapper.h
index 1c1560493..6fe798b98 100644
--- a/be/src/catalog/catalog-service-client-wrapper.h
+++ b/be/src/catalog/catalog-service-client-wrapper.h
@@ -131,6 +131,14 @@ class CatalogServiceClientWrapper : public CatalogServiceClient {
     *send_done = true;
     recv_SetEventProcessorStatus(_return);
   }
+
+  void WaitForHmsEvent(TWaitForHmsEventResponse& _return,
+      const TWaitForHmsEventRequest& req, bool* send_done) {
+    DCHECK(!*send_done);
+    send_WaitForHmsEvent(req);
+    *send_done = true;
+    recv_WaitForHmsEvent(_return);
+  }
 #pragma clang diagnostic pop
 };
 
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index a22d18ea5..cf329bcac 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -77,6 +77,7 @@ Catalog::Catalog() {
     {"getLatestCompactions", "([B)[B", &get_latest_compactions_id_},
     {"getAllHadoopConfigs", "()[B", &get_hadoop_configs_id_},
     {"setEventProcessorStatus", "([B)[B", &set_event_processor_status_id_},
+    {"waitForHmsEvent", "([B)[B", &wait_for_hms_event_id_},
   };
 
   JNIEnv* jni_env = JniUtil::GetJNIEnv();
@@ -244,3 +245,11 @@ Status Catalog::SetEventProcessorStatus(
     const TSetEventProcessorStatusRequest& req, TSetEventProcessorStatusResponse* resp) {
   return JniUtil::CallJniMethod(catalog_, set_event_processor_status_id_, req, resp);
 }
+
+Status Catalog::WaitForHmsEvent(const TWaitForHmsEventRequest& req,
+    TWaitForHmsEventResponse* resp) {
+  if (req.header.__isset.query_id) {
+    GetThreadDebugInfo()->SetQueryId(req.header.query_id);
+  }
+  return JniUtil::CallJniMethod(catalog_, wait_for_hms_event_id_, req, resp);
+}
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index d871f26d2..17b95abb0 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -152,6 +152,7 @@ class Catalog {
   /// Returns OK if the refreshing was successful, otherwise a Status object with
   /// information on the error will be returned.
   Status RefreshDataSources();
+
   /// Returns all Hadoop configurations in key, value form in result.
   Status GetAllHadoopConfigs(TGetAllHadoopConfigsResponse* result);
 
@@ -159,6 +160,11 @@ class Catalog {
   Status SetEventProcessorStatus(
       const TSetEventProcessorStatusRequest& req, TSetEventProcessorStatusResponse* resp);
 
+  /// Waits until catalogd processes the latest HMS event and get the catalog version
+  /// that catches up the metadata changes.
+  Status WaitForHmsEvent(const TWaitForHmsEventRequest& req,
+      TWaitForHmsEventResponse* resp);
+
  private:
   jobject catalog_;  // instance of org.apache.impala.service.JniCatalog
   jmethodID update_metastore_id_;  // JniCatalog.updateMetaastore()
@@ -187,6 +193,7 @@ class Catalog {
   jmethodID get_latest_compactions_id_; // JniCatalog.getLatestCompactions()
   jmethodID get_hadoop_configs_id_;  // JniCatalog.getAllHadoopConfigs()
   jmethodID set_event_processor_status_id_; // JniCatalog.setEventProcessorStatus()
+  jmethodID wait_for_hms_event_id_; // JniCatalog.waitForHmsEvent()
 };
 
 }
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index cbaaf2694..b14a57348 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -29,6 +29,8 @@
 #include "service/impala-server.h"
 #include "service/hs2-util.h"
 #include "util/debug-util.h"
+#include "util/histogram-metric.h"
+#include "util/impalad-metrics.h"
 #include "util/runtime-profile-counters.h"
 #include "util/string-parser.h"
 #include "util/test-info.h"
@@ -491,3 +493,21 @@ Status CatalogOpExecutor::SetEventProcessorStatus(
   if (result->status.status_code != TErrorCode::OK) return Status(result->status);
   return Status::OK();
 }
+
+Status CatalogOpExecutor::WaitForHmsEvent(const TWaitForHmsEventRequest& req,
+    TWaitForHmsEventResponse* resp) {
+  int attempt = 0; // Used for debug action only.
+  MonotonicStopWatch sw;
+  sw.Start();
+  CatalogServiceConnection::RpcStatus rpc_status =
+      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+          *ExecEnv::GetInstance()->GetCatalogdAddress().get(),
+          &CatalogServiceClientWrapper::WaitForHmsEvent, req,
+          FLAGS_catalog_client_connection_num_retries,
+          FLAGS_catalog_client_rpc_retry_interval_ms,
+          [&attempt]() { return CatalogRpcDebugFn(&attempt); }, resp);
+  RETURN_IF_ERROR(rpc_status.status);
+  ImpaladMetrics::WAIT_FOR_HMS_EVENT_DURATIONS->Update(
+    sw.ElapsedTime() / NANOS_PER_MICRO / MICROS_PER_MILLI);
+  return Status(resp->status);
+}
diff --git a/be/src/exec/catalog-op-executor.h b/be/src/exec/catalog-op-executor.h
index f93acffeb..33eabfb3a 100644
--- a/be/src/exec/catalog-op-executor.h
+++ b/be/src/exec/catalog-op-executor.h
@@ -110,6 +110,10 @@ class CatalogOpExecutor {
   /// a pointer to the profile of the execution in catalogd.
   const TRuntimeProfileNode* catalog_profile() const { return catalog_profile_.get(); }
 
+  /// Makes an RPC to the catalog server to wait until it processes the latest HMS event.
+  Status WaitForHmsEvent(const TWaitForHmsEventRequest& req,
+      TWaitForHmsEventResponse* resp);
+
  private:
   /// Helper functions used in ExecComputeStats() for setting the thrift structs in params
   /// for the table/column stats based on the results of the corresponding child query.
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 3182f320c..9c900ab0b 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -765,6 +765,36 @@ Java_org_apache_impala_service_FeSupport_NativeGetLatestCompactions(
   return result_bytes;
 }
 
+extern "C" JNIEXPORT jbyteArray JNICALL
+Java_org_apache_impala_service_FeSupport_NativeWaitForHmsEvents(JNIEnv* env,
+    jclass fe_support_class, jbyteArray thrift_request, jbyteArray thrift_query_options) {
+  TWaitForHmsEventRequest request;
+  TQueryOptions query_options;
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_request, &request), env,
+      JniUtil::internal_exc_class(), nullptr);
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_query_options, &query_options), env,
+      JniUtil::internal_exc_class(), nullptr);
+  CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), nullptr, nullptr);
+  TWaitForHmsEventResponse response;
+  Status status = catalog_op_executor.WaitForHmsEvent(request, &response);
+  TStatus result;
+  jbyteArray result_bytes = nullptr;
+  if (!status.ok()) {
+    status.ToThrift(&result);
+    THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
+        JniUtil::internal_exc_class(), result_bytes);
+    return result_bytes;
+  }
+  ImpalaServer* server = ExecEnv::GetInstance()->impala_server();
+  DCHECK(server != nullptr);
+  status = server->ProcessCatalogUpdateResult(response.result,
+      /*wait_for_all_subscribers*/false, query_options, /*timeline*/nullptr);
+  status.ToThrift(&result);
+  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
+      JniUtil::internal_exc_class(), result_bytes);
+  return result_bytes;
+}
+
 namespace impala {
 
 static JNINativeMethod native_methods[] = {
@@ -856,6 +886,10 @@ static JNINativeMethod native_methods[] = {
     const_cast<char*>("NativeNumLiveQueries"), const_cast<char*>("()J"),
     (void*)::Java_org_apache_impala_service_FeSupport_NativeNumLiveQueries
   },
+  {
+    const_cast<char*>("NativeWaitForHmsEvents"), const_cast<char*>("([B[B)[B"),
+    (void*)::Java_org_apache_impala_service_FeSupport_NativeWaitForHmsEvents
+  },
 };
 
 void InitFeSupport(bool disable_codegen) {
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index dbc20bc4f..bd70838c4 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1583,8 +1583,9 @@ void ImpalaServer::UpdateExecSummary(const QueryHandle& query_handle) const {
   query_handle->summary_profile()->SetTExecSummary(t_exec_summary);
   string exec_summary = PrintExecSummary(t_exec_summary);
   query_handle->summary_profile()->AddInfoStringRedacted("ExecSummary", exec_summary);
-  query_handle->summary_profile()->AddInfoStringRedacted("Errors",
-      query_handle->GetCoordinator()->GetErrorLog());
+  vector<string> errors = query_handle->GetAnalysisWarnings();
+  errors.emplace_back(query_handle->GetCoordinator()->GetErrorLog());
+  query_handle->summary_profile()->AddInfoStringRedacted("Errors", join(errors, "\n"));
 }
 
 Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_inflight,
@@ -2306,6 +2307,13 @@ void ImpalaServer::CatalogUpdateCallback(
   catalog_version_update_cv_.NotifyAll();
 }
 
+static inline void MarkTimelineEvent(RuntimeProfile::EventSequence* timeline,
+    const string& str) {
+  if (timeline != nullptr) {
+    timeline->MarkEvent(str);
+  }
+}
+
 void ImpalaServer::WaitForCatalogUpdate(const int64_t catalog_update_version,
     const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline) {
   unique_lock<mutex> unique_lock(catalog_version_lock_);
@@ -2318,10 +2326,12 @@ void ImpalaServer::WaitForCatalogUpdate(const int64_t catalog_update_version,
   }
 
   if (catalog_update_info_.catalog_service_id != catalog_service_id) {
-    timeline->MarkEvent("Detected change in catalog service ID");
-    VLOG_QUERY << "Detected change in catalog service ID";
+    MarkTimelineEvent(timeline, "Catalog service ID changed when waiting for "
+        "catalog update to arrive");
+    VLOG_QUERY << "Detected catalog service ID changed when waiting for "
+        "catalog update to arrive";
   } else {
-    timeline->MarkEvent(Substitute("Applied catalog version $0",
+    MarkTimelineEvent(timeline, Substitute("Applied catalog version $0",
         catalog_update_version));
     VLOG_QUERY << "Applied catalog version: " << catalog_update_version;
   }
@@ -2341,11 +2351,14 @@ void ImpalaServer::WaitForCatalogUpdateTopicPropagation(
   }
 
   if (catalog_update_info_.catalog_service_id != catalog_service_id) {
-    timeline->MarkEvent("Detected change in catalog service ID");
-    VLOG_QUERY << "Detected change in catalog service ID";
+    MarkTimelineEvent(timeline, "Catalog service ID changed when waiting for "
+        "catalog propagation");
+    VLOG_QUERY << "Detected catalog service ID changed when waiting for "
+        "catalog propagation";
   } else {
-    timeline->MarkEvent(Substitute("Min catalog topic version of coordinators reached $0",
-        min_req_subscriber_topic_version));
+    MarkTimelineEvent(timeline,
+        Substitute("Min catalog topic version of coordinators reached $0",
+            min_req_subscriber_topic_version));
     VLOG_QUERY << "Min catalog topic version of coordinators: "
         << min_req_subscriber_topic_version;
   }
@@ -2368,10 +2381,10 @@ void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_
   }
 
   if (catalog_update_info_.catalog_service_id != catalog_service_id) {
-    timeline->MarkEvent("Detected change in catalog service ID");
+    MarkTimelineEvent(timeline, "Detected change in catalog service ID");
     VLOG_QUERY << "Detected change in catalog service ID";
   } else {
-    timeline->MarkEvent(Substitute("Local min catalog version reached $0",
+    MarkTimelineEvent(timeline, Substitute("Local min catalog version reached $0",
         min_req_catalog_object_version));
     VLOG_QUERY << "Updated catalog object version lower bound: "
         << min_req_catalog_object_version;
@@ -2429,7 +2442,7 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
       // Apply the changes to the local catalog cache.
       TUpdateCatalogCacheResponse resp;
       Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp);
-      timeline->MarkEvent("Applied catalog updates from DDL");
+      MarkTimelineEvent(timeline, "Applied catalog updates from DDL");
       if (!status.ok()) LOG(ERROR) << status.GetDetail();
       RETURN_IF_ERROR(status);
     } else {
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 5da1c6eb8..5a58a09cf 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1334,6 +1334,17 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va
         query_options->__set_use_legacy_hive_timestamp_conversion(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::SYNC_HMS_EVENTS_WAIT_TIME_S: {
+        int32_t time_s = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+            option, value, &time_s));
+        query_options->__set_sync_hms_events_wait_time_s(time_s);
+        break;
+      }
+      case TImpalaQueryOptions::SYNC_HMS_EVENTS_STRICT_MODE: {
+        query_options->__set_sync_hms_events_strict_mode(IsTrue(value));
+        break;
+      }
       default:
         string key = to_string(option);
         if (IsRemovedQueryOption(key)) {
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 6ce06aea7..acd0e213f 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -51,7 +51,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // plus one. Thus, the second argument to the DCHECK has to be updated every
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 constexpr unsigned NUM_QUERY_OPTIONS =
-    TImpalaQueryOptions::USE_LEGACY_HIVE_TIMESTAMP_CONVERSION + 1;
+    TImpalaQueryOptions::SYNC_HMS_EVENTS_STRICT_MODE + 1;
 #define QUERY_OPTS_TABLE                                                                 \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), NUM_QUERY_OPTIONS);             \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
@@ -365,7 +365,11 @@ constexpr unsigned NUM_QUERY_OPTIONS =
   QUERY_OPT_FN(estimate_duplicate_in_preagg,                                             \
       ESTIMATE_DUPLICATE_IN_PREAGG, TQueryOptionLevel::ADVANCED)                         \
   QUERY_OPT_FN(use_legacy_hive_timestamp_conversion,                                     \
-      USE_LEGACY_HIVE_TIMESTAMP_CONVERSION, TQueryOptionLevel::ADVANCED)
+      USE_LEGACY_HIVE_TIMESTAMP_CONVERSION, TQueryOptionLevel::ADVANCED)                 \
+  QUERY_OPT_FN(sync_hms_events_wait_time_s, SYNC_HMS_EVENTS_WAIT_TIME_S,                 \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(sync_hms_events_strict_mode, SYNC_HMS_EVENTS_STRICT_MODE,                 \
+      TQueryOptionLevel::ADVANCED)                                                       \
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index ccc8feb95..cff239fa7 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -132,6 +132,7 @@ DECLARE_int32(dbcp_max_wait_millis_for_conn);
 DECLARE_int32(dbcp_data_source_idle_timeout_s);
 DECLARE_bool(enable_catalogd_ha);
 DECLARE_string(injected_group_members_debug_only);
+DECLARE_int32(hms_event_sync_sleep_interval_ms);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -509,6 +510,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_is_release_build(false);
 #endif
   cfg.__set_enable_catalogd_ha(FLAGS_enable_catalogd_ha);
+  cfg.__set_hms_event_sync_sleep_interval_ms(FLAGS_hms_event_sync_sleep_interval_ms);
   return Status::OK();
 }
 
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 2e85d76b2..8ff0fe1ef 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -178,6 +178,8 @@ const char* ImpaladMetricKeys::COMPLETED_QUERIES_MAX_RECORDS_WRITES =
     "impala-server.completed-queries.max-records-writes";
 const char* ImpaladMetricKeys::COMPLETED_QUERIES_WRITE_DURATIONS =
     "impala-server.completed-queries.write-durations";
+const char* ImpaladMetricKeys::WAIT_FOR_HMS_EVENT_DURATIONS =
+    "impala-server.wait-for-hms-event-durations-ms";
 const char* ImpaladMetricKeys::DEBUG_ACTION_NUM_FAIL = "impala.debug_action.fail";
 const char* ImpaladMetricKeys::QUERY_LOG_EST_TOTAL_BYTES =
     "impala-server.query-log-est-total-bytes";
@@ -275,6 +277,7 @@ StringProperty* ImpaladMetrics::ACTIVE_CATALOGD_ADDRESS = nullptr;
 HistogramMetric* ImpaladMetrics::QUERY_DURATIONS = nullptr;
 HistogramMetric* ImpaladMetrics::DDL_DURATIONS = nullptr;
 HistogramMetric* ImpaladMetrics::COMPLETED_QUERIES_WRITE_DURATIONS = nullptr;
+HistogramMetric* ImpaladMetrics::WAIT_FOR_HMS_EVENT_DURATIONS = nullptr;
 
 // Other
 StatsMetric<uint64_t, StatsType::MEAN>*
@@ -465,6 +468,9 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
   COMPLETED_QUERIES_WRITE_DURATIONS = m->RegisterMetric(new HistogramMetric(
       MetricDefs::Get(ImpaladMetricKeys::COMPLETED_QUERIES_WRITE_DURATIONS),
       FIVE_HOURS_IN_MS, 3));
+  WAIT_FOR_HMS_EVENT_DURATIONS = m->RegisterMetric(new HistogramMetric(
+      MetricDefs::Get(ImpaladMetricKeys::WAIT_FOR_HMS_EVENT_DURATIONS),
+      FIVE_HOURS_IN_MS, 3));
 
   // Initialize Hedged read metrics
   HEDGED_READ_OPS = m->AddCounter(ImpaladMetricKeys::HEDGED_READ_OPS, 0);
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index 2dcf2c568..a383a1fca 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -287,6 +287,9 @@ class ImpaladMetricKeys {
 
   /// Time spent writing completed queries to the query log table.
   static const char* COMPLETED_QUERIES_WRITE_DURATIONS;
+
+  /// Time spent in the WaitForHmsEvent catalogd RPC.
+  static const char* WAIT_FOR_HMS_EVENT_DURATIONS;
 };
 
 /// Global impalad-wide metrics.  This is useful for objects that want to update metrics
@@ -387,6 +390,7 @@ class ImpaladMetrics {
   static HistogramMetric* QUERY_DURATIONS;
   static HistogramMetric* DDL_DURATIONS;
   static HistogramMetric* COMPLETED_QUERIES_WRITE_DURATIONS;
+  static HistogramMetric* WAIT_FOR_HMS_EVENT_DURATIONS;
 
   // Other
   static StatsMetric<uint64_t, StatsType::MEAN>* IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO;
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index cfa7111ef..c04e08754 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -316,4 +316,6 @@ struct TBackendGflags {
   142: required bool enable_reading_puffin_stats
 
   143: required string injected_group_members_debug_only
+
+  144: required i32 hms_event_sync_sleep_interval_ms
 }
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 807637ac1..f57f62ec9 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -772,6 +772,34 @@ struct TSetEventProcessorStatusResponse {
   2: optional string info
 }
 
+struct TWaitForHmsEventRequest {
+  1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V2
+
+  // Common header included in all CatalogService requests.
+  2: required TCatalogServiceRequestHeader header
+
+  // Timeout in seconds for waiting catalogd to catch up the latest event when it
+  // receives this request.
+  3: required i32 timeout_s
+
+  // Set to true for SHOW DATABASES statements.
+  4: required bool want_db_list
+
+  // Set to true for SHOW TABLES/VIEWS statements.
+  5: required bool want_table_list
+
+  // Descriptors of catalog objects that might be used by the query.
+  6: optional list<CatalogObjects.TCatalogObject> object_descs
+}
+
+struct TWaitForHmsEventResponse {
+  // The status of the operation, OK if the operation was successful.
+  1: required Status.TStatus status
+
+  // Catalog updates that should be applied on coordinator side
+  2: optional TCatalogUpdateResult result
+}
+
 // The CatalogService API
 service CatalogService {
   // Executes a DDL request and returns details on the result of the operation.
@@ -816,4 +844,8 @@ service CatalogService {
   // Update the status of EventProcessor.
   TSetEventProcessorStatusResponse SetEventProcessorStatus(
       1: TSetEventProcessorStatusRequest req);
+
+  // Waits until catalogd processes the latest HMS event and get the catalog version
+  // to catch up the metadata changes.
+  TWaitForHmsEventResponse WaitForHmsEvent(1: TWaitForHmsEventRequest req);
 }
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index b28754848..f4ac57a95 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -986,6 +986,18 @@ enum TImpalaQueryOptions {
   // method and they produce the same results for modern time periods (post 1970, and in
   // most instances before that).
   USE_LEGACY_HIVE_TIMESTAMP_CONVERSION = 186
+
+  // Maximum time in seconds to wait for catalogd to catch up with HMS events before
+  // query planning. Only events generated before the query is submitted will be waited
+  // for. Defaults to 0 which disables waiting. Please only use this on queries that
+  // depend on external modifications. Don't set it cluster-wide since it impacts
+  // performance.
+  SYNC_HMS_EVENTS_WAIT_TIME_S = 187
+
+  // Whether to fail the query if coordinator fails to wait for HMS events to be synced
+  // in catalogd, e.g. when event-processor is in ERROR state or timed out waiting for
+  // SYNC_HMS_EVENTS_WAIT_TIME_S seconds. Defaults to false, which means coordinator will
+  // start query planning regardless of the failure.
+  SYNC_HMS_EVENTS_STRICT_MODE = 188
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 0e8458082..ccbff8f99 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -762,6 +762,12 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   187: optional bool use_legacy_hive_timestamp_conversion = false;
+
+  // See comment in ImpalaService.thrift
+  188: optional i32 sync_hms_events_wait_time_s = 0
+
+  // See comment in ImpalaService.thrift
+  189: optional bool sync_hms_events_strict_mode = false
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index fd74c8e7e..f7d03d1a7 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -3033,6 +3033,16 @@
     "kind": "HISTOGRAM",
     "key": "impala-server.ddl-durations-ms"
   },
+  {
+    "description": "Distribution of WaitForHmsEvent RPC latencies",
+    "contexts": [
+        "IMPALAD"
+    ],
+    "label": "WaitForHmsEvent RPC distribution",
+    "units": "TIME_MS",
+    "kind": "HISTOGRAM",
+    "key": "impala-server.wait-for-hms-event-durations-ms"
+  },
   {
     "description": "Number of currently cached HDFS file handles in the IO manager.",
     "contexts": [
diff --git a/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java b/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java
index 90561df99..ac857db58 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java
@@ -201,4 +201,7 @@ public class AdminFnStmt extends StatementBase {
       event_id_ = params_.get(1).evalToInteger(analyzer, "event_id");
     }
   }
+
+  @Override
+  public boolean requiresHmsMetadata() { return false; }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterDbStmt.java
index b69cc4968..e477f9795 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterDbStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterDbStmt.java
@@ -43,4 +43,7 @@ public abstract class AlterDbStmt extends StatementBase {
   public void analyze(Analyzer analyzer) throws AnalysisException {
     analyzer.getDb(dbName_, Privilege.ALTER);
   }
+
+  @Override
+  public String getParsedDb() { return dbName_; }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 3d54f23a6..7ba4a488e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -4164,6 +4164,15 @@ public class Analyzer {
     globalState_.warnings.put(msg, count + 1);
   }
 
+  /**
+   * Add a batch of warnings based on addWarning().
+   */
+  public void addWarnings(Iterable<String> warnings) {
+    for (String msg : warnings) {
+      addWarning(msg);
+    }
+  }
+
   /**
    * 'addWarning' method may be called after the warnings are retrieved, e.g. in
    * analyzing some substituted/cloned predicates (IMPALA-11021). We need to make sure
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java
index 538c314f1..40c6e9417 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java
@@ -64,6 +64,8 @@ public class CreateDbStmt extends StatementBase {
   }
 
   public String getComment() { return comment_; }
+  @Override
+  public String getParsedDb() { return dbName_; }
   public String getDb() { return dbName_; }
   public boolean getIfNotExists() { return ifNotExists_; }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java
index e85c0ab08..36fe0e362 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java
@@ -52,4 +52,7 @@ public class CreateDropRoleStmt extends AuthorizationStmt {
   public void analyze(Analyzer analyzer) throws AnalysisException {
     super.analyze(analyzer);
   }
+
+  @Override
+  public boolean requiresHmsMetadata() { return false; }
 }
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java
index af5182ab9..a3f04e1b7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java
@@ -55,6 +55,8 @@ public class DescribeDbStmt extends StatementBase {
     return sb.toString() + dbName_;
   }
 
+  @Override
+  public String getParsedDb() { return dbName_; }
   public String getDb() { return dbName_; }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/analysis/DropDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/DropDbStmt.java
index 7efd00c3e..daca8557c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DropDbStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DropDbStmt.java
@@ -44,6 +44,8 @@ public class DropDbStmt extends StatementBase {
     this.cascade_ = cascade;
   }
 
+  @Override
+  public String getParsedDb() { return dbName_; }
   public String getDb() { return dbName_; }
   public boolean getIfExists() { return ifExists_; }
   public boolean getCascade() { return cascade_; }
diff --git a/fe/src/main/java/org/apache/impala/analysis/GrantRevokeRoleStmt.java b/fe/src/main/java/org/apache/impala/analysis/GrantRevokeRoleStmt.java
index 06d9347c1..170a24d7e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/GrantRevokeRoleStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/GrantRevokeRoleStmt.java
@@ -64,4 +64,7 @@ public class GrantRevokeRoleStmt extends AuthorizationStmt {
       throw new AnalysisException("Group name in GRANT/REVOKE ROLE cannot be empty.");
     }
   }
+
+  @Override
+  public boolean requiresHmsMetadata() { return false; }
 }
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/SetStmt.java b/fe/src/main/java/org/apache/impala/analysis/SetStmt.java
index 0d270b877..7a5518a5e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SetStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SetStmt.java
@@ -79,4 +79,7 @@ public class SetStmt extends StatementBase {
     request.setQuery_option_type(queryOptionType_);
     return request;
   }
+
+  @Override
+  public boolean requiresHmsMetadata() { return false; }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowDataSrcsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowDataSrcsStmt.java
index 7e7511401..97c44636f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ShowDataSrcsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ShowDataSrcsStmt.java
@@ -69,4 +69,7 @@ public class ShowDataSrcsStmt extends StatementBase {
     params.setShow_pattern(getPattern());
     return params;
   }
+
+  @Override
+  public boolean requiresHmsMetadata() { return false; }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowFunctionsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowFunctionsStmt.java
index cb74d4a0b..eefc6356f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ShowFunctionsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ShowFunctionsStmt.java
@@ -65,6 +65,8 @@ public class ShowFunctionsStmt extends StatementBase {
     return postAnalysisDb_;
   }
 
+  @Override
+  public String getParsedDb() { return parsedDb_; }
   public String getPattern() { return pattern_; }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowGrantPrincipalStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowGrantPrincipalStmt.java
index 86f1b4960..1b0760159 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ShowGrantPrincipalStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ShowGrantPrincipalStmt.java
@@ -25,7 +25,6 @@ import org.apache.impala.common.InternalException;
 import org.apache.impala.thrift.TPrincipalType;
 import org.apache.impala.thrift.TShowGrantPrincipalParams;
 
-import java.util.HashSet;
 import java.util.List;
 
 /**
@@ -106,4 +105,7 @@ public class ShowGrantPrincipalStmt extends AuthorizationStmt {
     }
     return params;
   }
+
+  @Override
+  public boolean requiresHmsMetadata() { return false; }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowRolesStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowRolesStmt.java
index 8d4d62fb2..9c84d1366 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ShowRolesStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ShowRolesStmt.java
@@ -68,4 +68,7 @@ public class ShowRolesStmt extends AuthorizationStmt {
     super.analyze(analyzer);
     requestingUser_ = analyzer.getUser();
   }
+
+  @Override
+  public boolean requiresHmsMetadata() { return false; }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowTablesOrViewsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowTablesOrViewsStmt.java
index 85b7ffc5b..21d4d1daa 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ShowTablesOrViewsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ShowTablesOrViewsStmt.java
@@ -66,6 +66,7 @@ public abstract class ShowTablesOrViewsStmt extends StatementBase {
     this.postAnalysisDb_ = null;
   }
 
+  @Override
   public String getParsedDb() { return parsedDb_; }
   public String getPattern() { return pattern_; }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
index 7512e2026..69b5ddba1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
@@ -32,6 +32,8 @@ import org.apache.impala.rewrite.ExprRewriter;
 
 import com.google.common.base.Preconditions;
 
+import javax.annotation.Nullable;
+
 import static org.apache.impala.analysis.ToSqlOptions.DEFAULT;
 
 /**
@@ -68,6 +70,17 @@ public abstract class StatementBase extends StmtNode {
    */
   public void collectTableRefs(List<TableRef> tblRefs) { }
 
+  /**
+   * Returns the db name for CREATE/DROP/ALTER database or SHOW TABLES/VIEWS statements.
+   */
+  @Nullable
+  public String getParsedDb() { return null; }
+
+  /**
+   * Returns whether the analysis on the statement requires HMS metadata.
+   */
+  public boolean requiresHmsMetadata() { return true; }
+
   /**
    * Analyzes the statement and throws an AnalysisException if analysis fails. A failure
    * could be due to a problem with the statement or because one or more tables/views
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 46ef622ae..cf90bc9b7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -745,6 +745,11 @@ public abstract class Catalog implements AutoCloseable {
     }
   }
 
+  public static String toCatalogObjectSummary(TCatalogObject catalogObject) {
+    return String.format("%s(%d)", toCatalogObjectKey(catalogObject),
+        catalogObject.catalog_version);
+  }
+
   /**
    * Returns true if the two objects have the same object type and key (generated using
    * toCatalogObjectKey()).
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
index b4267c0c3..72eff61f5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
@@ -17,12 +17,15 @@
 
 package org.apache.impala.catalog;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
@@ -57,8 +60,9 @@ import com.google.common.collect.ImmutableList;
 public class CatalogDeltaLog {
   // Map of the catalog version an object was removed from the catalog
   // to the catalog object, ordered by catalog version.
-  private SortedMap<Long, TCatalogObject> removedCatalogObjects_ =
-      new TreeMap<Long, TCatalogObject>();
+  private SortedMap<Long, TCatalogObject> removedCatalogObjects_ = new TreeMap<>();
+  // Map of the catalog object key to its latest removed version.
+  private Map<String, Long> latestRemovedVersions_ = new HashMap<>();
 
   /**
    * Adds a new item to the map of removed catalog objects.
@@ -66,6 +70,8 @@ public class CatalogDeltaLog {
   public synchronized void addRemovedObject(TCatalogObject catalogObject) {
     Preconditions.checkNotNull(catalogObject);
     removedCatalogObjects_.put(catalogObject.getCatalog_version(), catalogObject);
+    String key = Catalog.toCatalogObjectKey(catalogObject);
+    latestRemovedVersions_.merge(key, catalogObject.catalog_version, Long::max);
   }
 
   /**
@@ -76,7 +82,34 @@ public class CatalogDeltaLog {
       long toVersion) {
     SortedMap<Long, TCatalogObject> objects =
         removedCatalogObjects_.subMap(fromVersion + 1, toVersion + 1);
-    return ImmutableList.<TCatalogObject>copyOf(objects.values());
+    return ImmutableList.copyOf(objects.values());
+  }
+
+  /**
+   * Retrieve all the removed db objects.
+   */
+  public synchronized List<TCatalogObject> retrieveDbObjects() {
+    Map<String, TCatalogObject> res = new HashMap<>();
+    for (TCatalogObject obj : removedCatalogObjects_.values()) {
+      if (obj.type != TCatalogObjectType.DATABASE) continue;
+      res.put(Catalog.toCatalogObjectKey(obj), obj);
+    }
+    return ImmutableList.copyOf(res.values());
+  }
+
+  /**
+   * Retrieve all the removed table objects from dbName.
+   */
+  public synchronized List<TCatalogObject> retrieveTableObjects(String dbName) {
+    Map<String, TCatalogObject> res = new HashMap<>();
+    for (TCatalogObject obj : removedCatalogObjects_.values()) {
+      if (obj.type != TCatalogObjectType.TABLE
+          || !StringUtils.equals(dbName, obj.table.db_name)) {
+        continue;
+      }
+      res.put(Catalog.toCatalogObjectKey(obj), obj);
+    }
+    return ImmutableList.copyOf(res.values());
   }
 
   /**
@@ -89,8 +122,10 @@ public class CatalogDeltaLog {
     // Nothing will be garbage collected so avoid creating a new object.
     if (!removedCatalogObjects_.isEmpty() &&
         removedCatalogObjects_.firstKey() < currentCatalogVersion) {
-      removedCatalogObjects_ = new TreeMap<Long, TCatalogObject>(
+      removedCatalogObjects_ = new TreeMap<>(
           removedCatalogObjects_.tailMap(currentCatalogVersion));
+      latestRemovedVersions_.entrySet().removeIf(
+          e -> e.getValue() < currentCatalogVersion);
     }
   }
 
@@ -111,4 +146,12 @@ public class CatalogDeltaLog {
     }
     return false;
   }
+
+  /**
+   * Gets the recently removed version of a catalog object.
+   */
+  public synchronized long getLatestRemovedVersion(TCatalogObject catalogObject) {
+    String key = Catalog.toCatalogObjectKey(catalogObject);
+    return latestRemovedVersions_.getOrDefault(key, 0L);
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 273497062..9bbd0e915 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -28,6 +28,7 @@ import static org.apache.impala.service.CatalogOpExecutor.FETCHED_HMS_TABLE;
 import static org.apache.impala.service.CatalogOpExecutor.FETCHED_LATEST_HMS_EVENT_ID;
 import static org.apache.impala.service.CatalogOpExecutor.GOT_TABLE_READ_LOCK;
 import static org.apache.impala.service.CatalogOpExecutor.GOT_TABLE_WRITE_LOCK;
+import static org.apache.impala.thrift.TCatalogObjectType.DATABASE;
 import static org.apache.impala.thrift.TCatalogObjectType.HDFS_PARTITION;
 import static org.apache.impala.thrift.TCatalogObjectType.TABLE;
 
@@ -131,6 +132,7 @@ import org.apache.impala.thrift.TResetMetadataRequest;
 import org.apache.impala.thrift.TSetEventProcessorStatusResponse;
 import org.apache.impala.thrift.TStatus;
 import org.apache.impala.thrift.TSystemTableName;
+import org.apache.impala.thrift.TStatus;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TTableType;
@@ -138,6 +140,8 @@ import org.apache.impala.thrift.TTableUsage;
 import org.apache.impala.thrift.TTableUsageMetrics;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateTableUsageRequest;
+import org.apache.impala.thrift.TWaitForHmsEventRequest;
+import org.apache.impala.thrift.TWaitForHmsEventResponse;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.CatalogBlacklistUtils;
 import org.apache.impala.util.DebugUtils;
@@ -4192,6 +4196,191 @@ public class CatalogServiceCatalog extends Catalog {
     }
   }
 
+  private void addRemovedTableToWaitForHmsEventResponse(TWaitForHmsEventResponse res,
+      String dbName, String tblName) {
+    TCatalogObject tblDesc = new TCatalogObject();
+    tblDesc.setType(TCatalogObjectType.TABLE);
+    tblDesc.setTable(new TTable(dbName, tblName));
+    long version = deleteLog_.getLatestRemovedVersion(tblDesc);
+    if (version > 0) {
+      tblDesc.setCatalog_version(version);
+      res.result.addToRemoved_catalog_objects(tblDesc);
+    }
+    // If the table is not in the delete log, i.e. version <= 0, it comes from an
+    // illegal table name. It's OK to return nothing for it.
+    // Note that coordinators could send illegal table names since the request is
+    // sent before the query is analyzed. What we get are potential table names
+    // that could be used by a query. E.g. "SELECT c FROM a.b" might read table "a.b"
+    // or table "default.a" in the case when "b" is an array/map column. Coordinator
+    // will send the request for table "a.b" and "default.a". Catalogd just sends back
+    // info of the legal table.
+  }
+
+  private boolean addTableToWaitForHmsEventResponse(TWaitForHmsEventResponse res,
+      Table tbl, boolean wantMinimalResponse) {
+    Preconditions.checkNotNull(tbl);
+    // Collect updates for the local-catalog mode.
+    if (wantMinimalResponse) {
+      res.result.addToUpdated_catalog_objects(tbl.toInvalidationObject());
+      return true;
+    }
+    // Collect updates for the legacy catalog mode.
+    String errMsg;
+    try {
+      // TODO: count the lock waiting time into the request's timeout?
+      if (tbl.readLock().tryLock(600000L, TimeUnit.MILLISECONDS)) {
+        res.result.addToUpdated_catalog_objects(tbl.toTCatalogObject());
+        return true;
+      }
+      errMsg = "HMS events are synced as expected but timed out getting " +
+          "the update of table " + tbl.getFullName();
+    } catch (InterruptedException e) {
+      errMsg = "HMS events are synced as expected but acquiring read lock of table " +
+          tbl.getFullName() + " got interrupted";
+    } finally {
+      if (tbl.isReadLockedByCurrentThread()) {
+        tbl.readLock().unlock();
+      }
+    }
+    TStatus errStatus = new TStatus(TErrorCode.RPC_GENERAL_ERROR,
+        Lists.newArrayList(errMsg));
+    res.setStatus(errStatus);
+    // 'status' is a required field of TCatalogUpdateResult, so we have to set
+    // it though it's unused.
+    res.result.setStatus(errStatus);
+    LOG.error(errMsg);
+    return false;
+  }
+
+  private boolean addDbToWaitForHmsEventResponse(TWaitForHmsEventResponse res,
+      String dbName, boolean wantMinimalResponse, boolean wantTableList) {
+    Db db = getDb(dbName);
+    TCatalogObject resDb = new TCatalogObject();
+    resDb.setType(TCatalogObjectType.DATABASE);
+    // If the db was removed, add deletions for it and all tables under it.
+    if (db == null) {
+      resDb.setDb(new TDatabase(dbName));
+      long version = deleteLog_.getLatestRemovedVersion(resDb);
+      if (version > 0) {
+        resDb.setCatalog_version(version);
+        res.result.addToRemoved_catalog_objects(resDb);
+      }
+      for (TCatalogObject obj : deleteLog_.retrieveTableObjects(dbName)) {
+        res.result.addToRemoved_catalog_objects(obj);
+      }
+      return true;
+    }
+    if (wantMinimalResponse) {
+      res.result.addToUpdated_catalog_objects(db.toMinimalTCatalogObject());
+    } else {
+      resDb.setCatalog_version(db.getCatalogVersion());
+      resDb.setDb(db.toThrift());
+      res.result.addToUpdated_catalog_objects(resDb);
+    }
+    if (wantTableList) {
+      // TODO: gets a list of known tables from the coordinator and only sends
+      //  back unknown/updated/removed tables.
+      for (Table tbl : db.getTables()) {
+        if (!addTableToWaitForHmsEventResponse(res, tbl, wantMinimalResponse)) {
+          return false;
+        }
+      }
+      // Add tables that are removed.
+      for (TCatalogObject obj : deleteLog_.retrieveTableObjects(dbName)) {
+        // Ignore re-created tables
+        if (db.getTable(obj.table.tbl_name) != null) continue;
+        res.result.addToRemoved_catalog_objects(obj);
+      }
+    }
+    return true;
+  }
+
+  public TWaitForHmsEventResponse waitForHmsEvent(TWaitForHmsEventRequest req) {
+    LOG.info("waitForHmsEvent request: want_minimal_response={}, coordinator={}, " +
+            "timeout_s={}, want_db_list={}, want_table_list={}, objects=[{}]",
+        req.header.want_minimal_response, req.header.coordinator_hostname, req.timeout_s,
+        req.want_db_list, req.want_table_list, !req.isSetObject_descs() ? "" :
+            req.object_descs.stream().map(Catalog::toCatalogObjectKey)
+                .collect(Collectors.joining(", ")));
+    TWaitForHmsEventResponse res = new TWaitForHmsEventResponse();
+    if (!(metastoreEventProcessor_ instanceof MetastoreEventsProcessor)) {
+      res.setStatus(new TStatus(TErrorCode.RPC_GENERAL_ERROR,
+          Lists.newArrayList("HMS event processing is disabled")));
+      LOG.error("HMS event processing is disabled. Return without waiting.");
+      return res;
+    }
+    MetastoreEventsProcessor eventsProcessor =
+        (MetastoreEventsProcessor) metastoreEventProcessor_;
+    TStatus status = eventsProcessor.waitForSyncUpToCurrentEvent(req.timeout_s * 1000L);
+    if (status.status_code != TErrorCode.OK) {
+      res.setStatus(status);
+      LOG.error(String.join("\n", status.error_msgs));
+      return res;
+    }
+    res.setResult(new TCatalogUpdateResult());
+    res.getResult().setCatalog_service_id(JniCatalog.getServiceId());
+
+    // Collect catalog objects required by the query
+    boolean wantMinimalResponse = req.header.want_minimal_response;
+    if (req.isSetObject_descs()) {
+      for (TCatalogObject catalogObject: req.getObject_descs()) {
+        if (catalogObject.isSetDb()) {
+          if (!addDbToWaitForHmsEventResponse(res, catalogObject.getDb().db_name,
+              wantMinimalResponse, req.want_table_list)) {
+            // Error status is already set in addDbToWaitForHmsEventResponse()
+            return res;
+          }
+        } else if (catalogObject.isSetTable()) {
+          TTable table = catalogObject.getTable();
+          Db db = getDb(table.db_name);
+          // If the db was removed, adds a deletion for it in the response
+          if (db == null) {
+            boolean success = addDbToWaitForHmsEventResponse(
+                res, table.db_name, wantMinimalResponse, /*wantTableList*/false);
+            Preconditions.checkState(success,
+                "should succeed since wantTableList is false");
+            continue;
+          }
+          Table tbl = db.getTable(table.tbl_name);
+          if (tbl == null) {
+            addRemovedTableToWaitForHmsEventResponse(res, table.db_name, table.tbl_name);
+          } else if (!addTableToWaitForHmsEventResponse(res, tbl, wantMinimalResponse)) {
+            // Error status is already set in addTableToWaitForHmsEventResponse()
+            return res;
+          }
+        }
+      }
+    } else if (req.want_db_list) {
+      for (Db db : getAllDbs()) {
+        boolean success = addDbToWaitForHmsEventResponse(res, db.getName(),
+            wantMinimalResponse, /*wantTableList*/false);
+        Preconditions.checkState(success,
+            "should succeed since wantTableList is false");
+      }
+      // Also add dbs that are recently deleted
+      for (TCatalogObject deletedDb : deleteLog_.retrieveDbObjects()) {
+        // Ignore re-created dbs
+        if (getDb(deletedDb.db.db_name) != null) continue;
+        res.result.addToRemoved_catalog_objects(deletedDb);
+      }
+    }
+    TStatus okStatus = new TStatus(TErrorCode.OK, Collections.emptyList());
+    res.setStatus(okStatus);
+    // 'status' is a required field of TCatalogUpdateResult, so we have to set it though
+    // it's unused.
+    res.result.setStatus(okStatus);
+    LOG.info("waitForHmsEvent succeeds. updated_objects=[{}], removed_objects=[{}]",
+        getCatalogUpdateSummary(res.result.updated_catalog_objects),
+        getCatalogUpdateSummary(res.result.removed_catalog_objects));
+    return res;
+  }
+
+  private String getCatalogUpdateSummary(List<TCatalogObject> objs) {
+    if (objs == null) return "";
+    return objs.stream().map(Catalog::toCatalogObjectSummary)
+        .collect(Collectors.joining(", "));
+  }
+
   /**
    * Marks write ids with corresponding status for the table if it is loaded HdfsTable.
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index a5237008d..896179ffe 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -392,7 +392,8 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
    *  catalog version is < the catalog version of this drop operation.
    */
   private void removeCatalogObject(TCatalogObject catalogObject) {
-    Preconditions.checkState(catalogObject.getCatalog_version() != 0);
+    Preconditions.checkState(catalogObject.getCatalog_version() != 0,
+        "Dropped catalog version is 0. type: %s", catalogObject.getType());
     long dropCatalogVersion = catalogObject.getCatalog_version();
     switch(catalogObject.getType()) {
       case DATABASE:
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
index d9a70e7bf..ba98684ad 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
@@ -40,6 +40,11 @@ public interface ExternalEventsProcessor {
    */
   long getCurrentEventId() throws MetastoreNotificationFetchException;
 
+  /**
+   * Get the latest event id that has been processed.
+   */
+  default long getLastSyncedEventId() { return -1; }
+
   /**
    * Pauses the event processing. Use <code>start(fromEventId)</code> method below to
    * restart the event processing
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 5c7a5e4aa..4928e4b2b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -60,12 +61,15 @@ import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
 import org.apache.impala.common.Metrics;
 import org.apache.impala.common.PrintUtils;
+import org.apache.impala.common.Reference;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.CatalogOpExecutor;
+import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TEventBatchProgressInfo;
 import org.apache.impala.thrift.TEventProcessorMetrics;
 import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
+import org.apache.impala.thrift.TStatus;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.NoOpEventSequence;
@@ -678,6 +682,9 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
   // can ignore the drop events when they are received later.
   private final DeleteEventLog deleteEventLog_ = new DeleteEventLog();
 
+  // Sleep interval when waiting for HMS events to be synced.
+  private final int hmsEventSyncSleepIntervalMs_;
+
   @VisibleForTesting
   MetastoreEventsProcessor(CatalogOpExecutor catalogOpExecutor, long startSyncFromId,
       long pollingFrequencyInSec) throws CatalogException {
@@ -689,6 +696,10 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
     initMetrics();
     metastoreEventFactory_ = new MetastoreEventFactory(catalogOpExecutor);
     pollingFrequencyInSec_ = pollingFrequencyInSec;
+    hmsEventSyncSleepIntervalMs_ = BackendConfig.INSTANCE
+        .getBackendCfg().hms_event_sync_sleep_interval_ms;
+    Preconditions.checkState(hmsEventSyncSleepIntervalMs_ > 0,
+        "hms_event_sync_sleep_interval_ms must be positive");
   }
 
   /**
@@ -804,10 +815,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
   }
 
   /**
-   * returns the current value of LastSyncedEventId. This method is not thread-safe and
-   * only to be used for testing purposes
+   * returns the current value of LastSyncedEventId.
    */
-  @VisibleForTesting
   public long getLastSyncedEventId() {
     return lastSyncedEventId_.get();
   }
@@ -1521,4 +1530,57 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
       return tableName_;
     }
   }
+
+  public TStatus waitForSyncUpToCurrentEvent(long timeoutMs) {
+    TStatus res = new TStatus();
+    // Only wait when the event processor is in the ACTIVE or PAUSED state. The PAUSED
+    // state occurs at startup or while a global INVALIDATE METADATA is running, so it's
+    // safe to wait in that state.
+    if (!EventProcessorStatus.ACTIVE.equals(eventProcessorStatus_)
+        && !EventProcessorStatus.PAUSED.equals(eventProcessorStatus_)) {
+      res.setStatus_code(TErrorCode.GENERAL);
+      res.addToError_msgs(
+          "Current state of HMS event processor is " + eventProcessorStatus_);
+      return res;
+    }
+    long waitForEventId;
+    try {
+      waitForEventId = getCurrentEventId();
+    } catch (MetastoreNotificationFetchException e) {
+      res.setStatus_code(TErrorCode.GENERAL);
+      res.addToError_msgs("Failed to fetch current HMS event id: " + e.getMessage());
+      return res;
+    }
+    long lastSyncedEventId = getLastSyncedEventId();
+    long startMs = System.currentTimeMillis();
+    long sleepIntervalMs = Math.min(timeoutMs, hmsEventSyncSleepIntervalMs_);
+    // Log roughly once per second to avoid flooding the log when the sleep
+    // interval is short.
+    int logIntervals = Math.max(1, 1000 / hmsEventSyncSleepIntervalMs_);
+    int numIters = 0;
+    while (lastSyncedEventId < waitForEventId
+        && System.currentTimeMillis() - startMs < timeoutMs) {
+      if (numIters++ % logIntervals == 0) {
+        LOG.info("Waiting for last synced event id ({}) to reach {}",
+            lastSyncedEventId, waitForEventId);
+      }
+      Uninterruptibles.sleepUninterruptibly(sleepIntervalMs, TimeUnit.MILLISECONDS);
+      lastSyncedEventId = getLastSyncedEventId();
+      if (!EventProcessorStatus.ACTIVE.equals(eventProcessorStatus_)
+          && !EventProcessorStatus.PAUSED.equals(eventProcessorStatus_)) {
+        res.setStatus_code(TErrorCode.GENERAL);
+        res.addToError_msgs(
+            "Current state of HMS event processor is " + eventProcessorStatus_);
+        return res;
+      }
+    }
+    if (lastSyncedEventId < waitForEventId) {
+      res.setStatus_code(TErrorCode.GENERAL);
+      res.addToError_msgs(String.format("Timeout waiting for HMS events to be synced. " +
+          "Event id to wait for: %d. Last synced event id: %d",
+          waitForEventId, lastSyncedEventId));
+      return res;
+    }
+    LOG.info("Last synced event id ({}) reached {}", lastSyncedEventId, waitForEventId);
+    res.setStatus_code(TErrorCode.OK);
+    return res;
+  }
 }
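The `waitForSyncUpToCurrentEvent()` method above is a bounded polling loop: snapshot the current HMS event id once, then poll the last synced id until it catches up or the timeout expires. A minimal language-agnostic sketch in Python (the function names are illustrative, not Impala's API):

```python
import time

def wait_for_sync(get_current_event_id, get_last_synced_event_id,
                  timeout_s, sleep_interval_s=0.1):
    """Poll until the last synced event id catches up with the current HMS
    event id, or until timeout_s elapses. Returns (ok, error_message)."""
    wait_for_id = get_current_event_id()  # snapshot the target id only once
    last_synced = get_last_synced_event_id()
    deadline = time.monotonic() + timeout_s
    while last_synced < wait_for_id and time.monotonic() < deadline:
        # Never sleep longer than the remaining budget allows.
        time.sleep(min(sleep_interval_s, timeout_s))
        last_synced = get_last_synced_event_id()
    if last_synced < wait_for_id:
        return False, ("Timeout waiting for HMS events to be synced. "
                       "Event id to wait for: %d. Last synced event id: %d"
                       % (wait_for_id, last_synced))
    return True, None
```

Note that events generated after the snapshot are deliberately not waited for, which is what keeps the wait bounded even under a steady stream of new events.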
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index ed72ba2ff..baaf142eb 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -47,10 +47,13 @@ import org.apache.impala.thrift.TPrioritizeLoadResponse;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TResultRow;
+import org.apache.impala.thrift.TStatus;
 import org.apache.impala.thrift.TSymbolLookupParams;
 import org.apache.impala.thrift.TSymbolLookupResult;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TUniqueId;
+import org.apache.impala.thrift.TWaitForHmsEventRequest;
+import org.apache.impala.thrift.TWaitForHmsEventResponse;
 import org.apache.impala.util.NativeLibUtil;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
@@ -155,6 +158,9 @@ public class FeSupport {
   // Get the number of live queries.
   public native static long NativeNumLiveQueries();
 
+  public native static byte[] NativeWaitForHmsEvents(byte[] thriftReq,
+      byte[] thriftQueryOptions);
+
   /**
    * Locally caches the jar at the specified HDFS location.
    *
@@ -556,6 +562,22 @@ public class FeSupport {
     return NativeNumLiveQueries();
   }
 
+  public static TStatus WaitForHmsEvents(TWaitForHmsEventRequest req,
+      TQueryOptions queryOptions) throws InternalException {
+    try {
+      TSerializer serializer = new TSerializer();
+      byte[] result = NativeWaitForHmsEvents(serializer.serialize(req),
+          serializer.serialize(queryOptions));
+      Preconditions.checkNotNull(result, "result of NativeWaitForHmsEvents is null");
+      TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+      TStatus status = new TStatus();
+      deserializer.deserialize(status, result);
+      return status;
+    } catch (TException e) {
+      throw new InternalException("Error waiting for HMS events", e);
+    }
+  }
+
   /**
    * Calling this function before loadLibrary() causes external frontend
    * initialization to be used during NativeFeInit()
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index e677580d3..0a2710f86 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -92,13 +92,16 @@ import org.apache.impala.analysis.OptimizeStmt;
 import org.apache.impala.analysis.Parser;
 import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.analysis.ResetMetadataStmt;
+import org.apache.impala.analysis.ShowDbsStmt;
 import org.apache.impala.analysis.ShowFunctionsStmt;
 import org.apache.impala.analysis.ShowGrantPrincipalStmt;
 import org.apache.impala.analysis.ShowRolesStmt;
+import org.apache.impala.analysis.ShowTablesOrViewsStmt;
 import org.apache.impala.analysis.StatementBase;
 import org.apache.impala.analysis.StmtMetadataLoader;
 import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache;
 import org.apache.impala.analysis.TableName;
+import org.apache.impala.analysis.TableRef;
 import org.apache.impala.analysis.TruncateStmt;
 import org.apache.impala.authentication.saml.ImpalaSamlClient;
 import org.apache.impala.authorization.AuthorizationChecker;
@@ -133,6 +136,7 @@ import org.apache.impala.catalog.MaterializedViewHdfsTable;
 import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.StructType;
+import org.apache.impala.catalog.SystemTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
@@ -160,6 +164,8 @@ import org.apache.impala.planner.ScanNode;
 import org.apache.impala.thrift.CatalogLookupStatus;
 import org.apache.impala.thrift.TAlterDbParams;
 import org.apache.impala.thrift.TBackendGflags;
+import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCatalogOpRequest;
 import org.apache.impala.thrift.TCatalogOpType;
 import org.apache.impala.thrift.TCatalogServiceRequestHeader;
@@ -170,12 +176,14 @@ import org.apache.impala.thrift.TCommentOnParams;
 import org.apache.impala.thrift.TCopyTestCaseReq;
 import org.apache.impala.thrift.TCounter;
 import org.apache.impala.thrift.TCreateDropRoleParams;
+import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDdlExecRequest;
 import org.apache.impala.thrift.TDdlQueryOptions;
 import org.apache.impala.thrift.TDdlType;
 import org.apache.impala.thrift.TDescribeHistoryParams;
 import org.apache.impala.thrift.TDescribeOutputStyle;
 import org.apache.impala.thrift.TDescribeResult;
+import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TImpalaTableType;
 import org.apache.impala.thrift.TDescribeTableParams;
 import org.apache.impala.thrift.TIcebergDmlFinalizeParams;
@@ -212,13 +220,17 @@ import org.apache.impala.thrift.TRuntimeProfileNode;
 import org.apache.impala.thrift.TShowFilesParams;
 import org.apache.impala.thrift.TShowStatsOp;
 import org.apache.impala.thrift.TSlotCountStrategy;
+import org.apache.impala.thrift.TStatus;
 import org.apache.impala.thrift.TStmtType;
+import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TTruncateParams;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUnit;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
+import org.apache.impala.thrift.TWaitForHmsEventRequest;
+import org.apache.impala.thrift.TWaitForHmsEventResponse;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.DebugUtils;
 import org.apache.impala.util.EventSequence;
@@ -904,17 +916,9 @@ public class Frontend {
     ddl.setSync_ddl(result.getQuery_options().isSync_ddl());
     result.setCatalog_op_request(ddl);
     if (ddl.getOp_type() == TCatalogOpType.DDL) {
-      TCatalogServiceRequestHeader header = new TCatalogServiceRequestHeader();
-      header.setRequesting_user(analysis.getAnalyzer().getUser().getName());
-      TQueryCtx queryCtx = analysis.getAnalyzer().getQueryCtx();
-      header.setQuery_id(queryCtx.query_id);
-      header.setClient_ip(queryCtx.getSession().getNetwork_address().getHostname());
-      TClientRequest clientRequest = queryCtx.getClient_request();
-      header.setRedacted_sql_stmt(clientRequest.isSetRedacted_stmt() ?
-          clientRequest.getRedacted_stmt() : clientRequest.getStmt());
-      header.setWant_minimal_response(
-          BackendConfig.INSTANCE.getBackendCfg().use_local_catalog);
-      header.setCoordinator_hostname(BackendConfig.INSTANCE.getHostname());
+      TCatalogServiceRequestHeader header = createCatalogServiceRequestHeader(
+          analysis.getAnalyzer().getUser().getName(),
+          analysis.getAnalyzer().getQueryCtx());
       ddl.getDdl_params().setHeader(header);
       // Forward relevant query options to the catalogd.
       TDdlQueryOptions ddlQueryOpts = new TDdlQueryOptions();
@@ -2185,6 +2189,119 @@ public class Frontend {
         expectedNumExecutor(execGroupSet), execGroupSet.getNum_cores_per_executor());
   }
 
+  private static TCatalogServiceRequestHeader createCatalogServiceRequestHeader(
+      String requestingUser, TQueryCtx queryCtx) {
+    TCatalogServiceRequestHeader header = new TCatalogServiceRequestHeader();
+    header.setRequesting_user(requestingUser);
+    header.setQuery_id(queryCtx.query_id);
+    header.setClient_ip(queryCtx.getSession().getNetwork_address().getHostname());
+    TClientRequest clientRequest = queryCtx.getClient_request();
+    header.setRedacted_sql_stmt(clientRequest.isSetRedacted_stmt() ?
+            clientRequest.getRedacted_stmt() : clientRequest.getStmt());
+    header.setWant_minimal_response(
+        BackendConfig.INSTANCE.getBackendCfg().use_local_catalog);
+    header.setCoordinator_hostname(BackendConfig.INSTANCE.getHostname());
+    return header;
+  }
+
+  /**
+   * Collect required catalog objects for query planning of the statement and update
+   * the request.
+   */
+  private void collectRequiredObjects(TWaitForHmsEventRequest req, StatementBase stmt,
+      String sessionDb) {
+    if (stmt instanceof ShowDbsStmt) {
+      req.want_db_list = true;
+      return;
+    }
+    // The statement hasn't been analyzed yet. We can only get the parsed db, i.e. the db
+    // name written in the query string.
+    String dbName = stmt.getParsedDb();
+    // Use the session database if the db is not specified in SHOW TABLES/VIEWS/FUNCTIONS
+    // statement.
+    if (dbName == null &&
+        (stmt instanceof ShowFunctionsStmt || stmt instanceof ShowTablesOrViewsStmt)) {
+      dbName = sessionDb;
+    }
+    if (dbName != null) {
+      TCatalogObject dbDesc = new TCatalogObject();
+      dbDesc.setType(TCatalogObjectType.DATABASE);
+      dbDesc.setDb(new TDatabase(dbName));
+      req.addToObject_descs(dbDesc);
+      if (stmt instanceof ShowTablesOrViewsStmt) {
+        req.setWant_table_list(true);
+        LOG.info("Waiting for HMS events on database {} and underlying tables", dbName);
+      } else {
+        LOG.info("Waiting for HMS events on database {}", dbName);
+      }
+      // 'dbName' is non-null only for database-level statements (e.g. CREATE/DROP/ALTER
+      // DATABASE) and SHOW TABLES/VIEWS/FUNCTIONS statements. No table items are needed.
+      return;
+    }
+    List<TableRef> tblRefs = new ArrayList<>();
+    stmt.collectTableRefs(tblRefs);
+    Set<TableName> tableNames = new HashSet<>();
+    for (TableRef ref : tblRefs) {
+      tableNames.addAll(org.apache.impala.analysis.Path.getCandidateTables(
+          ref.getPath(), sessionDb));
+    }
+    Set<String> dbNames = new HashSet<>();
+    for (TableName tblName : tableNames) {
+      dbNames.add(tblName.getDb());
+      TCatalogObject tblDesc = new TCatalogObject();
+      tblDesc.setType(TCatalogObjectType.TABLE);
+      tblDesc.setTable(new TTable(tblName.getDb(), tblName.getTbl()));
+      req.addToObject_descs(tblDesc);
+    }
+    // Add dbs needed by the tables.
+    for (String name : dbNames) {
+      TCatalogObject dbDesc = new TCatalogObject();
+      dbDesc.setType(TCatalogObjectType.DATABASE);
+      dbDesc.setDb(new TDatabase(name));
+      req.addToObject_descs(dbDesc);
+    }
+    LOG.info("Waiting for HMS events on the dbs: {} and tables: {}",
+        String.join(", ", dbNames),
+        tableNames.stream().map(TableName::toString).collect(Collectors.joining(", ")));
+  }
+
+  private void waitForHmsEvents(TQueryCtx queryCtx, TQueryOptions queryOptions,
+      List<String> warnings, EventSequence timeline) throws ImpalaException {
+    if (!queryOptions.isSetSync_hms_events_wait_time_s()
+        || queryOptions.sync_hms_events_wait_time_s <= 0) {
+      return;
+    }
+    StatementBase stmt = Parser.parse(
+        queryCtx.client_request.stmt, queryCtx.client_request.query_options);
+    // Return immediately for simple statements that don't require HMS metadata, e.g.
+    // SET query_option=value;
+    if (!stmt.requiresHmsMetadata()) return;
+    TWaitForHmsEventRequest req = new TWaitForHmsEventRequest();
+    req.setTimeout_s(queryOptions.sync_hms_events_wait_time_s);
+    req.setHeader(createCatalogServiceRequestHeader(
+        TSessionStateUtil.getEffectiveUser(queryCtx.session), queryCtx));
+    collectRequiredObjects(req, stmt, queryCtx.session.database);
+    // TODO: share 'timeline' to BE so we know when the updates are applied
+    TStatus status = FeSupport.WaitForHmsEvents(req, queryOptions);
+    if (status.status_code != TErrorCode.OK) {
+      String errorMsg;
+      if (queryOptions.sync_hms_events_strict_mode) {
+        errorMsg = "Failed to sync events from Metastore";
+      } else {
+        errorMsg = "Continuing without syncing Metastore events";
+      }
+      timeline.markEvent(errorMsg);
+      errorMsg += ": " + String.join("", status.error_msgs);
+      LOG.error(errorMsg);
+      warnings.add(errorMsg);
+      if (queryOptions.sync_hms_events_strict_mode) {
+        throw new InternalException(errorMsg);
+      }
+    } else {
+      timeline.markEvent("Synced events from Metastore");
+    }
+  }
+
   private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline)
       throws ImpalaException {
     TQueryCtx queryCtx = planCtx.getQueryContext();
@@ -2193,6 +2310,8 @@ public class Frontend {
         + queryCtx.session.database);
 
     TQueryOptions queryOptions = queryCtx.client_request.getQuery_options();
+    List<String> warnings = new ArrayList<>();
+    waitForHmsEvents(queryCtx, queryOptions, warnings, timeline);
     boolean enable_replan = queryOptions.isEnable_replan();
     final boolean clientSetRequestPool = queryOptions.isSetRequest_pool();
     Preconditions.checkState(
@@ -2288,7 +2407,7 @@ public class Frontend {
       String retryMsg = "";
       while (true) {
         try {
-          req = doCreateExecRequest(planCtx, timeline);
+          req = doCreateExecRequest(planCtx, warnings, timeline);
           markTimelineRetries(attempt, retryMsg, timeline);
           break;
         } catch (InconsistentMetadataFetchException e) {
@@ -2584,7 +2703,7 @@ public class Frontend {
   }
 
   private TExecRequest doCreateExecRequest(PlanCtx planCtx,
-      EventSequence timeline) throws ImpalaException {
+      List<String> warnings, EventSequence timeline) throws ImpalaException {
     TQueryCtx queryCtx = planCtx.getQueryContext();
    // Parse stmt and collect/load metadata to populate a stmt-local table cache
     StatementBase stmt = Parser.parse(
@@ -2628,6 +2747,7 @@ public class Frontend {
       LOG.info("Analysis finished.");
     }
     Preconditions.checkNotNull(analysisResult.getStmt());
+    analysisResult.getAnalyzer().addWarnings(warnings);
     TExecRequest result = createBaseExecRequest(queryCtx, analysisResult);
     for (TableName table : stmtTableCache.tables.keySet()) {
       result.addToTables(table.toThrift());
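The coordinator-side decision in `waitForHmsEvents()` reduces to: on a failed wait, either fail the query (strict mode) or record a warning in the profile and plan anyway. A schematic Python version of that branch (list-based `warnings`/`timeline` stand in for Impala's structures; names are illustrative):

```python
def handle_wait_status(ok, error_msg, strict_mode, warnings, timeline):
    """Translate the wait result into a query-visible outcome.
    'warnings' and 'timeline' are lists owned by the caller."""
    if ok:
        timeline.append("Synced events from Metastore")
        return
    prefix = ("Failed to sync events from Metastore" if strict_mode
              else "Continuing without syncing Metastore events")
    timeline.append(prefix)           # the timeline label omits the details
    full_msg = prefix + ": " + error_msg
    warnings.append(full_msg)
    if strict_mode:
        raise RuntimeError(full_msg)  # strict mode aborts planning
```

The warning is attached to the analyzer later so it surfaces both in the runtime profile and in client logs (e.g. impala-shell) via `get_log`.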
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index f6e80c5c1..cdf4d91ba 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -91,6 +91,8 @@ import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.thrift.TUpdateTableUsageRequest;
 import org.apache.impala.thrift.TGetAllHadoopConfigsResponse;
+import org.apache.impala.thrift.TWaitForHmsEventRequest;
+import org.apache.impala.thrift.TWaitForHmsEventResponse;
 import org.apache.impala.util.AuthorizationUtil;
 import org.apache.impala.util.CatalogOpUtil;
 import org.apache.impala.util.EventSequence;
@@ -98,6 +100,7 @@ import org.apache.impala.util.GlogAppender;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.NoOpEventSequence;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.ThreadNameAnnotator;
 import org.apache.impala.util.TUniqueIdUtil;
 import org.apache.thrift.TBase;
@@ -105,6 +108,7 @@ import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TSimpleJSONProtocol;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -637,4 +641,13 @@ public class JniCatalog {
       throw new InternalException(e.getMessage());
     }
   }
+
+  public byte[] waitForHmsEvent(byte[] req) throws TException, ImpalaException {
+    TWaitForHmsEventRequest request = new TWaitForHmsEventRequest();
+    JniUtil.deserializeThrift(protocolFactory_, request, req);
+    String queryId = TUniqueIdUtil.PrintId(request.getHeader().getQuery_id());
+    String shortDesc = "waitForHmsEvent " + queryId;
+    return execAndSerialize(
+        "waitForHmsEvent", shortDesc, () -> catalog_.waitForHmsEvent(request));
+  }
 }
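The new `waitForHmsEvent()` entry point follows the same shape as the other JNI handlers: deserialize the request, run the operation, and always return a serialized payload, folding any exception into an error status rather than letting it cross the boundary. A schematic Python version (JSON dicts stand in for the Thrift structs; this is a sketch, not the `execAndSerialize` implementation):

```python
import json

def exec_and_serialize(op):
    """Run op() and return a serialized response; exceptions become an
    error status instead of propagating across the (simulated) boundary."""
    try:
        return json.dumps({"status": "OK", "error_msgs": [], "result": op()})
    except Exception as e:
        return json.dumps({"status": "GENERAL", "error_msgs": [str(e)],
                           "result": None})
```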
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 0cc5d4ea5..4addc0ecd 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -624,6 +624,26 @@ class ImpalaTestSuite(BaseTestSuite):
         return int(m['value'])
     assert False, "Could not find metric: %s" % name
 
+  def verify_timeline_item(self, section, label, profile):
+    """Assert that timeline 'section' in 'profile' contains an entry for 'label'
+    with a non-zero duration."""
+    in_section = False
+    for line in profile.split('\n'):
+      line = line.strip()
+      if line.startswith(section + ": "):
+        in_section = True
+        continue
+      if not in_section:
+        continue
+      if not line.startswith("- "):
+        # Not a label. The current timeline section ends.
+        break
+      if line.startswith("- {}: ".format(label)):
+        match = re.search(r'\((.*)\)', line)
+        assert match, "Value not found in '{}'".format(line)
+        duration = match.group(1)
+        assert duration != '0ns' and duration != '0'
+        return
+    assert False, "'- {}:' not found in\n{}".format(label, profile)
+
   def __do_replacements(self, s, use_db=None, extra=None):
     globs = globals()
    # following assignment are purposefully redundant to avoid flake8 warnings (F401).
@@ -1477,49 +1497,37 @@ class ImpalaTestSuite(BaseTestSuite):
     """Check if lines contain value."""
     return any([line.find(value) != -1 for line in lines])
 
+  def execute_query_with_hms_sync(self, query, timeout_s, strict=True):
+    """Execute the query with WaitForHmsSync enabled"""
+    with self.create_impala_client() as client:
+      client.set_configuration({"sync_hms_events_wait_time_s": timeout_s,
+                                "sync_hms_events_strict_mode": strict})
+      return self.__execute_query(client, query)
+
   def wait_for_db_to_appear(self, db_name, timeout_s):
    """Wait until the database with 'db_name' is present in the impalad's local catalog.
-    Fail after timeout_s if the doesn't appear."""
-    start_time = time.time()
-    while time.time() - start_time < timeout_s:
-      try:
-        # This will throw an exception if the database is not present.
-        self.client.execute("describe database `{db_name}`".format(db_name=db_name))
-        return
-      except Exception:
-        time.sleep(0.2)
-        continue
-    raise Exception("DB {0} didn't show up after {1}s", db_name, timeout_s)
+    Fail after timeout_s if it doesn't appear."""
+    result = self.execute_query_with_hms_sync(
+        "describe database `{0}`".format(db_name), timeout_s)
+    assert result.success
 
   def confirm_db_exists(self, db_name):
    """Confirm the database with 'db_name' is present in the impalad's local catalog.
        Fail if the db is not present"""
     # This will throw an exception if the database is not present.
    self.client.execute("describe database `{db_name}`".format(db_name=db_name))
-    return
 
   def confirm_table_exists(self, db_name, tbl_name):
    """Confirms if the table exists. The describe table command will fail if the table
        does not exist."""
     self.client.execute("describe `{0}`.`{1}`".format(db_name, tbl_name))
-    return
 
-  def wait_for_table_to_appear(self, db_name, table_name, timeout_s):
+  def wait_for_table_to_appear(self, db_name, table_name, timeout_s=10):
     """Wait until the table with 'table_name' in 'db_name' is present in the
-    impalad's local catalog. Fail after timeout_s if the doesn't appear."""
-    start_time = time.time()
-    while time.time() - start_time < timeout_s:
-      try:
-        # This will throw an exception if the table is not present.
-        self.client.execute("describe `{db_name}`.`{table_name}`".format(
-                            db_name=db_name, table_name=table_name))
-        return
-      except Exception as ex:
-        print(str(ex))
-        time.sleep(0.2)
-        continue
-    raise Exception("Table {0}.{1} didn't show up after {2}s", db_name, table_name,
-                    timeout_s)
+    impalad's local catalog. Fail after timeout_s if it doesn't appear."""
+    result = self.execute_query_with_hms_sync(
+        "describe `{0}`.`{1}`".format(db_name, table_name), timeout_s)
+    assert result.success
 
   def assert_eventually(self, timeout_s, period_s, condition, error_msg=None):
    """Assert that the condition (a function with no parameters) returns True within the
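The new `verify_timeline_item` helper above scans for a `section:` header and then for a `- label: ... (duration)` entry before the section ends. A standalone sketch of that parse, with a synthetic profile snippet (the label text mirrors what the patch emits, but the durations here are made up):

```python
import re

def find_timeline_duration(profile, section, label):
    """Return the parenthesized duration of '- label: ...' inside the given
    timeline section of a runtime profile, or None if it is absent."""
    in_section = False
    for line in profile.splitlines():
        line = line.strip()
        if line.startswith(section + ": "):
            in_section = True
            continue
        if not in_section:
            continue
        if not line.startswith("- "):
            break  # a non-label line closes the timeline section
        if line.startswith("- {}: ".format(label)):
            m = re.search(r'\((.*)\)', line)
            return m.group(1) if m else None
    return None
```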
diff --git a/tests/custom_cluster/test_event_processing_error.py b/tests/custom_cluster/test_event_processing_error.py
index afe954311..64516480d 100644
--- a/tests/custom_cluster/test_event_processing_error.py
+++ b/tests/custom_cluster/test_event_processing_error.py
@@ -356,21 +356,19 @@ class TestEventProcessingError(CustomClusterTestSuite):
     replication tests
     """
     # inserts on transactional tables
-    TestEventProcessingBase._run_test_insert_events_impl(self.hive_client, self.client,
-      self.cluster, unique_database, True)
+    TestEventProcessingBase._run_test_insert_events_impl(unique_database, True)
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
     try:
         test_db = unique_database + "_no_transact"
         self.run_stmt_in_hive("""create database {}""".format(test_db))
         # inserts on external tables
-        TestEventProcessingBase._run_test_insert_events_impl(self.hive_client,
-          self.client, self.cluster, test_db, False)
+        TestEventProcessingBase._run_test_insert_events_impl(test_db, False)
         assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
     finally:
         self.run_stmt_in_hive("""drop database {} cascade""".format(test_db))
     # replication related tests
-    TestEventProcessingBase._run_event_based_replication_tests_impl(self.hive_client,
-      self.client, self.cluster, self.filesystem_client)
+    TestEventProcessingBase._run_event_based_replication_tests_impl(
+        self.filesystem_client)
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
   def __create_table_and_load__(self, db_name, table_name, is_transactional,
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index f8c3d0752..c3f9bd648 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -46,6 +46,10 @@ STATESTORED_ARGS = (
   "-statestore_heartbeat_frequency_ms={freq_ms} "
   "-statestore_priority_update_frequency_ms={freq_ms}").format(
     freq_ms=STATESTORE_RPC_FREQUENCY_MS)
+EVENT_SYNC_QUERY_OPTIONS = {
+    "sync_hms_events_wait_time_s": 10,
+    "sync_hms_events_strict_mode": True
+}
 
 
 def wait_single_statestore_heartbeat():
@@ -428,11 +432,11 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
           create table {db}.{tbl} (id int);
          insert into {db}.{tbl} values(1);""".format(db=db_name, tbl=tbl_name))
-      # With MetastoreEventProcessor running, the insert event will be processed. Query
-      # the table from Impala.
-      EventProcessorUtils.wait_for_event_processing(self, event_proc_timeout)
-      # Verify that the data is present in Impala.
-      data = self.execute_scalar("select * from %s.%s" % (db_name, tbl_name))
-      assert data == '1'
+      # the table from Impala. Verify that the data is present in Impala.
+      result = self.execute_query_with_hms_sync(
+          "select * from %s.%s" % (db_name, tbl_name), event_proc_timeout)
+      assert len(result.data) == 1
+      assert result.data[0] == '1'
       # Execute ALTER TABLE + DROP in quick succession so they will be processed in the
       # same event batch.
       self.run_stmt_in_hive("""
@@ -1491,3 +1495,96 @@ class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):
    self.cluster.catalogd.set_jvm_log_level("org.apache.impala.util.DebugUtils", "trace")
    self._run_self_events_test(unique_database, vector.get_value('exec_option'),
         use_impala=True)
+
+
[email protected]
+class TestEventSyncFailures(TestEventProcessingCustomConfigsBase):
+
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=0")
+  def test_hms_event_sync_with_event_processing_disabled(self, vector):
+    """Test with HMS event processing disabled. Verify the error message appears
+    correctly in non-strict mode. Verify the query fails in strict mode."""
+    client = self.default_impala_client(vector.get_value('protocol'))
+    query = "select count(*) from functional.alltypes"
+    # Timeline label shown in the profile when the wait failed.
+    label = "Continuing without syncing Metastore events"
+    # Verify error messages in non-strict mode
+    client.set_configuration({"sync_hms_events_wait_time_s": 10})
+    handle = client.execute_async(query)
+    client.wait_for_finished_timeout(handle, 5)
+    results = client.fetch(query, handle)
+    assert results.success
+    assert len(results.data) == 1
+    assert int(results.data[0]) == 7300
+
+    client_log = client.get_log(handle)
+    expected_error = "Continuing without syncing Metastore events: " \
+                     "HMS event processing is disabled"
+    assert expected_error in client_log
+    profile = client.get_runtime_profile(handle)
+    assert "Errors: " + expected_error in profile, profile
+    self.verify_timeline_item("Query Compilation", label, profile)
+    client.close_query(handle)
+
+    # Verify multi-lines in error log
+    client.set_configuration({"sync_hms_events_wait_time_s": 10,
+                              "debug_action": "0:PREPARE:INJECT_ERROR_LOG"})
+    handle = client.execute_async(query)
+    client.wait_for_finished_timeout(handle, 5)
+    client_log = client.get_log(handle)
+    expected_error += "\nDebug Action: INJECT_ERROR_LOG"
+    assert expected_error in client_log
+    profile = client.get_runtime_profile(handle)
+    assert "Errors: " + expected_error in profile, profile
+    self.verify_timeline_item("Query Compilation", label, profile)
+
+    # Verify failures in strict mode
+    err = self.execute_query_expect_failure(
+        client, query, EVENT_SYNC_QUERY_OPTIONS)
+    expected_error = "Failed to sync events from Metastore: " \
+                     "HMS event processing is disabled"
+    assert expected_error in str(err)
+    client.close_query(handle)
+
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--debug_actions=catalogd_event_processing_delay:SLEEP@2000")
+  def test_hms_event_sync_timeout(self, vector, unique_database):
+    client = self.default_impala_client(vector.get_value('protocol'))
+    # Timeline label shown in the profile when the wait failed.
+    label = "Continuing without syncing Metastore events"
+    # Prepare a partitioned table and load it into catalogd
+    create_stmt = "create table {}.part (i int) partitioned by (p int)".format(
+        unique_database)
+    self.execute_query_expect_success(client, create_stmt)
+    self.execute_query_expect_success(client, "describe {}.part".format(unique_database))
+
+    # Add a partition in Hive, which generates an ADD_PARTITION event
+    insert_stmt = "insert into {}.part partition (p=0) values (0)".format(unique_database)
+    self.run_stmt_in_hive(insert_stmt)
+
+    # SELECT gets 0 rows since it times out waiting for HMS events to be synced
+    query = "select * from {}.part".format(unique_database)
+    client.set_configuration({"sync_hms_events_wait_time_s": 2})
+    handle = client.execute_async(query)
+    client.wait_for_finished_timeout(handle, 60)
+    results = client.fetch(query, handle)
+    assert results.success
+    assert len(results.data) == 0
+
+    # Verify the warnings
+    client_log = client.get_log(handle)
+    expected_error = (
+        "Continuing without syncing Metastore events: "
+        "Timeout waiting for HMS events to be synced. Event id to wait for:")
+    assert expected_error in client_log
+    assert ". Last synced event id: " in client_log
+    profile = client.get_runtime_profile(handle)
+    assert "Errors: " + expected_error in profile, profile
+    self.verify_timeline_item("Query Compilation", label, profile)
+    # The duration is something like "2s034ms". Just checking "2s" here.
+    assert "- Continuing without syncing Metastore events: 2s" in profile, profile
+    client.close_query(handle)
+
+    # SELECT gets the new row if waiting for enough time
+    results = self.execute_query_expect_success(client, query, EVENT_SYNC_QUERY_OPTIONS)
+    assert len(results.data) == 1
diff --git a/tests/custom_cluster/test_hive_parquet_codec_interop.py b/tests/custom_cluster/test_hive_parquet_codec_interop.py
index a020a2be7..4f54e36ea 100644
--- a/tests/custom_cluster/test_hive_parquet_codec_interop.py
+++ b/tests/custom_cluster/test_hive_parquet_codec_interop.py
@@ -99,8 +99,7 @@ class TestParquetInterop(CustomClusterTestSuite):
       # Make sure Impala's metadata is in sync.
       if cluster_properties.is_event_polling_enabled():
         assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
-        EventProcessorUtils.wait_for_event_processing(self)
-        self.confirm_table_exists(unique_database, "t1_hive")
+        self.wait_for_table_to_appear(unique_database, "t1_hive")
       else:
         self.client.execute("invalidate metadata {0}".format(hive_table))
 
diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py
index e712b7421..aa380a951 100644
--- a/tests/custom_cluster/test_kudu.py
+++ b/tests/custom_cluster/test_kudu.py
@@ -28,7 +28,6 @@ from tests.common.kudu_test_suite import KuduTestSuite
 from tests.common.skip import SkipIfKudu, SkipIfBuildType, SkipIf
 from tests.common.test_dimensions import BEESWAX, add_mandatory_exec_option
 from tests.common.test_result_verifier import error_msg_expected
-from tests.util.event_processor_utils import EventProcessorUtils
 
 KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
 LOG = logging.getLogger(__name__)
@@ -319,15 +318,13 @@ class TestKuduHMSIntegration(CustomKuduTest):
     kudu_client.delete_table(kudu_tbl_name)
     assert not kudu_client.table_exists(kudu_tbl_name)
 
-    # Wait for events to prevent race condition
-    EventProcessorUtils.wait_for_event_processing(self)
-
     try:
-      cursor.execute("DROP TABLE %s" % kudu_tbl_name)
+      self.execute_query_with_hms_sync(
+          "DROP TABLE %s" % kudu_tbl_name, 10, strict=True)
       assert False
     except Exception as e:
       LOG.info(str(e))
-      "Table does not exist: %s" % kudu_tbl_name in str(e)
+      assert "Table does not exist: %s" % kudu_tbl_name in str(e)
 
   @pytest.mark.execute_serially
   def test_drop_external_kudu_table(self, cursor, kudu_client, unique_database):
diff --git a/tests/metadata/test_event_processing.py b/tests/metadata/test_event_processing.py
index 16f3e29cc..1d1d2cc19 100644
--- a/tests/metadata/test_event_processing.py
+++ b/tests/metadata/test_event_processing.py
@@ -20,33 +20,34 @@ import pytest
 import re
 import time
 
-from tests.common.impala_cluster import ImpalaCluster
+from tests.common.test_dimensions import (
+    create_single_exec_option_dimension,
+    add_mandatory_exec_option)
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIfCatalogV2
 from tests.metadata.test_event_processing_base import TestEventProcessingBase
 from tests.util.event_processor_utils import EventProcessorUtils
 
+PROCESSING_TIMEOUT_S = 10
+
 
 @SkipIfFS.hive
 @SkipIfCatalogV2.hms_event_polling_disabled()
 class TestEventProcessing(ImpalaTestSuite):
   """This class contains tests that exercise the event processing mechanism in the
   catalog."""
-  CATALOG_URL = "http://localhost:25020"
-  PROCESSING_TIMEOUT_S = 10
 
   @SkipIfHive2.acid
   def test_transactional_insert_events(self, unique_database):
     """Executes 'run_test_insert_events' for transactional tables.
     """
-    TestEventProcessingBase._run_test_insert_events_impl(self.hive_client, self.client,
-        ImpalaCluster.get_e2e_test_cluster(), unique_database, is_transactional=True)
+    TestEventProcessingBase._run_test_insert_events_impl(
+        unique_database, is_transactional=True)
 
   def test_insert_events(self, unique_database):
     """Executes 'run_test_insert_events' for non-transactional tables.
     """
-    TestEventProcessingBase._run_test_insert_events_impl(self.hive_client, self.client,
-        ImpalaCluster.get_e2e_test_cluster(), unique_database)
+    TestEventProcessingBase._run_test_insert_events_impl(unique_database)
 
   def test_iceberg_inserts(self):
     """IMPALA-10735: INSERT INTO Iceberg table fails during INSERT event generation
@@ -75,8 +76,8 @@ class TestEventProcessing(ImpalaTestSuite):
     self._run_test_empty_partition_events(unique_database, False)
 
   def test_event_based_replication(self):
-    TestEventProcessingBase._run_event_based_replication_tests_impl(self.hive_client,
-        self.client, ImpalaCluster.get_e2e_test_cluster(), self.filesystem_client)
+    TestEventProcessingBase._run_event_based_replication_tests_impl(
+        self.filesystem_client)
 
   def _run_test_empty_partition_events(self, unique_database, is_transactional):
     test_tbl = unique_database + ".test_events"
@@ -84,29 +85,28 @@ class TestEventProcessing(ImpalaTestSuite):
       is_transactional)
     self.run_stmt_in_hive("create table {0} (key string, value string) \
       partitioned by (year int) stored as parquet {1}".format(test_tbl, TBLPROPERTIES))
-    EventProcessorUtils.wait_for_event_processing(self)
+    self.client.set_configuration({
+      "sync_hms_events_wait_time_s": PROCESSING_TIMEOUT_S,
+      "sync_hms_events_strict_mode": True
+    })
     self.client.execute("describe {0}".format(test_tbl))
 
     self.run_stmt_in_hive(
       "alter table {0} add partition (year=2019)".format(test_tbl))
-    EventProcessorUtils.wait_for_event_processing(self)
     assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')
 
     self.run_stmt_in_hive(
       "alter table {0} add if not exists partition (year=2019)".format(test_tbl))
-    EventProcessorUtils.wait_for_event_processing(self)
     assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
     self.run_stmt_in_hive(
       "alter table {0} drop partition (year=2019)".format(test_tbl))
-    EventProcessorUtils.wait_for_event_processing(self)
     assert ('2019') not in self.get_impala_partition_info(test_tbl, 'year')
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
     self.run_stmt_in_hive(
       "alter table {0} drop if exists partition (year=2019)".format(test_tbl))
-    EventProcessorUtils.wait_for_event_processing(self)
     assert ('2019') not in self.get_impala_partition_info(test_tbl, 'year')
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
@@ -287,3 +287,125 @@ class TestEventProcessing(ImpalaTestSuite):
     assert warning in cmd_output
     # The cleanup method will drop 'unique_database' and tables in it, which generates
     # more than 2 self-events. It's OK for EP to skip them.
+
+
[email protected]
[email protected]_event_polling_disabled()
+class TestEventSyncWaiting(ImpalaTestSuite):
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-planner'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestEventSyncWaiting, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+    add_mandatory_exec_option(cls, 'sync_hms_events_wait_time_s', PROCESSING_TIMEOUT_S)
+    add_mandatory_exec_option(cls, 'sync_hms_events_strict_mode', True)
+
+  def test_hms_event_sync(self, vector, unique_database):
+    """Verify that query option sync_hms_events_wait_time_s protects the query by
+    waiting until Impala syncs the HMS changes."""
+    client = self.default_impala_client(vector.get_value('protocol'))
+    client.set_configuration(vector.get_exec_option_dict())
+    tbl_name = unique_database + ".tbl"
+    label = "Synced events from Metastore"
+    # Test DESCRIBE on new table created in Hive
+    self.run_stmt_in_hive(
+        "create table {0} (i int) partitioned by (p int)".format(tbl_name))
+    res = self.execute_query_expect_success(client, "describe " + tbl_name)
+    assert res.data == ["i\tint\t", 'p\tint\t']
+    assert res.log == ''
+    self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
+
+    # Test SHOW TABLES gets new tables created in Hive
+    self.run_stmt_in_hive("create table {0}_2 (i int)".format(tbl_name))
+    res = self.execute_query_expect_success(client, "show tables in " + unique_database)
+    assert res.data == ["tbl", "tbl_2"]
+    assert res.log == ''
+    self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
+
+    # Test SHOW VIEWS gets new views created in Hive
+    self.run_stmt_in_hive(
+        "create view {0}.v as select * from {1}".format(unique_database, tbl_name))
+    res = self.execute_query_expect_success(client, "show views in " + unique_database)
+    assert res.data == ["v"]
+    assert res.log == ''
+    self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
+
+    # Test DROP TABLE
+    try:
+      self.run_stmt_in_hive("""create database {0}_2;
+          create table {0}_2.tbl(i int);
+          create table {0}_2.tbl_2(i int);""".format(unique_database))
+      self.execute_query_expect_success(
+          client, "drop table {0}_2.tbl".format(unique_database))
+    finally:
+      self.run_stmt_in_hive(
+          "drop database if exists {0}_2 cascade".format(unique_database))
+
+    # Test SHOW DATABASES
+    res = self.execute_query_expect_success(client, "show databases")
+    assert unique_database + "\t" in res.data
+    assert unique_database + "_2\t" not in res.data
+    self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
+
+    # Test DROP DATABASE
+    try:
+      self.run_stmt_in_hive("create database {0}_3".format(unique_database))
+      self.execute_query_expect_success(
+          client, "drop database {0}_3".format(unique_database))
+    finally:
+      self.run_stmt_in_hive(
+          "drop database if exists {0}_3 cascade".format(unique_database))
+
+    # Test DESCRIBE DATABASE
+    try:
+      self.run_stmt_in_hive("create database {0}_4".format(unique_database))
+      self.execute_query_expect_success(
+          client, "describe database {0}_4".format(unique_database))
+    finally:
+      self.run_stmt_in_hive("drop database if exists {0}_4".format(unique_database))
+
+    # Test SHOW FUNCTIONS
+    try:
+      self.run_stmt_in_hive("create database {0}_5".format(unique_database))
+      self.execute_query_expect_success(
+          client, "show functions in {0}_5".format(unique_database))
+    finally:
+      self.run_stmt_in_hive("drop database if exists {0}_5".format(unique_database))
+
+    # Test SELECT gets new values inserted by Hive
+    self.run_stmt_in_hive(
+        "insert into table {0} partition (p=0) select 0".format(tbl_name))
+    res = self.execute_query_expect_success(client, "select * from " + tbl_name)
+    assert res.data == ["0\t0"]
+    assert res.log == ''
+    self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
+    # Same case but using INSERT OVERWRITE in Hive
+    self.run_stmt_in_hive(
+        "insert overwrite table {0} partition (p=0) select 1".format(tbl_name))
+    res = self.execute_query_expect_success(client, "select * from " + tbl_name)
+    assert res.data == ["1\t0"]
+    assert res.log == ''
+    self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
+
+    # Test SHOW PARTITIONS gets new partitions created by Hive
+    self.run_stmt_in_hive(
+        "insert into table {0} partition (p=2) select 2".format(tbl_name))
+    res = self.execute_query_expect_success(client, "show partitions " + tbl_name)
+    assert self.has_value('p=0', res.data)
+    assert self.has_value('p=2', res.data)
+    # 3 result lines: 2 for partitions, 1 for total info
+    assert len(res.data) == 3
+    assert res.log == ''
+    self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
+
+    # Test CREATE TABLE on table dropped by Hive
+    self.run_stmt_in_hive("drop table " + tbl_name)
+    self.execute_query_expect_success(client, "create table {0} (j int)".format(tbl_name))
+    res = self.execute_query_expect_success(client, "describe " + tbl_name)
+    assert res.data == ["j\tint\t"]
+    assert res.log == ''
+    self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
diff --git a/tests/metadata/test_event_processing_base.py b/tests/metadata/test_event_processing_base.py
index 054fd2cfc..97725a2c0 100644
--- a/tests/metadata/test_event_processing_base.py
+++ b/tests/metadata/test_event_processing_base.py
@@ -17,137 +17,131 @@
 from __future__ import absolute_import, division, print_function
 
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfFS, SkipIfCatalogV2
-from tests.util.event_processor_utils import EventProcessorUtils
+
+EVENT_SYNC_QUERY_OPTIONS = {
+    "sync_hms_events_wait_time_s": 10,
+    "sync_hms_events_strict_mode": True
+}
 
 
[email protected]
[email protected]_event_polling_disabled()
 class TestEventProcessingBase(ImpalaTestSuite):
 
   @classmethod
-  def _run_test_insert_events_impl(cls, hive_client, impala_client, impala_cluster,
-      unique_database, is_transactional=False):
+  def _run_test_insert_events_impl(cls, unique_database, is_transactional=False):
     """Test for insert event processing. Events are created in Hive and processed in
     Impala. The following cases are tested :
     Insert into table --> for partitioned and non-partitioned table
     Insert overwrite table --> for partitioned and non-partitioned table
     Insert into partition --> for partitioned table
     """
-    # Test table with no partitions.
-    tbl_insert_nopart = 'tbl_insert_nopart'
-    cls.run_stmt_in_hive(
-      "drop table if exists %s.%s" % (unique_database, tbl_insert_nopart))
-    tblproperties = ""
-    if is_transactional:
-      tblproperties = "tblproperties ('transactional'='true'," \
-          "'transactional_properties'='insert_only')"
-    cls.run_stmt_in_hive("create table %s.%s (id int, val int) %s"
-        % (unique_database, tbl_insert_nopart, tblproperties))
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Test CTAS and insert by Impala with empty results (IMPALA-10765).
-    cls.execute_query_expect_success(impala_client,
-        "create table {db}.ctas_tbl {prop} as select * from {db}.{tbl}"
-        .format(db=unique_database, tbl=tbl_insert_nopart, prop=tblproperties))
-    cls.execute_query_expect_success(impala_client,
-        "insert into {db}.ctas_tbl select * from {db}.{tbl}"
-        .format(db=unique_database, tbl=tbl_insert_nopart))
-    # Test insert into table, this will fire an insert event.
-    cls.run_stmt_in_hive("insert into %s.%s values(101, 200)"
-        % (unique_database, tbl_insert_nopart))
-    # With MetastoreEventProcessor running, the insert event will be processed. Query the
-    # table from Impala.
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Verify that the data is present in Impala.
-    data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
-        (unique_database, tbl_insert_nopart))
-    assert data.split('\t') == ['101', '200']
+    with cls.create_impala_client() as impala_client:
+      # Test table with no partitions.
+      tbl_insert_nopart = 'tbl_insert_nopart'
+      cls.run_stmt_in_hive(
+        "drop table if exists %s.%s" % (unique_database, tbl_insert_nopart))
+      tblproperties = ""
+      if is_transactional:
+        tblproperties = "tblproperties ('transactional'='true'," \
+            "'transactional_properties'='insert_only')"
+      cls.run_stmt_in_hive("create table %s.%s (id int, val int) %s"
+          % (unique_database, tbl_insert_nopart, tblproperties))
+      impala_client.set_configuration(EVENT_SYNC_QUERY_OPTIONS)
+      # Test CTAS and insert by Impala with empty results (IMPALA-10765).
+      cls.execute_query_expect_success(impala_client,
+          "create table {db}.ctas_tbl {prop} as select * from {db}.{tbl}"
+          .format(db=unique_database, tbl=tbl_insert_nopart, prop=tblproperties))
+      cls.execute_query_expect_success(impala_client,
+          "insert into {db}.ctas_tbl select * from {db}.{tbl}"
+          .format(db=unique_database, tbl=tbl_insert_nopart))
+      # Test insert into table, this will fire an insert event.
+      cls.run_stmt_in_hive("insert into %s.%s values(101, 200)"
+          % (unique_database, tbl_insert_nopart))
+      # With MetastoreEventProcessor running, the insert event will be processed. Query
+      # the table from Impala. Verify that the data is present in Impala.
+      data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
+          (unique_database, tbl_insert_nopart))
+      assert data.split('\t') == ['101', '200']
 
-    # Test insert overwrite. Overwrite the existing value.
-    cls.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
-        % (unique_database, tbl_insert_nopart))
-    # Make sure the event has been processed.
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Verify that the data is present in Impala.
-    data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
-        (unique_database, tbl_insert_nopart))
-    assert data.split('\t') == ['101', '201']
-    # Test insert overwrite by Impala with empty results (IMPALA-10765).
-    cls.execute_query_expect_success(impala_client,
-        "insert overwrite {db}.{tbl} select * from {db}.ctas_tbl"
-        .format(db=unique_database, tbl=tbl_insert_nopart))
-    result = cls.execute_query_expect_success(impala_client,
-        "select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_nopart))
-    assert len(result.data) == 0
+      # Test insert overwrite. Overwrite the existing value.
+      cls.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
+          % (unique_database, tbl_insert_nopart))
+      # Make sure the event has been processed using sync_hms_events_wait_time_s.
+      # Verify that the data is present in Impala.
+      data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
+          (unique_database, tbl_insert_nopart))
+      assert data.split('\t') == ['101', '201']
+      # Test insert overwrite by Impala with empty results (IMPALA-10765).
+      cls.execute_query_expect_success(impala_client,
+          "insert overwrite {db}.{tbl} select * from {db}.ctas_tbl"
+          .format(db=unique_database, tbl=tbl_insert_nopart))
+      result = cls.execute_query_expect_success(impala_client,
+          "select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_nopart))
+      assert len(result.data) == 0
 
-    # Test partitioned table.
-    tbl_insert_part = 'tbl_insert_part'
-    cls.run_stmt_in_hive("drop table if exists %s.%s"
-        % (unique_database, tbl_insert_part))
-    cls.run_stmt_in_hive("create table %s.%s (id int, name string) "
-        "partitioned by(day int, month int, year int) %s"
-        % (unique_database, tbl_insert_part, tblproperties))
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Test insert overwrite by Impala with empty results (IMPALA-10765).
-    cls.execute_query_expect_success(impala_client,
-        "create table {db}.ctas_part partitioned by (day, month, year) {prop} as "
-        "select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part,
-            prop=tblproperties))
-    cls.execute_query_expect_success(impala_client,
-        "insert into {db}.ctas_part partition(day=0, month=0, year=0) select id, "
-        "name from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part))
-    # Insert data into partitions.
-    cls.run_stmt_in_hive(
-        "insert into %s.%s partition(day=28, month=03, year=2019)"
-        "values(101, 'x')" % (unique_database, tbl_insert_part))
-    # Make sure the event has been processed.
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Verify that the data is present in Impala.
-    data = cls.execute_scalar_expect_success(impala_client,
-        "select * from %s.%s" % (unique_database, tbl_insert_part))
-    assert data.split('\t') == ['101', 'x', '28', '3', '2019']
+      # Test partitioned table.
+      tbl_insert_part = 'tbl_insert_part'
+      cls.run_stmt_in_hive("drop table if exists %s.%s"
+          % (unique_database, tbl_insert_part))
+      cls.run_stmt_in_hive("create table %s.%s (id int, name string) "
+          "partitioned by(day int, month int, year int) %s"
+          % (unique_database, tbl_insert_part, tblproperties))
+      # Test insert overwrite by Impala with empty results (IMPALA-10765).
+      cls.execute_query_expect_success(impala_client,
+          "create table {db}.ctas_part partitioned by (day, month, year) {prop} as "
+          "select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part,
+              prop=tblproperties))
+      cls.execute_query_expect_success(impala_client,
+          "insert into {db}.ctas_part partition(day=0, month=0, year=0) select id, "
+          "name from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part))
+      # Insert data into partitions.
+      cls.run_stmt_in_hive(
+          "insert into %s.%s partition(day=28, month=03, year=2019)"
+          "values(101, 'x')" % (unique_database, tbl_insert_part))
+      # Make sure the event has been processed using sync_hms_events_wait_time_s.
+      # Verify that the data is present in Impala.
+      data = cls.execute_scalar_expect_success(impala_client,
+          "select * from %s.%s" % (unique_database, tbl_insert_part))
+      assert data.split('\t') == ['101', 'x', '28', '3', '2019']
 
-    # Test inserting into existing partitions.
-    cls.run_stmt_in_hive(
-        "insert into %s.%s partition(day=28, month=03, year=2019)"
-        "values(102, 'y')" % (unique_database, tbl_insert_part))
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Verify that the data is present in Impala.
-    data = cls.execute_scalar_expect_success(impala_client,
-        "select count(*) from %s.%s where day=28 and month=3 "
-        "and year=2019" % (unique_database, tbl_insert_part))
-    assert data.split('\t') == ['2']
-    # Test inserting into existing partitions by Impala with empty results
-    # (IMPALA-10765).
-    cls.execute_query_expect_success(impala_client,
-        "insert into {db}.{tbl} partition(day=28, month=03, year=2019) "
-        "select id, name from {db}.ctas_part"
-        .format(db=unique_database, tbl=tbl_insert_part))
+      # Test inserting into existing partitions.
+      cls.run_stmt_in_hive(
+          "insert into %s.%s partition(day=28, month=03, year=2019)"
+          "values(102, 'y')" % (unique_database, tbl_insert_part))
+      # Verify that the data is present in Impala.
+      data = cls.execute_scalar_expect_success(impala_client,
+          "select count(*) from %s.%s where day=28 and month=3 "
+          "and year=2019" % (unique_database, tbl_insert_part))
+      assert data.split('\t') == ['2']
+      # Test inserting into existing partitions by Impala with empty results
+      # (IMPALA-10765).
+      cls.execute_query_expect_success(impala_client,
+          "insert into {db}.{tbl} partition(day=28, month=03, year=2019) "
+          "select id, name from {db}.ctas_part"
+          .format(db=unique_database, tbl=tbl_insert_part))
 
-    # Test insert overwrite into existing partitions
-    cls.run_stmt_in_hive(
-        "insert overwrite table %s.%s partition(day=28, month=03, "
-        "year=2019)" "values(101, 'z')" % (unique_database, tbl_insert_part))
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Verify that the data is present in Impala.
-    data = cls.execute_scalar_expect_success(impala_client,
-        "select * from %s.%s where day=28 and month=3 and"
-        " year=2019 and id=101" % (unique_database, tbl_insert_part))
-    assert data.split('\t') == ['101', 'z', '28', '3', '2019']
-    # Test insert overwrite into existing partitions by Impala with empty results
-    # (IMPALA-10765).
-    cls.execute_query_expect_success(impala_client, "insert overwrite {db}.{tbl} "
-                       "partition(day=28, month=03, year=2019) "
-                       "select id, name from {db}.ctas_part"
-                       .format(db=unique_database, tbl=tbl_insert_part))
-    result = cls.execute_query_expect_success(impala_client, "select * from {db}.{tbl} "
-                                "where day=28 and month=3 and year=2019"
-                                .format(db=unique_database, tbl=tbl_insert_part))
-    assert len(result.data) == 0
+      # Test insert overwrite into existing partitions
+      cls.run_stmt_in_hive(
+          "insert overwrite table %s.%s partition(day=28, month=03, "
+          "year=2019)" "values(101, 'z')" % (unique_database, tbl_insert_part))
+      # Verify that the data is present in Impala.
+      data = cls.execute_scalar_expect_success(impala_client,
+          "select * from %s.%s where day=28 and month=3 and"
+          " year=2019 and id=101" % (unique_database, tbl_insert_part))
+      assert data.split('\t') == ['101', 'z', '28', '3', '2019']
+      impala_client.clear_configuration()
+      # Test insert overwrite into existing partitions by Impala with empty results
+      # (IMPALA-10765).
+      cls.execute_query_expect_success(impala_client, "insert overwrite {db}.{tbl} "
+                         "partition(day=28, month=03, year=2019) "
+                         "select id, name from {db}.ctas_part"
+                         .format(db=unique_database, tbl=tbl_insert_part))
+      result = cls.execute_query_expect_success(impala_client, "select * from {db}.{tbl} "
+                                  "where day=28 and month=3 and year=2019"
+                                  .format(db=unique_database, tbl=tbl_insert_part))
+      assert len(result.data) == 0
 
   @classmethod
-  def _run_event_based_replication_tests_impl(cls, hive_client, impala_client,
-      impala_cluster, filesystem_client, transactional=True):
+  def _run_event_based_replication_tests_impl(cls, filesystem_client, transactional=True):
     """Hive Replication relies on the insert events generated on the tables.
     This test issues some basic replication commands from Hive and makes sure
     that the replicated table has correct data."""
@@ -156,12 +150,13 @@ class TestEventProcessingBase(ImpalaTestSuite):
     target_db = ImpalaTestSuite.get_random_name("repl_target_")
     unpartitioned_tbl = "unpart_tbl"
     partitioned_tbl = "part_tbl"
+    impala_client = cls.create_impala_client()
     try:
       cls.run_stmt_in_hive("create database {0}".format(source_db))
       cls.run_stmt_in_hive(
         "alter database {0} set dbproperties ('repl.source.for'='xyz')"
         .format(source_db))
-      EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
+      impala_client.set_configuration(EVENT_SYNC_QUERY_OPTIONS)
       # explicit create table command since create table like doesn't allow tblproperties
       impala_client.execute("create table {0}.{1} (a string, b string) stored as parquet"
         " {2}".format(source_db, unpartitioned_tbl, TBLPROPERTIES))
@@ -195,7 +190,6 @@ class TestEventProcessingBase(ImpalaTestSuite):
       # replicate the table from source to target
       cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
         target_db))
-      EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
       assert unpartitioned_tbl in impala_client.execute(
         "show tables in {0}".format(target_db)).get_data()
       assert partitioned_tbl in impala_client.execute(
@@ -221,9 +215,6 @@ class TestEventProcessingBase(ImpalaTestSuite):
       # replicate the table from source to target
       cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
         target_db))
-      # we wait until the events catch up in case repl command above did some HMS
-      # operations.
-      EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
       # confirm the number of rows in target match with the source table.
       rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
         "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
@@ -246,9 +237,6 @@ class TestEventProcessingBase(ImpalaTestSuite):
       # replicate the table from source to target
       cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
         target_db))
-      # we wait until the events catch up in case repl command above did some HMS
-      # operations.
-      EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
       # confirm the number of rows in target match with the source table.
       rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
         "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
@@ -270,9 +258,6 @@ class TestEventProcessingBase(ImpalaTestSuite):
       # replicate the table from source to target
       cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
         target_db))
-      # we wait until the events catch up in case repl command above did some HMS
-      # operations.
-      EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
       # confirm the number of rows in target match with the source table.
       rows_in_unpart_tbl_source = int(cls.execute_scalar_expect_success(impala_client,
         "select count(*) from "
@@ -298,9 +283,6 @@ class TestEventProcessingBase(ImpalaTestSuite):
       # replicate the table from source to target
       cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
         target_db))
-      # we wait until the events catch up in case repl command above did some HMS
-      # operations.
-      EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
       # confirm the number of rows in target match with the source table.
       rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
         "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
@@ -328,6 +310,7 @@ class TestEventProcessingBase(ImpalaTestSuite):
       if target_db_obj is not None and target_db_obj.managedLocationUri is not None:
         filesystem_client.delete_file_dir(
           target_db_obj.managedLocationUri, True)
+      impala_client.close()
 
   @classmethod
   def __get_db_nothrow(self, name):
diff --git a/tests/metadata/test_metadata_query_statements.py b/tests/metadata/test_metadata_query_statements.py
index 89eea1a4e..d31d169d4 100644
--- a/tests/metadata/test_metadata_query_statements.py
+++ b/tests/metadata/test_metadata_query_statements.py
@@ -186,8 +186,14 @@ class TestMetadataQueryStatements(ImpalaTestSuite):
       if cluster_properties.is_event_polling_enabled():
         # Using HMS event processor - wait until the database shows up.
         assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
-        EventProcessorUtils.wait_for_event_processing(self)
+        self.client.set_configuration({
+          "sync_hms_events_wait_time_s": 10,
+          "sync_hms_events_strict_mode": True
+        })
+        # Waits for the externally created dbs to appear.
         self.confirm_db_exists("hive_test_desc_db")
+        self.confirm_db_exists("hive_test_desc_db2")
+        self.client.clear_configuration()
       else:
         # Invalidate metadata to pick up hive-created db.
         self.client.execute("invalidate metadata")
diff --git a/tests/query_test/test_hive_codec_interop.py b/tests/query_test/test_hive_codec_interop.py
index 957c8ca1d..9d245aeb6 100644
--- a/tests/query_test/test_hive_codec_interop.py
+++ b/tests/query_test/test_hive_codec_interop.py
@@ -81,7 +81,7 @@ class TestFileCodecInterop(ImpalaTestSuite):
 
     # Make sure Impala's metadata is in sync.
     if cluster_properties.is_catalog_v2_cluster():
-      self.wait_for_table_to_appear(unique_database, hive_table, timeout_s=10)
+      self.wait_for_table_to_appear(unique_database, "t1_hive", timeout_s=10)
     else:
       self.client.execute("invalidate metadata {0}".format(hive_table))
 
