This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit e40fd461107387363578c9bd89ecb02a817b9079 Author: gaoxq <[email protected]> AuthorDate: Wed Nov 16 12:26:53 2022 +0800 IMPALA-11728: Set fallback database for functions Currently a function created with CREATE FUNCTION can only be called by its unqualified name from the database it was created in. When the function is used unqualified while connected to another database, analysis throws an AnalysisException because the function is unknown in the current database. Add the query option FALLBACK_DB_FOR_FUNCTIONS. When this option is set, the names of functions in the fallback database do not need to be fully qualified. The exact resolution precedence for unqualified function names is: 1. _impala_builtins 2. the fallback db specified in query option FALLBACK_DB_FOR_FUNCTIONS 3. the current db An example of how to use this feature is listed below, assuming we are connected to the default database. The SELECT succeeds once FALLBACK_DB_FOR_FUNCTIONS is set. CREATE FUNCTION util_db.fn() returns int LOCATION '/libTestUdfs.so' symbol='NoArgs'; set FALLBACK_DB_FOR_FUNCTIONS=util_db; SELECT fn() from functional.alltypes limit 1; Testing: - Added various FE and E2E tests to verify Impala's behavior after setting FALLBACK_DB_FOR_FUNCTIONS. 
Change-Id: I37b7e126718fea1c50723cacbaed898b20bb55e5 Reviewed-on: http://gerrit.cloudera.org:8080/19252 Reviewed-by: Quanlong Huang <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/query-options.cc | 4 ++ be/src/service/query-options.h | 6 +- common/thrift/ImpalaService.thrift | 2 + common/thrift/Query.thrift | 2 + .../java/org/apache/impala/analysis/Analyzer.java | 8 +++ .../org/apache/impala/analysis/FunctionName.java | 21 ++++++ .../authorization/AuthorizationStmtTest.java | 17 +++++ tests/authorization/test_ranger.py | 77 +++++++++++++++++++++- tests/query_test/test_udfs.py | 39 ++++++++++- 9 files changed, 172 insertions(+), 4 deletions(-) diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index a78a920ab..8582df3a4 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -1037,6 +1037,10 @@ Status impala::SetQueryOption(const string& key, const string& value, query_options->__set_expand_complex_types(IsTrue(value)); break; } + case TImpalaQueryOptions::FALLBACK_DB_FOR_FUNCTIONS: { + query_options->__set_fallback_db_for_functions(value); + break; + } default: if (IsRemovedQueryOption(key)) { LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'"; diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 75e4c83ea..05dcdf2ee 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> // time we add or remove a query option to/from the enum TImpalaQueryOptions. 
#define QUERY_OPTS_TABLE \ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \ - TImpalaQueryOptions::EXPAND_COMPLEX_TYPES + 1); \ + TImpalaQueryOptions::FALLBACK_DB_FOR_FUNCTIONS + 1); \ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \ REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \ @@ -275,7 +275,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> QUERY_OPT_FN(test_replan, TEST_REPLAN, TQueryOptionLevel::ADVANCED) \ QUERY_OPT_FN(lock_max_wait_time_s, LOCK_MAX_WAIT_TIME_S, TQueryOptionLevel::REGULAR) \ QUERY_OPT_FN(orc_schema_resolution, ORC_SCHEMA_RESOLUTION, TQueryOptionLevel::REGULAR) \ - QUERY_OPT_FN(expand_complex_types, EXPAND_COMPLEX_TYPES, TQueryOptionLevel::REGULAR); + QUERY_OPT_FN(expand_complex_types, EXPAND_COMPLEX_TYPES, TQueryOptionLevel::REGULAR) \ + QUERY_OPT_FN(fallback_db_for_functions, FALLBACK_DB_FOR_FUNCTIONS, \ + TQueryOptionLevel::ADVANCED); /// Enforce practical limits on some query options to avoid undesired query state. static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 715b2df64..0b3cef56b 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -740,6 +740,8 @@ enum TImpalaQueryOptions { // Expands complex types in star queries EXPAND_COMPLEX_TYPES = 147 + // Specify the database name which stores global udf + FALLBACK_DB_FOR_FUNCTIONS = 148; } // The summary of a DML statement. 
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index 806f2dcd3..596965ff8 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -597,6 +597,8 @@ struct TQueryOptions { // See comment in ImpalaService.thrift 148: optional bool expand_complex_types = false; + + 149: optional string fallback_db_for_functions; } // Impala currently has three types of sessions: Beeswax, HiveServer2 and external diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java index 0ef130ef0..7d8d3a3ba 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java @@ -3188,6 +3188,14 @@ public class Analyzer { } public String getDefaultDb() { return globalState_.queryCtx.session.database; } + + public String getFallbackDbForFunctions() { + return getQueryCtx() + .getClient_request() + .getQuery_options() + .getFallback_db_for_functions(); + } + public User getUser() { return user_; } public String getUserShortName() throws AnalysisException { try { diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionName.java b/fe/src/main/java/org/apache/impala/analysis/FunctionName.java index b3093c0c1..7cb259c88 100644 --- a/fe/src/main/java/org/apache/impala/analysis/FunctionName.java +++ b/fe/src/main/java/org/apache/impala/analysis/FunctionName.java @@ -19,8 +19,11 @@ package org.apache.impala.analysis; import java.util.List; +import org.apache.commons.lang.StringUtils; +import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.BuiltinsDb; import org.apache.impala.catalog.Db; +import org.apache.impala.catalog.FeDb; import org.apache.impala.common.AnalysisException; import org.apache.impala.thrift.TFunctionName; @@ -99,8 +102,12 @@ public class FunctionName { * - When preferBuiltinsDb is true: * - If the function name specified has the same name as a built-in function, * set the 
database name to _impala_builtins. + * - Else if the query option of 'FALLBACK_DB_FOR_FUNCTIONS' being set and the + * function exists in fallback database, set the database name to fallback + * database. * - Else, set the database name to the current session DB name. * - When preferBuiltinsDb is false: set the database name to current session DB name. + * Only for CREATE/DROP FUNCTION statements, preferBuiltinsDb is false. */ public void analyze(Analyzer analyzer, boolean preferBuiltinsDb) throws AnalysisException { @@ -123,6 +130,8 @@ public class FunctionName { if (!isFullyQualified()) { if (preferBuiltinsDb && builtinDb.containsFunction(fn_)) { db_ = BuiltinsDb.NAME; + } else if (preferBuiltinsDb && fallbackDbContainsFn(analyzer)) { + db_ = analyzer.getFallbackDbForFunctions(); } else { db_ = analyzer.getDefaultDb(); } @@ -133,6 +142,18 @@ public class FunctionName { isAnalyzed_ = true; } + private boolean fallbackDbContainsFn(Analyzer analyzer) throws AnalysisException { + String dbName = analyzer.getFallbackDbForFunctions(); + if (StringUtils.isEmpty(dbName)) { + return false; + } + // Execute a UDF of the fallback database in a SELECT statement, the requesting user + // has be to granted any one of the INSERT, REFRESH, SELECT privileges on the + // fallback database. 
+ FeDb feDb = analyzer.getDb(dbName, Privilege.VIEW_METADATA, false); + return feDb != null && feDb.containsFunction(fn_); + } + private void analyzeFnNamePath() throws AnalysisException { if (fnNamePath_ == null) return; if (fnNamePath_.size() > 2 || fnNamePath_.isEmpty()) { diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java index af25f0b50..3e898b628 100644 --- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java +++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java @@ -2952,6 +2952,23 @@ public class AuthorizationStmtTest extends AuthorizationTestBase { } finally { removeFunction(fn); } + + // IMPALA-11728: Make sure use of functions in the fallback database requires SELECT + // (or higher) privilege on the database. + fn = addFunction("functional", "f"); + try { + TQueryOptions options = new TQueryOptions(); + options.setFallback_db_for_functions("functional"); + authorize(createAnalysisCtx(options, authzFactory_, user_.getName()), "select f()") + .ok(onDatabase("functional", TPrivilegeLevel.ALL)) + .ok(onDatabase("functional", TPrivilegeLevel.OWNER)) + .ok(onDatabase("functional", viewMetadataPrivileges())) + .error(accessError("functional")) + .error(accessError("functional"), + onDatabase("functional", allExcept(viewMetadataPrivileges()))); + } finally { + removeFunction(fn); + } } @Test diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py index 44f3b1b09..249923f0b 100644 --- a/tests/authorization/test_ranger.py +++ b/tests/authorization/test_ranger.py @@ -32,7 +32,7 @@ from tests.common.test_dimensions import (create_client_protocol_dimension, create_exec_option_dimension, create_orc_dimension) from tests.util.hdfs_util import NAMENODE from tests.util.calculation_util import get_random_id -from tests.util.filesystem_utils import WAREHOUSE_PREFIX +from 
tests.util.filesystem_utils import WAREHOUSE_PREFIX, WAREHOUSE ADMIN = "admin" RANGER_AUTH = ("admin", "admin") @@ -1106,6 +1106,81 @@ class TestRanger(CustomClusterTestSuite): finally: self._run_query_as_user("drop database {0} cascade".format(test_db), ADMIN, True) + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) + def test_select_function_with_fallback_db(self, unique_name): + """Verifies that Impala should not allow using functions in the fallback database + unless the user has some privileges on the given database.""" + test_user = "non_owner" + admin_client = self.create_impala_client() + non_owner_client = self.create_impala_client() + refresh_stmt = "refresh authorization" + unique_database = unique_name + "_db" + + try: + admin_client.execute("drop database if exists {0} cascade".format(unique_database), + user=ADMIN) + admin_client.execute("create database %s" % unique_database, user=ADMIN) + admin_client.execute("create function {0}.identity(bigint) " + "RETURNS bigint " + "LOCATION " + "'{1}/libTestUdfs.so' " + "SYMBOL='Identity'" + .format(unique_database, WAREHOUSE), user=ADMIN) + # A user not granted any privilege is not allowed to execute the UDF. + result = self._run_query_as_user("select identity(1)", test_user, False) + err = "User '{0}' does not have privileges to access: default".format( + test_user) + assert err in str(result) + + admin_client.execute( + "grant select on database default to user {0}".format(test_user), + user=ADMIN) + self._refresh_authorization(admin_client, refresh_stmt) + + result = self._run_query_as_user("select identity(1)", test_user, False) + err = "default.identity() unknown for database default." + assert err in str(result) + + # A user is not allowed to access fallback database if the user has no + # privileges on it, whether the function exists or not. 
+ result = self.execute_query_expect_failure( + non_owner_client, "select identity(1)", query_options={ + 'FALLBACK_DB_FOR_FUNCTIONS': unique_database}, user=test_user) + err = "User '{0}' does not have privileges to access: {1}".format( + test_user, unique_database) + assert err in str(result) + + result = self.execute_query_expect_failure( + non_owner_client, "select fn()", query_options={ + 'FALLBACK_DB_FOR_FUNCTIONS': unique_database}, user=test_user) + err = "User '{0}' does not have privileges to access: {1}".format( + test_user, unique_database) + assert err in str(result) + + admin_client.execute( + "grant select on database {0} to user {1}".format( + unique_database, test_user), user=ADMIN) + self._refresh_authorization(admin_client, refresh_stmt) + + # A user is allowed to use functions in the fallback database if the user is + # explicitly granted the SELECT privilege. + self.execute_query_expect_success( + non_owner_client, + "select identity(1)", + query_options={'FALLBACK_DB_FOR_FUNCTIONS': unique_database}, + user=test_user) + finally: + # Revoke the granted privileges. + admin_client.execute("revoke select on database default from user {0}" + .format(test_user), user=ADMIN) + admin_client.execute("revoke select on database {0} from user {1}" + .format(unique_database, test_user), user=ADMIN) + # Drop the database. 
+ self._run_query_as_user("drop database {0} cascade".format(unique_database), + ADMIN, True) + @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py index 7f899a4f8..035d01fe4 100644 --- a/tests/query_test/test_udfs.py +++ b/tests/query_test/test_udfs.py @@ -31,7 +31,7 @@ from tests.common.test_dimensions import ( create_exec_option_dimension_from_dict, create_uncompressed_text_dimension) from tests.util.calculation_util import get_random_id -from tests.util.filesystem_utils import get_fs_path, IS_S3 +from tests.util.filesystem_utils import get_fs_path, WAREHOUSE from tests.verifiers.metric_verifier import MetricVerifier class TestUdfBase(ImpalaTestSuite): @@ -643,3 +643,40 @@ class TestUdfTargeted(TestUdfBase): assert re.search( "User Defined Functions \(UDFs\): {0}\.hive_substring\s*[\r\n]".format( unique_database), profile) + + def test_set_fallback_db_for_functions(self, vector, unique_database): + """IMPALA-11728: Set fallback database for functions.""" + create_function_stmt = "create function `{0}`.fn() returns int "\ + "location '{1}/libTestUdfs.so' symbol='NoArgs'".format(unique_database, + WAREHOUSE) + self.client.execute(create_function_stmt) + + # case 1: When the function name is fully qualified then this query option + # has no effect. + assert '6' == self.execute_scalar("select {0}.fn() from functional.alltypes " + "limit 1".format(unique_database)) + + # case 2: Throw an exception without specifying the database. 
+ query_stmt = "select fn() from functional.alltypes limit 1" + result = self.execute_query_expect_failure(self.client, query_stmt) + assert "default.fn() unknown for database default" in str(result) + + # case 3: Use fn() in fallback db after setting FALLBACK_DB_FOR_FUNCTIONS + assert '6' == self.execute_scalar(query_stmt, query_options={ + 'fallback_db_for_functions': unique_database}) + + # case 4: Test a function name that also exists as builtin function. + # Use function in _impala_builtins. + create_function_stmt = "create function `{0}`.abs(int) returns int "\ + "location '{1}/libTestUdfs.so' symbol='Identity'".format(unique_database, + WAREHOUSE) + self.client.execute(create_function_stmt) + + assert '1' == self.execute_scalar("select abs(-1)", query_options={ + 'fallback_db_for_functions': unique_database}) + + # case 5: It should return empty result for show function, even when + # FALLBACK_DB_FOR_FUNCTIONS is set. + result = self.execute_scalar("show functions", query_options={ + 'fallback_db_for_functions': unique_database}) + assert result is None
