This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 00d0b0dda IMPALA-9441,IMPALA-13170: Ops listing dbs/tables should
handle db not exists
00d0b0dda is described below
commit 00d0b0dda1e215d8e91ff52688fe6654bee52282
Author: stiga-huang <[email protected]>
AuthorDate: Thu Jun 20 16:27:51 2024 +0800
IMPALA-9441,IMPALA-13170: Ops listing dbs/tables should handle db not exists
Some operations list dbs/tables in two steps:
1. Get the db list.
2. Perform an operation on each db, which can fail if the db no longer exists.
For instance, when authorization is enabled, SHOW DATABASES needs step 2
to get the owner of each db. This is fine in legacy catalog mode, since
the whole Db object is cached on the coordinator side.
However, in local catalog mode, the msDb could be missing from the
local cache. The coordinator then triggers a getPartialCatalogObject RPC
to load it from catalogd. If the db no longer exists in catalogd, this
step fails.
The same problem affects HS2 GetTables requests that list all tables in
all dbs. In step 2 we list the table names of each db. Although the db
existed when we fetched the db list, it could be dropped by the time we
start listing its table names.
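This patch adds code to handle the exceptions raised when a db no longer
exists. To tell failure causes apart, InconsistentMetadataFetchException
now carries a CatalogLookupStatus reason (with a new VERSION_MISMATCH
status). It also changes GetSchemas to skip listing table names, which
avoids the same issue.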
Tests:
- Add e2e tests
Change-Id: I2bd40d33859feca2bbd2e5f1158f3894a91c2929
Reviewed-on: http://gerrit.cloudera.org:8080/21546
Reviewed-by: Yida Wu <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
common/thrift/CatalogService.thrift | 3 +-
.../impala/catalog/local/CatalogdMetaProvider.java | 4 +-
.../local/InconsistentMetadataFetchException.java | 9 +-
.../java/org/apache/impala/service/Frontend.java | 130 +++++++++++++--------
.../java/org/apache/impala/service/MetadataOp.java | 8 +-
tests/authorization/test_authorization.py | 38 ++++++
tests/hs2/test_hs2.py | 53 +++++++++
7 files changed, 192 insertions(+), 53 deletions(-)
diff --git a/common/thrift/CatalogService.thrift
b/common/thrift/CatalogService.thrift
index ce14337a9..66b1455bb 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -611,7 +611,8 @@ enum CatalogLookupStatus {
// cases this lookup status is set and the caller can retry the fetch.
// TODO: Fix partition lookup logic to not do it with IDs.
PARTITION_NOT_FOUND,
- DATA_SOURCE_NOT_FOUND
+ DATA_SOURCE_NOT_FOUND,
+ VERSION_MISMATCH
}
// RPC response for GetPartialCatalogObject.
diff --git
a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index fb80adb2b..5d1834793 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -463,7 +463,7 @@ public class CatalogdMetaProvider implements MetaProvider {
case PARTITION_NOT_FOUND:
case DATA_SOURCE_NOT_FOUND:
invalidateCacheForObject(req.object_desc);
- throw new InconsistentMetadataFetchException(
+ throw new InconsistentMetadataFetchException(resp.lookup_status,
String.format("Fetching %s failed: %s. Could not find %s",
req.object_desc.type, resp.lookup_status, req.object_desc));
default: break;
@@ -483,7 +483,7 @@ public class CatalogdMetaProvider implements MetaProvider {
LOG.warn("Catalog object {} changed version from {} to {} while fetching
metadata",
req.object_desc.toString(), req.object_desc.catalog_version,
resp.object_version_number);
- throw new InconsistentMetadataFetchException(
+ throw new
InconsistentMetadataFetchException(CatalogLookupStatus.VERSION_MISMATCH,
String.format("Catalog object %s changed version between accesses.",
req.object_desc.toString()));
}
diff --git
a/fe/src/main/java/org/apache/impala/catalog/local/InconsistentMetadataFetchException.java
b/fe/src/main/java/org/apache/impala/catalog/local/InconsistentMetadataFetchException.java
index 9147cecf1..1ba544189 100644
---
a/fe/src/main/java/org/apache/impala/catalog/local/InconsistentMetadataFetchException.java
+++
b/fe/src/main/java/org/apache/impala/catalog/local/InconsistentMetadataFetchException.java
@@ -17,6 +17,8 @@
package org.apache.impala.catalog.local;
+import org.apache.impala.thrift.CatalogLookupStatus;
+
/**
* If this is thrown, it indicates that the catalog implementation in the
Impalad
* has identified that the metadata it read was not a proper snapshot of the
source
@@ -31,7 +33,12 @@ package org.apache.impala.catalog.local;
public class InconsistentMetadataFetchException extends RuntimeException {
private static final long serialVersionUID = 1L;
- public InconsistentMetadataFetchException(String msg) {
+ private final CatalogLookupStatus reason_;
+
+ public InconsistentMetadataFetchException(CatalogLookupStatus reason, String
msg) {
super(msg);
+ reason_ = reason;
}
+
+ public CatalogLookupStatus getReason() { return reason_; }
}
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index f68a330e2..266b3c88b 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -154,6 +154,7 @@ import org.apache.impala.planner.HdfsScanNode;
import org.apache.impala.planner.PlanFragment;
import org.apache.impala.planner.Planner;
import org.apache.impala.planner.ScanNode;
+import org.apache.impala.thrift.CatalogLookupStatus;
import org.apache.impala.thrift.TAlterDbParams;
import org.apache.impala.thrift.TBackendGflags;
import org.apache.impala.thrift.TCatalogOpRequest;
@@ -1147,25 +1148,72 @@ public class Frontend {
/**
* A Callable wrapper used for checking authorization to tables/databases.
*/
- private class CheckAuthorization implements Callable<Boolean> {
- private final String dbName_;
- private final String tblName_;
- private final String owner_;
- private final User user_;
-
- public CheckAuthorization(String dbName, String tblName, String owner,
User user) {
- // dbName and user cannot be null, tblName and owner can be null.
- Preconditions.checkNotNull(dbName);
+ private abstract class CheckAuthorization implements Callable<Boolean> {
+ protected final User user_;
+
+ public CheckAuthorization(User user) {
Preconditions.checkNotNull(user);
- dbName_ = dbName;
- tblName_ = tblName;
- owner_ = owner;
- user_ = user;
+ this.user_ = user;
}
+ public abstract boolean checkAuthorization() throws Exception;
+
@Override
public Boolean call() throws Exception {
- return Boolean.valueOf(isAccessibleToUser(dbName_, tblName_, owner_,
user_));
+ return checkAuthorization();
+ }
+ }
+
+ private class CheckDbAuthorization extends CheckAuthorization {
+ private final FeDb db_;
+
+ public CheckDbAuthorization(FeDb db, User user) {
+ super(user);
+ Preconditions.checkNotNull(db);
+ this.db_ = db;
+ }
+
+ @Override
+ public boolean checkAuthorization() throws Exception {
+ try {
+ // FeDb.getOwnerUser() could throw InconsistentMetadataFetchException
in local
+ // catalog mode if the db is not cached locally and is dropped in
catalogd.
+ return isAccessibleToUser(db_.getName(), null, db_.getOwnerUser(),
user_);
+ } catch (InconsistentMetadataFetchException e) {
+ Preconditions.checkState(e.getReason() ==
CatalogLookupStatus.DB_NOT_FOUND,
+ "Unexpected failure of InconsistentMetadataFetchException: %s",
+ e.getReason());
+ LOG.warn("Database {} no longer exists", db_.getName(), e);
+ }
+ return false;
+ }
+ }
+
+ private class CheckTableAuthorization extends CheckAuthorization {
+ private final FeTable table_;
+
+ public CheckTableAuthorization(FeTable table, User user) {
+ super(user);
+ Preconditions.checkNotNull(table);
+ this.table_ = table;
+ }
+
+ @Override
+ public boolean checkAuthorization() throws Exception {
+ // Get the owner information. Do not force load the table, only get it
+ // from cache, if it is already loaded. This means that we cannot access
+ // ownership information for unloaded tables and they will not be listed
+ // here. This might result in situations like 'show tables' not listing
+ // 'owned' tables for a given user just because the metadata is not
loaded.
+ // TODO(IMPALA-8937): Figure out a way to load Table/Database ownership
+ // information when fetching the table lists from HMS.
+ String tableOwner = table_.getOwnerUser();
+ if (tableOwner == null) {
+ LOG.info("Table {} not yet loaded, ignoring it in table listing.",
+ table_.getFullName());
+ }
+ return isAccessibleToUser(
+ table_.getDb().getName(), table_.getName(), tableOwner, user_);
}
}
@@ -1174,9 +1222,10 @@ public class Frontend {
return getTableNames(dbName, matcher, user, /*tableTypes*/
Collections.emptySet());
}
- /** Returns the names of the tables of types specified in 'tableTypes' in
database
+ /**
+ * Returns the names of the tables of types specified in 'tableTypes' in
database
* 'dbName' that are accessible to 'user'. Only tables that match the
pattern of
- * 'matcher' are returned.
+ * 'matcher' are returned. Returns an empty list if the db doesn't exist.
*/
public List<String> getTableNames(String dbName, PatternMatcher matcher,
User user,
Set<TImpalaTableType> tableTypes) throws ImpalaException {
@@ -1184,9 +1233,15 @@ public class Frontend {
String.format("fetching %s table names", dbName));
while (true) {
try {
- return doGetCatalogTableNames(dbName, matcher, user, tableTypes);
- } catch(InconsistentMetadataFetchException e) {
+ FeCatalog catalog = getCatalog();
+ List<String> tblNames = catalog.getTableNames(dbName, matcher,
tableTypes);
+ filterTablesIfAuthNeeded(dbName, user, tblNames);
+ return tblNames;
+ } catch (InconsistentMetadataFetchException e) {
retries.handleRetryOrThrow(e);
+ } catch (DatabaseNotFoundException e) {
+ LOG.warn("Database {} no longer exists", dbName, e);
+ return Collections.emptyList();
}
}
}
@@ -1231,9 +1286,10 @@ public class Frontend {
}
}
- if (failedCheckTasks > 0)
+ if (failedCheckTasks > 0) {
throw new InternalException("Failed to check access." +
"Check the server log for more details.");
+ }
}
private void filterTablesIfAuthNeeded(String dbName, User user, List<String>
tblNames)
@@ -1243,40 +1299,21 @@ public class Frontend {
if (needsAuthChecks) {
List<Future<Boolean>> pendingCheckTasks = Lists.newArrayList();
- Iterator<String> iter = tblNames.iterator();
- while (iter.hasNext()) {
- String tblName = iter.next();
- // Get the owner information. Do not force load the table, only get it
- // from cache, if it is already loaded. This means that we cannot
access
- // ownership information for unloaded tables and they will not be
listed
- // here. This might result in situations like 'show tables' not listing
- // 'owned' tables for a given user just because the metadata is not
loaded.
- // TODO(IMPALA-8937): Figure out a way to load Table/Database ownership
- // information when fetching the table lists from HMS.
+ for (String tblName : tblNames) {
FeTable table = getCatalog().getTableIfCached(dbName, tblName);
- String tableOwner = table.getOwnerUser();
- if (tableOwner == null) {
- LOG.info("Table {} not yet loaded, ignoring it in table listing.",
- dbName + "." + tblName);
+ // Table could be removed after we get the table list.
+ if (table == null) {
+ LOG.warn("Table {}.{} no longer exists", dbName, tblName);
+ continue;
}
pendingCheckTasks.add(checkAuthorizationPool_.submit(
- new CheckAuthorization(dbName, tblName, tableOwner, user)));
+ new CheckTableAuthorization(table, user)));
}
filterUnaccessibleElements(pendingCheckTasks, tblNames);
}
}
- private List<String> doGetCatalogTableNames(String dbName, PatternMatcher
matcher,
- User user, Set<TImpalaTableType> tableTypes) throws ImpalaException {
- FeCatalog catalog = getCatalog();
- List<String> tblNames = catalog.getTableNames(dbName, matcher, tableTypes);
-
- filterTablesIfAuthNeeded(dbName, user, tblNames);
-
- return tblNames;
- }
-
private List<String> doGetMetadataTableNames(String dbName, String
catalogTblName,
PatternMatcher matcher, User user)
throws ImpalaException {
@@ -1426,7 +1463,7 @@ public class Frontend {
while (iter.hasNext()) {
FeDb db = iter.next();
pendingCheckTasks.add(checkAuthorizationPool_.submit(
- new CheckAuthorization(db.getName(), null, db.getOwnerUser(),
user)));
+ new CheckDbAuthorization(db, user)));
}
filterUnaccessibleElements(pendingCheckTasks, dbs);
@@ -1484,8 +1521,7 @@ public class Frontend {
private boolean isAccessibleToUser(String dbName, String tblName,
String owner, User user) throws InternalException {
Preconditions.checkNotNull(dbName);
- if (tblName == null &&
- dbName.toLowerCase().equals(Catalog.DEFAULT_DB.toLowerCase())) {
+ if (tblName == null && dbName.equalsIgnoreCase(Catalog.DEFAULT_DB)) {
// Default DB should always be shown.
return true;
}
diff --git a/fe/src/main/java/org/apache/impala/service/MetadataOp.java
b/fe/src/main/java/org/apache/impala/service/MetadataOp.java
index c7a3dda33..05228da85 100644
--- a/fe/src/main/java/org/apache/impala/service/MetadataOp.java
+++ b/fe/src/main/java/org/apache/impala/service/MetadataOp.java
@@ -326,13 +326,13 @@ public class MetadataOp {
PatternMatcher columnPatternMatcher, PatternMatcher fnPatternMatcher,
User user)
throws ImpalaException {
Frontend.RetryTracker retries = new Frontend.RetryTracker(
- String.format("fetching metadata"));
+ String.format("fetching metadata for user %s", user.getName()));
while (true) {
try {
return doGetDbsMetadata(fe, catalogName,
schemaPatternMatcher, tablePatternMatcher,
columnPatternMatcher, fnPatternMatcher, user);
- } catch(InconsistentMetadataFetchException e) {
+ } catch (InconsistentMetadataFetchException e) {
retries.handleRetryOrThrow(e);
}
}
@@ -356,6 +356,10 @@ public class MetadataOp {
// Get function metadata
List<Function> fns = db.getFunctions(null, fnPatternMatcher);
result.functions.add(fns);
+ } else if (tablePatternMatcher == PatternMatcher.MATCHER_MATCH_NONE
+ && columnPatternMatcher == PatternMatcher.MATCHER_MATCH_NONE) {
+ // Get db list. No need to list table names.
+ result.dbs.add(db.getName());
} else {
// Get table metadata
List<String> tableList = Lists.newArrayList();
diff --git a/tests/authorization/test_authorization.py
b/tests/authorization/test_authorization.py
index 65a41267c..e8229d16b 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -26,6 +26,8 @@ import re
import random
import sys
import subprocess
+import threading
+import time
import urllib
from getpass import getuser
@@ -228,3 +230,39 @@ class TestAuthorization(CustomClusterTestSuite):
"--ranger_app_id=impala --authorization_provider=ranger")
def test_num_check_authorization_threads_with_ranger(self, unique_name):
self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES)
+
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--server-name=server1 --ranger_service_type=hive "
+ "--ranger_app_id=impala --authorization_provider=ranger "
+ "--use_local_catalog=true",
+ catalogd_args="--server-name=server1 --ranger_service_type=hive "
+ "--ranger_app_id=impala --authorization_provider=ranger "
+ "--catalog_topic_mode=minimal")
+ def test_local_catalog_show_dbs_with_transient_db(self, unique_name):
+ """Regression test for IMPALA-13170"""
+ # Starts a background thread to create+drop the transient db.
+ # Use admin user to have create+drop privileges.
+ unique_database = unique_name + "_db"
+ admin_client = self.create_impala_client()
+ stop = False
+
+ def create_drop_db():
+ while not stop:
+ admin_client.execute("create database " + unique_database, user=ADMIN)
+ # Sleep some time so coordinator can get the updates of it.
+ time.sleep(0.1)
+ if stop:
+ break
+ admin_client.execute("drop database " + unique_database, user=ADMIN)
+ t = threading.Thread(target=create_drop_db)
+ t.start()
+
+ try:
+ for i in range(100):
+ self.execute_query("show databases")
+ # Sleep some time so the db can be dropped.
+ time.sleep(0.2)
+ finally:
+ stop = True
+ t.join()
+ admin_client.execute("drop database if exists " + unique_database,
user=ADMIN)
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index ba0892b17..02847442d 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -469,6 +469,59 @@ class TestHS2(HS2TestSuite):
self.session_handle)
TestHS2.check_invalid_session(self.hs2_client.GetSchemas(get_schemas_req))
+ @needs_session_cluster_properties()
+ def test_get_schemas_on_transient_db(self, cluster_properties,
unique_database):
+ # Use a new db name
+ unique_database += "_tmp"
+ stop = False
+
+ def create_drop_db():
+ while not stop:
+ self.execute_query("create database if not exists " + unique_database)
+ time.sleep(0.1)
+ if stop:
+ break
+ self.execute_query("drop database " + unique_database)
+ t = threading.Thread(target=create_drop_db)
+ t.start()
+
+ try:
+ get_schemas_req = TCLIService.TGetSchemasReq(self.session_handle)
+ for i in range(100):
+ get_schemas_resp = self.hs2_client.GetSchemas(get_schemas_req)
+ TestHS2.check_response(get_schemas_resp)
+ time.sleep(0.2)
+ finally:
+ stop = True
+ t.join()
+
+ @needs_session_cluster_properties()
+ def test_get_tables_on_transient_db(self, cluster_properties,
unique_database):
+ # Use a new db name
+ unique_database += "_tmp"
+ stop = False
+
+ def create_drop_db():
+ while not stop:
+ self.execute_query("create database if not exists " + unique_database)
+ time.sleep(0.1)
+ if stop:
+ break
+ self.execute_query("drop database " + unique_database)
+ t = threading.Thread(target=create_drop_db)
+ t.start()
+
+ try:
+ # Use empty 'schemaName' to get tables in all dbs.
+ get_tables_req = TCLIService.TGetTablesReq(self.session_handle)
+ for i in range(100):
+ get_tables_resp = self.hs2_client.GetTables(get_tables_req)
+ TestHS2.check_response(get_tables_resp)
+ time.sleep(0.2)
+ finally:
+ stop = True
+ t.join()
+
@needs_session_cluster_properties()
def test_get_tables(self, cluster_properties, unique_database):
"""Basic test for the GetTables() HS2 method. Needs to execute serially
because