This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit be16a02fa8f98da09d572d8250363896ae10b9e7
Author: Riza Suminto <[email protected]>
AuthorDate: Thu May 8 22:39:15 2025 -0700

    IMPALA-13850 (part 2): Fix bug found by test_restart_services.py
    
    This patch stabilizes test_restart_catalogd_with_local_catalog in
    test_restart_services.py after the first part of IMPALA-13850 was merged.
    
    IMPALA-13850 (part 1) made local catalog mode send the statestore update
    twice: the first announces its availability and service id, while the
    second is the full topic update. There is a short window in which
    CatalogD accepts getCatalogObject() requests before the very first
    CatalogServiceCatalog.reset() has started and obtained the write lock. A
    request that gets through during this window may see an empty catalog,
    causing queries to fail with "db/table does not exist" errors.
    
    This patch blocks CatalogServiceThriftIf.AcceptRequest() until
    CatalogServiceCatalog.reset() has been initiated. Catalog version 100 is
    used to signal that the initial reset has begun. Later, in part 3, when
    in-place metadata cache reset is implemented, AcceptRequest() can unblock
    sooner because reset() will release the write lock partway through
    catalog cache initialization.
    
    Testing:
    - Ran test_restart_catalogd_with_local_catalog in a loop 100 times; all
      runs passed.
    
    Change-Id: I97f6f692506de0bbf2e1445f83bed824dc8298fd
    Reviewed-on: http://gerrit.cloudera.org:8080/22844
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/catalog/catalog-server.cc                        | 17 +++++++++++++++++
 fe/src/main/java/org/apache/impala/catalog/Catalog.java |  3 +++
 .../apache/impala/catalog/CatalogServiceCatalog.java    |  7 ++++++-
 3 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 63d6c0587..875853e0b 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -329,6 +329,9 @@ const string HADOOP_VARZ_TEMPLATE = "hadoop-varz.tmpl";
 const string HADOOP_VARZ_WEB_PAGE = "/hadoop-varz";
 
 const int REFRESH_METRICS_INTERVAL_MS = 1000;
+// Catalog version that signal that the first metadata reset has begun.
+// This should match Catalog.CATALOG_VERSION_AFTER_FIRST_RESET
+const int MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST = 100;
 
 // Implementation for the CatalogService thrift interface.
 class CatalogServiceThriftIf : public CatalogServiceIf {
@@ -547,6 +550,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
  private:
   CatalogServer* catalog_server_;
   string server_address_;
+  bool has_initiated_first_reset_ = false;
 
   // Check if catalog protocols are compatible between client and catalog 
server.
   // Return Status::OK() if the protocols are compatible and catalog server is 
active.
@@ -559,6 +563,19 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
       status = Status(Substitute("Request for Catalog service is rejected 
since "
           "catalogd $0 is in standby mode", server_address_));
     }
+    while (status.ok() && !has_initiated_first_reset_) {
+      long current_catalog_version = 0;
+      status = 
catalog_server_->catalog()->GetCatalogVersion(&current_catalog_version);
+      if (!status.ok()) break;
+      if (current_catalog_version >= MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST) {
+        has_initiated_first_reset_ = true;
+      } else {
+        VLOG(1) << "Catalog is not initialized yet. Waiting for catalog 
version ("
+                << current_catalog_version << ") to be >= "
+                << MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST;
+        SleepForMs(100);
+      }
+    }
     return status;
   }
 };
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java 
b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index cf90bc9b7..4e0655a3d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -78,6 +78,9 @@ import com.google.common.base.Preconditions;
 public abstract class Catalog implements AutoCloseable {
   // Initial catalog version and ID.
   public final static long INITIAL_CATALOG_VERSION = 0L;
+  // Catalog version that signal that the first metadata reset has begun.
+  // This should match MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST.
+  public final static long CATALOG_VERSION_AFTER_FIRST_RESET = 100L;
   public static final TUniqueId INITIAL_CATALOG_SERVICE_ID = new TUniqueId(0L, 
0L);
   public static final String DEFAULT_DB = "default";
 
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 0a7227d7f..f1373cd04 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -2297,7 +2297,12 @@ public class CatalogServiceCatalog extends Catalog {
     // In case of an empty new catalog, the version should still change to 
reflect the
     // reset operation itself and to unblock impalads by making the catalog 
version >
     // INITIAL_CATALOG_VERSION. See Frontend.waitForCatalog()
-    ++catalogVersion_;
+    if (catalogVersion_ < Catalog.CATALOG_VERSION_AFTER_FIRST_RESET) {
+      catalogVersion_ = Catalog.CATALOG_VERSION_AFTER_FIRST_RESET;
+      LOG.info("First reset initiated. Version: " + catalogVersion_);
+    } else {
+      ++catalogVersion_;
+    }
 
     // Update data source, db and table metadata
     try {

Reply via email to