This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new a4c863091 [#5661] feat(auth): Add JDBC authorization plugin interface 
(#5904)
a4c863091 is described below

commit a4c86309142d241ed54d38dcf7aa916a23601f17
Author: roryqi <ror...@apache.org>
AuthorDate: Tue Dec 24 16:22:25 2024 +0800

    [#5661] feat(auth): Add JDBC authorization plugin interface (#5904)
    
    ### What changes were proposed in this pull request?
    
    Add JDBC authorization plugin interface
    
    ### Why are the changes needed?
    
    Fix: #5661
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    Add a UT
---
 .../workflows/access-control-integration-test.yml  |   3 +
 authorizations/authorization-jdbc/build.gradle.kts |  94 +++++
 .../jdbc/JdbcAuthorizationPlugin.java              | 461 +++++++++++++++++++++
 .../jdbc/JdbcAuthorizationProperties.java          |  44 ++
 .../authorization/jdbc/JdbcAuthorizationSQL.java   | 117 ++++++
 .../authorization/jdbc/JdbcMetadataObject.java     | 106 +++++
 .../authorization/jdbc/JdbcPrivilege.java          |  55 +++
 .../authorization/jdbc/JdbcSecurableObject.java    |  65 +++
 .../jdbc/JdbcSecurableObjectMappingProvider.java   | 212 ++++++++++
 .../jdbc/JdbcAuthorizationPluginTest.java          | 317 ++++++++++++++
 settings.gradle.kts                                |   2 +-
 11 files changed, 1475 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/access-control-integration-test.yml 
b/.github/workflows/access-control-integration-test.yml
index 54ffde2ee..6997eaf9a 100644
--- a/.github/workflows/access-control-integration-test.yml
+++ b/.github/workflows/access-control-integration-test.yml
@@ -90,6 +90,9 @@ jobs:
           ./gradlew -PtestMode=embedded -PjdbcBackend=h2 -PjdkVersion=${{ 
matrix.java-version }} -PskipDockerTests=false 
:authorizations:authorization-ranger:test
           ./gradlew -PtestMode=deploy -PjdbcBackend=mysql -PjdkVersion=${{ 
matrix.java-version }} -PskipDockerTests=false 
:authorizations:authorization-ranger:test
           ./gradlew -PtestMode=deploy -PjdbcBackend=postgresql 
-PjdkVersion=${{ matrix.java-version }} -PskipDockerTests=false 
:authorizations:authorization-ranger:test
+          ./gradlew -PtestMode=embedded -PjdbcBackend=h2 -PjdkVersion=${{ 
matrix.java-version }} -PskipDockerTests=false 
:authorizations:authorization-jdbc:test
+          ./gradlew -PtestMode=deploy -PjdbcBackend=mysql -PjdkVersion=${{ 
matrix.java-version }} -PskipDockerTests=false 
:authorizations:authorization-jdbc:test
+          ./gradlew -PtestMode=deploy -PjdbcBackend=postgresql 
-PjdkVersion=${{ matrix.java-version }} -PskipDockerTests=false 
:authorizations:authorization-jdbc:test
 
       - name: Upload integrate tests reports
         uses: actions/upload-artifact@v3
diff --git a/authorizations/authorization-jdbc/build.gradle.kts 
b/authorizations/authorization-jdbc/build.gradle.kts
new file mode 100644
index 000000000..8b105908c
--- /dev/null
+++ b/authorizations/authorization-jdbc/build.gradle.kts
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+description = "authorization-jdbc"
+
+plugins {
+  `maven-publish`
+  id("java")
+  id("idea")
+}
+
+dependencies {
+  implementation(project(":api")) {
+    exclude(group = "*")
+  }
+  implementation(project(":core")) {
+    exclude(group = "*")
+  }
+
+  implementation(libs.bundles.log4j)
+  implementation(libs.commons.lang3)
+  implementation(libs.guava)
+  implementation(libs.javax.jaxb.api) {
+    exclude("*")
+  }
+  implementation(libs.javax.ws.rs.api)
+  implementation(libs.jettison)
+  compileOnly(libs.lombok)
+  implementation(libs.mail)
+  implementation(libs.rome)
+  implementation(libs.commons.dbcp2)
+
+  testImplementation(project(":common"))
+  testImplementation(project(":clients:client-java"))
+  testImplementation(project(":server"))
+  testImplementation(project(":catalogs:catalog-common"))
+  testImplementation(project(":integration-test-common", "testArtifacts"))
+  testImplementation(libs.junit.jupiter.api)
+  testImplementation(libs.mockito.core)
+  testImplementation(libs.testcontainers)
+  testRuntimeOnly(libs.junit.jupiter.engine)
+}
+
+tasks {
+  val runtimeJars by registering(Copy::class) {
+    from(configurations.runtimeClasspath)
+    into("build/libs")
+  }
+
+  // Copies everything under build/libs (this module's jar plus the runtime jars gathered
+  // by runtimeJars), minus the guava, log4j and slf4j jars, into the distribution package.
+  val copyAuthorizationLibs by registering(Copy::class) {
+    dependsOn("jar", runtimeJars)
+    from("build/libs") {
+      exclude("guava-*.jar")
+      exclude("log4j-*.jar")
+      exclude("slf4j-*.jar")
+    }
+    // NOTE(review): the destination is the "ranger" plugin directory although this is the
+    // authorization-jdbc module. This looks copy-pasted; confirm the intended target.
+    into("$rootDir/distribution/package/authorizations/ranger/libs")
+  }
+
+  register("copyLibAndConfig", Copy::class) {
+    dependsOn(copyAuthorizationLibs)
+  }
+
+  jar {
+    dependsOn(runtimeJars)
+  }
+}
+
+tasks.test {
+  dependsOn(":catalogs:catalog-hive:jar", ":catalogs:catalog-hive:runtimeJars")
+
+  val skipITs = project.hasProperty("skipITs")
+  if (skipITs) {
+    // Exclude integration tests
+    exclude("**/integration/test/**")
+  } else {
+    dependsOn(tasks.jar)
+  }
+}
diff --git 
a/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcAuthorizationPlugin.java
 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcAuthorizationPlugin.java
new file mode 100644
index 000000000..f889cee22
--- /dev/null
+++ 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcAuthorizationPlugin.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.jdbc;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.pool2.impl.BaseObjectPoolConfig;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.annotation.Unstable;
+import org.apache.gravitino.authorization.AuthorizationPrivilege;
+import org.apache.gravitino.authorization.AuthorizationSecurableObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.MetadataObjectChange;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
+import org.apache.gravitino.exceptions.AuthorizationPluginException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JdbcAuthorizationPlugin is the base class for all JDBC-based authorization plugins. For
+ * example, JdbcHiveAuthorizationPlugin is the JDBC-based authorization plugin for Hive. Different
+ * JDBC-based authorization plugins can inherit this class and implement their own SQL statements.
+ */
+@Unstable
+abstract class JdbcAuthorizationPlugin implements AuthorizationPlugin, 
JdbcAuthorizationSQL {
+
+  private static final String GROUP_PREFIX = "GRAVITINO_GROUP_";
+  private static final Logger LOG = 
LoggerFactory.getLogger(JdbcAuthorizationPlugin.class);
+
+  protected BasicDataSource dataSource;
+  protected JdbcSecurableObjectMappingProvider mappingProvider;
+
+  public JdbcAuthorizationPlugin(Map<String, String> config) {
+    // Initialize the data source
+    dataSource = new BasicDataSource();
+    JdbcAuthorizationProperties.validate(config);
+
+    String jdbcUrl = config.get(JdbcAuthorizationProperties.JDBC_URL);
+    dataSource.setUrl(jdbcUrl);
+    
dataSource.setDriverClassName(config.get(JdbcAuthorizationProperties.JDBC_DRIVER));
+    
dataSource.setUsername(config.get(JdbcAuthorizationProperties.JDBC_USERNAME));
+    
dataSource.setPassword(config.get(JdbcAuthorizationProperties.JDBC_PASSWORD));
+    dataSource.setDefaultAutoCommit(true);
+    dataSource.setMaxTotal(20);
+    dataSource.setMaxIdle(5);
+    dataSource.setMinIdle(0);
+    dataSource.setLogAbandoned(true);
+    dataSource.setRemoveAbandonedOnBorrow(true);
+    dataSource.setTestOnBorrow(BaseObjectPoolConfig.DEFAULT_TEST_ON_BORROW);
+    dataSource.setTestWhileIdle(BaseObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE);
+    
dataSource.setNumTestsPerEvictionRun(BaseObjectPoolConfig.DEFAULT_NUM_TESTS_PER_EVICTION_RUN);
+    dataSource.setTestOnReturn(BaseObjectPoolConfig.DEFAULT_TEST_ON_RETURN);
+    dataSource.setLifo(BaseObjectPoolConfig.DEFAULT_LIFO);
+    mappingProvider = new JdbcSecurableObjectMappingProvider();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (dataSource != null) {
+      try {
+        dataSource.close();
+        dataSource = null;
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Handles metadata object changes. This implementation performs no work and always reports
+   * success.
+   *
+   * @param changes the metadata object changes; not inspected by this implementation
+   * @return always {@code true}
+   */
+  @Override
+  public Boolean onMetadataUpdated(MetadataObjectChange... changes) throws RuntimeException {
+    // This interface mainly handles the metadata object rename change and delete change.
+    // The privilege for JdbcAuthorizationPlugin will be renamed or deleted automatically.
+    // We don't need to do any other things.
+    return true;
+  }
+
+  /**
+   * Creates the role in the underlying system and grants the privileges of all of its securable
+   * objects.
+   *
+   * <p>Errors whose message contains "already exists" are ignored while creating the role, so
+   * this method may be called for a role that is already present.
+   *
+   * @param role the role to create
+   * @return always {@code true}; failures are reported by exception
+   * @throws AuthorizationPluginException if a SQL statement fails
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws AuthorizationPluginException {
+    for (String sql : getCreateRoleSQL(role.name())) {
+      executeUpdateSQL(sql, "already exists");
+    }
+
+    if (role.securableObjects() != null) {
+      for (SecurableObject object : role.securableObjects()) {
+        // Grant directly rather than through onRoleUpdated: onRoleUpdated itself starts by
+        // calling onRoleCreated, so routing through it would recurse without end for any
+        // role that has at least one securable object.
+        grantObjectPrivileges(role, object);
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws AuthorizationPluginException 
{
+    throw new UnsupportedOperationException("Doesn't support to acquired a 
role");
+  }
+
+  /**
+   * Drops the role from the underlying system.
+   *
+   * @param role the role to drop
+   * @return always {@code true}; failures are reported by exception
+   * @throws AuthorizationPluginException if a drop statement fails
+   */
+  @Override
+  public Boolean onRoleDeleted(Role role) throws AuthorizationPluginException {
+    for (String sql : getDropRoleSQL(role.name())) {
+      executeUpdateSQL(sql);
+    }
+    // Report success like the other callbacks; a null Boolean would throw a
+    // NullPointerException as soon as a caller unboxes the result.
+    return true;
+  }
+
+  @Override
+  public Boolean onRoleUpdated(Role role, RoleChange... changes)
+      throws AuthorizationPluginException {
+    onRoleCreated(role);
+    for (RoleChange change : changes) {
+      if (change instanceof RoleChange.AddSecurableObject) {
+        SecurableObject object = ((RoleChange.AddSecurableObject) 
change).getSecurableObject();
+        grantObjectPrivileges(role, object);
+      } else if (change instanceof RoleChange.RemoveSecurableObject) {
+        SecurableObject object = ((RoleChange.RemoveSecurableObject) 
change).getSecurableObject();
+        revokeObjectPrivileges(role, object);
+      } else if (change instanceof RoleChange.UpdateSecurableObject) {
+        RoleChange.UpdateSecurableObject updateChange = 
(RoleChange.UpdateSecurableObject) change;
+        SecurableObject addObject = updateChange.getNewSecurableObject();
+        SecurableObject removeObject = updateChange.getSecurableObject();
+        revokeObjectPrivileges(role, removeObject);
+        grantObjectPrivileges(role, addObject);
+      } else {
+        throw new IllegalArgumentException(
+            String.format("RoleChange is not supported - %s", change));
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public Boolean onGrantedRolesToUser(List<Role> roles, User user)
+      throws AuthorizationPluginException {
+
+    for (Role role : roles) {
+      onRoleCreated(role);
+      List<String> sqls = getGrantRoleSQL(role.name(), "USER", user.name());
+      for (String sql : sqls) {
+        executeUpdateSQL(sql);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public Boolean onRevokedRolesFromUser(List<Role> roles, User user)
+      throws AuthorizationPluginException {
+
+    for (Role role : roles) {
+      onRoleCreated(role);
+      List<String> sqls = getRevokeRoleSQL(role.name(), "USER", user.name());
+      for (String sql : sqls) {
+        executeUpdateSQL(sql);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public Boolean onGrantedRolesToGroup(List<Role> roles, Group group)
+      throws AuthorizationPluginException {
+
+    for (Role role : roles) {
+      onRoleCreated(role);
+      List<String> sqls =
+          getGrantRoleSQL(role.name(), "USER", String.format("%s%s", 
GROUP_PREFIX, group.name()));
+      for (String sql : sqls) {
+        executeUpdateSQL(sql);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public Boolean onRevokedRolesFromGroup(List<Role> roles, Group group)
+      throws AuthorizationPluginException {
+
+    for (Role role : roles) {
+      onRoleCreated(role);
+      List<String> sqls =
+          getRevokeRoleSQL(role.name(), "USER", String.format("%s%s", 
GROUP_PREFIX, group.name()));
+      for (String sql : sqls) {
+        executeUpdateSQL(sql);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public Boolean onUserAdded(User user) throws AuthorizationPluginException {
+    List<String> sqls = getCreateUserSQL(user.name());
+    for (String sql : sqls) {
+      executeUpdateSQL(sql);
+    }
+    return true;
+  }
+
+  @Override
+  public Boolean onUserRemoved(User user) throws AuthorizationPluginException {
+    List<String> sqls = getDropUserSQL(user.name());
+    for (String sql : sqls) {
+      executeUpdateSQL(sql);
+    }
+    return true;
+  }
+
+  @Override
+  public Boolean onUserAcquired(User user) throws AuthorizationPluginException 
{
+    throw new UnsupportedOperationException("Doesn't support to acquired a 
user");
+  }
+
+  @Override
+  public Boolean onGroupAdded(Group group) throws AuthorizationPluginException 
{
+    String name = String.format("%s%s", GROUP_PREFIX, group.name());
+    List<String> sqls = getCreateUserSQL(name);
+    for (String sql : sqls) {
+      executeUpdateSQL(sql);
+    }
+    return true;
+  }
+
+  @Override
+  public Boolean onGroupRemoved(Group group) throws 
AuthorizationPluginException {
+    String name = String.format("%s%s", GROUP_PREFIX, group.name());
+    List<String> sqls = getDropUserSQL(name);
+    for (String sql : sqls) {
+      executeUpdateSQL(sql);
+    }
+    return true;
+  }
+
+  @Override
+  public Boolean onGroupAcquired(Group group) throws 
AuthorizationPluginException {
+    throw new UnsupportedOperationException("Doesn't support to acquired a 
group");
+  }
+
+  @Override
+  public Boolean onOwnerSet(MetadataObject metadataObject, Owner preOwner, 
Owner newOwner)
+      throws AuthorizationPluginException {
+    if (newOwner.type() == Owner.Type.USER) {
+      onUserAdded(
+          UserEntity.builder()
+              .withName(newOwner.name())
+              .withId(0L)
+              .withAuditInfo(AuditInfo.EMPTY)
+              .build());
+    } else if (newOwner.type() == Owner.Type.GROUP) {
+      onGroupAdded(
+          GroupEntity.builder()
+              .withName(newOwner.name())
+              .withId(0L)
+              .withAuditInfo(AuditInfo.EMPTY)
+              .build());
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Don't support owner type %s", newOwner.type()));
+    }
+
+    List<AuthorizationSecurableObject> authObjects = 
mappingProvider.translateOwner(metadataObject);
+    for (AuthorizationSecurableObject authObject : authObjects) {
+      List<String> sqls =
+          getSetOwnerSQL(
+              authObject.type().metadataObjectType(), authObject.fullName(), 
preOwner, newOwner);
+      for (String sql : sqls) {
+        executeUpdateSQL(sql);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public List<String> getCreateUserSQL(String username) {
+    return Lists.newArrayList(String.format("CREATE USER %s", username));
+  }
+
+  @Override
+  public List<String> getDropUserSQL(String username) {
+    return Lists.newArrayList(String.format("DROP USER %s", username));
+  }
+
+  @Override
+  public List<String> getCreateRoleSQL(String roleName) {
+    return Lists.newArrayList(String.format("CREATE ROLE %s", roleName));
+  }
+
+  @Override
+  public List<String> getDropRoleSQL(String roleName) {
+    return Lists.newArrayList(String.format("DROP ROLE %s", roleName));
+  }
+
+  @Override
+  public List<String> getGrantPrivilegeSQL(
+      String privilege, String objectType, String objectName, String roleName) 
{
+    return Lists.newArrayList(
+        String.format("GRANT %s ON %s %s TO ROLE %s", privilege, objectType, 
objectName, roleName));
+  }
+
+  @Override
+  public List<String> getRevokePrivilegeSQL(
+      String privilege, String objectType, String objectName, String roleName) 
{
+    return Lists.newArrayList(
+        String.format(
+            "REVOKE %s ON %s %s FROM ROLE %s", privilege, objectType, 
objectName, roleName));
+  }
+
+  @Override
+  public List<String> getGrantRoleSQL(String roleName, String grantorType, 
String grantorName) {
+    return Lists.newArrayList(
+        String.format("GRANT ROLE %s TO %s %s", roleName, grantorType, 
grantorName));
+  }
+
+  @Override
+  public List<String> getRevokeRoleSQL(String roleName, String revokerType, 
String revokerName) {
+    return Lists.newArrayList(
+        String.format("REVOKE ROLE %s FROM %s %s", roleName, revokerType, 
revokerName));
+  }
+
+  @VisibleForTesting
+  Connection getConnection() throws SQLException {
+    return dataSource.getConnection();
+  }
+
+  protected void executeUpdateSQL(String sql) {
+    executeUpdateSQL(sql, null);
+  }
+
+  /**
+   * Convert the object name contains `*` to a list of 
AuthorizationSecurableObject.
+   *
+   * @param object The object contains the name with `*` to be converted
+   * @return The list of AuthorizationSecurableObject
+   */
+  protected List<AuthorizationSecurableObject> convertResourceAll(
+      AuthorizationSecurableObject object) {
+    List<AuthorizationSecurableObject> authObjects = Lists.newArrayList();
+    authObjects.add(object);
+    return authObjects;
+  }
+
+  protected List<AuthorizationPrivilege> filterUnsupportedPrivileges(
+      List<AuthorizationPrivilege> privileges) {
+    return privileges;
+  }
+
+  protected AuthorizationPluginException 
toAuthorizationPluginException(SQLException se) {
+    return new AuthorizationPluginException(
+        "JDBC authorization plugin fail to execute SQL, error code: %d", 
se.getErrorCode());
+  }
+
+  /**
+   * Executes a single SQL update statement on a connection obtained from {@link
+   * #getConnection()}.
+   *
+   * @param sql the SQL statement to execute
+   * @param ignoreErrorMsg if not null, a {@link SQLException} whose message contains this text
+   *     is silently ignored; pass null to propagate every failure
+   * @throws AuthorizationPluginException if execution fails and the error is not ignored
+   */
+  void executeUpdateSQL(String sql, String ignoreErrorMsg) {
+    try (Connection connection = getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.executeUpdate(sql);
+    } catch (SQLException se) {
+      // SQLException#getMessage may return null; guard so the original failure is not
+      // masked by a NullPointerException.
+      String message = se.getMessage();
+      if (ignoreErrorMsg != null && message != null && message.contains(ignoreErrorMsg)) {
+        return;
+      }
+      LOG.error("JDBC authorization plugin exception: ", se);
+      throw toAuthorizationPluginException(se);
+    }
+  }
+
+  private void grantObjectPrivileges(Role role, SecurableObject object) {
+    List<AuthorizationSecurableObject> authObjects = 
mappingProvider.translatePrivilege(object);
+    for (AuthorizationSecurableObject authObject : authObjects) {
+      List<AuthorizationSecurableObject> convertedObjects = 
Lists.newArrayList();
+      if (authObject.name().equals(JdbcSecurableObject.ALL)) {
+        convertedObjects.addAll(convertResourceAll(authObject));
+      } else {
+        convertedObjects.add(authObject);
+      }
+
+      for (AuthorizationSecurableObject convertedObject : convertedObjects) {
+        List<String> privileges =
+            filterUnsupportedPrivileges(authObject.privileges()).stream()
+                .map(AuthorizationPrivilege::getName)
+                .collect(Collectors.toList());
+        // We don't grant the privileges in one SQL, because some privilege 
has been granted, it
+        // will cause the failure of the SQL. So we grant the privileges one 
by one.
+        for (String privilege : privileges) {
+          List<String> sqls =
+              getGrantPrivilegeSQL(
+                  privilege,
+                  convertedObject.metadataObjectType().name(),
+                  convertedObject.fullName(),
+                  role.name());
+          for (String sql : sqls) {
+            executeUpdateSQL(sql, "is already granted");
+          }
+        }
+      }
+    }
+  }
+
+  private void revokeObjectPrivileges(Role role, SecurableObject removeObject) 
{
+    List<AuthorizationSecurableObject> authObjects =
+        mappingProvider.translatePrivilege(removeObject);
+    for (AuthorizationSecurableObject authObject : authObjects) {
+      List<AuthorizationSecurableObject> convertedObjects = 
Lists.newArrayList();
+      if (authObject.name().equals(JdbcSecurableObject.ALL)) {
+        convertedObjects.addAll(convertResourceAll(authObject));
+      } else {
+        convertedObjects.add(authObject);
+      }
+
+      for (AuthorizationSecurableObject convertedObject : convertedObjects) {
+        List<String> privileges =
+            filterUnsupportedPrivileges(authObject.privileges()).stream()
+                .map(AuthorizationPrivilege::getName)
+                .collect(Collectors.toList());
+        for (String privilege : privileges) {
+          // We don't revoke the privileges in one SQL, because some privilege 
has been revoked, it
+          // will cause the failure of the SQL. So we revoke the privileges 
one by one.
+          List<String> sqls =
+              getRevokePrivilegeSQL(
+                  privilege,
+                  convertedObject.metadataObjectType().name(),
+                  convertedObject.fullName(),
+                  role.name());
+          for (String sql : sqls) {
+            executeUpdateSQL(sql, "Cannot find privilege Privilege");
+          }
+        }
+      }
+    }
+  }
+}
diff --git 
a/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcAuthorizationProperties.java
 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcAuthorizationProperties.java
new file mode 100644
index 000000000..b13504fd2
--- /dev/null
+++ 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcAuthorizationProperties.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.jdbc;
+
+import java.util.Map;
+
+/** The properties for JDBC authorization plugin. */
+public class JdbcAuthorizationProperties {
+  private static final String CONFIG_PREFIX = "authorization.jdbc.";
+  public static final String JDBC_PASSWORD = CONFIG_PREFIX + "password";
+  public static final String JDBC_USERNAME = CONFIG_PREFIX + "username";
+  public static final String JDBC_URL = CONFIG_PREFIX + "url";
+  public static final String JDBC_DRIVER = CONFIG_PREFIX + "driver";
+
+  /**
+   * Validates that every required JDBC property is present.
+   *
+   * @param properties the plugin configuration
+   * @throws IllegalArgumentException if the URL, username, password or driver entry is absent
+   *     or mapped to null
+   */
+  public static void validate(Map<String, String> properties) {
+    String errorMsg = "%s is required";
+    check(properties, JDBC_URL, errorMsg);
+    check(properties, JDBC_USERNAME, errorMsg);
+    check(properties, JDBC_PASSWORD, errorMsg);
+    check(properties, JDBC_DRIVER, errorMsg);
+  }
+
+  /**
+   * Fails when {@code key} has no usable value in {@code properties}.
+   *
+   * @param properties the plugin configuration
+   * @param key the property key that must be present
+   * @param errorMsg a format string taking the key as its single argument
+   * @throws IllegalArgumentException if the key is absent or mapped to null
+   */
+  private static void check(Map<String, String> properties, String key, String errorMsg) {
+    // Map#get returns null both for an absent key and for a key mapped to null, so a
+    // single null check covers both cases.
+    if (properties.get(key) == null) {
+      throw new IllegalArgumentException(String.format(errorMsg, key));
+    }
+  }
+}
diff --git 
a/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcAuthorizationSQL.java
 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcAuthorizationSQL.java
new file mode 100644
index 000000000..f7171ff35
--- /dev/null
+++ 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcAuthorizationSQL.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.jdbc;
+
+import java.util.List;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.annotation.Unstable;
+import org.apache.gravitino.authorization.Owner;
+
+/** Interface for SQL operations of the underlying access control system. */
+@Unstable
+interface JdbcAuthorizationSQL {
+
+  /**
+   * Get SQL statements for creating a user.
+   *
+   * @param username the username to create
+   * @return the SQL statement list to create a user
+   */
+  List<String> getCreateUserSQL(String username);
+
+  /**
+   * Get SQL statements for dropping a user.
+   *
+   * @param username the username to drop
+   * @return the SQL statement list to drop a user
+   */
+  List<String> getDropUserSQL(String username);
+
+  /**
+   * Get SQL statements for creating a role.
+   *
+   * @param roleName the role name to create
+   * @return the SQL statement list to create a role
+   */
+  List<String> getCreateRoleSQL(String roleName);
+
+  /**
+   * Get SQL statements for dropping a role.
+   *
+   * @param roleName the role name to drop
+   * @return the SQL statement list to drop a role
+   */
+  List<String> getDropRoleSQL(String roleName);
+
+  /**
+   * Get SQL statements for granting privileges.
+   *
+   * @param privilege the privilege to grant
+   * @param objectType the object type in the database system
+   * @param objectName the object name in the database system
+   * @param roleName the role name to grant
+   * @return the sql statement list to grant privilege
+   */
+  List<String> getGrantPrivilegeSQL(
+      String privilege, String objectType, String objectName, String roleName);
+
+  /**
+   * Get SQL statements for revoking privileges.
+   *
+   * @param privilege the privilege to revoke
+   * @param objectType the object type in the database system
+   * @param objectName the object name in the database system
+   * @param roleName the role name to revoke
+   * @return the sql statement list to revoke privilege
+   */
+  List<String> getRevokePrivilegeSQL(
+      String privilege, String objectType, String objectName, String roleName);
+
+  /**
+   * Get SQL statements for granting role.
+   *
+   * @param roleName the role name to grant
+   * @param grantorType the grantor type, usually USER or ROLE
+   * @param grantorName the grantor name
+   * @return the sql statement list to grant role
+   */
+  List<String> getGrantRoleSQL(String roleName, String grantorType, String 
grantorName);
+
+  /**
+   * Get SQL statements for revoking roles.
+   *
+   * @param roleName the role name to revoke
+   * @param revokerType the revoker type, usually USER or ROLE
+   * @param revokerName the revoker name
+   * @return the sql statement list to revoke role
+   */
+  List<String> getRevokeRoleSQL(String roleName, String revokerType, String 
revokerName);
+
+  /**
+   * Get SQL statements for setting owner.
+   *
+   * @param type The metadata object type
+   * @param objectName the object name in the database system
+   * @param preOwner the previous owner of the object
+   * @param newOwner the new owner of the object
+   * @return the sql statement list to set owner
+   */
+  List<String> getSetOwnerSQL(
+      MetadataObject.Type type, String objectName, Owner preOwner, Owner 
newOwner);
+}
diff --git 
a/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcMetadataObject.java
 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcMetadataObject.java
new file mode 100644
index 000000000..c74c7ae60
--- /dev/null
+++ 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcMetadataObject.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.jdbc;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.AuthorizationMetadataObject;
+
+/**
+ * Metadata object of a JDBC-backed authorization system.
+ *
+ * <p>Validation accepts two shapes: a single-part name of type {@link Type#SCHEMA}, or a two-part
+ * name of type {@link Type#TABLE}. The parts are obtained by splitting {@code fullName()} on dots.
+ */
+public class JdbcMetadataObject implements AuthorizationMetadataObject {
+
+  private final String parent;
+  private final String name;
+  private final Type type;
+
+  /**
+   * Creates a metadata object. No validation happens here; call {@link
+   * #validateAuthorizationMetadataObject()} to check the combination of arguments.
+   *
+   * @param parent the parent of the object, may be {@code null}
+   * @param name the name of the object
+   * @param type the type of the object
+   */
+  public JdbcMetadataObject(String parent, String name, Type type) {
+    this.parent = parent;
+    this.name = name;
+    this.type = type;
+  }
+
+  /** @return the parent of this object, or {@code null} when none was supplied */
+  @Nullable
+  @Override
+  public String parent() {
+    return parent;
+  }
+
+  /** @return the name of this object */
+  @Override
+  public String name() {
+    return name;
+  }
+
+  /** @return the dot-separated parts of {@code fullName()} */
+  @Override
+  public List<String> names() {
+    return DOT_SPLITTER.splitToList(fullName());
+  }
+
+  /** @return the type of this object */
+  @Override
+  public Type type() {
+    return type;
+  }
+
+  /**
+   * Validates that this object is either a schema (one name part) or a table (two name parts).
+   *
+   * @throws IllegalArgumentException if the name is null or empty, has more than two parts, the
+   *     type is null, or the type does not match the number of name parts
+   */
+  @Override
+  public void validateAuthorizationMetadataObject() throws IllegalArgumentException {
+    // Check the raw name before it is joined and split: the splitter never yields null
+    // elements, so a null name cannot be detected by inspecting the split result.
+    Preconditions.checkArgument(name != null, "Cannot create a metadata object with null name");
+
+    List<String> names = names();
+    Preconditions.checkArgument(
+        names != null && !names.isEmpty(), "The name of the object is empty.");
+    Preconditions.checkArgument(
+        names.size() <= 2, "The name of the object is not in the format of 'database.table'.");
+    Preconditions.checkArgument(type != null, "The type of the object is null.");
+    if (names.size() == 1) {
+      Preconditions.checkArgument(
+          type.metadataObjectType() == MetadataObject.Type.SCHEMA,
+          "The type of the object is not SCHEMA.");
+    } else {
+      Preconditions.checkArgument(
+          type.metadataObjectType() == MetadataObject.Type.TABLE,
+          "The type of the object is not TABLE.");
+    }
+  }
+
+  /** The object types supported by the JDBC authorization plugin. */
+  public enum Type implements AuthorizationMetadataObject.Type {
+    SCHEMA(MetadataObject.Type.SCHEMA),
+    TABLE(MetadataObject.Type.TABLE);
+
+    private final MetadataObject.Type metadataType;
+
+    Type(MetadataObject.Type type) {
+      this.metadataType = type;
+    }
+
+    /** @return the Gravitino metadata object type this type corresponds to */
+    public MetadataObject.Type metadataObjectType() {
+      return metadataType;
+    }
+
+    /**
+     * Looks up the JDBC type for a Gravitino metadata object type.
+     *
+     * @param metadataType the Gravitino metadata object type
+     * @return the matching JDBC type
+     * @throws IllegalArgumentException if no JDBC type corresponds to {@code metadataType}
+     */
+    public static Type fromMetadataType(MetadataObject.Type metadataType) {
+      for (Type type : Type.values()) {
+        if (type.metadataObjectType() == metadataType) {
+          return type;
+        }
+      }
+      throw new IllegalArgumentException("No matching JdbcMetadataObject.Type for " + metadataType);
+    }
+  }
+}
diff --git 
a/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcPrivilege.java
 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcPrivilege.java
new file mode 100644
index 000000000..845b31a5b
--- /dev/null
+++ 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcPrivilege.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.jdbc;
+
+import org.apache.gravitino.authorization.AuthorizationPrivilege;
+import org.apache.gravitino.authorization.Privilege;
+
+/** The privileges known to the JDBC authorization plugin; every one of them is an ALLOW rule. */
+public enum JdbcPrivilege implements AuthorizationPrivilege {
+  SELECT("SELECT"),
+  INSERT("INSERT"),
+  UPDATE("UPDATE"),
+  ALTER("ALTER"),
+  DELETE("DELETE"),
+  ALL("ALL PRIVILEGES"),
+  CREATE("CREATE"),
+  DROP("DROP"),
+  USAGE("USAGE");
+
+  /** Textual name of the privilege, as returned by {@link #getName()}. */
+  private final String privilegeName;
+
+  JdbcPrivilege(String privilegeName) {
+    this.privilegeName = privilegeName;
+  }
+
+  /** @return always {@link Privilege.Condition#ALLOW} */
+  @Override
+  public Privilege.Condition condition() {
+    return Privilege.Condition.ALLOW;
+  }
+
+  /** @return the textual name of this privilege */
+  @Override
+  public String getName() {
+    return this.privilegeName;
+  }
+
+  /**
+   * Compares the textual name of this privilege with the given value (case-sensitive).
+   *
+   * @param value the value to compare with, may be {@code null}
+   * @return {@code true} if {@code value} equals the textual name of this privilege
+   */
+  @Override
+  public boolean equalsTo(String value) {
+    return this.privilegeName.equals(value);
+  }
+}
diff --git 
a/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcSecurableObject.java
 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcSecurableObject.java
new file mode 100644
index 000000000..78b82e2a8
--- /dev/null
+++ 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcSecurableObject.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.jdbc;
+
+import java.util.List;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.AuthorizationPrivilege;
+import org.apache.gravitino.authorization.AuthorizationSecurableObject;
+
+/**
+ * JdbcAuthorizationObject is used for translating securable object to 
authorization securable
+ * object. JdbcAuthorizationObject has the database and table name. When table 
name is null, the
+ * object represents a database. The database can't be null.
+ */
+public class JdbcSecurableObject extends JdbcMetadataObject
+    implements AuthorizationSecurableObject {
+
+  public static final String ALL = "*";
+
+  List<AuthorizationPrivilege> privileges;
+
+  private JdbcSecurableObject(
+      String parent,
+      String name,
+      JdbcMetadataObject.Type type,
+      List<AuthorizationPrivilege> privileges) {
+    super(parent, name, type);
+    this.privileges = privileges;
+  }
+
+  static JdbcSecurableObject create(
+      String schema, String table, List<AuthorizationPrivilege> privileges) {
+    String parent = table == null ? null : schema;
+    String name = table == null ? schema : table;
+    JdbcMetadataObject.Type type =
+        table == null
+            ? 
JdbcMetadataObject.Type.fromMetadataType(MetadataObject.Type.SCHEMA)
+            : 
JdbcMetadataObject.Type.fromMetadataType(MetadataObject.Type.TABLE);
+
+    JdbcSecurableObject object = new JdbcSecurableObject(parent, name, type, 
privileges);
+    object.validateAuthorizationMetadataObject();
+    return object;
+  }
+
+  @Override
+  public List<AuthorizationPrivilege> privileges() {
+    return privileges;
+  }
+}
diff --git 
a/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcSecurableObjectMappingProvider.java
 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcSecurableObjectMappingProvider.java
new file mode 100644
index 000000000..70b2d10e3
--- /dev/null
+++ 
b/authorizations/authorization-jdbc/src/main/java/org/apache/gravitino/authorization/jdbc/JdbcSecurableObjectMappingProvider.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.jdbc;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.authorization.AuthorizationMetadataObject;
+import org.apache.gravitino.authorization.AuthorizationPrivilege;
+import 
org.apache.gravitino.authorization.AuthorizationPrivilegesMappingProvider;
+import org.apache.gravitino.authorization.AuthorizationSecurableObject;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.SecurableObject;
+
+/**
+ * JdbcSecurableObjectMappingProvider is used for translating securable objects and their
+ * privileges to authorization securable objects of the JDBC authorization plugin.
+ */
+public class JdbcSecurableObjectMappingProvider implements AuthorizationPrivilegesMappingProvider {
+
+  // The sets are immutable and keep their declared order. A HashSet of enum constants iterates in
+  // identity-hash order, which can differ between JVM runs, so the privileges of a multi-valued
+  // mapping such as MODIFY_TABLE would otherwise be translated in an unpredictable order.
+  private final Map<Privilege.Name, Set<AuthorizationPrivilege>> privilegeMapping =
+      ImmutableMap.of(
+          Privilege.Name.CREATE_TABLE,
+          ImmutableSet.<AuthorizationPrivilege>of(JdbcPrivilege.CREATE),
+          Privilege.Name.CREATE_SCHEMA,
+          ImmutableSet.<AuthorizationPrivilege>of(JdbcPrivilege.CREATE),
+          Privilege.Name.SELECT_TABLE,
+          ImmutableSet.<AuthorizationPrivilege>of(JdbcPrivilege.SELECT),
+          Privilege.Name.MODIFY_TABLE,
+          ImmutableSet.<AuthorizationPrivilege>of(
+              JdbcPrivilege.SELECT,
+              JdbcPrivilege.UPDATE,
+              JdbcPrivilege.DELETE,
+              JdbcPrivilege.INSERT,
+              JdbcPrivilege.ALTER),
+          Privilege.Name.USE_SCHEMA,
+          ImmutableSet.<AuthorizationPrivilege>of(JdbcPrivilege.USAGE));
+
+  // Decides whether the translated privileges of a privilege are attached to the schema-level or
+  // to the table-level authorization object.
+  private final Map<Privilege.Name, MetadataObject.Type> privilegeScopeMapping =
+      ImmutableMap.of(
+          Privilege.Name.CREATE_TABLE, MetadataObject.Type.TABLE,
+          Privilege.Name.CREATE_SCHEMA, MetadataObject.Type.SCHEMA,
+          Privilege.Name.SELECT_TABLE, MetadataObject.Type.TABLE,
+          Privilege.Name.MODIFY_TABLE, MetadataObject.Type.TABLE,
+          Privilege.Name.USE_SCHEMA, MetadataObject.Type.SCHEMA);
+
+  private final Set<AuthorizationPrivilege> ownerPrivileges = ImmutableSet.of();
+
+  private final Set<MetadataObject.Type> allowObjectTypes =
+      ImmutableSet.of(
+          MetadataObject.Type.METALAKE,
+          MetadataObject.Type.CATALOG,
+          MetadataObject.Type.SCHEMA,
+          MetadataObject.Type.TABLE);
+
+  /** @return the immutable mapping from privilege names to JDBC privileges */
+  @Override
+  public Map<Privilege.Name, Set<AuthorizationPrivilege>> privilegesMappingRule() {
+    return privilegeMapping;
+  }
+
+  /** @return the owner privileges, which is an empty set for this provider */
+  @Override
+  public Set<AuthorizationPrivilege> ownerMappingRule() {
+    return ownerPrivileges;
+  }
+
+  /** @return the privilege names that have a JDBC mapping */
+  @Override
+  public Set<Privilege.Name> allowPrivilegesRule() {
+    return privilegeMapping.keySet();
+  }
+
+  /** @return the metadata object types this provider can translate */
+  @Override
+  public Set<MetadataObject.Type> allowMetadataObjectTypesRule() {
+    return allowObjectTypes;
+  }
+
+  /**
+   * Translates a securable object into JDBC securable objects.
+   *
+   * <p>Metalake and catalog objects are mapped to the wildcard schema and the wildcard table, a
+   * schema is mapped to itself and to all of its tables, and a table is mapped to itself.
+   * Objects are only created for scopes that end up with at least one privilege.
+   *
+   * @param securableObject the securable object to translate
+   * @return the translated objects, possibly empty
+   * @throws IllegalArgumentException if the type of {@code securableObject} is not supported
+   */
+  @Override
+  public List<AuthorizationSecurableObject> translatePrivilege(SecurableObject securableObject) {
+    List<AuthorizationSecurableObject> authObjects = Lists.newArrayList();
+    List<AuthorizationPrivilege> databasePrivileges = Lists.newArrayList();
+    List<AuthorizationPrivilege> tablePrivileges = Lists.newArrayList();
+    JdbcSecurableObject databaseObject;
+    JdbcSecurableObject tableObject;
+    switch (securableObject.type()) {
+      case METALAKE:
+      case CATALOG:
+        convertJdbcPrivileges(securableObject, databasePrivileges, tablePrivileges);
+
+        if (!databasePrivileges.isEmpty()) {
+          databaseObject =
+              JdbcSecurableObject.create(JdbcSecurableObject.ALL, null, databasePrivileges);
+          authObjects.add(databaseObject);
+        }
+
+        if (!tablePrivileges.isEmpty()) {
+          tableObject =
+              JdbcSecurableObject.create(
+                  JdbcSecurableObject.ALL, JdbcSecurableObject.ALL, tablePrivileges);
+          authObjects.add(tableObject);
+        }
+        break;
+
+      case SCHEMA:
+        convertJdbcPrivileges(securableObject, databasePrivileges, tablePrivileges);
+        if (!databasePrivileges.isEmpty()) {
+          databaseObject =
+              JdbcSecurableObject.create(securableObject.name(), null, databasePrivileges);
+          authObjects.add(databaseObject);
+        }
+
+        if (!tablePrivileges.isEmpty()) {
+          tableObject =
+              JdbcSecurableObject.create(
+                  securableObject.name(), JdbcSecurableObject.ALL, tablePrivileges);
+          authObjects.add(tableObject);
+        }
+        break;
+
+      case TABLE:
+        // Schema-scoped privileges are not translated for a table object.
+        convertJdbcPrivileges(securableObject, databasePrivileges, tablePrivileges);
+        if (!tablePrivileges.isEmpty()) {
+          MetadataObject metadataObject =
+              MetadataObjects.parse(securableObject.parent(), MetadataObject.Type.SCHEMA);
+          tableObject =
+              JdbcSecurableObject.create(
+                  metadataObject.name(), securableObject.name(), tablePrivileges);
+          authObjects.add(tableObject);
+        }
+        break;
+
+      default:
+        throw new IllegalArgumentException(
+            String.format("Don't support metadata object type %s", securableObject.type()));
+    }
+
+    return authObjects;
+  }
+
+  /**
+   * Translates the ownership of a metadata object into JDBC securable objects that carry {@link
+   * JdbcPrivilege#ALL}.
+   *
+   * @param metadataObject the owned metadata object
+   * @return the translated objects
+   * @throws IllegalArgumentException if the type of {@code metadataObject} is not supported
+   */
+  @Override
+  public List<AuthorizationSecurableObject> translateOwner(MetadataObject metadataObject) {
+    List<AuthorizationSecurableObject> objects = Lists.newArrayList();
+    switch (metadataObject.type()) {
+      case METALAKE:
+      case CATALOG:
+        objects.add(
+            JdbcSecurableObject.create(
+                JdbcSecurableObject.ALL, null, Lists.newArrayList(JdbcPrivilege.ALL)));
+        objects.add(
+            JdbcSecurableObject.create(
+                JdbcSecurableObject.ALL,
+                JdbcSecurableObject.ALL,
+                Lists.newArrayList(JdbcPrivilege.ALL)));
+        break;
+      case SCHEMA:
+        objects.add(
+            JdbcSecurableObject.create(
+                metadataObject.name(), null, Lists.newArrayList(JdbcPrivilege.ALL)));
+        objects.add(
+            JdbcSecurableObject.create(
+                metadataObject.name(),
+                JdbcSecurableObject.ALL,
+                Lists.newArrayList(JdbcPrivilege.ALL)));
+        break;
+      case TABLE:
+        MetadataObject schema =
+            MetadataObjects.parse(metadataObject.parent(), MetadataObject.Type.SCHEMA);
+        objects.add(
+            JdbcSecurableObject.create(
+                schema.name(), metadataObject.name(), Lists.newArrayList(JdbcPrivilege.ALL)));
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Don't support metadata object type " + metadataObject.type());
+    }
+    return objects;
+  }
+
+  /**
+   * Not supported by this provider.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public AuthorizationMetadataObject translateMetadataObject(MetadataObject metadataObject) {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Appends the JDBC privileges of every privilege of {@code securableObject} to the list that
+   * matches the scope of the privilege. Privileges without a scope mapping are ignored.
+   *
+   * @param securableObject the object whose privileges are converted
+   * @param databasePrivileges receives the schema-scoped JDBC privileges
+   * @param tablePrivileges receives the table-scoped JDBC privileges
+   */
+  private void convertJdbcPrivileges(
+      SecurableObject securableObject,
+      List<AuthorizationPrivilege> databasePrivileges,
+      List<AuthorizationPrivilege> tablePrivileges) {
+    for (Privilege privilege : securableObject.privileges()) {
+      MetadataObject.Type scope = privilegeScopeMapping.get(privilege.name());
+      if (scope == MetadataObject.Type.SCHEMA) {
+        databasePrivileges.addAll(privilegeMapping.get(privilege.name()));
+      } else if (scope == MetadataObject.Type.TABLE) {
+        tablePrivileges.addAll(privilegeMapping.get(privilege.name()));
+      }
+    }
+  }
+}
diff --git 
a/authorizations/authorization-jdbc/src/test/java/org/apache/gravitino/authorization/jdbc/JdbcAuthorizationPluginTest.java
 
b/authorizations/authorization-jdbc/src/test/java/org/apache/gravitino/authorization/jdbc/JdbcAuthorizationPluginTest.java
new file mode 100644
index 000000000..b72392a6c
--- /dev/null
+++ 
b/authorizations/authorization-jdbc/src/test/java/org/apache/gravitino/authorization/jdbc/JdbcAuthorizationPluginTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.jdbc;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test of {@link JdbcAuthorizationPlugin}. The plugin under test is an anonymous subclass that
+ * does not execute SQL; instead it compares every SQL statement and every set-owner request it
+ * receives with the expectations recorded in the static fields of this class, in order.
+ */
+public class JdbcAuthorizationPluginTest {
+  private static List<String> expectSQLs = Lists.newArrayList();
+  private static List<MetadataObject.Type> expectTypes = Lists.newArrayList();
+  private static List<String> expectObjectNames = Lists.newArrayList();
+  private static List<Optional<Owner>> expectPreOwners = Lists.newArrayList();
+  private static List<Owner> expectNewOwners = Lists.newArrayList();
+  private static int currentSQLIndex = 0;
+  private static int currentIndex = 0;
+  private static final Map<String, String> properties =
+      ImmutableMap.of(
+          JdbcAuthorizationProperties.JDBC_URL,
+          "xx",
+          JdbcAuthorizationProperties.JDBC_USERNAME,
+          "xx",
+          JdbcAuthorizationProperties.JDBC_PASSWORD,
+          "xx",
+          JdbcAuthorizationProperties.JDBC_DRIVER,
+          "xx");
+
+  private static final JdbcAuthorizationPlugin plugin =
+      new JdbcAuthorizationPlugin(properties) {
+
+        @Override
+        public List<String> getSetOwnerSQL(
+            MetadataObject.Type type, String objectName, Owner preOwner, Owner newOwner) {
+          // Compare the request with the next recorded expectation, then advance.
+          Assertions.assertEquals(expectTypes.get(currentIndex), type);
+          Assertions.assertEquals(expectObjectNames.get(currentIndex), objectName);
+          Assertions.assertEquals(expectPreOwners.get(currentIndex), Optional.ofNullable(preOwner));
+          Assertions.assertEquals(expectNewOwners.get(currentIndex), newOwner);
+          currentIndex++;
+          return Collections.emptyList();
+        }
+
+        void executeUpdateSQL(String sql, String ignoreErrorMsg) {
+          // Compare the statement with the next expected one instead of executing it.
+          Assertions.assertEquals(expectSQLs.get(currentSQLIndex), sql);
+          currentSQLIndex++;
+        }
+      };
+
+  @Test
+  public void testUserManagement() {
+    User user = createUser("tmp");
+
+    expectSqls("CREATE USER tmp");
+    plugin.onUserAdded(user);
+
+    Assertions.assertThrows(
+        UnsupportedOperationException.class, () -> plugin.onUserAcquired(createUser("tmp")));
+
+    expectSqls("DROP USER tmp");
+    plugin.onUserRemoved(createUser("tmp"));
+  }
+
+  @Test
+  public void testGroupManagement() {
+    expectSqls("CREATE USER GRAVITINO_GROUP_tmp");
+    plugin.onGroupAdded(createGroup("tmp"));
+
+    Assertions.assertThrows(
+        UnsupportedOperationException.class, () -> plugin.onGroupAcquired(createGroup("tmp")));
+
+    expectSqls("DROP USER GRAVITINO_GROUP_tmp");
+    plugin.onGroupRemoved(createGroup("tmp"));
+  }
+
+  @Test
+  public void testRoleManagement() {
+    Role role = createRole("tmp");
+
+    expectSqls("CREATE ROLE tmp");
+    plugin.onRoleCreated(role);
+
+    Assertions.assertThrows(UnsupportedOperationException.class, () -> plugin.onRoleAcquired(role));
+
+    expectSqls("DROP ROLE tmp");
+    plugin.onRoleDeleted(role);
+  }
+
+  @Test
+  public void testPermissionManagement() {
+    Role role = createRole("tmp");
+    Group group = createGroup("tmp");
+    User user = createUser("tmp");
+
+    // Granting and revoking roles
+    expectSqls("CREATE ROLE tmp", "GRANT ROLE tmp TO USER GRAVITINO_GROUP_tmp");
+    plugin.onGrantedRolesToGroup(Lists.newArrayList(role), group);
+
+    expectSqls("CREATE ROLE tmp", "GRANT ROLE tmp TO USER tmp");
+    plugin.onGrantedRolesToUser(Lists.newArrayList(role), user);
+
+    expectSqls("CREATE ROLE tmp", "REVOKE ROLE tmp FROM USER GRAVITINO_GROUP_tmp");
+    plugin.onRevokedRolesFromGroup(Lists.newArrayList(role), group);
+
+    expectSqls("CREATE ROLE tmp", "REVOKE ROLE tmp FROM USER tmp");
+    plugin.onRevokedRolesFromUser(Lists.newArrayList(role), user);
+
+    // Test metalake object and different role change
+    SecurableObject metalakeObject =
+        SecurableObjects.ofMetalake("metalake", Lists.newArrayList(Privileges.SelectTable.allow()));
+    expectSqls("CREATE ROLE tmp", "GRANT SELECT ON TABLE *.* TO ROLE tmp");
+    plugin.onRoleUpdated(role, RoleChange.addSecurableObject("tmp", metalakeObject));
+
+    expectSqls("CREATE ROLE tmp", "REVOKE SELECT ON TABLE *.* FROM ROLE tmp");
+    plugin.onRoleUpdated(role, RoleChange.removeSecurableObject("tmp", metalakeObject));
+
+    SecurableObject newMetalakeObject =
+        SecurableObjects.ofMetalake("metalake", Lists.newArrayList(Privileges.CreateTable.allow()));
+    expectSqls(
+        "CREATE ROLE tmp",
+        "REVOKE SELECT ON TABLE *.* FROM ROLE tmp",
+        "GRANT CREATE ON TABLE *.* TO ROLE tmp");
+    plugin.onRoleUpdated(
+        role, RoleChange.updateSecurableObject("tmp", metalakeObject, newMetalakeObject));
+
+    // Test catalog object
+    SecurableObject catalogObject =
+        SecurableObjects.ofCatalog("catalog", Lists.newArrayList(Privileges.SelectTable.allow()));
+    RoleChange catalogChange = RoleChange.addSecurableObject("tmp", catalogObject);
+    expectSqls("CREATE ROLE tmp", "GRANT SELECT ON TABLE *.* TO ROLE tmp");
+    plugin.onRoleUpdated(role, catalogChange);
+
+    // Test schema object
+    SecurableObject schemaObject =
+        SecurableObjects.ofSchema(
+            catalogObject, "schema", Lists.newArrayList(Privileges.SelectTable.allow()));
+    RoleChange schemaChange = RoleChange.addSecurableObject("tmp", schemaObject);
+    expectSqls("CREATE ROLE tmp", "GRANT SELECT ON TABLE schema.* TO ROLE tmp");
+    plugin.onRoleUpdated(role, schemaChange);
+
+    // Test table object
+    SecurableObject tableObject =
+        SecurableObjects.ofTable(
+            schemaObject, "table", Lists.newArrayList(Privileges.SelectTable.allow()));
+    RoleChange tableChange = RoleChange.addSecurableObject("tmp", tableObject);
+    expectSqls("CREATE ROLE tmp", "GRANT SELECT ON TABLE schema.table TO ROLE tmp");
+    plugin.onRoleUpdated(role, tableChange);
+  }
+
+  @Test
+  public void testOwnerManagement() {
+    Owner owner = new TemporaryOwner("tmp", Owner.Type.USER);
+
+    // Test metalake object
+    MetadataObject metalakeObject =
+        MetadataObjects.of(null, "metalake", MetadataObject.Type.METALAKE);
+    expectSqls("CREATE USER tmp");
+    expectOwnerCall(MetadataObject.Type.SCHEMA, "*", owner);
+    expectOwnerCall(MetadataObject.Type.TABLE, "*.*", owner);
+    plugin.onOwnerSet(metalakeObject, null, owner);
+
+    // Test catalog object
+    cleanup();
+    expectSqls("CREATE USER tmp");
+    MetadataObject catalogObject = MetadataObjects.of(null, "catalog", MetadataObject.Type.CATALOG);
+    expectOwnerCall(MetadataObject.Type.SCHEMA, "*", owner);
+    expectOwnerCall(MetadataObject.Type.TABLE, "*.*", owner);
+    plugin.onOwnerSet(catalogObject, null, owner);
+
+    // Test schema object
+    cleanup();
+    expectSqls("CREATE USER tmp");
+    MetadataObject schemaObject =
+        MetadataObjects.of("catalog", "schema", MetadataObject.Type.SCHEMA);
+    expectOwnerCall(MetadataObject.Type.SCHEMA, "schema", owner);
+    expectOwnerCall(MetadataObject.Type.TABLE, "schema.*", owner);
+    plugin.onOwnerSet(schemaObject, null, owner);
+
+    // Test table object
+    cleanup();
+    expectSqls("CREATE USER tmp");
+    MetadataObject tableObject =
+        MetadataObjects.of(
+            Lists.newArrayList("catalog", "schema", "table"), MetadataObject.Type.TABLE);
+    expectOwnerCall(MetadataObject.Type.TABLE, "schema.table", owner);
+    plugin.onOwnerSet(tableObject, null, owner);
+  }
+
+  /** Replaces the expected SQL statements and restarts the comparison at the first one. */
+  private static void expectSqls(String... sqls) {
+    expectSQLs = Lists.newArrayList(sqls);
+    resetSQLIndex();
+  }
+
+  /** Records one expected set-owner request that has no previous owner. */
+  private static void expectOwnerCall(
+      MetadataObject.Type type, String objectName, Owner newOwner) {
+    expectTypes.add(type);
+    expectObjectNames.add(objectName);
+    expectPreOwners.add(Optional.empty());
+    expectNewOwners.add(newOwner);
+  }
+
+  private static void resetSQLIndex() {
+    currentSQLIndex = 0;
+  }
+
+  /** Drops all recorded set-owner expectations and resets both comparison cursors. */
+  private static void cleanup() {
+    currentSQLIndex = 0;
+    currentIndex = 0;
+    expectNewOwners.clear();
+    expectPreOwners.clear();
+    expectObjectNames.clear();
+    expectTypes.clear();
+  }
+
+  /** Minimal {@link Owner} implementation holding a name and a type. */
+  private static class TemporaryOwner implements Owner {
+    private final String name;
+    private final Type type;
+
+    public TemporaryOwner(String name, Type type) {
+      this.name = name;
+      this.type = type;
+    }
+
+    @Override
+    public String name() {
+      return name;
+    }
+
+    @Override
+    public Type type() {
+      return type;
+    }
+  }
+
+  private static Role createRole(String name) {
+    return RoleEntity.builder().withId(0L).withName(name).withAuditInfo(AuditInfo.EMPTY).build();
+  }
+
+  private static Group createGroup(String name) {
+    return GroupEntity.builder().withId(0L).withName(name).withAuditInfo(AuditInfo.EMPTY).build();
+  }
+
+  private static User createUser(String name) {
+    return UserEntity.builder().withId(0L).withName(name).withAuditInfo(AuditInfo.EMPTY).build();
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 150acdb00..b3eb56578 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -57,7 +57,7 @@ if 
(gradle.startParameter.projectProperties["enableFuse"]?.toBoolean() == true)
 }
 include("iceberg:iceberg-common")
 include("iceberg:iceberg-rest-server")
-include("authorizations:authorization-ranger")
+include("authorizations:authorization-ranger", 
"authorizations:authorization-jdbc")
 include("trino-connector:trino-connector", "trino-connector:integration-test")
 include("spark-connector:spark-common")
 // kyuubi hive connector doesn't support 2.13 for Spark3.3

Reply via email to