This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new a41c5cbfd IMPALA-14416: JniFrontend.getDbs() should handle InconsistentMetadataFetchException
a41c5cbfd is described below

commit a41c5cbfddcf0bd84df37f4f3b9940b609abbf29
Author: stiga-huang <[email protected]>
AuthorDate: Tue Sep 9 20:42:37 2025 +0800

    IMPALA-14416: JniFrontend.getDbs() should handle InconsistentMetadataFetchException
    
    JniFrontend.getDbs() returns the thrift representation of all the dbs.
    This might trigger multiple getPartialCatalogObject requests to catalogd
    and could fail with InconsistentMetadataFetchException, e.g. if a db is
    removed after the coordinator fetches the db name list but before the
    coordinator fetches the msDb of that db.
    
    This patch fixes the issue by retrying the above steps when hitting
    InconsistentMetadataFetchException, similar to what other methods in
    Frontend do. Adds getThriftDbs() in Frontend to directly return the
    thrift db list so JniFrontend can use it directly and the retry can be
    added inside Frontend.java.
    
    TestAuthorization.test_local_catalog_show_dbs_with_transient_db is an
    existing test that verifies a similar problem. Running this test with
    authorization disabled can reproduce the current bug. So this patch
    extracts the test code into
    TestLocalCatalogRetries._run_show_dbs_with_transient_db() and shares it
    between the authz-enabled and authz-disabled tests.
    
    Tests
     - Ran TestLocalCatalogRetries.test_show_dbs_retry 60 times. Without the
       fix, it fails about a dozen times.
    
    Change-Id: Ib337f88a2ac0f35142417f6cee51d30497f12845
    Reviewed-on: http://gerrit.cloudera.org:8080/23402
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../java/org/apache/impala/service/Frontend.java   | 32 ++++++++++++++++++++--
 .../org/apache/impala/service/JniFrontend.java     |  8 ++----
 tests/authorization/test_authorization.py          | 27 ++----------------
 tests/custom_cluster/__init__.py                   |  0
 tests/custom_cluster/test_local_catalog.py         | 32 ++++++++++++++++++++++
 5 files changed, 66 insertions(+), 33 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index b71f052c9..56acc4f78 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1149,8 +1149,7 @@ public class Frontend {
    * exception. Inconsistent metadata comes up due to interleaving catalog object updates
    * with retrieving those objects. Instead of bubbling up the issue to the user, retrying
    * can get the user's operation to run on a consistent snapshot and to succeed.
-   * Retries are *not* needed for accessing top-level objects such as databases, since
-   * they do not have a parent, so cannot be inconsistent.
+   * Max number of retries is configured by local_catalog_max_fetch_retries.
    * TODO: this class is typically used in a loop at the call-site. replace with lambdas
    *       in Java 8 to simplify the looping boilerplate.
    */
@@ -1481,7 +1480,8 @@ public class Frontend {
 
   /**
    * Returns all databases in catalog cache that match the pattern of 'matcher' and are
-   * accessible to 'user'.
+   * accessible to 'user'. Callers should handle InconsistentMetadataFetchException when
+   * using these dbs.
    */
   public List<? extends FeDb> getDbs(PatternMatcher matcher, User user)
       throws UserCancelledException, InternalException {
@@ -1506,6 +1506,32 @@ public class Frontend {
     return dbs;
   }
 
+  /**
+   * Returns thrift representation of all databases in catalog cache that match the
+   * pattern of 'matcher' and are accessible to 'user'. Retries on
+   * InconsistentMetadataFetchException are handled in this method (see comments of
+   * RetryTracker)
+   */
+  public List<TDatabase> getThriftDbs(PatternMatcher matcher, User user)
+      throws UserCancelledException, InternalException {
+    Frontend.RetryTracker retries = new Frontend.RetryTracker(
+        String.format("Fetching db list for user %s", user.getName()));
+    while (true) {
+      try {
+        List<? extends FeDb> dbs = getDbs(matcher, user);
+        List<TDatabase> tDbs = Lists.newArrayListWithCapacity(dbs.size());
+        // LocalDb.toThrift() might trigger getPartialCatalogObject request to catalogd
+        // which could fail if the db is dropped concurrently. In this case,
+        // InconsistentMetadataFetchException will be thrown and we will retry from
+        // getting the db list.
+        for (FeDb db : dbs) tDbs.add(db.toThrift());
+        return tDbs;
+      } catch (InconsistentMetadataFetchException e) {
+        retries.handleRetryOrThrow(e);
+      }
+    }
+  }
+
   /**
    *  Handles DESCRIBE HISTORY queries.
    */
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index de476bbc0..3aecb33ba 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -387,13 +387,9 @@ public class JniFrontend {
 
     TSessionState session = params.isSetSession() ? params.getSession() : null;
     User user = getUser(session);
-
-    List<? extends FeDb> dbs = frontend_.getDbs(
-        PatternMatcher.createHivePatternMatcher(params.pattern), user);
     TGetDbsResult result = new TGetDbsResult();
-    List<TDatabase> tDbs = Lists.newArrayListWithCapacity(dbs.size());
-    for (FeDb db: dbs) tDbs.add(db.toThrift());
-    result.setDbs(tDbs);
+    result.setDbs(frontend_.getThriftDbs(
+        PatternMatcher.createHivePatternMatcher(params.pattern), user));
     try {
       TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py
index 4ba9c825d..d34b43e85 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -20,8 +20,6 @@
 from __future__ import absolute_import, division, print_function
 from getpass import getuser
 import random
-import threading
-import time
 
 import pytest
 
@@ -29,6 +27,7 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.file_utils import assert_file_in_dir_contains
 from tests.common.test_result_verifier import error_msg_equal
 from tests.common.test_vector import HS2
+from tests.custom_cluster.test_local_catalog import TestLocalCatalogRetries
 
 PRIVILEGES = ['all', 'alter', 'drop', 'insert', 'refresh', 'select']
 ADMIN = "admin"
@@ -192,25 +191,5 @@ class TestAuthorization(CustomClusterTestSuite):
     # Use admin user to have create+drop privileges.
     unique_database = unique_name + "_db"
     admin_client = self.create_impala_client(user=ADMIN)
-    stop = False
-
-    def create_drop_db():
-      while not stop:
-        admin_client.execute("create database " + unique_database)
-        # Sleep some time so coordinator can get the updates of it.
-        time.sleep(0.1)
-        if stop:
-          break
-        admin_client.execute("drop database " + unique_database)
-    t = threading.Thread(target=create_drop_db)
-    t.start()
-
-    try:
-      for i in range(100):
-        self.execute_query("show databases")
-        # Sleep some time so the db can be dropped.
-        time.sleep(0.2)
-    finally:
-      stop = True
-      t.join()
-      admin_client.execute("drop database if exists " + unique_database)
+    TestLocalCatalogRetries._run_show_dbs_with_transient_db(
+        unique_database, admin_client, self.client)
diff --git a/tests/custom_cluster/__init__.py b/tests/custom_cluster/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index 92931ca0f..63bac1eed 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -428,6 +428,38 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
       client1.close()
       client2.close()
 
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal")
+  def test_show_dbs_retry(self, unique_name):
+    self._run_show_dbs_with_transient_db(
+        unique_name + "_db", self.create_impala_client(), self.client)
+
+  @classmethod
+  def _run_show_dbs_with_transient_db(cls, unique_database, admin_client, user_client):
+    stop = False
+
+    def create_drop_db():
+      while not stop:
+        admin_client.execute("create database " + unique_database)
+        # Sleep some time so coordinator can get the updates of it.
+        time.sleep(0.1)
+        if stop:
+          break
+        admin_client.execute("drop database " + unique_database)
+    t = threading.Thread(target=create_drop_db)
+    t.start()
+
+    try:
+      for i in range(100):
+        user_client.execute("show databases")
+        # Sleep some time so the db can be dropped.
+        time.sleep(0.2)
+    finally:
+      stop = True
+      t.join()
+      admin_client.execute("drop database if exists " + unique_database)
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--use_local_catalog=true --inject_latency_after_catalog_fetch_ms=50",

Reply via email to