This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 44e9b6f97d8613e19bad3a6a4e3a0f09c6bb64a4
Author: Riza Suminto <[email protected]>
AuthorDate: Thu May 15 14:31:40 2025 -0700

    IMPALA-14078: Reorganize test_ranger.py to share minicluster
    
    test_ranger.py is a custom cluster test consisting of 41 test methods.
    Each test method require minicluster restart. With IMPALA-13503, we can
    reorganize TestRanger class into 3 separate test class:
    TestRangerIndependent, TestRangerLegacyCatalog, and
    TestRangerLocalCatalog. Both TestRangerLegacyCatalog and
    TestRangerLocalCatalog can maintain the same minicluster without
    restarting it in between.
    
    Testing:
    - Run and pass test_ranger.py in exhaustive mode.
    - Confirmed that no test is missing after reorganization.
    
    Change-Id: I01ff2b3e98fccfffa8bcdfe1177be98634363b56
    Reviewed-on: http://gerrit.cloudera.org:8080/22905
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/authorization/test_ranger.py | 2120 ++++++++++++++++++------------------
 1 file changed, 1047 insertions(+), 1073 deletions(-)

diff --git a/tests/authorization/test_ranger.py 
b/tests/authorization/test_ranger.py
index b43bc4d9a..034756961 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -69,253 +69,16 @@ LOG = logging.getLogger('impala_test_suite')
 
 class TestRanger(CustomClusterTestSuite):
   """
-  Tests for Apache Ranger integration with Apache Impala.
+  Base class for Apache Ranger integration test with Apache Impala.
+  This class only contains common helper or base test method.
+  Pytest method should be declared in either of TestRangerIndependent,
+  TestRangerLegacyCatalog, or TestRangerLocalCatalog.
   """
 
   @classmethod
   def default_test_protocol(cls):
       return HS2
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impala_log_dir=tempfile.mkdtemp(prefix="ranger_audit_xff", 
dir=os.getenv("LOG_DIR")),
-    impalad_args=(IMPALAD_ARGS + " --use_xff_address_as_origin=true"),
-    catalogd_args=CATALOGD_ARGS,
-    disable_log_buffering=True)
-  def test_xff_ranger_audit(self):
-    """
-    Tests XFF client IP is included in ranger audit logs when using hs2-http 
protocol
-    """
-    # Iterate over test vector within test function to avoid restarting 
cluster.
-    for vector in\
-        [ImpalaTestVector([value]) for value in 
create_client_protocol_dimension()]:
-      protocol = vector.get_value("protocol")
-      if protocol != "hs2-http":
-        continue
-
-      # Query with XFF header in client request
-      args = ['--protocol=hs2-http',
-              '--hs2_x_forward= 10.20.30.40 ',
-              '-q', 'select count(1) from functional.alltypes']
-      run_impala_shell_cmd(vector, args)
-
-      # Query with XFF header in client request
-      args = ['--protocol=hs2-http',
-              '--hs2_x_forward=10.20.30.40, 1.1.2.3, 127.0.0.6',
-              '-q', 'select count(1) from functional.alltypes']
-      run_impala_shell_cmd(vector, args)
-
-      # Query with XFF header in client request
-      args = ['--protocol=hs2-http',
-              '--hs2_x_forward=10.20.30.40,1.1.2.3,127.0.0.6',
-              '-q', 'select count(1) from functional.alltypes']
-      run_impala_shell_cmd(vector, args)
-
-      # Query without XFF header in client request
-      args = ['--protocol=hs2-http',
-              '-q', 'select count(2) from functional.alltypes']
-      run_impala_shell_cmd(vector, args)
-
-      # Query with empty XFF header in client request
-      args = ['--protocol=hs2-http',
-              '--hs2_x_forward=    ',
-              '-q', 'select count(2) from functional.alltypes']
-      run_impala_shell_cmd(vector, args)
-
-      # Query with invalid XFF header in client request
-      args = ['--protocol=hs2-http',
-              '--hs2_x_forward=foobar',
-              '-q', 'select count(3) from functional.alltypes']
-      run_impala_shell_cmd(vector, args)
-
-      # Shut down cluster to ensure logs flush to disk.
-      sleep(5)
-      self._stop_impala_cluster()
-
-      # Expected audit log string
-      expected_string_valid_xff = (
-        '"access":"select",'
-        '"resource":"functional/alltypes",'
-        '"resType":"@table",'
-        '"action":"select",'
-        '"result":1,'
-        '"agent":"impala",'
-        r'"policy":\d,'
-        '"enforcer":"ranger-acl",'
-        '"cliIP":"%s",'
-        '"reqData":"%s",'
-        '".+":".+","logType":"RangerAudit"'
-      )
-
-      # Ensure audit logs were logged in coordinator logs
-      self.assert_impalad_log_contains("INFO", expected_string_valid_xff %
-          ("10.20.30.40", r"select count\(1\) from functional.alltypes"),
-          expected_count=3)
-      self.assert_impalad_log_contains("INFO", expected_string_valid_xff %
-          ("127.0.0.1", r"select count\(2\) from functional.alltypes"), 
expected_count=2)
-      self.assert_impalad_log_contains("INFO", expected_string_valid_xff %
-          ("foobar", r"select count\(3\) from functional.alltypes"), 
expected_count=1)
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
-  def test_grant_revoke_with_catalog_v1(self, unique_name):
-    """Tests grant/revoke with catalog v1."""
-    self._test_grant_revoke(unique_name, [None, "invalidate metadata",
-                                          "refresh authorization"])
-
-  @pytest.mark.execute_serially
-  @SkipIfFS.hdfs_acls
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
-  def test_insert_with_catalog_v1(self, unique_name):
-    """
-    Test that when Ranger is the authorization provider in the legacy catalog 
mode,
-    Impala does not throw an AnalysisException when an authorized user tries 
to execute
-    an INSERT query against a partitioned table of which the respective table 
path and
-    the partition path are not writable according to HDFS permission.
-    """
-    user = getuser()
-    admin_client = self.create_impala_client(user=ADMIN)
-    unique_database = unique_name + "_db"
-    unique_table = unique_name + "_tbl"
-    partition_column = "year"
-    partition_value = "2008"
-    table_path = "test-warehouse/{0}.db/{1}".format(unique_database, 
unique_table)
-    table_partition_path = "{0}/{1}={2}"\
-        .format(table_path, partition_column, partition_value)
-    insert_statement = "insert into {0}.{1} (name) partition ({2}) " \
-        "values (\"Adam\", {3})".format(unique_database, unique_table, 
partition_column,
-        partition_value)
-    authz_err = "AuthorizationException: User '{0}' does not have privileges 
to " \
-        "execute 'INSERT' on: {1}.{2}".format(user, unique_database, 
unique_table)
-    try:
-      admin_client.execute("drop database if exists {0} cascade"
-          .format(unique_database))
-      admin_client.execute("create database {0}".format(unique_database))
-      admin_client.execute("create table {0}.{1} (name string) partitioned by 
({2} int)"
-          .format(unique_database, unique_table, partition_column))
-      admin_client.execute("alter table {0}.{1} add partition ({2}={3})"
-          .format(unique_database, unique_table, partition_column, 
partition_value))
-
-      # Change the owner user and group of the HDFS paths corresponding to the 
table and
-      # the partition so that according to Impala's FsPermissionChecker, the 
table is not
-      # writable to the user that loads the table. This user usually is the one
-      # representing the Impala service. Before IMPALA-11871, changing either 
the table
-      # path or the partition path to non-writable would result in an 
AnalysisException.
-      self.hdfs_client.chown(table_path, "another_user", "another_group")
-      self.hdfs_client.chown(table_partition_path, "another_user", 
"another_group")
-      # Invalidate the table metadata to force the catalog server to reload 
the HDFS
-      # table and the related partition(s).
-      admin_client.execute("invalidate metadata {0}.{1}"
-          .format(unique_database, unique_table))
-
-      # Verify that the INSERT statement fails with AuthorizationException 
because the
-      # requesting user does not have the INSERT privilege on the table.
-      result = self._run_query_as_user(insert_statement, user, False)
-      assert authz_err in str(result)
-
-      admin_client.execute("grant insert on table {0}.{1} to user {2}"
-          .format(unique_database, unique_table, user))
-      # Verify that the INSERT statement succeeds without AnalysisException.
-      self._run_query_as_user(insert_statement, user, True)
-    finally:
-      admin_client.execute("revoke insert on table {0}.{1} from user {2}"
-          .format(unique_database, unique_table, user))
-      admin_client.execute("drop database if exists {0} cascade"
-          .format(unique_database))
-
-  @pytest.mark.execute_serially
-  @SkipIfFS.hdfs_acls
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
-  def test_load_data_with_catalog_v1(self, unique_name):
-    """
-    Test that when Ranger is the authorization provider in the legacy catalog 
mode,
-    Impala does not throw an AnalysisException when an authorized user tries 
to execute
-    a LOAD DATA query against a table partition of which the respective 
partition path is
-    not writable according to Impala's FsPermissionChecker.
-    """
-    user = getuser()
-    admin_client = self.create_impala_client(user=ADMIN)
-    unique_database = unique_name + "_db"
-    unique_table = unique_name + "_tbl"
-    partition_column = "year"
-    partition_value = "2008"
-    destination_table_path = "test-warehouse/{0}.db/{1}" \
-        .format(unique_database, unique_table, )
-    destination_table_partition_path = "{0}/{1}={2}"\
-        .format(destination_table_path, partition_column, partition_value)
-    file_name = "load_data_with_catalog_v1.txt"
-    files_for_table = ["testdata/data/{0}".format(file_name)]
-    source_hdfs_dir = "/tmp"
-    load_data_statement = "load data inpath '{0}/{1}' into table {2}.{3} " \
-        "partition ({4}={5})".format(source_hdfs_dir, file_name, 
unique_database,
-        unique_table, partition_column, partition_value)
-    authz_err = "AuthorizationException: User '{0}' does not have privileges 
to " \
-        "execute 'INSERT' on: {1}.{2}".format(user, unique_database, 
unique_table)
-    try:
-      admin_client.execute("drop database if exists {0} cascade"
-          .format(unique_database))
-      admin_client.execute("create database {0}".format(unique_database))
-      copy_files_to_hdfs_dir(files_for_table, source_hdfs_dir)
-      admin_client.execute("create table {0}.{1} (name string) partitioned by 
({2} int) "
-          "row format delimited fields terminated by ',' "
-          "stored as textfile".format(unique_database, unique_table, 
partition_column))
-      # We need to add the partition. Otherwise, the LOAD DATA statement can't 
create new
-      # partitions.
-      admin_client.execute("alter table {0}.{1} add partition ({2}={3})"
-          .format(unique_database, unique_table, partition_column, 
partition_value))
-
-      # Change the permissions of the HDFS path of the destination table 
partition.
-      # Before IMPALA-11871, even we changed the table path to non-writable, 
loading
-      # data into the partition was still allowed if the destination partition 
path
-      # was writable according to Impala's FsPermissionChecker. But if the 
destination
-      # partition path was not writable, an AnalysisException would be thrown.
-      self.hdfs_client.chown(destination_table_partition_path, "another_user",
-          "another_group")
-      # Invalidate the table metadata to force the catalog server to reload 
the HDFS
-      # table and the related partition(s).
-      admin_client.execute("invalidate metadata {0}.{1}"
-          .format(unique_database, unique_table))
-
-      # To execute the LOAD DATA statement, a user has to be granted the ALL 
privilege
-      # on the source HDFS path and the INSERT privilege on the destination 
table.
-      # The following verifies the LOAD DATA statement fails with 
AuthorizationException
-      # due to insufficient privileges.
-      result = self._run_query_as_user(load_data_statement, user, False)
-      assert authz_err in str(result)
-
-      admin_client.execute("grant all on uri '{0}/{1}' to user {2}"
-          .format(source_hdfs_dir, file_name, user))
-      # The following verifies the ALL privilege on the source file alone is 
not
-      # sufficient to execute the LOAD DATA statement.
-      result = self._run_query_as_user(load_data_statement, user, False)
-      assert authz_err in str(result)
-
-      admin_client.execute("grant insert on table {0}.{1} to user {2}"
-          .format(unique_database, unique_table, user))
-      # Verify the LOAD DATA statement fails without AnalysisException.
-      self._run_query_as_user(load_data_statement, user, True)
-    finally:
-      admin_client.execute("revoke all on uri '{0}/{1}' from user {2}"
-          .format(source_hdfs_dir, file_name, user))
-      admin_client.execute("revoke insert on table {0}.{1} from user {2}"
-          .format(unique_database, unique_table, user))
-      admin_client.execute("drop database if exists {0} cascade"
-          .format(unique_database))
-      self.filesystem_client.delete_file_dir("{0}/{1}"
-          .format(source_hdfs_dir, file_name))
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args="{0} {1}".format(IMPALAD_ARGS, "--use_local_catalog=true"),
-    catalogd_args="{0} {1}".format(CATALOGD_ARGS, 
"--catalog_topic_mode=minimal"))
-  def test_grant_revoke_with_local_catalog(self, unique_name):
-    """Tests grant/revoke with catalog v2 (local catalog)."""
-    self._test_grant_revoke(unique_name, [None, "invalidate metadata",
-                                          "refresh authorization"])
-
   def _test_grant_revoke(self, unique_name, refresh_statements):
     user = getuser()
     admin_client = self.create_impala_client(user=ADMIN)
