This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 6776402ac [#5118] feat(auth): Lakehouse Iceberg catalog supports Ranger authorization plugin (#5467)
6776402ac is described below

commit 6776402ac8c1f45bf16d62b24e9bd780c59a0bbd
Author: roryqi <ror...@apache.org>
AuthorDate: Wed Nov 6 19:50:39 2024 +0800

    [#5118] feat(auth): Lakehouse Iceberg catalog supports Ranger authorization plugin (#5467)
    
    ### What changes were proposed in this pull request?
    
    Lakehouse Iceberg catalog supports Ranger authorization plugin
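    
    For reviewers, a minimal sketch of what enabling this looks like from the client side, mirroring
    the Hive `createCatalog()` helper visible further down in this diff: the provider becomes
    `lakehouse-iceberg`, while the Ranger authorization properties are passed the same way. The
    Iceberg-specific keys (`uri`, `catalog-backend`) and every literal value below are illustrative
    assumptions, not part of this commit.
    
    ```java
    import static org.apache.gravitino.Catalog.AUTHORIZATION_PROVIDER;
    import static org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_AUTH_TYPE;
    import static org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_PASSWORD;
    import static org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_SERVICE_NAME;
    import static org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_USERNAME;
    
    import com.google.common.collect.ImmutableMap;
    import java.util.Map;
    import org.apache.gravitino.Catalog;
    import org.apache.gravitino.client.GravitinoMetalake;
    import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
    
    public class IcebergRangerCatalogExample {
      /** Hypothetical helper; the catalog name and all property values are placeholders. */
      static Catalog createIcebergCatalogWithRanger(GravitinoMetalake metalake) {
        Map<String, String> properties =
            ImmutableMap.of(
                "uri", "thrift://hive-metastore:9083", // assumed Iceberg catalog backend URI
                "catalog-backend", "hive", // assumed Iceberg catalog backend type
                AUTHORIZATION_PROVIDER, "ranger", // turn on the Ranger authorization plugin
                RANGER_SERVICE_NAME, "hiveDev", // Ranger Hadoop SQL repository name (placeholder)
                AuthorizationPropertiesMeta.RANGER_ADMIN_URL, "http://ranger-admin:6080",
                RANGER_AUTH_TYPE, "simple",
                RANGER_USERNAME, "admin",
                RANGER_PASSWORD, "rangerR0cks!");
        metalake.createCatalog(
            "iceberg_catalog", Catalog.Type.RELATIONAL, "lakehouse-iceberg", "comment", properties);
        return metalake.loadCatalog("iceberg_catalog");
      }
    }
    ```
    
    With this provider value, `RangerAuthorization.newPlugin()` returns the renamed
    `RangerAuthorizationHadoopSQLPlugin` (see the change below), so Iceberg and Hive catalogs share
    the same Ranger Hadoop SQL policy handling.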
    
    ### Why are the changes needed?
    
    Fix: #5118
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it updates the documentation (`docs/security/access-control.md` and `docs/security/authorization-pushdown.md`).
    
    ### How was this patch tested?
    
    Add E2E tests.
---
 .../apache/gravitino/authorization/Privilege.java  |   2 +-
 .../apache/gravitino/authorization/Privileges.java |   2 +-
 .../authorization-ranger/build.gradle.kts          |   5 +-
 .../authorization/ranger/RangerAuthorization.java  |   3 +-
 ...ava => RangerAuthorizationHadoopSQLPlugin.java} |  16 +-
 .../{RangerHiveE2EIT.java => RangerBaseE2EIT.java} | 420 ++++-----
 .../ranger/integration/test/RangerHiveE2EIT.java   | 940 +--------------------
 .../ranger/integration/test/RangerITEnv.java       |   4 +-
 .../integration/test/RangerIcebergE2EIT.java       | 236 ++++++
 docs/security/access-control.md                    |  10 +-
 docs/security/authorization-pushdown.md            |  32 +-
 11 files changed, 471 insertions(+), 1199 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/authorization/Privilege.java 
b/api/src/main/java/org/apache/gravitino/authorization/Privilege.java
index 3ca4107a1..866cf7d23 100644
--- a/api/src/main/java/org/apache/gravitino/authorization/Privilege.java
+++ b/api/src/main/java/org/apache/gravitino/authorization/Privilege.java
@@ -61,7 +61,7 @@ public interface Privilege {
     USE_SCHEMA(0L, 1L << 4),
     /** The privilege to create a table. */
     CREATE_TABLE(0L, 1L << 5),
-    /** The privilege to execute SQL `ALTER`, `INSERT`, `UPDATE`, or `DELETE` 
for a table. */
+    /** The privilege to write data to a table or modify the table schema. */
     MODIFY_TABLE(0L, 1L << 6),
     /** The privilege to select data from a table. */
     SELECT_TABLE(0L, 1L << 7),
diff --git 
a/api/src/main/java/org/apache/gravitino/authorization/Privileges.java 
b/api/src/main/java/org/apache/gravitino/authorization/Privileges.java
index 5255bce1c..b0f9e8fcc 100644
--- a/api/src/main/java/org/apache/gravitino/authorization/Privileges.java
+++ b/api/src/main/java/org/apache/gravitino/authorization/Privileges.java
@@ -407,7 +407,7 @@ public class Privileges {
     }
   }
 
-  /** The privilege to execute SQL `ALTER`, `INSERT`, `UPDATE`, or `DELETE` 
for a table. */
+  /** The privilege to write data to a table or modify the table schema. */
   public static class ModifyTable extends GenericPrivilege<ModifyTable> {
     private static final ModifyTable ALLOW_INSTANCE =
         new ModifyTable(Condition.ALLOW, Name.MODIFY_TABLE);
diff --git a/authorizations/authorization-ranger/build.gradle.kts 
b/authorizations/authorization-ranger/build.gradle.kts
index 66341d9b0..93d90cd4f 100644
--- a/authorizations/authorization-ranger/build.gradle.kts
+++ b/authorizations/authorization-ranger/build.gradle.kts
@@ -27,6 +27,8 @@ plugins {
 val scalaVersion: String = project.properties["scalaVersion"] as? String ?: 
extra["defaultScalaVersion"].toString()
 val sparkVersion: String = libs.versions.spark35.get()
 val kyuubiVersion: String = libs.versions.kyuubi4spark35.get()
+val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
+val icebergVersion: String = libs.versions.iceberg4spark.get()
 
 dependencies {
   implementation(project(":api")) {
@@ -97,6 +99,7 @@ dependencies {
     exclude("javax.servlet", "servlet-api")
     exclude("io.netty")
   }
+  testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
 }
 
 tasks {
@@ -126,7 +129,7 @@ tasks {
 
 tasks.test {
   doFirst {
-    environment("HADOOP_USER_NAME", "test")
+    environment("HADOOP_USER_NAME", "gravitino")
   }
   dependsOn(":catalogs:catalog-hive:jar", ":catalogs:catalog-hive:runtimeJars")
 
diff --git 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorization.java
 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorization.java
index 3fb74f288..9f8b42b06 100644
--- 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorization.java
+++ 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorization.java
@@ -33,7 +33,8 @@ public class RangerAuthorization extends 
BaseAuthorization<RangerAuthorization>
   protected AuthorizationPlugin newPlugin(String catalogProvider, Map<String, 
String> config) {
     switch (catalogProvider) {
       case "hive":
-        return RangerAuthorizationHivePlugin.getInstance(config);
+      case "lakehouse-iceberg":
+        return RangerAuthorizationHadoopSQLPlugin.getInstance(config);
       default:
         throw new IllegalArgumentException("Unknown catalog provider: " + 
catalogProvider);
     }
diff --git 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHivePlugin.java
 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHadoopSQLPlugin.java
similarity index 96%
rename from 
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHivePlugin.java
rename to 
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHadoopSQLPlugin.java
index 12072d7f6..f75485aed 100644
--- 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHivePlugin.java
+++ 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHadoopSQLPlugin.java
@@ -41,19 +41,21 @@ import 
org.apache.gravitino.exceptions.AuthorizationPluginException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RangerAuthorizationHivePlugin extends RangerAuthorizationPlugin {
-  private static final Logger LOG = 
LoggerFactory.getLogger(RangerAuthorizationHivePlugin.class);
-  private static volatile RangerAuthorizationHivePlugin instance = null;
+public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugin {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RangerAuthorizationHadoopSQLPlugin.class);
+  private static volatile RangerAuthorizationHadoopSQLPlugin instance = null;
 
-  private RangerAuthorizationHivePlugin(Map<String, String> config) {
+  private RangerAuthorizationHadoopSQLPlugin(Map<String, String> config) {
     super(config);
   }
 
-  public static synchronized RangerAuthorizationHivePlugin 
getInstance(Map<String, String> config) {
+  public static synchronized RangerAuthorizationHadoopSQLPlugin getInstance(
+      Map<String, String> config) {
     if (instance == null) {
-      synchronized (RangerAuthorizationHivePlugin.class) {
+      synchronized (RangerAuthorizationHadoopSQLPlugin.class) {
         if (instance == null) {
-          instance = new RangerAuthorizationHivePlugin(config);
+          instance = new RangerAuthorizationHadoopSQLPlugin(config);
         }
       }
     }
diff --git 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerBaseE2EIT.java
similarity index 71%
copy from 
authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
copy to 
authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerBaseE2EIT.java
index 409ddf48e..50a7d2394 100644
--- 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
+++ 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerBaseE2EIT.java
@@ -18,167 +18,91 @@
  */
 package org.apache.gravitino.authorization.ranger.integration.test;
 
-import static org.apache.gravitino.Catalog.AUTHORIZATION_PROVIDER;
 import static 
org.apache.gravitino.authorization.ranger.integration.test.RangerITEnv.currentFunName;
-import static 
org.apache.gravitino.catalog.hive.HiveConstants.IMPERSONATION_ENABLE;
-import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_AUTH_TYPE;
-import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_PASSWORD;
-import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_SERVICE_NAME;
-import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_USERNAME;
-import static 
org.apache.gravitino.integration.test.container.RangerContainer.RANGER_SERVER_PORT;
-
-import com.google.common.collect.ImmutableMap;
+
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.gravitino.Catalog;
-import org.apache.gravitino.Configs;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.auth.AuthConstants;
-import org.apache.gravitino.auth.AuthenticatorType;
 import org.apache.gravitino.authorization.Owner;
 import org.apache.gravitino.authorization.Privileges;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.authorization.SecurableObjects;
-import org.apache.gravitino.catalog.hive.HiveConstants;
 import org.apache.gravitino.client.GravitinoMetalake;
-import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
-import org.apache.gravitino.integration.test.container.HiveContainer;
-import org.apache.gravitino.integration.test.container.RangerContainer;
 import org.apache.gravitino.integration.test.util.BaseIT;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.kyuubi.plugin.spark.authz.AccessControlException;
-import org.apache.spark.SparkUnsupportedOperationException;
-import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Tag("gravitino-docker-test")
-public class RangerHiveE2EIT extends BaseIT {
-  private static final Logger LOG = 
LoggerFactory.getLogger(RangerHiveE2EIT.class);
-
-  public static final String metalakeName =
-      GravitinoITUtils.genRandomName("metalake").toLowerCase();
+public abstract class RangerBaseE2EIT extends BaseIT {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerBaseE2EIT.class);
   public static final String catalogName = 
GravitinoITUtils.genRandomName("catalog").toLowerCase();
   public static final String schemaName = 
GravitinoITUtils.genRandomName("schema").toLowerCase();
 
   public static final String tableName = 
GravitinoITUtils.genRandomName("table").toLowerCase();
 
-  private static GravitinoMetalake metalake;
-  private static Catalog catalog;
-  private static final String provider = "hive";
-  private static String HIVE_METASTORE_URIS;
+  public static String metalakeName;
+  protected static GravitinoMetalake metalake;
+  protected static Catalog catalog;
+  protected static String HIVE_METASTORE_URIS;
+  protected static String RANGER_ADMIN_URL = null;
 
-  private static SparkSession sparkSession = null;
-  private static final String HADOOP_USER_NAME = "HADOOP_USER_NAME";
+  protected static SparkSession sparkSession = null;
+  protected static final String HADOOP_USER_NAME = "HADOOP_USER_NAME";
 
-  private static final String SQL_SHOW_DATABASES =
+  protected static final String SQL_SHOW_DATABASES =
       String.format("SHOW DATABASES like '%s'", schemaName);
 
-  private static final String SQL_CREATE_SCHEMA = String.format("CREATE 
DATABASE %s", schemaName);
+  protected static final String SQL_CREATE_SCHEMA = String.format("CREATE 
DATABASE %s", schemaName);
 
-  private static final String SQL_DROP_SCHEMA = String.format("DROP DATABASE 
%s", schemaName);
+  protected static final String SQL_DROP_SCHEMA = String.format("DROP DATABASE 
%s", schemaName);
 
-  private static final String SQL_USE_SCHEMA = String.format("USE SCHEMA %s", 
schemaName);
+  protected static final String SQL_USE_SCHEMA = String.format("USE SCHEMA 
%s", schemaName);
 
-  private static final String SQL_CREATE_TABLE =
+  protected static final String SQL_CREATE_TABLE =
       String.format("CREATE TABLE %s (a int, b string, c string)", tableName);
 
-  private static final String SQL_INSERT_TABLE =
+  protected static final String SQL_INSERT_TABLE =
       String.format("INSERT INTO %s (a, b, c) VALUES (1, 'a', 'b')", 
tableName);
 
-  private static final String SQL_SELECT_TABLE = String.format("SELECT * FROM 
%s", tableName);
+  protected static final String SQL_SELECT_TABLE = String.format("SELECT * 
FROM %s", tableName);
 
-  private static final String SQL_UPDATE_TABLE =
+  protected static final String SQL_UPDATE_TABLE =
       String.format("UPDATE %s SET b = 'b', c = 'c' WHERE a = 1", tableName);
 
-  private static final String SQL_DELETE_TABLE =
+  protected static final String SQL_DELETE_TABLE =
       String.format("DELETE FROM %s WHERE a = 1", tableName);
 
-  private static final String SQL_ALTER_TABLE =
+  protected static final String SQL_ALTER_TABLE =
       String.format("ALTER TABLE %s ADD COLUMN d string", tableName);
 
-  private static final String SQL_RENAME_TABLE =
+  protected static final String SQL_ALTER_TABLE_BACK =
+      String.format("ALTER TABLE %s DROP COLUMN d", tableName);
+  protected static final String SQL_RENAME_TABLE =
       String.format("ALTER TABLE %s RENAME TO new_table", tableName);
 
-  private static final String SQL_RENAME_BACK_TABLE =
+  protected static final String SQL_RENAME_BACK_TABLE =
       String.format("ALTER TABLE new_table RENAME TO %s", tableName);
 
-  private static final String SQL_DROP_TABLE = String.format("DROP TABLE %s", 
tableName);
-
-  private static String RANGER_ADMIN_URL = null;
-
-  @BeforeAll
-  public void startIntegrationTest() throws Exception {
-    // Enable Gravitino Authorization mode
-    Map<String, String> configs = Maps.newHashMap();
-    configs.put(Configs.ENABLE_AUTHORIZATION.getKey(), String.valueOf(true));
-    configs.put(Configs.SERVICE_ADMINS.getKey(), RangerITEnv.HADOOP_USER_NAME);
-    configs.put(Configs.AUTHENTICATORS.getKey(), 
AuthenticatorType.SIMPLE.name().toLowerCase());
-    configs.put("SimpleAuthUserName", AuthConstants.ANONYMOUS_USER);
-    registerCustomConfigs(configs);
-    super.startIntegrationTest();
-
-    RangerITEnv.init();
-    RangerITEnv.startHiveRangerContainer();
-
-    RANGER_ADMIN_URL =
-        String.format(
-            "http://%s:%d",
-            containerSuite.getRangerContainer().getContainerIpAddress(), 
RANGER_SERVER_PORT);
-
-    HIVE_METASTORE_URIS =
-        String.format(
-            "thrift://%s:%d",
-            containerSuite.getHiveRangerContainer().getContainerIpAddress(),
-            HiveContainer.HIVE_METASTORE_PORT);
-
-    generateRangerSparkSecurityXML();
-
-    sparkSession =
-        SparkSession.builder()
-            .master("local[1]")
-            .appName("Ranger Hive E2E integration test")
-            .config("hive.metastore.uris", HIVE_METASTORE_URIS)
-            .config(
-                "spark.sql.warehouse.dir",
-                String.format(
-                    "hdfs://%s:%d/user/hive/warehouse",
-                    
containerSuite.getHiveRangerContainer().getContainerIpAddress(),
-                    HiveContainer.HDFS_DEFAULTFS_PORT))
-            .config("spark.sql.storeAssignmentPolicy", "LEGACY")
-            .config("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
-            .config(
-                "spark.sql.extensions",
-                
"org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension")
-            .enableHiveSupport()
-            .getOrCreate();
-
-    createMetalake();
-    createCatalog();
-
-    metalake.addUser("test");
-  }
+  protected static final String SQL_DROP_TABLE = String.format("DROP TABLE 
%s", tableName);
 
-  private static void generateRangerSparkSecurityXML() throws IOException {
+  protected static void generateRangerSparkSecurityXML() throws IOException {
     String templatePath =
         String.join(
             File.separator,
@@ -209,14 +133,13 @@ public class RangerHiveE2EIT extends BaseIT {
     FileUtils.writeStringToFile(new File(xmlPath), templateContext, 
StandardCharsets.UTF_8);
   }
 
-  @AfterAll
-  public void stop() {
+  protected void cleanIT() {
     if (client != null) {
       Arrays.stream(catalog.asSchemas().listSchemas())
           .filter(schema -> !schema.equals("default"))
           .forEach(
               (schema -> {
-                catalog.asSchemas().dropSchema(schema, true);
+                catalog.asSchemas().dropSchema(schema, false);
               }));
       Arrays.stream(metalake.listCatalogs())
           .forEach((catalogName -> metalake.dropCatalog(catalogName, true)));
@@ -235,8 +158,50 @@ public class RangerHiveE2EIT extends BaseIT {
     RangerITEnv.cleanup();
   }
 
+  protected void createMetalake() {
+    GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
+    Assertions.assertEquals(0, gravitinoMetalakes.length);
+
+    client.createMetalake(metalakeName, "comment", Collections.emptyMap());
+    GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName);
+    Assertions.assertEquals(metalakeName, loadMetalake.name());
+
+    metalake = loadMetalake;
+  }
+
+  protected static void waitForUpdatingPolicies() throws InterruptedException {
+    // After a Ranger authorization change, we must wait for the Ranger Spark plugin to update
+    // its policies. The sleep time must be greater than the policy update interval
+    // (ranger.plugin.spark.policy.pollIntervalMs) in
+    // `resources/ranger-spark-security.xml.template`.
+    Thread.sleep(1000L);
+  }
+
+  protected abstract void checkTableAllPrivilegesExceptForCreating();
+
+  protected abstract void checkUpdateSQLWithReadWritePrivileges();
+
+  protected abstract void checkUpdateSQLWithReadPrivileges();
+
+  protected abstract void checkUpdateSQLWithWritePrivileges();
+
+  protected abstract void checkDeleteSQLWithReadWritePrivileges();
+
+  protected abstract void checkDeleteSQLWithReadPrivileges();
+
+  protected abstract void checkDeleteSQLWithWritePrivileges();
+
+  protected abstract void useCatalog() throws InterruptedException;
+
+  protected abstract void checkHaveNoPrivileges();
+
+  protected abstract void testAlterTable();
+
   @Test
   void testCreateSchema() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
     // First, fail to create the schema
     Assertions.assertThrows(
         AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
@@ -258,12 +223,15 @@ public class RangerHiveE2EIT extends BaseIT {
     Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
 
     // Clean up
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
     metalake.deleteRole(roleName);
   }
 
   @Test
   void testCreateTable() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
     // First, create a role for creating a database and grant role to the user
     String createSchemaRole = currentFunName();
     SecurableObject securableObject =
@@ -302,13 +270,16 @@ public class RangerHiveE2EIT extends BaseIT {
 
     // Clean up
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
     metalake.deleteRole(createTableRole);
     metalake.deleteRole(createSchemaRole);
   }
 
   @Test
   void testReadWriteTableWithMetalakeLevelRole() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
     // First, create a role for creating a database and grant role to the user
     String readWriteRole = currentFunName();
     SecurableObject securableObject =
@@ -337,15 +308,14 @@ public class RangerHiveE2EIT extends BaseIT {
     // case 2: Succeed to select data from the table
     sparkSession.sql(SQL_SELECT_TABLE).collectAsList();
 
-    // case 3: Fail to update data in the table, Because Hive doesn't support.
-    Assertions.assertThrows(
-        SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
+    // case 3: Update data in the table
+    checkUpdateSQLWithReadWritePrivileges();
 
-    // case 4: Fail to delete data from the table, Because Hive doesn't 
support.
-    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
+    // case 4: Delete data from the table.
+    checkDeleteSQLWithReadWritePrivileges();
 
     // case 5: Succeed to alter the table
-    sparkSession.sql(SQL_ALTER_TABLE);
+    testAlterTable();
 
     // case 6: Fail to drop the table
     Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
@@ -353,23 +323,18 @@ public class RangerHiveE2EIT extends BaseIT {
     // case 7: If we don't have the role, we can't insert and select from data.
     metalake.deleteRole(readWriteRole);
     waitForUpdatingPolicies();
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_USE_SCHEMA));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
+    checkHaveNoPrivileges();
 
     // Clean up
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
   }
 
   @Test
   void testReadWriteTableWithTableLevelRole() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
     // First, create a role for creating a database and grant role to the user
     String roleName = currentFunName();
     SecurableObject securableObject =
@@ -407,15 +372,14 @@ public class RangerHiveE2EIT extends BaseIT {
     // case 2: Succeed to select data from the table
     sparkSession.sql(SQL_SELECT_TABLE).collectAsList();
 
-    // case 3: Fail to update data in the table, Because Hive doesn't support.
-    Assertions.assertThrows(
-        SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
+    // case 3: Update data in the table.
+    checkUpdateSQLWithReadWritePrivileges();
 
-    // case 4: Fail to delete data from the table, Because Hive doesn't 
support.
-    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
+    // case 4: Delete data from the table.
+    checkDeleteSQLWithReadWritePrivileges();
 
     // case 5: Succeed to alter the table
-    sparkSession.sql(SQL_ALTER_TABLE);
+    testAlterTable();
 
     // case 6: Fail to drop the table
     Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
@@ -423,23 +387,18 @@ public class RangerHiveE2EIT extends BaseIT {
     // case 7: If we don't have the role, we can't insert and select from data.
     metalake.deleteRole(roleName);
     waitForUpdatingPolicies();
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_USE_SCHEMA));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
+    checkHaveNoPrivileges();
 
     // Clean up
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
   }
 
   @Test
   void testReadOnlyTable() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
     // First, create a role for creating a database and grant role to the user
     String readOnlyRole = currentFunName();
     SecurableObject securableObject =
@@ -467,12 +426,11 @@ public class RangerHiveE2EIT extends BaseIT {
     // case 2: Succeed to select data from the table
     sparkSession.sql(SQL_SELECT_TABLE).collectAsList();
 
-    // case 3: Fail to alter data in the table
-    Assertions.assertThrows(
-        SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
+    // case 3: Update data in the table
+    checkUpdateSQLWithReadPrivileges();
 
-    // case 4: Fail to delete data from the table
-    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
+    // case 4: Delete data from the table
+    checkDeleteSQLWithReadPrivileges();
 
     // case 5: Fail to alter the table
     Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_ALTER_TABLE));
@@ -483,23 +441,18 @@ public class RangerHiveE2EIT extends BaseIT {
     // case 7: If we don't have the role, we can't insert and select from data.
     metalake.deleteRole(readOnlyRole);
     waitForUpdatingPolicies();
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_USE_SCHEMA));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
+    checkHaveNoPrivileges();
 
     // Clean up
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
   }
 
   @Test
   void testWriteOnlyTable() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
     // First, create a role for creating a database and grant role to the user
     String writeOnlyRole = currentFunName();
     SecurableObject securableObject =
@@ -528,15 +481,14 @@ public class RangerHiveE2EIT extends BaseIT {
     Assertions.assertThrows(
         AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
 
-    // case 3: Succeed to update data in the table
-    Assertions.assertThrows(
-        SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
+    // case 3: Update data in the table
+    checkUpdateSQLWithWritePrivileges();
 
-    // case 4: Succeed to delete data from the table
-    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
+    // case 4: Delete data from the table
+    checkDeleteSQLWithWritePrivileges();
 
     // case 5: Succeed to alter the table
-    sparkSession.sql(SQL_ALTER_TABLE);
+    testAlterTable();
 
     // case 6: Fail to drop the table
     Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
@@ -544,23 +496,19 @@ public class RangerHiveE2EIT extends BaseIT {
     // case 7: If we don't have the role, we can't insert and select from data.
     metalake.deleteRole(writeOnlyRole);
     waitForUpdatingPolicies();
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_USE_SCHEMA));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
+    checkHaveNoPrivileges();
 
     // Clean up
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
   }
 
   @Test
   void testCreateAllPrivilegesRole() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
+    // Create a role
     String roleName = currentFunName();
     SecurableObject securableObject =
         SecurableObjects.ofMetalake(
@@ -599,12 +547,15 @@ public class RangerHiveE2EIT extends BaseIT {
 
     // Clean up
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
     metalake.deleteRole(roleName);
   }
 
   @Test
   void testDeleteAndRecreateRole() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
     // Fail to create schema
     Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
 
@@ -624,7 +575,7 @@ public class RangerHiveE2EIT extends BaseIT {
 
     // Succeed to create the schema
     sparkSession.sql(SQL_CREATE_SCHEMA);
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
 
     // Delete the role
     metalake.deleteRole(roleName);
@@ -645,12 +596,15 @@ public class RangerHiveE2EIT extends BaseIT {
     sparkSession.sql(SQL_CREATE_SCHEMA);
 
     // Clean up
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
     metalake.deleteRole(roleName);
   }
 
   @Test
   void testDeleteAndRecreateMetadataObject() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
     // Create a role with CREATE_SCHEMA privilege
     String roleName = currentFunName();
     SecurableObject securableObject =
@@ -678,7 +632,7 @@ public class RangerHiveE2EIT extends BaseIT {
 
     // Delete a schema
     sparkSession.sql(SQL_DROP_SCHEMA);
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
     waitForUpdatingPolicies();
 
     // Recreate a schema
@@ -699,11 +653,14 @@ public class RangerHiveE2EIT extends BaseIT {
     sparkSession.sql(SQL_CREATE_SCHEMA);
 
     // Clean up
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
   }
 
   @Test
   void testRenameMetadataObject() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
     // Create a role with CREATE_SCHEMA and CREATE_TABLE privilege
     String roleName = currentFunName();
     SecurableObject securableObject =
@@ -733,12 +690,15 @@ public class RangerHiveE2EIT extends BaseIT {
 
     // Clean up
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
     metalake.deleteRole(roleName);
   }
 
   @Test
   void testRenameMetadataObjectPrivilege() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
     // Create a role with CREATE_SCHEMA and CREATE_TABLE privilege
     String roleName = currentFunName();
     SecurableObject securableObject =
@@ -779,12 +739,15 @@ public class RangerHiveE2EIT extends BaseIT {
 
     // Clean up
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
     metalake.deleteRole(roleName);
   }
 
   @Test
   void testChangeOwner() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
     // Create a schema and a table
     String helperRole = currentFunName();
     SecurableObject securableObject =
@@ -810,27 +773,7 @@ public class RangerHiveE2EIT extends BaseIT {
     metalake.deleteRole(helperRole);
     waitForUpdatingPolicies();
 
-    // case 1. Have none of privileges of the table
-
-    // - a. Fail to insert data into the table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
-
-    // - b. Fail to select data from the table
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
-
-    // - c: Fail to update data in the table
-    Assertions.assertThrows(
-        SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
-
-    // - d: Fail to delete data from the table
-    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
-
-    // - e: Fail to alter the table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_ALTER_TABLE));
-
-    // - f: Fail to drop the table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
+    checkHaveNoPrivileges();
 
     // case 2. user is the  table owner
     MetadataObject tableObject =
@@ -863,7 +806,7 @@ public class RangerHiveE2EIT extends BaseIT {
 
     // Succeed to drop schema
     sparkSession.sql(SQL_DROP_SCHEMA);
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
     waitForUpdatingPolicies();
 
     // Fail to create schema
@@ -887,7 +830,7 @@ public class RangerHiveE2EIT extends BaseIT {
 
     // Succeed to drop schema
     sparkSession.sql(SQL_DROP_SCHEMA);
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
     waitForUpdatingPolicies();
 
     metalake.setOwner(catalogObject, AuthConstants.ANONYMOUS_USER, 
Owner.Type.USER);
@@ -911,11 +854,14 @@ public class RangerHiveE2EIT extends BaseIT {
 
     // Clean up
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
   }
 
   @Test
   void testAllowUseSchemaPrivilege() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
     // Create a role with CREATE_SCHEMA privilege
     String roleName = currentFunName();
     SecurableObject securableObject =
@@ -969,13 +915,16 @@ public class RangerHiveE2EIT extends BaseIT {
 
     // Clean up
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
     metalake.revokeRolesFromUser(Lists.newArrayList(roleName), userName1);
     metalake.deleteRole(roleName);
   }
 
   @Test
   void testDenyPrivileges() throws InterruptedException {
+    // Choose a catalog
+    useCatalog();
+
     // Create a schema
     catalog.asSchemas().createSchema(schemaName, "test", 
Collections.emptyMap());
 
@@ -1031,72 +980,7 @@ public class RangerHiveE2EIT extends BaseIT {
     Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
 
     // Clean up
-    catalog.asSchemas().dropSchema(schemaName, true);
+    catalog.asSchemas().dropSchema(schemaName, false);
     metalake.deleteRole(roleName);
   }
-
-  private void createMetalake() {
-    GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
-    Assertions.assertEquals(0, gravitinoMetalakes.length);
-
-    client.createMetalake(metalakeName, "comment", Collections.emptyMap());
-    GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName);
-    Assertions.assertEquals(metalakeName, loadMetalake.name());
-
-    metalake = loadMetalake;
-  }
-
-  private static void createCatalog() {
-    Map<String, String> properties =
-        ImmutableMap.of(
-            HiveConstants.METASTORE_URIS,
-            HIVE_METASTORE_URIS,
-            IMPERSONATION_ENABLE,
-            "true",
-            AUTHORIZATION_PROVIDER,
-            "ranger",
-            RANGER_SERVICE_NAME,
-            RangerITEnv.RANGER_HIVE_REPO_NAME,
-            AuthorizationPropertiesMeta.RANGER_ADMIN_URL,
-            RANGER_ADMIN_URL,
-            RANGER_AUTH_TYPE,
-            RangerContainer.authType,
-            RANGER_USERNAME,
-            RangerContainer.rangerUserName,
-            RANGER_PASSWORD,
-            RangerContainer.rangerPassword);
-
-    metalake.createCatalog(catalogName, Catalog.Type.RELATIONAL, provider, 
"comment", properties);
-    catalog = metalake.loadCatalog(catalogName);
-    LOG.info("Catalog created: {}", catalog);
-  }
-
-  private void checkTableAllPrivilegesExceptForCreating() {
-    // - a. Succeed to insert data into the table
-    sparkSession.sql(SQL_INSERT_TABLE);
-
-    // - b. Succeed to select data from the table
-    sparkSession.sql(SQL_SELECT_TABLE).collectAsList();
-
-    // - c: Fail to update data in the table. Because Hive doesn't support
-    Assertions.assertThrows(
-        SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
-
-    // - d: Fail to delete data from the table, Because Hive doesn't support
-    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
-
-    // - e: Succeed to alter the table
-    sparkSession.sql(SQL_ALTER_TABLE);
-
-    // - f: Succeed to drop the table
-    sparkSession.sql(SQL_DROP_TABLE);
-  }
-
-  private static void waitForUpdatingPolicies() throws InterruptedException {
-    // After Ranger authorization, Must wait a period of time for the Ranger 
Spark plugin to update
-    // the policy Sleep time must be greater than the policy update interval
-    // (ranger.plugin.spark.policy.pollIntervalMs) in the
-    // `resources/ranger-spark-security.xml.template`
-    Thread.sleep(1000L);
-  }
 }
diff --git 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
index 409ddf48e..7e3096a61 100644
--- 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
+++ 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
@@ -19,7 +19,6 @@
 package org.apache.gravitino.authorization.ranger.integration.test;
 
 import static org.apache.gravitino.Catalog.AUTHORIZATION_PROVIDER;
-import static 
org.apache.gravitino.authorization.ranger.integration.test.RangerITEnv.currentFunName;
 import static 
org.apache.gravitino.catalog.hive.HiveConstants.IMPERSONATION_ENABLE;
 import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_AUTH_TYPE;
 import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_PASSWORD;
@@ -28,106 +27,37 @@ import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_
 import static 
org.apache.gravitino.integration.test.container.RangerContainer.RANGER_SERVER_PORT;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
-import org.apache.commons.io.FileUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Configs;
-import org.apache.gravitino.MetadataObject;
-import org.apache.gravitino.MetadataObjects;
-import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.auth.AuthConstants;
 import org.apache.gravitino.auth.AuthenticatorType;
-import org.apache.gravitino.authorization.Owner;
-import org.apache.gravitino.authorization.Privileges;
-import org.apache.gravitino.authorization.SecurableObject;
-import org.apache.gravitino.authorization.SecurableObjects;
 import org.apache.gravitino.catalog.hive.HiveConstants;
-import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
 import org.apache.gravitino.integration.test.container.HiveContainer;
 import org.apache.gravitino.integration.test.container.RangerContainer;
-import org.apache.gravitino.integration.test.util.BaseIT;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
-import org.apache.gravitino.rel.TableChange;
 import org.apache.kyuubi.plugin.spark.authz.AccessControlException;
 import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.AnalysisException;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Tag("gravitino-docker-test")
-public class RangerHiveE2EIT extends BaseIT {
+public class RangerHiveE2EIT extends RangerBaseE2EIT {
   private static final Logger LOG = 
LoggerFactory.getLogger(RangerHiveE2EIT.class);
 
-  public static final String metalakeName =
-      GravitinoITUtils.genRandomName("metalake").toLowerCase();
-  public static final String catalogName = 
GravitinoITUtils.genRandomName("catalog").toLowerCase();
-  public static final String schemaName = 
GravitinoITUtils.genRandomName("schema").toLowerCase();
-
-  public static final String tableName = 
GravitinoITUtils.genRandomName("table").toLowerCase();
-
-  private static GravitinoMetalake metalake;
-  private static Catalog catalog;
   private static final String provider = "hive";
-  private static String HIVE_METASTORE_URIS;
-
-  private static SparkSession sparkSession = null;
-  private static final String HADOOP_USER_NAME = "HADOOP_USER_NAME";
-
-  private static final String SQL_SHOW_DATABASES =
-      String.format("SHOW DATABASES like '%s'", schemaName);
-
-  private static final String SQL_CREATE_SCHEMA = String.format("CREATE 
DATABASE %s", schemaName);
-
-  private static final String SQL_DROP_SCHEMA = String.format("DROP DATABASE 
%s", schemaName);
-
-  private static final String SQL_USE_SCHEMA = String.format("USE SCHEMA %s", 
schemaName);
-
-  private static final String SQL_CREATE_TABLE =
-      String.format("CREATE TABLE %s (a int, b string, c string)", tableName);
-
-  private static final String SQL_INSERT_TABLE =
-      String.format("INSERT INTO %s (a, b, c) VALUES (1, 'a', 'b')", 
tableName);
-
-  private static final String SQL_SELECT_TABLE = String.format("SELECT * FROM 
%s", tableName);
-
-  private static final String SQL_UPDATE_TABLE =
-      String.format("UPDATE %s SET b = 'b', c = 'c' WHERE a = 1", tableName);
-
-  private static final String SQL_DELETE_TABLE =
-      String.format("DELETE FROM %s WHERE a = 1", tableName);
-
-  private static final String SQL_ALTER_TABLE =
-      String.format("ALTER TABLE %s ADD COLUMN d string", tableName);
-
-  private static final String SQL_RENAME_TABLE =
-      String.format("ALTER TABLE %s RENAME TO new_table", tableName);
-
-  private static final String SQL_RENAME_BACK_TABLE =
-      String.format("ALTER TABLE new_table RENAME TO %s", tableName);
-
-  private static final String SQL_DROP_TABLE = String.format("DROP TABLE %s", 
tableName);
-
-  private static String RANGER_ADMIN_URL = null;
 
   @BeforeAll
   public void startIntegrationTest() throws Exception {
+    metalakeName = GravitinoITUtils.genRandomName("metalake").toLowerCase();
     // Enable Gravitino Authorization mode
     Map<String, String> configs = Maps.newHashMap();
     configs.put(Configs.ENABLE_AUTHORIZATION.getKey(), String.valueOf(true));
@@ -175,184 +105,22 @@ public class RangerHiveE2EIT extends BaseIT {
     createMetalake();
     createCatalog();
 
-    metalake.addUser("test");
-  }
-
-  private static void generateRangerSparkSecurityXML() throws IOException {
-    String templatePath =
-        String.join(
-            File.separator,
-            System.getenv("GRAVITINO_ROOT_DIR"),
-            "authorizations",
-            "authorization-ranger",
-            "src",
-            "test",
-            "resources",
-            "ranger-spark-security.xml.template");
-    String xmlPath =
-        String.join(
-            File.separator,
-            System.getenv("GRAVITINO_ROOT_DIR"),
-            "authorizations",
-            "authorization-ranger",
-            "build",
-            "resources",
-            "test",
-            "ranger-spark-security.xml");
-
-    String templateContext =
-        FileUtils.readFileToString(new File(templatePath), 
StandardCharsets.UTF_8);
-    templateContext =
-        templateContext
-            .replace("__REPLACE__RANGER_ADMIN_URL", RANGER_ADMIN_URL)
-            .replace("__REPLACE__RANGER_HIVE_REPO_NAME", 
RangerITEnv.RANGER_HIVE_REPO_NAME);
-    FileUtils.writeStringToFile(new File(xmlPath), templateContext, 
StandardCharsets.UTF_8);
+    RangerITEnv.cleanup();
+    metalake.addUser(System.getenv(HADOOP_USER_NAME));
   }
 
   @AfterAll
   public void stop() {
-    if (client != null) {
-      Arrays.stream(catalog.asSchemas().listSchemas())
-          .filter(schema -> !schema.equals("default"))
-          .forEach(
-              (schema -> {
-                catalog.asSchemas().dropSchema(schema, true);
-              }));
-      Arrays.stream(metalake.listCatalogs())
-          .forEach((catalogName -> metalake.dropCatalog(catalogName, true)));
-      client.disableMetalake(metalakeName);
-      client.dropMetalake(metalakeName);
-    }
-    if (sparkSession != null) {
-      sparkSession.close();
-    }
-    try {
-      closer.close();
-    } catch (Exception e) {
-      LOG.error("Failed to close CloseableGroup", e);
-    }
-    client = null;
-    RangerITEnv.cleanup();
+    cleanIT();
   }
 
-  @Test
-  void testCreateSchema() throws InterruptedException {
-    // First, fail to create the schema
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
-
-    // Second, grant the `CREATE_SCHEMA` role
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    String roleName = currentFunName();
-    SecurableObject securableObject =
-        SecurableObjects.ofMetalake(
-            metalakeName, Lists.newArrayList(Privileges.CreateSchema.allow()));
-    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
-    waitForUpdatingPolicies();
-
-    // Third, succeed to create the schema
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    // Fourth, fail to create the table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
-
-    // Clean up
-    catalog.asSchemas().dropSchema(schemaName, true);
-    metalake.deleteRole(roleName);
+  @Override
+  protected void useCatalog() throws InterruptedException {
+    // Do nothing, default catalog is ok for Hive.
   }
 
-  @Test
-  void testCreateTable() throws InterruptedException {
-    // First, create a role for creating a database and grant role to the user
-    String createSchemaRole = currentFunName();
-    SecurableObject securableObject =
-        SecurableObjects.ofMetalake(
-            metalakeName,
-            Lists.newArrayList(Privileges.UseSchema.allow(), 
Privileges.CreateSchema.allow()));
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.createRole(
-        createSchemaRole, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-    metalake.grantRolesToUser(Lists.newArrayList(createSchemaRole), userName1);
-    waitForUpdatingPolicies();
-    // Second, create a schema
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    // Third, fail to create a table
-    sparkSession.sql(SQL_USE_SCHEMA);
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
-
-    // Fourth, create a role for creating a table and grant to the user
-    String createTableRole = currentFunName() + "2";
-    securableObject =
-        SecurableObjects.ofMetalake(
-            metalakeName, Lists.newArrayList(Privileges.CreateTable.allow()));
-    metalake.createRole(
-        createTableRole, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-    metalake.grantRolesToUser(Lists.newArrayList(createTableRole), userName1);
-    waitForUpdatingPolicies();
-
-    // Fifth, succeed to create a table
-    sparkSession.sql(SQL_CREATE_TABLE);
-
-    // Sixth, fail to read and write a table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
-
-    // Clean up
-    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
-    metalake.deleteRole(createTableRole);
-    metalake.deleteRole(createSchemaRole);
-  }
-
-  @Test
-  void testReadWriteTableWithMetalakeLevelRole() throws InterruptedException {
-    // First, create a role for creating a database and grant role to the user
-    String readWriteRole = currentFunName();
-    SecurableObject securableObject =
-        SecurableObjects.ofMetalake(
-            metalakeName,
-            Lists.newArrayList(
-                Privileges.UseSchema.allow(),
-                Privileges.CreateSchema.allow(),
-                Privileges.CreateTable.allow(),
-                Privileges.SelectTable.allow(),
-                Privileges.ModifyTable.allow()));
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.createRole(readWriteRole, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-    metalake.grantRolesToUser(Lists.newArrayList(readWriteRole), userName1);
-    waitForUpdatingPolicies();
-    // Second, create a schema
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    // Third, create a table
-    sparkSession.sql(SQL_USE_SCHEMA);
-    sparkSession.sql(SQL_CREATE_TABLE);
-
-    // case 1: Succeed to insert data into table
-    sparkSession.sql(SQL_INSERT_TABLE);
-
-    // case 2: Succeed to select data from the table
-    sparkSession.sql(SQL_SELECT_TABLE).collectAsList();
-
-    // case 3: Fail to update data in the table, Because Hive doesn't support.
-    Assertions.assertThrows(
-        SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
-
-    // case 4: Fail to delete data from the table, Because Hive doesn't 
support.
-    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
-
-    // case 5: Succeed to alter the table
-    sparkSession.sql(SQL_ALTER_TABLE);
-
-    // case 6: Fail to drop the table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
-
-    // case 7: If we don't have the role, we can't insert and select from data.
-    metalake.deleteRole(readWriteRole);
-    waitForUpdatingPolicies();
+  @Override
+  protected void checkHaveNoPrivileges() {
     Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
     Assertions.assertThrows(
         AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
@@ -362,688 +130,44 @@ public class RangerHiveE2EIT extends BaseIT {
     Assertions.assertThrows(
         AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
     Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
-
-    // Clean up
-    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
   }
 
-  @Test
-  void testReadWriteTableWithTableLevelRole() throws InterruptedException {
-    // First, create a role for creating a database and grant role to the user
-    String roleName = currentFunName();
-    SecurableObject securableObject =
-        SecurableObjects.ofMetalake(
-            metalakeName,
-            Lists.newArrayList(
-                Privileges.UseSchema.allow(),
-                Privileges.CreateSchema.allow(),
-                Privileges.CreateTable.allow()));
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
-    waitForUpdatingPolicies();
-    // Second, create a schema
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    // Third, create a table
-    sparkSession.sql(SQL_USE_SCHEMA);
-    sparkSession.sql(SQL_CREATE_TABLE);
-
-    // Fourth, revoke and grant a table level role
-    metalake.deleteRole(roleName);
-    securableObject =
-        SecurableObjects.parse(
-            String.format("%s.%s.%s", catalogName, schemaName, tableName),
-            MetadataObject.Type.TABLE,
-            Lists.newArrayList(Privileges.ModifyTable.allow(), 
Privileges.SelectTable.allow()));
-    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
-    waitForUpdatingPolicies();
-
-    // case 1: Succeed to insert data into table
-    sparkSession.sql(SQL_INSERT_TABLE);
-
-    // case 2: Succeed to select data from the table
-    sparkSession.sql(SQL_SELECT_TABLE).collectAsList();
-
-    // case 3: Fail to update data in the table, Because Hive doesn't support.
+  @Override
+  protected void checkUpdateSQLWithReadWritePrivileges() {
     Assertions.assertThrows(
         SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
-
-    // case 4: Fail to delete data from the table, Because Hive doesn't 
support.
-    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
-
-    // case 5: Succeed to alter the table
-    sparkSession.sql(SQL_ALTER_TABLE);
-
-    // case 6: Fail to drop the table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
-
-    // case 7: If we don't have the role, we can't insert and select from data.
-    metalake.deleteRole(roleName);
-    waitForUpdatingPolicies();
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_USE_SCHEMA));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
-
-    // Clean up
-    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
   }
 
-  @Test
-  void testReadOnlyTable() throws InterruptedException {
-    // First, create a role for creating a database and grant role to the user
-    String readOnlyRole = currentFunName();
-    SecurableObject securableObject =
-        SecurableObjects.ofMetalake(
-            metalakeName,
-            Lists.newArrayList(
-                Privileges.UseSchema.allow(),
-                Privileges.CreateSchema.allow(),
-                Privileges.CreateTable.allow(),
-                Privileges.SelectTable.allow()));
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.createRole(readOnlyRole, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-    metalake.grantRolesToUser(Lists.newArrayList(readOnlyRole), userName1);
-    waitForUpdatingPolicies();
-    // Second, create a schema
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    // Third, create a table
-    sparkSession.sql(SQL_USE_SCHEMA);
-    sparkSession.sql(SQL_CREATE_TABLE);
-
-    // case 1: Fail to insert data into table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
-
-    // case 2: Succeed to select data from the table
-    sparkSession.sql(SQL_SELECT_TABLE).collectAsList();
-
-    // case 3: Fail to alter data in the table
+  @Override
+  protected void checkUpdateSQLWithReadPrivileges() {
     Assertions.assertThrows(
         SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
-
-    // case 4: Fail to delete data from the table
-    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
-
-    // case 5: Fail to alter the table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_ALTER_TABLE));
-
-    // case 6: Fail to drop the table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
-
-    // case 7: If we don't have the role, we can't insert and select from data.
-    metalake.deleteRole(readOnlyRole);
-    waitForUpdatingPolicies();
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_USE_SCHEMA));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
-
-    // Clean up
-    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
   }
 
-  @Test
-  void testWriteOnlyTable() throws InterruptedException {
-    // First, create a role for creating a database and grant role to the user
-    String writeOnlyRole = currentFunName();
-    SecurableObject securableObject =
-        SecurableObjects.ofMetalake(
-            metalakeName,
-            Lists.newArrayList(
-                Privileges.UseSchema.allow(),
-                Privileges.CreateSchema.allow(),
-                Privileges.CreateTable.allow(),
-                Privileges.ModifyTable.allow()));
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.createRole(writeOnlyRole, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-    metalake.grantRolesToUser(Lists.newArrayList(writeOnlyRole), userName1);
-    waitForUpdatingPolicies();
-    // Second, create a schema
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    // Third, create a table
-    sparkSession.sql(SQL_USE_SCHEMA);
-    sparkSession.sql(SQL_CREATE_TABLE);
-
-    // case 1: Succeed to insert data into the table
-    sparkSession.sql(SQL_INSERT_TABLE);
-
-    // case 2: Fail to select data from the table
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
-
-    // case 3: Succeed to update data in the table
+  @Override
+  protected void checkUpdateSQLWithWritePrivileges() {
     Assertions.assertThrows(
         SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
-
-    // case 4: Succeed to delete data from the table
-    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
-
-    // case 5: Succeed to alter the table
-    sparkSession.sql(SQL_ALTER_TABLE);
-
-    // case 6: Fail to drop the table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
-
-    // case 7: If we don't have the role, we can't insert and select from data.
-    metalake.deleteRole(writeOnlyRole);
-    waitForUpdatingPolicies();
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_USE_SCHEMA));
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
-
-    // Clean up
-    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
-  }
-
-  @Test
-  void testCreateAllPrivilegesRole() throws InterruptedException {
-    String roleName = currentFunName();
-    SecurableObject securableObject =
-        SecurableObjects.ofMetalake(
-            metalakeName,
-            Lists.newArrayList(
-                Privileges.CreateCatalog.allow(),
-                Privileges.UseCatalog.allow(),
-                Privileges.UseSchema.allow(),
-                Privileges.CreateSchema.allow(),
-                Privileges.CreateFileset.allow(),
-                Privileges.ReadFileset.allow(),
-                Privileges.WriteFileset.allow(),
-                Privileges.CreateTopic.allow(),
-                Privileges.ConsumeTopic.allow(),
-                Privileges.ProduceTopic.allow(),
-                Privileges.CreateTable.allow(),
-                Privileges.SelectTable.allow(),
-                Privileges.ModifyTable.allow(),
-                Privileges.ManageUsers.allow(),
-                Privileges.ManageGroups.allow(),
-                Privileges.CreateRole.allow()));
-    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-
-    // Granted this role to the spark execution user `HADOOP_USER_NAME`
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
-
-    waitForUpdatingPolicies();
-
-    // Test to create the schema
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    // Test to create a table
-    sparkSession.sql(SQL_USE_SCHEMA);
-    sparkSession.sql(SQL_CREATE_TABLE);
-
-    // Clean up
-    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
-    metalake.deleteRole(roleName);
   }
 
-  @Test
-  void testDeleteAndRecreateRole() throws InterruptedException {
-    // Fail to create schema
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
-
-    // Create a role with CREATE_SCHEMA privilege
-    String roleName = currentFunName();
-    SecurableObject securableObject =
-        SecurableObjects.parse(
-            String.format("%s", catalogName),
-            MetadataObject.Type.CATALOG,
-            Lists.newArrayList(Privileges.UseCatalog.allow(), 
Privileges.CreateSchema.allow()));
-    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-
-    // Granted this role to the spark execution user `HADOOP_USER_NAME`
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
-    waitForUpdatingPolicies();
-
-    // Succeed to create the schema
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-    catalog.asSchemas().dropSchema(schemaName, true);
-
-    // Delete the role
-    metalake.deleteRole(roleName);
-    waitForUpdatingPolicies();
-
-    // Fail to create the schema
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
-
-    // Create the role again
-    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-
-    // Grant the role again
-    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
-    waitForUpdatingPolicies();
-
-    // Succeed to create the schema
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    // Clean up
-    catalog.asSchemas().dropSchema(schemaName, true);
-    metalake.deleteRole(roleName);
-  }
-
-  @Test
-  void testDeleteAndRecreateMetadataObject() throws InterruptedException {
-    // Create a role with CREATE_SCHEMA privilege
-    String roleName = currentFunName();
-    SecurableObject securableObject =
-        SecurableObjects.parse(
-            String.format("%s", catalogName),
-            MetadataObject.Type.CATALOG,
-            Lists.newArrayList(Privileges.CreateSchema.allow()));
-    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-
-    // Granted this role to the spark execution user `HADOOP_USER_NAME`
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
-    waitForUpdatingPolicies();
-
-    // Create a schema
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
-
-    // Set owner
-    MetadataObject schemaObject =
-        MetadataObjects.of(catalogName, schemaName, 
MetadataObject.Type.SCHEMA);
-    metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
-    waitForUpdatingPolicies();
-
-    // Delete a schema
-    sparkSession.sql(SQL_DROP_SCHEMA);
-    catalog.asSchemas().dropSchema(schemaName, true);
-    waitForUpdatingPolicies();
-
-    // Recreate a schema
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
-
-    // Set owner
-    schemaObject = MetadataObjects.of(catalogName, schemaName, 
MetadataObject.Type.SCHEMA);
-    metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
-    waitForUpdatingPolicies();
-    sparkSession.sql(SQL_DROP_SCHEMA);
-
-    // Delete the role and fail to create schema
-    metalake.deleteRole(roleName);
-    waitForUpdatingPolicies();
-
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    // Clean up
-    catalog.asSchemas().dropSchema(schemaName, true);
-  }
-
-  @Test
-  void testRenameMetadataObject() throws InterruptedException {
-    // Create a role with CREATE_SCHEMA and CREATE_TABLE privilege
-    String roleName = currentFunName();
-    SecurableObject securableObject =
-        SecurableObjects.parse(
-            String.format("%s", catalogName),
-            MetadataObject.Type.CATALOG,
-            Lists.newArrayList(
-                Privileges.UseCatalog.allow(),
-                Privileges.CreateSchema.allow(),
-                Privileges.CreateTable.allow(),
-                Privileges.ModifyTable.allow()));
-    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-    // Granted this role to the spark execution user `HADOOP_USER_NAME`
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
-
-    waitForUpdatingPolicies();
-
-    // Create a schema and a table
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-    sparkSession.sql(SQL_USE_SCHEMA);
-    sparkSession.sql(SQL_CREATE_TABLE);
-
-    // Rename a table and rename back
-    sparkSession.sql(SQL_RENAME_TABLE);
-    sparkSession.sql(SQL_RENAME_BACK_TABLE);
-
-    // Clean up
-    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
-    metalake.deleteRole(roleName);
-  }
-
-  @Test
-  void testRenameMetadataObjectPrivilege() throws InterruptedException {
-    // Create a role with CREATE_SCHEMA and CREATE_TABLE privilege
-    String roleName = currentFunName();
-    SecurableObject securableObject =
-        SecurableObjects.parse(
-            String.format("%s", catalogName),
-            MetadataObject.Type.CATALOG,
-            Lists.newArrayList(
-                Privileges.UseCatalog.allow(),
-                Privileges.CreateSchema.allow(),
-                Privileges.CreateTable.allow(),
-                Privileges.ModifyTable.allow()));
-    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-    // Granted this role to the spark execution user `HADOOP_USER_NAME`
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
-
-    waitForUpdatingPolicies();
-
-    // Create a schema and a table
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-    sparkSession.sql(SQL_USE_SCHEMA);
-    sparkSession.sql(SQL_CREATE_TABLE);
-
-    // Rename a table and rename back
-    catalog
-        .asTableCatalog()
-        .alterTable(NameIdentifier.of(schemaName, tableName), 
TableChange.rename("new_table"));
-
-    // Succeed to insert data
-    sparkSession.sql("INSERT INTO new_table (a, b, c) VALUES (1, 'a', 'b')");
-
-    catalog
-        .asTableCatalog()
-        .alterTable(NameIdentifier.of(schemaName, "new_table"), 
TableChange.rename(tableName));
-
-    // Succeed to insert data
-    sparkSession.sql(SQL_INSERT_TABLE);
-
-    // Clean up
-    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
-    metalake.deleteRole(roleName);
-  }
-
-  @Test
-  void testChangeOwner() throws InterruptedException {
-    // Create a schema and a table
-    String helperRole = currentFunName();
-    SecurableObject securableObject =
-        SecurableObjects.ofMetalake(
-            metalakeName,
-            Lists.newArrayList(
-                Privileges.UseSchema.allow(),
-                Privileges.CreateSchema.allow(),
-                Privileges.CreateTable.allow(),
-                Privileges.ModifyTable.allow()));
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.createRole(helperRole, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-    metalake.grantRolesToUser(Lists.newArrayList(helperRole), userName1);
-    waitForUpdatingPolicies();
-
-    // Create a schema and a table
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-    sparkSession.sql(SQL_USE_SCHEMA);
-    sparkSession.sql(SQL_CREATE_TABLE);
-    sparkSession.sql(SQL_INSERT_TABLE);
-
-    metalake.revokeRolesFromUser(Lists.newArrayList(helperRole), userName1);
-    metalake.deleteRole(helperRole);
-    waitForUpdatingPolicies();
-
-    // case 1. Have none of privileges of the table
-
-    // - a. Fail to insert data into the table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
-
-    // - b. Fail to select data from the table
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
-
-    // - c: Fail to update data in the table
-    Assertions.assertThrows(
-        SparkUnsupportedOperationException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
-
-    // - d: Fail to delete data from the table
+  @Override
+  protected void checkDeleteSQLWithReadWritePrivileges() {
     Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
-
-    // - e: Fail to alter the table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_ALTER_TABLE));
-
-    // - f: Fail to drop the table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
-
-    // case 2. user is the  table owner
-    MetadataObject tableObject =
-        MetadataObjects.of(
-            Lists.newArrayList(catalogName, schemaName, tableName), 
MetadataObject.Type.TABLE);
-    metalake.setOwner(tableObject, userName1, Owner.Type.USER);
-    waitForUpdatingPolicies();
-
-    // Owner has all the privileges except for creating table
-    checkTableAllPrivilegesExceptForCreating();
-
-    // Delete Gravitino's meta data
-    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    waitForUpdatingPolicies();
-
-    // Fail to create the table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
-
-    // case 3. user is the schema owner
-    MetadataObject schemaObject =
-        MetadataObjects.of(catalogName, schemaName, 
MetadataObject.Type.SCHEMA);
-    metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
-    waitForUpdatingPolicies();
-
-    // Succeed to create a table
-    sparkSession.sql(SQL_CREATE_TABLE);
-
-    // Succeed to check other table privileges
-    checkTableAllPrivilegesExceptForCreating();
-
-    // Succeed to drop schema
-    sparkSession.sql(SQL_DROP_SCHEMA);
-    catalog.asSchemas().dropSchema(schemaName, true);
-    waitForUpdatingPolicies();
-
-    // Fail to create schema
-    Assertions.assertThrows(
-        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
-
-    // case 4. user is the catalog owner
-    MetadataObject catalogObject =
-        MetadataObjects.of(null, catalogName, MetadataObject.Type.CATALOG);
-    metalake.setOwner(catalogObject, userName1, Owner.Type.USER);
-    waitForUpdatingPolicies();
-
-    // Succeed to create a schema
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    // Succeed to create a table
-    sparkSession.sql(SQL_CREATE_TABLE);
-
-    // Succeed to check other table privileges
-    checkTableAllPrivilegesExceptForCreating();
-
-    // Succeed to drop schema
-    sparkSession.sql(SQL_DROP_SCHEMA);
-    catalog.asSchemas().dropSchema(schemaName, true);
-    waitForUpdatingPolicies();
-
-    metalake.setOwner(catalogObject, AuthConstants.ANONYMOUS_USER, 
Owner.Type.USER);
-    // case 5. user is the metalake owner
-    MetadataObject metalakeObject =
-        MetadataObjects.of(null, metalakeName, MetadataObject.Type.METALAKE);
-    metalake.setOwner(metalakeObject, userName1, Owner.Type.USER);
-    waitForUpdatingPolicies();
-
-    // Succeed to create a schema
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    // Succeed to create a table
-    sparkSession.sql(SQL_CREATE_TABLE);
-
-    // Succeed to check other table privileges
-    checkTableAllPrivilegesExceptForCreating();
-
-    // Succeed to drop schema
-    sparkSession.sql(SQL_DROP_SCHEMA);
-
-    // Clean up
-    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
   }
 
-  @Test
-  void testAllowUseSchemaPrivilege() throws InterruptedException {
-    // Create a role with CREATE_SCHEMA privilege
-    String roleName = currentFunName();
-    SecurableObject securableObject =
-        SecurableObjects.parse(
-            String.format("%s", catalogName),
-            MetadataObject.Type.CATALOG,
-            Lists.newArrayList(Privileges.CreateSchema.allow()));
-    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
-
-    // Granted this role to the spark execution user `HADOOP_USER_NAME`
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
-    waitForUpdatingPolicies();
-
-    // create a schema use Gravitino client
-    sparkSession.sql(SQL_CREATE_SCHEMA);
-
-    // Revoke the privilege of creating schema
-    MetadataObject catalogObject =
-        MetadataObjects.of(null, catalogName, MetadataObject.Type.CATALOG);
-    metalake.revokePrivilegesFromRole(
-        roleName, catalogObject, 
Lists.newArrayList(Privileges.CreateSchema.allow()));
-    waitForUpdatingPolicies();
-
-    // Use Spark to show this databases(schema)
-    Dataset dataset1 = sparkSession.sql(SQL_SHOW_DATABASES);
-    dataset1.show();
-    List<Row> rows1 = dataset1.collectAsList();
-    // The schema should not be shown, because the user does not have the 
permission
-    Assertions.assertEquals(
-        0, rows1.stream().filter(row -> 
row.getString(0).equals(schemaName)).count());
-
-    // Grant the privilege of using schema
-    MetadataObject schemaObject =
-        MetadataObjects.of(catalogName, schemaName, 
MetadataObject.Type.SCHEMA);
-    metalake.grantPrivilegesToRole(
-        roleName, schemaObject, 
Lists.newArrayList(Privileges.UseSchema.allow()));
-    waitForUpdatingPolicies();
-
-    // Use Spark to show this databases(schema) again
-    Dataset dataset2 = sparkSession.sql(SQL_SHOW_DATABASES);
-    dataset2.show(100, 100);
-    List<Row> rows2 = dataset2.collectAsList();
-    rows2.stream()
-        .filter(row -> row.getString(0).equals(schemaName))
-        .findFirst()
-        .orElseThrow(() -> new IllegalStateException("Database not found: " + 
schemaName));
-    // The schema should be shown, because the user has the permission
-    Assertions.assertEquals(
-        1, rows2.stream().filter(row -> 
row.getString(0).equals(schemaName)).count());
-
-    // Clean up
-    catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
-    catalog.asSchemas().dropSchema(schemaName, true);
-    metalake.revokeRolesFromUser(Lists.newArrayList(roleName), userName1);
-    metalake.deleteRole(roleName);
+  @Override
+  protected void checkDeleteSQLWithReadPrivileges() {
+    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
   }
 
-  @Test
-  void testDenyPrivileges() throws InterruptedException {
-    // Create a schema
-    catalog.asSchemas().createSchema(schemaName, "test", 
Collections.emptyMap());
-
-    // Create a role with CREATE_SCHEMA privilege
-    String roleName = currentFunName();
-    SecurableObject allowObject =
-        SecurableObjects.parse(
-            String.format("%s", catalogName),
-            MetadataObject.Type.CATALOG,
-            Lists.newArrayList(Privileges.UseSchema.allow(), 
Privileges.CreateTable.allow()));
-    SecurableObject denyObject =
-        SecurableObjects.parse(
-            String.format("%s.%s", catalogName, schemaName),
-            MetadataObject.Type.SCHEMA,
-            Lists.newArrayList(Privileges.CreateTable.deny()));
-    // Create a role, catalog allows to create a table, schema denies to 
create a table
-    metalake.createRole(
-        roleName, Collections.emptyMap(), Lists.newArrayList(allowObject, 
denyObject));
-
-    // Granted this role to the spark execution user `HADOOP_USER_NAME`
-    String userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
-    waitForUpdatingPolicies();
-
-    // Fail to create a table
-    sparkSession.sql(SQL_USE_SCHEMA);
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
-
-    // Delete the role
-    metalake.deleteRole(roleName);
-
-    // Create another role, but catalog denies to create a table, schema 
allows to create a table
-    allowObject =
-        SecurableObjects.parse(
-            String.format("%s", catalogName),
-            MetadataObject.Type.CATALOG,
-            Lists.newArrayList(Privileges.CreateTable.deny()));
-    denyObject =
-        SecurableObjects.parse(
-            String.format("%s.%s", catalogName, schemaName),
-            MetadataObject.Type.SCHEMA,
-            Lists.newArrayList(Privileges.CreateTable.allow()));
-    metalake.createRole(
-        roleName, Collections.emptyMap(), Lists.newArrayList(allowObject, 
denyObject));
-
-    // Granted this role to the spark execution user `HADOOP_USER_NAME`
-    userName1 = System.getenv(HADOOP_USER_NAME);
-    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
-
-    waitForUpdatingPolicies();
-
-    // Fail to create a table
-    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
-
-    // Clean up
-    catalog.asSchemas().dropSchema(schemaName, true);
-    metalake.deleteRole(roleName);
+  @Override
+  protected void checkDeleteSQLWithWritePrivileges() {
+    Assertions.assertThrows(AnalysisException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
   }
 
-  private void createMetalake() {
-    GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
-    Assertions.assertEquals(0, gravitinoMetalakes.length);
-
-    client.createMetalake(metalakeName, "comment", Collections.emptyMap());
-    GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName);
-    Assertions.assertEquals(metalakeName, loadMetalake.name());
-
-    metalake = loadMetalake;
+  @Override
+  protected void testAlterTable() {
+    sparkSession.sql(SQL_ALTER_TABLE);
   }
 
   private static void createCatalog() {
@@ -1071,7 +195,7 @@ public class RangerHiveE2EIT extends BaseIT {
     LOG.info("Catalog created: {}", catalog);
   }
 
-  private void checkTableAllPrivilegesExceptForCreating() {
+  protected void checkTableAllPrivilegesExceptForCreating() {
     // - a. Succeed to insert data into the table
     sparkSession.sql(SQL_INSERT_TABLE);
 
@@ -1091,12 +215,4 @@ public class RangerHiveE2EIT extends BaseIT {
     // - f: Succeed to drop the table
     sparkSession.sql(SQL_DROP_TABLE);
   }
-
-  private static void waitForUpdatingPolicies() throws InterruptedException {
-    // After Ranger authorization, Must wait a period of time for the Ranger 
Spark plugin to update
-    // the policy Sleep time must be greater than the policy update interval
-    // (ranger.plugin.spark.policy.pollIntervalMs) in the
-    // `resources/ranger-spark-security.xml.template`
-    Thread.sleep(1000L);
-  }
 }
diff --git 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerITEnv.java
 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerITEnv.java
index fdc2d8fab..31ae3974d 100644
--- 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerITEnv.java
+++ 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerITEnv.java
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.Role;
-import org.apache.gravitino.authorization.ranger.RangerAuthorizationHivePlugin;
+import 
org.apache.gravitino.authorization.ranger.RangerAuthorizationHadoopSQLPlugin;
 import org.apache.gravitino.authorization.ranger.RangerAuthorizationPlugin;
 import org.apache.gravitino.authorization.ranger.RangerHelper;
 import org.apache.gravitino.authorization.ranger.RangerPrivileges;
@@ -89,7 +89,7 @@ public class RangerITEnv {
     rangerClient = containerSuite.getRangerContainer().rangerClient;
 
     rangerAuthHivePlugin =
-        RangerAuthorizationHivePlugin.getInstance(
+        RangerAuthorizationHadoopSQLPlugin.getInstance(
             ImmutableMap.of(
                 AuthorizationPropertiesMeta.RANGER_ADMIN_URL,
                 String.format(
diff --git 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerIcebergE2EIT.java
 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerIcebergE2EIT.java
new file mode 100644
index 000000000..648a9c4d7
--- /dev/null
+++ 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerIcebergE2EIT.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger.integration.test;
+
+import static org.apache.gravitino.Catalog.AUTHORIZATION_PROVIDER;
+import static 
org.apache.gravitino.authorization.ranger.integration.test.RangerITEnv.currentFunName;
+import static 
org.apache.gravitino.catalog.hive.HiveConstants.IMPERSONATION_ENABLE;
+import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_AUTH_TYPE;
+import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_PASSWORD;
+import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_SERVICE_NAME;
+import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_USERNAME;
+import static 
org.apache.gravitino.integration.test.container.RangerContainer.RANGER_SERVER_PORT;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.integration.test.container.RangerContainer;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.kyuubi.plugin.spark.authz.AccessControlException;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Tag("gravitino-docker-test")
+public class RangerIcebergE2EIT extends RangerBaseE2EIT {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerIcebergE2EIT.class);
+  private static final String SQL_USE_CATALOG = "USE iceberg";
+  private static final String provider = "lakehouse-iceberg";
+
+  @BeforeAll
+  public void startIntegrationTest() throws Exception {
+    metalakeName = GravitinoITUtils.genRandomName("metalake").toLowerCase();
+    // Enable Gravitino Authorization mode
+    Map<String, String> configs = Maps.newHashMap();
+    configs.put(Configs.ENABLE_AUTHORIZATION.getKey(), String.valueOf(true));
+    configs.put(Configs.SERVICE_ADMINS.getKey(), RangerITEnv.HADOOP_USER_NAME);
+    configs.put(Configs.AUTHENTICATORS.getKey(), 
AuthenticatorType.SIMPLE.name().toLowerCase());
+    configs.put("SimpleAuthUserName", AuthConstants.ANONYMOUS_USER);
+    registerCustomConfigs(configs);
+    super.startIntegrationTest();
+
+    RangerITEnv.init();
+    RangerITEnv.startHiveRangerContainer();
+
+    RANGER_ADMIN_URL =
+        String.format(
+            "http://%s:%d";,
+            containerSuite.getRangerContainer().getContainerIpAddress(), 
RANGER_SERVER_PORT);
+
+    HIVE_METASTORE_URIS =
+        String.format(
+            "thrift://%s:%d",
+            containerSuite.getHiveRangerContainer().getContainerIpAddress(),
+            HiveContainer.HIVE_METASTORE_PORT);
+
+    generateRangerSparkSecurityXML();
+
+    sparkSession =
+        SparkSession.builder()
+            .master("local[1]")
+            .appName("Ranger Iceberg E2E integration test")
+            .config("spark.sql.catalog.iceberg", 
"org.apache.iceberg.spark.SparkCatalog")
+            .config("spark.sql.catalog.iceberg.type", "hive")
+            .config("spark.sql.catalog.iceberg.uri", HIVE_METASTORE_URIS)
+            .config("spark.sql.catalog.iceberg.cache-enabled", "false")
+            .config(
+                "spark.sql.extensions",
+                
"org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension,"
+                    + 
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+            .enableHiveSupport()
+            .getOrCreate();
+
+    createMetalake();
+    createCatalog();
+
+    metalake.addUser(System.getenv(HADOOP_USER_NAME));
+
+    RangerITEnv.cleanup();
+    waitForUpdatingPolicies();
+  }
+
+  @AfterAll
+  public void stop() {
+    cleanIT();
+  }
+
+  @Override
+  protected void checkUpdateSQLWithReadWritePrivileges() {
+    sparkSession.sql(SQL_UPDATE_TABLE);
+  }
+
+  @Override
+  protected void checkUpdateSQLWithReadPrivileges() {
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
+  }
+
+  @Override
+  protected void checkUpdateSQLWithWritePrivileges() {
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
+  }
+
+  @Override
+  protected void checkDeleteSQLWithReadWritePrivileges() {
+    sparkSession.sql(SQL_DELETE_TABLE);
+  }
+
+  @Override
+  protected void checkDeleteSQLWithReadPrivileges() {
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
+  }
+
+  @Override
+  protected void checkDeleteSQLWithWritePrivileges() {
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
+  }
+
+  @Override
+  protected void checkHaveNoPrivileges() {
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_INSERT_TABLE));
+    Assertions.assertThrows(
+        AccessControlException.class, () -> 
sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_TABLE));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DROP_SCHEMA));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_USE_SCHEMA));
+    Assertions.assertThrows(
+        AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_SCHEMA));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_CREATE_TABLE));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_DELETE_TABLE));
+    Assertions.assertThrows(AccessControlException.class, () -> 
sparkSession.sql(SQL_UPDATE_TABLE));
+  }
+
+  @Override
+  protected void testAlterTable() {
+    sparkSession.sql(SQL_ALTER_TABLE);
+    sparkSession.sql(SQL_ALTER_TABLE_BACK);
+  }
+
+  private static void createCatalog() {
+    Map<String, String> properties =
+        ImmutableMap.of(
+            IcebergConstants.URI,
+            HIVE_METASTORE_URIS,
+            IcebergConstants.CATALOG_BACKEND,
+            "hive",
+            IcebergConstants.WAREHOUSE,
+            String.format(
+                "hdfs://%s:%d/user/hive/warehouse",
+                
containerSuite.getHiveRangerContainer().getContainerIpAddress(),
+                HiveContainer.HDFS_DEFAULTFS_PORT),
+            IMPERSONATION_ENABLE,
+            "true",
+            AUTHORIZATION_PROVIDER,
+            "ranger",
+            RANGER_SERVICE_NAME,
+            RangerITEnv.RANGER_HIVE_REPO_NAME,
+            AuthorizationPropertiesMeta.RANGER_ADMIN_URL,
+            RANGER_ADMIN_URL,
+            RANGER_AUTH_TYPE,
+            RangerContainer.authType,
+            RANGER_USERNAME,
+            RangerContainer.rangerUserName,
+            RANGER_PASSWORD,
+            RangerContainer.rangerPassword);
+
+    metalake.createCatalog(catalogName, Catalog.Type.RELATIONAL, provider, 
"comment", properties);
+    catalog = metalake.loadCatalog(catalogName);
+    LOG.info("Catalog created: {}", catalog);
+  }
+
+  @Override
+  protected void useCatalog() throws InterruptedException {
+    String userName1 = System.getenv(HADOOP_USER_NAME);
+    String roleName = currentFunName();
+    SecurableObject securableObject =
+        SecurableObjects.ofMetalake(
+            metalakeName, Lists.newArrayList(Privileges.UseCatalog.allow()));
+    metalake.createRole(roleName, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
+    metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
+    waitForUpdatingPolicies();
+    sparkSession.sql(SQL_USE_CATALOG);
+    metalake.deleteRole(roleName);
+    waitForUpdatingPolicies();
+  }
+
+  protected void checkTableAllPrivilegesExceptForCreating() {
+    // - a. Succeed to insert data into the table
+    sparkSession.sql(SQL_INSERT_TABLE);
+
+    // - b. Succeed to select data from the table
+    sparkSession.sql(SQL_SELECT_TABLE).collectAsList();
+
+    // - c: Succeed to update data in the table.
+    sparkSession.sql(SQL_UPDATE_TABLE);
+
+    // - d: Succeed to delete data from the table.
+    sparkSession.sql(SQL_DELETE_TABLE);
+
+    // - e: Succeed to alter the table
+    sparkSession.sql(SQL_ALTER_TABLE);
+
+    // - f: Succeed to drop the table
+    sparkSession.sql(SQL_DROP_TABLE);
+  }
+}
diff --git a/docs/security/access-control.md b/docs/security/access-control.md
index afdf921bf..6b47a2541 100644
--- a/docs/security/access-control.md
+++ b/docs/security/access-control.md
@@ -202,11 +202,11 @@ and `USE_SCHEMA` privileges on its parent schema.
 
 ### Table privileges
 
-| Name         | Supports Securable Object        | Operation                                      |
-|--------------|----------------------------------|------------------------------------------------|
-| CREATE_TABLE | Metalake, Catalog, Schema        | Create a table                                 |
-| MODIFY_TABLE | Metalake, Catalog, Schema, Table | Use the SQL `UPDATE`,`DELETE`,`INSERT` a table |
-| SELECT_TABLE | Metalake, Catalog, Schema, Table | Use the SQL `SELECT` data from a table         |
+| Name         | Supports Securable Object        | Operation                                         |
+|--------------|----------------------------------|---------------------------------------------------|
+| CREATE_TABLE | Metalake, Catalog, Schema        | Create a table                                    |
+| MODIFY_TABLE | Metalake, Catalog, Schema, Table | Write data to a table or modify the table schema  |
+| SELECT_TABLE | Metalake, Catalog, Schema, Table | Select data from a table                          |
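
To illustrate the "Supports Securable Object" column, a single role can scope these privileges at different levels. The sketch below is only an illustration built from the client calls exercised by the tests in this commit; the catalog, schema, table, and role names are placeholders, not part of the change.

```java
import java.util.Collections;

import com.google.common.collect.Lists;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.authorization.Privileges;
import org.apache.gravitino.authorization.SecurableObject;
import org.apache.gravitino.authorization.SecurableObjects;
import org.apache.gravitino.client.GravitinoMetalake;

public class TablePrivilegeScopes {
  // One role carrying CREATE_TABLE at schema scope plus MODIFY_TABLE and
  // SELECT_TABLE at table scope. All object names are illustrative placeholders.
  static void createSalesRole(GravitinoMetalake metalake) {
    SecurableObject salesSchema =
        SecurableObjects.parse(
            "hive_catalog.sales",
            MetadataObject.Type.SCHEMA,
            Lists.newArrayList(Privileges.CreateTable.allow()));

    SecurableObject ordersTable =
        SecurableObjects.parse(
            "hive_catalog.sales.orders",
            MetadataObject.Type.TABLE,
            Lists.newArrayList(Privileges.ModifyTable.allow(), Privileges.SelectTable.allow()));

    metalake.createRole(
        "sales_rw", Collections.emptyMap(), Lists.newArrayList(salesSchema, ordersTable));
  }
}
```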
 
 ### Topic privileges
 
diff --git a/docs/security/authorization-pushdown.md 
b/docs/security/authorization-pushdown.md
index 148e76b5f..2d93305f5 100644
--- a/docs/security/authorization-pushdown.md
+++ b/docs/security/authorization-pushdown.md
@@ -43,6 +43,36 @@ authorization.ranger.password=PWD123
 authorization.ranger.service.name=hiveRepo
 ```
 
+### Authorization Iceberg with Ranger properties
+
+In order to use the Authorization Ranger Iceberg Plugin, you need to configure the following properties along with the [Lakehouse Iceberg catalog properties](../lakehouse-iceberg-catalog.md#catalog-properties):
+
+| Property Name                       | Description                                                                                                                                                    | Default Value | Required | Since Version    |
+|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------|------------------|
+| `authorization-provider`            | The provider used to implement the authorization plugin, such as `ranger`.                                                                                    | (none)        | No       | 0.8.0-incubating |
+| `authorization.ranger.admin.url`    | The Apache Ranger admin web URI.                                                                                                                              | (none)        | No       | 0.8.0-incubating |
+| `authorization.ranger.auth.type`    | The Apache Ranger authentication type, `simple` or `kerberos`.                                                                                                | `simple`      | No       | 0.8.0-incubating |
+| `authorization.ranger.username`     | The Apache Ranger admin web login username (auth type=simple) or Kerberos principal (auth type=kerberos). The user must have Ranger administrator permission. | (none)        | No       | 0.8.0-incubating |
+| `authorization.ranger.password`     | The Apache Ranger admin web login password (auth type=simple) or the path of the keytab file (auth type=kerberos).                                            | (none)        | No       | 0.8.0-incubating |
+| `authorization.ranger.service.name` | The Apache Ranger service name.                                                                                                                               | (none)        | No       | 0.8.0-incubating |
+
+Once these properties are configured correctly, you can perform authorization operations by calling the Gravitino [authorization RESTful API](https://gravitino.apache.org/docs/latest/api/rest/grant-roles-to-a-user).
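
For comparison with the REST call above, the same role-based grant can be expressed through the Gravitino Java client, using the calls the E2E tests in this commit rely on. This is only a sketch: the metalake handle, catalog, schema, table, role, and user names are placeholders, and the surrounding client setup is assumed to already exist.

```java
import java.util.Collections;

import com.google.common.collect.Lists;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.authorization.Privileges;
import org.apache.gravitino.authorization.SecurableObject;
import org.apache.gravitino.authorization.SecurableObjects;
import org.apache.gravitino.client.GravitinoMetalake;

public class IcebergTableGrantSketch {
  // Grants SELECT_TABLE and MODIFY_TABLE on a single Iceberg table to a user
  // through a role. All object names below are illustrative placeholders.
  static void grantReadWrite(GravitinoMetalake metalake) {
    SecurableObject ordersTable =
        SecurableObjects.parse(
            "iceberg_catalog.sales.orders",        // catalog.schema.table
            MetadataObject.Type.TABLE,
            Lists.newArrayList(
                Privileges.SelectTable.allow(), Privileges.ModifyTable.allow()));

    metalake.createRole("orders_rw", Collections.emptyMap(), Lists.newArrayList(ordersTable));
    metalake.grantRolesToUser(Lists.newArrayList("orders_rw"), "spark_user");
  }
}
```

Gravitino then pushes the resulting policies down to the configured Ranger service, and the grant takes effect once the Ranger plugins refresh their policies.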
+
+#### Example of using the Authorization Ranger Iceberg Plugin
+
+Suppose you have an Apache Iceberg catalog in your datacenter and have created an `icebergRepo` service in Apache Ranger to manage its permissions.
+The Ranger service is accessible at `172.0.0.100:6080`, with the username `Jack` and the password `PWD123`.
+To add this Iceberg catalog to Gravitino, you'll need to configure the following parameters.
+
+```properties
+authorization-provider=ranger
+authorization.ranger.admin.url=172.0.0.100:6080
+authorization.ranger.auth.type=simple
+authorization.ranger.username=Jack
+authorization.ranger.password=PWD123
+authorization.ranger.service.name=icebergRepo
+```
+
 :::caution
-Gravitino 0.6.0 only supports the authorization Apache Ranger Hive service and 
more data source authorization is under development.
+Gravitino 0.8.0 only supports Apache Ranger authorization for the Apache Hive and Apache Iceberg services. Authorization for more data sources is under development.
 :::
\ No newline at end of file
