This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 6323455425f946eee5c9bd78a94b99bf422c51de Author: Csaba Ringhofer <[email protected]> AuthorDate: Fri Aug 9 18:35:00 2024 +0200 IMPALA-13293: Fix too long wait for initial catalog update Coordinators always waited 2s in Frontend.waitForCatalog() for the first catalog update to arrive, even if the update arrived quicker. This was obvious in the case of LocalCatalog, but ImpaladCatalog looked as if it would wake up when the update arrives by notifying catalogUpdateEventNotifier_. In reality the arrival of the catalog update led FeCatalogManager to create a new ImpaladCatalog instance while Frontend was still waiting for the old instance, which never got notified. The change fixes the wait/notify mechanism in ImpaladCatalog and implements similar logic for LocalCatalog in CatalogdMetaProvider. This makes starting an Impala cluster faster: ImpaladCatalog: ~12s->~11s LocalCatalog: ~11s->~10s Change-Id: I4da1d0c0d25f80b02c39be863431201d1b913c5d Reviewed-on: http://gerrit.cloudera.org:8080/21655 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../org/apache/impala/catalog/ImpaladCatalog.java | 10 ++++++++++ .../impala/catalog/local/CatalogdMetaProvider.java | 19 +++++++++++++++++++ .../impala/catalog/local/DirectMetaProvider.java | 5 +++++ .../org/apache/impala/catalog/local/LocalCatalog.java | 8 +------- .../org/apache/impala/catalog/local/MetaProvider.java | 6 ++++++ .../org/apache/impala/service/FeCatalogManager.java | 3 +++ .../main/java/org/apache/impala/service/Frontend.java | 16 ++++++++++------ 7 files changed, 54 insertions(+), 13 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java index 2962325be..a2ba7271e 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java @@ -304,6 +304,16 @@ public class ImpaladCatalog extends 
Catalog implements FeCatalog { } } + /** + * Called by FeCatalogManager when new ImpalaCatalog is created and this one is + * no longer used. Wakes up all threads that wait for catalogUpdateEventNotifier_. + */ + public void release() { + synchronized (catalogUpdateEventNotifier_) { + catalogUpdateEventNotifier_.notifyAll(); + } + } + /** * Adds the given TCatalogObject to the catalog cache. The update may be ignored * (considered out of date) if: diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java index 8c702c305..42204297d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java @@ -342,6 +342,10 @@ public class CatalogdMetaProvider implements MetaProvider { private TUniqueId catalogServiceId_ = Catalog.INITIAL_CATALOG_SERVICE_ID; private final Object catalogServiceIdLock_ = new Object(); + /** + * Object that is used to synchronize on and signal when the catalog is ready for use. + */ + private final Object catalogReadyNotifier_ = new Object(); /** * Cache of authorization policy metadata. Populated from data pushed from the @@ -410,6 +414,18 @@ public class CatalogdMetaProvider implements MetaProvider { return authPolicy_; } + @Override + public void waitForIsReady(long timeoutMs) { + if (isReady()) return; + synchronized (catalogReadyNotifier_) { + try { + catalogReadyNotifier_.wait(timeoutMs); + } catch (InterruptedException e) { + // Ignore + } + } + } + @Override public boolean isReady() { return lastSeenCatalogVersion_.get() > Catalog.INITIAL_CATALOG_VERSION; @@ -1473,6 +1489,9 @@ public class CatalogdMetaProvider implements MetaProvider { // the update. 
if (nextCatalogVersion != null) { lastSeenCatalogVersion_.set(nextCatalogVersion); + synchronized (catalogReadyNotifier_) { + catalogReadyNotifier_.notifyAll(); + } } // NOTE: the return value is ignored when this function is called by a DDL diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java index 58632a397..ffb6bb461 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java @@ -117,6 +117,11 @@ class DirectMetaProvider implements MetaProvider { return true; } + @Override + public void waitForIsReady(long timeoutMs) { + // NOOP + } + @Override public ImmutableList<String> loadDbList() throws TException { try (MetaStoreClient c = msClientPool_.getClient()) { diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java index e6c484c08..1973498a7 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java @@ -292,13 +292,7 @@ public class LocalCatalog implements FeCatalog { @Override public void waitForCatalogUpdate(long timeoutMs) { - if (isReady()) return; - // Sleep here to avoid log spew from the retry loop in Frontend. 
- try { - Thread.sleep(timeoutMs); - } catch (InterruptedException e) { - // Ignore - } + metaProvider_.waitForIsReady(timeoutMs); } @Override diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java index b8770e6ac..49286463f 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java @@ -68,6 +68,12 @@ public interface MetaProvider { */ boolean isReady(); + /** + * Allows waiting for isReady() to become true. May return early, so + * it needs to be called in a loop. + */ + void waitForIsReady(long timeoutMs); + ImmutableList<String> loadDbList() throws TException; Database loadDb(String dbName) throws TException; diff --git a/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java b/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java index ae7c47158..b4359a8cb 100644 --- a/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java +++ b/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java @@ -115,6 +115,7 @@ public abstract class FeCatalogManager { // If this is not a delta, this update should replace the current // Catalog contents so create a new catalog and populate it. + ImpaladCatalog oldCatalog = catalog; catalog = createNewCatalog(); TUpdateCatalogCacheResponse response = catalog.updateCatalog(req); @@ -124,6 +125,8 @@ public abstract class FeCatalogManager { // disappear. The catalog is guaranteed to be ready since updateCatalog() has a // postcondition of isReady() == true. 
catalog_.set(catalog); + if (oldCatalog != null) oldCatalog.release(); + return response; } diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index b53dca08c..1c0f9a0fa 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -1875,24 +1875,28 @@ public class Frontend { } /** - * Waits indefinitely for the local catalog to be ready. The catalog is "ready" after - * the first catalog update with a version > INITIAL_CATALOG_VERSION is received from - * the statestore. + * Waits indefinitely for the local catalog to be ready. Normally the catalog is + * "ready" after the first catalog update with a version > INITIAL_CATALOG_VERSION is + * received from the statestore. During some tests the catalog is forced to ready state + * with JniCatalog.setCatalogIsReady(). * - * @see ImpaladCatalog#isReady() + * @see ImpaladCatalog#isReady(), CatalogdMetaProvider#isReady() */ public void waitForCatalog() { LOG.info("Waiting for first catalog update from the statestore."); int numTries = 0; long startTimeMs = System.currentTimeMillis(); while (true) { - if (getCatalog().isReady()) { + // Avoid calling getCatalog again in this attempt as it may return a different + // ImpaladCatalog if a full topic update has arrived. + FeCatalog catalog = getCatalog(); + if (catalog.isReady()) { LOG.info("Local catalog initialized after: " + (System.currentTimeMillis() - startTimeMs) + " ms."); return; } LOG.info("Waiting for local catalog to be initialized, attempt: " + numTries); - getCatalog().waitForCatalogUpdate(MAX_CATALOG_UPDATE_WAIT_TIME_MS); + catalog.waitForCatalogUpdate(MAX_CATALOG_UPDATE_WAIT_TIME_MS); ++numTries; } }
