This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 44e9b6f97d8613e19bad3a6a4e3a0f09c6bb64a4 Author: Riza Suminto <[email protected]> AuthorDate: Thu May 15 14:31:40 2025 -0700 IMPALA-14078: Reorganize test_ranger.py to share minicluster test_ranger.py is a custom cluster test consisting of 41 test methods. Each test method requires a minicluster restart. With IMPALA-13503, we can reorganize the TestRanger class into 3 separate test classes: TestRangerIndependent, TestRangerLegacyCatalog, and TestRangerLocalCatalog. TestRangerLegacyCatalog and TestRangerLocalCatalog can each keep the same minicluster across their test methods without restarting it in between. Testing: - Run and pass test_ranger.py in exhaustive mode. - Confirmed that no test is missing after reorganization. Change-Id: I01ff2b3e98fccfffa8bcdfe1177be98634363b56 Reviewed-on: http://gerrit.cloudera.org:8080/22905 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- tests/authorization/test_ranger.py | 2120 ++++++++++++++++++------------------ 1 file changed, 1047 insertions(+), 1073 deletions(-) diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py index b43bc4d9a..034756961 100644 --- a/tests/authorization/test_ranger.py +++ b/tests/authorization/test_ranger.py @@ -69,253 +69,16 @@ LOG = logging.getLogger('impala_test_suite') class TestRanger(CustomClusterTestSuite): """ - Tests for Apache Ranger integration with Apache Impala. + Base class for Apache Ranger integration test with Apache Impala. + This class only contains common helper or base test method. + Pytest method should be declared in either of TestRangerIndependent, + TestRangerLegacyCatalog, or TestRangerLocalCatalog. 
""" @classmethod def default_test_protocol(cls): return HS2 - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impala_log_dir=tempfile.mkdtemp(prefix="ranger_audit_xff", dir=os.getenv("LOG_DIR")), - impalad_args=(IMPALAD_ARGS + " --use_xff_address_as_origin=true"), - catalogd_args=CATALOGD_ARGS, - disable_log_buffering=True) - def test_xff_ranger_audit(self): - """ - Tests XFF client IP is included in ranger audit logs when using hs2-http protocol - """ - # Iterate over test vector within test function to avoid restarting cluster. - for vector in\ - [ImpalaTestVector([value]) for value in create_client_protocol_dimension()]: - protocol = vector.get_value("protocol") - if protocol != "hs2-http": - continue - - # Query with XFF header in client request - args = ['--protocol=hs2-http', - '--hs2_x_forward= 10.20.30.40 ', - '-q', 'select count(1) from functional.alltypes'] - run_impala_shell_cmd(vector, args) - - # Query with XFF header in client request - args = ['--protocol=hs2-http', - '--hs2_x_forward=10.20.30.40, 1.1.2.3, 127.0.0.6', - '-q', 'select count(1) from functional.alltypes'] - run_impala_shell_cmd(vector, args) - - # Query with XFF header in client request - args = ['--protocol=hs2-http', - '--hs2_x_forward=10.20.30.40,1.1.2.3,127.0.0.6', - '-q', 'select count(1) from functional.alltypes'] - run_impala_shell_cmd(vector, args) - - # Query without XFF header in client request - args = ['--protocol=hs2-http', - '-q', 'select count(2) from functional.alltypes'] - run_impala_shell_cmd(vector, args) - - # Query with empty XFF header in client request - args = ['--protocol=hs2-http', - '--hs2_x_forward= ', - '-q', 'select count(2) from functional.alltypes'] - run_impala_shell_cmd(vector, args) - - # Query with invalid XFF header in client request - args = ['--protocol=hs2-http', - '--hs2_x_forward=foobar', - '-q', 'select count(3) from functional.alltypes'] - run_impala_shell_cmd(vector, args) - - # Shut down cluster to ensure logs flush to 
disk. - sleep(5) - self._stop_impala_cluster() - - # Expected audit log string - expected_string_valid_xff = ( - '"access":"select",' - '"resource":"functional/alltypes",' - '"resType":"@table",' - '"action":"select",' - '"result":1,' - '"agent":"impala",' - r'"policy":\d,' - '"enforcer":"ranger-acl",' - '"cliIP":"%s",' - '"reqData":"%s",' - '".+":".+","logType":"RangerAudit"' - ) - - # Ensure audit logs were logged in coordinator logs - self.assert_impalad_log_contains("INFO", expected_string_valid_xff % - ("10.20.30.40", r"select count\(1\) from functional.alltypes"), - expected_count=3) - self.assert_impalad_log_contains("INFO", expected_string_valid_xff % - ("127.0.0.1", r"select count\(2\) from functional.alltypes"), expected_count=2) - self.assert_impalad_log_contains("INFO", expected_string_valid_xff % - ("foobar", r"select count\(3\) from functional.alltypes"), expected_count=1) - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) - def test_grant_revoke_with_catalog_v1(self, unique_name): - """Tests grant/revoke with catalog v1.""" - self._test_grant_revoke(unique_name, [None, "invalidate metadata", - "refresh authorization"]) - - @pytest.mark.execute_serially - @SkipIfFS.hdfs_acls - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) - def test_insert_with_catalog_v1(self, unique_name): - """ - Test that when Ranger is the authorization provider in the legacy catalog mode, - Impala does not throw an AnalysisException when an authorized user tries to execute - an INSERT query against a partitioned table of which the respective table path and - the partition path are not writable according to HDFS permission. 
- """ - user = getuser() - admin_client = self.create_impala_client(user=ADMIN) - unique_database = unique_name + "_db" - unique_table = unique_name + "_tbl" - partition_column = "year" - partition_value = "2008" - table_path = "test-warehouse/{0}.db/{1}".format(unique_database, unique_table) - table_partition_path = "{0}/{1}={2}"\ - .format(table_path, partition_column, partition_value) - insert_statement = "insert into {0}.{1} (name) partition ({2}) " \ - "values (\"Adam\", {3})".format(unique_database, unique_table, partition_column, - partition_value) - authz_err = "AuthorizationException: User '{0}' does not have privileges to " \ - "execute 'INSERT' on: {1}.{2}".format(user, unique_database, unique_table) - try: - admin_client.execute("drop database if exists {0} cascade" - .format(unique_database)) - admin_client.execute("create database {0}".format(unique_database)) - admin_client.execute("create table {0}.{1} (name string) partitioned by ({2} int)" - .format(unique_database, unique_table, partition_column)) - admin_client.execute("alter table {0}.{1} add partition ({2}={3})" - .format(unique_database, unique_table, partition_column, partition_value)) - - # Change the owner user and group of the HDFS paths corresponding to the table and - # the partition so that according to Impala's FsPermissionChecker, the table is not - # writable to the user that loads the table. This user usually is the one - # representing the Impala service. Before IMPALA-11871, changing either the table - # path or the partition path to non-writable would result in an AnalysisException. - self.hdfs_client.chown(table_path, "another_user", "another_group") - self.hdfs_client.chown(table_partition_path, "another_user", "another_group") - # Invalidate the table metadata to force the catalog server to reload the HDFS - # table and the related partition(s). 
- admin_client.execute("invalidate metadata {0}.{1}" - .format(unique_database, unique_table)) - - # Verify that the INSERT statement fails with AuthorizationException because the - # requesting user does not have the INSERT privilege on the table. - result = self._run_query_as_user(insert_statement, user, False) - assert authz_err in str(result) - - admin_client.execute("grant insert on table {0}.{1} to user {2}" - .format(unique_database, unique_table, user)) - # Verify that the INSERT statement succeeds without AnalysisException. - self._run_query_as_user(insert_statement, user, True) - finally: - admin_client.execute("revoke insert on table {0}.{1} from user {2}" - .format(unique_database, unique_table, user)) - admin_client.execute("drop database if exists {0} cascade" - .format(unique_database)) - - @pytest.mark.execute_serially - @SkipIfFS.hdfs_acls - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) - def test_load_data_with_catalog_v1(self, unique_name): - """ - Test that when Ranger is the authorization provider in the legacy catalog mode, - Impala does not throw an AnalysisException when an authorized user tries to execute - a LOAD DATA query against a table partition of which the respective partition path is - not writable according to Impala's FsPermissionChecker. 
- """ - user = getuser() - admin_client = self.create_impala_client(user=ADMIN) - unique_database = unique_name + "_db" - unique_table = unique_name + "_tbl" - partition_column = "year" - partition_value = "2008" - destination_table_path = "test-warehouse/{0}.db/{1}" \ - .format(unique_database, unique_table, ) - destination_table_partition_path = "{0}/{1}={2}"\ - .format(destination_table_path, partition_column, partition_value) - file_name = "load_data_with_catalog_v1.txt" - files_for_table = ["testdata/data/{0}".format(file_name)] - source_hdfs_dir = "/tmp" - load_data_statement = "load data inpath '{0}/{1}' into table {2}.{3} " \ - "partition ({4}={5})".format(source_hdfs_dir, file_name, unique_database, - unique_table, partition_column, partition_value) - authz_err = "AuthorizationException: User '{0}' does not have privileges to " \ - "execute 'INSERT' on: {1}.{2}".format(user, unique_database, unique_table) - try: - admin_client.execute("drop database if exists {0} cascade" - .format(unique_database)) - admin_client.execute("create database {0}".format(unique_database)) - copy_files_to_hdfs_dir(files_for_table, source_hdfs_dir) - admin_client.execute("create table {0}.{1} (name string) partitioned by ({2} int) " - "row format delimited fields terminated by ',' " - "stored as textfile".format(unique_database, unique_table, partition_column)) - # We need to add the partition. Otherwise, the LOAD DATA statement can't create new - # partitions. - admin_client.execute("alter table {0}.{1} add partition ({2}={3})" - .format(unique_database, unique_table, partition_column, partition_value)) - - # Change the permissions of the HDFS path of the destination table partition. - # Before IMPALA-11871, even we changed the table path to non-writable, loading - # data into the partition was still allowed if the destination partition path - # was writable according to Impala's FsPermissionChecker. 
But if the destination - # partition path was not writable, an AnalysisException would be thrown. - self.hdfs_client.chown(destination_table_partition_path, "another_user", - "another_group") - # Invalidate the table metadata to force the catalog server to reload the HDFS - # table and the related partition(s). - admin_client.execute("invalidate metadata {0}.{1}" - .format(unique_database, unique_table)) - - # To execute the LOAD DATA statement, a user has to be granted the ALL privilege - # on the source HDFS path and the INSERT privilege on the destination table. - # The following verifies the LOAD DATA statement fails with AuthorizationException - # due to insufficient privileges. - result = self._run_query_as_user(load_data_statement, user, False) - assert authz_err in str(result) - - admin_client.execute("grant all on uri '{0}/{1}' to user {2}" - .format(source_hdfs_dir, file_name, user)) - # The following verifies the ALL privilege on the source file alone is not - # sufficient to execute the LOAD DATA statement. - result = self._run_query_as_user(load_data_statement, user, False) - assert authz_err in str(result) - - admin_client.execute("grant insert on table {0}.{1} to user {2}" - .format(unique_database, unique_table, user)) - # Verify the LOAD DATA statement fails without AnalysisException. 
- self._run_query_as_user(load_data_statement, user, True) - finally: - admin_client.execute("revoke all on uri '{0}/{1}' from user {2}" - .format(source_hdfs_dir, file_name, user)) - admin_client.execute("revoke insert on table {0}.{1} from user {2}" - .format(unique_database, unique_table, user)) - admin_client.execute("drop database if exists {0} cascade" - .format(unique_database)) - self.filesystem_client.delete_file_dir("{0}/{1}" - .format(source_hdfs_dir, file_name)) - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args="{0} {1}".format(IMPALAD_ARGS, "--use_local_catalog=true"), - catalogd_args="{0} {1}".format(CATALOGD_ARGS, "--catalog_topic_mode=minimal")) - def test_grant_revoke_with_local_catalog(self, unique_name): - """Tests grant/revoke with catalog v2 (local catalog).""" - self._test_grant_revoke(unique_name, [None, "invalidate metadata", - "refresh authorization"]) - def _test_grant_revoke(self, unique_name, refresh_statements): user = getuser() admin_client = self.create_impala_client(user=ADMIN) @@ -358,121 +121,16 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute("drop database if exists {0} cascade" .format(unique_database)) - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) - def test_grant_option(self, unique_name): - user1 = getuser() - admin_client = self.create_impala_client(user=ADMIN) - unique_database = unique_name + "_db" - unique_table = unique_name + "_tbl" + def _update_privileges_and_verify(self, admin_client, update_stmt, show_grant_stmt, + expected_privileges): + admin_client.execute(update_stmt) + result = self.client.execute(show_grant_stmt) + TestRanger._check_privileges(result, expected_privileges) + + def _test_show_grant_without_on(self, kw, ident): + self.execute_query_expect_failure(self.client, "show grant {0} {1}".format(kw, ident)) - try: - # Set-up temp database/table - admin_client.execute("drop 
database if exists {0} cascade".format(unique_database)) - admin_client.execute("create database {0}".format(unique_database)) - admin_client.execute("create table {0}.{1} (x int)" - .format(unique_database, unique_table)) - - # Give user 1 the ability to grant select privileges on unique_database - self.execute_query_expect_success(admin_client, - "grant select on database {0} to user {1} with " - "grant option".format(unique_database, user1)) - self.execute_query_expect_success(admin_client, - "grant insert on database {0} to user {1} with " - "grant option".format(unique_database, user1)) - - # Verify user 1 has with_grant privilege on unique_database - result = self.execute_query("show grant user {0} on database {1}" - .format(user1, unique_database)) - TestRanger._check_privileges(result, [ - ["USER", user1, unique_database, "", "", "", "", "", "*", "insert", "true"], - ["USER", user1, unique_database, "", "", "", "", "", "*", "select", "true"], - ["USER", user1, unique_database, "*", "*", "", "", "", "", "insert", "true"], - ["USER", user1, unique_database, "*", "*", "", "", "", "", "select", "true"]]) - - # Revoke select privilege and check grant option is still present - self.execute_query_expect_success(admin_client, - "revoke select on database {0} from user {1}" - .format(unique_database, user1)) - result = self.execute_query("show grant user {0} on database {1}" - .format(user1, unique_database)) - # Revoking the select privilege also deprives the grantee of the permission to - # transfer other privilege(s) on the same resource to other principals. This is a - # current limitation of Ranger since privileges on the same resource share the same - # delegateAdmin field in the corresponding RangerPolicyItem. 
- TestRanger._check_privileges(result, [ - ["USER", user1, unique_database, "", "", "", "", "", "*", "insert", "false"], - ["USER", user1, unique_database, "*", "*", "", "", "", "", "insert", "false"]]) - - # Revoke privilege granting from user 1 - self.execute_query_expect_success(admin_client, "revoke grant option for insert " - "on database {0} from user {1}" - .format(unique_database, user1)) - - # User 1 can no longer grant privileges on unique_database - # In ranger it is currently not possible to revoke grant for a single access type - result = self.execute_query("show grant user {0} on database {1}" - .format(user1, unique_database)) - TestRanger._check_privileges(result, [ - ["USER", user1, unique_database, "", "", "", "", "", "*", "insert", "false"], - ["USER", user1, unique_database, "*", "*", "", "", "", "", "insert", "false"]]) - finally: - admin_client.execute("revoke insert on database {0} from user {1}" - .format(unique_database, user1)) - admin_client.execute("drop database if exists {0} cascade".format(unique_database)) - - def _update_privileges_and_verify(self, admin_client, update_stmt, show_grant_stmt, - expected_privileges): - admin_client.execute(update_stmt) - result = self.client.execute(show_grant_stmt) - TestRanger._check_privileges(result, expected_privileges) - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) - def test_show_grant(self, unique_name): - user = getuser() - group = grp.getgrnam(getuser()).gr_name - test_data = [(user, "USER"), (group, "GROUP")] - admin_client = self.create_impala_client(user=ADMIN) - unique_db = unique_name + "_db" - unique_table = unique_name + "_tbl" - udf = "identity" - - try: - # Create test database/table - admin_client.execute("drop database if exists {0} cascade".format(unique_db)) - admin_client.execute("create database {0}".format(unique_db)) - admin_client.execute("create table {0}.{1} (x int)" - .format(unique_db, 
unique_table)) - - for data in test_data: - # Test basic show grant functionality for user/group - self._test_show_grant_basic(admin_client, data[1], data[0], unique_db, - unique_table, udf) - # Test that omitting ON <resource> results in failure - self._test_show_grant_without_on(data[1], data[0]) - - # Test inherited privileges (server privileges show for database, etc.) - self._test_show_grant_inherited(admin_client, data[1], data[0], unique_db, - unique_table) - - # Test ALL privilege hides other privileges - self._test_show_grant_mask(admin_client, user) - - # Test ALL privilege on UDF hides other privileges - self._test_show_grant_mask_on_udf(admin_client, data[1], data[0], unique_db, udf) - - # Test USER inherits privileges for their GROUP - self._test_show_grant_user_group(admin_client, user, group, unique_db) - finally: - admin_client.execute("drop database if exists {0} cascade".format(unique_db)) - - def _test_show_grant_without_on(self, kw, ident): - self.execute_query_expect_failure(self.client, "show grant {0} {1}".format(kw, ident)) - - def _test_show_grant_user_group(self, admin_client, user, group, unique_db): + def _test_show_grant_user_group(self, admin_client, user, group, unique_db): try: result = self.client.execute("show grant user {0} on database {1}" .format(user, unique_db)) @@ -890,113 +548,6 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute("revoke all on table {0}.{1} from {2} {3}" .format(unique_database, unique_table, kw, id)) - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) - def test_grant_revoke_ranger_api(self, unique_name): - user = getuser() - admin_client = self.create_impala_client(user=ADMIN) - unique_db = unique_name + "_db" - resource = { - "database": unique_db, - "column": "*", - "table": "*" - } - access = ["select", "create"] - - try: - # Create the test database - admin_client.execute("drop database if exists {0} 
cascade".format(unique_db)) - admin_client.execute("create database {0}".format(unique_db)) - - # Grant privileges via Ranger REST API - TestRanger._grant_ranger_privilege(user, resource, access) - - # Privileges should be stale before a refresh - result = self.client.execute("show grant user {0} on database {1}" - .format(user, unique_db)) - TestRanger._check_privileges(result, []) - - # Refresh and check updated privileges - admin_client.execute("refresh authorization") - result = self.client.execute("show grant user {0} on database {1}" - .format(user, unique_db)) - - TestRanger._check_privileges(result, [ - ["USER", user, unique_db, "*", "*", "", "", "", "", "create", "false"], - ["USER", user, unique_db, "*", "*", "", "", "", "", "select", "false"] - ]) - - # Revoke privileges via Ranger REST API - TestRanger._revoke_ranger_privilege(user, resource, access) - - # Privileges should be stale before a refresh - result = self.client.execute("show grant user {0} on database {1}" - .format(user, unique_db)) - TestRanger._check_privileges(result, [ - ["USER", user, unique_db, "*", "*", "", "", "", "", "create", "false"], - ["USER", user, unique_db, "*", "*", "", "", "", "", "select", "false"] - ]) - - # Refresh and check updated privileges - admin_client.execute("refresh authorization") - result = self.client.execute("show grant user {0} on database {1}" - .format(user, unique_db)) - - TestRanger._check_privileges(result, []) - finally: - admin_client.execute("revoke all on database {0} from user {1}" - .format(unique_db, user)) - admin_client.execute("drop database if exists {0} cascade".format(unique_db)) - - @pytest.mark.execute_serially - @SkipIf.is_test_jdk - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS, reset_ranger=True) - def test_show_grant_hive_privilege(self, unique_name): - user = getuser() - admin_client = self.create_impala_client(user=ADMIN) - unique_db = unique_name + "_db" - resource = { - "database": 
unique_db, - "column": "*", - "table": "*" - } - access = ["lock", "select"] - - try: - TestRanger._grant_ranger_privilege(user, resource, access) - admin_client.execute("drop database if exists {0} cascade".format(unique_db)) - admin_client.execute("create database {0}".format(unique_db)) - - admin_client.execute("refresh authorization") - result = self.client.execute("show grant user {0} on database {1}" - .format(user, unique_db)) - - TestRanger._check_privileges(result, [ - ["USER", user, unique_db, "*", "*", "", "", "", "", "select", "false"] - ]) - - # Assert that lock, select privilege exists in Ranger server - assert "lock" in TestRanger._get_ranger_privileges_db(user, unique_db) - assert "select" in TestRanger._get_ranger_privileges_db(user, unique_db) - - admin_client.execute("revoke select on database {0} from user {1}" - .format(unique_db, user)) - - # Assert that lock is still present and select is revoked in Ranger server - assert "lock" in TestRanger._get_ranger_privileges_db(user, unique_db) - assert "select" not in TestRanger._get_ranger_privileges_db(user, unique_db) - - admin_client.execute("refresh authorization") - result = self.client.execute("show grant user {0} on database {1}" - .format(user, unique_db)) - - TestRanger._check_privileges(result, []) - finally: - admin_client.execute("drop database if exists {0} cascade".format(unique_db)) - TestRanger._revoke_ranger_privilege(user, resource, access) - @staticmethod def _grant_ranger_privilege(user, resource, access): data = { @@ -1259,20 +810,6 @@ class TestRanger(CustomClusterTestSuite): impala_client, query, query_options={'sync_ddl': 1}) return self.execute_query_expect_failure(impala_client, query) - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS, reset_ranger=True) - def test_grant_multiple_columns(self): - self._test_grant_multiple_columns(13) - - @pytest.mark.execute_serially - 
@CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, - catalogd_args="{0} {1}".format(CATALOGD_ARGS, "--consolidate_grant_revoke_requests"), - reset_ranger=True) - def test_grant_multiple_columns_consolidate_grant_revoke_requests(self): - self._test_grant_multiple_columns(1) - def _test_grant_multiple_columns(self, expected_num_policies): admin_client = self.create_impala_client(user=ADMIN) access_type = "select" @@ -1336,109 +873,20 @@ class TestRanger(CustomClusterTestSuite): break return result - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) - def test_unsupported_sql(self): - """Tests unsupported SQL statements when running with Ranger.""" + def _test_grant_revoke_by_owner(self, unique_name): + non_owner_user = NON_OWNER + non_owner_group = NON_OWNER + grantee_role = "grantee_role" + resource_owner_role = OWNER_USER admin_client = self.create_impala_client(user=ADMIN) - error_msg = "UnsupportedOperationException: SHOW GRANT is not supported without a " \ - "defined resource in Ranger." - statement = "show grant role foo" - result = self.execute_query_expect_failure(admin_client, statement) - assert error_msg in str(result) - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) - def test_grant_revoke_invalid_principal(self): - """Tests grant/revoke to/from invalid principal should return more readable - error messages.""" - valid_user = "admin" - invalid_user = "invalid_user" - invalid_group = "invalid_group" - # TODO(IMPALA-8640): Create two different Impala clients because the users to - # workaround the bug. 
- invalid_impala_client = self.create_impala_client(user=invalid_user) - valid_impala_client = self.create_impala_client(user=valid_user) - for statement in ["grant select on table functional.alltypes to user {0}" - .format(getuser()), - "revoke select on table functional.alltypes from user {0}" - .format(getuser())]: - result = self.execute_query_expect_failure(invalid_impala_client, statement) - if "grant" in statement: - assert "Error granting a privilege in Ranger. Ranger error message: " \ - "HTTP 403 Error: Grantor user invalid_user doesn't exist" in str(result) - else: - assert "Error revoking a privilege in Ranger. Ranger error message: " \ - "HTTP 403 Error: Grantor user invalid_user doesn't exist" in str(result) - - for statement in ["grant select on table functional.alltypes to user {0}" - .format(invalid_user), - "revoke select on table functional.alltypes from user {0}" - .format(invalid_user)]: - result = self.execute_query_expect_failure(valid_impala_client, statement) - if "grant" in statement: - assert "Error granting a privilege in Ranger. Ranger error message: " \ - "HTTP 403 Error: Grantee user invalid_user doesn't exist" in str(result) - else: - assert "Error revoking a privilege in Ranger. Ranger error message: " \ - "HTTP 403 Error: Grantee user invalid_user doesn't exist" in str(result) - - for statement in ["grant select on table functional.alltypes to group {0}" - .format(invalid_group), - "revoke select on table functional.alltypes from group {0}" - .format(invalid_group)]: - result = self.execute_query_expect_failure(valid_impala_client, statement) - if "grant" in statement: - assert "Error granting a privilege in Ranger. Ranger error message: " \ - "HTTP 403 Error: Grantee group invalid_group doesn't exist" in str(result) - else: - assert "Error revoking a privilege in Ranger. 
Ranger error message: " \ - "HTTP 403 Error: Grantee group invalid_group doesn't exist" in str(result) - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) - def test_legacy_catalog_ownership(self): - self._test_ownership() - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args(impalad_args=LOCAL_CATALOG_IMPALAD_ARGS, - catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS) - def test_local_catalog_ownership(self): - # getTableIfCached() in LocalCatalog loads a minimal incomplete table - # that does not include the ownership information. Hence show tables - # *never* show owned tables. TODO(bharathv): Fix in a follow up commit - pytest.xfail("getTableIfCached() faulty behavior, known issue") - self._test_ownership() - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) - def test_grant_revoke_by_owner_legacy_catalog(self, unique_name): - self._test_grant_revoke_by_owner(unique_name) - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args(impalad_args=LOCAL_CATALOG_IMPALAD_ARGS, - catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS) - def test_grant_revoke_by_owner_local_catalog(self, unique_name): - self._test_grant_revoke_by_owner(unique_name) - - def _test_grant_revoke_by_owner(self, unique_name): - non_owner_user = NON_OWNER - non_owner_group = NON_OWNER - grantee_role = "grantee_role" - resource_owner_role = OWNER_USER - admin_client = self.create_impala_client(user=ADMIN) - unique_database = unique_name + "_db" - table_name = "tbl" - column_names = ["a", "b"] - udf_uri = "/test-warehouse/impala-hive-udfs.jar" - udf_name = "identity" - test_data = [("USER", non_owner_user), ("GROUP", non_owner_group), - ("ROLE", grantee_role)] - privileges = ["alter", "drop", "create", "insert", "select", "refresh"] + unique_database = unique_name + "_db" + table_name = "tbl" + column_names = ["a", "b"] + udf_uri = 
"/test-warehouse/impala-hive-udfs.jar" + udf_name = "identity" + test_data = [("USER", non_owner_user), ("GROUP", non_owner_group), + ("ROLE", grantee_role)] + privileges = ["alter", "drop", "create", "insert", "select", "refresh"] try: admin_client.execute("create role {}".format(grantee_role)) @@ -1721,9 +1169,1000 @@ class TestRanger(CustomClusterTestSuite): grantee_type, grantee), OWNER_USER, False) assert ERROR_REVOKE in str(result) - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) + def _test_allow_catalog_cache_op_from_masked_users(self, unique_name): + """Verify that catalog cache operations are allowed for masked users + when allow_catalog_cache_op_from_masked_users=true.""" + user = getuser() + admin_client = self.create_impala_client(user=ADMIN) + non_admin_client = self.create_impala_client(user=user) + try: + # Create a column masking policy on 'user' which is also the owner of the table + TestRanger._add_column_masking_policy( + unique_name, user, "functional", "alltypestiny", "id", + "CUSTOM", "id * 100") + + # At a cold start, the table is unloaded so its owner is unknown. + # INVALIDATE METADATA <table> is denied since 'user' is not detected as the owner. 
+ result = self.execute_query_expect_failure( + non_admin_client, "invalidate metadata functional.alltypestiny") + assert "User '{0}' does not have privileges to execute " \ + "'INVALIDATE METADATA/REFRESH' on: functional.alltypestiny".format(user) \ + in str(result) + # Verify catalogd never loads metadata of this table + table_loaded_log = "Loaded metadata for: functional.alltypestiny" + self.assert_catalogd_log_contains("INFO", table_loaded_log, expected_count=0) + + # Run a query to trigger metadata loading on the table + self.execute_query_expect_success( + non_admin_client, "describe functional.alltypestiny") + # Verify catalogd loads metadata of this table + self.assert_catalogd_log_contains("INFO", table_loaded_log, expected_count=1) + + # INVALIDATE METADATA <table> is allowed since 'user' is detected as the owner. + self.execute_query_expect_success( + non_admin_client, "invalidate metadata functional.alltypestiny") + + # Run a query to trigger metadata loading on the table + self.execute_query_expect_success( + non_admin_client, "describe functional.alltypestiny") + # Verify catalogd loads metadata of this table + self.assert_catalogd_log_contains("INFO", table_loaded_log, expected_count=2) + + # Verify REFRESH <table> is allowed since 'user' is detected as the owner. + self.execute_query_expect_success( + non_admin_client, "refresh functional.alltypestiny") + self.execute_query_expect_success( + non_admin_client, + "refresh functional.alltypestiny partition(year=2009, month=1)") + + # Clear the catalog cache and grant 'user' enough privileges + self.execute_query_expect_success( + admin_client, "invalidate metadata functional.alltypestiny") + admin_client.execute("grant refresh on table functional.alltypestiny to user {0}" + .format(user)) + try: + # Now 'user' should be able to run REFRESH/INVALIDATE even if the table is + # unloaded (not recognize it as the owner). 
+ self.execute_query_expect_success( + non_admin_client, "invalidate metadata functional.alltypestiny") + self.execute_query_expect_success( + non_admin_client, "refresh functional.alltypestiny") + finally: + admin_client.execute( + "revoke refresh on table functional.alltypestiny from user {0}".format(user)) + finally: + TestRanger._remove_policy(unique_name) + + def _test_no_exception_in_show_roles_if_no_roles_in_ranger(self): + """ + Ensure that no exception is thrown for SHOW ROLES statements + if there are no roles in Ranger. + """ + admin_client = self.create_impala_client(user=ADMIN) + show_roles_statements = [ + "SHOW ROLES", + "SHOW CURRENT ROLES", + "SHOW ROLE GRANT GROUP admin" + ] + for statement in show_roles_statements: + result = self.execute_query_expect_success(admin_client, statement) + assert len(result.data) == 0 + + def _test_ownership(self): + """Tests ownership privileges for databases and tables with ranger along with + some known quirks in the implementation.""" + test_user = getuser() + test_role = 'test_role' + test_db = "test_ranger_ownership_" + get_random_id(5).lower() + # Create a test database as "admin" user. Owner is set accordingly. + self._run_query_as_user("create database {0}".format(test_db), ADMIN, True) + try: + # Try to create a table under test_db as current user. It should fail. + self._run_query_as_user( + "create table {0}.foo(a int)".format(test_db), test_user, False) + + # Change the owner of the database to the current user. + self._run_query_as_user( + "alter database {0} set owner user {1}".format(test_db, test_user), ADMIN, True) + + self._run_query_as_user("refresh authorization", ADMIN, True) + + # Create should succeed now. + self._run_query_as_user( + "create table {0}.foo(a int)".format(test_db), test_user, True) + # Run show tables on the db. The resulting list should be empty.
This happens + # because the created table's ownership information is not aggressively cached + # by the current Catalog implementations. Hence the analysis pass does not + # have access to the ownership information to verify if the current session + # user is actually the owner. We need to fix this by caching the HMS metadata + # more aggressively when the table loads. TODO(IMPALA-8937). + result = \ + self._run_query_as_user("show tables in {0}".format(test_db), test_user, True) + assert len(result.data) == 0 + # Run a simple query that warms up the table metadata and repeat SHOW TABLES. + self._run_query_as_user("select * from {0}.foo".format(test_db), test_user, True) + result = \ + self._run_query_as_user("show tables in {0}".format(test_db), test_user, True) + assert len(result.data) == 1 + assert "foo" in result.data + # Change the owner of the db back to the admin user + self._run_query_as_user( + "alter database {0} set owner user {1}".format(test_db, ADMIN), ADMIN, True) + result = self._run_query_as_user( + "show tables in {0}".format(test_db), test_user, False) + err = "User '{0}' does not have privileges to access: {1}.*.*". \ + format(test_user, test_db) + assert err in str(result) + # test_user is still the owner of the table, so select should work fine. + self._run_query_as_user("select * from {0}.foo".format(test_db), test_user, True) + # Change the table owner back to admin. + self._run_query_as_user( + "alter table {0}.foo set owner user {1}".format(test_db, ADMIN), ADMIN, True) + # create role before test begin. + self._run_query_as_user("CREATE ROLE {0}".format(test_role), ADMIN, True) + # test alter table owner to role statement, expect success result. + stmt = "alter table {0}.foo set owner role {1}".format(test_db, test_role) + self._run_query_as_user(stmt, ADMIN, True) + # drop the role. + self._run_query_as_user("DROP ROLE {0}".format(test_role), ADMIN, True) + # alter table to a non-exist role, expect error showing "role doesn't exist". 
+ stmt = "alter table {0}.foo set owner role {1}".format(test_db, test_role) + result = self._run_query_as_user(stmt, ADMIN, False) + err = "Role '{0}' does not exist.".format(test_role) + assert err in str(result) + # test_user should not be authorized to run the queries anymore. + result = self._run_query_as_user( + "select * from {0}.foo".format(test_db), test_user, False) + err = ("AuthorizationException: User '{0}' does not have privileges to execute" + + " 'SELECT' on: {1}.foo").format(test_user, test_db) + assert err in str(result) + finally: + self._run_query_as_user("drop database {0} cascade".format(test_db), ADMIN, True) + + def _verified_multiuser_results(self, admin_client, admin_query_tmpl, user_query, users, + user_clients): + assert len(users) == len(user_clients) + for i in range(len(users)): + admin_res = admin_client.execute(admin_query_tmpl % ("'%s'" % users[i])).get_data() + user_res = user_clients[i].execute(user_query).get_data() + assert admin_res == user_res + + def _test_select_view_created_by_non_superuser(self, unique_name): + """Test that except for the necessary privileges on the view, the requesting user + has to be granted the necessary privileges on the underlying tables as well in order + to access a view with its table property of 'Authorized' set to false.""" + grantee_user = "non_owner" + admin_client = self.create_impala_client(user=ADMIN) + non_owner_client = self.create_impala_client(user=grantee_user) + unique_database = unique_name + "_db" + test_tbl_1 = unique_name + "_tbl_1" + test_tbl_2 = unique_name + "_tbl_2" + test_view = unique_name + "_view" + + try: + # Set up temp database, tables, and the view. 
+ admin_client.execute("drop database if exists {0} cascade" + .format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) + admin_client.execute("create table {0}.{1} (id int, bigint_col bigint)" + .format(unique_database, test_tbl_1)) + admin_client.execute("create table {0}.{1} (id int, string_col string)" + .format(unique_database, test_tbl_2)) + admin_client.execute("create view {0}.{1} (abc, xyz) as " + "select count(a.bigint_col), b.string_col " + "from {0}.{2} a inner join {0}.{3} b on a.id = b.id " + "group by b.string_col having count(a.bigint_col) > 1" + .format(unique_database, test_view, test_tbl_1, test_tbl_2)) + # Add this table property to simulate a view created by a non-superuser. + self.run_stmt_in_hive("alter view {0}.{1} " + "set tblproperties ('Authorized' = 'false')" + .format(unique_database, test_view)) + + admin_client.execute("grant select(abc) on table {0}.{1} to user {2}" + .format(unique_database, test_view, grantee_user)) + admin_client.execute("grant select(xyz) on table {0}.{1} to user {2}" + .format(unique_database, test_view, grantee_user)) + admin_client.execute("grant select on table {0}.{1} to user {2}" + .format(unique_database, test_tbl_2, grantee_user)) + admin_client.execute("refresh authorization") + + result = self.execute_query_expect_failure(non_owner_client, + "select * from {0}.{1}".format(unique_database, test_view)) + assert "User '{0}' does not have privileges to execute 'SELECT' on: " \ + "{1}.{2}".format(grantee_user, unique_database, test_tbl_1) in str(result) + + admin_client.execute("grant select(id) on table {0}.{1} to user {2}" + .format(unique_database, test_tbl_1, grantee_user)) + admin_client.execute("refresh authorization") + # The query is not authorized since the SELECT privilege on the column 'bigint_col' + # in the table 'test_tbl_1' is also required. 
+ result = self.execute_query_expect_failure(non_owner_client, + "select * from {0}.{1}".format(unique_database, test_view)) + assert "User '{0}' does not have privileges to execute 'SELECT' on: " \ + "{1}.{2}".format(grantee_user, unique_database, test_tbl_1) in str(result) + + admin_client.execute("grant select(bigint_col) on table {0}.{1} to user {2}" + .format(unique_database, test_tbl_1, grantee_user)) + admin_client.execute("refresh authorization") + + # The query is authorized successfully once sufficient privileges are granted. + self.execute_query_expect_success(non_owner_client, + "select * from {0}.{1}".format(unique_database, test_view)) + + # Add a deny policy to prevent 'grantee_user' from accessing the column 'id' in + # the table 'test_tbl_2', on which 'grantee_user' had been granted the SELECT + # privilege. + TestRanger._add_deny_policy(unique_name, grantee_user, unique_database, test_tbl_2, + "id") + admin_client.execute("refresh authorization") + + # The query is not authorized since the SELECT privilege on the column 'id' in the + # table 'test_tbl_2' has been denied in the policy above. 
+ result = self.execute_query_expect_failure(non_owner_client, + "select * from {0}.{1}".format(unique_database, test_view)) + assert "User '{0}' does not have privileges to execute 'SELECT' on: " \ + "{1}.{2}".format(grantee_user, unique_database, test_tbl_2) in str(result) + finally: + admin_client.execute("revoke select(abc) on table {0}.{1} from user {2}" + .format(unique_database, test_view, grantee_user)) + admin_client.execute("revoke select(xyz) on table {0}.{1} from user {2}" + .format(unique_database, test_view, grantee_user)) + admin_client.execute("revoke select(id) on table {0}.{1} from user {2}" + .format(unique_database, test_tbl_1, grantee_user)) + admin_client.execute("revoke select(bigint_col) on table {0}.{1} from user {2}" + .format(unique_database, test_tbl_1, grantee_user)) + admin_client.execute("revoke select on table {0}.{1} from user {2}" + .format(unique_database, test_tbl_2, grantee_user)) + admin_client.execute("drop database if exists {0} cascade" + .format(unique_database)) + TestRanger._remove_policy(unique_name) + + +class TestRangerIndependent(TestRanger): + """ + Tests for Apache Ranger integration with Apache Impala that require cluster restart + between test methods. + """ + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impala_log_dir=tempfile.mkdtemp(prefix="ranger_audit_xff", dir=os.getenv("LOG_DIR")), + impalad_args=(IMPALAD_ARGS + " --use_xff_address_as_origin=true"), + catalogd_args=CATALOGD_ARGS, + disable_log_buffering=True) + def test_xff_ranger_audit(self): + """ + Tests XFF client IP is included in ranger audit logs when using hs2-http protocol + """ + # Iterate over test vector within test function to avoid restarting cluster.
+ for vector in\ + [ImpalaTestVector([value]) for value in create_client_protocol_dimension()]: + protocol = vector.get_value("protocol") + if protocol != "hs2-http": + continue + + # Query with XFF header in client request + args = ['--protocol=hs2-http', + '--hs2_x_forward= 10.20.30.40 ', + '-q', 'select count(1) from functional.alltypes'] + run_impala_shell_cmd(vector, args) + + # Query with XFF header in client request + args = ['--protocol=hs2-http', + '--hs2_x_forward=10.20.30.40, 1.1.2.3, 127.0.0.6', + '-q', 'select count(1) from functional.alltypes'] + run_impala_shell_cmd(vector, args) + + # Query with XFF header in client request + args = ['--protocol=hs2-http', + '--hs2_x_forward=10.20.30.40,1.1.2.3,127.0.0.6', + '-q', 'select count(1) from functional.alltypes'] + run_impala_shell_cmd(vector, args) + + # Query without XFF header in client request + args = ['--protocol=hs2-http', + '-q', 'select count(2) from functional.alltypes'] + run_impala_shell_cmd(vector, args) + + # Query with empty XFF header in client request + args = ['--protocol=hs2-http', + '--hs2_x_forward= ', + '-q', 'select count(2) from functional.alltypes'] + run_impala_shell_cmd(vector, args) + + # Query with invalid XFF header in client request + args = ['--protocol=hs2-http', + '--hs2_x_forward=foobar', + '-q', 'select count(3) from functional.alltypes'] + run_impala_shell_cmd(vector, args) + + # Shut down cluster to ensure logs flush to disk. 
+ sleep(5) + self._stop_impala_cluster() + + # Expected audit log string + expected_string_valid_xff = ( + '"access":"select",' + '"resource":"functional/alltypes",' + '"resType":"@table",' + '"action":"select",' + '"result":1,' + '"agent":"impala",' + r'"policy":\d,' + '"enforcer":"ranger-acl",' + '"cliIP":"%s",' + '"reqData":"%s",' + '".+":".+","logType":"RangerAudit"' + ) + + # Ensure audit logs were logged in coordinator logs + self.assert_impalad_log_contains("INFO", expected_string_valid_xff % + ("10.20.30.40", r"select count\(1\) from functional.alltypes"), + expected_count=3) + self.assert_impalad_log_contains("INFO", expected_string_valid_xff % + ("127.0.0.1", r"select count\(2\) from functional.alltypes"), expected_count=2) + self.assert_impalad_log_contains("INFO", expected_string_valid_xff % + ("foobar", r"select count\(3\) from functional.alltypes"), expected_count=1) + + @pytest.mark.execute_serially + @SkipIf.is_test_jdk + @CustomClusterTestSuite.with_args( + impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS, reset_ranger=True) + def test_show_grant_hive_privilege(self, unique_name): + user = getuser() + admin_client = self.create_impala_client(user=ADMIN) + unique_db = unique_name + "_db" + resource = { + "database": unique_db, + "column": "*", + "table": "*" + } + access = ["lock", "select"] + + try: + TestRanger._grant_ranger_privilege(user, resource, access) + admin_client.execute("drop database if exists {0} cascade".format(unique_db)) + admin_client.execute("create database {0}".format(unique_db)) + + admin_client.execute("refresh authorization") + result = self.client.execute("show grant user {0} on database {1}" + .format(user, unique_db)) + + TestRanger._check_privileges(result, [ + ["USER", user, unique_db, "*", "*", "", "", "", "", "select", "false"] + ]) + + # Assert that lock, select privilege exists in Ranger server + assert "lock" in TestRanger._get_ranger_privileges_db(user, unique_db) + assert "select" in 
TestRanger._get_ranger_privileges_db(user, unique_db) + + admin_client.execute("revoke select on database {0} from user {1}" + .format(unique_db, user)) + + # Assert that lock is still present and select is revoked in Ranger server + assert "lock" in TestRanger._get_ranger_privileges_db(user, unique_db) + assert "select" not in TestRanger._get_ranger_privileges_db(user, unique_db) + + admin_client.execute("refresh authorization") + result = self.client.execute("show grant user {0} on database {1}" + .format(user, unique_db)) + + TestRanger._check_privileges(result, []) + finally: + admin_client.execute("drop database if exists {0} cascade".format(unique_db)) + TestRanger._revoke_ranger_privilege(user, resource, access) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS, reset_ranger=True) + def test_grant_multiple_columns(self): + self._test_grant_multiple_columns(13) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=IMPALAD_ARGS, + catalogd_args="{0} {1}".format(CATALOGD_ARGS, "--consolidate_grant_revoke_requests"), + reset_ranger=True) + def test_grant_multiple_columns_consolidate_grant_revoke_requests(self): + self._test_grant_multiple_columns(1) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=IMPALAD_ARGS, + catalogd_args=CATALOGD_ARGS + " --hms_event_polling_interval_s=5") + def test_alter_owner_hms_event_sync(self, unique_name): + """Test Impala queries that depends on database ownership changes in Hive. + Use a longer polling interval to mimic lag in event processing.""" + test_user = getuser() + test_db = unique_name + "_db" + # A client that only used by 'test_user'. Just need to set the username at the + # first statement. It will keep using the same username. 
+ user_client = self.create_impala_client(user=test_user) + user_client.set_configuration({"sync_hms_events_wait_time_s": 10, + "sync_hms_events_strict_mode": True}) + + # Methods to change ownership in Hive which will generate ALTER_DATABASE events. + def change_db_owner_to_user(): + self.run_stmt_in_hive( + "alter database {0} set owner user {1}".format(test_db, test_user), ADMIN) + + def reset_db_owner_to_admin(): + self.run_stmt_in_hive( + "alter database {0} set owner user {1}".format(test_db, ADMIN), ADMIN) + + # Create a test database as "admin" user. Owner is set accordingly. + # By default, only the "admin" user and owner of the db can read/write this db. + self._run_query_as_user( + "drop database if exists {0} cascade".format(test_db), ADMIN, expect_success=True) + self._run_query_as_user( + "create database {0}".format(test_db), ADMIN, expect_success=True) + try: + # Test table statement waits for db alter owner events + # Try to create a table under test_db as current user. It should fail since + # test_user is not the db owner. + create_tbl_stmt = "create table {0}.foo(a int)".format(test_db) + self.execute_query_expect_failure(user_client, create_tbl_stmt) + change_db_owner_to_user() + # Creating the table again should succeed once the ALTER_DATABASE event is synced. 
+ self.execute_query_expect_success(user_client, create_tbl_stmt) + reset_db_owner_to_admin() + + # Test table statement waits for table alter owner events + stmts = [ + "describe {}.foo".format(test_db), + "insert into {}.foo values (0)".format(test_db), + "select * from {}.foo".format(test_db), + "compute stats {}.foo".format(test_db), + "refresh {}.foo".format(test_db), + "drop table {}.foo".format(test_db), + ] + for stmt in stmts: + # Change table owner to admin + self.run_stmt_in_hive( + "alter table {0}.foo set owner user {1}".format(test_db, ADMIN), ADMIN) + self.execute_query_expect_failure(user_client, stmt) + # Change table owner to user + self.run_stmt_in_hive( + "alter table {0}.foo set owner user {1}".format(test_db, test_user), ADMIN) + self.execute_query_expect_success(user_client, stmt) + # Create the table again since the last statement is DROP TABLE. + change_db_owner_to_user() + self.execute_query_expect_success(user_client, create_tbl_stmt) + reset_db_owner_to_admin() + + # Test SHOW DATABASES waits for db change owner events. The db is invisible if user + # is not the owner. + assert test_db not in self.all_db_names(user_client) + change_db_owner_to_user() + assert test_db in self.all_db_names(user_client) + reset_db_owner_to_admin() + + # Test SHOW TABLES + # Run a query on the table to make its ownership info loaded. Otherwise, it will + # be missing in SHOW TABLES due to IMPALA-8937. 
+ self.execute_query_expect_success(user_client, "describe {}.foo".format(test_db)) + # SHOW TABLES should fail since user is not the owner of this db + self.execute_query_expect_failure(user_client, "show tables in " + test_db) + change_db_owner_to_user() + assert ["foo"] == self.all_table_names(user_client, test_db) + reset_db_owner_to_admin() + finally: + self._run_query_as_user("drop database {0} cascade".format(test_db), ADMIN, True) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="{0} {1}".format(IMPALAD_ARGS, + "--allow_catalog_cache_op_from_masked_users=true"), + catalogd_args=CATALOGD_ARGS, + disable_log_buffering=True) + def test_allow_metadata_update(self, unique_name): + self._test_allow_catalog_cache_op_from_masked_users(unique_name) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="{0} {1}".format(LOCAL_CATALOG_IMPALAD_ARGS, + "--allow_catalog_cache_op_from_masked_users=true"), + catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS, + disable_log_buffering=True) + def test_allow_metadata_update_local_catalog(self, unique_name): + self._test_allow_catalog_cache_op_from_masked_users(unique_name) + + @pytest.mark.execute_serially + @SkipIf.is_test_jdk + @SkipIfFS.hive + @SkipIfHive2.ranger_auth + @CustomClusterTestSuite.with_args() + def test_hive_with_ranger_setup(self, vector): + """Test for setup of Hive-Ranger integration. Make sure future upgrades on + Hive/Ranger won't break the tool.""" + script = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/run-hive-server.sh') + try: + # Add the policy before restarting Hive. So it can take effect immediately after + # HiveServer2 starts. 
+ TestRanger._add_column_masking_policy( + "col_mask_for_hive", getuser(), "functional", "alltypestiny", "id", "CUSTOM", + "{col} * 100") + check_call([script, '-with_ranger']) + self.run_test_case("QueryTest/hive_ranger_integration", vector) + finally: + check_call([script]) + TestRanger._remove_policy("col_mask_for_hive") + + @pytest.mark.execute_serially + @SkipIfFS.incorrent_reported_ec + @CustomClusterTestSuite.with_args( + # We additionally provide impalad and catalogd with the customized user-to-groups + # mapper since some test cases in grant_revoke.test require Impala to retrieve the + # groups a given user belongs to and such users might not exist in the underlying + # OS in the testing environment, e.g., the user 'non_owner'. + impalad_args="{0} {1}".format(IMPALAD_ARGS, + "--use_customized_user_groups_mapper_for_ranger"), + catalogd_args="{0} {1}".format(CATALOGD_ARGS, + "--use_customized_user_groups_mapper_for_ranger")) + def test_grant_revoke_with_role(self, vector): + """Test grant/revoke with role.""" + admin_client = self.create_impala_client(user=ADMIN) + try: + self.run_test_case('QueryTest/grant_revoke', vector, use_db="default") + finally: + # Below are the statements that need to be executed in order to clean up the + # privileges granted to the test roles as well as the test roles themselves. + # Note that we need to revoke those previously granted privileges so that each role + # is not referenced by any policy before we delete those roles. + # Moreover, we need to revoke the privilege on the database 'grant_rev_db' before + # dropping 'grant_rev_db'. Otherwise, the revocation would fail due to an + # AnalysisException thrown because 'grant_rev_db' does not exist. 
+ cleanup_statements = [ + "revoke all on database grant_rev_db from grant_revoke_test_ALL_TEST_DB", + "revoke all on server from grant_revoke_test_ALL_SERVER", + "revoke all on table functional.alltypes from grant_revoke_test_NON_OWNER", + "revoke grant option for all on database functional " + "from grant_revoke_test_NON_OWNER", + "REVOKE SELECT (a, b, c, d, e, x, y) ON TABLE grant_rev_db.test_tbl3 " + "FROM grant_revoke_test_ALL_SERVER", + "REVOKE ALL ON DATABASE functional FROM grant_revoke_test_NON_OWNER", + "REVOKE SELECT ON TABLE grant_rev_db.test_tbl3 FROM grant_revoke_test_NON_OWNER", + "REVOKE GRANT OPTION FOR SELECT (a, c) ON TABLE grant_rev_db.test_tbl3 " + "FROM grant_revoke_test_ALL_SERVER", + "REVOKE SELECT ON TABLE grant_rev_db.test_tbl1 " + "FROM grant_revoke_test_SELECT_INSERT_TEST_TBL", + "REVOKE INSERT ON TABLE grant_rev_db.test_tbl1 " + "FROM grant_revoke_test_SELECT_INSERT_TEST_TBL", + "REVOKE SELECT ON TABLE grant_rev_db.test_tbl3 " + "FROM grant_revoke_test_NON_OWNER", + "REVOKE SELECT (a) ON TABLE grant_rev_db.test_tbl3 " + "FROM grant_revoke_test_NON_OWNER", + "REVOKE SELECT (a, c, e) ON TABLE grant_rev_db.test_tbl3 " + "FROM grant_revoke_test_ALL_SERVER", + "revoke all on server server1 from grant_revoke_test_ALL_SERVER1", + "revoke select(col1) on table grant_rev_db.test_tbl4 " + "from role grant_revoke_test_COLUMN_PRIV", + "{0}{1}{2}".format("revoke all on uri '", + os.getenv("FILESYSTEM_PREFIX"), + "/test-warehouse/grant_rev_test_tbl2'" + "from grant_revoke_test_ALL_URI"), + "{0}{1}{2}".format("revoke all on uri '", + os.getenv("FILESYSTEM_PREFIX"), + "/test-warehouse/GRANT_REV_TEST_TBL3'" + "from grant_revoke_test_ALL_URI"), + "{0}{1}{2}".format("revoke all on uri '", + os.getenv("FILESYSTEM_PREFIX"), + "/test-warehouse/grant_rev_test_prt'" + "from grant_revoke_test_ALL_URI"), + "drop role grant_revoke_test_ALL_TEST_DB", + "drop role grant_revoke_test_ALL_SERVER", + "drop role grant_revoke_test_SELECT_INSERT_TEST_TBL", + "drop role 
grant_revoke_test_ALL_URI", + "drop role grant_revoke_test_NON_OWNER", + "drop role grant_revoke_test_ALL_SERVER1", + "drop role grant_revoke_test_COLUMN_PRIV", + "drop database grant_rev_db cascade" + ] + + for statement in cleanup_statements: + try: + admin_client.execute(statement) + except Exception: + # There could be an exception thrown due to the non-existence of the role or + # resource involved in a statement that aims to revoke the privilege on a + # resource from a role, but we do not have to handle such an exception. We only + # need to make sure in the case when the role and the corresponding resource + # exist, the granted privilege is revoked. The same applies to the case when we + # drop a role. + pass + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=LOCAL_CATALOG_IMPALAD_ARGS, + catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS, + # We additionally set 'reset_ranger' to True, to reset all the policies in the + # Ranger service, so even if there were roles before this test, they will be + # deleted when this test runs. Since the Ranger policies are reset before this + # test, we do not have to worry about existing roles while the test + # is running. + reset_ranger=True) + def test_no_exception_in_show_roles_if_no_roles_in_ranger(self): + self._test_no_exception_in_show_roles_if_no_roles_in_ranger() + + +@CustomClusterTestSuite.with_args( + impalad_args=IMPALAD_ARGS, + catalogd_args=CATALOGD_ARGS) +class TestRangerLegacyCatalog(TestRanger): + """ + Tests for Apache Ranger integration with Apache Impala in legacy catalog mode. + Test methods share a common cluster.
+ """ + + @pytest.mark.execute_serially + def test_grant_revoke_with_catalog_v1(self, unique_name): + """Tests grant/revoke with catalog v1.""" + self._test_grant_revoke(unique_name, [None, "invalidate metadata", + "refresh authorization"]) + + @pytest.mark.execute_serially + @SkipIfFS.hdfs_acls + def test_insert_with_catalog_v1(self, unique_name): + """ + Test that when Ranger is the authorization provider in the legacy catalog mode, + Impala does not throw an AnalysisException when an authorized user tries to execute + an INSERT query against a partitioned table of which the respective table path and + the partition path are not writable according to HDFS permission. + """ + user = getuser() + admin_client = self.create_impala_client(user=ADMIN) + unique_database = unique_name + "_db" + unique_table = unique_name + "_tbl" + partition_column = "year" + partition_value = "2008" + table_path = "test-warehouse/{0}.db/{1}".format(unique_database, unique_table) + table_partition_path = "{0}/{1}={2}"\ + .format(table_path, partition_column, partition_value) + insert_statement = "insert into {0}.{1} (name) partition ({2}) " \ + "values (\"Adam\", {3})".format(unique_database, unique_table, partition_column, + partition_value) + authz_err = "AuthorizationException: User '{0}' does not have privileges to " \ + "execute 'INSERT' on: {1}.{2}".format(user, unique_database, unique_table) + try: + admin_client.execute("drop database if exists {0} cascade" + .format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) + admin_client.execute("create table {0}.{1} (name string) partitioned by ({2} int)" + .format(unique_database, unique_table, partition_column)) + admin_client.execute("alter table {0}.{1} add partition ({2}={3})" + .format(unique_database, unique_table, partition_column, partition_value)) + + # Change the owner user and group of the HDFS paths corresponding to the table and + # the partition so that according to Impala's 
FsPermissionChecker, the table is not + # writable to the user that loads the table. This user usually is the one + # representing the Impala service. Before IMPALA-11871, changing either the table + # path or the partition path to non-writable would result in an AnalysisException. + self.hdfs_client.chown(table_path, "another_user", "another_group") + self.hdfs_client.chown(table_partition_path, "another_user", "another_group") + # Invalidate the table metadata to force the catalog server to reload the HDFS + # table and the related partition(s). + admin_client.execute("invalidate metadata {0}.{1}" + .format(unique_database, unique_table)) + + # Verify that the INSERT statement fails with AuthorizationException because the + # requesting user does not have the INSERT privilege on the table. + result = self._run_query_as_user(insert_statement, user, False) + assert authz_err in str(result) + + admin_client.execute("grant insert on table {0}.{1} to user {2}" + .format(unique_database, unique_table, user)) + # Verify that the INSERT statement succeeds without AnalysisException. + self._run_query_as_user(insert_statement, user, True) + finally: + admin_client.execute("revoke insert on table {0}.{1} from user {2}" + .format(unique_database, unique_table, user)) + admin_client.execute("drop database if exists {0} cascade" + .format(unique_database)) + + @pytest.mark.execute_serially + @SkipIfFS.hdfs_acls + def test_load_data_with_catalog_v1(self, unique_name): + """ + Test that when Ranger is the authorization provider in the legacy catalog mode, + Impala does not throw an AnalysisException when an authorized user tries to execute + a LOAD DATA query against a table partition of which the respective partition path is + not writable according to Impala's FsPermissionChecker. 
+ """ + user = getuser() + admin_client = self.create_impala_client(user=ADMIN) + unique_database = unique_name + "_db" + unique_table = unique_name + "_tbl" + partition_column = "year" + partition_value = "2008" + destination_table_path = "test-warehouse/{0}.db/{1}" \ + .format(unique_database, unique_table, ) + destination_table_partition_path = "{0}/{1}={2}"\ + .format(destination_table_path, partition_column, partition_value) + file_name = "load_data_with_catalog_v1.txt" + files_for_table = ["testdata/data/{0}".format(file_name)] + source_hdfs_dir = "/tmp" + load_data_statement = "load data inpath '{0}/{1}' into table {2}.{3} " \ + "partition ({4}={5})".format(source_hdfs_dir, file_name, unique_database, + unique_table, partition_column, partition_value) + authz_err = "AuthorizationException: User '{0}' does not have privileges to " \ + "execute 'INSERT' on: {1}.{2}".format(user, unique_database, unique_table) + try: + admin_client.execute("drop database if exists {0} cascade" + .format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) + copy_files_to_hdfs_dir(files_for_table, source_hdfs_dir) + admin_client.execute("create table {0}.{1} (name string) partitioned by ({2} int) " + "row format delimited fields terminated by ',' " + "stored as textfile".format(unique_database, unique_table, partition_column)) + # We need to add the partition. Otherwise, the LOAD DATA statement can't create new + # partitions. + admin_client.execute("alter table {0}.{1} add partition ({2}={3})" + .format(unique_database, unique_table, partition_column, partition_value)) + + # Change the permissions of the HDFS path of the destination table partition. + # Before IMPALA-11871, even we changed the table path to non-writable, loading + # data into the partition was still allowed if the destination partition path + # was writable according to Impala's FsPermissionChecker. 
But if the destination + # partition path was not writable, an AnalysisException would be thrown. + self.hdfs_client.chown(destination_table_partition_path, "another_user", + "another_group") + # Invalidate the table metadata to force the catalog server to reload the HDFS + # table and the related partition(s). + admin_client.execute("invalidate metadata {0}.{1}" + .format(unique_database, unique_table)) + + # To execute the LOAD DATA statement, a user has to be granted the ALL privilege + # on the source HDFS path and the INSERT privilege on the destination table. + # The following verifies the LOAD DATA statement fails with AuthorizationException + # due to insufficient privileges. + result = self._run_query_as_user(load_data_statement, user, False) + assert authz_err in str(result) + + admin_client.execute("grant all on uri '{0}/{1}' to user {2}" + .format(source_hdfs_dir, file_name, user)) + # The following verifies the ALL privilege on the source file alone is not + # sufficient to execute the LOAD DATA statement. + result = self._run_query_as_user(load_data_statement, user, False) + assert authz_err in str(result) + + admin_client.execute("grant insert on table {0}.{1} to user {2}" + .format(unique_database, unique_table, user)) + # Verify the LOAD DATA statement succeeds without AnalysisException.
+ self._run_query_as_user(load_data_statement, user, True) + finally: + admin_client.execute("revoke all on uri '{0}/{1}' from user {2}" + .format(source_hdfs_dir, file_name, user)) + admin_client.execute("revoke insert on table {0}.{1} from user {2}" + .format(unique_database, unique_table, user)) + admin_client.execute("drop database if exists {0} cascade" + .format(unique_database)) + self.filesystem_client.delete_file_dir("{0}/{1}" + .format(source_hdfs_dir, file_name)) + + @pytest.mark.execute_serially + def test_grant_option(self, unique_name): + user1 = getuser() + admin_client = self.create_impala_client(user=ADMIN) + unique_database = unique_name + "_db" + unique_table = unique_name + "_tbl" + + try: + # Set-up temp database/table + admin_client.execute("drop database if exists {0} cascade".format(unique_database)) + admin_client.execute("create database {0}".format(unique_database)) + admin_client.execute("create table {0}.{1} (x int)" + .format(unique_database, unique_table)) + + # Give user 1 the ability to grant select privileges on unique_database + self.execute_query_expect_success(admin_client, + "grant select on database {0} to user {1} with " + "grant option".format(unique_database, user1)) + self.execute_query_expect_success(admin_client, + "grant insert on database {0} to user {1} with " + "grant option".format(unique_database, user1)) + + # Verify user 1 has with_grant privilege on unique_database + result = self.execute_query("show grant user {0} on database {1}" + .format(user1, unique_database)) + TestRanger._check_privileges(result, [ + ["USER", user1, unique_database, "", "", "", "", "", "*", "insert", "true"], + ["USER", user1, unique_database, "", "", "", "", "", "*", "select", "true"], + ["USER", user1, unique_database, "*", "*", "", "", "", "", "insert", "true"], + ["USER", user1, unique_database, "*", "*", "", "", "", "", "select", "true"]]) + + # Revoke select privilege and check grant option is still present + 
self.execute_query_expect_success(admin_client, + "revoke select on database {0} from user {1}" + .format(unique_database, user1)) + result = self.execute_query("show grant user {0} on database {1}" + .format(user1, unique_database)) + # Revoking the select privilege also deprives the grantee of the permission to + # transfer other privilege(s) on the same resource to other principals. This is a + # current limitation of Ranger since privileges on the same resource share the same + # delegateAdmin field in the corresponding RangerPolicyItem. + TestRanger._check_privileges(result, [ + ["USER", user1, unique_database, "", "", "", "", "", "*", "insert", "false"], + ["USER", user1, unique_database, "*", "*", "", "", "", "", "insert", "false"]]) + + # Revoke privilege granting from user 1 + self.execute_query_expect_success(admin_client, "revoke grant option for insert " + "on database {0} from user {1}" + .format(unique_database, user1)) + + # User 1 can no longer grant privileges on unique_database + # In ranger it is currently not possible to revoke grant for a single access type + result = self.execute_query("show grant user {0} on database {1}" + .format(user1, unique_database)) + TestRanger._check_privileges(result, [ + ["USER", user1, unique_database, "", "", "", "", "", "*", "insert", "false"], + ["USER", user1, unique_database, "*", "*", "", "", "", "", "insert", "false"]]) + finally: + admin_client.execute("revoke insert on database {0} from user {1}" + .format(unique_database, user1)) + admin_client.execute("drop database if exists {0} cascade".format(unique_database)) + + @pytest.mark.execute_serially + def test_show_grant(self, unique_name): + user = getuser() + group = grp.getgrnam(getuser()).gr_name + test_data = [(user, "USER"), (group, "GROUP")] + admin_client = self.create_impala_client(user=ADMIN) + unique_db = unique_name + "_db" + unique_table = unique_name + "_tbl" + udf = "identity" + + try: + # Create test database/table + 
admin_client.execute("drop database if exists {0} cascade".format(unique_db)) + admin_client.execute("create database {0}".format(unique_db)) + admin_client.execute("create table {0}.{1} (x int)" + .format(unique_db, unique_table)) + + for data in test_data: + # Test basic show grant functionality for user/group + self._test_show_grant_basic(admin_client, data[1], data[0], unique_db, + unique_table, udf) + # Test that omitting ON <resource> results in failure + self._test_show_grant_without_on(data[1], data[0]) + + # Test inherited privileges (server privileges show for database, etc.) + self._test_show_grant_inherited(admin_client, data[1], data[0], unique_db, + unique_table) + + # Test ALL privilege hides other privileges + self._test_show_grant_mask(admin_client, user) + + # Test ALL privilege on UDF hides other privileges + self._test_show_grant_mask_on_udf(admin_client, data[1], data[0], unique_db, udf) + + # Test USER inherits privileges for their GROUP + self._test_show_grant_user_group(admin_client, user, group, unique_db) + finally: + admin_client.execute("drop database if exists {0} cascade".format(unique_db)) + + @pytest.mark.execute_serially + def test_grant_revoke_ranger_api(self, unique_name): + user = getuser() + admin_client = self.create_impala_client(user=ADMIN) + unique_db = unique_name + "_db" + resource = { + "database": unique_db, + "column": "*", + "table": "*" + } + access = ["select", "create"] + + try: + # Create the test database + admin_client.execute("drop database if exists {0} cascade".format(unique_db)) + admin_client.execute("create database {0}".format(unique_db)) + + # Grant privileges via Ranger REST API + TestRanger._grant_ranger_privilege(user, resource, access) + + # Privileges should be stale before a refresh + result = self.client.execute("show grant user {0} on database {1}" + .format(user, unique_db)) + TestRanger._check_privileges(result, []) + + # Refresh and check updated privileges + admin_client.execute("refresh 
authorization") + result = self.client.execute("show grant user {0} on database {1}" + .format(user, unique_db)) + + TestRanger._check_privileges(result, [ + ["USER", user, unique_db, "*", "*", "", "", "", "", "create", "false"], + ["USER", user, unique_db, "*", "*", "", "", "", "", "select", "false"] + ]) + + # Revoke privileges via Ranger REST API + TestRanger._revoke_ranger_privilege(user, resource, access) + + # Privileges should be stale before a refresh + result = self.client.execute("show grant user {0} on database {1}" + .format(user, unique_db)) + TestRanger._check_privileges(result, [ + ["USER", user, unique_db, "*", "*", "", "", "", "", "create", "false"], + ["USER", user, unique_db, "*", "*", "", "", "", "", "select", "false"] + ]) + + # Refresh and check updated privileges + admin_client.execute("refresh authorization") + result = self.client.execute("show grant user {0} on database {1}" + .format(user, unique_db)) + + TestRanger._check_privileges(result, []) + finally: + admin_client.execute("revoke all on database {0} from user {1}" + .format(unique_db, user)) + admin_client.execute("drop database if exists {0} cascade".format(unique_db)) + + @pytest.mark.execute_serially + def test_legacy_catalog_ownership(self): + self._test_ownership() + + @pytest.mark.execute_serially + def test_grant_revoke_by_owner_legacy_catalog(self, unique_name): + self._test_grant_revoke_by_owner(unique_name) + + @pytest.mark.execute_serially + def test_unsupported_sql(self): + """Tests unsupported SQL statements when running with Ranger.""" + admin_client = self.create_impala_client(user=ADMIN) + error_msg = "UnsupportedOperationException: SHOW GRANT is not supported without a " \ + "defined resource in Ranger." 
+ statement = "show grant role foo" + result = self.execute_query_expect_failure(admin_client, statement) + assert error_msg in str(result) + + @pytest.mark.execute_serially + def test_grant_revoke_invalid_principal(self): + """Tests grant/revoke to/from invalid principal should return more readable + error messages.""" + valid_user = "admin" + invalid_user = "invalid_user" + invalid_group = "invalid_group" + # TODO(IMPALA-8640): Create two different Impala clients, one per user, to + # work around the bug. + invalid_impala_client = self.create_impala_client(user=invalid_user) + valid_impala_client = self.create_impala_client(user=valid_user) + for statement in ["grant select on table functional.alltypes to user {0}" + .format(getuser()), + "revoke select on table functional.alltypes from user {0}" + .format(getuser())]: + result = self.execute_query_expect_failure(invalid_impala_client, statement) + if "grant" in statement: + assert "Error granting a privilege in Ranger. Ranger error message: " \ + "HTTP 403 Error: Grantor user invalid_user doesn't exist" in str(result) + else: + assert "Error revoking a privilege in Ranger. Ranger error message: " \ + "HTTP 403 Error: Grantor user invalid_user doesn't exist" in str(result) + + for statement in ["grant select on table functional.alltypes to user {0}" + .format(invalid_user), + "revoke select on table functional.alltypes from user {0}" + .format(invalid_user)]: + result = self.execute_query_expect_failure(valid_impala_client, statement) + if "grant" in statement: + assert "Error granting a privilege in Ranger. Ranger error message: " \ + "HTTP 403 Error: Grantee user invalid_user doesn't exist" in str(result) + else: + assert "Error revoking a privilege in Ranger. 
Ranger error message: " \ + "HTTP 403 Error: Grantee user invalid_user doesn't exist" in str(result) + + for statement in ["grant select on table functional.alltypes to group {0}" + .format(invalid_group), + "revoke select on table functional.alltypes from group {0}" + .format(invalid_group)]: + result = self.execute_query_expect_failure(valid_impala_client, statement) + if "grant" in statement: + assert "Error granting a privilege in Ranger. Ranger error message: " \ + "HTTP 403 Error: Grantee group invalid_group doesn't exist" in str(result) + else: + assert "Error revoking a privilege in Ranger. Ranger error message: " \ + "HTTP 403 Error: Grantee group invalid_group doesn't exist" in str(result) + + @pytest.mark.execute_serially def test_show_functions(self, unique_name): user1 = getuser() admin_client = self.create_impala_client(user=ADMIN) @@ -1763,8 +2202,6 @@ class TestRanger(CustomClusterTestSuite): ADMIN, True) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_select_function(self, unique_name): """Verifies that to execute a UDF in a database, a user has to be granted a) the SELECT privilege on the UDF, and b) any of the SELECT, INSERT, REFRESH privileges on @@ -1885,163 +2322,7 @@ class TestRanger(CustomClusterTestSuite): self._run_query_as_user("drop database {0} cascade".format(unique_database), ADMIN, True) - def _test_ownership(self): - """Tests ownership privileges for databases and tables with ranger along with - some known quirks in the implementation.""" - test_user = getuser() - test_role = 'test_role' - test_db = "test_ranger_ownership_" + get_random_id(5).lower() - # Create a test database as "admin" user. Owner is set accordingly. - self._run_query_as_user("create database {0}".format(test_db), ADMIN, True) - try: - # Try to create a table under test_db as current user. It should fail. 
- self._run_query_as_user( - "create table {0}.foo(a int)".format(test_db), test_user, False) - - # Change the owner of the database to the current user. - self._run_query_as_user( - "alter database {0} set owner user {1}".format(test_db, test_user), ADMIN, True) - - self._run_query_as_user("refresh authorization", ADMIN, True) - - # Create should succeed now. - self._run_query_as_user( - "create table {0}.foo(a int)".format(test_db), test_user, True) - # Run show tables on the db. The resulting list should be empty. This happens - # because the created table's ownership information is not aggressively cached - # by the current Catalog implementations. Hence the analysis pass does not - # have access to the ownership information to verify if the current session - # user is actually the owner. We need to fix this by caching the HMS metadata - # more aggressively when the table loads. TODO(IMPALA-8937). - result = \ - self._run_query_as_user("show tables in {0}".format(test_db), test_user, True) - assert len(result.data) == 0 - # Run a simple query that warms up the table metadata and repeat SHOW TABLES. - self._run_query_as_user("select * from {0}.foo".format(test_db), test_user, True) - result = \ - self._run_query_as_user("show tables in {0}".format(test_db), test_user, True) - assert len(result.data) == 1 - assert "foo" in result.data - # Change the owner of the db back to the admin user - self._run_query_as_user( - "alter database {0} set owner user {1}".format(test_db, ADMIN), ADMIN, True) - result = self._run_query_as_user( - "show tables in {0}".format(test_db), test_user, False) - err = "User '{0}' does not have privileges to access: {1}.*.*". \ - format(test_user, test_db) - assert err in str(result) - # test_user is still the owner of the table, so select should work fine. - self._run_query_as_user("select * from {0}.foo".format(test_db), test_user, True) - # Change the table owner back to admin. 
- self._run_query_as_user( - "alter table {0}.foo set owner user {1}".format(test_db, ADMIN), ADMIN, True) - # create role before test begin. - self._run_query_as_user("CREATE ROLE {0}".format(test_role), ADMIN, True) - # test alter table owner to role statement, expect success result. - stmt = "alter table {0}.foo set owner role {1}".format(test_db, test_role) - self._run_query_as_user(stmt, ADMIN, True) - # drop the role. - self._run_query_as_user("DROP ROLE {0}".format(test_role), ADMIN, True) - # alter table to a non-exist role, expect error showing "role doesn't exist". - stmt = "alter table {0}.foo set owner role {1}".format(test_db, test_role) - result = self._run_query_as_user(stmt, ADMIN, False) - err = "Role '{0}' does not exist.".format(test_role) - assert err in str(result) - # test_user should not be authorized to run the queries anymore. - result = self._run_query_as_user( - "select * from {0}.foo".format(test_db), test_user, False) - err = ("AuthorizationException: User '{0}' does not have privileges to execute" - + " 'SELECT' on: {1}.foo").format(test_user, test_db) - assert err in str(result) - finally: - self._run_query_as_user("drop database {0} cascade".format(test_db), ADMIN, True) - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, - catalogd_args=CATALOGD_ARGS + " --hms_event_polling_interval_s=5") - def test_alter_owner_hms_event_sync(self, unique_name): - """Test Impala queries that depends on database ownership changes in Hive. - Use a longer polling interval to mimic lag in event processing.""" - test_user = getuser() - test_db = unique_name + "_db" - # A client that only used by 'test_user'. Just need to set the username at the - # first statement. It will keep using the same username. 
- user_client = self.create_impala_client(user=test_user) - user_client.set_configuration({"sync_hms_events_wait_time_s": 10, - "sync_hms_events_strict_mode": True}) - - # Methods to change ownership in Hive which will generate ALTER_DATABASE events. - def change_db_owner_to_user(): - self.run_stmt_in_hive( - "alter database {0} set owner user {1}".format(test_db, test_user), ADMIN) - - def reset_db_owner_to_admin(): - self.run_stmt_in_hive( - "alter database {0} set owner user {1}".format(test_db, ADMIN), ADMIN) - - # Create a test database as "admin" user. Owner is set accordingly. - # By default, only the "admin" user and owner of the db can read/write this db. - self._run_query_as_user( - "drop database if exists {0} cascade".format(test_db), ADMIN, expect_success=True) - self._run_query_as_user( - "create database {0}".format(test_db), ADMIN, expect_success=True) - try: - # Test table statement waits for db alter owner events - # Try to create a table under test_db as current user. It should fail since - # test_user is not the db owner. - create_tbl_stmt = "create table {0}.foo(a int)".format(test_db) - self.execute_query_expect_failure(user_client, create_tbl_stmt) - change_db_owner_to_user() - # Creating the table again should succeed once the ALTER_DATABASE event is synced. 
- self.execute_query_expect_success(user_client, create_tbl_stmt) - reset_db_owner_to_admin() - - # Test table statement waits for table alter owner events - stmts = [ - "describe {}.foo".format(test_db), - "insert into {}.foo values (0)".format(test_db), - "select * from {}.foo".format(test_db), - "compute stats {}.foo".format(test_db), - "refresh {}.foo".format(test_db), - "drop table {}.foo".format(test_db), - ] - for stmt in stmts: - # Change table owner to admin - self.run_stmt_in_hive( - "alter table {0}.foo set owner user {1}".format(test_db, ADMIN), ADMIN) - self.execute_query_expect_failure(user_client, stmt) - # Change table owner to user - self.run_stmt_in_hive( - "alter table {0}.foo set owner user {1}".format(test_db, test_user), ADMIN) - self.execute_query_expect_success(user_client, stmt) - # Create the table again since the last statement is DROP TABLE. - change_db_owner_to_user() - self.execute_query_expect_success(user_client, create_tbl_stmt) - reset_db_owner_to_admin() - - # Test SHOW DATABASES waits for db change owner events. The db is invisible if user - # is not the owner. - assert test_db not in self.all_db_names(user_client) - change_db_owner_to_user() - assert test_db in self.all_db_names(user_client) - reset_db_owner_to_admin() - - # Test SHOW TABLES - # Run a query on the table to make its ownership info loaded. Otherwise, it will - # be missing in SHOW TABLES due to IMPALA-8937. 
- self.execute_query_expect_success(user_client, "describe {}.foo".format(test_db)) - # SHOW TABLES should fail since user is not the owner of this db - self.execute_query_expect_failure(user_client, "show tables in " + test_db) - change_db_owner_to_user() - assert ["foo"] == self.all_table_names(user_client, test_db) - reset_db_owner_to_admin() - finally: - self._run_query_as_user("drop database {0} cascade".format(test_db), ADMIN, True) - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_select_function_with_fallback_db(self, unique_name): """Verifies that Impala should not allow using functions in the fallback database unless the user has been granted sufficient privileges on the given database.""" @@ -2120,8 +2401,6 @@ class TestRanger(CustomClusterTestSuite): ADMIN, True) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_column_masking(self, vector, unique_name): user = getuser() unique_database = unique_name + '_db' @@ -2216,8 +2495,6 @@ class TestRanger(CustomClusterTestSuite): TestRanger._remove_policy(unique_name + str(i)) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_block_metadata_update(self, unique_name): """Test that the metadata update operation on a table by a requesting user is denied if there exists a column masking policy defined on any column in the table for the @@ -2245,90 +2522,6 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute("revoke all on server from user {0}".format(user)) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args="{0} {1}".format(IMPALAD_ARGS, - "--allow_catalog_cache_op_from_masked_users=true"), - catalogd_args=CATALOGD_ARGS, - disable_log_buffering=True) - def test_allow_metadata_update(self, unique_name): - 
self.__test_allow_catalog_cache_op_from_masked_users(unique_name) - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args="{0} {1}".format(LOCAL_CATALOG_IMPALAD_ARGS, - "--allow_catalog_cache_op_from_masked_users=true"), - catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS, - disable_log_buffering=True) - def test_allow_metadata_update_local_catalog(self, unique_name): - self.__test_allow_catalog_cache_op_from_masked_users(unique_name) - - def __test_allow_catalog_cache_op_from_masked_users(self, unique_name): - """Verify that catalog cache operations are allowed for masked users - when allow_catalog_cache_op_from_masked_users=true.""" - user = getuser() - admin_client = self.create_impala_client(user=ADMIN) - non_admin_client = self.create_impala_client(user=user) - try: - # Create a column masking policy on 'user' which is also the owner of the table - TestRanger._add_column_masking_policy( - unique_name, user, "functional", "alltypestiny", "id", - "CUSTOM", "id * 100") - - # At a cold start, the table is unloaded so its owner is unknown. - # INVALIDATE METADATA <table> is denied since 'user' is not detected as the owner. - result = self.execute_query_expect_failure( - non_admin_client, "invalidate metadata functional.alltypestiny") - assert "User '{0}' does not have privileges to execute " \ - "'INVALIDATE METADATA/REFRESH' on: functional.alltypestiny".format(user) \ - in str(result) - # Verify catalogd never loads metadata of this table - table_loaded_log = "Loaded metadata for: functional.alltypestiny" - self.assert_catalogd_log_contains("INFO", table_loaded_log, expected_count=0) - - # Run a query to trigger metadata loading on the table - self.execute_query_expect_success( - non_admin_client, "describe functional.alltypestiny") - # Verify catalogd loads metadata of this table - self.assert_catalogd_log_contains("INFO", table_loaded_log, expected_count=1) - - # INVALIDATE METADATA <table> is allowed since 'user' is detected as the owner. 
- self.execute_query_expect_success( - non_admin_client, "invalidate metadata functional.alltypestiny") - - # Run a query to trigger metadata loading on the table - self.execute_query_expect_success( - non_admin_client, "describe functional.alltypestiny") - # Verify catalogd loads metadata of this table - self.assert_catalogd_log_contains("INFO", table_loaded_log, expected_count=2) - - # Verify REFRESH <table> is allowed since 'user' is detected as the owner. - self.execute_query_expect_success( - non_admin_client, "refresh functional.alltypestiny") - self.execute_query_expect_success( - non_admin_client, - "refresh functional.alltypestiny partition(year=2009, month=1)") - - # Clear the catalog cache and grant 'user' enough privileges - self.execute_query_expect_success( - admin_client, "invalidate metadata functional.alltypestiny") - admin_client.execute("grant refresh on table functional.alltypestiny to user {0}" - .format(user)) - try: - # Now 'user' should be able to run REFRESH/INVALIDATE even if the table is - # unloaded (not recognize it as the owner). 
- self.execute_query_expect_success( - non_admin_client, "invalidate metadata functional.alltypestiny") - self.execute_query_expect_success( - non_admin_client, "refresh functional.alltypestiny") - finally: - admin_client.execute( - "revoke refresh on table functional.alltypestiny from user {0}".format(user)) - finally: - TestRanger._remove_policy(unique_name) - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_masking_overload_coverage(self, vector, unique_name): """Test that we have cover all the overloads of the masking functions that could appear in using default policies.""" @@ -2371,8 +2564,6 @@ class TestRanger(CustomClusterTestSuite): TestRanger._remove_policy(policy_names.pop()) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_row_filtering(self, vector, unique_name): user = getuser() unique_database = unique_name + '_db' @@ -2545,21 +2736,11 @@ class TestRanger(CustomClusterTestSuite): ] for statement in cleanup_statements: try: - admin_client.execute(statement, user=ADMIN) - except Exception as e: - LOG.error("Ignored exception in cleanup: " + str(e)) - - def _verified_multiuser_results(self, admin_client, admin_query_tmpl, user_query, users, - user_clients): - assert len(users) == len(user_clients) - for i in range(len(users)): - admin_res = admin_client.execute(admin_query_tmpl % ("'%s'" % users[i])).get_data() - user_res = user_clients[i].execute(user_query).get_data() - assert admin_res == user_res + admin_client.execute(statement, user=ADMIN) + except Exception as e: + LOG.error("Ignored exception in cleanup: " + str(e)) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_column_masking_and_row_filtering(self, vector, unique_name): user = getuser() admin_client = self.create_impala_client(user=ADMIN) 
@@ -2608,8 +2789,6 @@ class TestRanger(CustomClusterTestSuite): TestRanger._remove_policy(unique_name + str(i)) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_iceberg_time_travel_with_masking(self, unique_name): """When we do a time travel query on an iceberg table we will use the schema from the time of the snapshot. Make sure this works when column masking is being used.""" @@ -2718,8 +2897,6 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute("drop database if exists {0} cascade".format(unique_database)) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_convert_table_to_iceberg(self, unique_name): """Test that autorization is taken into account when performing a table migration to Iceberg.""" @@ -2804,8 +2981,6 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute("drop database if exists {0} cascade".format(unique_database)) @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_iceberg_metadata_table_privileges(self, unique_name): user = getuser() admin_client = self.create_impala_client(user=ADMIN) @@ -2839,29 +3014,6 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute("drop database if exists {0} cascade".format(unique_database)) @pytest.mark.execute_serially - @SkipIf.is_test_jdk - @SkipIfFS.hive - @SkipIfHive2.ranger_auth - @CustomClusterTestSuite.with_args() - def test_hive_with_ranger_setup(self, vector): - """Test for setup of Hive-Ranger integration. Make sure future upgrades on - Hive/Ranger won't break the tool.""" - script = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/run-hive-server.sh') - try: - # Add the policy before restarting Hive. So it can take effect immediately after - # HiveServer2 starts. 
- TestRanger._add_column_masking_policy( - "col_mask_for_hive", getuser(), "functional", "alltypestiny", "id", "CUSTOM", - "{col} * 100") - check_call([script, '-with_ranger']) - self.run_test_case("QueryTest/hive_ranger_integration", vector) - finally: - check_call([script]) - TestRanger._remove_policy("col_mask_for_hive") - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_profile_protection(self): """Test that a requesting user is able to access the runtime profile or execution summary of a query involving a view only if the user is granted the privileges on all @@ -2920,218 +3072,40 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute(statement) @pytest.mark.execute_serially - @SkipIfFS.incorrent_reported_ec - @CustomClusterTestSuite.with_args( - # We additionally provide impalad and catalogd with the customized user-to-groups - # mapper since some test cases in grant_revoke.test require Impala to retrieve the - # groups a given user belongs to and such users might not exist in the underlying - # OS in the testing environment, e.g., the user 'non_owner'. - impalad_args="{0} {1}".format(IMPALAD_ARGS, - "--use_customized_user_groups_mapper_for_ranger"), - catalogd_args="{0} {1}".format(CATALOGD_ARGS, - "--use_customized_user_groups_mapper_for_ranger")) - def test_grant_revoke_with_role(self, vector): - """Test grant/revoke with role.""" - admin_client = self.create_impala_client(user=ADMIN) - try: - self.run_test_case('QueryTest/grant_revoke', vector, use_db="default") - finally: - # Below are the statements that need to be executed in order to clean up the - # privileges granted to the test roles as well as the test roles themselves. - # Note that we need to revoke those previously granted privileges so that each role - # is not referenced by any policy before we delete those roles. 
- # Moreover, we need to revoke the privilege on the database 'grant_rev_db' before - # dropping 'grant_rev_db'. Otherwise, the revocation would fail due to an - # AnalysisException thrown because 'grant_rev_db' does not exist. - cleanup_statements = [ - "revoke all on database grant_rev_db from grant_revoke_test_ALL_TEST_DB", - "revoke all on server from grant_revoke_test_ALL_SERVER", - "revoke all on table functional.alltypes from grant_revoke_test_NON_OWNER", - "revoke grant option for all on database functional " - "from grant_revoke_test_NON_OWNER", - "REVOKE SELECT (a, b, c, d, e, x, y) ON TABLE grant_rev_db.test_tbl3 " - "FROM grant_revoke_test_ALL_SERVER", - "REVOKE ALL ON DATABASE functional FROM grant_revoke_test_NON_OWNER", - "REVOKE SELECT ON TABLE grant_rev_db.test_tbl3 FROM grant_revoke_test_NON_OWNER", - "REVOKE GRANT OPTION FOR SELECT (a, c) ON TABLE grant_rev_db.test_tbl3 " - "FROM grant_revoke_test_ALL_SERVER", - "REVOKE SELECT ON TABLE grant_rev_db.test_tbl1 " - "FROM grant_revoke_test_SELECT_INSERT_TEST_TBL", - "REVOKE INSERT ON TABLE grant_rev_db.test_tbl1 " - "FROM grant_revoke_test_SELECT_INSERT_TEST_TBL", - "REVOKE SELECT ON TABLE grant_rev_db.test_tbl3 " - "FROM grant_revoke_test_NON_OWNER", - "REVOKE SELECT (a) ON TABLE grant_rev_db.test_tbl3 " - "FROM grant_revoke_test_NON_OWNER", - "REVOKE SELECT (a, c, e) ON TABLE grant_rev_db.test_tbl3 " - "FROM grant_revoke_test_ALL_SERVER", - "revoke all on server server1 from grant_revoke_test_ALL_SERVER1", - "revoke select(col1) on table grant_rev_db.test_tbl4 " - "from role grant_revoke_test_COLUMN_PRIV", - "{0}{1}{2}".format("revoke all on uri '", - os.getenv("FILESYSTEM_PREFIX"), - "/test-warehouse/grant_rev_test_tbl2'" - "from grant_revoke_test_ALL_URI"), - "{0}{1}{2}".format("revoke all on uri '", - os.getenv("FILESYSTEM_PREFIX"), - "/test-warehouse/GRANT_REV_TEST_TBL3'" - "from grant_revoke_test_ALL_URI"), - "{0}{1}{2}".format("revoke all on uri '", - os.getenv("FILESYSTEM_PREFIX"), - 
"/test-warehouse/grant_rev_test_prt'" - "from grant_revoke_test_ALL_URI"), - "drop role grant_revoke_test_ALL_TEST_DB", - "drop role grant_revoke_test_ALL_SERVER", - "drop role grant_revoke_test_SELECT_INSERT_TEST_TBL", - "drop role grant_revoke_test_ALL_URI", - "drop role grant_revoke_test_NON_OWNER", - "drop role grant_revoke_test_ALL_SERVER1", - "drop role grant_revoke_test_COLUMN_PRIV", - "drop database grant_rev_db cascade" - ] - - for statement in cleanup_statements: - try: - admin_client.execute(statement) - except Exception: - # There could be an exception thrown due to the non-existence of the role or - # resource involved in a statement that aims to revoke the privilege on a - # resource from a role, but we do not have to handle such an exception. We only - # need to make sure in the case when the role and the corresponding resource - # exist, the granted privilege is revoked. The same applies to the case when we - # drop a role. - pass - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) def test_select_view_created_by_non_superuser_with_catalog_v1(self, unique_name): self._test_select_view_created_by_non_superuser(unique_name) - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( + +@CustomClusterTestSuite.with_args( impalad_args=LOCAL_CATALOG_IMPALAD_ARGS, catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS) - def test_select_view_created_by_non_superuser_with_local_catalog(self, unique_name): - self._test_select_view_created_by_non_superuser(unique_name) - - def _test_select_view_created_by_non_superuser(self, unique_name): - """Test that except for the necessary privileges on the view, the requesting user - has to be granted the necessary privileges on the underlying tables as well in order - to access a view with its table property of 'Authorized' set to false.""" - grantee_user = "non_owner" - admin_client = self.create_impala_client(user=ADMIN) - non_owner_client = 
self.create_impala_client(user=grantee_user) - unique_database = unique_name + "_db" - test_tbl_1 = unique_name + "_tbl_1" - test_tbl_2 = unique_name + "_tbl_2" - test_view = unique_name + "_view" - - try: - # Set up temp database, tables, and the view. - admin_client.execute("drop database if exists {0} cascade" - .format(unique_database)) - admin_client.execute("create database {0}".format(unique_database)) - admin_client.execute("create table {0}.{1} (id int, bigint_col bigint)" - .format(unique_database, test_tbl_1)) - admin_client.execute("create table {0}.{1} (id int, string_col string)" - .format(unique_database, test_tbl_2)) - admin_client.execute("create view {0}.{1} (abc, xyz) as " - "select count(a.bigint_col), b.string_col " - "from {0}.{2} a inner join {0}.{3} b on a.id = b.id " - "group by b.string_col having count(a.bigint_col) > 1" - .format(unique_database, test_view, test_tbl_1, test_tbl_2)) - # Add this table property to simulate a view created by a non-superuser. - self.run_stmt_in_hive("alter view {0}.{1} " - "set tblproperties ('Authorized' = 'false')" - .format(unique_database, test_view)) - - admin_client.execute("grant select(abc) on table {0}.{1} to user {2}" - .format(unique_database, test_view, grantee_user)) - admin_client.execute("grant select(xyz) on table {0}.{1} to user {2}" - .format(unique_database, test_view, grantee_user)) - admin_client.execute("grant select on table {0}.{1} to user {2}" - .format(unique_database, test_tbl_2, grantee_user)) - admin_client.execute("refresh authorization") - - result = self.execute_query_expect_failure(non_owner_client, - "select * from {0}.{1}".format(unique_database, test_view)) - assert "User '{0}' does not have privileges to execute 'SELECT' on: " \ - "{1}.{2}".format(grantee_user, unique_database, test_tbl_1) in str(result) - - admin_client.execute("grant select(id) on table {0}.{1} to user {2}" - .format(unique_database, test_tbl_1, grantee_user)) - admin_client.execute("refresh 
authorization") - # The query is not authorized since the SELECT privilege on the column 'bigint_col' - # in the table 'test_tbl_1' is also required. - result = self.execute_query_expect_failure(non_owner_client, - "select * from {0}.{1}".format(unique_database, test_view)) - assert "User '{0}' does not have privileges to execute 'SELECT' on: " \ - "{1}.{2}".format(grantee_user, unique_database, test_tbl_1) in str(result) - - admin_client.execute("grant select(bigint_col) on table {0}.{1} to user {2}" - .format(unique_database, test_tbl_1, grantee_user)) - admin_client.execute("refresh authorization") - - # The query is authorized successfully once sufficient privileges are granted. - self.execute_query_expect_success(non_owner_client, - "select * from {0}.{1}".format(unique_database, test_view)) +class TestRangerLocalCatalog(TestRanger): + """ + Tests for Apache Ranger integration with Apache Impala in local catalog mode. + Test methods shares common cluster. + """ - # Add a deny policy to prevent 'grantee_user' from accessing the column 'id' in - # the table 'test_tbl_2', on which 'grantee_user' had been granted the SELECT - # privilege. - TestRanger._add_deny_policy(unique_name, grantee_user, unique_database, test_tbl_2, - "id") - admin_client.execute("refresh authorization") + @pytest.mark.execute_serially + def test_grant_revoke_with_local_catalog(self, unique_name): + """Tests grant/revoke with catalog v2 (local catalog).""" + self._test_grant_revoke(unique_name, [None, "invalidate metadata", + "refresh authorization"]) - # The query is not authorized since the SELECT privilege on the column 'id' in the - # table 'test_tbl_2' has been denied in the policy above. 
- result = self.execute_query_expect_failure(non_owner_client, - "select * from {0}.{1}".format(unique_database, test_view)) - assert "User '{0}' does not have privileges to execute 'SELECT' on: " \ - "{1}.{2}".format(grantee_user, unique_database, test_tbl_2) in str(result) - finally: - admin_client.execute("revoke select(abc) on table {0}.{1} from user {2}" - .format(unique_database, test_view, grantee_user)) - admin_client.execute("revoke select(xyz) on table {0}.{1} from user {2}" - .format(unique_database, test_view, grantee_user)) - admin_client.execute("revoke select(id) on table {0}.{1} from user {2}" - .format(unique_database, test_tbl_1, grantee_user)) - admin_client.execute("revoke select(bigint_col) on table {0}.{1} from user {2}" - .format(unique_database, test_tbl_1, grantee_user)) - admin_client.execute("revoke select on table {0}.{1} from user {2}" - .format(unique_database, test_tbl_2, grantee_user)) - admin_client.execute("drop database if exists {0} cascade" - .format(unique_database)) - TestRanger._remove_policy(unique_name) + @pytest.mark.execute_serially + def test_local_catalog_ownership(self): + # getTableIfCached() in LocalCatalog loads a minimal incomplete table + # that does not include the ownership information. Hence show tables + # *never* show owned tables. TODO(bharathv): Fix in a follow up commit + pytest.xfail("getTableIfCached() faulty behavior, known issue") + self._test_ownership() @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args=LOCAL_CATALOG_IMPALAD_ARGS, - catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS, - # We additionally set 'reset_ranger' to True, to reset all the policies in the - # Ranger service, so even if there were roles before this test, they will be - # deleted when this test runs. since the Ranger policies are reset before this - # test, we do not have to worry about there could be existing roles when the test - # is running. 
- reset_ranger=True) - def test_no_exception_in_show_roles_if_no_roles_in_ranger(self): - self._test_no_exception_in_show_roles_if_no_roles_in_ranger() + def test_grant_revoke_by_owner_local_catalog(self, unique_name): + self._test_grant_revoke_by_owner(unique_name) - def _test_no_exception_in_show_roles_if_no_roles_in_ranger(self): - """ - Ensure that no exception should throw for show roles statement - if there are no roles in ranger. - """ - admin_client = self.create_impala_client(user=ADMIN) - show_roles_statements = [ - "SHOW ROLES", - "SHOW CURRENT ROLES", - "SHOW ROLE GRANT GROUP admin" - ] - for statement in show_roles_statements: - result = self.execute_query_expect_success(admin_client, statement) - assert len(result.data) == 0 + @pytest.mark.execute_serially + def test_select_view_created_by_non_superuser_with_local_catalog(self, unique_name): + self._test_select_view_created_by_non_superuser(unique_name) class TestRangerColumnMaskingTpchNested(CustomClusterTestSuite):
