This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6323455425f946eee5c9bd78a94b99bf422c51de
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Fri Aug 9 18:35:00 2024 +0200

    IMPALA-13293: Fix too long wait for initial catalog update
    
    Coordinators always waited 2s in Frontend.WaitForCatalog() for the
    first catalog update to arrive, even if the update arrived quicker.
    This was obvious in case of LocalCatalog, but ImpaladCatalog looked
    as if it would wake up when the update arrives by notifying
    catalogUpdateEventNotifier_. In reality the arrival of the catalog
    update led FeCatalogManager to create a new ImpaladCatalog instance
    while Frontend was still waiting for the old instance which never
    got notified.
    
    The change fixes the wait/notify mechanism in ImpaladCatalog and
    implements a similar logic for LocalCatalog in CatalogdMetaProvider.
    
    Makes starting impala cluster faster:
    ImpaladCatalog: ~12s->~11s
    LocalCatalog: ~11->~10s
    
    Change-Id: I4da1d0c0d25f80b02c39be863431201d1b913c5d
    Reviewed-on: http://gerrit.cloudera.org:8080/21655
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/catalog/ImpaladCatalog.java     | 10 ++++++++++
 .../impala/catalog/local/CatalogdMetaProvider.java    | 19 +++++++++++++++++++
 .../impala/catalog/local/DirectMetaProvider.java      |  5 +++++
 .../org/apache/impala/catalog/local/LocalCatalog.java |  8 +-------
 .../org/apache/impala/catalog/local/MetaProvider.java |  6 ++++++
 .../org/apache/impala/service/FeCatalogManager.java   |  3 +++
 .../main/java/org/apache/impala/service/Frontend.java | 16 ++++++++++------
 7 files changed, 54 insertions(+), 13 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 2962325be..a2ba7271e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -304,6 +304,16 @@ public class ImpaladCatalog extends Catalog implements 
FeCatalog {
     }
   }
 
+  /**
+   * Called by FeCatalogManager when new ImpalaCatalog is created and this one 
is
+   * no longer used. Wakes up all threads that wait for 
catalogUpdateEventNotifier_.
+   */
+  public void release() {
+    synchronized (catalogUpdateEventNotifier_) {
+      catalogUpdateEventNotifier_.notifyAll();
+    }
+  }
+
   /**
    *  Adds the given TCatalogObject to the catalog cache. The update may be 
ignored
    *  (considered out of date) if:
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 8c702c305..42204297d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -342,6 +342,10 @@ public class CatalogdMetaProvider implements MetaProvider {
   private TUniqueId catalogServiceId_ = Catalog.INITIAL_CATALOG_SERVICE_ID;
   private final Object catalogServiceIdLock_ = new Object();
 
+  /**
+   * Object that is used to synchronize on and signal when the catalog is 
ready for use.
+   */
+  private final Object catalogReadyNotifier_ = new Object();
 
   /**
    * Cache of authorization policy metadata. Populated from data pushed from 
the
@@ -410,6 +414,18 @@ public class CatalogdMetaProvider implements MetaProvider {
     return authPolicy_;
   }
 
+  @Override
+  public void waitForIsReady(long timeoutMs) {
+    if (isReady()) return;
+    synchronized (catalogReadyNotifier_) {
+      try {
+        catalogReadyNotifier_.wait(timeoutMs);
+      } catch (InterruptedException e) {
+        // Ignore
+      }
+    }
+  }
+
   @Override
   public boolean isReady() {
     return lastSeenCatalogVersion_.get() > Catalog.INITIAL_CATALOG_VERSION;
@@ -1473,6 +1489,9 @@ public class CatalogdMetaProvider implements MetaProvider 
{
     // the update.
     if (nextCatalogVersion != null) {
       lastSeenCatalogVersion_.set(nextCatalogVersion);
+      synchronized (catalogReadyNotifier_) {
+        catalogReadyNotifier_.notifyAll();
+      }
     }
 
     // NOTE: the return value is ignored when this function is called by a DDL
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 58632a397..ffb6bb461 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -117,6 +117,11 @@ class DirectMetaProvider implements MetaProvider {
     return true;
   }
 
+  @Override
+  public void waitForIsReady(long timeoutMs) {
+    // NOOP
+  }
+
   @Override
   public ImmutableList<String> loadDbList() throws TException {
     try (MetaStoreClient c = msClientPool_.getClient()) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index e6c484c08..1973498a7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -292,13 +292,7 @@ public class LocalCatalog implements FeCatalog {
 
   @Override
   public void waitForCatalogUpdate(long timeoutMs) {
-    if (isReady()) return;
-    // Sleep here to avoid log spew from the retry loop in Frontend.
-    try {
-      Thread.sleep(timeoutMs);
-    } catch (InterruptedException e) {
-      // Ignore
-    }
+    metaProvider_.waitForIsReady(timeoutMs);
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index b8770e6ac..49286463f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -68,6 +68,12 @@ public interface MetaProvider {
    */
   boolean isReady();
 
