This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 648209b17258cf610f4e73a3ed63de665216074f Author: Riza Suminto <[email protected]> AuthorDate: Mon Apr 14 17:05:31 2025 -0700 IMPALA-13967: Move away from setting user parameter in execute ImpalaConnection.execute and ImpalaConnection.execute_async have 'user' parameter to set specific user to run the query. This is mainly legacy of BeeswaxConnection, which allows using 1 client to run queries under different usernames. BeeswaxConnection and ImpylaHS2Connection actually allow specifying one user per client. Doing so will simplify user-specific tests such as test_ranger.py that often instantiates separate clients for admin user and regular user. There is no need to specify 'user' parameter anymore when calling execute() or execute_async(). Thus, reducing potential bugs from forgetting to set one or setting it with incorrect value. This patch applies one-user-per-client practice as much as possible for test_ranger.py, test_authorization.py, and test_admission_controller.py. Unused code and pytest fixtures are removed. Few flake8 issues are addressed too. Their default_test_protocol() is overridden to return 'hs2'. ImpylaHS2Connection.execute() and ImpylaHS2Connection.execute_async() are slightly modified to assume ImpylaHS2Connection.__user if 'user' parameter in None. BeeswaxConnection remains unchanged. Extend ImpylaHS2ResultSet.__convert_result_value() to lower case boolean return value to match beeswax result. Testing: Run and pass all modified tests in exhaustive exploration. Change-Id: I20990d773f3471c129040cefcdff1c6d89ce87eb Reviewed-on: http://gerrit.cloudera.org:8080/22782 Reviewed-by: Riza Suminto <[email protected]> Tested-by: Riza Suminto <[email protected]> --- .../queries/QueryTest/ranger_alltypes_mask.test | 2 + .../ranger_alltypes_mask_date_show_year.test | 2 + .../QueryTest/ranger_alltypes_mask_hash.test | 2 + .../QueryTest/ranger_alltypes_mask_none.test | 2 + .../QueryTest/ranger_alltypes_mask_null.test | 2 + .../ranger_alltypes_mask_show_first_4.test | 2 + .../ranger_alltypes_mask_show_last_4.test | 2 + tests/authorization/test_authorization.py | 96 +--- tests/authorization/test_ranger.py | 627 ++++++++++----------- tests/common/impala_connection.py | 26 +- tests/common/impala_service.py | 14 +- tests/common/impala_test_suite.py | 8 +- tests/custom_cluster/test_admission_controller.py | 8 +- 13 files changed, 361 insertions(+), 432 deletions(-) diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask.test index 2e59d6cf2..f9afc5a6f 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask.test +++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask.test @@ -55,4 +55,6 @@ select * from functional.chars_tiny 'x ','x ','x' ---- TYPES CHAR,CHAR,STRING +---- HS2_TYPES +CHAR,CHAR,VARCHAR ==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_date_show_year.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_date_show_year.test index 2926859b1..976f6acfc 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_date_show_year.test +++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_date_show_year.test @@ -55,4 +55,6 @@ select * from functional.chars_tiny 'x ','x ','x' ---- TYPES CHAR,CHAR,STRING +---- HS2_TYPES +CHAR,CHAR,VARCHAR ==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_hash.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_hash.test index de5fd0e95..8660250c3 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_hash.test +++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_hash.test @@ -55,4 +55,6 @@ select * from functional.chars_tiny '95e4a','c8fcce134c7d85f738457e0b1aa3b82090cb981de4ed54840a25e3e49a623900 ','2e7d2c03a9507ae265ecf5b5356885a5' ---- TYPES CHAR,CHAR,STRING +---- HS2_TYPES +CHAR,CHAR,VARCHAR ==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_none.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_none.test index ef8c72a7d..9ab52f74c 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_none.test +++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_none.test @@ -55,4 +55,6 @@ select * from functional.chars_tiny 'a ','b ','c' ---- TYPES CHAR,CHAR,STRING +---- HS2_TYPES +CHAR,CHAR,VARCHAR ==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_null.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_null.test index 29cb94d3b..4558583c4 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_null.test +++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_null.test @@ -55,4 +55,6 @@ select * from functional.chars_tiny 'NULL','NULL','NULL' ---- TYPES CHAR,CHAR,STRING +---- HS2_TYPES +CHAR,CHAR,VARCHAR ==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_show_first_4.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_show_first_4.test index 89818b2a6..a7787ff14 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_show_first_4.test +++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_show_first_4.test @@ -55,4 +55,6 @@ select * from functional.chars_tiny 'a ','b ','c' ---- TYPES CHAR,CHAR,STRING +---- HS2_TYPES +CHAR,CHAR,VARCHAR ==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_show_last_4.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_show_last_4.test index 697299648..c5815de9b 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_show_last_4.test +++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_alltypes_mask_show_last_4.test @@ -55,4 +55,6 @@ select * from functional.chars_tiny 'x ','x ','c' ---- TYPES CHAR,CHAR,STRING +---- HS2_TYPES +CHAR,CHAR,VARCHAR ==== diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py index 594e5ab6e..4ba9c825d 100644 --- a/tests/authorization/test_authorization.py +++ b/tests/authorization/test_authorization.py @@ -24,15 +24,11 @@ import threading import time import pytest -from thrift.protocol import TBinaryProtocol -from thrift.transport.TSocket import TSocket -from thrift.transport.TTransport import TBufferedTransport -from impala_thrift_gen.ImpalaService import ImpalaHiveServer2Service -from impala_thrift_gen.TCLIService import TCLIService from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.file_utils import assert_file_in_dir_contains from tests.common.test_result_verifier import error_msg_equal +from tests.common.test_vector import HS2 PRIVILEGES = ['all', 'alter', 'drop', 'insert', 'refresh', 'select'] ADMIN = "admin" @@ -40,55 +36,9 @@ ADMIN = "admin" class TestAuthorization(CustomClusterTestSuite): - def setup_method(self, method): - super(TestAuthorization, self).setup_method(method) - host, port = (self.cluster.impalads[0].service.hostname, - self.cluster.impalads[0].service.hs2_port) - self.socket = TSocket(host, port) - self.transport = TBufferedTransport(self.socket) - self.transport.open() - self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport) - self.hs2_client = ImpalaHiveServer2Service.Client(self.protocol) - - def teardown_method(self, method): - if self.socket: - self.socket.close() - super(TestAuthorization, self).teardown_method(method) - - def __execute_hs2_stmt(self, statement, verify=True): - """ - Executes an hs2 statement - - :param statement: the statement to execute - :param verify: If set to true, will thrown an exception on a failed hs2 execution - :return: the result of execution - """ - from tests.hs2.test_hs2 import TestHS2 - execute_statement_req = TCLIService.TExecuteStatementReq() - execute_statement_req.sessionHandle = self.session_handle - execute_statement_req.statement = statement - result = self.hs2_client.ExecuteStatement(execute_statement_req) - if verify: - TestHS2.check_response(result) - return result - - def __open_hs2(self, user, configuration, verify=True): - """ - Open a session with hs2 - - :param user: the user to open the session - :param configuration: the configuration for the session - :param verify: If set to true, will thrown an exception on failed session open - :return: the result of opening the session - """ - from tests.hs2.test_hs2 import TestHS2 - open_session_req = TCLIService.TOpenSessionReq() - open_session_req.username = user - open_session_req.configuration = configuration - resp = self.hs2_client.OpenSession(open_session_req) - if verify: - TestHS2.check_response(resp) - return resp + @classmethod + def default_test_protocol(cls): + return HS2 @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( @@ -116,28 +66,30 @@ class TestAuthorization(CustomClusterTestSuite): user = getuser() query = "show metadata tables in {}".format(full_tbl_name) query_non_existent = "show metadata tables in {}".format(non_existent_tbl_name) - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) + user_client = self.create_impala_client(user=user) try: - admin_client.execute("create database {}".format(unique_db), user=ADMIN) + admin_client.execute("drop database if exists {} cascade".format(unique_db)) + admin_client.execute("create database {}".format(unique_db)) admin_client.execute( - "create table {} (i int) stored as iceberg".format(full_tbl_name), user=ADMIN) + "create table {} (i int) stored as iceberg".format(full_tbl_name)) # Check that when the user has no privileges on the database, the error is the same # if we try an existing or a non-existing table. - exc1_str = str(self.execute_query_expect_failure(self.client, query, user=user)) - exc2_str = str(self.execute_query_expect_failure(self.client, query_non_existent, - user=user)) + exc1_str = str(self.execute_query_expect_failure(user_client, query)) + exc2_str = str(self.execute_query_expect_failure(user_client, query_non_existent)) assert error_msg_equal(exc1_str, exc2_str) assert "AuthorizationException" in exc1_str assert "does not have privileges to access" # Check that there is no error when the user has access to the table. - admin_client.execute("grant {priv} on database {db} to user {user}".format( - priv=priv, db=unique_db, user=user)) - self.execute_query_expect_success(self.client, query, user=user) + admin_client.execute("grant {priv} on database {db} to user {target_user}".format( + priv=priv, db=unique_db, target_user=user)) + self.execute_query_expect_success(user_client, query) finally: - admin_client.execute("revoke {priv} on database {db} from user {user}".format( - priv=priv, db=unique_db, user=user), user=ADMIN) + admin_client.execute( + "revoke {priv} on database {db} from user {target_user}".format( + priv=priv, db=unique_db, target_user=user)) admin_client.execute("drop database if exists {} cascade".format(unique_db)) @staticmethod @@ -155,10 +107,10 @@ class TestAuthorization(CustomClusterTestSuite): def _test_ranger_show_stmts_helper(self, unique_name, visibility_privileges): unique_db = unique_name + "_db" - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) try: - admin_client.execute("drop database if exists %s cascade" % unique_db, user=ADMIN) - admin_client.execute("create database %s" % unique_db, user=ADMIN) + admin_client.execute("drop database if exists %s cascade" % unique_db) + admin_client.execute("create database %s" % unique_db) for priv in PRIVILEGES: admin_client.execute("create database db_%s_%s" % (unique_name, priv)) admin_client.execute("grant {0} on database db_{1}_{2} to user {3}" @@ -239,17 +191,17 @@ class TestAuthorization(CustomClusterTestSuite): # Starts a background thread to create+drop the transient db. # Use admin user to have create+drop privileges. unique_database = unique_name + "_db" - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) stop = False def create_drop_db(): while not stop: - admin_client.execute("create database " + unique_database, user=ADMIN) + admin_client.execute("create database " + unique_database) # Sleep some time so coordinator can get the updates of it. time.sleep(0.1) if stop: break - admin_client.execute("drop database " + unique_database, user=ADMIN) + admin_client.execute("drop database " + unique_database) t = threading.Thread(target=create_drop_db) t.start() @@ -261,4 +213,4 @@ class TestAuthorization(CustomClusterTestSuite): finally: stop = True t.join() - admin_client.execute("drop database if exists " + unique_database, user=ADMIN) + admin_client.execute("drop database if exists " + unique_database) diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py index f79e40e18..20a62f8d4 100644 --- a/tests/authorization/test_ranger.py +++ b/tests/authorization/test_ranger.py @@ -18,28 +18,31 @@ # Client tests for SQL statement authorization from __future__ import absolute_import, division, print_function -from builtins import map, range -import os +from getpass import getuser import grp import json -import pytest import logging -import requests +import os from subprocess import check_call import tempfile from time import sleep -from getpass import getuser +from builtins import map, range +import pytest +import requests + from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.file_utils import copy_files_to_hdfs_dir -from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIf -from tests.common.test_dimensions import (create_client_protocol_dimension, - create_orc_dimension) -from tests.common.test_vector import ImpalaTestVector +from tests.common.skip import SkipIf, SkipIfFS, SkipIfHive2 +from tests.common.test_dimensions import ( + create_client_protocol_dimension, + create_orc_dimension, +) +from tests.common.test_vector import HS2, ImpalaTestVector from tests.shell.util import run_impala_shell_cmd -from tests.util.hdfs_util import NAMENODE from tests.util.calculation_util import get_random_id -from tests.util.filesystem_utils import WAREHOUSE_PREFIX, WAREHOUSE +from tests.util.filesystem_utils import WAREHOUSE, WAREHOUSE_PREFIX +from tests.util.hdfs_util import NAMENODE from tests.util.iceberg_util import get_snapshots ADMIN = "admin" @@ -69,6 +72,10 @@ class TestRanger(CustomClusterTestSuite): Tests for Apache Ranger integration with Apache Impala. """ + @classmethod + def default_test_protocol(cls): + return HS2 + @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impala_log_dir=tempfile.mkdtemp(prefix="ranger_audit_xff", dir=os.getenv("LOG_DIR")), @@ -169,7 +176,7 @@ class TestRanger(CustomClusterTestSuite): the partition path are not writable according to HDFS permission. """ user = getuser() - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) unique_database = unique_name + "_db" unique_table = unique_name + "_tbl" partition_column = "year" @@ -184,13 +191,12 @@ class TestRanger(CustomClusterTestSuite): "execute 'INSERT' on: {1}.{2}".format(user, unique_database, unique_table) try: admin_client.execute("drop database if exists {0} cascade" - .format(unique_database), user=ADMIN) - admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + .format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) admin_client.execute("create table {0}.{1} (name string) partitioned by ({2} int)" - .format(unique_database, unique_table, partition_column), user=ADMIN) + .format(unique_database, unique_table, partition_column)) admin_client.execute("alter table {0}.{1} add partition ({2}={3})" - .format(unique_database, unique_table, partition_column, partition_value), - user=ADMIN) + .format(unique_database, unique_table, partition_column, partition_value)) # Change the owner user and group of the HDFS paths corresponding to the table and # the partition so that according to Impala's FsPermissionChecker, the table is not @@ -202,7 +208,7 @@ class TestRanger(CustomClusterTestSuite): # Invalidate the table metadata to force the catalog server to reload the HDFS # table and the related partition(s). admin_client.execute("invalidate metadata {0}.{1}" - .format(unique_database, unique_table), user=ADMIN) + .format(unique_database, unique_table)) # Verify that the INSERT statement fails with AuthorizationException because the # requesting user does not have the INSERT privilege on the table. @@ -210,14 +216,14 @@ class TestRanger(CustomClusterTestSuite): assert authz_err in str(result) admin_client.execute("grant insert on table {0}.{1} to user {2}" - .format(unique_database, unique_table, user), user=ADMIN) + .format(unique_database, unique_table, user)) # Verify that the INSERT statement succeeds without AnalysisException. self._run_query_as_user(insert_statement, user, True) finally: admin_client.execute("revoke insert on table {0}.{1} from user {2}" .format(unique_database, unique_table, user)) admin_client.execute("drop database if exists {0} cascade" - .format(unique_database), user=ADMIN) + .format(unique_database)) @pytest.mark.execute_serially @SkipIfFS.hdfs_acls @@ -231,7 +237,7 @@ class TestRanger(CustomClusterTestSuite): not writable according to Impala's FsPermissionChecker. """ user = getuser() - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) unique_database = unique_name + "_db" unique_table = unique_name + "_tbl" partition_column = "year" @@ -250,18 +256,16 @@ class TestRanger(CustomClusterTestSuite): "execute 'INSERT' on: {1}.{2}".format(user, unique_database, unique_table) try: admin_client.execute("drop database if exists {0} cascade" - .format(unique_database), user=ADMIN) - admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + .format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) copy_files_to_hdfs_dir(files_for_table, source_hdfs_dir) admin_client.execute("create table {0}.{1} (name string) partitioned by ({2} int) " "row format delimited fields terminated by ',' " - "stored as textfile".format(unique_database, unique_table, partition_column), - user=ADMIN) + "stored as textfile".format(unique_database, unique_table, partition_column)) # We need to add the partition. Otherwise, the LOAD DATA statement can't create new # partitions. admin_client.execute("alter table {0}.{1} add partition ({2}={3})" - .format(unique_database, unique_table, partition_column, partition_value), - user=ADMIN) + .format(unique_database, unique_table, partition_column, partition_value)) # Change the permissions of the HDFS path of the destination table partition. # Before IMPALA-11871, even we changed the table path to non-writable, loading @@ -273,7 +277,7 @@ class TestRanger(CustomClusterTestSuite): # Invalidate the table metadata to force the catalog server to reload the HDFS # table and the related partition(s). admin_client.execute("invalidate metadata {0}.{1}" - .format(unique_database, unique_table), user=ADMIN) + .format(unique_database, unique_table)) # To execute the LOAD DATA statement, a user has to be granted the ALL privilege # on the source HDFS path and the INSERT privilege on the destination table. @@ -283,23 +287,23 @@ class TestRanger(CustomClusterTestSuite): assert authz_err in str(result) admin_client.execute("grant all on uri '{0}/{1}' to user {2}" - .format(source_hdfs_dir, file_name, user), user=ADMIN) + .format(source_hdfs_dir, file_name, user)) # The following verifies the ALL privilege on the source file alone is not # sufficient to execute the LOAD DATA statement. result = self._run_query_as_user(load_data_statement, user, False) assert authz_err in str(result) admin_client.execute("grant insert on table {0}.{1} to user {2}" - .format(unique_database, unique_table, user), user=ADMIN) + .format(unique_database, unique_table, user)) # Verify the LOAD DATA statement fails without AnalysisException. self._run_query_as_user(load_data_statement, user, True) finally: admin_client.execute("revoke all on uri '{0}/{1}' from user {2}" - .format(source_hdfs_dir, file_name, user), user=ADMIN) + .format(source_hdfs_dir, file_name, user)) admin_client.execute("revoke insert on table {0}.{1} from user {2}" - .format(unique_database, unique_table, user), user=ADMIN) + .format(unique_database, unique_table, user)) admin_client.execute("drop database if exists {0} cascade" - .format(unique_database), user=ADMIN) + .format(unique_database)) self.filesystem_client.delete_file_dir("{0}/{1}" .format(source_hdfs_dir, file_name)) @@ -314,7 +318,7 @@ class TestRanger(CustomClusterTestSuite): def _test_grant_revoke(self, unique_name, refresh_statements): user = getuser() - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) unique_database = unique_name + "_db" unique_table = unique_name + "_tbl" group = grp.getgrnam(getuser()).gr_name @@ -327,15 +331,14 @@ class TestRanger(CustomClusterTestSuite): try: # Set-up temp database/table admin_client.execute("drop database if exists {0} cascade" - .format(unique_database), user=ADMIN) - admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + .format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) admin_client.execute("create table {0}.{1} (x int)" - .format(unique_database, unique_table), user=ADMIN) + .format(unique_database, unique_table)) self.execute_query_expect_success(admin_client, "grant select on database {0} to {1} {2}" - .format(unique_database, kw, ident), - user=ADMIN) + .format(unique_database, kw, ident)) self._refresh_authorization(admin_client, refresh_stmt) result = self.execute_query("show grant {0} {1} on database {2}" .format(kw, ident, unique_database)) @@ -344,44 +347,40 @@ class TestRanger(CustomClusterTestSuite): [kw, ident, unique_database, "*", "*", "", "", "", "", "select", "false"]]) self.execute_query_expect_success(admin_client, "revoke select on database {0} from {1} " - "{2}".format(unique_database, kw, ident), - user=ADMIN) + "{2}".format(unique_database, kw, ident)) self._refresh_authorization(admin_client, refresh_stmt) result = self.execute_query("show grant {0} {1} on database {2}" .format(kw, ident, unique_database)) TestRanger._check_privileges(result, []) finally: admin_client.execute("revoke select on database {0} from {1} {2}" - .format(unique_database, kw, ident), user=ADMIN) + .format(unique_database, kw, ident)) admin_client.execute("drop database if exists {0} cascade" - .format(unique_database), user=ADMIN) + .format(unique_database)) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_grant_option(self, unique_name): user1 = getuser() - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) unique_database = unique_name + "_db" unique_table = unique_name + "_tbl" try: # Set-up temp database/table - admin_client.execute("drop database if exists {0} cascade".format(unique_database), - user=ADMIN) - admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) admin_client.execute("create table {0}.{1} (x int)" - .format(unique_database, unique_table), user=ADMIN) + .format(unique_database, unique_table)) # Give user 1 the ability to grant select privileges on unique_database self.execute_query_expect_success(admin_client, "grant select on database {0} to user {1} with " - "grant option".format(unique_database, user1), - user=ADMIN) + "grant option".format(unique_database, user1)) self.execute_query_expect_success(admin_client, "grant insert on database {0} to user {1} with " - "grant option".format(unique_database, user1), - user=ADMIN) + "grant option".format(unique_database, user1)) # Verify user 1 has with_grant privilege on unique_database result = self.execute_query("show grant user {0} on database {1}" @@ -395,7 +394,7 @@ class TestRanger(CustomClusterTestSuite): # Revoke select privilege and check grant option is still present self.execute_query_expect_success(admin_client, "revoke select on database {0} from user {1}" - .format(unique_database, user1), user=ADMIN) + .format(unique_database, user1)) result = self.execute_query("show grant user {0} on database {1}" .format(user1, unique_database)) # Revoking the select privilege also deprives the grantee of the permission to @@ -409,7 +408,7 @@ class TestRanger(CustomClusterTestSuite): # Revoke privilege granting from user 1 self.execute_query_expect_success(admin_client, "revoke grant option for insert " "on database {0} from user {1}" - .format(unique_database, user1), user=ADMIN) + .format(unique_database, user1)) # User 1 can no longer grant privileges on unique_database # In ranger it is currently not possible to revoke grant for a single access type @@ -420,9 +419,8 @@ class TestRanger(CustomClusterTestSuite): ["USER", user1, unique_database, "*", "*", "", "", "", "", "insert", "false"]]) finally: admin_client.execute("revoke insert on database {0} from user {1}" - .format(unique_database, user1), user=ADMIN) - admin_client.execute("drop database if exists {0} cascade".format(unique_database), - user=ADMIN) + .format(unique_database, user1)) + admin_client.execute("drop database if exists {0} cascade".format(unique_database)) def _update_privileges_and_verify(self, admin_client, update_stmt, show_grant_stmt, expected_privileges): @@ -437,18 +435,17 @@ class TestRanger(CustomClusterTestSuite): user = getuser() group = grp.getgrnam(getuser()).gr_name test_data = [(user, "USER"), (group, "GROUP")] - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) unique_db = unique_name + "_db" unique_table = unique_name + "_tbl" udf = "identity" try: # Create test database/table - admin_client.execute("drop database if exists {0} cascade".format(unique_db), - user=ADMIN) - admin_client.execute("create database {0}".format(unique_db), user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_db)) + admin_client.execute("create database {0}".format(unique_db)) admin_client.execute("create table {0}.{1} (x int)" - .format(unique_db, unique_table), user=ADMIN) + .format(unique_db, unique_table)) for data in test_data: # Test basic show grant functionality for user/group @@ -470,8 +467,7 @@ class TestRanger(CustomClusterTestSuite): # Test USER inherits privileges for their GROUP self._test_show_grant_user_group(admin_client, user, group, unique_db) finally: - admin_client.execute("drop database if exists {0} cascade".format(unique_db), - user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_db)) def _test_show_grant_without_on(self, kw, ident): self.execute_query_expect_failure(self.client, "show grant {0} {1}".format(kw, ident)) @@ -794,7 +790,7 @@ class TestRanger(CustomClusterTestSuite): unique_table): try: # Grant the select privilege on server - admin_client.execute("grant select on server to {0} {1}".format(kw, id), user=ADMIN) + admin_client.execute("grant select on server to {0} {1}".format(kw, id)) # Verify the privileges are correctly added result = self.client.execute("show grant {0} {1} on server".format(kw, id)) @@ -827,7 +823,7 @@ class TestRanger(CustomClusterTestSuite): # Grant the create privilege on database and verify admin_client.execute("grant create on database {0} to {1} {2}" - .format(unique_database, kw, id), user=ADMIN) + .format(unique_database, kw, id)) result = self.client.execute("show grant {0} {1} on database {2}" .format(kw, id, unique_database)) TestRanger._check_privileges(result, [ @@ -839,7 +835,7 @@ class TestRanger(CustomClusterTestSuite): # Grant the insert privilege on table and verify admin_client.execute("grant insert on table {0}.{1} to {2} {3}" - .format(unique_database, unique_table, kw, id), user=ADMIN) + .format(unique_database, unique_table, kw, id)) result = self.client.execute("show grant {0} {1} on table {2}.{3}" .format(kw, id, unique_database, unique_table)) TestRanger._check_privileges(result, [ @@ -850,7 +846,7 @@ class TestRanger(CustomClusterTestSuite): # Grant the select privilege on column and verify admin_client.execute("grant select(x) on table {0}.{1} to {2} {3}" - .format(unique_database, unique_table, kw, id), user=ADMIN) + .format(unique_database, unique_table, kw, id)) result = self.client.execute("show grant {0} {1} on column {2}.{3}.x" .format(kw, id, unique_database, unique_table)) TestRanger._check_privileges(result, [ @@ -861,7 +857,7 @@ class TestRanger(CustomClusterTestSuite): # The insert privilege on table masks the select privilege just added admin_client.execute("grant select on table {0}.{1} to {2} {3}" - .format(unique_database, unique_table, kw, id), user=ADMIN) + .format(unique_database, unique_table, kw, id)) result = self.client.execute("show grant {0} {1} on column {2}.{3}.x" .format(kw, id, unique_database, unique_table)) TestRanger._check_privileges(result, [ @@ -873,7 +869,7 @@ class TestRanger(CustomClusterTestSuite): # The all privilege on table masks the privileges of insert and select, but not the # select privilege on column. admin_client.execute("grant all on table {0}.{1} to {2} {3}" - .format(unique_database, unique_table, kw, id), user=ADMIN) + .format(unique_database, unique_table, kw, id)) result = self.client.execute("show grant {0} {1} on column {2}.{3}.x" .format(kw, id, unique_database, unique_table)) TestRanger._check_privileges(result, [ @@ -899,7 +895,7 @@ class TestRanger(CustomClusterTestSuite): impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_grant_revoke_ranger_api(self, unique_name): user = getuser() - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) unique_db = unique_name + "_db" resource = { "database": unique_db, @@ -910,9 +906,8 @@ class TestRanger(CustomClusterTestSuite): try: # Create the test database - admin_client.execute("drop database if exists {0} cascade".format(unique_db), - user=ADMIN) - admin_client.execute("create database {0}".format(unique_db), user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_db)) + admin_client.execute("create database {0}".format(unique_db)) # Grant privileges via Ranger REST API TestRanger._grant_ranger_privilege(user, resource, access) @@ -952,8 +947,7 @@ class TestRanger(CustomClusterTestSuite): finally: admin_client.execute("revoke all on database {0} from user {1}" .format(unique_db, user)) - admin_client.execute("drop database if exists {0} cascade".format(unique_db), - user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_db)) @pytest.mark.execute_serially @SkipIf.is_test_jdk @@ -961,7 +955,7 @@ class TestRanger(CustomClusterTestSuite): impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS, reset_ranger=True) def test_show_grant_hive_privilege(self, unique_name): user = getuser() - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) unique_db = unique_name + "_db" resource = { "database": unique_db, @@ -972,9 +966,8 @@ class TestRanger(CustomClusterTestSuite): try: TestRanger._grant_ranger_privilege(user, resource, access) - admin_client.execute("drop database if exists {0} cascade".format(unique_db), - user=ADMIN) - admin_client.execute("create database {0}".format(unique_db), user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_db)) + admin_client.execute("create database {0}".format(unique_db)) admin_client.execute("refresh authorization") result = self.client.execute("show grant user {0} on database {1}" @@ -1001,8 +994,7 @@ class TestRanger(CustomClusterTestSuite): TestRanger._check_privileges(result, []) finally: - admin_client.execute("drop database if exists {0} cascade".format(unique_db), - user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_db)) TestRanger._revoke_ranger_privilege(user, resource, access) @staticmethod @@ -1261,17 +1253,17 @@ class TestRanger(CustomClusterTestSuite): def _run_query_as_user(self, query, username, expect_success): """Helper to run an input query as a given user.""" - impala_client = self.create_impala_client() + impala_client = self.create_impala_client(user=username) if expect_success: return self.execute_query_expect_success( - impala_client, query, user=username, query_options={'sync_ddl': 1}) - return self.execute_query_expect_failure(impala_client, query, user=username) + impala_client, query, query_options={'sync_ddl': 1}) + return self.execute_query_expect_failure(impala_client, query) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS, reset_ranger=True) def test_grant_multiple_columns(self): - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) access_type = "select" db = "functional" tbl = "alltypes" @@ -1294,9 +1286,9 @@ class TestRanger(CustomClusterTestSuite): try: policy_ids = set() if kw == "role": - admin_client.execute("create role {0}".format(principal_name), user=ADMIN) + admin_client.execute("create role {0}".format(principal_name)) admin_client.execute("grant {0}({1}) on table {2}.{3} to {4} {5}" - .format(access_type, cols_str, db, tbl, kw, principal_name), user=ADMIN) + .format(access_type, cols_str, db, tbl, kw, principal_name)) policies = TestRanger._get_ranger_privileges() for col in cols: policy_ids = policy_ids \ @@ -1308,9 +1300,9 @@ class TestRanger(CustomClusterTestSuite): assert len(policy_ids) == 1 finally: admin_client.execute("revoke {0}({1}) on table {2}.{3} from {4} {5}" - .format(access_type, cols_str, db, tbl, kw, principal_name), user=ADMIN) + .format(access_type, cols_str, db, tbl, kw, principal_name)) if kw == "role": - admin_client.execute("drop role {0}".format(principal_name), user=ADMIN) + admin_client.execute("drop role {0}".format(principal_name)) @staticmethod def _get_ranger_policy_ids(policies, principal_name, principal_key, db, tbl, col, @@ -1338,12 +1330,11 @@ class TestRanger(CustomClusterTestSuite): impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_unsupported_sql(self): """Tests unsupported SQL statements when running with Ranger.""" - user = "admin" - impala_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) error_msg = "UnsupportedOperationException: SHOW GRANT is not supported without a " \ "defined resource in Ranger." statement = "show grant role foo" - result = self.execute_query_expect_failure(impala_client, statement, user=user) + result = self.execute_query_expect_failure(admin_client, statement) assert error_msg in str(result) @pytest.mark.execute_serially @@ -1357,15 +1348,13 @@ class TestRanger(CustomClusterTestSuite): invalid_group = "invalid_group" # TODO(IMPALA-8640): Create two different Impala clients because the users to # workaround the bug. - invalid_impala_client = self.create_impala_client() - valid_impala_client = self.create_impala_client() + invalid_impala_client = self.create_impala_client(user=invalid_user) + valid_impala_client = self.create_impala_client(user=valid_user) for statement in ["grant select on table functional.alltypes to user {0}" .format(getuser()), "revoke select on table functional.alltypes from user {0}" .format(getuser())]: - result = self.execute_query_expect_failure(invalid_impala_client, - statement, - user=invalid_user) + result = self.execute_query_expect_failure(invalid_impala_client, statement) if "grant" in statement: assert "Error granting a privilege in Ranger. Ranger error message: " \ "HTTP 403 Error: Grantor user invalid_user doesn't exist" in str(result) @@ -1377,9 +1366,7 @@ class TestRanger(CustomClusterTestSuite): .format(invalid_user), "revoke select on table functional.alltypes from user {0}" .format(invalid_user)]: - result = self.execute_query_expect_failure(valid_impala_client, - statement, - user=valid_user) + result = self.execute_query_expect_failure(valid_impala_client, statement) if "grant" in statement: assert "Error granting a privilege in Ranger. Ranger error message: " \ "HTTP 403 Error: Grantee user invalid_user doesn't exist" in str(result) @@ -1391,9 +1378,7 @@ class TestRanger(CustomClusterTestSuite): .format(invalid_group), "revoke select on table functional.alltypes from group {0}" .format(invalid_group)]: - result = self.execute_query_expect_failure(valid_impala_client, - statement, - user=valid_user) + result = self.execute_query_expect_failure(valid_impala_client, statement) if "grant" in statement: assert "Error granting a privilege in Ranger. Ranger error message: " \ "HTTP 403 Error: Grantee group invalid_group doesn't exist" in str(result) @@ -1434,7 +1419,7 @@ class TestRanger(CustomClusterTestSuite): non_owner_group = NON_OWNER grantee_role = "grantee_role" resource_owner_role = OWNER_USER - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) unique_database = unique_name + "_db" table_name = "tbl" column_names = ["a", "b"] @@ -1445,14 +1430,13 @@ class TestRanger(CustomClusterTestSuite): privileges = ["alter", "drop", "create", "insert", "select", "refresh"] try: - admin_client.execute("create role {}".format(grantee_role), user=ADMIN) - admin_client.execute("create role {}".format(resource_owner_role), user=ADMIN) - admin_client.execute("grant create on server to user {0}".format(OWNER_USER), - user=ADMIN) + admin_client.execute("create role {}".format(grantee_role)) + admin_client.execute("create role {}".format(resource_owner_role)) + admin_client.execute("grant create on server to user {0}".format(OWNER_USER)) # A user has to be granted the ALL privilege on the URI of the UDF as well to be # able to create a UDF. admin_client.execute("grant all on uri '{0}{1}' to user {2}" - .format(os.getenv("FILESYSTEM_PREFIX"), udf_uri, OWNER_USER), user=ADMIN) + .format(os.getenv("FILESYSTEM_PREFIX"), udf_uri, OWNER_USER)) self._run_query_as_user("create database {0}".format(unique_database), OWNER_USER, True) self._run_query_as_user("create table {0}.{1} ({2} int, {3} string)" @@ -1483,14 +1467,12 @@ class TestRanger(CustomClusterTestSuite): self._test_grant_revoke_by_owner_on_udf(privilege, unique_database, udf_name, grantee_type, grantee) finally: - admin_client.execute("drop database if exists {0} cascade".format(unique_database), - user=ADMIN) - admin_client.execute("drop role {}".format(grantee_role), user=ADMIN) - admin_client.execute("drop role {}".format(resource_owner_role), user=ADMIN) - admin_client.execute("revoke create on server from user {0}".format(OWNER_USER), - user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_database)) + admin_client.execute("drop role {}".format(grantee_role)) + admin_client.execute("drop role {}".format(resource_owner_role)) + admin_client.execute("revoke create on server from user {0}".format(OWNER_USER)) admin_client.execute("revoke all on uri '{0}{1}' from user {2}" - .format(os.getenv("FILESYSTEM_PREFIX"), udf_uri, OWNER_USER), user=ADMIN) + .format(os.getenv("FILESYSTEM_PREFIX"), udf_uri, OWNER_USER)) def _test_grant_revoke_by_owner_on_database(self, privilege, unique_database, grantee_type, grantee, resource_owner_role): @@ -1501,14 +1483,14 @@ class TestRanger(CustomClusterTestSuite): set_database_owner_group_stmt = "alter database {0} set owner group {1}" set_database_owner_role_stmt = "alter database {0} set owner role {1}" resource_owner_group = OWNER_USER - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) try: self._run_query_as_user(grant_database_stmt .format(privilege, unique_database, grantee_type, grantee), OWNER_USER, True) result = admin_client.execute(show_grant_database_stmt - .format(grantee_type, grantee, unique_database), user=ADMIN) + .format(grantee_type, grantee, unique_database)) TestRanger._check_privileges(result, [ [grantee_type, grantee, unique_database, "", "", "", "", "", "*", privilege, "false"], @@ -1519,7 +1501,7 @@ class TestRanger(CustomClusterTestSuite): .format(privilege, unique_database, grantee_type, grantee), OWNER_USER, True) result = admin_client.execute(show_grant_database_stmt - .format(grantee_type, grantee, unique_database), user=ADMIN) + .format(grantee_type, grantee, unique_database)) TestRanger._check_privileges(result, []) # Set the owner of the database to a group that has the same name as 'OWNER_USER' @@ -1531,7 +1513,7 @@ class TestRanger(CustomClusterTestSuite): # is made through Hive. self.run_stmt_in_hive(set_database_owner_group_stmt .format(unique_database, resource_owner_group)) - admin_client.execute("invalidate metadata", user=ADMIN) + admin_client.execute("invalidate metadata") result = self._run_query_as_user(grant_database_stmt .format(privilege, unique_database, grantee_type, grantee), OWNER_USER, @@ -1546,7 +1528,7 @@ class TestRanger(CustomClusterTestSuite): # and verify that the user 'OWNER_USER' is not able to grant or revoke a # privilege on the database. admin_client.execute(set_database_owner_role_stmt - .format(unique_database, resource_owner_role), user=ADMIN) + .format(unique_database, resource_owner_role)) result = self._run_query_as_user(grant_database_stmt .format(privilege, unique_database, grantee_type, grantee), OWNER_USER, @@ -1558,13 +1540,13 @@ class TestRanger(CustomClusterTestSuite): assert ERROR_REVOKE in str(result) # Change the database owner back to the user 'OWNER_USER'. admin_client.execute(set_database_owner_user_stmt - .format(unique_database, OWNER_USER), user=ADMIN) + .format(unique_database, OWNER_USER)) finally: # Revoke the privilege that was granted by 'OWNER_USER' in case any of the # REVOKE statements submitted by 'owner_user' failed to prevent this test # from interfering with other tests. admin_client.execute(revoke_database_stmt - .format(privilege, unique_database, grantee_type, grantee), user=ADMIN) + .format(privilege, unique_database, grantee_type, grantee)) def _test_grant_revoke_by_owner_on_table(self, privilege, unique_database, table_name, grantee_type, grantee, resource_owner_role): @@ -1575,7 +1557,7 @@ class TestRanger(CustomClusterTestSuite): revoke_table_stmt = "revoke {0} on table {1}.{2} from {3} {4}" show_grant_table_stmt = "show grant {0} {1} on table {2}.{3}" resource_owner_group = OWNER_USER - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) set_table_owner_user_stmt = "alter table {0}.{1} set owner user {2}" set_table_owner_group_stmt = "alter table {0}.{1} set owner group {2}" set_table_owner_role_stmt = "alter table {0}.{1} set owner role {2}" @@ -1585,8 +1567,7 @@ class TestRanger(CustomClusterTestSuite): .format(privilege, unique_database, table_name, grantee_type, grantee), OWNER_USER, True) result = admin_client.execute(show_grant_table_stmt - .format(grantee_type, grantee, unique_database, table_name), - user=ADMIN) + .format(grantee_type, grantee, unique_database, table_name)) TestRanger._check_privileges(result, [ [grantee_type, grantee, unique_database, table_name, "*", "", "", "", "", privilege, "false"]]) @@ -1595,8 +1576,7 @@ class TestRanger(CustomClusterTestSuite): .format(privilege, unique_database, table_name, grantee_type, grantee), OWNER_USER, True) result = admin_client.execute(show_grant_table_stmt - .format(grantee_type, grantee, unique_database, table_name), - user=ADMIN) + .format(grantee_type, grantee, unique_database, table_name)) TestRanger._check_privileges(result, []) # Set the owner of the table to a group that has the same name as @@ -1608,8 +1588,7 @@ class TestRanger(CustomClusterTestSuite): # is made through Hive. self.run_stmt_in_hive(set_table_owner_group_stmt .format(unique_database, table_name, resource_owner_group)) - admin_client.execute("refresh {0}.{1}".format(unique_database, table_name), - user=ADMIN) + admin_client.execute("refresh {0}.{1}".format(unique_database, table_name)) result = self._run_query_as_user(grant_table_stmt .format(privilege, unique_database, table_name, grantee_type, grantee), @@ -1624,7 +1603,7 @@ class TestRanger(CustomClusterTestSuite): # 'OWNER_USER' and verify that the user 'OWNER_USER' is not able to grant or # revoke a privilege on the table. admin_client.execute(set_table_owner_role_stmt - .format(unique_database, table_name, resource_owner_role), user=ADMIN) + .format(unique_database, table_name, resource_owner_role)) result = self._run_query_as_user(grant_table_stmt .format(privilege, unique_database, table_name, grantee_type, grantee), @@ -1636,14 +1615,13 @@ class TestRanger(CustomClusterTestSuite): assert ERROR_REVOKE in str(result) # Change the table owner back to the user 'OWNER_USER'. admin_client.execute(set_table_owner_user_stmt - .format(unique_database, table_name, OWNER_USER), user=ADMIN) + .format(unique_database, table_name, OWNER_USER)) finally: # Revoke the privilege that was granted by 'OWNER_USER' in case any of the # REVOKE statements submitted by 'OWNER_USER' failed to prevent this test # from interfering with other tests. admin_client.execute(revoke_table_stmt - .format(privilege, unique_database, table_name, grantee_type, grantee), - user=ADMIN) + .format(privilege, unique_database, table_name, grantee_type, grantee)) def _test_grant_revoke_by_owner_on_column(self, privilege, column_names, unique_database, table_name, grantee_type, grantee, resource_owner_role): @@ -1654,7 +1632,7 @@ class TestRanger(CustomClusterTestSuite): revoke_column_stmt = "revoke {0} ({1}) on table {2}.{3} from {4} {5}" show_grant_column_stmt = "show grant {0} {1} on column {2}.{3}.{4}" resource_owner_group = OWNER_USER - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) set_table_owner_user_stmt = "alter table {0}.{1} set owner user {2}" set_table_owner_group_stmt = "alter table {0}.{1} set owner group {2}" set_table_owner_role_stmt = "alter table {0}.{1} set owner role {2}" @@ -1665,7 +1643,7 @@ class TestRanger(CustomClusterTestSuite): grantee_type, grantee), OWNER_USER, True) result = admin_client.execute(show_grant_column_stmt .format(grantee_type, grantee, unique_database, table_name, - column_names[0]), user=ADMIN) + column_names[0])) TestRanger._check_privileges(result, [ [grantee_type, grantee, unique_database, table_name, column_names[0], "", "", "", "", privilege, "false"]]) @@ -1675,7 +1653,7 @@ class TestRanger(CustomClusterTestSuite): grantee_type, grantee), OWNER_USER, True) result = admin_client.execute(show_grant_column_stmt .format(grantee_type, grantee, unique_database, table_name, - column_names[0]), user=ADMIN) + column_names[0])) TestRanger._check_privileges(result, []) # Set the owner of the table to a group that has the same name as 'OWNER_USER' @@ -1683,8 +1661,7 @@ class TestRanger(CustomClusterTestSuite): # privilege on a column in the table. self.run_stmt_in_hive(set_table_owner_group_stmt .format(unique_database, table_name, resource_owner_group)) - admin_client.execute("refresh {0}.{1}".format(unique_database, table_name), - user=ADMIN) + admin_client.execute("refresh {0}.{1}".format(unique_database, table_name)) result = self._run_query_as_user(grant_column_stmt .format(privilege, column_names[0], unique_database, table_name, @@ -1699,7 +1676,7 @@ class TestRanger(CustomClusterTestSuite): # verify that the user 'OWNER_USER' is not able to grant or revoke a privilege on # a column in the table. admin_client.execute(set_table_owner_role_stmt - .format(unique_database, table_name, resource_owner_role), user=ADMIN) + .format(unique_database, table_name, resource_owner_role)) result = self._run_query_as_user(grant_column_stmt .format(privilege, column_names[0], unique_database, table_name, @@ -1711,14 +1688,14 @@ class TestRanger(CustomClusterTestSuite): assert ERROR_REVOKE in str(result) # Change the table owner back to the user 'owner_user'. admin_client.execute(set_table_owner_user_stmt - .format(unique_database, table_name, OWNER_USER), user=ADMIN) + .format(unique_database, table_name, OWNER_USER)) finally: # Revoke the privilege that was granted by 'OWNER_USER' in case any of the # REVOKE statements submitted by 'OWNER_USER' failed to prevent this test # from interfering with other tests. admin_client.execute(revoke_column_stmt .format(privilege, column_names[0], unique_database, table_name, - grantee_type, grantee), user=ADMIN) + grantee_type, grantee)) def _test_grant_revoke_by_owner_on_udf(self, privilege, unique_database, udf_name, grantee_type, grantee): @@ -1738,19 +1715,17 @@ class TestRanger(CustomClusterTestSuite): impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_show_functions(self, unique_name): user1 = getuser() - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) unique_database = unique_name + "_db" privileges = ["ALTER", "DROP", "CREATE", "INSERT", "SELECT", "REFRESH"] fs_prefix = os.getenv("FILESYSTEM_PREFIX") or str() try: # Set-up temp database + function - admin_client.execute("drop database if exists {0} cascade".format(unique_database), - user=ADMIN) - admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) self.execute_query_expect_success(admin_client, "create function {0}.foo() RETURNS" " int LOCATION '{1}/test-warehouse/libTestUdfs.so'" - "SYMBOL='Fn'".format(unique_database, fs_prefix), - user=ADMIN) + "SYMBOL='Fn'".format(unique_database, fs_prefix)) # Check "show functions" with no privilege granted. result = self._run_query_as_user("show functions in {0}".format(unique_database), user1, False) @@ -1762,16 +1737,15 @@ class TestRanger(CustomClusterTestSuite): # Grant privilege self.execute_query_expect_success(admin_client, "grant {0} on database {1} to user {2}" - .format(privilege, unique_database, user1), - user=ADMIN) + .format(privilege, unique_database, user1)) # Check with current privilege result = self._run_query_as_user("show functions in {0}" .format(unique_database), user1, True) - assert "foo()", str(result) + assert "foo()" in result.get_data() finally: # Revoke privilege admin_client.execute("revoke {0} on database {1} from user {2}" - .format(privilege, unique_database, user1), user=ADMIN) + .format(privilege, unique_database, user1)) finally: # Drop database self._run_query_as_user("drop database {0} cascade".format(unique_database), @@ -1785,20 +1759,19 @@ class TestRanger(CustomClusterTestSuite): SELECT privilege on the UDF, and b) any of the SELECT, INSERT, REFRESH privileges on all the tables, columns in the database.""" test_user = "non_owner" - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) unique_database = unique_name + "_db" fs_prefix = os.getenv("FILESYSTEM_PREFIX") or str() try: # Create a temporary database and a user-defined function. - admin_client.execute("drop database if exists {0} cascade".format(unique_database), - user=ADMIN) - admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) admin_client.execute("create function {0}.identity(bigint) " "RETURNS bigint " "LOCATION " "'{1}/test-warehouse/impala-hive-udfs.jar' " "SYMBOL='org.apache.impala.TestUdf'" - .format(unique_database, fs_prefix), user=ADMIN) + .format(unique_database, fs_prefix)) # Create a temporary table and grant 'test_user' the INSERT privilege on the table, # which is necessary for 'test_user' to insert values into a table. admin_client.execute("create table {0}.tbl (id bigint)".format(unique_database)) @@ -1876,8 +1849,7 @@ class TestRanger(CustomClusterTestSuite): []) result = admin_client.execute("show grant user {0} " "on user_defined_fn {1}.identity" - .format(test_user, unique_database), - user=ADMIN) + .format(test_user, unique_database)) TestRanger._check_privileges(result, [ ["USER", test_user, unique_database, "", "", "", "", "", "identity", "select", "false"]]) @@ -1889,10 +1861,10 @@ class TestRanger(CustomClusterTestSuite): # Revoke the granted privileges. admin_client.execute("revoke {0} on database {1} from user {2}" .format(privilege_on_database, unique_database, - test_user), user=ADMIN) + test_user)) admin_client.execute("revoke select on user_defined_fn {0}.identity " "from user {1}" - .format(unique_database, test_user), user=ADMIN) + .format(unique_database, test_user)) finally: # Revoke the granted privilege on the temporary table. self._run_query_as_user("revoke insert on table {0}.tbl from user {1}" @@ -1984,7 +1956,7 @@ class TestRanger(CustomClusterTestSuite): test_db = unique_name + "_db" # A client that only used by 'test_user'. Just need to set the username at the # first statement. It will keep using the same username. - user_client = self.create_impala_client() + user_client = self.create_impala_client(user=test_user) user_client.set_configuration({"sync_hms_events_wait_time_s": 10, "sync_hms_events_strict_mode": True}) @@ -2008,7 +1980,7 @@ class TestRanger(CustomClusterTestSuite): # Try to create a table under test_db as current user. It should fail since # test_user is not the db owner. create_tbl_stmt = "create table {0}.foo(a int)".format(test_db) - self.execute_query_expect_failure(user_client, create_tbl_stmt, user=test_user) + self.execute_query_expect_failure(user_client, create_tbl_stmt) change_db_owner_to_user() # Creating the table again should succeed once the ALTER_DATABASE event is synced. self.execute_query_expect_success(user_client, create_tbl_stmt) @@ -2063,21 +2035,20 @@ class TestRanger(CustomClusterTestSuite): """Verifies that Impala should not allow using functions in the fallback database unless the user has been granted sufficient privileges on the given database.""" test_user = "non_owner" - admin_client = self.create_impala_client() - non_owner_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) + non_owner_client = self.create_impala_client(user=test_user) refresh_stmt = "refresh authorization" unique_database = unique_name + "_db" try: - admin_client.execute("drop database if exists {0} cascade".format(unique_database), - user=ADMIN) - admin_client.execute("create database %s" % unique_database, user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_database)) + admin_client.execute("create database %s" % unique_database) admin_client.execute("create function {0}.identity(bigint) " "RETURNS bigint " "LOCATION " "'{1}/libTestUdfs.so' " "SYMBOL='Identity'" - .format(unique_database, WAREHOUSE), user=ADMIN) + .format(unique_database, WAREHOUSE)) # A user not granted any privilege is not allowed to execute the UDF. result = self._run_query_as_user("select identity(1)", test_user, False) err = "User '{0}' does not have privileges to SELECT functions in: " \ @@ -2085,8 +2056,7 @@ class TestRanger(CustomClusterTestSuite): assert err in str(result) admin_client.execute( - "grant select on database default to user {0}".format(test_user), - user=ADMIN) + "grant select on database default to user {0}".format(test_user)) self._refresh_authorization(admin_client, refresh_stmt) result = self._run_query_as_user("select identity(1)", test_user, False) @@ -2097,14 +2067,14 @@ class TestRanger(CustomClusterTestSuite): # privileges on it, whether the function exists or not. result = self.execute_query_expect_failure( non_owner_client, "select identity(1)", query_options={ - 'FALLBACK_DB_FOR_FUNCTIONS': unique_database}, user=test_user) + 'FALLBACK_DB_FOR_FUNCTIONS': unique_database}) err = "User '{0}' does not have privileges to SELECT functions in: " \ "{1}.identity".format(test_user, unique_database) assert err in str(result) result = self.execute_query_expect_failure( non_owner_client, "select fn()", query_options={ - 'FALLBACK_DB_FOR_FUNCTIONS': unique_database}, user=test_user) + 'FALLBACK_DB_FOR_FUNCTIONS': unique_database}) err = "User '{0}' does not have privileges to SELECT functions in: " \ "{1}.fn".format(test_user, unique_database) assert err in str(result) @@ -2114,10 +2084,10 @@ class TestRanger(CustomClusterTestSuite): # the UDF in the fallback database in order to execute the UDF. admin_client.execute( "grant insert on database {0} to user {1}".format( - unique_database, test_user), user=ADMIN) + unique_database, test_user)) admin_client.execute( "grant select on user_defined_fn {0}.identity to user {1}".format( - unique_database, test_user), user=ADMIN) + unique_database, test_user)) self._refresh_authorization(admin_client, refresh_stmt) # A user is allowed to use functions in the fallback database if the user is @@ -2125,16 +2095,15 @@ class TestRanger(CustomClusterTestSuite): self.execute_query_expect_success( non_owner_client, "select identity(1)", - query_options={'FALLBACK_DB_FOR_FUNCTIONS': unique_database}, - user=test_user) + query_options={'FALLBACK_DB_FOR_FUNCTIONS': unique_database}) finally: # Revoke the granted privileges. admin_client.execute("revoke select on database default from user {0}" - .format(test_user), user=ADMIN) + .format(test_user)) admin_client.execute("revoke insert on database {0} from user {1}" - .format(unique_database, test_user), user=ADMIN) + .format(unique_database, test_user)) admin_client.execute("revoke select on user_defined_fn {0}.identity from user {1}" - .format(unique_database, test_user), user=ADMIN) + .format(unique_database, test_user),) # Drop the database. self._run_query_as_user("drop database {0} cascade".format(unique_database), ADMIN, True) @@ -2147,10 +2116,9 @@ class TestRanger(CustomClusterTestSuite): unique_database = unique_name + '_db' # Create another client for admin user since current user doesn't have privileges to # create/drop databases or refresh authorization. - admin_client = self.create_impala_client() - admin_client.execute("drop database if exists %s cascade" % unique_database, - user=ADMIN) - admin_client.execute("create database %s" % unique_database, user=ADMIN) + admin_client = self.create_impala_client(user=ADMIN) + admin_client.execute("drop database if exists %s cascade" % unique_database) + admin_client.execute("create database %s" % unique_database) # Grant CREATE on database to current user for tests on CTAS, CreateView etc. admin_client.execute("grant create on database %s to user %s" % (unique_database, user)) @@ -2208,8 +2176,7 @@ class TestRanger(CustomClusterTestSuite): unique_name + str(policy_cnt), user, "functional_parquet", "alltypessmall", "string_col", "CUSTOM", "concat(string_col, invalid_col)") policy_cnt += 1 - self.execute_query_expect_success(admin_client, "refresh authorization", - user=ADMIN) + self.execute_query_expect_success(admin_client, "refresh authorization") self.run_test_case("QueryTest/ranger_column_masking", vector, test_file_vars={'$UNIQUE_DB': unique_database}) # Add a policy on a primitive column of a table which contains nested columns. @@ -2227,8 +2194,7 @@ class TestRanger(CustomClusterTestSuite): unique_name + str(policy_cnt), user, db, "complextypestbl", "int_array", "MASK_NULL") policy_cnt += 1 - self.execute_query_expect_success(admin_client, "refresh authorization", - user=ADMIN) + self.execute_query_expect_success(admin_client, "refresh authorization") self.run_test_case("QueryTest/ranger_column_masking_complex_types", vector, use_db=db) finally: @@ -2241,7 +2207,7 @@ class TestRanger(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) - def test_block_metadata_update(self, vector, unique_name): + def test_block_metadata_update(self, unique_name): """Test that the metadata update operation on a table by a requesting user is denied if there exists a column masking policy defined on any column in the table for the requesting user even when the table metadata (e.g., list of columns) have been @@ -2249,17 +2215,17 @@ class TestRanger(CustomClusterTestSuite): metadata again. This test would have failed if we did not load the table metadata for ResetMetadataStmt.""" user = getuser() - admin_client = self.create_impala_client() - non_owner_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) + non_owner_client = self.create_impala_client(user=user) try: TestRanger._add_column_masking_policy( unique_name, user, "functional", "alltypestiny", "id", "CUSTOM", "id * 100") self.execute_query_expect_success(admin_client, - "invalidate metadata functional.alltypestiny", user=ADMIN) + "invalidate metadata functional.alltypestiny") admin_client.execute("grant all on server to user {0}".format(user)) result = self.execute_query_expect_failure( - non_owner_client, "invalidate metadata functional.alltypestiny", user=user) + non_owner_client, "invalidate metadata functional.alltypestiny") assert "User '{0}' does not have privileges to execute " \ "'INVALIDATE METADATA/REFRESH' on: functional.alltypestiny".format(user) \ in str(result) @@ -2289,8 +2255,8 @@ class TestRanger(CustomClusterTestSuite): """Verify that catalog cache operations are allowed for masked users when allow_catalog_cache_op_from_masked_users=true.""" user = getuser() - admin_client = self.create_impala_client() - non_admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) + non_admin_client = self.create_impala_client(user=user) try: # Create a column masking policy on 'user' which is also the owner of the table TestRanger._add_column_masking_policy( @@ -2300,7 +2266,7 @@ class TestRanger(CustomClusterTestSuite): # At a cold start, the table is unloaded so its owner is unknown. # INVALIDATE METADATA <table> is denied since 'user' is not detected as the owner. result = self.execute_query_expect_failure( - non_admin_client, "invalidate metadata functional.alltypestiny", user=user) + non_admin_client, "invalidate metadata functional.alltypestiny") assert "User '{0}' does not have privileges to execute " \ "'INVALIDATE METADATA/REFRESH' on: functional.alltypestiny".format(user) \ in str(result) @@ -2310,39 +2276,39 @@ class TestRanger(CustomClusterTestSuite): # Run a query to trigger metadata loading on the table self.execute_query_expect_success( - non_admin_client, "describe functional.alltypestiny", user=user) + non_admin_client, "describe functional.alltypestiny") # Verify catalogd loads metadata of this table self.assert_catalogd_log_contains("INFO", table_loaded_log, expected_count=1) # INVALIDATE METADATA <table> is allowed since 'user' is detected as the owner. self.execute_query_expect_success( - non_admin_client, "invalidate metadata functional.alltypestiny", user=user) + non_admin_client, "invalidate metadata functional.alltypestiny") # Run a query to trigger metadata loading on the table self.execute_query_expect_success( - non_admin_client, "describe functional.alltypestiny", user=user) + non_admin_client, "describe functional.alltypestiny") # Verify catalogd loads metadata of this table self.assert_catalogd_log_contains("INFO", table_loaded_log, expected_count=2) # Verify REFRESH <table> is allowed since 'user' is detected as the owner. self.execute_query_expect_success( - non_admin_client, "refresh functional.alltypestiny", user=user) + non_admin_client, "refresh functional.alltypestiny") self.execute_query_expect_success( non_admin_client, - "refresh functional.alltypestiny partition(year=2009, month=1)", user=user) + "refresh functional.alltypestiny partition(year=2009, month=1)") # Clear the catalog cache and grant 'user' enough privileges self.execute_query_expect_success( - admin_client, "invalidate metadata functional.alltypestiny", user=ADMIN) + admin_client, "invalidate metadata functional.alltypestiny") admin_client.execute("grant refresh on table functional.alltypestiny to user {0}" - .format(user), user=ADMIN) + .format(user)) try: # Now 'user' should be able to run REFRESH/INVALIDATE even if the table is # unloaded (not recognize it as the owner). self.execute_query_expect_success( - non_admin_client, "invalidate metadata functional.alltypestiny", user=user) + non_admin_client, "invalidate metadata functional.alltypestiny") self.execute_query_expect_success( - non_admin_client, "refresh functional.alltypestiny", user=user) + non_admin_client, "refresh functional.alltypestiny") finally: admin_client.execute( "revoke refresh on table functional.alltypestiny from user {0}".format(user)) @@ -2359,7 +2325,7 @@ class TestRanger(CustomClusterTestSuite): policy_names = [] # Create another client for admin user since current user doesn't have privileges to # create/drop databases or refresh authorization. - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) try: for mask_type in ["MASK", "MASK_SHOW_LAST_4", "MASK_SHOW_FIRST_4", "MASK_HASH", "MASK_NULL", "MASK_NONE", "MASK_DATE_SHOW_YEAR"]: @@ -2385,8 +2351,7 @@ class TestRanger(CustomClusterTestSuite): policy_name, user, "functional", "chars_tiny", col, mask_type) policy_names.append(policy_name) - self.execute_query_expect_success(admin_client, "refresh authorization", - user=ADMIN) + self.execute_query_expect_success(admin_client, "refresh authorization") self.run_test_case("QueryTest/ranger_alltypes_" + mask_type.lower(), vector) while policy_names: TestRanger._remove_policy(policy_names.pop()) @@ -2402,10 +2367,9 @@ class TestRanger(CustomClusterTestSuite): unique_database = unique_name + '_db' # Create another client for admin user since current user doesn't have privileges to # create/drop databases or refresh authorization. - admin_client = self.create_impala_client() - admin_client.execute("drop database if exists %s cascade" % unique_database, - user=ADMIN) - admin_client.execute("create database %s" % unique_database, user=ADMIN) + admin_client = self.create_impala_client(user=ADMIN) + admin_client.execute("drop database if exists %s cascade" % unique_database) + admin_client.execute("create database %s" % unique_database) # Grant CREATE on database to current user for tests on CTAS, CreateView etc. # Note that 'user' is the owner of the test tables. No additional GRANTs are required. admin_client.execute("grant create on database %s to user %s" @@ -2483,16 +2447,16 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute( "grant select on table functional_parquet.alltypestiny to user non_owner_2") admin_client.execute("refresh authorization") - non_owner_client = self.create_impala_client() - non_owner_2_client = self.create_impala_client() + non_owner_client = self.create_impala_client(user="non_owner") + non_owner_2_client = self.create_impala_client(user="non_owner_2") query = "select id from functional_parquet.alltypestiny" assert self.client.execute(query).get_data() == "0" - assert non_owner_client.execute(query, user="non_owner").get_data() == "1" - assert non_owner_2_client.execute(query, user="non_owner_2").get_data() == "2" + assert non_owner_client.execute(query).get_data() == "1" + assert non_owner_2_client.execute(query).get_data() == "2" query = "select max(id) from functional_parquet.alltypestiny" assert self.client.execute(query).get_data() == "0" - assert non_owner_client.execute(query, user="non_owner").get_data() == "1" - assert non_owner_2_client.execute(query, user="non_owner_2").get_data() == "2" + assert non_owner_client.execute(query).get_data() == "1" + assert non_owner_2_client.execute(query).get_data() == "2" ####################################################### # Test row filters that contains complex subqueries @@ -2587,7 +2551,7 @@ class TestRanger(CustomClusterTestSuite): impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_column_masking_and_row_filtering(self, vector, unique_name): user = getuser() - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) policy_cnt = 0 try: # 3 column masking policies and 1 row filtering policy on functional.alltypestiny. @@ -2626,8 +2590,7 @@ class TestRanger(CustomClusterTestSuite): "id >= -8 and date_string_col = 'nn/nn/nn'") policy_cnt += 1 - self.execute_query_expect_success(admin_client, "refresh authorization", - user=ADMIN) + self.execute_query_expect_success(admin_client, "refresh authorization") self.run_test_case("QueryTest/ranger_column_masking_and_row_filtering", vector) finally: for i in range(policy_cnt): @@ -2640,28 +2603,24 @@ class TestRanger(CustomClusterTestSuite): """When we do a time travel query on an iceberg table we will use the schema from the time of the snapshot. Make sure this works when column masking is being used.""" user = getuser() - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) short_table_name = "ice_1" unique_database = unique_name + "_db" tbl_name = unique_database + "." + short_table_name try: admin_client.execute("drop database if exists {0} cascade" - .format(unique_database), user=ADMIN) - admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + .format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) admin_client.execute("create table {0} (a int, b string, c int) stored as iceberg" - .format(tbl_name), user=ADMIN) - admin_client.execute("insert into {0} values (1, 'one', 1)".format(tbl_name), - user=ADMIN) - admin_client.execute("alter table {0} drop column a".format(tbl_name), user=ADMIN) - admin_client.execute("insert into {0} values ('two', 2)".format(tbl_name), - user=ADMIN) + .format(tbl_name)) + admin_client.execute("insert into {0} values (1, 'one', 1)".format(tbl_name)) + admin_client.execute("alter table {0} drop column a".format(tbl_name)) + admin_client.execute("insert into {0} values ('two', 2)".format(tbl_name)) admin_client.execute("grant select on database {0} to user {1} with " - "grant option".format(unique_database, user), - user=ADMIN) + "grant option".format(unique_database, user)) admin_client.execute("grant insert on database {0} to user {1} with " - "grant option".format(unique_database, user), - user=ADMIN) + "grant option".format(unique_database, user)) snapshots = get_snapshots(admin_client, tbl_name, expected_result_size=2) # Create two versions of a simple query based on the two snapshot ids. @@ -2697,7 +2656,7 @@ class TestRanger(CustomClusterTestSuite): # Mask column C to null. TestRanger._add_column_masking_policy( unique_name, user, unique_database, short_table_name, "C", "MASK_NULL") - admin_client.execute("refresh authorization", user=ADMIN) + admin_client.execute("refresh authorization") # Run the time travel queries again, time travel should work, but column # 'C' is masked. @@ -2713,7 +2672,7 @@ class TestRanger(CustomClusterTestSuite): finally: # Remove the masking policy. TestRanger._remove_policy(unique_name) - admin_client.execute("refresh authorization", user=ADMIN) + admin_client.execute("refresh authorization") # Run the queries again without masking as we are here. results = self.client.execute(first_time_travel_query) @@ -2730,7 +2689,7 @@ class TestRanger(CustomClusterTestSuite): # Mask column B to null. TestRanger._add_column_masking_policy( unique_name, user, unique_database, short_table_name, "B", "MASK_NULL") - admin_client.execute("refresh authorization", user=ADMIN) + admin_client.execute("refresh authorization") # Run the time travel queries again, time travel should work, but column # 'B' is masked. @@ -2745,8 +2704,7 @@ class TestRanger(CustomClusterTestSuite): finally: TestRanger._remove_policy(unique_name) finally: - admin_client.execute("drop database if exists {0} cascade".format(unique_database), - user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_database)) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( @@ -2755,27 +2713,25 @@ class TestRanger(CustomClusterTestSuite): """Test that autorization is taken into account when performing a table migration to Iceberg.""" user = getuser() - admin_client = self.create_impala_client() - non_admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) + non_admin_client = self.create_impala_client(user=user) unique_database = unique_name + "_db" tbl_name = unique_database + "." + "hive_tbl_to_convert" try: admin_client.execute("drop database if exists {0} cascade" - .format(unique_database), user=ADMIN) - admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + .format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) # create table using admin user. admin_client.execute("create table {0} (a int, b string) stored as parquet".format( - tbl_name), user=ADMIN) - admin_client.execute("insert into {0} values (1, 'one')".format(tbl_name), - user=ADMIN) + tbl_name)) + admin_client.execute("insert into {0} values (1, 'one')".format(tbl_name)) try: # non-admin user can't convert table by default. result = self.execute_query_expect_failure( - non_admin_client, "alter table {0} convert to iceberg".format(tbl_name), - user=user) + non_admin_client, "alter table {0} convert to iceberg".format(tbl_name)) assert "User '{0}' does not have privileges to access: {1}".format( user, unique_database) in str(result) @@ -2783,102 +2739,93 @@ class TestRanger(CustomClusterTestSuite): # should fail as we expect DB level ALL privileges for table migration. Once # https://issues.apache.org/jira/browse/IMPALA-12190 is fixed, this should also # pass with table-level ALL privileges. - admin_client.execute("grant all on table {0} to user {1}".format(tbl_name, user), - user=ADMIN) + admin_client.execute("grant all on table {0} to user {1}".format(tbl_name, user)) result = self.execute_query_expect_failure( - non_admin_client, "alter table {0} convert to iceberg".format(tbl_name), - user=user) + non_admin_client, "alter table {0} convert to iceberg".format(tbl_name)) assert "User '{0}' does not have privileges to access: {1}".format( user, unique_database) in str(result) # After granting ALL privileges on the DB, the table migration should succeed. admin_client.execute("grant all on database {0} to user {1}" - .format(unique_database, user), user=ADMIN) + .format(unique_database, user)) self.execute_query_expect_success( - non_admin_client, "alter table {0} convert to iceberg".format(tbl_name), - user=user) - - result = non_admin_client.execute("describe formatted {0}".format(tbl_name), - user=user) - assert "org.apache.iceberg.mr.hive.HiveIcebergSerDe" in str(result) - assert "org.apache.iceberg.mr.hive.HiveIcebergInputFormat" in str(result) - assert "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat" in str(result) + non_admin_client, "alter table {0} convert to iceberg".format(tbl_name)) + + result = non_admin_client.execute("describe formatted {0}".format(tbl_name)) + all_data = result.get_data() + assert "org.apache.iceberg.mr.hive.HiveIcebergSerDe" in all_data + assert "org.apache.iceberg.mr.hive.HiveIcebergInputFormat" in all_data + assert "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat" in all_data finally: # Revoke privileges admin_client.execute("revoke all on table {0} from user {1}" - .format(tbl_name, user), user=ADMIN) + .format(tbl_name, user)) admin_client.execute("revoke all on database {0} from user {1}" - .format(unique_database, user), user=ADMIN) + .format(unique_database, user)) tbl_name2 = unique_database + "." + "hive_tbl_to_convert2" # create table using admin user. admin_client.execute("create table {0} (a int, b string) stored as parquet".format( - tbl_name2), user=ADMIN) - admin_client.execute("insert into {0} values (1, 'one')".format(tbl_name2), - user=ADMIN) + tbl_name2)) + admin_client.execute("insert into {0} values (1, 'one')".format(tbl_name2)) try: admin_client.execute("grant all on table {0} to user {1}" - .format(tbl_name2, user), user=ADMIN) + .format(tbl_name2, user)) result = self.execute_query_expect_success( - non_admin_client, "select count(*) from {0}".format(tbl_name2), user=user) + non_admin_client, "select count(*) from {0}".format(tbl_name2)) assert result.get_data() == "1" # Migrates the table by admin and checks if the non-admin usert still has # privileges. self.execute_query_expect_success( - admin_client, "alter table {0} convert to iceberg".format(tbl_name2), - user=ADMIN) + admin_client, "alter table {0} convert to iceberg".format(tbl_name2)) result = self.execute_query_expect_success( - non_admin_client, "select count(*) from {0}".format(tbl_name2), user=user) + non_admin_client, "select count(*) from {0}".format(tbl_name2)) assert result.get_data() == "1" finally: # Revoke privileges admin_client.execute("revoke all on table {0} from user {1}" - .format(tbl_name2, user), user=ADMIN) + .format(tbl_name2, user)) finally: - admin_client.execute("drop database if exists {0} cascade".format(unique_database), - user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_database)) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_iceberg_metadata_table_privileges(self, unique_name): user = getuser() - admin_client = self.create_impala_client() - non_admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) + non_admin_client = self.create_impala_client(user=user) short_table_name = "ice_1" unique_database = unique_name + "_db" tbl_name = unique_database + "." + short_table_name try: admin_client.execute("drop database if exists {0} cascade" - .format(unique_database), user=ADMIN) - admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + .format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) admin_client.execute("create table {0} (a int) stored as iceberg" - .format(tbl_name), user=ADMIN) + .format(tbl_name)) # At this point, non-admin user without select privileges cannot query the metadata # tables result = self.execute_query_expect_failure(non_admin_client, - "select * from {0}.history".format(tbl_name), user=user) + "select * from {0}.history".format(tbl_name)) assert "User '{0}' does not have privileges to execute 'SELECT' on: {1}".format( user, unique_database) in str(result) # Grant 'user' select privilege on the table - admin_client.execute("grant select on table {0} to user {1}".format(tbl_name, user), - user=ADMIN) - result = non_admin_client.execute("select * from {0}.history".format(tbl_name), - user=user) + admin_client.execute("grant select on table {0} to user {1}".format(tbl_name, user)) + result = non_admin_client.execute("select * from {0}.history".format(tbl_name)) assert result.success is True finally: admin_client.execute("revoke select on table {0} from user {1}" - .format(tbl_name, user), user=ADMIN) - admin_client.execute("drop database if exists {0} cascade".format(unique_database), - user=ADMIN) + .format(tbl_name, user)) + admin_client.execute("drop database if exists {0} cascade".format(unique_database)) @pytest.mark.execute_serially @SkipIf.is_test_jdk @@ -2904,21 +2851,21 @@ class TestRanger(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) - def test_profile_protection(self, vector): + def test_profile_protection(self): """Test that a requesting user is able to access the runtime profile or execution summary of a query involving a view only if the user is granted the privileges on all the underlying tables of the view. Recall that the view functional.complex_view we use here is created based on the tables functional.alltypesagg and functional.alltypestiny.""" - admin_client = self.create_impala_client() - non_owner_client = self.create_impala_client() + grantee_user = "non_owner" + admin_client = self.create_impala_client(user=ADMIN) + non_owner_client = self.create_impala_client(user=grantee_user) test_db = "functional" test_view = "complex_view" - grantee_user = "non_owner" try: admin_client.execute( "grant select on table {0}.{1} to user {2}" - .format(test_db, test_view, grantee_user), user=ADMIN) + .format(test_db, test_view, grantee_user)) admin_client.execute("refresh authorization") # Recall that in a successful execution, result.exec_summary and @@ -2927,30 +2874,27 @@ class TestRanger(CustomClusterTestSuite): # on the underlying tables, an exception will be thrown from # ImpalaBeeswaxClient.get_runtime_profile(). result = self.execute_query_expect_failure( - non_owner_client, "select count(*) from {0}.{1}".format(test_db, test_view), - user=grantee_user) + non_owner_client, "select count(*) from {0}.{1}".format(test_db, test_view)) assert "User {0} is not authorized to access the runtime profile or " \ "execution summary".format(grantee_user) in str(result) admin_client.execute( "grant select on table {0}.alltypesagg to user {1}" - .format(test_db, grantee_user), user=ADMIN) + .format(test_db, grantee_user)) admin_client.execute("refresh authorization") self.execute_query_expect_failure( - non_owner_client, "select count(*) from {0}.{1}".format(test_db, test_view), - user=grantee_user) + non_owner_client, "select count(*) from {0}.{1}".format(test_db, test_view)) assert "User {0} is not authorized to access the runtime profile or " \ "execution summary".format(grantee_user) in str(result) admin_client.execute( "grant select on table {0}.alltypestiny to user {1}" - .format(test_db, grantee_user), user=ADMIN) + .format(test_db, grantee_user)) admin_client.execute("refresh authorization") self.execute_query_expect_success( - non_owner_client, "select count(*) from {0}.{1}".format(test_db, test_view), - user=grantee_user) + non_owner_client, "select count(*) from {0}.{1}".format(test_db, test_view)) finally: cleanup_statements = [ "revoke select on table {0}.{1} from user {2}" @@ -2962,7 +2906,7 @@ class TestRanger(CustomClusterTestSuite): ] for statement in cleanup_statements: - admin_client.execute(statement, user=ADMIN) + admin_client.execute(statement) @pytest.mark.execute_serially @SkipIfFS.incorrent_reported_ec @@ -2977,7 +2921,7 @@ class TestRanger(CustomClusterTestSuite): "--use_customized_user_groups_mapper_for_ranger")) def test_grant_revoke_with_role(self, vector): """Test grant/revoke with role.""" - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) try: self.run_test_case('QueryTest/grant_revoke', vector, use_db="default") finally: @@ -3037,7 +2981,7 @@ class TestRanger(CustomClusterTestSuite): for statement in cleanup_statements: try: - admin_client.execute(statement, user=ADMIN) + admin_client.execute(statement) except Exception: # There could be an exception thrown due to the non-existence of the role or # resource involved in a statement that aims to revoke the privilege on a @@ -3064,63 +3008,63 @@ class TestRanger(CustomClusterTestSuite): """Test that except for the necessary privileges on the view, the requesting user has to be granted the necessary privileges on the underlying tables as well in order to access a view with its table property of 'Authorized' set to false.""" - admin_client = self.create_impala_client() - non_owner_client = self.create_impala_client() + grantee_user = "non_owner" + admin_client = self.create_impala_client(user=ADMIN) + non_owner_client = self.create_impala_client(user=grantee_user) unique_database = unique_name + "_db" test_tbl_1 = unique_name + "_tbl_1" test_tbl_2 = unique_name + "_tbl_2" test_view = unique_name + "_view" - grantee_user = "non_owner" try: # Set up temp database, tables, and the view. admin_client.execute("drop database if exists {0} cascade" - .format(unique_database), user=ADMIN) - admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + .format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) admin_client.execute("create table {0}.{1} (id int, bigint_col bigint)" - .format(unique_database, test_tbl_1), user=ADMIN) + .format(unique_database, test_tbl_1)) admin_client.execute("create table {0}.{1} (id int, string_col string)" - .format(unique_database, test_tbl_2), user=ADMIN) + .format(unique_database, test_tbl_2)) admin_client.execute("create view {0}.{1} (abc, xyz) as " "select count(a.bigint_col), b.string_col " "from {0}.{2} a inner join {0}.{3} b on a.id = b.id " "group by b.string_col having count(a.bigint_col) > 1" - .format(unique_database, test_view, test_tbl_1, test_tbl_2), user=ADMIN) + .format(unique_database, test_view, test_tbl_1, test_tbl_2)) # Add this table property to simulate a view created by a non-superuser. self.run_stmt_in_hive("alter view {0}.{1} " "set tblproperties ('Authorized' = 'false')" .format(unique_database, test_view)) admin_client.execute("grant select(abc) on table {0}.{1} to user {2}" - .format(unique_database, test_view, grantee_user), user=ADMIN) + .format(unique_database, test_view, grantee_user)) admin_client.execute("grant select(xyz) on table {0}.{1} to user {2}" - .format(unique_database, test_view, grantee_user), user=ADMIN) + .format(unique_database, test_view, grantee_user)) admin_client.execute("grant select on table {0}.{1} to user {2}" - .format(unique_database, test_tbl_2, grantee_user), user=ADMIN) + .format(unique_database, test_tbl_2, grantee_user)) admin_client.execute("refresh authorization") result = self.execute_query_expect_failure(non_owner_client, - "select * from {0}.{1}".format(unique_database, test_view), user=grantee_user) + "select * from {0}.{1}".format(unique_database, test_view)) assert "User '{0}' does not have privileges to execute 'SELECT' on: " \ "{1}.{2}".format(grantee_user, unique_database, test_tbl_1) in str(result) admin_client.execute("grant select(id) on table {0}.{1} to user {2}" - .format(unique_database, test_tbl_1, grantee_user), user=ADMIN) + .format(unique_database, test_tbl_1, grantee_user)) admin_client.execute("refresh authorization") # The query is not authorized since the SELECT privilege on the column 'bigint_col' # in the table 'test_tbl_1' is also required. result = self.execute_query_expect_failure(non_owner_client, - "select * from {0}.{1}".format(unique_database, test_view), user=grantee_user) + "select * from {0}.{1}".format(unique_database, test_view)) assert "User '{0}' does not have privileges to execute 'SELECT' on: " \ "{1}.{2}".format(grantee_user, unique_database, test_tbl_1) in str(result) admin_client.execute("grant select(bigint_col) on table {0}.{1} to user {2}" - .format(unique_database, test_tbl_1, grantee_user), user=ADMIN) + .format(unique_database, test_tbl_1, grantee_user)) admin_client.execute("refresh authorization") # The query is authorized successfully once sufficient privileges are granted. self.execute_query_expect_success(non_owner_client, - "select * from {0}.{1}".format(unique_database, test_view), user=grantee_user) + "select * from {0}.{1}".format(unique_database, test_view)) # Add a deny policy to prevent 'grantee_user' from accessing the column 'id' in # the table 'test_tbl_2', on which 'grantee_user' had been granted the SELECT @@ -3132,22 +3076,22 @@ class TestRanger(CustomClusterTestSuite): # The query is not authorized since the SELECT privilege on the column 'id' in the # table 'test_tbl_2' has been denied in the policy above. result = self.execute_query_expect_failure(non_owner_client, - "select * from {0}.{1}".format(unique_database, test_view), user=grantee_user) + "select * from {0}.{1}".format(unique_database, test_view)) assert "User '{0}' does not have privileges to execute 'SELECT' on: " \ "{1}.{2}".format(grantee_user, unique_database, test_tbl_2) in str(result) finally: admin_client.execute("revoke select(abc) on table {0}.{1} from user {2}" - .format(unique_database, test_view, grantee_user), user=ADMIN) + .format(unique_database, test_view, grantee_user)) admin_client.execute("revoke select(xyz) on table {0}.{1} from user {2}" - .format(unique_database, test_view, grantee_user), user=ADMIN) + .format(unique_database, test_view, grantee_user)) admin_client.execute("revoke select(id) on table {0}.{1} from user {2}" - .format(unique_database, test_tbl_1, grantee_user), user=ADMIN) + .format(unique_database, test_tbl_1, grantee_user)) admin_client.execute("revoke select(bigint_col) on table {0}.{1} from user {2}" - .format(unique_database, test_tbl_1, grantee_user), user=ADMIN) + .format(unique_database, test_tbl_1, grantee_user)) admin_client.execute("revoke select on table {0}.{1} from user {2}" - .format(unique_database, test_tbl_2, grantee_user), user=ADMIN) + .format(unique_database, test_tbl_2, grantee_user)) admin_client.execute("drop database if exists {0} cascade" - .format(unique_database), user=ADMIN) + .format(unique_database)) TestRanger._remove_policy(unique_name) @pytest.mark.execute_serially @@ -3160,23 +3104,22 @@ class TestRanger(CustomClusterTestSuite): # test, we do not have to worry about there could be existing roles when the test # is running. reset_ranger=True) - def test_no_exception_in_show_roles_if_no_roles_in_ranger(self, unique_name): - self._test_no_exception_in_show_roles_if_no_roles_in_ranger(unique_name) + def test_no_exception_in_show_roles_if_no_roles_in_ranger(self): + self._test_no_exception_in_show_roles_if_no_roles_in_ranger() - def _test_no_exception_in_show_roles_if_no_roles_in_ranger(self, unique_name): + def _test_no_exception_in_show_roles_if_no_roles_in_ranger(self): """ Ensure that no exception should throw for show roles statement if there are no roles in ranger. """ - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) show_roles_statements = [ "SHOW ROLES", "SHOW CURRENT ROLES", "SHOW ROLE GRANT GROUP admin" ] for statement in show_roles_statements: - result = self.execute_query_expect_success(admin_client, statement, - user=ADMIN) + result = self.execute_query_expect_success(admin_client, statement) assert len(result.data) == 0 @@ -3185,6 +3128,10 @@ class TestRangerColumnMaskingTpchNested(CustomClusterTestSuite): Tests for Apache Ranger column masking policies on tpch nested tables. """ + @classmethod + def default_test_protocol(cls): + return HS2 + @classmethod def get_workload(cls): return 'tpch_nested' @@ -3211,7 +3158,7 @@ class TestRangerColumnMaskingTpchNested(CustomClusterTestSuite): } # Create another client for admin user since current user doesn't have privileges to # create/drop databases or refresh authorization. - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) try: for tbl in tbl_cols: for col in tbl_cols[tbl]: @@ -3220,8 +3167,7 @@ class TestRangerColumnMaskingTpchNested(CustomClusterTestSuite): mask_type = "MASK_SHOW_FIRST_4" if col.endswith("phone") else "MASK" TestRanger._add_column_masking_policy( policy_name, user, db, tbl, col, mask_type) - self.execute_query_expect_success(admin_client, "refresh authorization", - user=ADMIN) + self.execute_query_expect_success(admin_client, "refresh authorization") same_result_queries = ["q1", "q3", "q4", "q5", "q6", "q7", "q8", "q11", "q12", "q13", "q14", "q16", "q17", "q19", "q22"] result_masked_queries = ["q9", "q10", "q15", "q18", "q20", "q21", "q2"] @@ -3243,6 +3189,10 @@ class TestRangerColumnMaskingComplexTypesInSelectList(CustomClusterTestSuite): while some tests in TestRanger needs Beeswax interface otherwise some of them fails. """ + @classmethod + def default_test_protocol(cls): + return HS2 + @classmethod def add_test_dimensions(cls): super(TestRangerColumnMaskingComplexTypesInSelectList, cls).add_test_dimensions() @@ -3266,7 +3216,7 @@ class TestRangerColumnMaskingComplexTypesInSelectList(CustomClusterTestSuite): db = "functional_orc_def" # Create another client for admin user since current user doesn't have privileges to # create/drop databases or refresh authorization. - admin_client = self.create_impala_client() + admin_client = self.create_impala_client(user=ADMIN) policy_cnt = 0 try: # Add a policy on a primitive column of a table which contains nested columns. @@ -3290,8 +3240,7 @@ class TestRangerColumnMaskingComplexTypesInSelectList(CustomClusterTestSuite): unique_name + str(policy_cnt), user, "functional_orc_def", "complextypestbl", "int_array_map", "MASK_NULL") policy_cnt += 1 - self.execute_query_expect_success(admin_client, "refresh authorization", - user=ADMIN) + self.execute_query_expect_success(admin_client, "refresh authorization") self.run_test_case("QueryTest/ranger_column_masking_struct_in_select_list", vector, use_db=db) finally: diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py index 2a6b2e7bc..8559cd321 100644 --- a/tests/common/impala_connection.py +++ b/tests/common/impala_connection.py @@ -27,10 +27,10 @@ import re import time from future.utils import with_metaclass + import impala.dbapi as impyla import impala.error as impyla_error import impala.hiveserver2 as hs2 - from impala_thrift_gen.beeswax.BeeswaxService import QueryState from impala_thrift_gen.Query.ttypes import TQueryOptions from impala_thrift_gen.RuntimeProfile.ttypes import TRuntimeProfileFormat @@ -724,6 +724,8 @@ class ImpylaHS2Connection(ImpalaConnection): def execute(self, sql_stmt, user=None, fetch_profile_after_close=False, fetch_exec_summary=False, profile_format=TRuntimeProfileFormat.STRING): + if user is None: + user = self.__user same_user = (user == self.__user) cursor = (self.default_cursor() if same_user # Must create a new cursor to supply 'user'. @@ -769,6 +771,8 @@ class ImpylaHS2Connection(ImpalaConnection): def execute_async(self, sql_stmt, user=None): async_cursor = None + if user is None: + user = self.__user try: async_cursor = self.__open_single_cursor(user=user) handle = OperationHandle(async_cursor, sql_stmt) @@ -964,9 +968,12 @@ class ImpylaHS2ResultSet(object): def __convert_result_row(self, result_tuple): """Take primitive values from a result tuple and construct the tab-separated string that would have been returned via beeswax.""" - return '\t'.join([self.__convert_result_value(val) for val in result_tuple]) + row = list() + for idx, val in enumerate(result_tuple): + row.append(self.__convert_result_value(val, self.column_types[idx])) + return '\t'.join(row) - def __convert_result_value(self, val): + def __convert_result_value(self, val, col_type): """Take a primitive value from a result tuple and its type and construct the string that would have been returned via beeswax.""" if val is None: @@ -974,24 +981,29 @@ class ImpylaHS2ResultSet(object): if type(val) == float: # Same format as what Beeswax uses in the backend. return "{:.16g}".format(val) + elif col_type == 'BOOLEAN': + # Beeswax return 'false' or 'true' for boolean column. + # HS2 return 'False' or 'True'. + return str(val).lower() else: return str(val) def create_connection(host_port, use_kerberos=False, protocol=BEESWAX, - is_hive=False, use_ssl=False, collect_profile_and_log=True): + is_hive=False, use_ssl=False, collect_profile_and_log=True, user=None): if protocol == BEESWAX: c = BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos, - use_ssl=use_ssl) + user=user, use_ssl=use_ssl) elif protocol == HS2: c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos, is_hive=is_hive, use_ssl=use_ssl, - collect_profile_and_log=collect_profile_and_log) + collect_profile_and_log=collect_profile_and_log, user=user) else: assert protocol == HS2_HTTP c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos, is_hive=is_hive, use_http_transport=True, http_path='cliservice', - use_ssl=use_ssl, collect_profile_and_log=collect_profile_and_log) + use_ssl=use_ssl, collect_profile_and_log=collect_profile_and_log, + user=user) # A hook in conftest sets tests.common.current_node. Skip for Hive connections since # Hive cannot modify client_identifier at runtime. diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py index 909e051f4..cff62f5b1 100644 --- a/tests/common/impala_service.py +++ b/tests/common/impala_service.py @@ -21,21 +21,22 @@ from __future__ import absolute_import, division, print_function from collections import defaultdict +from datetime import datetime import json import logging import os import re -import requests import socket import subprocess -from datetime import datetime from time import sleep, time -from tests.common.impala_connection import create_connection, create_ldap_connection -from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP +import requests from thrift.transport.TSocket import TSocket from thrift.transport.TTransport import TBufferedTransport +from tests.common.impala_connection import create_connection, create_ldap_connection +from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP + LOG = logging.getLogger('impala_service') LOG.setLevel(level=logging.DEBUG) @@ -479,9 +480,10 @@ class ImpaladService(BaseImpalaService): client.connect() return client - def create_hs2_client(self): + def create_hs2_client(self, user=None): """Creates a new HS2 client connection to the impalad""" - client = create_connection('%s:%d' % (self.hostname, self.hs2_port), protocol=HS2) + client = create_connection('%s:%d' % (self.hostname, self.hs2_port), + protocol=HS2, user=user) client.connect() return client diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 70c162b6f..bf6329ebd 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -372,7 +372,7 @@ class ImpalaTestSuite(BaseTestSuite): return True @classmethod - def create_impala_client(cls, host_port=None, protocol=None, is_hive=False): + def create_impala_client(cls, host_port=None, protocol=None, is_hive=False, user=None): """ Create a new ImpalaConnection client. Make sure to always call this method using a with-as statement or manually close @@ -381,9 +381,9 @@ class ImpalaTestSuite(BaseTestSuite): protocol = cls.default_test_protocol() if host_port is None: host_port = cls.__get_default_host_port(protocol) - client = create_connection(host_port=host_port, - use_kerberos=pytest.config.option.use_kerberos, protocol=protocol, - is_hive=is_hive) + client = create_connection( + host_port=host_port, use_kerberos=pytest.config.option.use_kerberos, + protocol=protocol, is_hive=is_hive, user=user) client.connect() return client diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 5cee71bd3..22bf288be 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -1410,10 +1410,10 @@ class TestAdmissionController(TestAdmissionControllerBase): # Another query should be rejected impalad = self.cluster.impalads[limit % 2] - client = impalad.service.create_hs2_client() + client = impalad.service.create_hs2_client(user=user) client.set_configuration({'request_pool': pool}) try: - client.execute(SLOW_QUERY, user=user) + client.execute(SLOW_QUERY) assert False, "query should fail" except IMPALA_CONNECTION_EXCEPTION as e: # Construct the expected error message. @@ -1440,9 +1440,9 @@ class TestAdmissionController(TestAdmissionControllerBase): def execute_async_and_wait_for_running(self, impalad, query, user, pool): # Execute a query asynchronously, and wait for it to be running. - client = impalad.service.create_hs2_client() + client = impalad.service.create_hs2_client(user=user) client.set_configuration({'request_pool': pool}) - handle = client.execute_async(query, user=user) + handle = client.execute_async(query) timeout_s = 10 # Make sure the query has been admitted and is running. client.wait_for_impala_state(handle, RUNNING, timeout_s)
