This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new a41c5cbfd IMPALA-14416: JniFrontend.getDbs() should handle
InconsistentMetadataFetchException
a41c5cbfd is described below
commit a41c5cbfddcf0bd84df37f4f3b9940b609abbf29
Author: stiga-huang <[email protected]>
AuthorDate: Tue Sep 9 20:42:37 2025 +0800
IMPALA-14416: JniFrontend.getDbs() should handle
InconsistentMetadataFetchException
JniFrontend.getDbs() returns the thrift representation of all the dbs.
This might trigger multiple getPartialCatalogObject requests to catalogd
and could fail with InconsistentMetadataFetchException, e.g. if a db is
removed after the coordinator fetches the db name list but before it
fetches the msDb of that db.
This patch fixes the issue by retrying the above steps when hitting
InconsistentMetadataFetchException, similar to what other methods in
Frontend do. It also adds getThriftDbs() to Frontend, which returns the
thrift db list, so that JniFrontend can use it directly and the retry
logic can live inside Frontend.java.
TestAuthorization.test_local_catalog_show_dbs_with_transient_db is an
existing test to verify a similar problem. Running this test with
authorization disabled can reproduce the current bug. So this patch
extracts the test code into
TestLocalCatalogRetries._run_show_dbs_with_transient_db() and shares it
between the authz-enabled and authz-disabled tests.
Tests
- Ran TestLocalCatalogRetries.test_show_dbs_retry 60 times. Without the
fix, it fails about a dozen times.
Change-Id: Ib337f88a2ac0f35142417f6cee51d30497f12845
Reviewed-on: http://gerrit.cloudera.org:8080/23402
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../java/org/apache/impala/service/Frontend.java | 32 ++++++++++++++++++++--
.../org/apache/impala/service/JniFrontend.java | 8 ++----
tests/authorization/test_authorization.py | 27 ++----------------
tests/custom_cluster/__init__.py | 0
tests/custom_cluster/test_local_catalog.py | 32 ++++++++++++++++++++++
5 files changed, 66 insertions(+), 33 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index b71f052c9..56acc4f78 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1149,8 +1149,7 @@ public class Frontend {
* exception. Inconsistent metadata comes up due to interleaving catalog
object updates
* with retrieving those objects. Instead of bubbling up the issue to the
user, retrying
* can get the user's operation to run on a consistent snapshot and to
succeed.
- * Retries are *not* needed for accessing top-level objects such as
databases, since
- * they do not have a parent, so cannot be inconsistent.
+ * Max number of retries is configured by local_catalog_max_fetch_retries.
* TODO: this class is typically used in a loop at the call-site. replace
with lambdas
* in Java 8 to simplify the looping boilerplate.
*/
@@ -1481,7 +1480,8 @@ public class Frontend {
/**
* Returns all databases in catalog cache that match the pattern of
'matcher' and are
- * accessible to 'user'.
+ * accessible to 'user'. Callers should handle
InconsistentMetadataFetchException when
+ * using these dbs.
*/
public List<? extends FeDb> getDbs(PatternMatcher matcher, User user)
throws UserCancelledException, InternalException {
@@ -1506,6 +1506,32 @@ public class Frontend {
return dbs;
}
+ /**
+ * Returns thrift representation of all databases in catalog cache that
match the
+ * pattern of 'matcher' and are accessible to 'user'. Retries on
+ * InconsistentMetadataFetchException are handled in this method (see
comments of
+ * RetryTracker)
+ */
+ public List<TDatabase> getThriftDbs(PatternMatcher matcher, User user)
+ throws UserCancelledException, InternalException {
+ Frontend.RetryTracker retries = new Frontend.RetryTracker(
+ String.format("Fetching db list for user %s", user.getName()));
+ while (true) {
+ try {
+ List<? extends FeDb> dbs = getDbs(matcher, user);
+ List<TDatabase> tDbs = Lists.newArrayListWithCapacity(dbs.size());
+ // LocalDb.toThrift() might trigger getPartialCatalogObject request to
catalogd
+ // which could fail if the db is dropped concurrently. In this case,
+ // InconsistentMetadataFetchException will be thrown and we will retry
from
+ // getting the db list.
+ for (FeDb db : dbs) tDbs.add(db.toThrift());
+ return tDbs;
+ } catch (InconsistentMetadataFetchException e) {
+ retries.handleRetryOrThrow(e);
+ }
+ }
+ }
+
/**
* Handles DESCRIBE HISTORY queries.
*/
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index de476bbc0..3aecb33ba 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -387,13 +387,9 @@ public class JniFrontend {
TSessionState session = params.isSetSession() ? params.getSession() : null;
User user = getUser(session);
-
- List<? extends FeDb> dbs = frontend_.getDbs(
- PatternMatcher.createHivePatternMatcher(params.pattern), user);
TGetDbsResult result = new TGetDbsResult();
- List<TDatabase> tDbs = Lists.newArrayListWithCapacity(dbs.size());
- for (FeDb db: dbs) tDbs.add(db.toThrift());
- result.setDbs(tDbs);
+ result.setDbs(frontend_.getThriftDbs(
+ PatternMatcher.createHivePatternMatcher(params.pattern), user));
try {
TSerializer serializer = new TSerializer(protocolFactory_);
return serializer.serialize(result);
diff --git a/tests/authorization/test_authorization.py
b/tests/authorization/test_authorization.py
index 4ba9c825d..d34b43e85 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -20,8 +20,6 @@
from __future__ import absolute_import, division, print_function
from getpass import getuser
import random
-import threading
-import time
import pytest
@@ -29,6 +27,7 @@ from tests.common.custom_cluster_test_suite import
CustomClusterTestSuite
from tests.common.file_utils import assert_file_in_dir_contains
from tests.common.test_result_verifier import error_msg_equal
from tests.common.test_vector import HS2
+from tests.custom_cluster.test_local_catalog import TestLocalCatalogRetries
PRIVILEGES = ['all', 'alter', 'drop', 'insert', 'refresh', 'select']
ADMIN = "admin"
@@ -192,25 +191,5 @@ class TestAuthorization(CustomClusterTestSuite):
# Use admin user to have create+drop privileges.
unique_database = unique_name + "_db"
admin_client = self.create_impala_client(user=ADMIN)
- stop = False
-
- def create_drop_db():
- while not stop:
- admin_client.execute("create database " + unique_database)
- # Sleep some time so coordinator can get the updates of it.
- time.sleep(0.1)
- if stop:
- break
- admin_client.execute("drop database " + unique_database)
- t = threading.Thread(target=create_drop_db)
- t.start()
-
- try:
- for i in range(100):
- self.execute_query("show databases")
- # Sleep some time so the db can be dropped.
- time.sleep(0.2)
- finally:
- stop = True
- t.join()
- admin_client.execute("drop database if exists " + unique_database)
+ TestLocalCatalogRetries._run_show_dbs_with_transient_db(
+ unique_database, admin_client, self.client)
diff --git a/tests/custom_cluster/__init__.py b/tests/custom_cluster/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/custom_cluster/test_local_catalog.py
b/tests/custom_cluster/test_local_catalog.py
index 92931ca0f..63bac1eed 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -428,6 +428,38 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
client1.close()
client2.close()
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--use_local_catalog=true",
+ catalogd_args="--catalog_topic_mode=minimal")
+ def test_show_dbs_retry(self, unique_name):
+ self._run_show_dbs_with_transient_db(
+ unique_name + "_db", self.create_impala_client(), self.client)
+
+ @classmethod
+ def _run_show_dbs_with_transient_db(cls, unique_database, admin_client,
user_client):
+ stop = False
+
+ def create_drop_db():
+ while not stop:
+ admin_client.execute("create database " + unique_database)
+ # Sleep some time so coordinator can get the updates of it.
+ time.sleep(0.1)
+ if stop:
+ break
+ admin_client.execute("drop database " + unique_database)
+ t = threading.Thread(target=create_drop_db)
+ t.start()
+
+ try:
+ for i in range(100):
+ user_client.execute("show databases")
+ # Sleep some time so the db can be dropped.
+ time.sleep(0.2)
+ finally:
+ stop = True
+ t.join()
+ admin_client.execute("drop database if exists " + unique_database)
+
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true
--inject_latency_after_catalog_fetch_ms=50",