This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0b1a32fad8a6cc5173b0ac1585af69f08d583ed9
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Mar 18 13:41:59 2025 -0700

    IMPALA-13850 (part 4): Implement in-place reset for CatalogD
    
    This patch improves the availability of CatalogD during huge INVALIDATE
    METADATA operations. Previously, CatalogServiceCatalog.reset() held
    versionLock_.writeLock() for the whole reset duration. When the number
    of databases, tables, or functions is large, this write lock can be held
    for a long time, preventing any other catalog operation from proceeding.
    
    This patch improves the situation by:
    1. Making CatalogServiceCatalog.reset() rebuild dbCache_ in place and
       periodically release the write lock between rebuild stages (the
       reset_metadata_lock_duration_ms flag controls how long each stage
       may hold the lock).
    2. Fetching database, table, and function metadata from the Metastore
       in the background using an ExecutorService. Added the
       catalog_reset_max_threads flag to control the number of threads used
       for the parallel fetch.
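
A minimal sketch of the staged rebuild in item 1, assuming a fair ReentrantReadWriteLock and a per-Db rebuild callback. `StagedRebuilder` and `rebuild()` are hypothetical names, not the actual reset() code. Each stage holds the write lock for roughly `lockDurationMs` (the role reset_metadata_lock_duration_ms plays) and always finishes at least one Db before releasing, so threads already waiting on the lock can run between stages.

```java
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

// Hypothetical sketch, not Impala's reset(): rebuild sorted Dbs in stages and
// release the write lock once a stage has run for at least lockDurationMs.
class StagedRebuilder {
  // Fair lock, like versionLock_, so threads already waiting get the lock before
  // the next stage re-acquires it.
  private final ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);

  ReentrantReadWriteLock getLock() { return lock_; }

  /** Rebuilds each Db in order and returns the number of stages used. */
  int rebuild(List<String> sortedDbNames, long lockDurationMs, Consumer<String> rebuildDb) {
    int stages = 0;
    int next = 0;
    while (next < sortedDbNames.size()) {
      lock_.writeLock().lock();
      stages++;
      long stageStartNs = System.nanoTime();
      try {
        // Finish at least one Db per stage so the rebuild always makes progress.
        do {
          rebuildDb.accept(sortedDbNames.get(next++));
        } while (next < sortedDbNames.size()
            && (System.nanoTime() - stageStartNs) / 1_000_000 < lockDurationMs);
      } finally {
        // Releasing here lets other catalog operations run before the next stage.
        lock_.writeLock().unlock();
      }
    }
    return stages;
  }
}
```

With a very large duration the whole rebuild is one stage; with a duration of 0 every Db gets its own stage, which is the availability/overhead trade-off the flag's help text describes.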
    
    To do so, reset() must process databases in lexicographic order and
    ensure that all Db invalidation within a single stage is complete before
    releasing the write lock. Stages should take approximately the same
    amount of time. A catalog operation on a database must ensure that
    either no reset operation is currently running, or the database name is
    lexicographically less than the current database-under-invalidation.
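
The ordering rule above can be sketched as a single predicate. Assumption: `resetQueue` holds the lower-cased names of the Dbs not yet rebuilt, sorted head to tail, and is empty when no reset is running. `ResetOrderGate` and `mayProceed()` are hypothetical names; the check is the inverse of CatalogResetManager.isPendingFetch() in this patch.

```java
import java.util.Deque;

// Hypothetical sketch of the "may this operation touch dbName now?" check.
final class ResetOrderGate {
  private ResetOrderGate() {}

  /**
   * 'resetQueue' holds lower-cased Db names still awaiting invalidation, sorted
   * head to tail; it is empty when no reset is running.
   */
  static boolean mayProceed(String dbName, Deque<String> resetQueue) {
    String head = resetQueue.peekFirst();
    // No reset in progress: any database may be touched.
    if (head == null) return true;
    // Names before the head were already rebuilt by this reset; the head and
    // everything after it must wait.
    return dbName.toLowerCase().compareTo(head) < 0;
  }
}
```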
    
    This patch adds CatalogResetManager to do background metadata fetching
    and to provide helper methods for waiting on reset progress.
    CatalogServiceCatalog must hold versionLock_.writeLock() before calling
    most CatalogResetManager methods.
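
A rough sketch of the background fetch pattern, assuming a caller-supplied fetch function in place of the HMS calls. Like CatalogResetManager, it lower-cases and sorts the names, keeps at most twice the thread count of tasks in flight (mirroring MAX_FETCH_TASK), and consumes results strictly from the queue head so they come back in lexicographic order. `BoundedPrefetcher` and `fetchFn` are hypothetical; unlike the real class, this sketch does no locking and rethrows fetch failures as unchecked exceptions.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch, not the real CatalogResetManager: fetch per-Db metadata on a
// fixed pool, keep at most 2 * numThreads tasks in flight, return results in order.
class BoundedPrefetcher<T> {
  private final ExecutorService pool_;
  private final int maxInFlight_;
  private final Function<String, T> fetchFn_;
  private final Queue<String> pending_ = new ArrayDeque<>();
  private final Queue<Map.Entry<String, Future<T>>> inFlight_ = new ArrayDeque<>();
  int maxObservedInFlight = 0;

  BoundedPrefetcher(List<String> dbNames, int numThreads, Function<String, T> fetchFn) {
    pool_ = Executors.newFixedThreadPool(numThreads);
    maxInFlight_ = numThreads * 2;
    fetchFn_ = fetchFn;
    // Lexicographic order, so results can be handed out head first.
    pending_.addAll(dbNames.stream().map(String::toLowerCase).sorted()
        .collect(Collectors.toList()));
    refill();
  }

  // Submit pending names until the in-flight bound is reached.
  private void refill() {
    while (!pending_.isEmpty() && inFlight_.size() < maxInFlight_) {
      String dbName = pending_.poll();
      Future<T> future = pool_.submit(() -> fetchFn_.apply(dbName));
      inFlight_.add(new SimpleEntry<>(dbName, future));
    }
    maxObservedInFlight = Math.max(maxObservedInFlight, inFlight_.size());
    // Nothing left to fetch or hand out: let the pool threads exit.
    if (inFlight_.isEmpty()) pool_.shutdown();
  }

  boolean hasNext() { return !inFlight_.isEmpty(); }

  /** Blocks until the lexicographically next Db has been fetched. */
  Map.Entry<String, T> next() {
    Map.Entry<String, Future<T>> head = inFlight_.poll();
    try {
      T result = head.getValue().get();
      refill();
      return new SimpleEntry<>(head.getKey(), result);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      pool_.shutdownNow();
      throw new RuntimeException(e.getCause());
    }
  }
}
```

Bounding the in-flight futures keeps memory proportional to the thread count rather than to the number of databases, which is the stated reason for MAX_FETCH_TASK.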
    
    These are the methods in the CatalogServiceCatalog class that must wait
    for CatalogResetManager.waitOngoingMetadataFetch():
    
    addDb()
    addFunction()
    addIncompleteTable()
    addTable()
    invalidateTableIfExists()
    removeDb()
    removeFunction()
    removeTable()
    renameTable()
    replaceTableIfUnchanged()
    tryLock()
    updateDb()
    InvalidateAwareDbSnapshotIterator.hasNext()
    
    A concurrent global INVALIDATE METADATA (IM) must wait until the
    currently running global IM completes. It waits by calling
    waitFullMetadataFetch().
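
One way to implement this wait, sketched under assumptions: a boolean `active_` guarded by the write lock, and a Condition created from that write lock (as CatalogResetManager does; ReentrantReadWriteLock supports conditions only on its write lock). `GlobalResetGate` is a hypothetical name, and awaitUninterruptibly() stands in for the patch's choice of ignoring InterruptedException and continuing to wait.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: a second global reset blocks until the running one is done.
class GlobalResetGate {
  private final ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
  // ReentrantReadWriteLock supports Conditions only on its write lock.
  private final Condition resetDone_ = lock_.writeLock().newCondition();
  private boolean active_ = false;  // Guarded by lock_.writeLock().

  /** Waits for any running reset to finish, then marks a new one as active. */
  void beginReset() {
    lock_.writeLock().lock();
    try {
      // Awaiting releases the write lock while blocked, so other operations proceed.
      while (active_) resetDone_.awaitUninterruptibly();
      active_ = true;
    } finally {
      lock_.writeLock().unlock();
    }
  }

  void finishReset() {
    lock_.writeLock().lock();
    try {
      active_ = false;
      resetDone_.signalAll();  // Wake every waiter; each re-checks active_.
    } finally {
      lock_.writeLock().unlock();
    }
  }

  boolean isActive() {
    lock_.writeLock().lock();
    try {
      return active_;
    } finally {
      lock_.writeLock().unlock();
    }
  }
}
```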
    
    CatalogServiceCatalog.getAllDbs() takes a snapshot of the dbCache_
    values at a point in time. With this patch, some Db in this snapshot may
    be removed from dbCache_ by a concurrent reset(). Callers that care
    about snapshot integrity, like CatalogServiceCatalog.getCatalogDelta(),
    should be careful when iterating the snapshot. They must iterate in
    lexicographic order, like reset(), and make sure that they do not go
    beyond the current database-under-invalidation. They also must skip the
    Db that is currently being inspected if Db.isRemoved() is true. Added
    helper class InvalidateAwareDbSnapshot for this kind of iteration.
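
A simplified sketch of this iteration pattern, in the spirit of InvalidateAwareDbSnapshot but not its actual code. `SnapshotDb` is a hypothetical stand-in for Db with only a name and a removed flag, and `waitUntilSafe` is a hypothetical callback standing in for waiting until the reset has moved past a name. The removed flag is checked only after that wait, because the reset may drop the Db while the iterator is blocked.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

// Hypothetical stand-in for Db: just a name and the flag set by markRemoved().
class SnapshotDb {
  final String name;
  volatile boolean removed = false;
  SnapshotDb(String name) { this.name = name; }
}

// Hypothetical sketch: walk a dbCache_ snapshot in name order, never past the
// Db under invalidation, skipping Dbs that a concurrent reset removed.
class InvalidateAwareIterator implements Iterator<SnapshotDb> {
  private final List<SnapshotDb> sorted_;
  private final Consumer<String> waitUntilSafe_;
  private int pos_ = 0;
  private SnapshotDb next_ = null;

  InvalidateAwareIterator(List<SnapshotDb> snapshot, Consumer<String> waitUntilSafe) {
    sorted_ = new ArrayList<>(snapshot);
    sorted_.sort(Comparator.comparing((SnapshotDb db) -> db.name));
    waitUntilSafe_ = waitUntilSafe;
  }

  @Override
  public boolean hasNext() {
    while (next_ == null && pos_ < sorted_.size()) {
      SnapshotDb candidate = sorted_.get(pos_++);
      // Block until this name is no longer at or after the reset's current head.
      waitUntilSafe_.accept(candidate.name);
      // The reset may have dropped this Db while we waited; skip it if so.
      if (!candidate.removed) next_ = candidate;
    }
    return next_ != null;
  }

  @Override
  public SnapshotDb next() {
    if (!hasNext()) throw new NoSuchElementException();
    SnapshotDb result = next_;
    next_ = null;
    return result;
  }
}
```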
    
    Override CatalogServiceCatalog.getDb() and
    CatalogServiceCatalog.getDbs() to wait until the initial reset completes
    or the looked-up Db is found in the cache.
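
A hypothetical sketch of the getDb() behavior described above: a lookup returns immediately on a cache hit and otherwise waits under the write lock until either the Db appears or the initial reset finishes. `InitialResetAwareCache`, `put()` and `markInitialResetDone()` are illustrative names only; the patch's waitInitialResetCompletion() instead delegates the waiting to CatalogResetManager.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: lookups return early on a cache hit and otherwise block
// until the Db is loaded or the initial reset has finished.
class InitialResetAwareCache<V> {
  private final Map<String, V> cache_ = new ConcurrentHashMap<>();
  private final ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
  private final Condition progress_ = lock_.writeLock().newCondition();
  private boolean initialResetDone_ = false;  // Guarded by lock_.writeLock().

  /** Called by the loading thread as each Db is added during the initial reset. */
  void put(String dbName, V db) {
    lock_.writeLock().lock();
    try {
      cache_.put(dbName.toLowerCase(), db);
      progress_.signalAll();
    } finally {
      lock_.writeLock().unlock();
    }
  }

  void markInitialResetDone() {
    lock_.writeLock().lock();
    try {
      initialResetDone_ = true;
      progress_.signalAll();
    } finally {
      lock_.writeLock().unlock();
    }
  }

  /** Returns the Db, or null if it does not exist once the initial reset is done. */
  V get(String dbName) {
    String key = dbName.toLowerCase();
    V db = cache_.get(key);
    if (db != null) return db;  // Fast path: no locking on a cache hit.
    lock_.writeLock().lock();
    try {
      while (!initialResetDone_ && !cache_.containsKey(key)) {
        progress_.awaitUninterruptibly();
      }
      return cache_.get(key);
    } finally {
      lock_.writeLock().unlock();
    }
  }
}
```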
    
    Expand test_restart_catalogd_twice into test_restart_legacy_catalogd_twice
    and test_restart_local_catalogd_twice. Update
    CustomClusterTestSuite.wait_for_wm_init_complete() to correctly pass
    timeout values to the helper methods it calls. Reduce cluster_size from
    10 to 3 in a few tests of test_workload_mgmt_init.py to avoid flakiness.
    
    Fixed an HMS connection leak between tests in AuthorizationStmtTest (see
    IMPALA-8073).
    
    Testing:
    - Pass exhaustive tests.
    
    Change-Id: Ib4ae2154612746b34484391c5950e74b61f85c9d
    Reviewed-on: http://gerrit.cloudera.org:8080/22640
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Quanlong Huang <[email protected]>
---
 be/src/catalog/catalog-server.cc                   |  20 +
 be/src/util/backend-gflag-util.cc                  |   4 +
 common/thrift/BackendGflags.thrift                 |   4 +
 .../java/org/apache/impala/catalog/Catalog.java    |  25 +-
 .../apache/impala/catalog/CatalogResetManager.java | 335 +++++++++++++
 .../impala/catalog/CatalogServiceCatalog.java      | 533 +++++++++++++++------
 fe/src/main/java/org/apache/impala/catalog/Db.java |   8 +
 .../org/apache/impala/service/BackendConfig.java   |   8 +
 .../apache/impala/service/CatalogOpExecutor.java   |   2 +-
 .../java/org/apache/impala/util/DebugUtils.java    |   7 +-
 .../authorization/AuthorizationStmtTest.java       |  10 +-
 .../apache/impala/testutil/ImpaladTestCatalog.java |   6 +-
 tests/common/custom_cluster_test_suite.py          |  44 +-
 tests/custom_cluster/test_catalogd_ha.py           |  52 +-
 tests/custom_cluster/test_concurrent_ddls.py       |  14 +-
 tests/custom_cluster/test_ext_data_sources.py      |   4 +-
 tests/custom_cluster/test_local_catalog.py         |   9 +-
 tests/custom_cluster/test_metadata_replicas.py     |   2 +
 tests/custom_cluster/test_restart_services.py      |  37 +-
 tests/custom_cluster/test_workload_mgmt_init.py    |  12 +-
 20 files changed, 904 insertions(+), 232 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index afeaf9011..a7e39a322 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -45,6 +45,13 @@ using namespace apache::thrift;
 using namespace rapidjson;
 using namespace strings;
 
+// Validator function asserting the value of a flag is greater than 0.
+static const auto gt_0 = [](const char* name, int32_t val) {
+  if (val > 0) return true;
+  LOG(ERROR) << "Invalid value for --" << name << ": must be greater than 0";
+  return false;
+};
+
 DEFINE_int32(catalog_service_port, 26000, "port where the CatalogService is running");
 DEFINE_string(catalog_topic_mode, "full",
     "The type of data that the catalog service will publish into the Catalog "
@@ -219,6 +226,7 @@ DEFINE_string(default_skipped_hms_event_types,
     "HMS event types that are not used by Impala. They are skipped by default in "
     "fetching HMS event batches. Only in few places they will be fetched, e.g. fetching "
     "the latest event time in HMS.");
+
 DEFINE_string(common_hms_event_types, "ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION,"
     "ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION,CREATE_TABLE,ALTER_TABLE,DROP_TABLE,"
     "CREATE_DATABASE,ALTER_DATABASE,DROP_DATABASE,INSERT,OPEN_TXN,COMMIT_TXN,ABORT_TXN,"
@@ -271,6 +279,18 @@ DEFINE_int32(max_outstanding_events_on_executors, 1000,
     "the outstanding events exceeds the threshold. This configuration is applicable when "
     "enable_hierarchical_event_processing is enabled.");
 
+DEFINE_int32(reset_metadata_lock_duration_ms, 100,
+    "Duration in ms where CatalogD will hold version lock before temporarily releasing "
+    "it during reset/invalidate metadata operation. Setting low duration will increase "
+    "CatalogD availability during long invalidate metadata operation, while setting "
+    "high number will result in less catalog update per invalidate metadata operation.");
+DEFINE_validator(reset_metadata_lock_duration_ms, gt_0);
+
+DEFINE_int32(catalog_reset_max_threads, 10,
+    "Maximum number of threads for fetching metadata from Metastore in parallel during "
+    "catalog reset.");
+DEFINE_validator(catalog_reset_max_threads, gt_0);
+
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_port);
 DECLARE_string(state_store_2_host);
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index ce52d7dca..7a010cce9 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -143,6 +143,8 @@ DECLARE_int32(num_table_event_executors_per_db_event_executor);
 DECLARE_int32(min_event_processor_idle_ms);
 DECLARE_int32(max_outstanding_events_on_executors);
 DECLARE_bool(consolidate_grant_revoke_requests);
+DECLARE_int32(reset_metadata_lock_duration_ms);
+DECLARE_int32(catalog_reset_max_threads);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -544,6 +546,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
       FLAGS_max_outstanding_events_on_executors);
   cfg.__set_consolidate_grant_revoke_requests(FLAGS_consolidate_grant_revoke_requests);
   cfg.__set_iceberg_catalog_num_threads(FLAGS_iceberg_catalog_num_threads);
+  cfg.__set_reset_metadata_lock_duration_ms(FLAGS_reset_metadata_lock_duration_ms);
+  cfg.__set_catalog_reset_max_threads(FLAGS_catalog_reset_max_threads);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 9f909b724..a9d502420 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -341,4 +341,8 @@ struct TBackendGflags {
   154: required bool consolidate_grant_revoke_requests
 
   155: required i32 iceberg_catalog_num_threads
+
+  156: required i32 reset_metadata_lock_duration_ms
+
+  157: required i32 catalog_reset_max_threads
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 4e0655a3d..df07b63aa 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -23,10 +23,10 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
 
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -90,11 +90,8 @@ public abstract class Catalog implements AutoCloseable {
   // Sentry Service, if configured.
   protected AuthorizationPolicy authPolicy_ = new AuthorizationPolicy();
 
-  // Thread safe cache of database metadata. Uses an AtomicReference so reset()
-  // operations can atomically swap dbCache_ references.
-  // TODO: Update this to use a CatalogObjectCache?
-  protected AtomicReference<Map<String, Db>> dbCache_ =
-      new AtomicReference<>(new ConcurrentHashMap<String, Db>());
+  // Thread safe cache of database metadata.
+  protected CatalogObjectCache<Db> dbCache_ = new CatalogObjectCache<>();
 
   // Cache of data sources.
   protected final CatalogObjectCache<DataSource> dataSources_;
@@ -150,9 +147,7 @@ public abstract class Catalog implements AutoCloseable {
    * Adds a new database to the catalog, replacing any existing database with the same
    * name.
    */
-  public void addDb(Db db) {
-    dbCache_.get().put(db.getName().toLowerCase(), db);
-  }
+  public void addDb(Db db) { dbCache_.add(db); }
 
   /**
    * Gets the Db object from the Catalog using a case-insensitive lookup on the name.
@@ -161,7 +156,7 @@ public abstract class Catalog implements AutoCloseable {
   public Db getDb(String dbName) {
     Preconditions.checkArgument(dbName != null && !dbName.isEmpty(),
         "Null or empty database name given as argument to Catalog.getDb");
-    return dbCache_.get().get(dbName.toLowerCase());
+    return dbCache_.get(dbName.toLowerCase());
   }
 
   /**
@@ -169,15 +164,17 @@ public abstract class Catalog implements AutoCloseable {
    * if not database was removed as part of this operation. Used by DROP DATABASE
    * statements.
    */
-  public Db removeDb(String dbName) {
-    return dbCache_.get().remove(dbName.toLowerCase());
+  public @Nullable Db removeDb(String dbName) {
+    Db removedDb = dbCache_.remove(dbName.toLowerCase());
+    if (removedDb != null) { removedDb.markRemoved(); }
+    return removedDb;
   }
 
   /**
    * Returns all databases that match 'matcher'.
    */
   public List<Db> getDbs(PatternMatcher matcher) {
-    return filterCatalogObjectsByPattern(dbCache_.get().values(), matcher);
+    return filterCatalogObjectsByPattern(dbCache_.getValues(), matcher);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogResetManager.java b/fe/src/main/java/org/apache/impala/catalog/CatalogResetManager.java
new file mode 100644
index 000000000..db2370cae
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogResetManager.java
@@ -0,0 +1,335 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.common.Pair;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.util.ThreadNameAnnotator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Condition;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Manage parallel fetching of metadata from Metastore used to reset the Catalog.
+ *
+ * This class can be either in an active state (parallel fetch is ongoing) or an
+ * inactive state (no fetch is ongoing). The active state is indicated by the
+ * 'fetchingDbs_' queue not being empty. In the active state, a Catalog operation that
+ * wants to look up a database must wait until the fetch task for that database is done
+ * and polled out of CatalogResetManager by Catalog.
+ *
+ * Catalog should call beginFetch() to start the parallel metadata fetch, then call
+ * peekFetchingDb() and pollFetchingDb() repeatedly until all tasks are polled. Must call
+ * stop() to clean up the executor service and reset the state. Most of the methods in
+ * this class must be called while holding the write lock of the CatalogServiceCatalog's
+ * version lock.
+ */
+public class CatalogResetManager {
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogResetManager.class);
+
+  // Maximum number of threads to use for fetching metadata from Metastore.
+  private static final int MAX_NUM_THREADS =
+      BackendConfig.INSTANCE.getCatalogResetMaxThreads();
+
+  // Maximum number of fetch tasks to submit to executor service.
+  // This is intended to prevent too many fetch task from occupying memory.
+  private static final int MAX_FETCH_TASK = MAX_NUM_THREADS * 2;
+
+  // The catalog service that this reset manager is associated with.
+  private final CatalogServiceCatalog catalog_;
+
+  // Condition to wait and signal when 'versionLock_.writeLock()' is being held/released by
+  // reset(). Must wait on this if 'fetchingDbs_' is not empty.
+  private final Condition fetchMetadataCondition_;
+
+  // A queue of databases undergoing metadata fetch.
+  // If not empty, the elements are always in lexicographic order head to tail and should
+  // not contain any blacklisted Dbs. If empty, then no reset operation is currently
+  // running.
+  private final Queue<Pair<String, Future<PrefetchedDatabaseObjects>>> fetchingDbs_;
+
+  // A queue of database names pending fetch.
+  // The elements are always in lexicographic order head to tail and should not contain
+  // any blacklisted Dbs.
+  private final Queue<String> pendingDbNames_;
+
+  // Executor service to run the metadata fetch tasks in parallel.
+  private ExecutorService executorService_ = null;
+
+  public CatalogResetManager(CatalogServiceCatalog catalog) {
+    catalog_ = catalog;
+    fetchMetadataCondition_ = catalog.getLock().writeLock().newCondition();
+    fetchingDbs_ = new LinkedList<>();
+    pendingDbNames_ = new LinkedList<>();
+  }
+
+  private boolean threadIsHoldingWriteLock() {
+    return catalog_.getLock().writeLock().isHeldByCurrentThread();
+  }
+
+  /**
+   * Begin a metadata fetch for the given list of database names.
+   * This CatalogResetManager must not be active, and the executor service must be
+   * stopped before calling this method.
+   */
+  protected void beginFetch(List<String> dbNames) {
+    Preconditions.checkState(threadIsHoldingWriteLock());
+    Preconditions.checkState(
+        !isActive(), "Cannot begin reset while another reset is active.");
+    Preconditions.checkState(
+        executorService_ == null, "Existing executor service must be stopped first.");
+
+    executorService_ = Executors.newFixedThreadPool(MAX_NUM_THREADS,
+        new ThreadFactoryBuilder().setNameFormat("DatabaseResetMonitor-%d").build());
+    dbNames.stream()
+        .map(String::toLowerCase)
+        .filter(dbName -> {
+          boolean isBlacklisted = catalog_.isBlacklistedDbInternal(dbName);
+          if (isBlacklisted) {
+            LOG.info("Skipping reset for blacklisted database: " + dbName);
+          }
+          return !isBlacklisted;
+        })
+        .sorted()
+        .forEachOrdered(dbName -> pendingDbNames_.add(dbName));
+    scheduleNextFetch();
+  }
+
+  // Schedule the fetch task for the next database.
+  private void scheduleNextFetch() {
+    while (!pendingDbNames_.isEmpty() && fetchingDbs_.size() < MAX_FETCH_TASK) {
+      String dbName = pendingDbNames_.poll();
+      Future<PrefetchedDatabaseObjects> future =
+          executorService_.submit(new MetastoreFetchTask(dbName));
+      fetchingDbs_.add(Pair.create(dbName, future));
+    }
+  }
+
+  /**
+   * Returns True if there is an ongoing fetch operation.
+   */
+  protected boolean isActive() {
+    Preconditions.checkState(threadIsHoldingWriteLock());
+    return !fetchingDbs_.isEmpty();
+  }
+
+  /**
+   * Stop the metadata fetch operation.
+   */
+  protected void stop() {
+    if (executorService_ != null) {
+      executorService_.shutdown();
+      executorService_ = null;
+    }
+    pendingDbNames_.clear();
+    fetchingDbs_.clear();
+  }
+
+  /**
+   * Signal all threads waiting on fetchMetadataCondition_.
+   */
+  protected void signalAllWaiters() {
+    Preconditions.checkState(threadIsHoldingWriteLock());
+    fetchMetadataCondition_.signalAll();
+  }
+
+  /**
+   * Peek the next fetching database.
+   */
+  protected Pair<String, Future<PrefetchedDatabaseObjects>> peekFetchingDb() {
+    Preconditions.checkState(threadIsHoldingWriteLock());
+    return fetchingDbs_.peek();
+  }
+
+  /**
+   * Poll the next fetching database and schedule the next reset task.
+   */
+  protected Pair<String, Future<PrefetchedDatabaseObjects>> pollFetchingDb() {
+    Preconditions.checkState(threadIsHoldingWriteLock());
+    Pair<String, Future<PrefetchedDatabaseObjects>> pair = fetchingDbs_.poll();
+    scheduleNextFetch();
+    if (fetchingDbs_.isEmpty()) stop();
+    return pair;
+  }
+
+  /**
+   * Return a list of all currently resetting databases.
+   */
+  protected List<String> allFetcingDbList() {
+    Preconditions.checkState(threadIsHoldingWriteLock());
+    return Stream
+        .concat(fetchingDbs_.stream().map(Pair::getFirst), pendingDbNames_.stream())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Wait until all parallel fetch finish.
+   */
+  protected void waitFullMetadataFetch() {
+    Preconditions.checkState(threadIsHoldingWriteLock());
+    while (isActive()) {
+      try {
+        fetchMetadataCondition_.await();
+      } catch (InterruptedException ex) {
+        // IMPALA-915: Handle this properly if we support cancel query during frontend
+        // compilation. For now, maintain current behavior (block everything during
+        // INVALIDATE METADATA) by ignoring the interrupt and continuing to wait.
+        // fetchingDbs_ will eventually be cleared.
+      }
+    }
+  }
+
+  /**
+   * Wait until it is ensured that given 'dbName' has been polled out.
+   * This method will lower case 'dbName' for matching.
+   */
+  protected void waitOngoingMetadataFetch(String dbName) {
+    waitOngoingMetadataFetch(ImmutableList.of(dbName));
+  }
+
+  /**
+   * Wait until it is ensured that all 'dbNames' have been polled out.
+   * This method will lower case 'dbNames' and sort them for matching.
+   */
+  protected void waitOngoingMetadataFetch(List<String> dbNames) {
+    Preconditions.checkState(threadIsHoldingWriteLock());
+    List<String> lowerDbNames =
+        dbNames.stream().map(String::toLowerCase).sorted().collect(Collectors.toList());
+    int unlockedDbs = 0;
+    while (unlockedDbs < lowerDbNames.size()) {
+      String lowerDbName = lowerDbNames.get(unlockedDbs);
+      boolean hasWait = false;
+      while (isPendingFetch(lowerDbName)) {
+        if (!hasWait) {
+          LOG.info("Waiting metadata reset for database " + lowerDbName);
+          hasWait = true;
+        }
+        try {
+          fetchMetadataCondition_.await();
+        } catch (InterruptedException ex) {
+          // IMPALA-915: Handle this properly if we support cancel query during frontend
+          // compilation. For now, maintain current behavior (block everything during
+          // INVALIDATE METADATA) by ignoring the interrupt and continuing to wait.
+          // fetchingDbs_ will eventually be cleared.
+        }
+      }
+      if (hasWait && lowerDbNames.size() > 1) {
+        // Go back to the first Db to ensure that none of 'dbNames' is under invalidation.
+        unlockedDbs = 0;
+      } else {
+        // Only advance to the next Db if we did not wait in this iteration.
+        unlockedDbs++;
+      }
+    }
+  }
+
+  private String dbNameAtFetchQueueHead() {
+    if (fetchingDbs_.isEmpty()) return null;
+    Pair<String, Future<PrefetchedDatabaseObjects>> pair = fetchingDbs_.peek();
+    if (pair == null) return null;
+    return pair.first;
+  }
+
+  /**
+   * Return True if lowerCaseDbName is not less than the fetch queue head, i.e. it may still
+   * be fetched. Must hold versionLock_.writeLock(); lowerCaseDbName must be lower case.
+   */
+  protected boolean isPendingFetch(String lowerCaseDbName) {
+    Preconditions.checkState(threadIsHoldingWriteLock());
+    String fetchingDbHead = dbNameAtFetchQueueHead();
+    return fetchingDbHead != null && lowerCaseDbName.compareTo(fetchingDbHead) >= 0;
+  }
+
+  private class MetastoreFetchTask implements Callable<PrefetchedDatabaseObjects> {
+    private final String dbName_;
+
+    public MetastoreFetchTask(String dbName) { this.dbName_ = dbName; }
+
+    @Override
+    public PrefetchedDatabaseObjects call() throws Exception {
+      long startTime = System.currentTimeMillis();
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient();
+           ThreadNameAnnotator tna =
+               new ThreadNameAnnotator(String.format("Prefetching %s db", dbName_));) {
+        // Fetch the database, functions, and table metadata from HMS.
+        Database msDb = msClient.getHiveClient().getDatabase(dbName_);
+        List<Function> javaFunctions = new ArrayList<>();
+        for (String javaFn : msClient.getHiveClient().getFunctions(dbName_, "*")) {
+          javaFunctions.add(msClient.getHiveClient().getFunction(dbName_, javaFn));
+        }
+        List<TableMeta> tableMetas = CatalogServiceCatalog.getTableMetaFromHive(
+            msClient, dbName_, /*tblName*/ null);
+        long duration = System.currentTimeMillis() - startTime;
+        return new PrefetchedDatabaseObjects(msDb, tableMetas,
+            catalog_.extractNativeImpalaFunctions(msDb),
+            catalog_.extractJavaFunctions(javaFunctions), duration);
+      }
+    }
+  }
+
+  public static class PrefetchedDatabaseObjects {
+    private final Database msDb_;
+    private final List<TableMeta> tableMetas_;
+    private final List<org.apache.impala.catalog.Function> nativeFunctions_;
+    private final List<org.apache.impala.catalog.Function> javaFunctions_;
+    private final long durationMs_;
+
+    public PrefetchedDatabaseObjects(Database msDb, List<TableMeta> tableMetas,
+        List<org.apache.impala.catalog.Function> nativeFunctions,
+        List<org.apache.impala.catalog.Function> javaFunctions, long durationMs) {
+      this.msDb_ = msDb;
+      this.nativeFunctions_ = nativeFunctions;
+      this.javaFunctions_ = javaFunctions;
+      this.tableMetas_ = tableMetas;
+      this.durationMs_ = durationMs;
+    }
+
+    public Database getMsDb() { return msDb_; }
+
+    public List<org.apache.impala.catalog.Function> getNativeFunctions() {
+      return nativeFunctions_;
+    }
+
+    public List<org.apache.impala.catalog.Function> getJavaFunctions() {
+      return javaFunctions_;
+    }
+
+    public List<TableMeta> getTableMetas() { return tableMetas_; }
+
+    public long getDurationMs() { return durationMs_; }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index dfeadefd2..3b36768ee 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -33,19 +33,22 @@ import static org.apache.impala.thrift.TCatalogObjectType.TABLE;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -55,6 +58,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
+
 import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -65,25 +69,21 @@ import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.impala.analysis.Path;
 import org.apache.impala.analysis.TableName;
-import org.apache.impala.analysis.TableRef;
 import org.apache.impala.authorization.AuthorizationDelta;
 import org.apache.impala.authorization.AuthorizationManager;
 import org.apache.impala.authorization.AuthorizationPolicy;
+import org.apache.impala.catalog.CatalogResetManager.PrefetchedDatabaseObjects;
 import org.apache.impala.catalog.FeFsTable.Utils;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.events.ExternalEventsProcessor;
-import org.apache.impala.catalog.events.MetastoreEvents;
 import org.apache.impala.catalog.events.MetastoreEvents.EventFactoryForSyncToLatestEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
-import org.apache.impala.catalog.events.MetastoreNotificationException;
 import org.apache.impala.catalog.events.MetastoreNotificationFetchException;
 import org.apache.impala.catalog.events.NoOpEventProcessor;
 import org.apache.impala.catalog.events.SelfEventContext;
@@ -169,7 +169,6 @@ import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -289,6 +288,9 @@ public class CatalogServiceCatalog extends Catalog {
   //   atomically (potentially in a different database).
   private final ReentrantReadWriteLock versionLock_ = new ReentrantReadWriteLock(true);
 
+  // Manages background fetching of catalog objects from Metastore during reset.
+  private final CatalogResetManager resetManager_ = new CatalogResetManager(this);
+
   // Last assigned catalog version. Starts at INITIAL_CATALOG_VERSION and is incremented
   // with each update to the Catalog. Continued across the lifetime of the object.
   // Protected by versionLock_.
@@ -384,6 +386,9 @@ public class CatalogServiceCatalog extends Catalog {
   private int numTables_ = 0;
   private int numFunctions_ = 0;
 
+  // True if initial reset() has been triggered internally.
+  private boolean triggeredInitialReset_ = false;
+
   private final List<String> impalaSysTables;
 
   /**
@@ -462,6 +467,60 @@ public class CatalogServiceCatalog extends Catalog {
     LOG.info("Common HMS event types: " + commonHmsEventTypes_);
   }
 
+  /**
+   * If the initial reset has just begun, wait until it is completed.
+   * @param dbName if supplied, wait can return early if the db is found in the cache.
+   */
+  private void waitInitialResetCompletion(@Nullable String dbName) {
+    boolean isWaiting = false;
+    while (!triggeredInitialReset_) {
+      if (dbName != null && dbCache_.contains(dbName)) {
+        // If the db is found in the cache, we can return early.
+        break;
+      }
+
+      versionLock_.writeLock().lock();
+      try {
+        if (!resetManager_.isActive()) {
+          // Catalog is not currently resetting, so we can stop wait.
+          // This can happen if the catalog is in passive state.
+          LOG.info("Catalog is not initialized yet. Skip waiting{}...",
+              (dbName != null ? " for db " + dbName : ""));
+          break;
+        } else {
+          // Wait for the initial reset to complete.
+          if (!isWaiting) {
+            LOG.info("Waiting for initial reset to complete{}...",
+                (dbName != null ? " for db " + dbName : ""));
+            isWaiting = true;
+          }
+          if (dbName != null) {
+            resetManager_.waitOngoingMetadataFetch(dbName);
+          } else {
+            resetManager_.waitFullMetadataFetch();
+          }
+        }
+      } finally {
+        versionLock_.writeLock().unlock();
+      }
+    }
+    if (isWaiting) {
+      LOG.info("Initial reset completed{}.", (dbName != null ? " for db " + dbName : ""));
+    }
+  }
+
+  @Override
+  public Db getDb(String dbName) {
+    waitInitialResetCompletion(dbName);
+    return super.getDb(dbName);
+  }
+
+  @Override
+  public List<Db> getDbs(PatternMatcher matcher) {
+    waitInitialResetCompletion(null);
+    return super.getDbs(matcher);
+  }
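waitInitialResetCompletion() makes the getDb() and getDbs() overrides block until the first reset() has populated the cache, returning early as soon as the requested db is present, or immediately if no reset is running. The sketch below models that gate with a plain ReentrantReadWriteLock and a Condition on its write lock. InitialResetGate and its methods are hypothetical; the patch delegates the actual waiting to resetManager_.waitOngoingMetadataFetch() and waitFullMetadataFetch(), whose internals are not shown here. The sketch re-checks its predicate under the write lock before awaiting, so a signal cannot slip in between the lock-free fast path and the wait.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for the initial-reset wait; not Impala's actual class. */
class InitialResetGate {
  private final ReentrantReadWriteLock versionLock = new ReentrantReadWriteLock(true);
  // Only the write lock of a ReentrantReadWriteLock supports Condition objects.
  private final Condition progress = versionLock.writeLock().newCondition();
  private final Set<String> dbCache = ConcurrentHashMap.newKeySet();
  private volatile boolean initialResetDone = false;
  private boolean resetActive = false; // guarded by the write lock

  void beginReset() {
    versionLock.writeLock().lock();
    try {
      resetActive = true;
    } finally {
      versionLock.writeLock().unlock();
    }
  }

  /** Resetting thread: publish a freshly loaded db and wake up waiters. */
  void publishDb(String dbName) {
    versionLock.writeLock().lock();
    try {
      dbCache.add(dbName);
      progress.signalAll();
    } finally {
      versionLock.writeLock().unlock();
    }
  }

  void finishReset() {
    versionLock.writeLock().lock();
    try {
      resetActive = false;
      initialResetDone = true;
      progress.signalAll();
    } finally {
      versionLock.writeLock().unlock();
    }
  }

  /** Returns when the initial reset is done, no reset runs, or 'dbName' is cached. */
  void awaitInitialReset(String dbName) throws InterruptedException {
    // Cheap lock-free fast paths first.
    if (initialResetDone) return;
    if (dbName != null && dbCache.contains(dbName)) return;
    versionLock.writeLock().lock();
    try {
      // Re-check under the lock so a signal cannot be missed before await().
      while (resetActive && !initialResetDone
          && (dbName == null || !dbCache.contains(dbName))) {
        progress.await(); // atomically releases the write lock while waiting
      }
    } finally {
      versionLock.writeLock().unlock();
    }
  }
}
```

As in the patch, a gate on which no reset was ever started lets callers through immediately, which corresponds to the passive-catalog case logged as "Skip waiting".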
+
   public void startEventsProcessor() {
     Preconditions.checkNotNull(metastoreEventProcessor_,
         "Start events processor called before initializing it");
@@ -486,11 +545,16 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public boolean isBlacklistedDb(String dbName) {
     Preconditions.checkNotNull(dbName);
-    if (BackendConfig.INSTANCE.enableWorkloadMgmt() && dbName.equalsIgnoreCase(Db.SYS)) {
+    return isBlacklistedDbInternal(dbName.toLowerCase());
+  }
+
+  protected boolean isBlacklistedDbInternal(String loweredDbName) {
+    if (BackendConfig.INSTANCE.enableWorkloadMgmt()
+        && loweredDbName.equalsIgnoreCase(Db.SYS)) {
       // Override 'sys' for Impala system tables.
       return false;
     }
-    return blacklistedDbs_.contains(dbName.toLowerCase());
+    return blacklistedDbs_.contains(loweredDbName);
   }
 
   /**
@@ -631,6 +695,7 @@ public class CatalogServiceCatalog extends Catalog {
         versionLock_.writeLock().lock();
         Lock lock = useWriteLock ? tbl.writeLock() : tbl.readLock();
         try {
+          resetManager_.waitOngoingMetadataFetch(tbl.getDb().getName());
           //Note that we don't use the timeout directly here since the timeout
           //since we don't want to unnecessarily hold the versionLock if the table
           //cannot be acquired. Holding version lock can potentially blocks other
@@ -667,15 +732,17 @@ public class CatalogServiceCatalog extends Catalog {
         "Attempting to lock database " + db.getName())) {
       long begin = System.currentTimeMillis();
       long end;
+      String lowerCaseDbName = db.getName().toLowerCase();
       do {
         versionLock_.writeLock().lock();
-        if (db.getLock().tryLock()) {
-          long duration = System.currentTimeMillis() - begin;
-          if (duration > LOCK_ACQUIRING_DURATION_WARN_MS) {
-            LOG.warn("Lock for db {} was acquired in {} msec",
-                db.getName(), duration);
+        if (!resetManager_.isPendingFetch(lowerCaseDbName)) {
+          if (db.getLock().tryLock()) {
+            long duration = System.currentTimeMillis() - begin;
+            if (duration > LOCK_ACQUIRING_DURATION_WARN_MS) {
+              LOG.warn("Lock for db {} was acquired in {} msec", db.getName(), duration);
+            }
+            return true;
           }
-          return true;
         }
         versionLock_.writeLock().unlock();
         try {
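The db-lock hunk above attempts db.getLock().tryLock() only while holding versionLock_'s write lock, and only once resetManager_.isPendingFetch() reports that the db is no longer waiting to be fetched; otherwise it releases the version lock and retries. A minimal, self-contained sketch of that retry loop follows. DbLockRetry, the BooleanSupplier standing in for isPendingFetch(), and the timeout and retry-interval parameters are assumptions for illustration, since the sleep and timeout logic after the try is not part of this excerpt.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the "version lock, then db lock" retry loop. */
final class DbLockRetry {
  private DbLockRetry() {}

  /**
   * On success returns true while holding BOTH the version write lock and the db lock;
   * the caller must release them. On timeout returns false holding neither.
   */
  static boolean tryLockDb(ReentrantReadWriteLock versionLock, ReentrantLock dbLock,
      BooleanSupplier isPendingFetch, long timeoutMs, long retryIntervalMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    do {
      versionLock.writeLock().lock();
      // Only try the db lock once the reset has already re-populated this db.
      if (!isPendingFetch.getAsBoolean() && dbLock.tryLock()) return true;
      versionLock.writeLock().unlock();
      try {
        Thread.sleep(retryIntervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    } while (System.currentTimeMillis() < deadline);
    return false;
  }
}
```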
@@ -1056,7 +1123,9 @@ public class CatalogServiceCatalog extends Catalog {
     } finally {
       versionLock_.readLock().unlock();
     }
-    for (Db db: getAllDbs()) {
+    InvalidateAwareDbSnapshot snapshot = new InvalidateAwareDbSnapshot(getAllDbs());
+    for (Db db : snapshot) {
+      if (db.isRemoved()) continue;
       ctx.numDbs++;
       addDatabaseToCatalogDelta(db, ctx);
     }
@@ -1371,13 +1440,65 @@ public class CatalogServiceCatalog extends Catalog {
     }
   }
 
+  /**
+   * A helper class to iterate over a list of Db.
+   * The iterator() method returns an iterator that waits for an ongoing
+   * reset() if needed. Callers of the iterator must still check Db.isRemoved()
+   * and act accordingly, especially if there is a significant gap between
+   * hasNext() and next().
+   */
+  public class InvalidateAwareDbSnapshot implements Iterable<Db> {
+    private final List<Db> dbList_;
+
+    public InvalidateAwareDbSnapshot(List<Db> snapshot) {
+      dbList_ = new ArrayList<>(snapshot);
+      dbList_.sort(Comparator.comparing(Db::getName));
+    }
+
+    private class InvalidateAwareDbSnapshotIterator implements Iterator<Db> {
+      private int idx_ = 0;
+      public InvalidateAwareDbSnapshotIterator() {}
+
+      @Override
+      public boolean hasNext() {
+        while (idx_ < dbList_.size()) {
+          versionLock_.writeLock().lock();
+          try {
+            if (resetManager_.isActive()) {
+              // reset() is happening. Wait until invalidation has passed the next Db.
+              resetManager_.waitOngoingMetadataFetch(dbList_.get(idx_).getName());
+            }
+          } finally { versionLock_.writeLock().unlock(); }
+
+          if (dbList_.get(idx_).isRemoved()) {
+            ++idx_;
+          } else {
+            break;
+          }
+        }
+        return idx_ < dbList_.size();
+      }
+
+      @Override
+      public Db next() {
+        return dbList_.get(idx_++);
+      }
+    }
+
+    @Override
+    public Iterator<Db> iterator() {
+      return new InvalidateAwareDbSnapshotIterator();
+    }
+  }
+
   /**
    * Get a snapshot view of all the databases in the catalog.
+   * Reader must check isRemoved() value and act accordingly.
    */
   List<Db> getAllDbs() {
     versionLock_.readLock().lock();
     try {
-      return ImmutableList.copyOf(dbCache_.get().values());
+      return ImmutableList.copyOf(dbCache_.getValues());
     } finally {
       versionLock_.readLock().unlock();
     }
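InvalidateAwareDbSnapshot sorts its snapshot by name so that it walks dbs in the same lexicographic order as reset(), waits for the reset to move past each entry, and skips entries whose isRemoved() flag was set in the meantime. The sketch below reproduces that iteration contract with hypothetical FakeDb and RemovalAwareSnapshot types; a Consumer&lt;String&gt; stands in for resetManager_.waitOngoingMetadataFetch(). Unlike the next() shown in the hunk above, it throws NoSuchElementException when the iteration is exhausted, as the java.util.Iterator contract requires.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

/** Hypothetical model of a catalog Db with a 'removed' flag. */
final class FakeDb {
  final String name;
  volatile boolean removed;

  FakeDb(String name) { this.name = name; }
}

/** Sorted snapshot whose iterator skips entries removed by a concurrent reset. */
final class RemovalAwareSnapshot implements Iterable<FakeDb> {
  private final List<FakeDb> dbs;
  // Stand-in for resetManager_.waitOngoingMetadataFetch(name).
  private final Consumer<String> awaitInvalidationPast;

  RemovalAwareSnapshot(List<FakeDb> snapshot, Consumer<String> awaitInvalidationPast) {
    this.dbs = new ArrayList<>(snapshot);
    this.dbs.sort(Comparator.comparing((FakeDb d) -> d.name));
    this.awaitInvalidationPast = awaitInvalidationPast;
  }

  @Override
  public Iterator<FakeDb> iterator() {
    return new Iterator<FakeDb>() {
      private int idx = 0;

      @Override
      public boolean hasNext() {
        while (idx < dbs.size()) {
          awaitInvalidationPast.accept(dbs.get(idx).name);
          if (!dbs.get(idx).removed) return true;
          ++idx; // skip entries dropped while we were waiting
        }
        return false;
      }

      @Override
      public FakeDb next() {
        if (!hasNext()) throw new NoSuchElementException();
        return dbs.get(idx++);
      }
    };
  }
}
```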
@@ -1558,6 +1679,7 @@ public class CatalogServiceCatalog extends Catalog {
     Preconditions.checkNotNull(msDb.getName());
     versionLock_.writeLock().lock();
     try {
+      resetManager_.waitOngoingMetadataFetch(msDb.getName());
       Db db = getDb(msDb.getName());
       if (db == null) {
       throw new DatabaseNotFoundException("Database " + msDb.getName() + " not found");
@@ -1968,51 +2090,59 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Extracts Impala functions stored in metastore db parameters and adds them to
-   * the catalog cache.
+   * Extracts Impala functions stored in metastore db parameters.
    */
-  private void loadFunctionsFromDbParams(Db db,
+  protected List<Function> extractNativeImpalaFunctions(
       org.apache.hadoop.hive.metastore.api.Database msDb) {
-    if (msDb == null || msDb.getParameters() == null) return;
-    LOG.info("Loading native functions for database: " + db.getName());
-    List<Function> funcs = FunctionUtils.deserializeNativeFunctionsFromDbParams(
-        msDb.getParameters());
-    for (Function f : funcs) {
-      db.addFunction(f, false);
-      f.setCatalogVersion(incrementAndGetCatalogVersion());
-    }
-    LOG.info("Loaded {} native functions for database: {}", funcs.size(), db.getName());
+    if (msDb == null || msDb.getParameters() == null) return Collections.emptyList();
+    return FunctionUtils.deserializeNativeFunctionsFromDbParams(msDb.getParameters());
   }
 
   /**
-   * Loads Java functions into the catalog. For each function in "functions",
-   * we extract all Impala compatible evaluate() signatures and load them
-   * as separate functions in the catalog.
+   * Extracts Java functions.
+   * For each function in "functions", we extract all Impala compatible evaluate()
+   * signatures.
    */
-  private void loadJavaFunctions(Db db,
+  protected List<Function> extractJavaFunctions(
       List<org.apache.hadoop.hive.metastore.api.Function> functions) {
     Preconditions.checkNotNull(functions);
     if (BackendConfig.INSTANCE.disableCatalogDataOpsDebugOnly()) {
       LOG.info("Skip loading Java functions: catalog data ops disabled.");
-      return;
+      return Collections.emptyList();
     }
-    LOG.info("Loading Java functions for database: " + db.getName());
-    int numFuncs = 0;
+    List<Function> javaFunctions = new ArrayList<>();
     for (org.apache.hadoop.hive.metastore.api.Function function: functions) {
       try {
         HiveJavaFunctionFactoryImpl factory =
             new HiveJavaFunctionFactoryImpl(localLibraryPath_);
         HiveJavaFunction javaFunction = factory.create(function);
-        for (Function fn: javaFunction.extract()) {
-          db.addFunction(fn);
-          fn.setCatalogVersion(incrementAndGetCatalogVersion());
-          ++numFuncs;
-        }
+        javaFunctions.addAll(javaFunction.extract());
       } catch (Exception | LinkageError e) {
         LOG.error("Skipping function load: " + function.getFunctionName(), e);
       }
     }
-    LOG.info("Loaded {} Java functions for database: {}", numFuncs, db.getName());
+    return javaFunctions;
+  }
+
+  /**
+   * Adds native and Java functions to 'db'.
+   */
+  private void loadFunctions(
+      Db db, List<Function> nativeFuncs, List<Function> javaFuncs) {
+    long startTime = System.currentTimeMillis();
+    for (Function f : nativeFuncs) {
+      db.addFunction(f, false);
+      f.setCatalogVersion(incrementAndGetCatalogVersion());
+    }
+    for (Function fn : javaFuncs) {
+      db.addFunction(fn);
+      fn.setCatalogVersion(incrementAndGetCatalogVersion());
+    }
+    long duration = System.currentTimeMillis() - startTime;
+    if (!nativeFuncs.isEmpty() || !javaFuncs.isEmpty()) {
+      LOG.info("Loaded {} native and {} java functions for database {} in {} ms.",
+          nativeFuncs.size(), javaFuncs.size(), db.getName(), duration);
+    }
   }
 
   /**
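The refactoring above splits function loading into an extraction step (extractNativeImpalaFunctions(), extractJavaFunctions()) that only builds lists, and an apply step (loadFunctions()) that adds each function to the Db and stamps it with incrementAndGetCatalogVersion(). Presumably this lets the HMS-dependent extraction run on prefetch threads, leaving only the cheap apply step for the catalog lock. A sketch of the same split follows; ExtractThenLoad is hypothetical, and a comma-separated string is an invented stand-in for the HMS db parameters and Function objects.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of the extract-then-load split; names are illustrative. */
final class ExtractThenLoad {
  private final ReentrantReadWriteLock versionLock = new ReentrantReadWriteLock(true);
  private final Map<String, Long> fnVersions = new HashMap<>(); // guarded by write lock
  private long catalogVersion = 0; // guarded by write lock

  /** Extraction: touches no catalog state, so it can run on a background thread. */
  static List<String> extract(String rawSpec) {
    if (rawSpec == null) return Collections.emptyList();
    List<String> fns = new ArrayList<>();
    for (String part : rawSpec.split(",")) {
      String name = part.trim();
      if (!name.isEmpty()) fns.add(name);
    }
    return fns;
  }

  /** Load: cheap, runs under the write lock and stamps a fresh version per function. */
  void load(List<String> nativeFns, List<String> javaFns) {
    versionLock.writeLock().lock();
    try {
      for (String fn : nativeFns) fnVersions.put(fn, ++catalogVersion);
      for (String fn : javaFns) fnVersions.put(fn, ++catalogVersion);
    } finally {
      versionLock.writeLock().unlock();
    }
  }

  long versionOf(String fn) {
    versionLock.readLock().lock();
    try {
      return fnVersions.getOrDefault(fn, -1L);
    } finally {
      versionLock.readLock().unlock();
    }
  }
}
```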
@@ -2035,10 +2165,9 @@ public class CatalogServiceCatalog extends Catalog {
       org.apache.hadoop.hive.metastore.api.Database msDb =
           msClient.getHiveClient().getDatabase(dbName);
       tmpDb = new Db(dbName, msDb);
-      // Load native UDFs into the temporary db.
-      loadFunctionsFromDbParams(tmpDb, msDb);
-      // Load Java UDFs from HMS into the temporary db.
-      loadJavaFunctions(tmpDb, javaFns);
+      // Load native and java UDFs into the temporary db.
+      loadFunctions(
+          tmpDb, extractNativeImpalaFunctions(msDb), extractJavaFunctions(javaFns));
 
       Db db = getDb(dbName);
       if (db == null) {
@@ -2119,8 +2248,8 @@ public class CatalogServiceCatalog extends Catalog {
    * @param tblName Nullable table name. If it's null, all tables of the required database
    *               will be fetched. If it's not null, the list will contain only one item.
    */
-  private List<TableMeta> getTableMetaFromHive(MetaStoreClient msClient, String dbName,
-      @Nullable String tblName) throws TException {
+  protected static List<TableMeta> getTableMetaFromHive(MetaStoreClient msClient,
+      String dbName, @Nullable String tblName) throws TException {
     // Load the exact TableMeta list if pull_table_types_and_comments is set to true.
     if (BackendConfig.INSTANCE.pullTableTypesAndComments()) {
       return msClient.getHiveClient().getTableMeta(dbName, tblName, /*tableTypes*/ null);
@@ -2141,25 +2270,15 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Invalidates the database 'db'. This method can have potential race
-   * conditions with external changes to the Hive metastore and hence any
-   * conflicting changes to the objects can manifest in the form of exceptions
-   * from the HMS calls which are appropriately handled. Returns the invalidated
-   * 'Db' object along with list of tables to be loaded by the TableLoadingMgr.
-   * Returns null if the method encounters an exception during invalidation.
+   * Invalidates the database 'db'.
+   * Returns the invalidated 'Db' object along with the list of tables to be loaded
+   * by the TableLoadingMgr. Returns null if the method encounters an exception during
+   * invalidation.
    */
-  private Pair<Db, List<TTableName>> invalidateDb(
-      MetaStoreClient msClient, String dbName, Db existingDb,
-      EventSequence catalogTimeline) {
+  private Pair<Db, List<TTableName>> invalidateDb(String dbName, Db existingDb,
+      PrefetchedDatabaseObjects prefetchedObjects, EventSequence catalogTimeline) {
     try {
-      List<org.apache.hadoop.hive.metastore.api.Function> javaFns =
-          new ArrayList<>();
-      for (String javaFn: msClient.getHiveClient().getFunctions(dbName, "*")) {
-        javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn));
-      }
-      org.apache.hadoop.hive.metastore.api.Database msDb =
-          msClient.getHiveClient().getDatabase(dbName);
-      Db newDb = new Db(dbName, msDb);
+      Db newDb = new Db(dbName, prefetchedObjects.getMsDb());
       // existingDb is usually null when the Catalog loads for the first time.
       // In that case we needn't restore any transient functions.
       if (existingDb != null) {
@@ -2170,17 +2289,15 @@ public class CatalogServiceCatalog extends Catalog {
           fn.setCatalogVersion(incrementAndGetCatalogVersion());
         }
       }
-      // Reload native UDFs.
-      loadFunctionsFromDbParams(newDb, msDb);
-      // Reload Java UDFs from HMS.
-      loadJavaFunctions(newDb, javaFns);
+
+      // Reload native and Java UDFs.
+      loadFunctions(newDb, prefetchedObjects.getNativeFunctions(),
+          prefetchedObjects.getJavaFunctions());
       newDb.setCatalogVersion(incrementAndGetCatalogVersion());
-      catalogTimeline.markEvent("Loaded functions of " + dbName);
 
-      LOG.info("Loading table list for database: {}", dbName);
       int numTables = 0;
       List<TTableName> tblsToBackgroundLoad = new ArrayList<>();
-      for (TableMeta tblMeta: getTableMetaFromHive(msClient, dbName, /*tblName*/null)) {
+      for (TableMeta tblMeta : prefetchedObjects.getTableMetas()) {
         String tableName = tblMeta.getTableName().toLowerCase();
         if (isBlacklistedTable(dbName, tableName)) {
           LOG.info("skip blacklisted table: " + dbName + "." + tableName);
@@ -2197,10 +2314,14 @@ public class CatalogServiceCatalog extends Catalog {
           tblsToBackgroundLoad.add(new TTableName(dbName, tableName));
         }
       }
-      catalogTimeline.markEvent(String.format(
-          "Loaded %d table names of database %s", numTables, dbName));
-      LOG.info("Loaded table list for database: {}. Number of tables: {}",
-          dbName, numTables);
+      int numFunctions = prefetchedObjects.getNativeFunctions().size()
+          + prefetchedObjects.getJavaFunctions().size();
+      catalogTimeline.markEvent(
+          String.format("Loaded %d table names and %d functions of database %s",
+              numTables, numFunctions, dbName));
+      LOG.info("Loaded table list ({}) and functions ({}) for database: {}. "
+              + "Fetch duration: {} ms.",
+          numTables, numFunctions, dbName, prefetchedObjects.getDurationMs());
 
       if (existingDb != null) {
         // Identify any removed functions and add them to the delta log.
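invalidateDb() now consumes a PrefetchedDatabaseObjects instead of calling HMS itself, and rebuildDbCache() reads results through resetManager_.peekFetchingDb(), a Pair of db name and Future, consuming them in lexicographic order. According to the commit message, the fetching runs on an ExecutorService whose size is controlled by the catalog_reset_max_threads flag. One plausible shape for such an ordered prefetcher is sketched below; OrderedPrefetcher and its methods are hypothetical, and the sketch assumes a single consumer thread, as in reset().

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical ordered prefetcher; the deque is touched by one consumer thread only. */
final class OrderedPrefetcher<T> implements AutoCloseable {
  private final ExecutorService pool;
  private final Deque<Map.Entry<String, Future<T>>> pending = new ArrayDeque<>();

  OrderedPrefetcher(int maxThreads) {
    pool = Executors.newFixedThreadPool(maxThreads);
  }

  /** Submits one fetch per name in lexicographic order, so early names finish first. */
  void beginFetch(List<String> names, Function<String, T> fetch) {
    List<String> sorted = new ArrayList<>(names);
    sorted.sort(null); // natural (lexicographic) order
    for (String name : sorted) {
      Callable<T> task = () -> fetch.apply(name);
      pending.addLast(new SimpleImmutableEntry<>(name, pool.submit(task)));
    }
  }

  boolean isActive() { return !pending.isEmpty(); }

  /** The db that must be consumed next, together with its in-flight fetch. */
  Map.Entry<String, Future<T>> peek() { return pending.peekFirst(); }

  void poll() { pending.pollFirst(); }

  @Override
  public void close() { pool.shutdownNow(); }
}
```

Because results are consumed strictly in submission order, a slow fetch for an early name delays the consumer even if later fetches have finished; the thread pool only overlaps the HMS round trips.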
@@ -2248,15 +2369,22 @@ public class CatalogServiceCatalog extends Catalog {
     }
   }
 
+  public long reset(EventSequence catalogTimeline) throws CatalogException {
+    return reset(catalogTimeline, false);
+  }
+
   /**
    * Resets this catalog instance by clearing all cached table and database metadata.
    * Returns the current catalog version before reset has taken any effect. The
    * requesting impalad will use that version to determine when the
    * effects of reset have been applied to its local catalog cache.
    */
-  public long reset(EventSequence catalogTimeline) throws CatalogException {
+  public long reset(EventSequence catalogTimeline, boolean isSyncDdl)
+      throws CatalogException {
     long startVersion = getCatalogVersion();
     LOG.info("Invalidating all metadata. Version: " + startVersion);
+    Stopwatch resetTimer = Stopwatch.createStarted();
+    Stopwatch unlockedTimer = Stopwatch.createStarted();
     // First update the policy metadata.
     refreshAuthorization(true);
 
@@ -2293,71 +2421,33 @@ public class CatalogServiceCatalog extends Catalog {
       LOG.error("Couldn't identify the default FS. Cache Pool reader will be disabled.");
     }
     versionLock_.writeLock().lock();
-    catalogTimeline.markEvent(GOT_CATALOG_VERSION_WRITE_LOCK);
-    // In case of an empty new catalog, the version should still change to reflect the
-    // reset operation itself and to unblock impalads by making the catalog version >
-    // INITIAL_CATALOG_VERSION. See Frontend.waitForCatalog()
-    if (catalogVersion_ < Catalog.CATALOG_VERSION_AFTER_FIRST_RESET) {
-      catalogVersion_ = Catalog.CATALOG_VERSION_AFTER_FIRST_RESET;
-      LOG.info("First reset initiated. Version: " + catalogVersion_);
-    } else {
-      ++catalogVersion_;
-    }
-
-    // Update data source, db and table metadata
     try {
-      // Refresh DataSource objects from HMS and assign new versions.
-      refreshDataSources();
+      resetManager_.waitFullMetadataFetch();
+      unlockedTimer.stop();
+      catalogTimeline.markEvent(GOT_CATALOG_VERSION_WRITE_LOCK);
+      // In case of an empty new catalog, the version should still change to reflect the
+      // reset operation itself and to unblock impalads by making the catalog version >
+      // INITIAL_CATALOG_VERSION. See Frontend.waitForCatalog()
+      if (catalogVersion_ < Catalog.CATALOG_VERSION_AFTER_FIRST_RESET) {
+        catalogVersion_ = Catalog.CATALOG_VERSION_AFTER_FIRST_RESET;
+        LOG.info("First reset initiated. Version: " + catalogVersion_);
+      } else {
+        ++catalogVersion_;
+      }
 
-      // Not all Java UDFs are persisted to the metastore. The ones which aren't
-      // should be restored once the catalog has been invalidated.
-      Map<String, Db> oldDbCache = dbCache_.get();
+      // Update data source, db and table metadata.
+      // First, refresh DataSource objects from HMS and assign new versions.
+      refreshDataSources();
 
-      // Build a new DB cache, populate it, and replace the existing cache in one
-      // step.
-      Map<String, Db> newDbCache = new ConcurrentHashMap<String, Db>();
-      List<TTableName> tblsToBackgroundLoad = new ArrayList<>();
+      // Next, rebuild the dbCache_ in-place.
+      List<String> allHmsDbs;
       try (MetaStoreClient msClient = getMetaStoreClient(catalogTimeline)) {
-        List<String> allDbs = msClient.getHiveClient().getAllDatabases();
+        allHmsDbs = msClient.getHiveClient().getAllDatabases();
         catalogTimeline.markEvent("Got database list");
-        int numComplete = 0;
-        for (String dbName: allDbs) {
-          if (isBlacklistedDb(dbName)) {
-            LOG.info("skip blacklisted db: " + dbName);
-            continue;
-          }
-          String annotation = String.format("invalidating metadata - %s/%s dbs complete",
-              numComplete++, allDbs.size());
-          try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
-            dbName = dbName.toLowerCase();
-            Db oldDb = oldDbCache.get(dbName);
-            Pair<Db, List<TTableName>> invalidatedDb = invalidateDb(msClient,
-                dbName, oldDb, catalogTimeline);
-            if (invalidatedDb == null) continue;
-            newDbCache.put(dbName, invalidatedDb.first);
-            tblsToBackgroundLoad.addAll(invalidatedDb.second);
-          }
-
-          DebugUtils.executeDebugAction(BackendConfig.INSTANCE.debugActions(),
-              DebugUtils.RESET_METADATA_LOOP_LOCKED);
-        }
-      }
-      dbCache_.set(newDbCache);
-      catalogTimeline.markEvent("Updated catalog cache");
-
-      // Identify any deleted databases and add them to the delta log.
-      Set<String> oldDbNames = oldDbCache.keySet();
-      Set<String> newDbNames = newDbCache.keySet();
-      oldDbNames.removeAll(newDbNames);
-      for (String dbName: oldDbNames) {
-        Db removedDb = oldDbCache.get(dbName);
-        updateDeleteLog(removedDb);
       }
+      rebuildDbCache(allHmsDbs, unlockedTimer, catalogTimeline, isSyncDdl);
 
-      // Submit tables for background loading.
-      for (TTableName tblName: tblsToBackgroundLoad) {
-        tableLoadingMgr_.backgroundLoad(tblName);
-      }
+      catalogTimeline.markEvent("Updated catalog cache");
     } catch (Exception e) {
       LOG.error("Error initializing Catalog", e);
       throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
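Instead of holding versionLock_'s write lock for the entire rebuild, reset() now calls rebuildDbCache(), which releases and re-acquires the lock every getResetMetadataLockDurationMs() milliseconds unless isSyncDdl is set, and bumps catalogVersion_ after relocking. The sketch below isolates that time-slicing behind an injectable clock so its behavior is deterministic; TimeSlicedRebuild and its parameters are hypothetical. It uses a fair ReentrantReadWriteLock because, in fair mode, a thread re-acquiring the write lock queues behind threads that are already waiting, so other operations actually get a turn during the unlock window. Whether Impala's versionLock_ is fair is not shown in this excerpt.

```java
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a rebuild that periodically yields its write lock. */
final class TimeSlicedRebuild {
  // Fair mode: a re-acquiring thread queues behind threads already waiting.
  private final ReentrantReadWriteLock versionLock = new ReentrantReadWriteLock(true);
  private final LongSupplier clockMs;
  private long version = 0; // guarded by the write lock
  private int yields = 0;

  TimeSlicedRebuild(LongSupplier clockMs) {
    this.clockMs = clockMs;
  }

  /** Runs 'step' per item under the write lock, yielding it every 'sliceMs' unless sync. */
  void rebuild(List<String> items, Consumer<String> step, long sliceMs, boolean isSyncDdl) {
    versionLock.writeLock().lock();
    try {
      long nextUnlock = clockMs.getAsLong() + sliceMs;
      for (int i = 0; i < items.size(); i++) {
        step.accept(items.get(i));
        boolean moreWork = i + 1 < items.size(); // like resetManager_.isActive()
        if (!isSyncDdl && moreWork && clockMs.getAsLong() >= nextUnlock) {
          versionLock.writeLock().unlock(); // other catalog operations may run here
          versionLock.writeLock().lock();
          ++yields;
          ++version; // mirrors the patch bumping catalogVersion_ after relocking
          nextUnlock = clockMs.getAsLong() + sliceMs;
        }
      }
    } finally {
      versionLock.writeLock().unlock();
    }
  }

  int yields() { return yields; }

  long version() { return version; }
}
```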
@@ -2366,6 +2456,10 @@ public class CatalogServiceCatalog extends Catalog {
       // acquires the version lock before us so the lastResetStartVersion_ is already
       // bumped. Don't need to update it in this case.
       if (lastResetStartVersion_ < startVersion) lastResetStartVersion_ = startVersion;
+      resetManager_.stop();
+      resetManager_.signalAllWaiters();
+      triggeredInitialReset_ = true; // set to true, regardless of success status.
+      unlockedTimer.start();
       versionLock_.writeLock().unlock();
       // clear all txn to write ids mapping so that there is no memory leak for previous
       // events
@@ -2373,10 +2467,120 @@ public class CatalogServiceCatalog extends Catalog {
       // restart the event processing for id just before the reset
       metastoreEventProcessor_.start(currentEventId);
     }
-    LOG.info("Invalidated all metadata.");
+    unlockedTimer.stop();
+    resetTimer.stop();
+    LOG.info("Invalidated all metadata in {} ms ({} ms outside version write lock).",
+        resetTimer.elapsed(TimeUnit.MILLISECONDS),
+        unlockedTimer.elapsed(TimeUnit.MILLISECONDS));
     return startVersion;
   }
 
+  /**
+   * Builds a new dbCache_ in place.
+   * If isSyncDdl is false, do it in stages and unlock writeLock in between stages.
+   * versionLock_.writeLock() must be held on entry to and on exit from this method.
+   */
+  private void rebuildDbCache(List<String> allHmsDbs, Stopwatch unlockedTimer,
+      EventSequence catalogTimeline, boolean isSyncDdl) {
+    Preconditions.checkState(versionLock_.writeLock().isHeldByCurrentThread());
+
+    // Not all Java UDFs are persisted to the metastore. The ones which aren't
+    // should be restored once the catalog has been invalidated.
+    List<String> oldDbNamesList = new ArrayList<>(dbCache_.keySet());
+    Collections.sort(oldDbNamesList);
+    Queue<String> oldDbNames = new LinkedList<>(oldDbNamesList);
+
+    // Fetch the list of databases, functions, and tables from HMS.
+    resetManager_.beginFetch(allHmsDbs);
+
+    Set<String> newDbNames = new HashSet<>();
+    long allDbsCount = allHmsDbs.size();
+    int numComplete = 0;
+    long lockDurationMs = BackendConfig.INSTANCE.getResetMetadataLockDurationMs();
+    long nextUnlock = System.currentTimeMillis() + lockDurationMs;
+
+    while (resetManager_.isActive()) {
+      Pair<String, Future<PrefetchedDatabaseObjects>> resettingDbPair =
+          resetManager_.peekFetchingDb();
+      String dbName = resettingDbPair.first;
+      String annotation =
+          String.format("invalidating metadata - %s/%s dbs complete. Current db: %s",
+              numComplete++, allDbsCount, dbName);
+      try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
+        // Poll oldDbNames up to current dbName.
+        while (!oldDbNames.isEmpty() && oldDbNames.peek().compareTo(dbName) < 0) {
+          String oldDbName = oldDbNames.poll();
+          if (!newDbNames.contains(oldDbName)) {
+            LOG.info("Removing database: " + oldDbName);
+            removeDbLocked(oldDbName);
+          }
+        }
+
+        // Invalidate dbName.
+        Pair<Db, List<TTableName>> invalidatedDb = null;
+        try {
+          // Getting the Future<PrefetchedDatabaseObjects> result can race with
+          // external changes to the Hive metastore, and hence any conflicting
+          // changes to the objects can manifest in the form of exceptions from
+          // the HMS calls. These are handled by logging a warning and leaving
+          // invalidatedDb null.
+          PrefetchedDatabaseObjects hmsObjects = resettingDbPair.second.get();
+          Db oldDb = dbCache_.get(dbName);
+          invalidatedDb = invalidateDb(dbName, oldDb, hmsObjects, catalogTimeline);
+        } catch (Exception e) {
+          LOG.warn("Error fetching HMS objects for database " + dbName, e);
+        }
+        if (invalidatedDb != null) {
+          dbCache_.add(invalidatedDb.first);
+          // Submit tables for background loading.
+          for (TTableName tblName : invalidatedDb.second) {
+            tableLoadingMgr_.backgroundLoad(tblName);
+          }
+        }
+        newDbNames.add(dbName);
+        resetManager_.pollFetchingDb();
+
+        DebugUtils.executeDebugAction(
+            BackendConfig.INSTANCE.debugActions(), DebugUtils.RESET_METADATA_LOOP_LOCKED);
+      }
+
+      // Every time lockDurationMs has passed, temporarily unlock
+      // versionLock_.writeLock() to allow other operations to progress. DO NOT
+      // unlock if resetManager_ is not active.
+      if (!isSyncDdl && resetManager_.isActive()
+          && System.currentTimeMillis() >= nextUnlock) {
+        annotation =
+            String.format("invalidating metadata - %s/%s dbs complete - temporary unlock",
+                numComplete, allDbsCount);
+        try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
+          // Temporarily release writeLock() and lock it again.
+          // Increase catalogVersion_ after relocking.
+          resetManager_.signalAllWaiters();
+          unlockedTimer.start();
+          versionLock_.writeLock().unlock();
+
+          DebugUtils.executeDebugAction(BackendConfig.INSTANCE.debugActions(),
+              DebugUtils.RESET_METADATA_LOOP_UNLOCKED);
+
+          versionLock_.writeLock().lock();
+          unlockedTimer.stop();
+          nextUnlock = System.currentTimeMillis() + lockDurationMs;
+          ++catalogVersion_;
+        }
+      }
+    }
+
+    // Poll the remaining oldDbNames.
+    while (!oldDbNames.isEmpty()) {
+      String oldDbName = oldDbNames.poll();
+      if (!newDbNames.contains(oldDbName)) {
+        LOG.info("Removing database: " + oldDbName);
+        removeDbLocked(oldDbName);
+      }
+    }
+    Preconditions.checkState(versionLock_.writeLock().isHeldByCurrentThread());
+  }
+
   public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) {
     return addDb(dbName, msDb, -1);
   }
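At its core, rebuildDbCache() performs a merge over two sorted sequences: the old cache keys (oldDbNames) and the db names coming out of the prefetcher. Before a db is invalidated, every old name that sorts strictly before it and was not re-added is removed; old names left over at the end are removed last. The pure function below replays that merge and returns the resulting action sequence. SortedCacheDiff and the "remove:"/"load:" strings are hypothetical, and HMS access, locking and version bumps are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

/** Hypothetical replay of the old-vs-new name merge in an in-place rebuild. */
final class SortedCacheDiff {
  private SortedCacheDiff() {}

  /** Returns the actions as "remove:x" / "load:y" strings, in execution order. */
  static List<String> plan(List<String> oldNames, List<String> newNames) {
    List<String> sortedOld = new ArrayList<>(oldNames);
    sortedOld.sort(null);
    List<String> sortedNew = new ArrayList<>(newNames);
    sortedNew.sort(null);
    Queue<String> old = new ArrayDeque<>(sortedOld);
    Set<String> loaded = new HashSet<>();
    List<String> actions = new ArrayList<>();
    for (String name : sortedNew) {
      // Drop old names that sort before the current one and were not re-added.
      while (!old.isEmpty() && old.peek().compareTo(name) < 0) {
        String stale = old.poll();
        if (!loaded.contains(stale)) actions.add("remove:" + stale);
      }
      actions.add("load:" + name);
      loaded.add(name);
    }
    // Whatever is left sorts after the last new name.
    while (!old.isEmpty()) {
      String stale = old.poll();
      if (!loaded.contains(stale)) actions.add("remove:" + stale);
    }
    return actions;
  }
}
```

For example, with old names [b, a, d] and HMS names [c, a], the plan is load:a, remove:b, load:c, remove:d. An old name equal to the current db is not polled in that round (the comparison is strict) and is later skipped because it is already in the loaded set.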
@@ -2390,6 +2594,7 @@ public class CatalogServiceCatalog extends Catalog {
     Db newDb = new Db(dbName, msDb);
     versionLock_.writeLock().lock();
     try {
+      resetManager_.waitOngoingMetadataFetch(dbName);
       newDb.setCatalogVersion(incrementAndGetCatalogVersion());
       newDb.setCreateEventId(eventId);
       addDb(newDb);
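The mutating entry points in this patch (addDb() above, and removeDb(), addTable(), removeTable(), addFunction() and others below) call resetManager_.waitOngoingMetadataFetch(dbName) right after taking versionLock_'s write lock. Per the commit message, an operation on a db may proceed only if no reset is running or the db name is lexicographically less than the db currently under invalidation. The sketch below models that rule with a Condition on the write lock. ResetProgressMonitor is hypothetical; the real CatalogResetManager is not part of this excerpt.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of the per-db reset progress check; not Impala's real class. */
final class ResetProgressMonitor {
  private final ReentrantReadWriteLock versionLock;
  private final Condition progressed;
  private String current = null; // db under invalidation; null when no reset runs

  ResetProgressMonitor(ReentrantReadWriteLock versionLock) {
    this.versionLock = versionLock;
    this.progressed = versionLock.writeLock().newCondition();
  }

  private void checkLocked() {
    if (!versionLock.isWriteLockedByCurrentThread()) {
      throw new IllegalStateException("caller must hold the version write lock");
    }
  }

  /** Resetting thread: 'dbName' is now the db being invalidated. */
  void advanceTo(String dbName) {
    checkLocked();
    current = dbName;
    progressed.signalAll();
  }

  /** Resetting thread: the reset has finished, successfully or not. */
  void stop() {
    checkLocked();
    current = null;
    progressed.signalAll();
  }

  /** True iff no reset runs or 'dbName' sorts strictly before the current db. */
  boolean isSafe(String dbName) {
    checkLocked();
    return current == null || dbName.compareTo(current) < 0;
  }

  /** Blocks, releasing the write lock while waiting, until isSafe(dbName) holds. */
  void awaitPast(String dbName) throws InterruptedException {
    checkLocked();
    while (!isSafe(dbName)) progressed.await();
  }
}
```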
@@ -2405,17 +2610,22 @@ public class CatalogServiceCatalog extends Catalog {
    * Used by DROP DATABASE statements.
    */
   @Override
-  public Db removeDb(String dbName) {
+  public @Nullable Db removeDb(String dbName) {
     versionLock_.writeLock().lock();
     try {
-      Db removedDb = super.removeDb(dbName);
-      if (removedDb != null) updateDeleteLog(removedDb);
-      return removedDb;
+      resetManager_.waitOngoingMetadataFetch(dbName);
+      return removeDbLocked(dbName);
     } finally {
       versionLock_.writeLock().unlock();
     }
   }
 
+  private @Nullable Db removeDbLocked(String dbName) {
+    Db removedDb = super.removeDb(dbName);
+    if (removedDb != null) updateDeleteLog(removedDb);
+    return removedDb;
+  }
+
   /**
    * Helper function to clean up the state associated with a removed database. It creates
    * the entries in the delete log for 'db' as well as for its tables and functions
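removeDb() now takes the write lock, waits for reset progress, and delegates to removeDbLocked(), which rebuildDbCache() calls directly because it already holds the write lock. A sketch of the same public-method/locked-helper split follows. DbRegistry is hypothetical, and the isWriteLockedByCurrentThread() check inside the helper is added here for illustration; the patch's removeDbLocked() does not assert it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of the removeDb()/removeDbLocked() split. */
final class DbRegistry {
  private final ReentrantReadWriteLock versionLock = new ReentrantReadWriteLock(true);
  private final Map<String, String> dbCache = new HashMap<>(); // guarded by write lock
  final List<String> deleteLog = new ArrayList<>(); // guarded by write lock

  void addDb(String name) {
    versionLock.writeLock().lock();
    try {
      dbCache.put(name, name);
    } finally {
      versionLock.writeLock().unlock();
    }
  }

  /** Public entry point: takes the lock (the patch also waits for reset progress). */
  String removeDb(String name) {
    versionLock.writeLock().lock();
    try {
      return removeDbLocked(name);
    } finally {
      versionLock.writeLock().unlock();
    }
  }

  /** For callers, such as a reset loop, that already hold the write lock. */
  String removeDbLocked(String name) {
    if (!versionLock.isWriteLockedByCurrentThread()) {
      throw new IllegalStateException("write lock not held");
    }
    String removed = dbCache.remove(name);
    if (removed != null) deleteLog.add(removed);
    return removed;
  }

  /** Reset-style caller: drops several dbs in one critical section. */
  void dropAll(List<String> names) {
    versionLock.writeLock().lock();
    try {
      for (String n : names) removeDbLocked(n);
    } finally {
      versionLock.writeLock().unlock();
    }
  }
}
```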
@@ -2450,6 +2660,7 @@ public class CatalogServiceCatalog extends Catalog {
       String tblComment, long createEventId) {
     versionLock_.writeLock().lock();
     try {
+      resetManager_.waitOngoingMetadataFetch(dbName);
       // IMPALA-9211: get db object after holding the writeLock in case of getting stale
       // db object due to concurrent INVALIDATE METADATA
       Db db = getDb(dbName);
@@ -2477,17 +2688,18 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public Table addTable(Db db, Table table) {
+    Preconditions.checkNotNull(db);
     versionLock_.writeLock().lock();
     try {
-      Preconditions.checkNotNull(db).addTable(Preconditions.checkNotNull(table));
+      resetManager_.waitOngoingMetadataFetch(db.getName());
+      db.addTable(Preconditions.checkNotNull(table));
     } finally {
       versionLock_.writeLock().unlock();
     }
     return table;
   }
 
-  public Table getOrLoadTable(String dbName, String tblName, String reason,
-      ValidWriteIdList validWriteIdList)
-      throws CatalogException {
+  public @Nullable Table getOrLoadTable(String dbName, String tblName, String reason,
+      ValidWriteIdList validWriteIdList) throws CatalogException {
     return getOrLoadTable(dbName, tblName, reason, validWriteIdList,
         TABLE_ID_UNAVAILABLE, NoOpEventSequence.INSTANCE);
   }
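A check that can throw should run either before lock() or inside the try block whose finally releases the lock. If it throws in between, the finally clause never runs and the lock stays held. The sketch below contrasts the two orderings using a hypothetical LockOrdering class, with Objects.requireNonNull() standing in for Guava's Preconditions.checkNotNull().

```java
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical demo of where a throwing precondition may sit relative to lock(). */
final class LockOrdering {
  static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();

  private LockOrdering() {}

  /** Validate first: a failed check can never leave LOCK held. */
  static void addTableSafe(Object db, Object table) {
    Objects.requireNonNull(db, "db");
    LOCK.writeLock().lock();
    try {
      Objects.requireNonNull(table, "table"); // inside try: finally still unlocks
      // ... mutate db under the lock ...
    } finally {
      LOCK.writeLock().unlock();
    }
  }

  /** Anti-pattern: a throw between lock() and try skips the finally block. */
  static void addTableLeaky(Object db) {
    LOCK.writeLock().lock();
    Objects.requireNonNull(db, "db");
    try {
      // ... mutate db under the lock ...
    } finally {
      LOCK.writeLock().unlock();
    }
  }
}
```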
@@ -2501,7 +2713,7 @@ public class CatalogServiceCatalog extends Catalog {
    * and the current cached value will be returned. This may mean that a missing table
    * (not yet loaded table) will be returned.
    */
-  public Table getOrLoadTable(String dbName, String tblName, String reason,
+  public @Nullable Table getOrLoadTable(String dbName, String tblName, String reason,
       ValidWriteIdList validWriteIdList, long tableId, EventSequence catalogTimeline)
       throws CatalogException {
     TTableName tableName = new TTableName(dbName.toLowerCase(), tblName.toLowerCase());
@@ -2619,10 +2831,11 @@ public class CatalogServiceCatalog extends Catalog {
    * for transactional tables, we still replace the existing table if the updatedTbl has
    * more recent writeIdList than the existing table.
    */
-  private Table replaceTableIfUnchanged(Table updatedTbl, long expectedCatalogVersion,
-      long tableId) throws DatabaseNotFoundException {
+  private @Nullable Table replaceTableIfUnchanged(Table updatedTbl,
+      long expectedCatalogVersion, long tableId) throws DatabaseNotFoundException {
     versionLock_.writeLock().lock();
     try {
+      resetManager_.waitOngoingMetadataFetch(updatedTbl.getDb().getName());
       Db db = getDb(updatedTbl.getDb().getName());
       if (db == null) {
         throw new DatabaseNotFoundException(
@@ -2690,10 +2903,11 @@ public class CatalogServiceCatalog extends Catalog {
    * Returns the removed Table, or null if the table or db does not exist.
    */
   public Table removeTable(String dbName, String tblName) {
-    Db parentDb = getDb(dbName);
-    if (parentDb == null) return null;
     versionLock_.writeLock().lock();
     try {
+      resetManager_.waitOngoingMetadataFetch(dbName);
+      Db parentDb = getDb(dbName);
+      if (parentDb == null) return null;
       Table removedTable = parentDb.removeTable(tblName);
       if (removedTable != null && !removedTable.isStoredInImpaladCatalogCache()) {
         CatalogMonitor.INSTANCE.getCatalogTableMetrics().removeTable(removedTable);
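removeTable(), and likewise addFunction() below, now looks up the parent Db only after taking versionLock_'s write lock and waiting for reset progress. Because the in-place reset installs a new Db object for each name it invalidates, a reference obtained before the lock may point at a detached instance. This is the same hazard the IMPALA-9211 comment above describes. The deterministic sketch below shows the effect; StaleDbDemo is hypothetical and models each Db as a set of table names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical demo: why the parent Db must be looked up under the lock. */
final class StaleDbDemo {
  final ReentrantReadWriteLock versionLock = new ReentrantReadWriteLock(true);
  // db name -> table names; a reset swaps in a brand-new Set per db.
  final Map<String, Set<String>> dbCache = new HashMap<>();

  /** Models an in-place reset replacing the Db object for 'db'. */
  void resetDb(String db, Set<String> tables) {
    versionLock.writeLock().lock();
    try {
      dbCache.put(db, new HashSet<>(tables));
    } finally {
      versionLock.writeLock().unlock();
    }
  }

  /** Looks the parent up under the lock, so it is always the live instance. */
  boolean removeTable(String db, String tbl) {
    versionLock.writeLock().lock();
    try {
      Set<String> parent = dbCache.get(db);
      return parent != null && parent.remove(tbl);
    } finally {
      versionLock.writeLock().unlock();
    }
  }
}
```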
@@ -2717,6 +2931,7 @@ public class CatalogServiceCatalog extends Catalog {
   public Function removeFunction(Function desc) {
     versionLock_.writeLock().lock();
     try {
+      resetManager_.waitOngoingMetadataFetch(desc.dbName());
       Function removedFn = super.removeFunction(desc);
       if (removedFn != null) {
         removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
@@ -2734,10 +2949,11 @@ public class CatalogServiceCatalog extends Catalog {
    */
   @Override
   public boolean addFunction(Function fn) {
-    Db db = getDb(fn.getFunctionName().getDb());
-    if (db == null) return false;
     versionLock_.writeLock().lock();
     try {
+      resetManager_.waitOngoingMetadataFetch(fn.dbName());
+      Db db = getDb(fn.getFunctionName().getDb());
+      if (db == null) return false;
       if (db.addFunction(fn)) {
         fn.setCatalogVersion(incrementAndGetCatalogVersion());
         return true;
@@ -2799,6 +3015,9 @@ public class CatalogServiceCatalog extends Catalog {
     if (db == null) return Pair.create(null, null);
     versionLock_.writeLock().lock();
     try {
+      List<String> dbNames =
+          ImmutableList.of(oldTableName.getDb_name(), newTableName.getDb_name());
+      resetManager_.waitOngoingMetadataFetch(dbNames);
       Table oldTable =
           removeTable(oldTableName.getDb_name(), oldTableName.getTable_name());
       if (oldTable == null) return Pair.create(null, null);
@@ -3115,6 +3334,7 @@ public class CatalogServiceCatalog extends Catalog {
     Table incompleteTable;
     versionLock_.writeLock().lock();
     try {
+      resetManager_.waitOngoingMetadataFetch(dbName);
       Db db = getDb(dbName);
       if (db == null) return null;
       Table existingTbl = db.getTable(tblName);
@@ -4230,7 +4450,14 @@ public class CatalogServiceCatalog extends Catalog {
     TCatalogInfoSelector sel = Preconditions.checkNotNull(req.catalog_info_selector,
         "no catalog_info_selector in request");
     if (sel.want_db_names) {
-      resp.catalog_info.db_names = ImmutableList.copyOf(dbCache_.get().keySet());
+      // dbCache_ might not be fully populated yet.
+      // Report content of dbResetMonitor_ as well, if it is not empty.
+      versionLock_.writeLock().lock();
+      try {
+        Set<String> dbNames = new HashSet<>(dbCache_.keySet());
+        dbNames.addAll(resetManager_.allFetcingDbList());
+        resp.catalog_info.db_names = ImmutableList.copyOf(dbNames);
+      } finally { versionLock_.writeLock().unlock(); }
     }
     // TODO(todd) implement data sources and other global information.
     return resp;
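The CatalogServiceCatalog hunks above all follow one pattern. Each takes `versionLock_.writeLock()`, calls `resetManager_.waitOngoingMetadataFetch(...)` for the database(s) involved, and only then looks up the `Db`.

The sketch below is a minimal Python model of the waiting rule stated in the commit message: proceed only if no reset is running, or if the database name sorts before the database currently under invalidation. It makes these assumptions:

* `ResetProgressSketch`, `start_reset`, `finish_db` and `wait_ongoing_metadata_fetch` are hypothetical names. They are not Impala's `CatalogResetManager` API.
* Modelling the wait as a condition variable that releases the lock while blocked is also an assumption.

```python
import threading


class ResetProgressSketch(object):
  """Hypothetical model of the lexicographic reset-progress rule.

  Not Impala's CatalogResetManager; it only illustrates why an operation on a
  database may have to wait while an in-place reset is still running.
  """

  def __init__(self):
    # Stands in for versionLock_.writeLock(). Condition.wait() releases the
    # RLock fully (even if re-entered) while blocked, then re-acquires it.
    self.lock = threading.RLock()
    self._cond = threading.Condition(self.lock)
    # Databases still waiting to be invalidated, in lexicographic order.
    self._pending = []

  def start_reset(self, db_names):
    with self._cond:
      self._pending = sorted(db_names)

  def finish_db(self, db_name):
    # Called by the reset thread, in sorted order, once db_name is rebuilt.
    with self._cond:
      self._pending.remove(db_name)
      self._cond.notify_all()

  def _is_safe(self, db_name):
    # Safe if no reset is running, or db_name sorts before the database that
    # is currently under invalidation (so it has already been rebuilt).
    return not self._pending or db_name < self._pending[0]

  def wait_ongoing_metadata_fetch(self, db_names, timeout=None):
    """Blocks until every name in db_names is safe; returns False on timeout."""
    if isinstance(db_names, str):
      db_names = [db_names]
    with self._cond:
      return self._cond.wait_for(
          lambda: all(self._is_safe(name) for name in db_names), timeout)
```

A rename passes both the old and the new database name, as the `ImmutableList.of(...)` hunk does. In this sketch, such a call proceeds only once both names are safe.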
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 67588cb04..16c8ffc9d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -129,6 +129,10 @@ public class Db extends CatalogObjectImpl implements FeDb {
   // by reading this flag and without acquiring read lock on db object
   private volatile long lastSyncedEventId_ = -1;
 
+  // Flag used by CatalogServiceCatalog to mark if this Db is already removed or not.
+  // Should only be used by CatalogServiceCatalog.
+  private volatile boolean isRemoved_ = false;
+
   public Db(String name, org.apache.hadoop.hive.metastore.api.Database msDb) {
     setMetastoreDb(name, msDb);
     tableCache_ = new CatalogObjectCache<>();
@@ -158,6 +162,10 @@ public class Db extends CatalogObjectImpl implements FeDb {
 
   }
 
+  protected boolean isRemoved() { return isRemoved_; }
+
+  protected void markRemoved() { isRemoved_ = true; }
+
   public void setIsSystemDb(boolean b) { isSystemDb_ = b; }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 0a4ca616f..db9678fef 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -581,4 +581,12 @@ public class BackendConfig {
   public int icebergCatalogNumThreads() {
     return backendCfg_.iceberg_catalog_num_threads;
   }
+
+  public int getResetMetadataLockDurationMs() {
+    return backendCfg_.reset_metadata_lock_duration_ms;
+  }
+
+  public int getCatalogResetMaxThreads() {
+    return backendCfg_.catalog_reset_max_threads;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 38e26783f..f16e7b33f 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -7277,7 +7277,7 @@ public class CatalogOpExecutor {
     } else {
       // Invalidate the entire catalog if no table name is provided.
       Preconditions.checkArgument(!req.isIs_refresh());
-      resp.getResult().setVersion(catalog_.reset(catalogTimeline));
+      resp.getResult().setVersion(catalog_.reset(catalogTimeline, req.isSync_ddl()));
       resp.getResult().setIs_invalidate(true);
     }
     catalogTimeline.markEvent("Finished resetMetadata request");
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 89e9d978c..d5a24fc00 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -111,7 +111,7 @@ public class DebugUtils {
   // test failure for IMPALA-13126.
   public static final String MOCK_WRITE_LOCK_FAILURE = "mock_write_lock_failure";
 
-  // debug action lable inside CatalogServiceCatalog.reset() Db loop section that hold
+  // debug action label inside CatalogServiceCatalog.reset() Db loop section that hold
   // the write lock.
   public static final String RESET_METADATA_LOOP_LOCKED = "reset_metadata_loop_locked";
 
@@ -123,6 +123,11 @@ public class DebugUtils {
   // debug action label for plan creation.
   public static final String PLAN_CREATE = "plan_create";
 
+  // debug action label inside CatalogServiceCatalog.reset() Db loop section that release
+  // the write lock.
+  public static final String RESET_METADATA_LOOP_UNLOCKED =
+      "reset_metadata_loop_unlocked";
+
   /**
    * Returns true if the label of action is set in the debugActions
    */
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
index f02af26fb..1991e7133 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
@@ -33,8 +33,8 @@ import org.apache.impala.thrift.TDescribeOutputStyle;
 import org.apache.impala.thrift.TPrivilegeLevel;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TTableName;
+import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -76,6 +76,14 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
     RuntimeEnv.INSTANCE.reset();
   }
 
+  @After
+  public void closeAuthzCatalog() {
+    // This is to prevent HMS connection leak between tests (see IMPALA-8073).
+    // Class constructor will be called and create a new instance of authzCatalog_
+    // for each test.
+    authzCatalog_.close();
+  }
+
   @Parameters
   public static Collection<AuthorizationProvider> data() {
     return Arrays.asList(AuthorizationProvider.RANGER);
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
index e9dc2bb24..f33bba560 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
@@ -18,6 +18,7 @@
 package org.apache.impala.testutil;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.AuthorizationFactory;
 import org.apache.impala.authorization.NoopAuthorizationFactory;
@@ -26,7 +27,6 @@ import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.HdfsCachePool;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.PrincipalPrivilege;
 import org.apache.impala.catalog.Role;
@@ -41,6 +41,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import javax.annotation.Nullable;
+
 /**
  * Mock catalog used for running FE tests that allows lazy-loading of tables without a
  * running catalogd/statestored.
@@ -84,7 +86,7 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
   }
 
   @Override
-  public Db removeDb(String dbName) {
+  public @Nullable Db removeDb(String dbName) {
     return srcCatalog_.removeDb(dbName);
   }
 
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 5cf512c70..a54e9616d 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -35,9 +35,8 @@ from tests.common.file_utils import cleanup_tmp_test_dir, make_tmp_test_dir
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.impala_cluster import ImpalaCluster
 from tests.util.filesystem_utils import IS_LOCAL
-from tests.util.retry import retry
 from tests.util.workload_management import QUERY_TBL_LOG_NAME, QUERY_TBL_LIVE_NAME
-from time import sleep
+from time import sleep, time
 
 LOG = logging.getLogger(__name__)
 
@@ -415,28 +414,33 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if not self.SHARED_CLUSTER_ARGS:
       self.cluster_teardown(method.__name__, method.__dict__)
 
-  def wait_for_wm_init_complete(self, timeout_s=60):
+  def wait_for_wm_init_complete(self, timeout_s=180):
     """
     Waits for the catalog to report the workload management initialization process has
-    completed and the workload management tables to be in the local catalog. The input
-    timeout_s is used as the timeout for multiple separate function calls. Thus, the
-    theoretical max amount of time this function could wait is:
-        timeout_s + (timeout_s * num_coordinators).
+    completed and the workload management tables to be in the local catalog of
+    all coordinators.
     """
+    end_time = time() + timeout_s
     self.assert_catalogd_ha_contains("INFO", r'Completed workload management '
-        r'initialization', timeout_s)
-
-    for tbl in (QUERY_TBL_LIVE_NAME, QUERY_TBL_LOG_NAME):
-      for coord in self.cluster.get_all_coordinators():
-        # Wait until table is available in the coordinator's catalog cache.
-        def exists_func():
-          catalog_objs = coord.service.read_debug_webpage("catalog?json")
-          return tbl in catalog_objs
-
-        max_attempts = timeout_s / 3
-        assert retry(func=exists_func, max_attempts=max_attempts, sleep_time_s=3,
-            backoff=1), "Did not find table '{}' in local catalog of coordinator " \
-            "'{}:{}'.".format(tbl, coord.hostname, coord.get_webserver_port())
+        r'initialization', timeout_s=(end_time - time()))
+
+    # Wait until table is available in the coordinator's catalog cache.
+    for coord in self.cluster.get_all_coordinators():
+      success = False
+      wm_tables = list()
+      while (not success and time() < end_time):
+        wm_tables = [QUERY_TBL_LIVE_NAME, QUERY_TBL_LOG_NAME]
+        catalog_objs = coord.service.read_debug_webpage(
+            "catalog?json", timeout=(end_time - time()))
+        for tbl in list(wm_tables):
+          if tbl in catalog_objs:
+            wm_tables.remove(tbl)
+        success = (len(wm_tables) == 0)
+        if not success:
+          sleep(0.5)
+      assert success, (
+          "Did not find table '{}' in local catalog of coordinator '{}:{}'.").format(
+              str(wm_tables), coord.hostname, coord.get_webserver_port())
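The rewritten `wait_for_wm_init_complete` computes a single `end_time` and hands each later call only the remaining budget (`end_time - time()`). The whole wait is therefore bounded by `timeout_s`, instead of the old `timeout_s + (timeout_s * num_coordinators)`.

A generic sketch of that shared-deadline polling pattern follows. `wait_for_names_on_all` and the `fetch_catalog(coord, timeout_s)` callable are hypothetical; the callable stands in for `coord.service.read_debug_webpage("catalog?json", timeout=...)`. Unlike the patch, the sketch polls every coordinator at least once and returns the missing names instead of asserting.

```python
import time


def wait_for_names_on_all(coordinators, fetch_catalog, names, timeout_s,
                          poll_interval_s=0.5):
  """Hypothetical helper: polls each coordinator until its catalog text
  contains all of `names`.

  All coordinators share one deadline, so the total wait is bounded by about
  timeout_s plus one final fetch per coordinator. Returns
  {coordinator: [missing names]} for coordinators that never caught up; an
  empty dict means success.
  """
  end_time = time.monotonic() + timeout_s
  missing_by_coord = {}
  for coord in coordinators:
    while True:
      remaining = max(0.0, end_time - time.monotonic())
      catalog_text = fetch_catalog(coord, remaining)
      missing = [name for name in names if name not in catalog_text]
      if not missing or time.monotonic() >= end_time:
        break
      # Never sleep past the shared deadline.
      time.sleep(min(poll_interval_s, max(0.0, end_time - time.monotonic())))
    if missing:
      missing_by_coord[coord] = missing
  return missing_by_coord
```

Passing `remaining` to the fetch mirrors the patch's `timeout=(end_time - time())`. A single slow coordinator therefore cannot push the total wait past the budget.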
 
   def wait_for_wm_idle(self, coordinators=[], timeout_s=370):
     """Wait until workload management worker in each coordinator becomes idle.
diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py
index 53b5b422f..d7086745d 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -37,6 +37,12 @@ DEFAULT_CATALOG_SERVICE_PORT = 26000
 SLOW_BUILD_SYNC_DDL_DELAY_S = 20
 SYNC_DDL_DELAY_S = build_flavor_timeout(
     10, slow_build_timeout=SLOW_BUILD_SYNC_DDL_DELAY_S)
+SS_AUTO_FAILOVER_FREQ_MS = 500
+SS_AUTO_FAILOVER_ARGS = (
+  "--use_subscriber_id_as_catalogd_priority=true "
+  "--statestore_heartbeat_frequency_ms={0} "
+  "--active_catalogd_designation_monitoring_interval_ms={0} ").format(
+  SS_AUTO_FAILOVER_FREQ_MS)
 # s3 can behave as a slow build.
 if IS_S3:
   SYNC_DDL_DELAY_S = SLOW_BUILD_SYNC_DDL_DELAY_S
@@ -177,9 +183,9 @@ class TestCatalogdHA(CustomClusterTestSuite):
     statestore_service = self.cluster.statestored.service
 
     # Assert that cluster is set up with configs needed to run this test.
-    assert 1000 >= int(statestore_service.get_flag_current_value(
+    assert SS_AUTO_FAILOVER_FREQ_MS >= int(statestore_service.get_flag_current_value(
         'active_catalogd_designation_monitoring_interval_ms'))
-    assert 1000 >= int(statestore_service.get_flag_current_value(
+    assert SS_AUTO_FAILOVER_FREQ_MS >= int(statestore_service.get_flag_current_value(
         'statestore_heartbeat_frequency_ms'))
 
     start_count_clear_topic_entries = statestore_service.get_metric_value(
@@ -234,9 +240,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_2)
 
   @CustomClusterTestSuite.with_args(
-    statestored_args="--use_subscriber_id_as_catalogd_priority=true "
-                     "--statestore_heartbeat_frequency_ms=1000 "
-                     "--active_catalogd_designation_monitoring_interval_ms=1000",
+    statestored_args=SS_AUTO_FAILOVER_ARGS,
     catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
     start_args="--enable_catalogd_ha")
   def test_catalogd_auto_failover(self, unique_database):
@@ -252,10 +256,9 @@ class TestCatalogdHA(CustomClusterTestSuite):
     assert failed_update_catalogd_rpc_num == 0
 
   @CustomClusterTestSuite.with_args(
-    statestored_args="--use_subscriber_id_as_catalogd_priority=true "
-                     "--statestore_heartbeat_frequency_ms=1000 "
-                     "--active_catalogd_designation_monitoring_interval_ms=1000 "
-                     "--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:[email protected]",
+    statestored_args=(
+        SS_AUTO_FAILOVER_ARGS
+        + "--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:[email protected]"),
     catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
     start_args="--enable_catalogd_ha")
   def test_catalogd_auto_failover_with_failed_rpc(self, unique_database):
@@ -271,20 +274,35 @@ class TestCatalogdHA(CustomClusterTestSuite):
     assert failed_update_catalogd_rpc_num == successful_update_catalogd_rpc_num
 
   @CustomClusterTestSuite.with_args(
-    statestored_args="--use_subscriber_id_as_catalogd_priority=true "
-                     "--statestore_heartbeat_frequency_ms=1000 "
-                     "--active_catalogd_designation_monitoring_interval_ms=1000 "
-                     "--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:SLEEP@3000",
+    statestored_args=(
+        SS_AUTO_FAILOVER_ARGS
+        + "--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:SLEEP@3000"),
     # minicluster has 68 Db when this test is written. So total sleep is ~3.4s.
-    catalogd_args="--debug_actions=reset_metadata_loop_locked:SLEEP@50",
+    catalogd_args="--reset_metadata_lock_duration_ms=100 "
+                  "--debug_actions=reset_metadata_loop_locked:SLEEP@50",
     start_args="--enable_catalogd_ha")
-  @UniqueDatabase.parametrize(name_prefix='aa_test_catalogd_auto_failover_slow')
-  def test_catalogd_auto_failover_slow(self, unique_database):
+  @UniqueDatabase.parametrize(name_prefix='aaa_test_catalogd_auto_failover_slow_first_db')
+  def test_catalogd_auto_failover_slow_first_db(self, unique_database):
     """Tests for Catalog Service auto fail over with both slow metadata reset and slow
-    statestore update. Set 'aa_' as unique_database prefix to make the database among
+    statestore update. Set 'aaa_' as unique_database prefix to make the database among
     the earliest in reset metadata order."""
     self.__test_catalogd_auto_failover(unique_database)
 
+  @CustomClusterTestSuite.with_args(
+    statestored_args=(
+        SS_AUTO_FAILOVER_ARGS
+        + "--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:SLEEP@3000"),
+    # minicluster has 68 Db when this test is written. So total sleep is ~3.4s.
+    catalogd_args="--reset_metadata_lock_duration_ms=100 "
+                  "--debug_actions=reset_metadata_loop_locked:SLEEP@50",
+    start_args="--enable_catalogd_ha")
+  @UniqueDatabase.parametrize(name_prefix='zzz_test_catalogd_auto_failover_slow_last_db')
+  def test_catalogd_auto_failover_slow_last_db(self, unique_database):
+    """Tests for Catalog Service auto fail over with both slow metadata reset and slow
+    statestore update. Set 'zzz_' as unique_database prefix to make the database among
+    the latest in reset metadata order."""
+    self.__test_catalogd_auto_failover(unique_database)
+
   def __test_catalogd_manual_failover(self, unique_database):
     """Stop active catalogd and verify standby catalogd becomes active.
     Restart original active catalogd with force_catalogd_active as true. Verify that
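The new `SS_AUTO_FAILOVER_ARGS` constant replaces three copies of the same statestored flag string, and some tests append `--debug_actions=...` to it with `+`. That concatenation yields separate flags only because the constant ends with a trailing space.

The sketch below copies the constant as defined in the test_catalogd_ha.py hunk above. `statestored_args_with_debug_action` is a hypothetical helper, not part of the patch.

```python
# Copied from the test_catalogd_ha.py hunk above.
SS_AUTO_FAILOVER_FREQ_MS = 500
SS_AUTO_FAILOVER_ARGS = (
  "--use_subscriber_id_as_catalogd_priority=true "
  "--statestore_heartbeat_frequency_ms={0} "
  "--active_catalogd_designation_monitoring_interval_ms={0} ").format(
  SS_AUTO_FAILOVER_FREQ_MS)


def statestored_args_with_debug_action(action):
  """Hypothetical helper: appends one --debug_actions flag to the shared args.

  Relies on SS_AUTO_FAILOVER_ARGS ending with a space; without it the last
  flag and --debug_actions would fuse into a single token.
  """
  return SS_AUTO_FAILOVER_ARGS + "--debug_actions=" + action
```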
diff --git a/tests/custom_cluster/test_concurrent_ddls.py b/tests/custom_cluster/test_concurrent_ddls.py
index c94be7977..9349048df 100644
--- a/tests/custom_cluster/test_concurrent_ddls.py
+++ b/tests/custom_cluster/test_concurrent_ddls.py
@@ -84,6 +84,16 @@ class TestConcurrentDdls(CustomClusterTestSuite):
   def test_local_catalog_ddls_with_invalidate_metadata_sync_ddl(self, unique_database):
     self._run_ddls_with_invalidation(unique_database, sync_ddl=True)
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal "
+                  "--reset_metadata_lock_duration_ms=50 "
+                  "--debug_actions=reset_metadata_loop_unlocked:SLEEP@50")
+  def test_local_catalog_ddls_with_invalidate_metadata_unlock_gap(self, unique_database):
+    """Test with 50ms write unlock gap."""
+    self._run_ddls_with_invalidation(unique_database, sync_ddl=False)
+
   def _run_ddls_with_invalidation(self, db, sync_ddl=False):
     """Test INVALIDATE METADATA with concurrent DDLs to see if any queries hang"""
     test_self = self
@@ -136,8 +146,8 @@ class TestConcurrentDdls(CustomClusterTestSuite):
         while True:
           try:
             handle = tls.client.execute_async(query)
-            is_finished = tls.client.wait_for_finished_timeout(handle, timeout=60)
-            assert is_finished, "Query timeout(60s): " + query
+            is_finished = tls.client.wait_for_finished_timeout(handle, timeout=120)
+            assert is_finished, "Query timeout(120s): " + query
             tls.client.close_query(handle)
             # Success, next case.
             break
diff --git a/tests/custom_cluster/test_ext_data_sources.py b/tests/custom_cluster/test_ext_data_sources.py
index 5aae15cfa..bfd739c4c 100644
--- a/tests/custom_cluster/test_ext_data_sources.py
+++ b/tests/custom_cluster/test_ext_data_sources.py
@@ -72,7 +72,9 @@ class TestExtDataSources(CustomClusterTestSuite):
 
   @SkipIfApacheHive.data_connector_not_supported
   @pytest.mark.execute_serially
-  def test_restart_catalogd(self, vector, unique_database):
+  @CustomClusterTestSuite.with_args(
+      statestored_args="--statestore_update_frequency_ms=1000")
+  def test_restart_catalogd(self):
     """Restart Catalog server after creating a data source. Verify that the data source
     object is persistent across restarting of Catalog server."""
     DROP_DATA_SOURCE_QUERY = "DROP DATA SOURCE IF EXISTS test_restart_persistent"
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index fae07b4f7..92931ca0f 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -201,7 +201,7 @@ class TestLocalCatalogCompactUpdates(CustomClusterTestSuite):
     # Update all partitions. We should receive invalidations for partition id=0,1,2.
     self.execute_query("insert into my_part partition(p) values (0,0),(1,1),(2,2)")
 
-    log_regex = "Invalidated objects in cache: \[partition %s.my_part:p=\d \(id=%%d\)\]"\
+    log_regex = r"Invalidated objects in cache: \[partition %s.my_part:p=\d \(id=%%d\)\]"\
                 % unique_database
     self.assert_impalad_log_contains('INFO', log_regex % 0)
     self.assert_impalad_log_contains('INFO', log_regex % 1)
@@ -225,7 +225,7 @@ class TestLocalCatalogCompactUpdates(CustomClusterTestSuite):
     # Update the table. So we should receive an invalidation on partition id = 9.
     self.execute_query("insert into my_tbl select 0")
     self.assert_impalad_log_contains(
-        'INFO', "Invalidated objects in cache: \[partition %s.my_tbl: \(id=9\)\]"
+        'INFO', r"Invalidated objects in cache: \[partition %s.my_tbl: \(id=9\)\]"
                 % unique_database)
 
   @pytest.mark.execute_serially
@@ -448,13 +448,14 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
     tls = ThreadLocalClient()
 
     def do_table(i):
-      for q in [
+      queries = [
         "create table {db}.t{i} (i int)",
         "describe {db}.t{i}",
         "drop table {db}.t{i}",
         "create database {db}_{i}",
         "show tables in {db}_{i}",
-        "drop database {db}_{i}"]:
+        "drop database {db}_{i}"]
+      for q in queries:
         self.execute_query_expect_success(tls.c, q.format(
             db=unique_database, i=i))
 
diff --git a/tests/custom_cluster/test_metadata_replicas.py b/tests/custom_cluster/test_metadata_replicas.py
index 81d7020a0..7727ee1c7 100644
--- a/tests/custom_cluster/test_metadata_replicas.py
+++ b/tests/custom_cluster/test_metadata_replicas.py
@@ -40,6 +40,8 @@ class TestMetadataReplicas(CustomClusterTestSuite):
     self.__validate_metadata()
 
   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      statestored_args="--statestore_update_frequency_ms=1000")
   def test_catalog_restart(self, testid_checksum):
     """ IMPALA-6948: reproduces the issue by deleting a table from Hive while the catalogd
         is down. When catalogd is restarted, if the regression is present, the deleted
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index 1ae35ce56..c5373686d 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -47,6 +47,9 @@ LOG = logging.getLogger(__name__)
 
 
 class TestRestart(CustomClusterTestSuite):
+
+  UPDATE_FREQUENCY_S = 4
+
   @pytest.mark.execute_serially
   def test_restart_statestore(self, cursor):
     """ Regression test of IMPALA-6973. After the statestore restarts, the metadata should
@@ -171,8 +174,10 @@ class TestRestart(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-    statestored_args="--statestore_update_frequency_ms=5000 "
-                     "--statestore_heartbeat_frequency_ms=10000")
+      statestored_args=(
+          "--statestore_heartbeat_frequency_ms=10000 "
+          "--statestore_update_frequency_ms={frequency_ms}").format(
+              frequency_ms=(UPDATE_FREQUENCY_S * 1000)))
   def test_restart_catalogd(self, unique_database):
     tbl_name = unique_database + ".join_aa"
     self.execute_query_expect_success(
@@ -339,7 +344,8 @@ class TestRestart(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-    statestored_args="--statestore_update_frequency_ms=5000")
+      statestored_args="--statestore_update_frequency_ms={frequency_ms}".format(
+          frequency_ms=(UPDATE_FREQUENCY_S * 1000)))
   def test_restart_catalogd_sync_ddl(self, unique_database):
     tbl_name = unique_database + ".join_aa"
     self.execute_query_expect_success(
@@ -363,24 +369,34 @@ class TestRestart(CustomClusterTestSuite):
         "alter table {} add columns (name string)".format(tbl_name), query_options)
     self.execute_query_expect_success(self.client, "select name from {}".format(tbl_name))
 
-  UPDATE_FREQUENCY_S = 10
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--statestore_update_frequency_ms={frequency_ms}"
+    .format(frequency_ms=(UPDATE_FREQUENCY_S * 1000)))
+  def test_restart_legacy_catalogd_twice(self, unique_database):
+    self.run_restart_catalogd_twice(unique_database)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal",
     statestored_args="--statestore_update_frequency_ms={frequency_ms}"
     .format(frequency_ms=(UPDATE_FREQUENCY_S * 1000)))
-  def test_restart_catalogd_twice(self, unique_database):
+  def test_restart_local_catalogd_twice(self, unique_database):
+    self.run_restart_catalogd_twice(unique_database)
+
+  def run_restart_catalogd_twice(self, unique_database):
     tbl_name = unique_database + ".join_aa"
+
     self.cluster.catalogd.restart()
-    query = "create table {}(id int)".format(tbl_name)
-    query_handle = []
 
     def execute_query_async():
-      query_handle.append(self.execute_query(query))
+      query = "create table {}(id int)".format(tbl_name)
+      self.execute_query(query)
 
     thread = threading.Thread(target=execute_query_async)
     thread.start()
-    sleep(self.UPDATE_FREQUENCY_S - 5)
+    sleep(self.UPDATE_FREQUENCY_S // 2)
     self.cluster.catalogd.restart()
     thread.join()
     self.execute_query_expect_success(self.client,
@@ -391,7 +407,8 @@ class TestRestart(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
       impalad_args="--use_local_catalog=true",
       catalogd_args="--catalog_topic_mode=minimal",
-      statestored_args="--statestore_update_frequency_ms=5000")
+      statestored_args="--statestore_update_frequency_ms={frequency_ms}".format(
+          frequency_ms=(UPDATE_FREQUENCY_S * 1000)))
   def test_restart_catalogd_with_local_catalog(self, unique_database):
     tbl_name = unique_database + ".join_aa"
     self.execute_query_expect_success(
diff --git a/tests/custom_cluster/test_workload_mgmt_init.py b/tests/custom_cluster/test_workload_mgmt_init.py
index b7c1fa492..a1eb59449 100644
--- a/tests/custom_cluster/test_workload_mgmt_init.py
+++ b/tests/custom_cluster/test_workload_mgmt_init.py
@@ -169,34 +169,34 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
         r"upgraded", expected_count=0)
 
   @CustomClusterTestSuite.with_args(
-      cluster_size=10, disable_log_buffering=True,
+      cluster_size=3, disable_log_buffering=True,
       log_symlinks=True, workload_mgmt=True,
       impalad_args="--workload_mgmt_schema_version=1.0.0",
       catalogd_args="--workload_mgmt_schema_version=1.0.0 "
                     "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL))
   def test_create_on_version_1_0_0(self, vector):
     """Asserts that workload management tables are properly created on version 1.0.0 using
-       a 10 node cluster when no tables exist."""
+       a 3 node cluster when no tables exist."""
     self.check_schema("1.0.0", vector, multiple_impalad=True)
 
   @CustomClusterTestSuite.with_args(
-      cluster_size=10, disable_log_buffering=True,
+      cluster_size=3, disable_log_buffering=True,
       log_symlinks=True, workload_mgmt=True,
       impalad_args="--workload_mgmt_schema_version=1.1.0",
       catalogd_args="--workload_mgmt_schema_version=1.1.0 "
                     "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL))
   def test_create_on_version_1_1_0(self, vector):
     """Asserts that workload management tables are properly created on version 1.1.0 using
-       a 10 node cluster when no tables exist."""
+       a 3 node cluster when no tables exist."""
     self.check_schema("1.1.0", vector, multiple_impalad=True)
 
   @CustomClusterTestSuite.with_args(
-      cluster_size=10, disable_log_buffering=True,
+      cluster_size=3, disable_log_buffering=True,
       log_symlinks=True, workload_mgmt=True,
       catalogd_args="--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL))
   def test_create_on_version_1_2_0(self, vector):
     """Asserts that workload management tables are properly created on the latest version
-       using a 10 node cluster when no tables exist."""
+       using a 3 node cluster when no tables exist."""
     self.check_schema("1.2.0", vector, multiple_impalad=True)
 
   @CustomClusterTestSuite.with_args(