+  /**
+   * Allows waiting for isReady() to become true. May return early, so
+   * it needs to be called in a loop.
+   */
+  void waitForIsReady(long timeoutMs);
+
   ImmutableList<String> loadDbList() throws TException;
 
   Database loadDb(String dbName) throws TException;
diff --git a/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java 
b/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java
index ae7c47158..b4359a8cb 100644
--- a/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java
+++ b/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java
@@ -115,6 +115,7 @@ public abstract class FeCatalogManager {
 
       // If this is not a delta, this update should replace the current
       // Catalog contents so create a new catalog and populate it.
+      ImpaladCatalog oldCatalog = catalog;
       catalog = createNewCatalog();
 
       TUpdateCatalogCacheResponse response = catalog.updateCatalog(req);
@@ -124,6 +125,8 @@ public abstract class FeCatalogManager {
       // disappear. The catalog is guaranteed to be ready since 
updateCatalog() has a
       // postcondition of isReady() == true.
       catalog_.set(catalog);
+      if (oldCatalog != null) oldCatalog.release();
+
       return response;
     }
 
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index b53dca08c..1c0f9a0fa 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1875,24 +1875,28 @@ public class Frontend {
   }
 
   /**
-   * Waits indefinitely for the local catalog to be ready. The catalog is 
"ready" after
-   * the first catalog update with a version > INITIAL_CATALOG_VERSION is 
received from
-   * the statestore.
+   * Waits indefinitely for the local catalog to be ready. Normally the 
catalog is
+   * "ready" after the first catalog update with a version > 
INITIAL_CATALOG_VERSION is
+   * received from the statestore. During some tests the catalog is forced to 
ready state
+   * with JniCatalog.setCatalogIsReady().
    *
-   * @see ImpaladCatalog#isReady()
+   * @see ImpaladCatalog#isReady(), CatalogdMetaProvider#isReady()
    */
   public void waitForCatalog() {
     LOG.info("Waiting for first catalog update from the statestore.");
     int numTries = 0;
     long startTimeMs = System.currentTimeMillis();
     while (true) {
-      if (getCatalog().isReady()) {
+      // Avoid calling getCatalog again in this attempt as it may return a 
different
+      // ImpaladCatalog if a full topic update has arrived.
+      FeCatalog catalog = getCatalog();
+      if (catalog.isReady()) {
         LOG.info("Local catalog initialized after: " +
             (System.currentTimeMillis() - startTimeMs) + " ms.");
         return;
       }
       LOG.info("Waiting for local catalog to be initialized, attempt: " + 
numTries);
-      getCatalog().waitForCatalogUpdate(MAX_CATALOG_UPDATE_WAIT_TIME_MS);
+      catalog.waitForCatalogUpdate(MAX_CATALOG_UPDATE_WAIT_TIME_MS);
       ++numTries;
     }
   }

Reply via email to