@@ -358,121 +121,16 @@ class TestRanger(CustomClusterTestSuite):
           admin_client.execute("drop database if exists {0} cascade"
                                .format(unique_database))
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
-  def test_grant_option(self, unique_name):
-    user1 = getuser()
-    admin_client = self.create_impala_client(user=ADMIN)
-    unique_database = unique_name + "_db"
-    unique_table = unique_name + "_tbl"
+  def _update_privileges_and_verify(self, admin_client, update_stmt, 
show_grant_stmt,
+                                    expected_privileges):
+    admin_client.execute(update_stmt)
+    result = self.client.execute(show_grant_stmt)
+    TestRanger._check_privileges(result, expected_privileges)
+
+  def _test_show_grant_without_on(self, kw, ident):
+    self.execute_query_expect_failure(self.client, "show grant {0} 
{1}".format(kw, ident))
 
-    try:
-      # Set-up temp database/table
-      admin_client.execute("drop database if exists {0} 
cascade".format(unique_database))
-      admin_client.execute("create database {0}".format(unique_database))
-      admin_client.execute("create table {0}.{1} (x int)"
-                           .format(unique_database, unique_table))
-
-      # Give user 1 the ability to grant select privileges on unique_database
-      self.execute_query_expect_success(admin_client,
-                                        "grant select on database {0} to user 
{1} with "
-                                        "grant option".format(unique_database, 
user1))
-      self.execute_query_expect_success(admin_client,
-                                        "grant insert on database {0} to user 
{1} with "
-                                        "grant option".format(unique_database, 
user1))
-
-      # Verify user 1 has with_grant privilege on unique_database
-      result = self.execute_query("show grant user {0} on database {1}"
-                                  .format(user1, unique_database))
-      TestRanger._check_privileges(result, [
-          ["USER", user1, unique_database, "", "", "", "", "", "*", "insert", 
"true"],
-          ["USER", user1, unique_database, "", "", "", "", "", "*", "select", 
"true"],
-          ["USER", user1, unique_database, "*", "*", "", "", "", "", "insert", 
"true"],
-          ["USER", user1, unique_database, "*", "*", "", "", "", "", "select", 
"true"]])
-
-      # Revoke select privilege and check grant option is still present
-      self.execute_query_expect_success(admin_client,
-                                        "revoke select on database {0} from 
user {1}"
-                                        .format(unique_database, user1))
-      result = self.execute_query("show grant user {0} on database {1}"
-                                  .format(user1, unique_database))
-      # Revoking the select privilege also deprives the grantee of the 
permission to
-      # transfer other privilege(s) on the same resource to other principals. 
This is a
-      # current limitation of Ranger since privileges on the same resource 
share the same
-      # delegateAdmin field in the corresponding RangerPolicyItem.
-      TestRanger._check_privileges(result, [
-          ["USER", user1, unique_database, "", "", "", "", "", "*", "insert", 
"false"],
-          ["USER", user1, unique_database, "*", "*", "", "", "", "", "insert", 
"false"]])
-
-      # Revoke privilege granting from user 1
-      self.execute_query_expect_success(admin_client, "revoke grant option for 
insert "
-                                                      "on database {0} from 
user {1}"
-                                        .format(unique_database, user1))
-
-      # User 1 can no longer grant privileges on unique_database
-      # In ranger it is currently not possible to revoke grant for a single 
access type
-      result = self.execute_query("show grant user {0} on database {1}"
-                                  .format(user1, unique_database))
-      TestRanger._check_privileges(result, [
-          ["USER", user1, unique_database, "", "", "", "", "", "*", "insert", 
"false"],
-          ["USER", user1, unique_database, "*", "*", "", "", "", "", "insert", 
"false"]])
-    finally:
-      admin_client.execute("revoke insert on database {0} from user {1}"
-                           .format(unique_database, user1))
-      admin_client.execute("drop database if exists {0} 
cascade".format(unique_database))
-
-  def _update_privileges_and_verify(self, admin_client, update_stmt, 
show_grant_stmt,
-                                    expected_privileges):
-    admin_client.execute(update_stmt)
-    result = self.client.execute(show_grant_stmt)
-    TestRanger._check_privileges(result, expected_privileges)
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
-  def test_show_grant(self, unique_name):
-    user = getuser()
-    group = grp.getgrnam(getuser()).gr_name
-    test_data = [(user, "USER"), (group, "GROUP")]
-    admin_client = self.create_impala_client(user=ADMIN)
-    unique_db = unique_name + "_db"
-    unique_table = unique_name + "_tbl"
-    udf = "identity"
-
-    try:
-      # Create test database/table
-      admin_client.execute("drop database if exists {0} 
cascade".format(unique_db))
-      admin_client.execute("create database {0}".format(unique_db))
-      admin_client.execute("create table {0}.{1} (x int)"
-                           .format(unique_db, unique_table))
-
-      for data in test_data:
-        # Test basic show grant functionality for user/group
-        self._test_show_grant_basic(admin_client, data[1], data[0], unique_db,
-                                    unique_table, udf)
-        # Test that omitting ON <resource> results in failure
-        self._test_show_grant_without_on(data[1], data[0])
-
-        # Test inherited privileges (server privileges show for database, etc.)
-        self._test_show_grant_inherited(admin_client, data[1], data[0], 
unique_db,
-                                        unique_table)
-
-      # Test ALL privilege hides other privileges
-      self._test_show_grant_mask(admin_client, user)
-
-      # Test ALL privilege on UDF hides other privileges
-      self._test_show_grant_mask_on_udf(admin_client, data[1], data[0], 
unique_db, udf)
-
-      # Test USER inherits privileges for their GROUP
-      self._test_show_grant_user_group(admin_client, user, group, unique_db)
-    finally:
-      admin_client.execute("drop database if exists {0} 
cascade".format(unique_db))
-
-  def _test_show_grant_without_on(self, kw, ident):
-    self.execute_query_expect_failure(self.client, "show grant {0} 
{1}".format(kw, ident))
-
-  def _test_show_grant_user_group(self, admin_client, user, group, unique_db):
+  def _test_show_grant_user_group(self, admin_client, user, group, unique_db):
     try:
       result = self.client.execute("show grant user {0} on database {1}"
                                    .format(user, unique_db))
@@ -890,113 +548,6 @@ class TestRanger(CustomClusterTestSuite):
       admin_client.execute("revoke all on table {0}.{1} from {2} {3}"
                            .format(unique_database, unique_table, kw, id))
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
-  def test_grant_revoke_ranger_api(self, unique_name):
-    user = getuser()
-    admin_client = self.create_impala_client(user=ADMIN)
-    unique_db = unique_name + "_db"
-    resource = {
-      "database": unique_db,
-      "column": "*",
-      "table": "*"
-    }
-    access = ["select", "create"]
-
-    try:
-      # Create the test database
-      admin_client.execute("drop database if exists {0} 
cascade".format(unique_db))
-      admin_client.execute("create database {0}".format(unique_db))
-
-      # Grant privileges via Ranger REST API
-      TestRanger._grant_ranger_privilege(user, resource, access)
-
-      # Privileges should be stale before a refresh
-      result = self.client.execute("show grant user {0} on database {1}"
-                                   .format(user, unique_db))
-      TestRanger._check_privileges(result, [])
-
-      # Refresh and check updated privileges
-      admin_client.execute("refresh authorization")
-      result = self.client.execute("show grant user {0} on database {1}"
-                                   .format(user, unique_db))
-
-      TestRanger._check_privileges(result, [
-          ["USER", user, unique_db, "*", "*", "", "", "", "", "create", 
"false"],
-          ["USER", user, unique_db, "*", "*", "", "", "", "", "select", 
"false"]
-      ])
-
-      # Revoke privileges via Ranger REST API
-      TestRanger._revoke_ranger_privilege(user, resource, access)
-
-      # Privileges should be stale before a refresh
-      result = self.client.execute("show grant user {0} on database {1}"
-                                   .format(user, unique_db))
-      TestRanger._check_privileges(result, [
-          ["USER", user, unique_db, "*", "*", "", "", "", "", "create", 
"false"],
-          ["USER", user, unique_db, "*", "*", "", "", "", "", "select", 
"false"]
-      ])
-
-      # Refresh and check updated privileges
-      admin_client.execute("refresh authorization")
-      result = self.client.execute("show grant user {0} on database {1}"
-                                   .format(user, unique_db))
-
-      TestRanger._check_privileges(result, [])
-    finally:
-      admin_client.execute("revoke all on database {0} from user {1}"
-                           .format(unique_db, user))
-      admin_client.execute("drop database if exists {0} 
cascade".format(unique_db))
-
-  @pytest.mark.execute_serially
-  @SkipIf.is_test_jdk
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS, reset_ranger=True)
-  def test_show_grant_hive_privilege(self, unique_name):
-    user = getuser()
-    admin_client = self.create_impala_client(user=ADMIN)
-    unique_db = unique_name + "_db"
-    resource = {
-      "database": unique_db,
-      "column": "*",
-      "table": "*"
-    }
-    access = ["lock", "select"]
-
-    try:
-      TestRanger._grant_ranger_privilege(user, resource, access)
-      admin_client.execute("drop database if exists {0} 
cascade".format(unique_db))
-      admin_client.execute("create database {0}".format(unique_db))
-
-      admin_client.execute("refresh authorization")
-      result = self.client.execute("show grant user {0} on database {1}"
-                                   .format(user, unique_db))
-
-      TestRanger._check_privileges(result, [
-          ["USER", user, unique_db, "*", "*", "", "", "", "", "select", 
"false"]
-      ])
-
-      # Assert that lock, select privilege exists in Ranger server
-      assert "lock" in TestRanger._get_ranger_privileges_db(user, unique_db)
-      assert "select" in TestRanger._get_ranger_privileges_db(user, unique_db)
-
-      admin_client.execute("revoke select on database {0} from user {1}"
-                           .format(unique_db, user))
-
-      # Assert that lock is still present and select is revoked in Ranger 
server
-      assert "lock" in TestRanger._get_ranger_privileges_db(user, unique_db)
-      assert "select" not in TestRanger._get_ranger_privileges_db(user, 
unique_db)
-
-      admin_client.execute("refresh authorization")
-      result = self.client.execute("show grant user {0} on database {1}"
-                                   .format(user, unique_db))
-
-      TestRanger._check_privileges(result, [])
-    finally:
-      admin_client.execute("drop database if exists {0} 
cascade".format(unique_db))
-      TestRanger._revoke_ranger_privilege(user, resource, access)
-
   @staticmethod
   def _grant_ranger_privilege(user, resource, access):
     data = {
@@ -1259,20 +810,6 @@ class TestRanger(CustomClusterTestSuite):
           impala_client, query, query_options={'sync_ddl': 1})
     return self.execute_query_expect_failure(impala_client, query)
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS, reset_ranger=True)
-  def test_grant_multiple_columns(self):
-    self._test_grant_multiple_columns(13)
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS,
-    catalogd_args="{0} {1}".format(CATALOGD_ARGS, 
"--consolidate_grant_revoke_requests"),
-    reset_ranger=True)
-  def test_grant_multiple_columns_consolidate_grant_revoke_requests(self):
-    self._test_grant_multiple_columns(1)
-
   def _test_grant_multiple_columns(self, expected_num_policies):
     admin_client = self.create_impala_client(user=ADMIN)
     access_type = "select"
@@ -1336,109 +873,20 @@ class TestRanger(CustomClusterTestSuite):
                 break
     return result
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
-  def test_unsupported_sql(self):
-    """Tests unsupported SQL statements when running with Ranger."""
+  def _test_grant_revoke_by_owner(self, unique_name):
+    non_owner_user = NON_OWNER
+    non_owner_group = NON_OWNER
+    grantee_role = "grantee_role"
+    resource_owner_role = OWNER_USER
     admin_client = self.create_impala_client(user=ADMIN)
-    error_msg = "UnsupportedOperationException: SHOW GRANT is not supported 
without a " \
-        "defined resource in Ranger."
-    statement = "show grant role foo"
-    result = self.execute_query_expect_failure(admin_client, statement)
-    assert error_msg in str(result)
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
-  def test_grant_revoke_invalid_principal(self):
-    """Tests grant/revoke to/from invalid principal should return more readable
-       error messages."""
-    valid_user = "admin"
-    invalid_user = "invalid_user"
-    invalid_group = "invalid_group"
-    # TODO(IMPALA-8640): Create two different Impala clients because the users 
to
-    # workaround the bug.
-    invalid_impala_client = self.create_impala_client(user=invalid_user)
-    valid_impala_client = self.create_impala_client(user=valid_user)
-    for statement in ["grant select on table functional.alltypes to user {0}"
-                      .format(getuser()),
-                      "revoke select on table functional.alltypes from user 
{0}"
-                      .format(getuser())]:
-      result = self.execute_query_expect_failure(invalid_impala_client, 
statement)
-      if "grant" in statement:
-        assert "Error granting a privilege in Ranger. Ranger error message: " \
-               "HTTP 403 Error: Grantor user invalid_user doesn't exist" in 
str(result)
-      else:
-        assert "Error revoking a privilege in Ranger. Ranger error message: " \
-               "HTTP 403 Error: Grantor user invalid_user doesn't exist" in 
str(result)
-
-    for statement in ["grant select on table functional.alltypes to user {0}"
-                      .format(invalid_user),
-                      "revoke select on table functional.alltypes from user 
{0}"
-                      .format(invalid_user)]:
-      result = self.execute_query_expect_failure(valid_impala_client, 
statement)
-      if "grant" in statement:
-        assert "Error granting a privilege in Ranger. Ranger error message: " \
-               "HTTP 403 Error: Grantee user invalid_user doesn't exist" in 
str(result)
-      else:
-        assert "Error revoking a privilege in Ranger. Ranger error message: " \
-               "HTTP 403 Error: Grantee user invalid_user doesn't exist" in 
str(result)
-
-    for statement in ["grant select on table functional.alltypes to group {0}"
-                      .format(invalid_group),
-                      "revoke select on table functional.alltypes from group 
{0}"
-                      .format(invalid_group)]:
-      result = self.execute_query_expect_failure(valid_impala_client, 
statement)
-      if "grant" in statement:
-        assert "Error granting a privilege in Ranger. Ranger error message: " \
-               "HTTP 403 Error: Grantee group invalid_group doesn't exist" in 
str(result)
-      else:
-        assert "Error revoking a privilege in Ranger. Ranger error message: " \
-               "HTTP 403 Error: Grantee group invalid_group doesn't exist" in 
str(result)
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
-  def test_legacy_catalog_ownership(self):
-      self._test_ownership()
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(impalad_args=LOCAL_CATALOG_IMPALAD_ARGS,
-    catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS)
-  def test_local_catalog_ownership(self):
-      # getTableIfCached() in LocalCatalog loads a minimal incomplete table
-      # that does not include the ownership information. Hence show tables
-      # *never* show owned tables. TODO(bharathv): Fix in a follow up commit
-      pytest.xfail("getTableIfCached() faulty behavior, known issue")
-      self._test_ownership()
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
-  def test_grant_revoke_by_owner_legacy_catalog(self, unique_name):
-    self._test_grant_revoke_by_owner(unique_name)
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(impalad_args=LOCAL_CATALOG_IMPALAD_ARGS,
-    catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS)
-  def test_grant_revoke_by_owner_local_catalog(self, unique_name):
-    self._test_grant_revoke_by_owner(unique_name)
-
-  def _test_grant_revoke_by_owner(self, unique_name):
-    non_owner_user = NON_OWNER
-    non_owner_group = NON_OWNER
-    grantee_role = "grantee_role"
-    resource_owner_role = OWNER_USER
-    admin_client = self.create_impala_client(user=ADMIN)
-    unique_database = unique_name + "_db"
-    table_name = "tbl"
-    column_names = ["a", "b"]
-    udf_uri = "/test-warehouse/impala-hive-udfs.jar"
-    udf_name = "identity"
-    test_data = [("USER", non_owner_user), ("GROUP", non_owner_group),
-        ("ROLE", grantee_role)]
-    privileges = ["alter", "drop", "create", "insert", "select", "refresh"]
+    unique_database = unique_name + "_db"
+    table_name = "tbl"
+    column_names = ["a", "b"]
+    udf_uri = "/test-warehouse/impala-hive-udfs.jar"
+    udf_name = "identity"
+    test_data = [("USER", non_owner_user), ("GROUP", non_owner_group),
+        ("ROLE", grantee_role)]
+    privileges = ["alter", "drop", "create", "insert", "select", "refresh"]
 
     try:
       admin_client.execute("create role {}".format(grantee_role))
@@ -1721,9 +1169,1000 @@ class TestRanger(CustomClusterTestSuite):
         grantee_type, grantee), OWNER_USER, False)
     assert ERROR_REVOKE in str(result)
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
+  def _test_allow_catalog_cache_op_from_masked_users(self, unique_name):
+    """Verify that catalog cache operations are allowed for masked users
+    when allow_catalog_cache_op_from_masked_users=true."""
+    user = getuser()
+    admin_client = self.create_impala_client(user=ADMIN)
+    non_admin_client = self.create_impala_client(user=user)
+    try:
+      # Create a column masking policy on 'user' which is also the owner of 
the table
+      TestRanger._add_column_masking_policy(
+          unique_name, user, "functional", "alltypestiny", "id",
+          "CUSTOM", "id * 100")
+
+      # At a cold start, the table is unloaded so its owner is unknown.
+      # INVALIDATE METADATA <table> is denied since 'user' is not detected as 
the owner.
+      result = self.execute_query_expect_failure(
+          non_admin_client, "invalidate metadata functional.alltypestiny")
+      assert "User '{0}' does not have privileges to execute " \
+             "'INVALIDATE METADATA/REFRESH' on: 
functional.alltypestiny".format(user) \
+             in str(result)
+      # Verify catalogd never loads metadata of this table
+      table_loaded_log = "Loaded metadata for: functional.alltypestiny"
+      self.assert_catalogd_log_contains("INFO", table_loaded_log, 
expected_count=0)
+
+      # Run a query to trigger metadata loading on the table
+      self.execute_query_expect_success(
+        non_admin_client, "describe functional.alltypestiny")
+      # Verify catalogd loads metadata of this table
+      self.assert_catalogd_log_contains("INFO", table_loaded_log, 
expected_count=1)
+
+      # INVALIDATE METADATA <table> is allowed since 'user' is detected as the 
owner.
+      self.execute_query_expect_success(
+          non_admin_client, "invalidate metadata functional.alltypestiny")
+
+      # Run a query to trigger metadata loading on the table
+      self.execute_query_expect_success(
+          non_admin_client, "describe functional.alltypestiny")
+      # Verify catalogd loads metadata of this table
+      self.assert_catalogd_log_contains("INFO", table_loaded_log, 
expected_count=2)
+
+      # Verify REFRESH <table> is allowed since 'user' is detected as the 
owner.
+      self.execute_query_expect_success(
+          non_admin_client, "refresh functional.alltypestiny")
+      self.execute_query_expect_success(
+          non_admin_client,
+          "refresh functional.alltypestiny partition(year=2009, month=1)")
+
+      # Clear the catalog cache and grant 'user' enough privileges
+      self.execute_query_expect_success(
+          admin_client, "invalidate metadata functional.alltypestiny")
+      admin_client.execute("grant refresh on table functional.alltypestiny to 
user {0}"
+                           .format(user))
+      try:
+        # Now 'user' should be able to run REFRESH/INVALIDATE even if the 
table is
+        # unloaded (not recognize it as the owner).
+        self.execute_query_expect_success(
+            non_admin_client, "invalidate metadata functional.alltypestiny")
+        self.execute_query_expect_success(
+            non_admin_client, "refresh functional.alltypestiny")
+      finally:
+        admin_client.execute(
+            "revoke refresh on table functional.alltypestiny from user 
{0}".format(user))
+    finally:
+      TestRanger._remove_policy(unique_name)
+
+  def _test_no_exception_in_show_roles_if_no_roles_in_ranger(self):
+    """
+    Ensure that no exception should throw for show roles statement
+    if there are no roles in ranger.
+    """
+    admin_client = self.create_impala_client(user=ADMIN)
+    show_roles_statements = [
+      "SHOW ROLES",
+      "SHOW CURRENT ROLES",
+      "SHOW ROLE GRANT GROUP admin"
+    ]
+    for statement in show_roles_statements:
+      result = self.execute_query_expect_success(admin_client, statement)
+      assert len(result.data) == 0
+
+  def _test_ownership(self):
+    """Tests ownership privileges for databases and tables with ranger along 
with
+    some known quirks in the implementation."""
+    test_user = getuser()
+    test_role = 'test_role'
+    test_db = "test_ranger_ownership_" + get_random_id(5).lower()
+    # Create a test database as "admin" user. Owner is set accordingly.
+    self._run_query_as_user("create database {0}".format(test_db), ADMIN, True)
+    try:
+      # Try to create a table under test_db as current user. It should fail.
+      self._run_query_as_user(
+          "create table {0}.foo(a int)".format(test_db), test_user, False)
+
+      # Change the owner of the database to the current user.
+      self._run_query_as_user(
+          "alter database {0} set owner user {1}".format(test_db, test_user), 
ADMIN, True)
+
+      self._run_query_as_user("refresh authorization", ADMIN, True)
+
+      # Create should succeed now.
+      self._run_query_as_user(
+          "create table {0}.foo(a int)".format(test_db), test_user, True)
+      # Run show tables on the db. The resulting list should be empty. This 
happens
+      # because the created table's ownership information is not aggressively 
cached
+      # by the current Catalog implementations. Hence the analysis pass does 
not
+      # have access to the ownership information to verify if the current 
session
+      # user is actually the owner. We need to fix this by caching the HMS 
metadata
+      # more aggressively when the table loads. TODO(IMPALA-8937).
+      result = \
+          self._run_query_as_user("show tables in {0}".format(test_db), 
test_user, True)
+      assert len(result.data) == 0
+      # Run a simple query that warms up the table metadata and repeat SHOW 
TABLES.
+      self._run_query_as_user("select * from {0}.foo".format(test_db), 
test_user, True)
+      result = \
+          self._run_query_as_user("show tables in {0}".format(test_db), 
test_user, True)
+      assert len(result.data) == 1
+      assert "foo" in result.data
+      # Change the owner of the db back to the admin user
+      self._run_query_as_user(
+          "alter database {0} set owner user {1}".format(test_db, ADMIN), 
ADMIN, True)
+      result = self._run_query_as_user(
+          "show tables in {0}".format(test_db), test_user, False)
+      err = "User '{0}' does not have privileges to access: {1}.*.*". \
+          format(test_user, test_db)
+      assert err in str(result)
+      # test_user is still the owner of the table, so select should work fine.
+      self._run_query_as_user("select * from {0}.foo".format(test_db), 
test_user, True)
+      # Change the table owner back to admin.
+      self._run_query_as_user(
+          "alter table {0}.foo set owner user {1}".format(test_db, ADMIN), 
ADMIN, True)
+      # create role before test begin.
+      self._run_query_as_user("CREATE ROLE {0}".format(test_role), ADMIN, True)
+      # test alter table owner to role statement, expect success result.
+      stmt = "alter table {0}.foo set owner role {1}".format(test_db, 
test_role)
+      self._run_query_as_user(stmt, ADMIN, True)
+      # drop the role.
+      self._run_query_as_user("DROP ROLE {0}".format(test_role), ADMIN, True)
+      # alter table to a non-exist role, expect error showing "role doesn't 
exist".
+      stmt = "alter table {0}.foo set owner role {1}".format(test_db, 
test_role)
+      result = self._run_query_as_user(stmt, ADMIN, False)
+      err = "Role '{0}' does not exist.".format(test_role)
+      assert err in str(result)
+      # test_user should not be authorized to run the queries anymore.
+      result = self._run_query_as_user(
+          "select * from {0}.foo".format(test_db), test_user, False)
+      err = ("AuthorizationException: User '{0}' does not have privileges to 
execute"
+             + " 'SELECT' on: {1}.foo").format(test_user, test_db)
+      assert err in str(result)
+    finally:
+      self._run_query_as_user("drop database {0} cascade".format(test_db), 
ADMIN, True)
+
+  def _verified_multiuser_results(self, admin_client, admin_query_tmpl, 
user_query, users,
+                                 user_clients):
+    assert len(users) == len(user_clients)
+    for i in range(len(users)):
+      admin_res = admin_client.execute(admin_query_tmpl % ("'%s'" % 
users[i])).get_data()
+      user_res = user_clients[i].execute(user_query).get_data()
+      assert admin_res == user_res
+
+  def _test_select_view_created_by_non_superuser(self, unique_name):
+    """Test that except for the necessary privileges on the view, the 
requesting user
+    has to be granted the necessary privileges on the underlying tables as 
well in order
+    to access a view with its table property of 'Authorized' set to false."""
+    grantee_user = "non_owner"
+    admin_client = self.create_impala_client(user=ADMIN)
+    non_owner_client = self.create_impala_client(user=grantee_user)
+    unique_database = unique_name + "_db"
+    test_tbl_1 = unique_name + "_tbl_1"
+    test_tbl_2 = unique_name + "_tbl_2"
+    test_view = unique_name + "_view"
+
+    try:
+      # Set up temp database, tables, and the view.
+      admin_client.execute("drop database if exists {0} cascade"
+          .format(unique_database))
+      admin_client.execute("create database {0}".format(unique_database))
+      admin_client.execute("create table {0}.{1} (id int, bigint_col bigint)"
+          .format(unique_database, test_tbl_1))
+      admin_client.execute("create table {0}.{1} (id int, string_col string)"
+          .format(unique_database, test_tbl_2))
+      admin_client.execute("create view {0}.{1} (abc, xyz) as "
+          "select count(a.bigint_col), b.string_col "
+          "from {0}.{2} a inner join {0}.{3} b on a.id = b.id "
+          "group by b.string_col having count(a.bigint_col) > 1"
+          .format(unique_database, test_view, test_tbl_1, test_tbl_2))
+      # Add this table property to simulate a view created by a non-superuser.
+      self.run_stmt_in_hive("alter view {0}.{1} "
+          "set tblproperties ('Authorized' = 'false')"
+          .format(unique_database, test_view))
+
+      admin_client.execute("grant select(abc) on table {0}.{1} to user {2}"
+          .format(unique_database, test_view, grantee_user))
+      admin_client.execute("grant select(xyz) on table {0}.{1} to user {2}"
+          .format(unique_database, test_view, grantee_user))
+      admin_client.execute("grant select on table {0}.{1} to user {2}"
+          .format(unique_database, test_tbl_2, grantee_user))
+      admin_client.execute("refresh authorization")
+
+      result = self.execute_query_expect_failure(non_owner_client,
+          "select * from {0}.{1}".format(unique_database, test_view))
+      assert "User '{0}' does not have privileges to execute 'SELECT' on: " \
+          "{1}.{2}".format(grantee_user, unique_database, test_tbl_1) in 
str(result)
+
+      admin_client.execute("grant select(id) on table {0}.{1} to user {2}"
+          .format(unique_database, test_tbl_1, grantee_user))
+      admin_client.execute("refresh authorization")
+      # The query is not authorized since the SELECT privilege on the column 
'bigint_col'
+      # in the table 'test_tbl_1' is also required.
+      result = self.execute_query_expect_failure(non_owner_client,
+          "select * from {0}.{1}".format(unique_database, test_view))
+      assert "User '{0}' does not have privileges to execute 'SELECT' on: " \
+          "{1}.{2}".format(grantee_user, unique_database, test_tbl_1) in 
str(result)
+
+      admin_client.execute("grant select(bigint_col) on table {0}.{1} to user 
{2}"
+          .format(unique_database, test_tbl_1, grantee_user))
+      admin_client.execute("refresh authorization")
+
+      # The query is authorized successfully once sufficient privileges are 
granted.
+      self.execute_query_expect_success(non_owner_client,
+          "select * from {0}.{1}".format(unique_database, test_view))
+
+      # Add a deny policy to prevent 'grantee_user' from accessing the column 
'id' in
+      # the table 'test_tbl_2', on which 'grantee_user' had been granted the 
SELECT
+      # privilege.
+      TestRanger._add_deny_policy(unique_name, grantee_user, unique_database, 
test_tbl_2,
+          "id")
+      admin_client.execute("refresh authorization")
+
+      # The query is not authorized since the SELECT privilege on the column 
'id' in the
+      # table 'test_tbl_2' has been denied in the policy above.
+      result = self.execute_query_expect_failure(non_owner_client,
+          "select * from {0}.{1}".format(unique_database, test_view))
+      assert "User '{0}' does not have privileges to execute 'SELECT' on: " \
+          "{1}.{2}".format(grantee_user, unique_database, test_tbl_2) in 
str(result)
+    finally:
+      admin_client.execute("revoke select(abc) on table {0}.{1} from user {2}"
+          .format(unique_database, test_view, grantee_user))
+      admin_client.execute("revoke select(xyz) on table {0}.{1} from user {2}"
+          .format(unique_database, test_view, grantee_user))
+      admin_client.execute("revoke select(id) on table {0}.{1} from user {2}"
+          .format(unique_database, test_tbl_1, grantee_user))
+      admin_client.execute("revoke select(bigint_col) on table {0}.{1} from 
user {2}"
+          .format(unique_database, test_tbl_1, grantee_user))
+      admin_client.execute("revoke select on table {0}.{1} from user {2}"
+          .format(unique_database, test_tbl_2, grantee_user))
+      admin_client.execute("drop database if exists {0} cascade"
+          .format(unique_database))
+      TestRanger._remove_policy(unique_name)
+
+
+class TestRangerIndependent(TestRanger):
+  """
+  Tests for Apache Ranger integration with Apache Impala that require cluster 
restart
+  between test method.
+  """
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impala_log_dir=tempfile.mkdtemp(prefix="ranger_audit_xff", 
dir=os.getenv("LOG_DIR")),
+    impalad_args=(IMPALAD_ARGS + " --use_xff_address_as_origin=true"),
+    catalogd_args=CATALOGD_ARGS,
+    disable_log_buffering=True)
+  def test_xff_ranger_audit(self):
+    """
+    Tests XFF client IP is included in ranger audit logs when using hs2-http 
protocol
+    """
+    # Iterate over test vector within test function to avoid restarting 
cluster.
+    for vector in\
+        [ImpalaTestVector([value]) for value in 
create_client_protocol_dimension()]:
+      protocol = vector.get_value("protocol")
+      if protocol != "hs2-http":
+        continue
+
+      # Query with XFF header in client request
+      args = ['--protocol=hs2-http',
+              '--hs2_x_forward= 10.20.30.40 ',
+              '-q', 'select count(1) from functional.alltypes']
+      run_impala_shell_cmd(vector, args)
+
+      # Query with XFF header in client request
+      args = ['--protocol=hs2-http',
+              '--hs2_x_forward=10.20.30.40, 1.1.2.3, 127.0.0.6',
+              '-q', 'select count(1) from functional.alltypes']
+      run_impala_shell_cmd(vector, args)
+
+      # Query with XFF header in client request
+      args = ['--protocol=hs2-http',
+              '--hs2_x_forward=10.20.30.40,1.1.2.3,127.0.0.6',
+              '-q', 'select count(1) from functional.alltypes']
+      run_impala_shell_cmd(vector, args)
+
+      # Query without XFF header in client request
+      args = ['--protocol=hs2-http',
+              '-q', 'select count(2) from functional.alltypes']
+      run_impala_shell_cmd(vector, args)
+
+      # Query with empty XFF header in client request
+      args = ['--protocol=hs2-http',
+              '--hs2_x_forward=    ',
+              '-q', 'select count(2) from functional.alltypes']
+      run_impala_shell_cmd(vector, args)
+
+      # Query with invalid XFF header in client request
+      args = ['--protocol=hs2-http',
+              '--hs2_x_forward=foobar',
+              '-q', 'select count(3) from functional.alltypes']
+      run_impala_shell_cmd(vector, args)
+
+      # Shut down cluster to ensure logs flush to disk.
+      sleep(5)
+      self._stop_impala_cluster()
+
+      # Expected audit log string
+      expected_string_valid_xff = (
+        '"access":"select",'
+        '"resource":"functional/alltypes",'
+        '"resType":"@table",'
+        '"action":"select",'
+        '"result":1,'
+        '"agent":"impala",'
+        r'"policy":\d,'
+        '"enforcer":"ranger-acl",'
+        '"cliIP":"%s",'
+        '"reqData":"%s",'
+        '".+":".+","logType":"RangerAudit"'
+      )
+
+      # Ensure audit logs were logged in coordinator logs
+      self.assert_impalad_log_contains("INFO", expected_string_valid_xff %
+          ("10.20.30.40", r"select count\(1\) from functional.alltypes"),
+          expected_count=3)
+      self.assert_impalad_log_contains("INFO", expected_string_valid_xff %
+          ("127.0.0.1", r"select count\(2\) from functional.alltypes"), 
expected_count=2)
+      self.assert_impalad_log_contains("INFO", expected_string_valid_xff %
+          ("foobar", r"select count\(3\) from functional.alltypes"), 
expected_count=1)
+
+  @pytest.mark.execute_serially
+  @SkipIf.is_test_jdk
+  @CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS, reset_ranger=True)
+  def test_show_grant_hive_privilege(self, unique_name):
+    user = getuser()
+    admin_client = self.create_impala_client(user=ADMIN)
+    unique_db = unique_name + "_db"
+    resource = {
+      "database": unique_db,
+      "column": "*",
+      "table": "*"
+    }
+    access = ["lock", "select"]
+
+    try:
+      TestRanger._grant_ranger_privilege(user, resource, access)
+      admin_client.execute("drop database if exists {0} 
cascade".format(unique_db))
+      admin_client.execute("create database {0}".format(unique_db))
+
+      admin_client.execute("refresh authorization")
+      result = self.client.execute("show grant user {0} on database {1}"
+                                   .format(user, unique_db))
+
+      TestRanger._check_privileges(result, [
+          ["USER", user, unique_db, "*", "*", "", "", "", "", "select", 
"false"]
+      ])
+
+      # Assert that lock, select privilege exists in Ranger server
+      assert "lock" in TestRanger._get_ranger_privileges_db(user, unique_db)
+      assert "select" in TestRanger._get_ranger_privileges_db(user, unique_db)
+
+      admin_client.execute("revoke select on database {0} from user {1}"
+                           .format(unique_db, user))
+
+      # Assert that lock is still present and select is revoked in Ranger 
server
+      assert "lock" in TestRanger._get_ranger_privileges_db(user, unique_db)
+      assert "select" not in TestRanger._get_ranger_privileges_db(user, 
unique_db)
+
+      admin_client.execute("refresh authorization")
+      result = self.client.execute("show grant user {0} on database {1}"
+                                   .format(user, unique_db))
+
+      TestRanger._check_privileges(result, [])
+    finally:
+      admin_client.execute("drop database if exists {0} 
cascade".format(unique_db))
+      TestRanger._revoke_ranger_privilege(user, resource, access)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS, reset_ranger=True)
+  def test_grant_multiple_columns(self):
+    self._test_grant_multiple_columns(13)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS,
+    catalogd_args="{0} {1}".format(CATALOGD_ARGS, 
"--consolidate_grant_revoke_requests"),
+    reset_ranger=True)
+  def test_grant_multiple_columns_consolidate_grant_revoke_requests(self):
+    self._test_grant_multiple_columns(1)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS,
+    catalogd_args=CATALOGD_ARGS + " --hms_event_polling_interval_s=5")
+  def test_alter_owner_hms_event_sync(self, unique_name):
+    """Test Impala queries that depends on database ownership changes in Hive.
+       Use a longer polling interval to mimic lag in event processing."""
+    test_user = getuser()
+    test_db = unique_name + "_db"
+    # A client that only used by 'test_user'. Just need to set the username at 
the
+    # first statement. It will keep using the same username.
+    user_client = self.create_impala_client(user=test_user)
+    user_client.set_configuration({"sync_hms_events_wait_time_s": 10,
+                                   "sync_hms_events_strict_mode": True})
+
+    # Methods to change ownership in Hive which will generate ALTER_DATABASE 
events.
+    def change_db_owner_to_user():
+      self.run_stmt_in_hive(
+        "alter database {0} set owner user {1}".format(test_db, test_user), 
ADMIN)
+
+    def reset_db_owner_to_admin():
+      self.run_stmt_in_hive(
+        "alter database {0} set owner user {1}".format(test_db, ADMIN), ADMIN)
+
+    # Create a test database as "admin" user. Owner is set accordingly.
+    # By default, only the "admin" user and owner of the db can read/write 
this db.
+    self._run_query_as_user(
+        "drop database if exists {0} cascade".format(test_db), ADMIN, 
expect_success=True)
+    self._run_query_as_user(
+        "create database {0}".format(test_db), ADMIN, expect_success=True)
+    try:
+      # Test table statement waits for db alter owner events
+      # Try to create a table under test_db as current user. It should fail 
since
+      # test_user is not the db owner.
+      create_tbl_stmt = "create table {0}.foo(a int)".format(test_db)
+      self.execute_query_expect_failure(user_client, create_tbl_stmt)
+      change_db_owner_to_user()
+      # Creating the table again should succeed once the ALTER_DATABASE event 
is synced.
+      self.execute_query_expect_success(user_client, create_tbl_stmt)
+      reset_db_owner_to_admin()
+
+      # Test table statement waits for table alter owner events
+      stmts = [
+        "describe {}.foo".format(test_db),
+        "insert into {}.foo values (0)".format(test_db),
+        "select * from {}.foo".format(test_db),
+        "compute stats {}.foo".format(test_db),
+        "refresh {}.foo".format(test_db),
+        "drop table {}.foo".format(test_db),
+      ]
+      for stmt in stmts:
+        # Change table owner to admin
+        self.run_stmt_in_hive(
+            "alter table {0}.foo set owner user {1}".format(test_db, ADMIN), 
ADMIN)
+        self.execute_query_expect_failure(user_client, stmt)
+        # Change table owner to user
+        self.run_stmt_in_hive(
+            "alter table {0}.foo set owner user {1}".format(test_db, 
test_user), ADMIN)
+        self.execute_query_expect_success(user_client, stmt)
+      # Create the table again since the last statement is DROP TABLE.
+      change_db_owner_to_user()
+      self.execute_query_expect_success(user_client, create_tbl_stmt)
+      reset_db_owner_to_admin()
+
+      # Test SHOW DATABASES waits for db change owner events. The db is 
invisible if user
+      # is not the owner.
+      assert test_db not in self.all_db_names(user_client)
+      change_db_owner_to_user()
+      assert test_db in self.all_db_names(user_client)
+      reset_db_owner_to_admin()
+
+      # Test SHOW TABLES
+      # Run a query on the table to make its ownership info loaded. Otherwise, 
it will
+      # be missing in SHOW TABLES due to IMPALA-8937.
+      self.execute_query_expect_success(user_client, "describe 
{}.foo".format(test_db))
+      # SHOW TABLES should fail since user is not the owner of this db
+      self.execute_query_expect_failure(user_client, "show tables in " + 
test_db)
+      change_db_owner_to_user()
+      assert ["foo"] == self.all_table_names(user_client, test_db)
+      reset_db_owner_to_admin()
+    finally:
+      self._run_query_as_user("drop database {0} cascade".format(test_db), 
ADMIN, True)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="{0} {1}".format(IMPALAD_ARGS,
+                                  
"--allow_catalog_cache_op_from_masked_users=true"),
+    catalogd_args=CATALOGD_ARGS,
+    disable_log_buffering=True)
+  def test_allow_metadata_update(self, unique_name):
+    self._test_allow_catalog_cache_op_from_masked_users(unique_name)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="{0} {1}".format(LOCAL_CATALOG_IMPALAD_ARGS,
+                                  
"--allow_catalog_cache_op_from_masked_users=true"),
+    catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS,
+    disable_log_buffering=True)
+  def test_allow_metadata_update_local_catalog(self, unique_name):
+    self._test_allow_catalog_cache_op_from_masked_users(unique_name)
+
+  @pytest.mark.execute_serially
+  @SkipIf.is_test_jdk
+  @SkipIfFS.hive
+  @SkipIfHive2.ranger_auth
+  @CustomClusterTestSuite.with_args()
+  def test_hive_with_ranger_setup(self, vector):
+    """Test for setup of Hive-Ranger integration. Make sure future upgrades on
+    Hive/Ranger won't break the tool."""
+    script = os.path.join(os.environ['IMPALA_HOME'], 
'testdata/bin/run-hive-server.sh')
+    try:
+      # Add the policy before restarting Hive. So it can take effect 
immediately after
+      # HiveServer2 starts.
+      TestRanger._add_column_masking_policy(
+          "col_mask_for_hive", getuser(), "functional", "alltypestiny", "id", 
"CUSTOM",
+          "{col} * 100")
+      check_call([script, '-with_ranger'])
+      self.run_test_case("QueryTest/hive_ranger_integration", vector)
+    finally:
+      check_call([script])
+      TestRanger._remove_policy("col_mask_for_hive")
+
+  @pytest.mark.execute_serially
+  @SkipIfFS.incorrent_reported_ec
+  @CustomClusterTestSuite.with_args(
+    # We additionally provide impalad and catalogd with the customized 
user-to-groups
+    # mapper since some test cases in grant_revoke.test require Impala to 
retrieve the
+    # groups a given user belongs to and such users might not exist in the 
underlying
+    # OS in the testing environment, e.g., the user 'non_owner'.
+    impalad_args="{0} {1}".format(IMPALAD_ARGS,
+                                  
"--use_customized_user_groups_mapper_for_ranger"),
+    catalogd_args="{0} {1}".format(CATALOGD_ARGS,
+                                   
"--use_customized_user_groups_mapper_for_ranger"))
+  def test_grant_revoke_with_role(self, vector):
+    """Test grant/revoke with role."""
+    admin_client = self.create_impala_client(user=ADMIN)
+    try:
+      self.run_test_case('QueryTest/grant_revoke', vector, use_db="default")
+    finally:
+      # Below are the statements that need to be executed in order to clean up 
the
+      # privileges granted to the test roles as well as the test roles 
themselves.
+      # Note that we need to revoke those previously granted privileges so 
that each role
+      # is not referenced by any policy before we delete those roles.
+      # Moreover, we need to revoke the privilege on the database 
'grant_rev_db' before
+      # dropping 'grant_rev_db'. Otherwise, the revocation would fail due to an
+      # AnalysisException thrown because 'grant_rev_db' does not exist.
+      cleanup_statements = [
+        "revoke all on database grant_rev_db from 
grant_revoke_test_ALL_TEST_DB",
+        "revoke all on server from grant_revoke_test_ALL_SERVER",
+        "revoke all on table functional.alltypes from 
grant_revoke_test_NON_OWNER",
+        "revoke grant option for all on database functional "
+        "from grant_revoke_test_NON_OWNER",
+        "REVOKE SELECT (a, b, c, d, e, x, y) ON TABLE grant_rev_db.test_tbl3 "
+        "FROM grant_revoke_test_ALL_SERVER",
+        "REVOKE ALL ON DATABASE functional FROM grant_revoke_test_NON_OWNER",
+        "REVOKE SELECT ON TABLE grant_rev_db.test_tbl3 FROM 
grant_revoke_test_NON_OWNER",
+        "REVOKE GRANT OPTION FOR SELECT (a, c) ON TABLE grant_rev_db.test_tbl3 
"
+        "FROM grant_revoke_test_ALL_SERVER",
+        "REVOKE SELECT ON TABLE grant_rev_db.test_tbl1 "
+        "FROM grant_revoke_test_SELECT_INSERT_TEST_TBL",
+        "REVOKE INSERT ON TABLE grant_rev_db.test_tbl1 "
+        "FROM grant_revoke_test_SELECT_INSERT_TEST_TBL",
+        "REVOKE SELECT ON TABLE grant_rev_db.test_tbl3 "
+        "FROM grant_revoke_test_NON_OWNER",
+        "REVOKE SELECT (a) ON TABLE grant_rev_db.test_tbl3 "
+        "FROM grant_revoke_test_NON_OWNER",
+        "REVOKE SELECT (a, c, e) ON TABLE grant_rev_db.test_tbl3 "
+        "FROM grant_revoke_test_ALL_SERVER",
+        "revoke all on server server1 from grant_revoke_test_ALL_SERVER1",
+        "revoke select(col1) on table grant_rev_db.test_tbl4 "
+        "from role grant_revoke_test_COLUMN_PRIV",
+        "{0}{1}{2}".format("revoke all on uri '",
+                           os.getenv("FILESYSTEM_PREFIX"),
+                           "/test-warehouse/grant_rev_test_tbl2'"
+                           "from grant_revoke_test_ALL_URI"),
+        "{0}{1}{2}".format("revoke all on uri '",
+                           os.getenv("FILESYSTEM_PREFIX"),
+                           "/test-warehouse/GRANT_REV_TEST_TBL3'"
+                           "from grant_revoke_test_ALL_URI"),
+        "{0}{1}{2}".format("revoke all on uri '",
+                           os.getenv("FILESYSTEM_PREFIX"),
+                           "/test-warehouse/grant_rev_test_prt'"
+                           "from grant_revoke_test_ALL_URI"),
+        "drop role grant_revoke_test_ALL_TEST_DB",
+        "drop role grant_revoke_test_ALL_SERVER",
+        "drop role grant_revoke_test_SELECT_INSERT_TEST_TBL",
+        "drop role grant_revoke_test_ALL_URI",
+        "drop role grant_revoke_test_NON_OWNER",
+        "drop role grant_revoke_test_ALL_SERVER1",
+        "drop role grant_revoke_test_COLUMN_PRIV",
+        "drop database grant_rev_db cascade"
+      ]
+
+      for statement in cleanup_statements:
+        try:
+          admin_client.execute(statement)
+        except Exception:
+          # There could be an exception thrown due to the non-existence of the 
role or
+          # resource involved in a statement that aims to revoke the privilege 
on a
+          # resource from a role, but we do not have to handle such an 
exception. We only
+          # need to make sure in the case when the role and the corresponding 
resource
+          # exist, the granted privilege is revoked. The same applies to the 
case when we
+          # drop a role.
+          pass
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=LOCAL_CATALOG_IMPALAD_ARGS,
+    catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS,
+    # We additionally set 'reset_ranger' to True, to reset all the policies in 
the
+    # Ranger service, so even if there were roles before this test, they will 
be
+    # deleted when this test runs. since the Ranger policies are reset before 
this
+    # test, we do not have to worry about there could be existing roles when 
the test
+    # is running.
+    reset_ranger=True)
+  def test_no_exception_in_show_roles_if_no_roles_in_ranger(self):
+    self._test_no_exception_in_show_roles_if_no_roles_in_ranger()
+
+
[email protected]_args(
+    impalad_args=IMPALAD_ARGS,
+    catalogd_args=CATALOGD_ARGS)
+class TestRangerLegacyCatalog(TestRanger):
+  """
+  Tests for Apache Ranger integration with Apache Impala in legacy catalog 
mode.
+  Test methods shares common cluster.
+  """
+
+  @pytest.mark.execute_serially
+  def test_grant_revoke_with_catalog_v1(self, unique_name):
+    """Tests grant/revoke with catalog v1."""
+    self._test_grant_revoke(unique_name, [None, "invalidate metadata",
+                                          "refresh authorization"])
+
+  @pytest.mark.execute_serially
+  @SkipIfFS.hdfs_acls
+  def test_insert_with_catalog_v1(self, unique_name):
+    """
+    Test that when Ranger is the authorization provider in the legacy catalog 
mode,
+    Impala does not throw an AnalysisException when an authorized user tries 
to execute
+    an INSERT query against a partitioned table of which the respective table 
path and
+    the partition path are not writable according to HDFS permission.
+    """
+    user = getuser()
+    admin_client = self.create_impala_client(user=ADMIN)
+    unique_database = unique_name + "_db"
+    unique_table = unique_name + "_tbl"
+    partition_column = "year"
+    partition_value = "2008"
+    table_path = "test-warehouse/{0}.db/{1}".format(unique_database, 
unique_table)
+    table_partition_path = "{0}/{1}={2}"\
+        .format(table_path, partition_column, partition_value)
+    insert_statement = "insert into {0}.{1} (name) partition ({2}) " \
+        "values (\"Adam\", {3})".format(unique_database, unique_table, 
partition_column,
+        partition_value)
+    authz_err = "AuthorizationException: User '{0}' does not have privileges 
to " \
+        "execute 'INSERT' on: {1}.{2}".format(user, unique_database, 
unique_table)
+    try:
+      admin_client.execute("drop database if exists {0} cascade"
+          .format(unique_database))
+      admin_client.execute("create database {0}".format(unique_database))
+      admin_client.execute("create table {0}.{1} (name string) partitioned by 
({2} int)"
+          .format(unique_database, unique_table, partition_column))
+      admin_client.execute("alter table {0}.{1} add partition ({2}={3})"
+          .format(unique_database, unique_table, partition_column, 
partition_value))
+
+      # Change the owner user and group of the HDFS paths corresponding to the 
table and
+      # the partition so that according to Impala's FsPermissionChecker, the 
table is not
+      # writable to the user that loads the table. This user usually is the one
+      # representing the Impala service. Before IMPALA-11871, changing either 
the table
+      # path or the partition path to non-writable would result in an 
AnalysisException.
+      self.hdfs_client.chown(table_path, "another_user", "another_group")
+      self.hdfs_client.chown(table_partition_path, "another_user", 
"another_group")
+      # Invalidate the table metadata to force the catalog server to reload 
the HDFS
+      # table and the related partition(s).
+      admin_client.execute("invalidate metadata {0}.{1}"
+          .format(unique_database, unique_table))
+
+      # Verify that the INSERT statement fails with AuthorizationException 
because the
+      # requesting user does not have the INSERT privilege on the table.
+      result = self._run_query_as_user(insert_statement, user, False)
+      assert authz_err in str(result)
+
+      admin_client.execute("grant insert on table {0}.{1} to user {2}"
+          .format(unique_database, unique_table, user))
+      # Verify that the INSERT statement succeeds without AnalysisException.
+      self._run_query_as_user(insert_statement, user, True)
+    finally:
+      admin_client.execute("revoke insert on table {0}.{1} from user {2}"
+          .format(unique_database, unique_table, user))
+      admin_client.execute("drop database if exists {0} cascade"
+          .format(unique_database))
+
+  @pytest.mark.execute_serially
+  @SkipIfFS.hdfs_acls
+  def test_load_data_with_catalog_v1(self, unique_name):
+    """
+    Test that when Ranger is the authorization provider in the legacy catalog 
mode,
+    Impala does not throw an AnalysisException when an authorized user tries 
to execute
+    a LOAD DATA query against a table partition of which the respective 
partition path is
+    not writable according to Impala's FsPermissionChecker.
+    """
+    user = getuser()
+    admin_client = self.create_impala_client(user=ADMIN)
+    unique_database = unique_name + "_db"
+    unique_table = unique_name + "_tbl"
+    partition_column = "year"
+    partition_value = "2008"
+    destination_table_path = "test-warehouse/{0}.db/{1}" \
+        .format(unique_database, unique_table, )
+    destination_table_partition_path = "{0}/{1}={2}"\
+        .format(destination_table_path, partition_column, partition_value)
+    file_name = "load_data_with_catalog_v1.txt"
+    files_for_table = ["testdata/data/{0}".format(file_name)]
+    source_hdfs_dir = "/tmp"
+    load_data_statement = "load data inpath '{0}/{1}' into table {2}.{3} " \
+        "partition ({4}={5})".format(source_hdfs_dir, file_name, 
unique_database,
+        unique_table, partition_column, partition_value)
+    authz_err = "AuthorizationException: User '{0}' does not have privileges 
to " \
+        "execute 'INSERT' on: {1}.{2}".format(user, unique_database, 
unique_table)
+    try:
+      admin_client.execute("drop database if exists {0} cascade"
+          .format(unique_database))
+      admin_client.execute("create database {0}".format(unique_database))
+      copy_files_to_hdfs_dir(files_for_table, source_hdfs_dir)
+      admin_client.execute("create table {0}.{1} (name string) partitioned by 
({2} int) "
+          "row format delimited fields terminated by ',' "
+          "stored as textfile".format(unique_database, unique_table, 
partition_column))
+      # We need to add the partition. Otherwise, the LOAD DATA statement can't 
create new
+      # partitions.
+      admin_client.execute("alter table {0}.{1} add partition ({2}={3})"
+          .format(unique_database, unique_table, partition_column, 
partition_value))
+
+      # Change the permissions of the HDFS path of the destination table 
partition.
+      # Before IMPALA-11871, even we changed the table path to non-writable, 
loading
+      # data into the partition was still allowed if the destination partition 
path
+      # was writable according to Impala's FsPermissionChecker. But if the 
destination
+      # partition path was not writable, an AnalysisException would be thrown.
+      self.hdfs_client.chown(destination_table_partition_path, "another_user",
+          "another_group")
+      # Invalidate the table metadata to force the catalog server to reload 
the HDFS
+      # table and the related partition(s).
+      admin_client.execute("invalidate metadata {0}.{1}"
+          .format(unique_database, unique_table))
+
+      # To execute the LOAD DATA statement, a user has to be granted the ALL 
privilege
+      # on the source HDFS path and the INSERT privilege on the destination 
table.
+      # The following verifies the LOAD DATA statement fails with 
AuthorizationException
+      # due to insufficient privileges.
+      result = self._run_query_as_user(load_data_statement, user, False)
+      assert authz_err in str(result)
+
+      admin_client.execute("grant all on uri '{0}/{1}' to user {2}"
+          .format(source_hdfs_dir, file_name, user))
+      # The following verifies the ALL privilege on the source file alone is 
not
+      # sufficient to execute the LOAD DATA statement.
+      result = self._run_query_as_user(load_data_statement, user, False)
+      assert authz_err in str(result)
+
+      admin_client.execute("grant insert on table {0}.{1} to user {2}"
+          .format(unique_database, unique_table, user))
+      # Verify the LOAD DATA statement fails without AnalysisException.
+      self._run_query_as_user(load_data_statement, user, True)
+    finally:
+      admin_client.execute("revoke all on uri '{0}/{1}' from user {2}"
+          .format(source_hdfs_dir, file_name, user))
+      admin_client.execute("revoke insert on table {0}.{1} from user {2}"
+          .format(unique_database, unique_table, user))
+      admin_client.execute("drop database if exists {0} cascade"
+          .format(unique_database))
+      self.filesystem_client.delete_file_dir("{0}/{1}"
+          .format(source_hdfs_dir, file_name))
+
+  @pytest.mark.execute_serially
+  def test_grant_option(self, unique_name):
+    user1 = getuser()
+    admin_client = self.create_impala_client(user=ADMIN)
+    unique_database = unique_name + "_db"
+    unique_table = unique_name + "_tbl"
+
+    try:
+      # Set-up temp database/table
+      admin_client.execute("drop database if exists {0} 
cascade".format(unique_database))
+      admin_client.execute("create database {0}".format(unique_database))
+      admin_client.execute("create table {0}.{1} (x int)"
+                           .format(unique_database, unique_table))
+
+      # Give user 1 the ability to grant select privileges on unique_database
+      self.execute_query_expect_success(admin_client,
+                                        "grant select on database {0} to user 
{1} with "
+                                        "grant option".format(unique_database, 
user1))
+      self.execute_query_expect_success(admin_client,
+                                        "grant insert on database {0} to user 
{1} with "
+                                        "grant option".format(unique_database, 
user1))
+
+      # Verify user 1 has with_grant privilege on unique_database
+      result = self.execute_query("show grant user {0} on database {1}"
+                                  .format(user1, unique_database))
+      TestRanger._check_privileges(result, [
+          ["USER", user1, unique_database, "", "", "", "", "", "*", "insert", 
"true"],
+          ["USER", user1, unique_database, "", "", "", "", "", "*", "select", 
"true"],
+          ["USER", user1, unique_database, "*", "*", "", "", "", "", "insert", 
"true"],
+          ["USER", user1, unique_database, "*", "*", "", "", "", "", "select", 
"true"]])
+
+      # Revoke select privilege and check grant option is still present
+      self.execute_query_expect_success(admin_client,
+                                        "revoke select on database {0} from 
user {1}"
+                                        .format(unique_database, user1))
+      result = self.execute_query("show grant user {0} on database {1}"
+                                  .format(user1, unique_database))
+      # Revoking the select privilege also deprives the grantee of the 
permission to
+      # transfer other privilege(s) on the same resource to other principals. 
This is a
+      # current limitation of Ranger since privileges on the same resource 
share the same
+      # delegateAdmin field in the corresponding RangerPolicyItem.
+      TestRanger._check_privileges(result, [
+          ["USER", user1, unique_database, "", "", "", "", "", "*", "insert", 
"false"],
+          ["USER", user1, unique_database, "*", "*", "", "", "", "", "insert", 
"false"]])
+
+      # Revoke privilege granting from user 1
+      self.execute_query_expect_success(admin_client, "revoke grant option for 
insert "
+                                                      "on database {0} from 
user {1}"
+                                        .format(unique_database, user1))
+
+      # User 1 can no longer grant privileges on unique_database
+      # In ranger it is currently not possible to revoke grant for a single 
access type
+      result = self.execute_query("show grant user {0} on database {1}"
+                                  .format(user1, unique_database))
+      TestRanger._check_privileges(result, [
+          ["USER", user1, unique_database, "", "", "", "", "", "*", "insert", 
"false"],
+          ["USER", user1, unique_database, "*", "*", "", "", "", "", "insert", 
"false"]])
+    finally:
+      admin_client.execute("revoke insert on database {0} from user {1}"
+                           .format(unique_database, user1))
+      admin_client.execute("drop database if exists {0} 
cascade".format(unique_database))
+
+  @pytest.mark.execute_serially
+  def test_show_grant(self, unique_name):
+    user = getuser()
+    group = grp.getgrnam(getuser()).gr_name
+    test_data = [(user, "USER"), (group, "GROUP")]
+    admin_client = self.create_impala_client(user=ADMIN)
+    unique_db = unique_name + "_db"
+    unique_table = unique_name + "_tbl"
+    udf = "identity"
+
+    try:
+      # Create test database/table
+      admin_client.execute("drop database if exists {0} 
cascade".format(unique_db))
+      admin_client.execute("create database {0}".format(unique_db))
+      admin_client.execute("create table {0}.{1} (x int)"
+                           .format(unique_db, unique_table))
+
+      for data in test_data:
+        # Test basic show grant functionality for user/group
+        self._test_show_grant_basic(admin_client, data[1], data[0], unique_db,
+                                    unique_table, udf)
+        # Test that omitting ON <resource> results in failure
+        self._test_show_grant_without_on(data[1], data[0])
+
+        # Test inherited privileges (server privileges show for database, etc.)
+        self._test_show_grant_inherited(admin_client, data[1], data[0], 
unique_db,
+                                        unique_table)
+
+      # Test ALL privilege hides other privileges
+      self._test_show_grant_mask(admin_client, user)
+
+      # Test ALL privilege on UDF hides other privileges
+      self._test_show_grant_mask_on_udf(admin_client, data[1], data[0], 
unique_db, udf)
+
+      # Test USER inherits privileges for their GROUP
+      self._test_show_grant_user_group(admin_client, user, group, unique_db)
+    finally:
+      admin_client.execute("drop database if exists {0} 
cascade".format(unique_db))
+
+  @pytest.mark.execute_serially
+  def test_grant_revoke_ranger_api(self, unique_name):
+    user = getuser()
+    admin_client = self.create_impala_client(user=ADMIN)
+    unique_db = unique_name + "_db"
+    resource = {
+      "database": unique_db,
+      "column": "*",
+      "table": "*"
+    }
+    access = ["select", "create"]
+
+    try:
+      # Create the test database
+      admin_client.execute("drop database if exists {0} 
cascade".format(unique_db))
+      admin_client.execute("create database {0}".format(unique_db))
+
+      # Grant privileges via Ranger REST API
+      TestRanger._grant_ranger_privilege(user, resource, access)
+
+      # Privileges should be stale before a refresh
+      result = self.client.execute("show grant user {0} on database {1}"
+                                   .format(user, unique_db))
+      TestRanger._check_privileges(result, [])
+
+      # Refresh and check updated privileges
+      admin_client.execute("refresh authorization")
+      result = self.client.execute("show grant user {0} on database {1}"
+                                   .format(user, unique_db))
+
+      TestRanger._check_privileges(result, [
+          ["USER", user, unique_db, "*", "*", "", "", "", "", "create", 
"false"],
+          ["USER", user, unique_db, "*", "*", "", "", "", "", "select", 
"false"]
+      ])
+
+      # Revoke privileges via Ranger REST API
+      TestRanger._revoke_ranger_privilege(user, resource, access)
+
+      # Privileges should be stale before a refresh
+      result = self.client.execute("show grant user {0} on database {1}"
+                                   .format(user, unique_db))
+      TestRanger._check_privileges(result, [
+          ["USER", user, unique_db, "*", "*", "", "", "", "", "create", 
"false"],
+          ["USER", user, unique_db, "*", "*", "", "", "", "", "select", 
"false"]
+      ])
+
+      # Refresh and check updated privileges
+      admin_client.execute("refresh authorization")
+      result = self.client.execute("show grant user {0} on database {1}"
+                                   .format(user, unique_db))
+
+      TestRanger._check_privileges(result, [])
+    finally:
+      admin_client.execute("revoke all on database {0} from user {1}"
+                           .format(unique_db, user))
+      admin_client.execute("drop database if exists {0} 
cascade".format(unique_db))
+
+  @pytest.mark.execute_serially
+  def test_legacy_catalog_ownership(self):
+      self._test_ownership()
+
+  @pytest.mark.execute_serially
+  def test_grant_revoke_by_owner_legacy_catalog(self, unique_name):
+    self._test_grant_revoke_by_owner(unique_name)
+
+  @pytest.mark.execute_serially
+  def test_unsupported_sql(self):
+    """Tests unsupported SQL statements when running with Ranger."""
+    admin_client = self.create_impala_client(user=ADMIN)
+    error_msg = "UnsupportedOperationException: SHOW GRANT is not supported 
without a " \
+        "defined resource in Ranger."
+    statement = "show grant role foo"
+    result = self.execute_query_expect_failure(admin_client, statement)
+    assert error_msg in str(result)
+
+  @pytest.mark.execute_serially
+  def test_grant_revoke_invalid_principal(self):
+    """Tests grant/revoke to/from invalid principal should return more readable
+       error messages."""
+    valid_user = "admin"
+    invalid_user = "invalid_user"
+    invalid_group = "invalid_group"
+    # TODO(IMPALA-8640): Create two different Impala clients because the users 
to
+    # workaround the bug.
+    invalid_impala_client = self.create_impala_client(user=invalid_user)
+    valid_impala_client = self.create_impala_client(user=valid_user)
+    for statement in ["grant select on table functional.alltypes to user {0}"
+                      .format(getuser()),
+                      "revoke select on table functional.alltypes from user 
{0}"
+                      .format(getuser())]:
+      result = self.execute_query_expect_failure(invalid_impala_client, 
statement)
+      if "grant" in statement:
+        assert "Error granting a privilege in Ranger. Ranger error message: " \
+               "HTTP 403 Error: Grantor user invalid_user doesn't exist" in 
str(result)
+      else:
+        assert "Error revoking a privilege in Ranger. Ranger error message: " \
+               "HTTP 403 Error: Grantor user invalid_user doesn't exist" in 
str(result)
+
+    for statement in ["grant select on table functional.alltypes to user {0}"
+                      .format(invalid_user),
+                      "revoke select on table functional.alltypes from user 
{0}"
+                      .format(invalid_user)]:
+      result = self.execute_query_expect_failure(valid_impala_client, 
statement)
+      if "grant" in statement:
+        assert "Error granting a privilege in Ranger. Ranger error message: " \
+               "HTTP 403 Error: Grantee user invalid_user doesn't exist" in 
str(result)
+      else:
+        assert "Error revoking a privilege in Ranger. Ranger error message: " \
+               "HTTP 403 Error: Grantee user invalid_user doesn't exist" in 
str(result)
+
+    for statement in ["grant select on table functional.alltypes to group {0}"
+                      .format(invalid_group),
+                      "revoke select on table functional.alltypes from group 
{0}"
+                      .format(invalid_group)]:
+      result = self.execute_query_expect_failure(valid_impala_client, 
statement)
+      if "grant" in statement:
+        assert "Error granting a privilege in Ranger. Ranger error message: " \
+               "HTTP 403 Error: Grantee group invalid_group doesn't exist" in 
str(result)
+      else:
+        assert "Error revoking a privilege in Ranger. Ranger error message: " \
+               "HTTP 403 Error: Grantee group invalid_group doesn't exist" in 
str(result)
+
+  @pytest.mark.execute_serially
   def test_show_functions(self, unique_name):
     user1 = getuser()
     admin_client = self.create_impala_client(user=ADMIN)
@@ -1763,8 +2202,6 @@ class TestRanger(CustomClusterTestSuite):
                               ADMIN, True)
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
   def test_select_function(self, unique_name):
     """Verifies that to execute a UDF in a database, a user has to be granted 
a) the
     SELECT privilege on the UDF, and b) any of the SELECT, INSERT, REFRESH 
privileges on
@@ -1885,163 +2322,7 @@ class TestRanger(CustomClusterTestSuite):
       self._run_query_as_user("drop database {0} 
cascade".format(unique_database),
                               ADMIN, True)
 
-  def _test_ownership(self):
-    """Tests ownership privileges for databases and tables with ranger along 
with
-    some known quirks in the implementation."""
-    test_user = getuser()
-    test_role = 'test_role'
-    test_db = "test_ranger_ownership_" + get_random_id(5).lower()
-    # Create a test database as "admin" user. Owner is set accordingly.
-    self._run_query_as_user("create database {0}".format(test_db), ADMIN, True)
-    try:
-      # Try to create a table under test_db as current user. It should fail.
-      self._run_query_as_user(
-          "create table {0}.foo(a int)".format(test_db), test_user, False)
-
-      # Change the owner of the database to the current user.
-      self._run_query_as_user(
-          "alter database {0} set owner user {1}".format(test_db, test_user), 
ADMIN, True)
-
-      self._run_query_as_user("refresh authorization", ADMIN, True)
-
-      # Create should succeed now.
-      self._run_query_as_user(
-          "create table {0}.foo(a int)".format(test_db), test_user, True)
-      # Run show tables on the db. The resulting list should be empty. This 
happens
-      # because the created table's ownership information is not aggressively 
cached
-      # by the current Catalog implementations. Hence the analysis pass does 
not
-      # have access to the ownership information to verify if the current 
session
-      # user is actually the owner. We need to fix this by caching the HMS 
metadata
-      # more aggressively when the table loads. TODO(IMPALA-8937).
-      result = \
-          self._run_query_as_user("show tables in {0}".format(test_db), 
test_user, True)
-      assert len(result.data) == 0
-      # Run a simple query that warms up the table metadata and repeat SHOW 
TABLES.
-      self._run_query_as_user("select * from {0}.foo".format(test_db), 
test_user, True)
-      result = \
-          self._run_query_as_user("show tables in {0}".format(test_db), 
test_user, True)
-      assert len(result.data) == 1
-      assert "foo" in result.data
-      # Change the owner of the db back to the admin user
-      self._run_query_as_user(
-          "alter database {0} set owner user {1}".format(test_db, ADMIN), 
ADMIN, True)
-      result = self._run_query_as_user(
-          "show tables in {0}".format(test_db), test_user, False)
-      err = "User '{0}' does not have privileges to access: {1}.*.*". \
-          format(test_user, test_db)
-      assert err in str(result)
-      # test_user is still the owner of the table, so select should work fine.
-      self._run_query_as_user("select * from {0}.foo".format(test_db), 
test_user, True)
-      # Change the table owner back to admin.
-      self._run_query_as_user(
-          "alter table {0}.foo set owner user {1}".format(test_db, ADMIN), 
ADMIN, True)
-      # create role before test begin.
-      self._run_query_as_user("CREATE ROLE {0}".format(test_role), ADMIN, True)
-      # test alter table owner to role statement, expect success result.
-      stmt = "alter table {0}.foo set owner role {1}".format(test_db, 
test_role)
-      self._run_query_as_user(stmt, ADMIN, True)
-      # drop the role.
-      self._run_query_as_user("DROP ROLE {0}".format(test_role), ADMIN, True)
-      # alter table to a non-exist role, expect error showing "role doesn't 
exist".
-      stmt = "alter table {0}.foo set owner role {1}".format(test_db, 
test_role)
-      result = self._run_query_as_user(stmt, ADMIN, False)
-      err = "Role '{0}' does not exist.".format(test_role)
-      assert err in str(result)
-      # test_user should not be authorized to run the queries anymore.
-      result = self._run_query_as_user(
-          "select * from {0}.foo".format(test_db), test_user, False)
-      err = ("AuthorizationException: User '{0}' does not have privileges to 
execute"
-             + " 'SELECT' on: {1}.foo").format(test_user, test_db)
-      assert err in str(result)
-    finally:
-      self._run_query_as_user("drop database {0} cascade".format(test_db), 
ADMIN, True)
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS,
-    catalogd_args=CATALOGD_ARGS + " --hms_event_polling_interval_s=5")
-  def test_alter_owner_hms_event_sync(self, unique_name):
-    """Test Impala queries that depends on database ownership changes in Hive.
-       Use a longer polling interval to mimic lag in event processing."""
-    test_user = getuser()
-    test_db = unique_name + "_db"
-    # A client that only used by 'test_user'. Just need to set the username at 
the
-    # first statement. It will keep using the same username.
-    user_client = self.create_impala_client(user=test_user)
-    user_client.set_configuration({"sync_hms_events_wait_time_s": 10,
-                                   "sync_hms_events_strict_mode": True})
-
-    # Methods to change ownership in Hive which will generate ALTER_DATABASE 
events.
-    def change_db_owner_to_user():
-      self.run_stmt_in_hive(
-        "alter database {0} set owner user {1}".format(test_db, test_user), 
ADMIN)
-
-    def reset_db_owner_to_admin():
-      self.run_stmt_in_hive(
-        "alter database {0} set owner user {1}".format(test_db, ADMIN), ADMIN)
-
-    # Create a test database as "admin" user. Owner is set accordingly.
-    # By default, only the "admin" user and owner of the db can read/write 
this db.
-    self._run_query_as_user(
-        "drop database if exists {0} cascade".format(test_db), ADMIN, 
expect_success=True)
-    self._run_query_as_user(
-        "create database {0}".format(test_db), ADMIN, expect_success=True)
-    try:
-      # Test table statement waits for db alter owner events
-      # Try to create a table under test_db as current user. It should fail 
since
-      # test_user is not the db owner.
-      create_tbl_stmt = "create table {0}.foo(a int)".format(test_db)
-      self.execute_query_expect_failure(user_client, create_tbl_stmt)
-      change_db_owner_to_user()
-      # Creating the table again should succeed once the ALTER_DATABASE event 
is synced.
-      self.execute_query_expect_success(user_client, create_tbl_stmt)
-      reset_db_owner_to_admin()
-
-      # Test table statement waits for table alter owner events
-      stmts = [
-        "describe {}.foo".format(test_db),
-        "insert into {}.foo values (0)".format(test_db),
-        "select * from {}.foo".format(test_db),
-        "compute stats {}.foo".format(test_db),
-        "refresh {}.foo".format(test_db),
-        "drop table {}.foo".format(test_db),
-      ]
-      for stmt in stmts:
-        # Change table owner to admin
-        self.run_stmt_in_hive(
-            "alter table {0}.foo set owner user {1}".format(test_db, ADMIN), 
ADMIN)
-        self.execute_query_expect_failure(user_client, stmt)
-        # Change table owner to user
-        self.run_stmt_in_hive(
-            "alter table {0}.foo set owner user {1}".format(test_db, 
test_user), ADMIN)
-        self.execute_query_expect_success(user_client, stmt)
-      # Create the table again since the last statement is DROP TABLE.
-      change_db_owner_to_user()
-      self.execute_query_expect_success(user_client, create_tbl_stmt)
-      reset_db_owner_to_admin()
-
-      # Test SHOW DATABASES waits for db change owner events. The db is 
invisible if user
-      # is not the owner.
-      assert test_db not in self.all_db_names(user_client)
-      change_db_owner_to_user()
-      assert test_db in self.all_db_names(user_client)
-      reset_db_owner_to_admin()
-
-      # Test SHOW TABLES
-      # Run a query on the table to make its ownership info loaded. Otherwise, 
it will
-      # be missing in SHOW TABLES due to IMPALA-8937.
-      self.execute_query_expect_success(user_client, "describe 
{}.foo".format(test_db))
-      # SHOW TABLES should fail since user is not the owner of this db
-      self.execute_query_expect_failure(user_client, "show tables in " + 
test_db)
-      change_db_owner_to_user()
-      assert ["foo"] == self.all_table_names(user_client, test_db)
-      reset_db_owner_to_admin()
-    finally:
-      self._run_query_as_user("drop database {0} cascade".format(test_db), 
ADMIN, True)
-
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
   def test_select_function_with_fallback_db(self, unique_name):
     """Verifies that Impala should not allow using functions in the fallback 
database
     unless the user has been granted sufficient privileges on the given 
database."""
@@ -2120,8 +2401,6 @@ class TestRanger(CustomClusterTestSuite):
                               ADMIN, True)
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
   def test_column_masking(self, vector, unique_name):
     user = getuser()
     unique_database = unique_name + '_db'
@@ -2216,8 +2495,6 @@ class TestRanger(CustomClusterTestSuite):
         TestRanger._remove_policy(unique_name + str(i))
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
   def test_block_metadata_update(self, unique_name):
     """Test that the metadata update operation on a table by a requesting user 
is denied
        if there exists a column masking policy defined on any column in the 
table for the
@@ -2245,90 +2522,6 @@ class TestRanger(CustomClusterTestSuite):
       admin_client.execute("revoke all on server from user {0}".format(user))
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args="{0} {1}".format(IMPALAD_ARGS,
-                                  
"--allow_catalog_cache_op_from_masked_users=true"),
-    catalogd_args=CATALOGD_ARGS,
-    disable_log_buffering=True)
-  def test_allow_metadata_update(self, unique_name):
-    self.__test_allow_catalog_cache_op_from_masked_users(unique_name)
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args="{0} {1}".format(LOCAL_CATALOG_IMPALAD_ARGS,
-                                  
"--allow_catalog_cache_op_from_masked_users=true"),
-    catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS,
-    disable_log_buffering=True)
-  def test_allow_metadata_update_local_catalog(self, unique_name):
-    self.__test_allow_catalog_cache_op_from_masked_users(unique_name)
-
-  def __test_allow_catalog_cache_op_from_masked_users(self, unique_name):
-    """Verify that catalog cache operations are allowed for masked users
-    when allow_catalog_cache_op_from_masked_users=true."""
-    user = getuser()
-    admin_client = self.create_impala_client(user=ADMIN)
-    non_admin_client = self.create_impala_client(user=user)
-    try:
-      # Create a column masking policy on 'user' which is also the owner of 
the table
-      TestRanger._add_column_masking_policy(
-          unique_name, user, "functional", "alltypestiny", "id",
-          "CUSTOM", "id * 100")
-
-      # At a cold start, the table is unloaded so its owner is unknown.
-      # INVALIDATE METADATA <table> is denied since 'user' is not detected as 
the owner.
-      result = self.execute_query_expect_failure(
-          non_admin_client, "invalidate metadata functional.alltypestiny")
-      assert "User '{0}' does not have privileges to execute " \
-             "'INVALIDATE METADATA/REFRESH' on: 
functional.alltypestiny".format(user) \
-             in str(result)
-      # Verify catalogd never loads metadata of this table
-      table_loaded_log = "Loaded metadata for: functional.alltypestiny"
-      self.assert_catalogd_log_contains("INFO", table_loaded_log, 
expected_count=0)
-
-      # Run a query to trigger metadata loading on the table
-      self.execute_query_expect_success(
-        non_admin_client, "describe functional.alltypestiny")
-      # Verify catalogd loads metadata of this table
-      self.assert_catalogd_log_contains("INFO", table_loaded_log, 
expected_count=1)
-
-      # INVALIDATE METADATA <table> is allowed since 'user' is detected as the 
owner.
-      self.execute_query_expect_success(
-          non_admin_client, "invalidate metadata functional.alltypestiny")
-
-      # Run a query to trigger metadata loading on the table
-      self.execute_query_expect_success(
-          non_admin_client, "describe functional.alltypestiny")
-      # Verify catalogd loads metadata of this table
-      self.assert_catalogd_log_contains("INFO", table_loaded_log, 
expected_count=2)
-
-      # Verify REFRESH <table> is allowed since 'user' is detected as the 
owner.
-      self.execute_query_expect_success(
-          non_admin_client, "refresh functional.alltypestiny")
-      self.execute_query_expect_success(
-          non_admin_client,
-          "refresh functional.alltypestiny partition(year=2009, month=1)")
-
-      # Clear the catalog cache and grant 'user' enough privileges
-      self.execute_query_expect_success(
-          admin_client, "invalidate metadata functional.alltypestiny")
-      admin_client.execute("grant refresh on table functional.alltypestiny to 
user {0}"
-                           .format(user))
-      try:
-        # Now 'user' should be able to run REFRESH/INVALIDATE even if the 
table is
-        # unloaded (not recognize it as the owner).
-        self.execute_query_expect_success(
-            non_admin_client, "invalidate metadata functional.alltypestiny")
-        self.execute_query_expect_success(
-            non_admin_client, "refresh functional.alltypestiny")
-      finally:
-        admin_client.execute(
-            "revoke refresh on table functional.alltypestiny from user 
{0}".format(user))
-    finally:
-      TestRanger._remove_policy(unique_name)
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
   def test_masking_overload_coverage(self, vector, unique_name):
     """Test that we have cover all the overloads of the masking functions that 
could
        appear in using default policies."""
@@ -2371,8 +2564,6 @@ class TestRanger(CustomClusterTestSuite):
         TestRanger._remove_policy(policy_names.pop())
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
   def test_row_filtering(self, vector, unique_name):
     user = getuser()
     unique_database = unique_name + '_db'
@@ -2545,21 +2736,11 @@ class TestRanger(CustomClusterTestSuite):
       ]
       for statement in cleanup_statements:
         try:
-          admin_client.execute(statement, user=ADMIN)
-        except Exception as e:
-          LOG.error("Ignored exception in cleanup: " + str(e))
-
-  def _verified_multiuser_results(self, admin_client, admin_query_tmpl, 
user_query, users,
-                                 user_clients):
-    assert len(users) == len(user_clients)
-    for i in range(len(users)):
-      admin_res = admin_client.execute(admin_query_tmpl % ("'%s'" % 
users[i])).get_data()
-      user_res = user_clients[i].execute(user_query).get_data()
-      assert admin_res == user_res
+          admin_client.execute(statement, user=ADMIN)
+        except Exception as e:
+          LOG.error("Ignored exception in cleanup: " + str(e))
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
   def test_column_masking_and_row_filtering(self, vector, unique_name):
     user = getuser()
     admin_client = self.create_impala_client(user=ADMIN)
@@ -2608,8 +2789,6 @@ class TestRanger(CustomClusterTestSuite):
         TestRanger._remove_policy(unique_name + str(i))
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
   def test_iceberg_time_travel_with_masking(self, unique_name):
     """When we do a time travel query on an iceberg table we will use the 
schema from
     the time of the snapshot. Make sure this works when column masking is 
being used."""
@@ -2718,8 +2897,6 @@ class TestRanger(CustomClusterTestSuite):
       admin_client.execute("drop database if exists {0} 
cascade".format(unique_database))
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
   def test_convert_table_to_iceberg(self, unique_name):
     """Test that autorization is taken into account when performing a table 
migration to
     Iceberg."""
@@ -2804,8 +2981,6 @@ class TestRanger(CustomClusterTestSuite):
       admin_client.execute("drop database if exists {0} 
cascade".format(unique_database))
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
   def test_iceberg_metadata_table_privileges(self, unique_name):
     user = getuser()
     admin_client = self.create_impala_client(user=ADMIN)
@@ -2839,29 +3014,6 @@ class TestRanger(CustomClusterTestSuite):
       admin_client.execute("drop database if exists {0} 
cascade".format(unique_database))
 
   @pytest.mark.execute_serially
-  @SkipIf.is_test_jdk
-  @SkipIfFS.hive
-  @SkipIfHive2.ranger_auth
-  @CustomClusterTestSuite.with_args()
-  def test_hive_with_ranger_setup(self, vector):
-    """Test for setup of Hive-Ranger integration. Make sure future upgrades on
-    Hive/Ranger won't break the tool."""
-    script = os.path.join(os.environ['IMPALA_HOME'], 
'testdata/bin/run-hive-server.sh')
-    try:
-      # Add the policy before restarting Hive. So it can take effect 
immediately after
-      # HiveServer2 starts.
-      TestRanger._add_column_masking_policy(
-          "col_mask_for_hive", getuser(), "functional", "alltypestiny", "id", 
"CUSTOM",
-          "{col} * 100")
-      check_call([script, '-with_ranger'])
-      self.run_test_case("QueryTest/hive_ranger_integration", vector)
-    finally:
-      check_call([script])
-      TestRanger._remove_policy("col_mask_for_hive")
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
   def test_profile_protection(self):
     """Test that a requesting user is able to access the runtime profile or 
execution
     summary of a query involving a view only if the user is granted the 
privileges on all
@@ -2920,218 +3072,40 @@ class TestRanger(CustomClusterTestSuite):
         admin_client.execute(statement)
 
   @pytest.mark.execute_serially
-  @SkipIfFS.incorrent_reported_ec
-  @CustomClusterTestSuite.with_args(
-    # We additionally provide impalad and catalogd with the customized 
user-to-groups
-    # mapper since some test cases in grant_revoke.test require Impala to 
retrieve the
-    # groups a given user belongs to and such users might not exist in the 
underlying
-    # OS in the testing environment, e.g., the user 'non_owner'.
-    impalad_args="{0} {1}".format(IMPALAD_ARGS,
-                                  
"--use_customized_user_groups_mapper_for_ranger"),
-    catalogd_args="{0} {1}".format(CATALOGD_ARGS,
-                                   
"--use_customized_user_groups_mapper_for_ranger"))
-  def test_grant_revoke_with_role(self, vector):
-    """Test grant/revoke with role."""
-    admin_client = self.create_impala_client(user=ADMIN)
-    try:
-      self.run_test_case('QueryTest/grant_revoke', vector, use_db="default")
-    finally:
-      # Below are the statements that need to be executed in order to clean up 
the
-      # privileges granted to the test roles as well as the test roles 
themselves.
-      # Note that we need to revoke those previously granted privileges so 
that each role
-      # is not referenced by any policy before we delete those roles.
-      # Moreover, we need to revoke the privilege on the database 
'grant_rev_db' before
-      # dropping 'grant_rev_db'. Otherwise, the revocation would fail due to an
-      # AnalysisException thrown because 'grant_rev_db' does not exist.
-      cleanup_statements = [
-        "revoke all on database grant_rev_db from 
grant_revoke_test_ALL_TEST_DB",
-        "revoke all on server from grant_revoke_test_ALL_SERVER",
-        "revoke all on table functional.alltypes from 
grant_revoke_test_NON_OWNER",
-        "revoke grant option for all on database functional "
-        "from grant_revoke_test_NON_OWNER",
-        "REVOKE SELECT (a, b, c, d, e, x, y) ON TABLE grant_rev_db.test_tbl3 "
-        "FROM grant_revoke_test_ALL_SERVER",
-        "REVOKE ALL ON DATABASE functional FROM grant_revoke_test_NON_OWNER",
-        "REVOKE SELECT ON TABLE grant_rev_db.test_tbl3 FROM 
grant_revoke_test_NON_OWNER",
-        "REVOKE GRANT OPTION FOR SELECT (a, c) ON TABLE grant_rev_db.test_tbl3 
"
-        "FROM grant_revoke_test_ALL_SERVER",
-        "REVOKE SELECT ON TABLE grant_rev_db.test_tbl1 "
-        "FROM grant_revoke_test_SELECT_INSERT_TEST_TBL",
-        "REVOKE INSERT ON TABLE grant_rev_db.test_tbl1 "
-        "FROM grant_revoke_test_SELECT_INSERT_TEST_TBL",
-        "REVOKE SELECT ON TABLE grant_rev_db.test_tbl3 "
-        "FROM grant_revoke_test_NON_OWNER",
-        "REVOKE SELECT (a) ON TABLE grant_rev_db.test_tbl3 "
-        "FROM grant_revoke_test_NON_OWNER",
-        "REVOKE SELECT (a, c, e) ON TABLE grant_rev_db.test_tbl3 "
-        "FROM grant_revoke_test_ALL_SERVER",
-        "revoke all on server server1 from grant_revoke_test_ALL_SERVER1",
-        "revoke select(col1) on table grant_rev_db.test_tbl4 "
-        "from role grant_revoke_test_COLUMN_PRIV",
-        "{0}{1}{2}".format("revoke all on uri '",
-                           os.getenv("FILESYSTEM_PREFIX"),
-                           "/test-warehouse/grant_rev_test_tbl2'"
-                           "from grant_revoke_test_ALL_URI"),
-        "{0}{1}{2}".format("revoke all on uri '",
-                           os.getenv("FILESYSTEM_PREFIX"),
-                           "/test-warehouse/GRANT_REV_TEST_TBL3'"
-                           "from grant_revoke_test_ALL_URI"),
-        "{0}{1}{2}".format("revoke all on uri '",
-                           os.getenv("FILESYSTEM_PREFIX"),
-                           "/test-warehouse/grant_rev_test_prt'"
-                           "from grant_revoke_test_ALL_URI"),
-        "drop role grant_revoke_test_ALL_TEST_DB",
-        "drop role grant_revoke_test_ALL_SERVER",
-        "drop role grant_revoke_test_SELECT_INSERT_TEST_TBL",
-        "drop role grant_revoke_test_ALL_URI",
-        "drop role grant_revoke_test_NON_OWNER",
-        "drop role grant_revoke_test_ALL_SERVER1",
-        "drop role grant_revoke_test_COLUMN_PRIV",
-        "drop database grant_rev_db cascade"
-      ]
-
-      for statement in cleanup_statements:
-        try:
-          admin_client.execute(statement)
-        except Exception:
-          # There could be an exception thrown due to the non-existence of the 
role or
-          # resource involved in a statement that aims to revoke the privilege 
on a
-          # resource from a role, but we do not have to handle such an 
exception. We only
-          # need to make sure in the case when the role and the corresponding 
resource
-          # exist, the granted privilege is revoked. The same applies to the 
case when we
-          # drop a role.
-          pass
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
   def test_select_view_created_by_non_superuser_with_catalog_v1(self, 
unique_name):
     self._test_select_view_created_by_non_superuser(unique_name)
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
+
[email protected]_args(
     impalad_args=LOCAL_CATALOG_IMPALAD_ARGS,
     catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS)
-  def test_select_view_created_by_non_superuser_with_local_catalog(self, 
unique_name):
-    self._test_select_view_created_by_non_superuser(unique_name)
-
-  def _test_select_view_created_by_non_superuser(self, unique_name):
-    """Test that except for the necessary privileges on the view, the 
requesting user
-    has to be granted the necessary privileges on the underlying tables as 
well in order
-    to access a view with its table property of 'Authorized' set to false."""
-    grantee_user = "non_owner"
-    admin_client = self.create_impala_client(user=ADMIN)
-    non_owner_client = self.create_impala_client(user=grantee_user)
-    unique_database = unique_name + "_db"
-    test_tbl_1 = unique_name + "_tbl_1"
-    test_tbl_2 = unique_name + "_tbl_2"
-    test_view = unique_name + "_view"
-
-    try:
-      # Set up temp database, tables, and the view.
-      admin_client.execute("drop database if exists {0} cascade"
-          .format(unique_database))
-      admin_client.execute("create database {0}".format(unique_database))
-      admin_client.execute("create table {0}.{1} (id int, bigint_col bigint)"
-          .format(unique_database, test_tbl_1))
-      admin_client.execute("create table {0}.{1} (id int, string_col string)"
-          .format(unique_database, test_tbl_2))
-      admin_client.execute("create view {0}.{1} (abc, xyz) as "
-          "select count(a.bigint_col), b.string_col "
-          "from {0}.{2} a inner join {0}.{3} b on a.id = b.id "
-          "group by b.string_col having count(a.bigint_col) > 1"
-          .format(unique_database, test_view, test_tbl_1, test_tbl_2))
-      # Add this table property to simulate a view created by a non-superuser.
-      self.run_stmt_in_hive("alter view {0}.{1} "
-          "set tblproperties ('Authorized' = 'false')"
-          .format(unique_database, test_view))
-
-      admin_client.execute("grant select(abc) on table {0}.{1} to user {2}"
-          .format(unique_database, test_view, grantee_user))
-      admin_client.execute("grant select(xyz) on table {0}.{1} to user {2}"
-          .format(unique_database, test_view, grantee_user))
-      admin_client.execute("grant select on table {0}.{1} to user {2}"
-          .format(unique_database, test_tbl_2, grantee_user))
-      admin_client.execute("refresh authorization")
-
-      result = self.execute_query_expect_failure(non_owner_client,
-          "select * from {0}.{1}".format(unique_database, test_view))
-      assert "User '{0}' does not have privileges to execute 'SELECT' on: " \
-          "{1}.{2}".format(grantee_user, unique_database, test_tbl_1) in 
str(result)
-
-      admin_client.execute("grant select(id) on table {0}.{1} to user {2}"
-          .format(unique_database, test_tbl_1, grantee_user))
-      admin_client.execute("refresh authorization")
-      # The query is not authorized since the SELECT privilege on the column 
'bigint_col'
-      # in the table 'test_tbl_1' is also required.
-      result = self.execute_query_expect_failure(non_owner_client,
-          "select * from {0}.{1}".format(unique_database, test_view))
-      assert "User '{0}' does not have privileges to execute 'SELECT' on: " \
-          "{1}.{2}".format(grantee_user, unique_database, test_tbl_1) in 
str(result)
-
-      admin_client.execute("grant select(bigint_col) on table {0}.{1} to user 
{2}"
-          .format(unique_database, test_tbl_1, grantee_user))
-      admin_client.execute("refresh authorization")
-
-      # The query is authorized successfully once sufficient privileges are 
granted.
-      self.execute_query_expect_success(non_owner_client,
-          "select * from {0}.{1}".format(unique_database, test_view))
+class TestRangerLocalCatalog(TestRanger):
+  """
+  Tests for Apache Ranger integration with Apache Impala in local catalog mode.
+  Test methods shares common cluster.
+  """
 
-      # Add a deny policy to prevent 'grantee_user' from accessing the column 
'id' in
-      # the table 'test_tbl_2', on which 'grantee_user' had been granted the 
SELECT
-      # privilege.
-      TestRanger._add_deny_policy(unique_name, grantee_user, unique_database, 
test_tbl_2,
-          "id")
-      admin_client.execute("refresh authorization")
+  @pytest.mark.execute_serially
+  def test_grant_revoke_with_local_catalog(self, unique_name):
+    """Tests grant/revoke with catalog v2 (local catalog)."""
+    self._test_grant_revoke(unique_name, [None, "invalidate metadata",
+                                          "refresh authorization"])
 
-      # The query is not authorized since the SELECT privilege on the column 
'id' in the
-      # table 'test_tbl_2' has been denied in the policy above.
-      result = self.execute_query_expect_failure(non_owner_client,
-          "select * from {0}.{1}".format(unique_database, test_view))
-      assert "User '{0}' does not have privileges to execute 'SELECT' on: " \
-          "{1}.{2}".format(grantee_user, unique_database, test_tbl_2) in 
str(result)
-    finally:
-      admin_client.execute("revoke select(abc) on table {0}.{1} from user {2}"
-          .format(unique_database, test_view, grantee_user))
-      admin_client.execute("revoke select(xyz) on table {0}.{1} from user {2}"
-          .format(unique_database, test_view, grantee_user))
-      admin_client.execute("revoke select(id) on table {0}.{1} from user {2}"
-          .format(unique_database, test_tbl_1, grantee_user))
-      admin_client.execute("revoke select(bigint_col) on table {0}.{1} from 
user {2}"
-          .format(unique_database, test_tbl_1, grantee_user))
-      admin_client.execute("revoke select on table {0}.{1} from user {2}"
-          .format(unique_database, test_tbl_2, grantee_user))
-      admin_client.execute("drop database if exists {0} cascade"
-          .format(unique_database))
-      TestRanger._remove_policy(unique_name)
+  @pytest.mark.execute_serially
+  def test_local_catalog_ownership(self):
+      # getTableIfCached() in LocalCatalog loads a minimal incomplete table
+      # that does not include the ownership information. Hence show tables
+      # *never* show owned tables. TODO(bharathv): Fix in a follow up commit
+      pytest.xfail("getTableIfCached() faulty behavior, known issue")
+      self._test_ownership()
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    impalad_args=LOCAL_CATALOG_IMPALAD_ARGS,
-    catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS,
-    # We additionally set 'reset_ranger' to True, to reset all the policies in 
the
-    # Ranger service, so even if there were roles before this test, they will 
be
-    # deleted when this test runs. since the Ranger policies are reset before 
this
-    # test, we do not have to worry about there could be existing roles when 
the test
-    # is running.
-    reset_ranger=True)
-  def test_no_exception_in_show_roles_if_no_roles_in_ranger(self):
-    self._test_no_exception_in_show_roles_if_no_roles_in_ranger()
+  def test_grant_revoke_by_owner_local_catalog(self, unique_name):
+    self._test_grant_revoke_by_owner(unique_name)
 
-  def _test_no_exception_in_show_roles_if_no_roles_in_ranger(self):
-    """
-    Ensure that no exception should throw for show roles statement
-    if there are no roles in ranger.
-    """
-    admin_client = self.create_impala_client(user=ADMIN)
-    show_roles_statements = [
-      "SHOW ROLES",
-      "SHOW CURRENT ROLES",
-      "SHOW ROLE GRANT GROUP admin"
-    ]
-    for statement in show_roles_statements:
-      result = self.execute_query_expect_success(admin_client, statement)
-      assert len(result.data) == 0
+  @pytest.mark.execute_serially
+  def test_select_view_created_by_non_superuser_with_local_catalog(self, 
unique_name):
+    self._test_select_view_created_by_non_superuser(unique_name)
 
 
 class TestRangerColumnMaskingTpchNested(CustomClusterTestSuite):

Reply via email to