This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new b151461c6 [#5731] feat(auth-ranger): RangerAuthorizationHDFSPlugin 
supports Fileset authorization (#5733)
b151461c6 is described below

commit b151461c69f6701ab4f7e8a60a291d064af39e86
Author: theoryxu <xuxiaothe...@gmail.com>
AuthorDate: Fri Dec 13 13:17:55 2024 +0800

    [#5731] feat(auth-ranger): RangerAuthorizationHDFSPlugin supports Fileset 
authorization (#5733)
    
    ### What changes were proposed in this pull request?
    
    RangerAuthorizationHDFSPlugin supports Fileset authorization
    
    ### Why are the changes needed?
    
    Fix: #5731
    
    ### Does this PR introduce _any_ user-facing change?
    
    Addition property keys in Fileset
    
    ### How was this patch tested?
    
    ITs
    
    ---------
    
    Co-authored-by: theoryxu <theor...@tencent.com>
---
 .../authorization-ranger/build.gradle.kts          |   2 +-
 .../authorization/ranger/RangerAuthorization.java  |   2 +
 .../ranger/RangerAuthorizationHDFSPlugin.java      | 252 +++++++++
 .../ranger/RangerAuthorizationHadoopSQLPlugin.java |  81 ++-
 .../ranger/RangerAuthorizationPlugin.java          |  93 +++-
 ...ect.java => RangerHadoopSQLMetadataObject.java} |  16 +-
 ...ct.java => RangerHadoopSQLSecurableObject.java} |   6 +-
 .../authorization/ranger/RangerHelper.java         |  55 --
 ...ject.java => RangerPathBaseMetadataObject.java} |  92 +---
 ...ect.java => RangerPathBaseSecurableObject.java} |  23 +-
 .../ranger/reference/RangerDefines.java            |   4 +-
 .../test/RangerAuthorizationHDFSPluginIT.java      | 172 ++++++
 .../test/RangerAuthorizationPluginIT.java          |  58 ++-
 .../ranger/integration/test/RangerFilesetIT.java   | 578 +++++++++++++++++++++
 .../ranger/integration/test/RangerHiveE2EIT.java   |   2 +-
 .../ranger/integration/test/RangerHiveIT.java      |  10 +-
 .../ranger/integration/test/RangerITEnv.java       |  43 +-
 .../integration/test/RangerIcebergE2EIT.java       |   2 +-
 .../ranger/integration/test/RangerPaimonE2EIT.java |   2 +-
 19 files changed, 1255 insertions(+), 238 deletions(-)

diff --git a/authorizations/authorization-ranger/build.gradle.kts 
b/authorizations/authorization-ranger/build.gradle.kts
index f83aee72c..a335e492b 100644
--- a/authorizations/authorization-ranger/build.gradle.kts
+++ b/authorizations/authorization-ranger/build.gradle.kts
@@ -133,7 +133,7 @@ tasks.test {
   doFirst {
     environment("HADOOP_USER_NAME", "gravitino")
   }
-  dependsOn(":catalogs:catalog-hive:jar", 
":catalogs:catalog-hive:runtimeJars", 
":catalogs:catalog-lakehouse-iceberg:jar", 
":catalogs:catalog-lakehouse-iceberg:runtimeJars", 
":catalogs:catalog-lakehouse-paimon:jar", 
":catalogs:catalog-lakehouse-paimon:runtimeJars")
+  dependsOn(":catalogs:catalog-hive:jar", 
":catalogs:catalog-hive:runtimeJars", 
":catalogs:catalog-lakehouse-iceberg:jar", 
":catalogs:catalog-lakehouse-iceberg:runtimeJars", 
":catalogs:catalog-lakehouse-paimon:jar", 
":catalogs:catalog-lakehouse-paimon:runtimeJars", 
":catalogs:catalog-hadoop:jar", ":catalogs:catalog-hadoop:runtimeJars")
 
   val skipITs = project.hasProperty("skipITs")
   if (skipITs) {
diff --git 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorization.java
 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorization.java
index ae656f981..04c40e219 100644
--- 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorization.java
+++ 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorization.java
@@ -37,6 +37,8 @@ public class RangerAuthorization extends 
BaseAuthorization<RangerAuthorization>
       case "lakehouse-iceberg":
       case "lakehouse-paimon":
         return RangerAuthorizationHadoopSQLPlugin.getInstance(metalake, 
config);
+      case "hadoop":
+        return RangerAuthorizationHDFSPlugin.getInstance(metalake, config);
       default:
         throw new IllegalArgumentException("Unknown catalog provider: " + 
catalogProvider);
     }
diff --git 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHDFSPlugin.java
 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHDFSPlugin.java
new file mode 100644
index 000000000..16ce5bba4
--- /dev/null
+++ 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHDFSPlugin.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.AuthorizationMetadataObject;
+import org.apache.gravitino.authorization.AuthorizationPrivilege;
+import org.apache.gravitino.authorization.AuthorizationSecurableObject;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.ranger.reference.RangerDefines;
+import org.apache.gravitino.catalog.FilesetDispatcher;
+import org.apache.gravitino.exceptions.AuthorizationPluginException;
+import org.apache.gravitino.file.Fileset;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RangerAuthorizationHDFSPlugin extends RangerAuthorizationPlugin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerAuthorizationHDFSPlugin.class);
+
+  private static final Pattern pattern = Pattern.compile("^hdfs://[^/]*");
+
+  private static volatile RangerAuthorizationHDFSPlugin instance = null;
+
+  private RangerAuthorizationHDFSPlugin(String metalake, Map<String, String> 
config) {
+    super(metalake, config);
+  }
+
+  public static synchronized RangerAuthorizationHDFSPlugin getInstance(
+      String metalake, Map<String, String> config) {
+    if (instance == null) {
+      synchronized (RangerAuthorizationHadoopSQLPlugin.class) {
+        if (instance == null) {
+          instance = new RangerAuthorizationHDFSPlugin(metalake, config);
+        }
+      }
+    }
+    return instance;
+  }
+
+  @Override
+  public Map<Privilege.Name, Set<AuthorizationPrivilege>> 
privilegesMappingRule() {
+    return ImmutableMap.of(
+        Privilege.Name.READ_FILESET,
+        ImmutableSet.of(
+            RangerPrivileges.RangerHdfsPrivilege.READ,
+            RangerPrivileges.RangerHdfsPrivilege.EXECUTE),
+        Privilege.Name.WRITE_FILESET,
+        ImmutableSet.of(
+            RangerPrivileges.RangerHdfsPrivilege.WRITE,
+            RangerPrivileges.RangerHdfsPrivilege.EXECUTE));
+  }
+
+  @Override
+  public Set<AuthorizationPrivilege> ownerMappingRule() {
+    return ImmutableSet.of(
+        RangerPrivileges.RangerHdfsPrivilege.READ,
+        RangerPrivileges.RangerHdfsPrivilege.WRITE,
+        RangerPrivileges.RangerHdfsPrivilege.EXECUTE);
+  }
+
+  @Override
+  public List<String> policyResourceDefinesRule() {
+    return ImmutableList.of(RangerDefines.PolicyResource.PATH.getName());
+  }
+
+  @Override
+  protected RangerPolicy createPolicyAddResources(AuthorizationMetadataObject 
metadataObject) {
+    RangerPolicy policy = new RangerPolicy();
+    policy.setService(rangerServiceName);
+    policy.setName(metadataObject.fullName());
+    RangerPolicy.RangerPolicyResource policyResource =
+        new RangerPolicy.RangerPolicyResource(metadataObject.names().get(0), 
false, true);
+    policy.getResources().put(RangerDefines.PolicyResource.PATH.getName(), 
policyResource);
+    return policy;
+  }
+
+  @Override
+  public AuthorizationSecurableObject generateAuthorizationSecurableObject(
+      List<String> names,
+      AuthorizationMetadataObject.Type type,
+      Set<AuthorizationPrivilege> privileges) {
+    AuthorizationMetadataObject authMetadataObject =
+        new 
RangerPathBaseMetadataObject(AuthorizationMetadataObject.getLastName(names), 
type);
+    authMetadataObject.validateAuthorizationMetadataObject();
+    return new RangerPathBaseSecurableObject(
+        authMetadataObject.name(), authMetadataObject.type(), privileges);
+  }
+
+  @Override
+  public Set<Privilege.Name> allowPrivilegesRule() {
+    return ImmutableSet.of(
+        Privilege.Name.CREATE_FILESET, Privilege.Name.READ_FILESET, 
Privilege.Name.WRITE_FILESET);
+  }
+
+  @Override
+  public Set<MetadataObject.Type> allowMetadataObjectTypesRule() {
+    return ImmutableSet.of(
+        MetadataObject.Type.FILESET,
+        MetadataObject.Type.SCHEMA,
+        MetadataObject.Type.CATALOG,
+        MetadataObject.Type.METALAKE);
+  }
+
+  @Override
+  public List<AuthorizationSecurableObject> translatePrivilege(SecurableObject 
securableObject) {
+    List<AuthorizationSecurableObject> rangerSecurableObjects = new 
ArrayList<>();
+
+    securableObject.privileges().stream()
+        .filter(Objects::nonNull)
+        .forEach(
+            gravitinoPrivilege -> {
+              Set<AuthorizationPrivilege> rangerPrivileges = new HashSet<>();
+              // Ignore unsupported privileges
+              if 
(!privilegesMappingRule().containsKey(gravitinoPrivilege.name())) {
+                return;
+              }
+              privilegesMappingRule().get(gravitinoPrivilege.name()).stream()
+                  .forEach(
+                      rangerPrivilege ->
+                          rangerPrivileges.add(
+                              new RangerPrivileges.RangerHivePrivilegeImpl(
+                                  rangerPrivilege, 
gravitinoPrivilege.condition())));
+
+              switch (gravitinoPrivilege.name()) {
+                case CREATE_FILESET:
+                  // Ignore the Gravitino privilege `CREATE_FILESET` in the
+                  // RangerAuthorizationHDFSPlugin
+                  break;
+                case READ_FILESET:
+                case WRITE_FILESET:
+                  switch (securableObject.type()) {
+                    case METALAKE:
+                    case CATALOG:
+                    case SCHEMA:
+                      break;
+                    case FILESET:
+                      rangerSecurableObjects.add(
+                          generateAuthorizationSecurableObject(
+                              translateMetadataObject(securableObject).names(),
+                              RangerPathBaseMetadataObject.Type.PATH,
+                              rangerPrivileges));
+                      break;
+                    default:
+                      throw new AuthorizationPluginException(
+                          "The privilege %s is not supported for the securable 
object: %s",
+                          gravitinoPrivilege.name(), securableObject.type());
+                  }
+                  break;
+                default:
+                  LOG.warn(
+                      "RangerAuthorizationHDFSPlugin -> privilege {} is not 
supported for the securable object: {}",
+                      gravitinoPrivilege.name(),
+                      securableObject.type());
+              }
+            });
+
+    return rangerSecurableObjects;
+  }
+
+  @Override
+  public List<AuthorizationSecurableObject> translateOwner(MetadataObject 
gravitinoMetadataObject) {
+    List<AuthorizationSecurableObject> rangerSecurableObjects = new 
ArrayList<>();
+    switch (gravitinoMetadataObject.type()) {
+      case METALAKE:
+      case CATALOG:
+      case SCHEMA:
+        return rangerSecurableObjects;
+      case FILESET:
+        rangerSecurableObjects.add(
+            generateAuthorizationSecurableObject(
+                translateMetadataObject(gravitinoMetadataObject).names(),
+                RangerPathBaseMetadataObject.Type.PATH,
+                ownerMappingRule()));
+        break;
+      default:
+        throw new AuthorizationPluginException(
+            "The owner privilege is not supported for the securable object: 
%s",
+            gravitinoMetadataObject.type());
+    }
+
+    return rangerSecurableObjects;
+  }
+
+  @Override
+  public AuthorizationMetadataObject translateMetadataObject(MetadataObject 
metadataObject) {
+    Preconditions.checkArgument(
+        allowMetadataObjectTypesRule().contains(metadataObject.type()),
+        String.format(
+            "The metadata object type %s is not supported in the 
RangerAuthorizationHDFSPlugin",
+            metadataObject.type()));
+    List<String> nsMetadataObject =
+        
Lists.newArrayList(SecurableObjects.DOT_SPLITTER.splitToList(metadataObject.fullName()));
+    Preconditions.checkArgument(
+        nsMetadataObject.size() > 0, "The metadata object must have at least 
one name.");
+
+    if (metadataObject.type() == MetadataObject.Type.FILESET) {
+      RangerPathBaseMetadataObject rangerHDFSMetadataObject =
+          new RangerPathBaseMetadataObject(
+              getFileSetPath(metadataObject), 
RangerPathBaseMetadataObject.Type.PATH);
+      rangerHDFSMetadataObject.validateAuthorizationMetadataObject();
+      return rangerHDFSMetadataObject;
+    } else {
+      return new RangerPathBaseMetadataObject("", 
RangerPathBaseMetadataObject.Type.PATH);
+    }
+  }
+
+  public String getFileSetPath(MetadataObject metadataObject) {
+    FilesetDispatcher filesetDispatcher = 
GravitinoEnv.getInstance().filesetDispatcher();
+    NameIdentifier identifier =
+        NameIdentifier.parse(String.format("%s.%s", metalake, 
metadataObject.fullName()));
+    Fileset fileset = filesetDispatcher.loadFileset(identifier);
+    Preconditions.checkArgument(
+        fileset != null, String.format("Fileset %s is not found", identifier));
+    String filesetLocation = fileset.storageLocation();
+    Preconditions.checkArgument(
+        filesetLocation != null, String.format("Fileset %s location is not 
found", identifier));
+    return pattern.matcher(filesetLocation).replaceAll("");
+  }
+}
diff --git 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHadoopSQLPlugin.java
 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHadoopSQLPlugin.java
index 13b0400ec..0da5c105a 100644
--- 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHadoopSQLPlugin.java
+++ 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHadoopSQLPlugin.java
@@ -41,6 +41,7 @@ import org.apache.gravitino.authorization.SecurableObjects;
 import 
org.apache.gravitino.authorization.ranger.RangerPrivileges.RangerHadoopSQLPrivilege;
 import 
org.apache.gravitino.authorization.ranger.reference.RangerDefines.PolicyResource;
 import org.apache.gravitino.exceptions.AuthorizationPluginException;
+import org.apache.ranger.plugin.model.RangerPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,6 +104,38 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
         PolicyResource.COLUMN.getName());
   }
 
+  @Override
+  protected RangerPolicy createPolicyAddResources(AuthorizationMetadataObject 
metadataObject) {
+    RangerPolicy policy = new RangerPolicy();
+    policy.setService(rangerServiceName);
+    policy.setName(metadataObject.fullName());
+    List<String> nsMetadataObject = metadataObject.names();
+    for (int i = 0; i < nsMetadataObject.size(); i++) {
+      RangerPolicy.RangerPolicyResource policyResource =
+          new RangerPolicy.RangerPolicyResource(nsMetadataObject.get(i));
+      policy.getResources().put(policyResourceDefinesRule().get(i), 
policyResource);
+    }
+    return policy;
+  }
+
+  @Override
+  public AuthorizationSecurableObject generateAuthorizationSecurableObject(
+      List<String> names,
+      AuthorizationMetadataObject.Type type,
+      Set<AuthorizationPrivilege> privileges) {
+    AuthorizationMetadataObject authMetadataObject =
+        new RangerHadoopSQLMetadataObject(
+            AuthorizationMetadataObject.getParentFullName(names),
+            AuthorizationMetadataObject.getLastName(names),
+            type);
+    authMetadataObject.validateAuthorizationMetadataObject();
+    return new RangerHadoopSQLSecurableObject(
+        authMetadataObject.parent(),
+        authMetadataObject.name(),
+        authMetadataObject.type(),
+        privileges);
+  }
+
   @Override
   /** Allow privilege operation defines rule. */
   public Set<Privilege.Name> allowPrivilegesRule() {
@@ -143,13 +176,13 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
         AuthorizationSecurableObjects.add(
             generateAuthorizationSecurableObject(
                 ImmutableList.of(RangerHelper.RESOURCE_ALL),
-                RangerMetadataObject.Type.SCHEMA,
+                RangerHadoopSQLMetadataObject.Type.SCHEMA,
                 ownerMappingRule()));
         // Add `*.*` for the TABLE permission
         AuthorizationSecurableObjects.add(
             generateAuthorizationSecurableObject(
                 ImmutableList.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL),
-                RangerMetadataObject.Type.TABLE,
+                RangerHadoopSQLMetadataObject.Type.TABLE,
                 ownerMappingRule()));
         // Add `*.*.*` for the COLUMN permission
         AuthorizationSecurableObjects.add(
@@ -158,7 +191,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
                     RangerHelper.RESOURCE_ALL,
                     RangerHelper.RESOURCE_ALL,
                     RangerHelper.RESOURCE_ALL),
-                RangerMetadataObject.Type.COLUMN,
+                RangerHadoopSQLMetadataObject.Type.COLUMN,
                 ownerMappingRule()));
         break;
       case SCHEMA:
@@ -166,14 +199,14 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
         AuthorizationSecurableObjects.add(
             generateAuthorizationSecurableObject(
                 ImmutableList.of(gravitinoMetadataObject.name() /*Schema 
name*/),
-                RangerMetadataObject.Type.SCHEMA,
+                RangerHadoopSQLMetadataObject.Type.SCHEMA,
                 ownerMappingRule()));
         // Add `{schema}.*` for the TABLE permission
         AuthorizationSecurableObjects.add(
             generateAuthorizationSecurableObject(
                 ImmutableList.of(
                     gravitinoMetadataObject.name() /*Schema name*/, 
RangerHelper.RESOURCE_ALL),
-                RangerMetadataObject.Type.TABLE,
+                RangerHadoopSQLMetadataObject.Type.TABLE,
                 ownerMappingRule()));
         // Add `{schema}.*.*` for the COLUMN permission
         AuthorizationSecurableObjects.add(
@@ -182,7 +215,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
                     gravitinoMetadataObject.name() /*Schema name*/,
                     RangerHelper.RESOURCE_ALL,
                     RangerHelper.RESOURCE_ALL),
-                RangerMetadataObject.Type.COLUMN,
+                RangerHadoopSQLMetadataObject.Type.COLUMN,
                 ownerMappingRule()));
         break;
       case TABLE:
@@ -190,7 +223,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
         AuthorizationSecurableObjects.add(
             generateAuthorizationSecurableObject(
                 translateMetadataObject(gravitinoMetadataObject).names(),
-                RangerMetadataObject.Type.TABLE,
+                RangerHadoopSQLMetadataObject.Type.TABLE,
                 ownerMappingRule()));
         // Add `{schema}.{table}.*` for the COLUMN permission
         AuthorizationSecurableObjects.add(
@@ -199,7 +232,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
                         
translateMetadataObject(gravitinoMetadataObject).names().stream(),
                         Stream.of(RangerHelper.RESOURCE_ALL))
                     .collect(Collectors.toList()),
-                RangerMetadataObject.Type.COLUMN,
+                RangerHadoopSQLMetadataObject.Type.COLUMN,
                 ownerMappingRule()));
         break;
       default:
@@ -245,7 +278,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
                       AuthorizationSecurableObjects.add(
                           generateAuthorizationSecurableObject(
                               ImmutableList.of(RangerHelper.RESOURCE_ALL),
-                              RangerMetadataObject.Type.SCHEMA,
+                              RangerHadoopSQLMetadataObject.Type.SCHEMA,
                               rangerPrivileges));
                       break;
                     default:
@@ -262,7 +295,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
                       AuthorizationSecurableObjects.add(
                           generateAuthorizationSecurableObject(
                               ImmutableList.of(RangerHelper.RESOURCE_ALL),
-                              RangerMetadataObject.Type.SCHEMA,
+                              RangerHadoopSQLMetadataObject.Type.SCHEMA,
                               rangerPrivileges));
                       break;
                     default:
@@ -279,7 +312,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
                       AuthorizationSecurableObjects.add(
                           generateAuthorizationSecurableObject(
                               ImmutableList.of(RangerHelper.RESOURCE_ALL),
-                              RangerMetadataObject.Type.SCHEMA,
+                              RangerHadoopSQLMetadataObject.Type.SCHEMA,
                               rangerPrivileges));
                       break;
                     case SCHEMA:
@@ -287,7 +320,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
                       AuthorizationSecurableObjects.add(
                           generateAuthorizationSecurableObject(
                               ImmutableList.of(securableObject.name() /*Schema 
name*/),
-                              RangerMetadataObject.Type.SCHEMA,
+                              RangerHadoopSQLMetadataObject.Type.SCHEMA,
                               rangerPrivileges));
                       break;
                     default:
@@ -307,7 +340,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
                           generateAuthorizationSecurableObject(
                               ImmutableList.of(
                                   RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL),
-                              RangerMetadataObject.Type.TABLE,
+                              RangerHadoopSQLMetadataObject.Type.TABLE,
                               rangerPrivileges));
                       // Add `*.*.*` for the COLUMN permission
                       AuthorizationSecurableObjects.add(
@@ -316,7 +349,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
                                   RangerHelper.RESOURCE_ALL,
                                   RangerHelper.RESOURCE_ALL,
                                   RangerHelper.RESOURCE_ALL),
-                              RangerMetadataObject.Type.COLUMN,
+                              RangerHadoopSQLMetadataObject.Type.COLUMN,
                               rangerPrivileges));
                       break;
                     case SCHEMA:
@@ -326,7 +359,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
                               ImmutableList.of(
                                   securableObject.name() /*Schema name*/,
                                   RangerHelper.RESOURCE_ALL),
-                              RangerMetadataObject.Type.TABLE,
+                              RangerHadoopSQLMetadataObject.Type.TABLE,
                               rangerPrivileges));
                       // Add `{schema}.*.*` for the COLUMN permission
                       AuthorizationSecurableObjects.add(
@@ -335,7 +368,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
                                   securableObject.name() /*Schema name*/,
                                   RangerHelper.RESOURCE_ALL,
                                   RangerHelper.RESOURCE_ALL),
-                              RangerMetadataObject.Type.COLUMN,
+                              RangerHadoopSQLMetadataObject.Type.COLUMN,
                               rangerPrivileges));
                       break;
                     case TABLE:
@@ -348,7 +381,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
                         AuthorizationSecurableObjects.add(
                             generateAuthorizationSecurableObject(
                                 
translateMetadataObject(securableObject).names(),
-                                RangerMetadataObject.Type.TABLE,
+                                RangerHadoopSQLMetadataObject.Type.TABLE,
                                 rangerPrivileges));
                         // Add `{schema}.{table}.*` for the COLUMN permission
                         AuthorizationSecurableObjects.add(
@@ -357,7 +390,7 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
                                         
translateMetadataObject(securableObject).names().stream(),
                                         Stream.of(RangerHelper.RESOURCE_ALL))
                                     .collect(Collectors.toList()),
-                                RangerMetadataObject.Type.COLUMN,
+                                RangerHadoopSQLMetadataObject.Type.COLUMN,
                                 rangerPrivileges));
                       }
                       break;
@@ -403,18 +436,18 @@ public class RangerAuthorizationHadoopSQLPlugin extends 
RangerAuthorizationPlugi
         || metadataObject.type() == MetadataObject.Type.CATALOG) {
       nsMetadataObject.clear();
       nsMetadataObject.add(RangerHelper.RESOURCE_ALL);
-      type = RangerMetadataObject.Type.SCHEMA;
+      type = RangerHadoopSQLMetadataObject.Type.SCHEMA;
     } else {
       nsMetadataObject.remove(0); // Remove the catalog name
-      type = RangerMetadataObject.Type.fromMetadataType(metadataObject.type());
+      type = 
RangerHadoopSQLMetadataObject.Type.fromMetadataType(metadataObject.type());
     }
 
-    RangerMetadataObject rangerMetadataObject =
-        new RangerMetadataObject(
+    RangerHadoopSQLMetadataObject rangerHadoopSQLMetadataObject =
+        new RangerHadoopSQLMetadataObject(
             AuthorizationMetadataObject.getParentFullName(nsMetadataObject),
             AuthorizationMetadataObject.getLastName(nsMetadataObject),
             type);
-    rangerMetadataObject.validateAuthorizationMetadataObject();
-    return rangerMetadataObject;
+    rangerHadoopSQLMetadataObject.validateAuthorizationMetadataObject();
+    return rangerHadoopSQLMetadataObject;
   }
 }
diff --git 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationPlugin.java
 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationPlugin.java
index d2b1b7570..a3ce047aa 100644
--- 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationPlugin.java
+++ 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationPlugin.java
@@ -122,6 +122,57 @@ public abstract class RangerAuthorizationPlugin
    */
   public abstract List<String> policyResourceDefinesRule();
 
+  /**
+   * Create a new policy for metadata object
+   *
+   * @return The RangerPolicy for metadata object.
+   */
+  protected abstract RangerPolicy createPolicyAddResources(
+      AuthorizationMetadataObject metadataObject);
+
+  protected RangerPolicy addOwnerToNewPolicy(
+      AuthorizationMetadataObject metadataObject, Owner newOwner) {
+    RangerPolicy policy = createPolicyAddResources(metadataObject);
+    ownerMappingRule()
+        .forEach(
+            ownerPrivilege -> {
+              // Each owner's privilege will create one RangerPolicyItemAccess 
in the policy
+              RangerPolicy.RangerPolicyItem policyItem = new 
RangerPolicy.RangerPolicyItem();
+              policyItem
+                  .getAccesses()
+                  .add(new 
RangerPolicy.RangerPolicyItemAccess(ownerPrivilege.getName()));
+              if (newOwner != null) {
+                if (newOwner.type() == Owner.Type.USER) {
+                  policyItem.getUsers().add(newOwner.name());
+                } else {
+                  policyItem.getGroups().add(newOwner.name());
+                }
+                // mark the policy item is created by Gravitino
+                policyItem.getRoles().add(RangerHelper.GRAVITINO_OWNER_ROLE);
+              }
+              policy.getPolicyItems().add(policyItem);
+            });
+    return policy;
+  }
+
+  protected RangerPolicy addOwnerRoleToNewPolicy(
+      AuthorizationMetadataObject metadataObject, String ownerRoleName) {
+    RangerPolicy policy = createPolicyAddResources(metadataObject);
+
+    ownerMappingRule()
+        .forEach(
+            ownerPrivilege -> {
+              // Each owner's privilege will create one RangerPolicyItemAccess 
in the policy
+              RangerPolicy.RangerPolicyItem policyItem = new 
RangerPolicy.RangerPolicyItem();
+              policyItem
+                  .getAccesses()
+                  .add(new 
RangerPolicy.RangerPolicyItemAccess(ownerPrivilege.getName()));
+              
policyItem.getRoles().add(rangerHelper.generateGravitinoRoleName(ownerRoleName));
+              policy.getPolicyItems().add(policyItem);
+            });
+    return policy;
+  }
+
   /**
    * Create a new role in the Ranger. <br>
    * 1. Create a policy for metadata object. <br>
@@ -277,9 +328,11 @@ public abstract class RangerAuthorizationPlugin
       } else if (change instanceof MetadataObjectChange.RemoveMetadataObject) {
         MetadataObject metadataObject =
             ((MetadataObjectChange.RemoveMetadataObject) 
change).metadataObject();
-        AuthorizationMetadataObject AuthorizationMetadataObject =
-            translateMetadataObject(metadataObject);
-        doRemoveMetadataObject(AuthorizationMetadataObject);
+        if (metadataObject.type() != MetadataObject.Type.FILESET) {
+          AuthorizationMetadataObject AuthorizationMetadataObject =
+              translateMetadataObject(metadataObject);
+          doRemoveMetadataObject(AuthorizationMetadataObject);
+        }
       } else {
         throw new IllegalArgumentException(
             "Unsupported metadata object change type: "
@@ -385,9 +438,7 @@ public abstract class RangerAuthorizationPlugin
                       
rangerHelper.findManagedPolicy(AuthorizationSecurableObject);
                   try {
                     if (policy == null) {
-                      policy =
-                          rangerHelper.addOwnerRoleToNewPolicy(
-                              AuthorizationSecurableObject, ownerRoleName);
+                      policy = 
addOwnerRoleToNewPolicy(AuthorizationSecurableObject, ownerRoleName);
                       rangerClient.createPolicy(policy);
                     } else {
                       rangerHelper.updatePolicyOwnerRole(policy, 
ownerRoleName);
@@ -401,6 +452,7 @@ public abstract class RangerAuthorizationPlugin
         break;
       case SCHEMA:
       case TABLE:
+      case FILESET:
         // The schema and table use user/group to manage the owner
         AuthorizationSecurableObjects.stream()
             .forEach(
@@ -409,8 +461,7 @@ public abstract class RangerAuthorizationPlugin
                       
rangerHelper.findManagedPolicy(AuthorizationSecurableObject);
                   try {
                     if (policy == null) {
-                      policy =
-                          
rangerHelper.addOwnerToNewPolicy(AuthorizationSecurableObject, newOwner);
+                      policy = 
addOwnerToNewPolicy(AuthorizationSecurableObject, newOwner);
                       rangerClient.createPolicy(policy);
                     } else {
                       rangerHelper.updatePolicyOwner(policy, preOwner, 
newOwner);
@@ -684,7 +735,7 @@ public abstract class RangerAuthorizationPlugin
         return true;
       }
     } else {
-      policy = rangerHelper.createPolicyAddResources(securableObject);
+      policy = createPolicyAddResources(securableObject);
     }
 
     rangerHelper.addPolicyItem(policy, roleName, securableObject);
@@ -807,6 +858,9 @@ public abstract class RangerAuthorizationPlugin
       case COLUMN:
         removePolicyByMetadataObject(authMetadataObject.names());
         break;
+      case FILESET:
+        // can not get fileset path in this case, do nothing
+        break;
       default:
         throw new IllegalArgumentException(
             "Unsupported metadata object type: " + authMetadataObject.type());
@@ -819,7 +873,7 @@ public abstract class RangerAuthorizationPlugin
    */
   private void doRemoveSchemaMetadataObject(AuthorizationMetadataObject 
authMetadataObject) {
     Preconditions.checkArgument(
-        authMetadataObject.type() == RangerMetadataObject.Type.SCHEMA,
+        authMetadataObject.type() == RangerHadoopSQLMetadataObject.Type.SCHEMA,
         "The metadata object type must be SCHEMA");
     Preconditions.checkArgument(
         authMetadataObject.names().size() == 1, "The metadata object names 
must be 1");
@@ -894,6 +948,9 @@ public abstract class RangerAuthorizationPlugin
       case COLUMN:
         doRenameColumnMetadataObject(AuthorizationMetadataObject, 
newAuthMetadataObject);
         break;
+      case FILESET:
+        // do nothing when fileset is renamed
+        break;
       default:
         throw new IllegalArgumentException(
             "Unsupported metadata object type: " + 
AuthorizationMetadataObject.type());
@@ -1083,22 +1140,10 @@ public abstract class RangerAuthorizationPlugin
   public void close() throws IOException {}
 
   /** Generate authorization securable object */
-  public AuthorizationSecurableObject generateAuthorizationSecurableObject(
+  public abstract AuthorizationSecurableObject 
generateAuthorizationSecurableObject(
       List<String> names,
       AuthorizationMetadataObject.Type type,
-      Set<AuthorizationPrivilege> privileges) {
-    AuthorizationMetadataObject authMetadataObject =
-        new RangerMetadataObject(
-            AuthorizationMetadataObject.getParentFullName(names),
-            AuthorizationMetadataObject.getLastName(names),
-            type);
-    authMetadataObject.validateAuthorizationMetadataObject();
-    return new RangerSecurableObject(
-        authMetadataObject.parent(),
-        authMetadataObject.name(),
-        authMetadataObject.type(),
-        privileges);
-  }
+      Set<AuthorizationPrivilege> privileges);
 
   public boolean validAuthorizationOperation(List<SecurableObject> 
securableObjects) {
     return securableObjects.stream()
diff --git 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerMetadataObject.java
 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHadoopSQLMetadataObject.java
similarity index 88%
copy from 
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerMetadataObject.java
copy to 
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHadoopSQLMetadataObject.java
index b9354ee46..8462a0e07 100644
--- 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerMetadataObject.java
+++ 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHadoopSQLMetadataObject.java
@@ -24,7 +24,7 @@ import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.authorization.AuthorizationMetadataObject;
 
 /** The helper class for {@link AuthorizationMetadataObject}. */
-public class RangerMetadataObject implements AuthorizationMetadataObject {
+public class RangerHadoopSQLMetadataObject implements 
AuthorizationMetadataObject {
   /**
    * The type of object in the Ranger system. Every type will map one kind of 
the entity of the
    * Gravitino type system.
@@ -36,7 +36,6 @@ public class RangerMetadataObject implements 
AuthorizationMetadataObject {
     TABLE(MetadataObject.Type.TABLE),
     /** A column is a sub-collection of the table that represents a group of 
same type data. */
     COLUMN(MetadataObject.Type.COLUMN);
-
     private final MetadataObject.Type metadataType;
 
     Type(MetadataObject.Type type) {
@@ -72,7 +71,8 @@ public class RangerMetadataObject implements 
AuthorizationMetadataObject {
    * @param name The name of the metadata object
    * @param type The type of the metadata object
    */
-  public RangerMetadataObject(String parent, String name, 
AuthorizationMetadataObject.Type type) {
+  public RangerHadoopSQLMetadataObject(
+      String parent, String name, AuthorizationMetadataObject.Type type) {
     this.parent = parent;
     this.name = name;
     this.type = type;
@@ -110,15 +110,15 @@ public class RangerMetadataObject implements 
AuthorizationMetadataObject {
         type != null, "Cannot create a Ranger metadata object with no type");
 
     Preconditions.checkArgument(
-        names.size() != 1 || type == RangerMetadataObject.Type.SCHEMA,
+        names.size() != 1 || type == RangerHadoopSQLMetadataObject.Type.SCHEMA,
         "If the length of names is 1, it must be the SCHEMA type");
 
     Preconditions.checkArgument(
-        names.size() != 2 || type == RangerMetadataObject.Type.TABLE,
+        names.size() != 2 || type == RangerHadoopSQLMetadataObject.Type.TABLE,
         "If the length of names is 2, it must be the TABLE type");
 
     Preconditions.checkArgument(
-        names.size() != 3 || type == RangerMetadataObject.Type.COLUMN,
+        names.size() != 3 || type == RangerHadoopSQLMetadataObject.Type.COLUMN,
         "If the length of names is 3, it must be COLUMN");
 
     for (String name : names) {
@@ -132,11 +132,11 @@ public class RangerMetadataObject implements 
AuthorizationMetadataObject {
       return true;
     }
 
-    if (!(o instanceof RangerMetadataObject)) {
+    if (!(o instanceof RangerHadoopSQLMetadataObject)) {
       return false;
     }
 
-    RangerMetadataObject that = (RangerMetadataObject) o;
+    RangerHadoopSQLMetadataObject that = (RangerHadoopSQLMetadataObject) o;
     return java.util.Objects.equals(name, that.name)
         && java.util.Objects.equals(parent, that.parent)
         && type == that.type;
diff --git 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerSecurableObject.java
 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHadoopSQLSecurableObject.java
similarity index 90%
copy from 
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerSecurableObject.java
copy to 
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHadoopSQLSecurableObject.java
index 3a6294f82..4aabdc4c3 100644
--- 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerSecurableObject.java
+++ 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHadoopSQLSecurableObject.java
@@ -26,8 +26,8 @@ import 
org.apache.gravitino.authorization.AuthorizationMetadataObject;
 import org.apache.gravitino.authorization.AuthorizationPrivilege;
 import org.apache.gravitino.authorization.AuthorizationSecurableObject;
 
-/** The helper class for {@link RangerSecurableObject}. */
-public class RangerSecurableObject extends RangerMetadataObject
+/** The helper class for {@link RangerHadoopSQLSecurableObject}. */
+public class RangerHadoopSQLSecurableObject extends 
RangerHadoopSQLMetadataObject
     implements AuthorizationSecurableObject {
   private final List<AuthorizationPrivilege> privileges;
 
@@ -38,7 +38,7 @@ public class RangerSecurableObject extends 
RangerMetadataObject
    * @param name The name of the metadata object
    * @param type The type of the metadata object
    */
-  public RangerSecurableObject(
+  public RangerHadoopSQLSecurableObject(
       String parent,
       String name,
       AuthorizationMetadataObject.Type type,
diff --git 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHelper.java
 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHelper.java
index d955f7656..4c2b2956c 100644
--- 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHelper.java
+++ 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerHelper.java
@@ -442,61 +442,6 @@ public class RangerHelper {
             });
   }
 
-  protected RangerPolicy createPolicyAddResources(AuthorizationMetadataObject 
metadataObject) {
-    RangerPolicy policy = new RangerPolicy();
-    policy.setService(rangerServiceName);
-    policy.setName(metadataObject.fullName());
-    List<String> nsMetadataObject = metadataObject.names();
-    for (int i = 0; i < nsMetadataObject.size(); i++) {
-      RangerPolicy.RangerPolicyResource policyResource =
-          new RangerPolicy.RangerPolicyResource(nsMetadataObject.get(i));
-      policy.getResources().put(policyResourceDefines.get(i), policyResource);
-    }
-    return policy;
-  }
-
-  protected RangerPolicy addOwnerToNewPolicy(
-      AuthorizationMetadataObject metadataObject, Owner newOwner) {
-    RangerPolicy policy = createPolicyAddResources(metadataObject);
-
-    ownerPrivileges.forEach(
-        ownerPrivilege -> {
-          // Each owner's privilege will create one RangerPolicyItemAccess in 
the policy
-          RangerPolicy.RangerPolicyItem policyItem = new 
RangerPolicy.RangerPolicyItem();
-          policyItem
-              .getAccesses()
-              .add(new 
RangerPolicy.RangerPolicyItemAccess(ownerPrivilege.getName()));
-          if (newOwner != null) {
-            if (newOwner.type() == Owner.Type.USER) {
-              policyItem.getUsers().add(newOwner.name());
-            } else {
-              policyItem.getGroups().add(newOwner.name());
-            }
-            // mark the policy item is created by Gravitino
-            policyItem.getRoles().add(GRAVITINO_OWNER_ROLE);
-          }
-          policy.getPolicyItems().add(policyItem);
-        });
-    return policy;
-  }
-
-  protected RangerPolicy addOwnerRoleToNewPolicy(
-      AuthorizationMetadataObject metadataObject, String ownerRoleName) {
-    RangerPolicy policy = createPolicyAddResources(metadataObject);
-
-    ownerPrivileges.forEach(
-        ownerPrivilege -> {
-          // Each owner's privilege will create one RangerPolicyItemAccess in 
the policy
-          RangerPolicy.RangerPolicyItem policyItem = new 
RangerPolicy.RangerPolicyItem();
-          policyItem
-              .getAccesses()
-              .add(new 
RangerPolicy.RangerPolicyItemAccess(ownerPrivilege.getName()));
-          policyItem.getRoles().add(generateGravitinoRoleName(ownerRoleName));
-          policy.getPolicyItems().add(policyItem);
-        });
-    return policy;
-  }
-
   protected void updatePolicyOwnerRole(RangerPolicy policy, String 
ownerRoleName) {
     // Find matching policy items based on the owner's privileges
     List<RangerPolicy.RangerPolicyItem> matchPolicyItems =
diff --git 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerMetadataObject.java
 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerPathBaseMetadataObject.java
similarity index 52%
rename from 
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerMetadataObject.java
rename to 
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerPathBaseMetadataObject.java
index b9354ee46..775234641 100644
--- 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerMetadataObject.java
+++ 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerPathBaseMetadataObject.java
@@ -19,24 +19,20 @@
 package org.apache.gravitino.authorization.ranger;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.authorization.AuthorizationMetadataObject;
 
-/** The helper class for {@link AuthorizationMetadataObject}. */
-public class RangerMetadataObject implements AuthorizationMetadataObject {
+public class RangerPathBaseMetadataObject implements 
AuthorizationMetadataObject {
   /**
    * The type of object in the Ranger system. Every type will map one kind of 
the entity of the
    * Gravitino type system.
    */
   public enum Type implements AuthorizationMetadataObject.Type {
-    /** A schema is a sub collection of the catalog. The schema can contain 
tables, columns, etc. */
-    SCHEMA(MetadataObject.Type.SCHEMA),
-    /** A table is mapped the table of relational data sources like Apache 
Hive, MySQL, etc. */
-    TABLE(MetadataObject.Type.TABLE),
-    /** A column is a sub-collection of the table that represents a group of 
same type data. */
-    COLUMN(MetadataObject.Type.COLUMN);
-
+    /** A path is mapped the path of storages like HDFS, S3 etc. */
+    PATH(MetadataObject.Type.FILESET);
     private final MetadataObject.Type metadataType;
 
     Type(MetadataObject.Type type) {
@@ -47,8 +43,9 @@ public class RangerMetadataObject implements 
AuthorizationMetadataObject {
       return metadataType;
     }
 
-    public static Type fromMetadataType(MetadataObject.Type metadataType) {
-      for (Type type : Type.values()) {
+    public static RangerHadoopSQLMetadataObject.Type fromMetadataType(
+        MetadataObject.Type metadataType) {
+      for (RangerHadoopSQLMetadataObject.Type type : 
RangerHadoopSQLMetadataObject.Type.values()) {
         if (type.metadataObjectType() == metadataType) {
           return type;
         }
@@ -58,44 +55,34 @@ public class RangerMetadataObject implements 
AuthorizationMetadataObject {
     }
   }
 
-  /** The implementation of the {@link MetadataObject}. */
-  private final String name;
-
-  private final String parent;
+  private final String path;
 
   private final AuthorizationMetadataObject.Type type;
 
-  /**
-   * Create the metadata object with the given name, parent and type.
-   *
-   * @param parent The parent of the metadata object
-   * @param name The name of the metadata object
-   * @param type The type of the metadata object
-   */
-  public RangerMetadataObject(String parent, String name, 
AuthorizationMetadataObject.Type type) {
-    this.parent = parent;
-    this.name = name;
+  public RangerPathBaseMetadataObject(String path, 
AuthorizationMetadataObject.Type type) {
+    this.path = path;
     this.type = type;
   }
 
+  @Nullable
   @Override
-  public String name() {
-    return name;
+  public String parent() {
+    return null;
   }
 
   @Override
-  public List<String> names() {
-    return DOT_SPLITTER.splitToList(fullName());
+  public String name() {
+    return this.path;
   }
 
   @Override
-  public String parent() {
-    return parent;
+  public List<String> names() {
+    return ImmutableList.of(this.path);
   }
 
   @Override
   public AuthorizationMetadataObject.Type type() {
-    return type;
+    return this.type;
   }
 
   @Override
@@ -104,51 +91,16 @@ public class RangerMetadataObject implements 
AuthorizationMetadataObject {
     Preconditions.checkArgument(
         names != null && !names.isEmpty(), "Cannot create a Ranger metadata 
object with no names");
     Preconditions.checkArgument(
-        names.size() <= 3,
-        "Cannot create a Ranger metadata object with the name length which is 
greater than 3");
+        names.size() == 1,
+        "Cannot create a Ranger metadata object with the name length which is 
1");
     Preconditions.checkArgument(
         type != null, "Cannot create a Ranger metadata object with no type");
 
     Preconditions.checkArgument(
-        names.size() != 1 || type == RangerMetadataObject.Type.SCHEMA,
-        "If the length of names is 1, it must be the SCHEMA type");
-
-    Preconditions.checkArgument(
-        names.size() != 2 || type == RangerMetadataObject.Type.TABLE,
-        "If the length of names is 2, it must be the TABLE type");
-
-    Preconditions.checkArgument(
-        names.size() != 3 || type == RangerMetadataObject.Type.COLUMN,
-        "If the length of names is 3, it must be COLUMN");
+        type == RangerPathBaseMetadataObject.Type.PATH, "it must be the PATH 
type");
 
     for (String name : names) {
       Preconditions.checkArgument(name != null, "Cannot create a metadata 
object with null name");
     }
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-
-    if (!(o instanceof RangerMetadataObject)) {
-      return false;
-    }
-
-    RangerMetadataObject that = (RangerMetadataObject) o;
-    return java.util.Objects.equals(name, that.name)
-        && java.util.Objects.equals(parent, that.parent)
-        && type == that.type;
-  }
-
-  @Override
-  public int hashCode() {
-    return java.util.Objects.hash(name, parent, type);
-  }
-
-  @Override
-  public String toString() {
-    return "MetadataObject: [fullName=" + fullName() + "], [type=" + type + 
"]";
-  }
 }
diff --git 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerSecurableObject.java
 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerPathBaseSecurableObject.java
similarity index 66%
rename from 
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerSecurableObject.java
rename to 
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerPathBaseSecurableObject.java
index 3a6294f82..bd2c73fda 100644
--- 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerSecurableObject.java
+++ 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerPathBaseSecurableObject.java
@@ -19,32 +19,21 @@
 package org.apache.gravitino.authorization.ranger;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
 import java.util.List;
 import java.util.Set;
 import org.apache.gravitino.authorization.AuthorizationMetadataObject;
 import org.apache.gravitino.authorization.AuthorizationPrivilege;
 import org.apache.gravitino.authorization.AuthorizationSecurableObject;
 
-/** The helper class for {@link RangerSecurableObject}. */
-public class RangerSecurableObject extends RangerMetadataObject
+public class RangerPathBaseSecurableObject extends RangerPathBaseMetadataObject
     implements AuthorizationSecurableObject {
+
   private final List<AuthorizationPrivilege> privileges;
 
-  /**
-   * Create the Ranger securable object with the given name, parent and type.
-   *
-   * @param parent The parent of the metadata object
-   * @param name The name of the metadata object
-   * @param type The type of the metadata object
-   */
-  public RangerSecurableObject(
-      String parent,
-      String name,
-      AuthorizationMetadataObject.Type type,
-      Set<AuthorizationPrivilege> privileges) {
-    super(parent, name, type);
-    this.privileges = ImmutableList.copyOf(Sets.newHashSet(privileges));
+  public RangerPathBaseSecurableObject(
+      String path, AuthorizationMetadataObject.Type type, 
Set<AuthorizationPrivilege> privileges) {
+    super(path, type);
+    this.privileges = ImmutableList.copyOf(privileges);
   }
 
   @Override
diff --git 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/reference/RangerDefines.java
 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/reference/RangerDefines.java
index b81fc3fdc..570b0feec 100644
--- 
a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/reference/RangerDefines.java
+++ 
b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/reference/RangerDefines.java
@@ -37,8 +37,8 @@ public class RangerDefines {
     // In the Ranger 2.4.0 
agents-common/src/main/resources/service-defs/ranger-servicedef-hive.json
     DATABASE("database"),
     TABLE("table"),
-    COLUMN("column");
-
+    COLUMN("column"),
+    PATH("path");
     private final String name;
 
     PolicyResource(String name) {
diff --git 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerAuthorizationHDFSPluginIT.java
 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerAuthorizationHDFSPluginIT.java
new file mode 100644
index 000000000..e1eacba15
--- /dev/null
+++ 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerAuthorizationHDFSPluginIT.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger.integration.test;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.authorization.AuthorizationMetadataObject;
+import org.apache.gravitino.authorization.AuthorizationSecurableObject;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.ranger.RangerAuthorizationPlugin;
+import org.apache.gravitino.authorization.ranger.RangerPathBaseMetadataObject;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-test")
+public class RangerAuthorizationHDFSPluginIT {
+
+  private static RangerAuthorizationPlugin rangerAuthPlugin;
+
+  @BeforeAll
+  public static void setup() {
+    RangerITEnv.init(true);
+    rangerAuthPlugin = RangerITEnv.rangerAuthHDFSPlugin;
+  }
+
+  @AfterAll
+  public static void cleanup() {
+    RangerITEnv.cleanup();
+  }
+
+  @Test
+  public void testTranslateMetadataObject() {
+    MetadataObject metalake =
+        MetadataObjects.parse(String.format("metalake1"), 
MetadataObject.Type.METALAKE);
+    Assertions.assertEquals(
+        RangerPathBaseMetadataObject.Type.PATH,
+        rangerAuthPlugin.translateMetadataObject(metalake).type());
+
+    MetadataObject catalog =
+        MetadataObjects.parse(String.format("catalog1"), 
MetadataObject.Type.CATALOG);
+    Assertions.assertEquals(
+        RangerPathBaseMetadataObject.Type.PATH,
+        rangerAuthPlugin.translateMetadataObject(catalog).type());
+
+    MetadataObject schema =
+        MetadataObjects.parse(String.format("catalog1.schema1"), 
MetadataObject.Type.SCHEMA);
+    Assertions.assertEquals(
+        RangerPathBaseMetadataObject.Type.PATH,
+        rangerAuthPlugin.translateMetadataObject(schema).type());
+
+    MetadataObject table =
+        MetadataObjects.parse(String.format("catalog1.schema1.tab1"), 
MetadataObject.Type.TABLE);
+    Assertions.assertThrows(
+        IllegalArgumentException.class, () -> 
rangerAuthPlugin.translateMetadataObject(table));
+
+    MetadataObject fileset =
+        MetadataObjects.parse(
+            String.format("catalog1.schema1.fileset1"), 
MetadataObject.Type.FILESET);
+    AuthorizationMetadataObject rangerFileset = 
rangerAuthPlugin.translateMetadataObject(fileset);
+    Assertions.assertEquals(1, rangerFileset.names().size());
+    Assertions.assertEquals("/test", rangerFileset.fullName());
+    Assertions.assertEquals(RangerPathBaseMetadataObject.Type.PATH, 
rangerFileset.type());
+  }
+
+  @Test
+  public void testTranslatePrivilege() {
+    SecurableObject filesetInMetalake =
+        SecurableObjects.parse(
+            String.format("metalake1"),
+            MetadataObject.Type.METALAKE,
+            Lists.newArrayList(
+                Privileges.CreateFileset.allow(),
+                Privileges.ReadFileset.allow(),
+                Privileges.WriteFileset.allow()));
+    List<AuthorizationSecurableObject> filesetInMetalake1 =
+        rangerAuthPlugin.translatePrivilege(filesetInMetalake);
+    Assertions.assertEquals(0, filesetInMetalake1.size());
+
+    SecurableObject filesetInCatalog =
+        SecurableObjects.parse(
+            String.format("catalog1"),
+            MetadataObject.Type.CATALOG,
+            Lists.newArrayList(
+                Privileges.CreateFileset.allow(),
+                Privileges.ReadFileset.allow(),
+                Privileges.WriteFileset.allow()));
+    List<AuthorizationSecurableObject> filesetInCatalog1 =
+        rangerAuthPlugin.translatePrivilege(filesetInCatalog);
+    Assertions.assertEquals(0, filesetInCatalog1.size());
+
+    SecurableObject filesetInSchema =
+        SecurableObjects.parse(
+            String.format("catalog1.schema1"),
+            MetadataObject.Type.SCHEMA,
+            Lists.newArrayList(
+                Privileges.CreateFileset.allow(),
+                Privileges.ReadFileset.allow(),
+                Privileges.WriteFileset.allow()));
+    List<AuthorizationSecurableObject> filesetInSchema1 =
+        rangerAuthPlugin.translatePrivilege(filesetInSchema);
+    Assertions.assertEquals(0, filesetInSchema1.size());
+
+    SecurableObject filesetInFileset =
+        SecurableObjects.parse(
+            String.format("catalog1.schema1.fileset1"),
+            MetadataObject.Type.FILESET,
+            Lists.newArrayList(
+                Privileges.CreateFileset.allow(),
+                Privileges.ReadFileset.allow(),
+                Privileges.WriteFileset.allow()));
+    List<AuthorizationSecurableObject> filesetInFileset1 =
+        rangerAuthPlugin.translatePrivilege(filesetInFileset);
+    Assertions.assertEquals(2, filesetInFileset1.size());
+
+    filesetInFileset1.forEach(
+        securableObject -> {
+          Assertions.assertEquals(RangerPathBaseMetadataObject.Type.PATH, 
securableObject.type());
+          Assertions.assertEquals("/test", securableObject.fullName());
+          Assertions.assertEquals(2, securableObject.privileges().size());
+        });
+  }
+
+  @Test
+  public void testTranslateOwner() {
+    MetadataObject metalake =
+        MetadataObjects.parse(String.format("metalake1"), 
MetadataObject.Type.METALAKE);
+    List<AuthorizationSecurableObject> metalakeOwner = 
rangerAuthPlugin.translateOwner(metalake);
+    Assertions.assertEquals(0, metalakeOwner.size());
+
+    MetadataObject catalog =
+        MetadataObjects.parse(String.format("catalog1"), 
MetadataObject.Type.CATALOG);
+    List<AuthorizationSecurableObject> catalogOwner = 
rangerAuthPlugin.translateOwner(catalog);
+    Assertions.assertEquals(0, catalogOwner.size());
+
+    MetadataObject schema =
+        MetadataObjects.parse(String.format("catalog1.schema1"), 
MetadataObject.Type.SCHEMA);
+    List<AuthorizationSecurableObject> schemaOwner = 
rangerAuthPlugin.translateOwner(schema);
+    Assertions.assertEquals(0, schemaOwner.size());
+
+    MetadataObject fileset =
+        MetadataObjects.parse(
+            String.format("catalog1.schema1.fileset1"), 
MetadataObject.Type.FILESET);
+    List<AuthorizationSecurableObject> filesetOwner = 
rangerAuthPlugin.translateOwner(fileset);
+    Assertions.assertEquals(1, filesetOwner.size());
+    Assertions.assertEquals("/test", filesetOwner.get(0).fullName());
+    Assertions.assertEquals(RangerPathBaseMetadataObject.Type.PATH, 
filesetOwner.get(0).type());
+    Assertions.assertEquals(3, filesetOwner.get(0).privileges().size());
+  }
+}
diff --git 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerAuthorizationPluginIT.java
 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerAuthorizationPluginIT.java
index 50ca331d2..74ddf0784 100644
--- 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerAuthorizationPluginIT.java
+++ 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerAuthorizationPluginIT.java
@@ -31,8 +31,9 @@ import org.apache.gravitino.authorization.Privileges;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.authorization.SecurableObjects;
 import org.apache.gravitino.authorization.ranger.RangerAuthorizationPlugin;
+import org.apache.gravitino.authorization.ranger.RangerHadoopSQLMetadataObject;
 import org.apache.gravitino.authorization.ranger.RangerHelper;
-import org.apache.gravitino.authorization.ranger.RangerMetadataObject;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
@@ -44,10 +45,15 @@ public class RangerAuthorizationPluginIT {
 
   @BeforeAll
   public static void setup() {
-    RangerITEnv.init();
+    RangerITEnv.init(true);
     rangerAuthPlugin = RangerITEnv.rangerAuthHivePlugin;
   }
 
+  @AfterAll
+  public static void cleanup() {
+    RangerITEnv.cleanup();
+  }
+
   @Test
   public void testTranslateMetadataObject() {
     MetadataObject metalake =
@@ -55,21 +61,21 @@ public class RangerAuthorizationPluginIT {
     AuthorizationMetadataObject rangerMetalake = 
rangerAuthPlugin.translateMetadataObject(metalake);
     Assertions.assertEquals(1, rangerMetalake.names().size());
     Assertions.assertEquals(RangerHelper.RESOURCE_ALL, 
rangerMetalake.names().get(0));
-    Assertions.assertEquals(RangerMetadataObject.Type.SCHEMA, 
rangerMetalake.type());
+    Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.SCHEMA, 
rangerMetalake.type());
 
     MetadataObject catalog =
         MetadataObjects.parse(String.format("catalog1"), 
MetadataObject.Type.CATALOG);
     AuthorizationMetadataObject rangerCatalog = 
rangerAuthPlugin.translateMetadataObject(catalog);
     Assertions.assertEquals(1, rangerCatalog.names().size());
     Assertions.assertEquals(RangerHelper.RESOURCE_ALL, 
rangerCatalog.names().get(0));
-    Assertions.assertEquals(RangerMetadataObject.Type.SCHEMA, 
rangerCatalog.type());
+    Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.SCHEMA, 
rangerCatalog.type());
 
     MetadataObject schema =
         MetadataObjects.parse(String.format("catalog1.schema1"), 
MetadataObject.Type.SCHEMA);
     AuthorizationMetadataObject rangerSchema = 
rangerAuthPlugin.translateMetadataObject(schema);
     Assertions.assertEquals(1, rangerSchema.names().size());
     Assertions.assertEquals("schema1", rangerSchema.names().get(0));
-    Assertions.assertEquals(RangerMetadataObject.Type.SCHEMA, 
rangerSchema.type());
+    Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.SCHEMA, 
rangerSchema.type());
 
     MetadataObject table =
         MetadataObjects.parse(String.format("catalog1.schema1.tab1"), 
MetadataObject.Type.TABLE);
@@ -77,7 +83,7 @@ public class RangerAuthorizationPluginIT {
     Assertions.assertEquals(2, rangerTable.names().size());
     Assertions.assertEquals("schema1", rangerTable.names().get(0));
     Assertions.assertEquals("tab1", rangerTable.names().get(1));
-    Assertions.assertEquals(RangerMetadataObject.Type.TABLE, 
rangerTable.type());
+    Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.TABLE, 
rangerTable.type());
   }
 
   @Test
@@ -92,7 +98,7 @@ public class RangerAuthorizationPluginIT {
     Assertions.assertEquals(1, createSchemaInMetalake1.size());
     Assertions.assertEquals(RangerHelper.RESOURCE_ALL, 
createSchemaInMetalake1.get(0).fullName());
     Assertions.assertEquals(
-        RangerMetadataObject.Type.SCHEMA, 
createSchemaInMetalake1.get(0).type());
+        RangerHadoopSQLMetadataObject.Type.SCHEMA, 
createSchemaInMetalake1.get(0).type());
 
     SecurableObject createSchemaInCatalog =
         SecurableObjects.parse(
@@ -103,7 +109,8 @@ public class RangerAuthorizationPluginIT {
         rangerAuthPlugin.translatePrivilege(createSchemaInCatalog);
     Assertions.assertEquals(1, createSchemaInCatalog1.size());
     Assertions.assertEquals(RangerHelper.RESOURCE_ALL, 
createSchemaInCatalog1.get(0).fullName());
-    Assertions.assertEquals(RangerMetadataObject.Type.SCHEMA, 
createSchemaInCatalog1.get(0).type());
+    Assertions.assertEquals(
+        RangerHadoopSQLMetadataObject.Type.SCHEMA, 
createSchemaInCatalog1.get(0).type());
 
     for (Privilege privilege :
         ImmutableList.of(
@@ -118,9 +125,9 @@ public class RangerAuthorizationPluginIT {
       List<AuthorizationSecurableObject> metalake1 = 
rangerAuthPlugin.translatePrivilege(metalake);
       Assertions.assertEquals(2, metalake1.size());
       Assertions.assertEquals("*.*", metalake1.get(0).fullName());
-      Assertions.assertEquals(RangerMetadataObject.Type.TABLE, 
metalake1.get(0).type());
+      Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.TABLE, 
metalake1.get(0).type());
       Assertions.assertEquals("*.*.*", metalake1.get(1).fullName());
-      Assertions.assertEquals(RangerMetadataObject.Type.COLUMN, 
metalake1.get(1).type());
+      Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.COLUMN, 
metalake1.get(1).type());
 
       SecurableObject catalog =
           SecurableObjects.parse(
@@ -130,9 +137,9 @@ public class RangerAuthorizationPluginIT {
       List<AuthorizationSecurableObject> catalog1 = 
rangerAuthPlugin.translatePrivilege(catalog);
       Assertions.assertEquals(2, catalog1.size());
       Assertions.assertEquals("*.*", catalog1.get(0).fullName());
-      Assertions.assertEquals(RangerMetadataObject.Type.TABLE, 
catalog1.get(0).type());
+      Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.TABLE, 
catalog1.get(0).type());
       Assertions.assertEquals("*.*.*", catalog1.get(1).fullName());
-      Assertions.assertEquals(RangerMetadataObject.Type.COLUMN, 
catalog1.get(1).type());
+      Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.COLUMN, 
catalog1.get(1).type());
 
       SecurableObject schema =
           SecurableObjects.parse(
@@ -142,9 +149,9 @@ public class RangerAuthorizationPluginIT {
       List<AuthorizationSecurableObject> schema1 = 
rangerAuthPlugin.translatePrivilege(schema);
       Assertions.assertEquals(2, schema1.size());
       Assertions.assertEquals("schema1.*", schema1.get(0).fullName());
-      Assertions.assertEquals(RangerMetadataObject.Type.TABLE, 
schema1.get(0).type());
+      Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.TABLE, 
schema1.get(0).type());
       Assertions.assertEquals("schema1.*.*", schema1.get(1).fullName());
-      Assertions.assertEquals(RangerMetadataObject.Type.COLUMN, 
schema1.get(1).type());
+      Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.COLUMN, 
schema1.get(1).type());
 
       if (!privilege.equals(Privileges.CreateTable.allow())) {
         // `CREATE_TABLE` not support securable object for table, So ignore 
check for table.
@@ -156,9 +163,9 @@ public class RangerAuthorizationPluginIT {
         List<AuthorizationSecurableObject> table1 = 
rangerAuthPlugin.translatePrivilege(table);
         Assertions.assertEquals(2, table1.size());
         Assertions.assertEquals("schema1.table1", table1.get(0).fullName());
-        Assertions.assertEquals(RangerMetadataObject.Type.TABLE, 
table1.get(0).type());
+        Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.TABLE, 
table1.get(0).type());
         Assertions.assertEquals("schema1.table1.*", table1.get(1).fullName());
-        Assertions.assertEquals(RangerMetadataObject.Type.COLUMN, 
table1.get(1).type());
+        Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.COLUMN, 
table1.get(1).type());
       }
     }
   }
@@ -171,31 +178,34 @@ public class RangerAuthorizationPluginIT {
       List<AuthorizationSecurableObject> metalakeOwner = 
rangerAuthPlugin.translateOwner(metalake);
       Assertions.assertEquals(3, metalakeOwner.size());
       Assertions.assertEquals(RangerHelper.RESOURCE_ALL, 
metalakeOwner.get(0).fullName());
-      Assertions.assertEquals(RangerMetadataObject.Type.SCHEMA, 
metalakeOwner.get(0).type());
+      Assertions.assertEquals(
+          RangerHadoopSQLMetadataObject.Type.SCHEMA, 
metalakeOwner.get(0).type());
       Assertions.assertEquals("*.*", metalakeOwner.get(1).fullName());
-      Assertions.assertEquals(RangerMetadataObject.Type.TABLE, 
metalakeOwner.get(1).type());
+      Assertions.assertEquals(
+          RangerHadoopSQLMetadataObject.Type.TABLE, 
metalakeOwner.get(1).type());
       Assertions.assertEquals("*.*.*", metalakeOwner.get(2).fullName());
-      Assertions.assertEquals(RangerMetadataObject.Type.COLUMN, 
metalakeOwner.get(2).type());
+      Assertions.assertEquals(
+          RangerHadoopSQLMetadataObject.Type.COLUMN, 
metalakeOwner.get(2).type());
     }
 
     MetadataObject schema = MetadataObjects.parse("catalog1.schema1", 
MetadataObject.Type.SCHEMA);
     List<AuthorizationSecurableObject> schemaOwner = 
rangerAuthPlugin.translateOwner(schema);
     Assertions.assertEquals(3, schemaOwner.size());
     Assertions.assertEquals("schema1", schemaOwner.get(0).fullName());
-    Assertions.assertEquals(RangerMetadataObject.Type.SCHEMA, 
schemaOwner.get(0).type());
+    Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.SCHEMA, 
schemaOwner.get(0).type());
     Assertions.assertEquals("schema1.*", schemaOwner.get(1).fullName());
-    Assertions.assertEquals(RangerMetadataObject.Type.TABLE, 
schemaOwner.get(1).type());
+    Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.TABLE, 
schemaOwner.get(1).type());
     Assertions.assertEquals("schema1.*.*", schemaOwner.get(2).fullName());
-    Assertions.assertEquals(RangerMetadataObject.Type.COLUMN, 
schemaOwner.get(2).type());
+    Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.COLUMN, 
schemaOwner.get(2).type());
 
     MetadataObject table =
         MetadataObjects.parse("catalog1.schema1.table1", 
MetadataObject.Type.TABLE);
     List<AuthorizationSecurableObject> tableOwner = 
rangerAuthPlugin.translateOwner(table);
     Assertions.assertEquals(2, tableOwner.size());
     Assertions.assertEquals("schema1.table1", tableOwner.get(0).fullName());
-    Assertions.assertEquals(RangerMetadataObject.Type.TABLE, 
tableOwner.get(0).type());
+    Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.TABLE, 
tableOwner.get(0).type());
     Assertions.assertEquals("schema1.table1.*", tableOwner.get(1).fullName());
-    Assertions.assertEquals(RangerMetadataObject.Type.COLUMN, 
tableOwner.get(1).type());
+    Assertions.assertEquals(RangerHadoopSQLMetadataObject.Type.COLUMN, 
tableOwner.get(1).type());
   }
 
   @Test
diff --git 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerFilesetIT.java
 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerFilesetIT.java
new file mode 100644
index 000000000..bbaae3278
--- /dev/null
+++ 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerFilesetIT.java
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger.integration.test;
+
+import static org.apache.gravitino.Catalog.AUTHORIZATION_PROVIDER;
+import static 
org.apache.gravitino.authorization.ranger.integration.test.RangerITEnv.currentFunName;
+import static 
org.apache.gravitino.authorization.ranger.integration.test.RangerITEnv.rangerClient;
+import static 
org.apache.gravitino.authorization.ranger.integration.test.RangerITEnv.rangerHelper;
+import static 
org.apache.gravitino.catalog.hive.HiveConstants.IMPERSONATION_ENABLE;
+import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_AUTH_TYPE;
+import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_PASSWORD;
+import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_SERVICE_NAME;
+import static 
org.apache.gravitino.connector.AuthorizationPropertiesMeta.RANGER_USERNAME;
+import static 
org.apache.gravitino.integration.test.container.RangerContainer.RANGER_SERVER_PORT;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.ranger.RangerHelper;
+import org.apache.gravitino.authorization.ranger.RangerPrivileges;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.integration.test.container.RangerContainer;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Tag("gravitino-docker-test")
+public class RangerFilesetIT extends BaseIT {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerFilesetIT.class);
+
+  private String RANGER_ADMIN_URL;
+  private String defaultBaseLocation;
+  private String metalakeName = "metalake";
+  private String catalogName = 
GravitinoITUtils.genRandomName("RangerFilesetE2EIT_catalog");
+  private String schemaName = 
GravitinoITUtils.genRandomName("RangerFilesetE2EIT_schema");
+  private static final String provider = "hadoop";
+  private FileSystem fileSystem;
+  private GravitinoMetalake metalake;
+  private Catalog catalog;
+
+  @BeforeAll
+  public void startIntegrationTest() throws Exception {
+    // Enable Gravitino Authorization mode
+    Map<String, String> configs = Maps.newHashMap();
+    configs.put(Configs.ENABLE_AUTHORIZATION.getKey(), String.valueOf(true));
+    configs.put(Configs.SERVICE_ADMINS.getKey(), RangerITEnv.HADOOP_USER_NAME);
+    configs.put(Configs.AUTHENTICATORS.getKey(), 
AuthenticatorType.SIMPLE.name().toLowerCase());
+    configs.put("SimpleAuthUserName", AuthConstants.ANONYMOUS_USER);
+    registerCustomConfigs(configs);
+    super.startIntegrationTest();
+
+    RangerITEnv.init(false);
+    RangerITEnv.startHiveRangerContainer();
+
+    RANGER_ADMIN_URL =
+        String.format(
+            "http://%s:%d";,
+            containerSuite.getRangerContainer().getContainerIpAddress(), 
RANGER_SERVER_PORT);
+
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", defaultBaseLocation());
+    fileSystem = FileSystem.get(conf);
+
+    createCatalogAndSchema();
+  }
+
+  @AfterAll
+  public void stop() throws IOException {
+    if (client != null) {
+      Arrays.stream(catalog.asSchemas().listSchemas())
+          .filter(schema -> !schema.equals("default"))
+          .forEach(
+              (schema -> {
+                catalog.asSchemas().dropSchema(schema, false);
+              }));
+      Arrays.stream(metalake.listCatalogs())
+          .forEach((catalogName -> metalake.dropCatalog(catalogName, true)));
+      client.disableMetalake(metalakeName);
+      client.dropMetalake(metalakeName);
+    }
+    if (fileSystem != null) {
+      fileSystem.close();
+    }
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Failed to close CloseableGroup", e);
+    }
+    client = null;
+    RangerITEnv.cleanup();
+  }
+
+  @Test
+  @Order(0)
+  void testReadWritePath() throws IOException, RangerServiceException {
+    String filename = 
GravitinoITUtils.genRandomName("RangerFilesetE2EIT_fileset");
+    Fileset fileset =
+        catalog
+            .asFilesetCatalog()
+            .createFileset(
+                NameIdentifier.of(schemaName, filename),
+                "comment",
+                Fileset.Type.MANAGED,
+                storageLocation(filename),
+                null);
+    Assertions.assertTrue(
+        catalog.asFilesetCatalog().filesetExists(NameIdentifier.of(schemaName, 
fileset.name())));
+    Assertions.assertTrue(fileSystem.exists(new 
Path(storageLocation(filename))));
+    List<RangerPolicy> policies =
+        rangerClient.getPoliciesInService(RangerITEnv.RANGER_HDFS_REPO_NAME);
+    Assertions.assertEquals(1, policies.size());
+    Assertions.assertEquals(3, policies.get(0).getPolicyItems().size());
+
+    Assertions.assertEquals(
+        1,
+        policies.get(0).getPolicyItems().stream()
+            .filter(item -> 
item.getRoles().contains(RangerHelper.GRAVITINO_OWNER_ROLE))
+            .filter(
+                item ->
+                    item.getAccesses().stream()
+                        .anyMatch(
+                            access ->
+                                access
+                                    .getType()
+                                    
.equals(RangerPrivileges.RangerHdfsPrivilege.READ.getName())))
+            .count());
+    Assertions.assertEquals(
+        1,
+        policies.get(0).getPolicyItems().stream()
+            .filter(item -> 
item.getRoles().contains(RangerHelper.GRAVITINO_OWNER_ROLE))
+            .filter(
+                item ->
+                    item.getAccesses().stream()
+                        .anyMatch(
+                            access ->
+                                access
+                                    .getType()
+                                    
.equals(RangerPrivileges.RangerHdfsPrivilege.WRITE.getName())))
+            .count());
+    Assertions.assertEquals(
+        1,
+        policies.get(0).getPolicyItems().stream()
+            .filter(item -> 
item.getRoles().contains(RangerHelper.GRAVITINO_OWNER_ROLE))
+            .filter(
+                item ->
+                    item.getAccesses().stream()
+                        .anyMatch(
+                            access ->
+                                access
+                                    .getType()
+                                    .equals(
+                                        
RangerPrivileges.RangerHdfsPrivilege.EXECUTE.getName())))
+            .count());
+
+    String filesetRole = currentFunName();
+    SecurableObject securableObject =
+        SecurableObjects.parse(
+            String.format("%s.%s.%s", catalogName, schemaName, fileset.name()),
+            MetadataObject.Type.FILESET,
+            Lists.newArrayList(Privileges.ReadFileset.allow()));
+    metalake.createRole(filesetRole, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
+
+    policies = 
rangerClient.getPoliciesInService(RangerITEnv.RANGER_HDFS_REPO_NAME);
+    Assertions.assertEquals(1, policies.size());
+    Assertions.assertEquals(3, policies.get(0).getPolicyItems().size());
+    Assertions.assertEquals(
+        1,
+        policies.get(0).getPolicyItems().stream()
+            .filter(
+                item ->
+                    
item.getRoles().contains(rangerHelper.generateGravitinoRoleName(filesetRole)))
+            .filter(
+                item ->
+                    item.getAccesses().stream()
+                        .anyMatch(
+                            access ->
+                                access
+                                    .getType()
+                                    
.equals(RangerPrivileges.RangerHdfsPrivilege.READ.getName())))
+            .count());
+    Assertions.assertEquals(
+        0,
+        policies.get(0).getPolicyItems().stream()
+            .filter(
+                item ->
+                    
item.getRoles().contains(rangerHelper.generateGravitinoRoleName(filesetRole)))
+            .filter(
+                item ->
+                    item.getAccesses().stream()
+                        .anyMatch(
+                            access ->
+                                access
+                                    .getType()
+                                    
.equals(RangerPrivileges.RangerHdfsPrivilege.WRITE.getName())))
+            .count());
+    Assertions.assertEquals(
+        1,
+        policies.get(0).getPolicyItems().stream()
+            .filter(
+                item ->
+                    
item.getRoles().contains(rangerHelper.generateGravitinoRoleName(filesetRole)))
+            .filter(
+                item ->
+                    item.getAccesses().stream()
+                        .anyMatch(
+                            access ->
+                                access
+                                    .getType()
+                                    .equals(
+                                        
RangerPrivileges.RangerHdfsPrivilege.EXECUTE.getName())))
+            .count());
+
+    metalake.grantPrivilegesToRole(
+        filesetRole,
+        MetadataObjects.of(
+            String.format("%s.%s", catalogName, schemaName),
+            fileset.name(),
+            MetadataObject.Type.FILESET),
+        Lists.newArrayList(Privileges.WriteFileset.allow()));
+
+    policies = 
rangerClient.getPoliciesInService(RangerITEnv.RANGER_HDFS_REPO_NAME);
+    Assertions.assertEquals(1, policies.size());
+    Assertions.assertEquals(3, policies.get(0).getPolicyItems().size());
+    Assertions.assertEquals(
+        1,
+        policies.get(0).getPolicyItems().stream()
+            .filter(
+                item ->
+                    
item.getRoles().contains(rangerHelper.generateGravitinoRoleName(filesetRole)))
+            .filter(
+                item ->
+                    item.getAccesses().stream()
+                        .anyMatch(
+                            access ->
+                                access
+                                    .getType()
+                                    
.equals(RangerPrivileges.RangerHdfsPrivilege.READ.getName())))
+            .count());
+    Assertions.assertEquals(
+        1,
+        policies.get(0).getPolicyItems().stream()
+            .filter(
+                item ->
+                    
item.getRoles().contains(rangerHelper.generateGravitinoRoleName(filesetRole)))
+            .filter(
+                item ->
+                    item.getAccesses().stream()
+                        .anyMatch(
+                            access ->
+                                access
+                                    .getType()
+                                    
.equals(RangerPrivileges.RangerHdfsPrivilege.WRITE.getName())))
+            .count());
+    Assertions.assertEquals(
+        1,
+        policies.get(0).getPolicyItems().stream()
+            .filter(
+                item ->
+                    
item.getRoles().contains(rangerHelper.generateGravitinoRoleName(filesetRole)))
+            .filter(
+                item ->
+                    item.getAccesses().stream()
+                        .anyMatch(
+                            access ->
+                                access
+                                    .getType()
+                                    .equals(
+                                        
RangerPrivileges.RangerHdfsPrivilege.EXECUTE.getName())))
+            .count());
+
+    metalake.revokePrivilegesFromRole(
+        filesetRole,
+        MetadataObjects.of(
+            String.format("%s.%s", catalogName, schemaName),
+            fileset.name(),
+            MetadataObject.Type.FILESET),
+        Lists.newArrayList(Privileges.ReadFileset.allow(), 
Privileges.WriteFileset.allow()));
+    policies = 
rangerClient.getPoliciesInService(RangerITEnv.RANGER_HDFS_REPO_NAME);
+    Assertions.assertEquals(1, policies.size());
+    Assertions.assertEquals(3, policies.get(0).getPolicyItems().size());
+    Assertions.assertEquals(
+        0,
+        policies.get(0).getPolicyItems().stream()
+            .filter(
+                item ->
+                    
item.getRoles().contains(rangerHelper.generateGravitinoRoleName(filesetRole)))
+            .filter(
+                item ->
+                    item.getAccesses().stream()
+                        .anyMatch(
+                            access ->
+                                access
+                                    .getType()
+                                    
.equals(RangerPrivileges.RangerHdfsPrivilege.READ.getName())))
+            .count());
+    Assertions.assertEquals(
+        0,
+        policies.get(0).getPolicyItems().stream()
+            .filter(
+                item ->
+                    
item.getRoles().contains(rangerHelper.generateGravitinoRoleName(filesetRole)))
+            .filter(
+                item ->
+                    item.getAccesses().stream()
+                        .anyMatch(
+                            access ->
+                                access
+                                    .getType()
+                                    
.equals(RangerPrivileges.RangerHdfsPrivilege.WRITE.getName())))
+            .count());
+    Assertions.assertEquals(
+        0,
+        policies.get(0).getPolicyItems().stream()
+            .filter(
+                item ->
+                    
item.getRoles().contains(rangerHelper.generateGravitinoRoleName(filesetRole)))
+            .filter(
+                item ->
+                    item.getAccesses().stream()
+                        .anyMatch(
+                            access ->
+                                access
+                                    .getType()
+                                    .equals(
+                                        
RangerPrivileges.RangerHdfsPrivilege.EXECUTE.getName())))
+            .count());
+
+    catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(schemaName, 
fileset.name()));
+    policies = 
rangerClient.getPoliciesInService(RangerITEnv.RANGER_HDFS_REPO_NAME);
+    Assertions.assertEquals(1, policies.size());
+    Assertions.assertEquals(3, policies.get(0).getPolicyItems().size());
+  }
+
+  @Test
+  @Order(1)
+  void testReadWritePathE2E() throws IOException, RangerServiceException, 
InterruptedException {
+    String filenameRole = 
GravitinoITUtils.genRandomName("RangerFilesetE2EIT_fileset");
+    Fileset fileset =
+        catalog
+            .asFilesetCatalog()
+            .createFileset(
+                NameIdentifier.of(schemaName, filenameRole),
+                "comment",
+                Fileset.Type.MANAGED,
+                storageLocation(filenameRole),
+                null);
+    Assertions.assertTrue(
+        catalog.asFilesetCatalog().filesetExists(NameIdentifier.of(schemaName, 
fileset.name())));
+    Assertions.assertTrue(fileSystem.exists(new 
Path(storageLocation(filenameRole))));
+    FsPermission permission = new FsPermission("700");
+    fileSystem.setPermission(new Path(storageLocation(filenameRole)), 
permission);
+
+    String userName = "userTestReadWritePathE2E";
+    metalake.addUser(userName);
+
+    UserGroupInformation.createProxyUser(userName, 
UserGroupInformation.getCurrentUser())
+        .doAs(
+            (PrivilegedExceptionAction<Void>)
+                () -> {
+                  Configuration conf = new Configuration();
+                  conf.set("fs.defaultFS", defaultBaseLocation());
+                  FileSystem userFileSystem = FileSystem.get(conf);
+                  Assertions.assertThrows(
+                      Exception.class,
+                      () ->
+                          userFileSystem.listFiles(new 
Path(storageLocation(filenameRole)), false));
+                  Assertions.assertThrows(
+                      Exception.class,
+                      () ->
+                          userFileSystem.mkdirs(
+                              new Path(
+                                  String.format("%s/%s", 
storageLocation(filenameRole), "test1"))));
+                  userFileSystem.close();
+                  return null;
+                });
+
+    String filesetRole = currentFunName() + "_testReadWritePathE2E";
+    SecurableObject securableObject =
+        SecurableObjects.parse(
+            String.format("%s.%s.%s", catalogName, schemaName, fileset.name()),
+            MetadataObject.Type.FILESET,
+            Lists.newArrayList(Privileges.ReadFileset.allow()));
+    metalake.createRole(filesetRole, Collections.emptyMap(), 
Lists.newArrayList(securableObject));
+    metalake.grantRolesToUser(Lists.newArrayList(filesetRole), userName);
+    RangerBaseE2EIT.waitForUpdatingPolicies();
+
+    UserGroupInformation.createProxyUser(userName, 
UserGroupInformation.getCurrentUser())
+        .doAs(
+            (PrivilegedExceptionAction<Void>)
+                () -> {
+                  FileSystem userFileSystem =
+                      FileSystem.get(
+                          new Configuration() {
+                            {
+                              set("fs.defaultFS", defaultBaseLocation());
+                            }
+                          });
+                  Assertions.assertDoesNotThrow(
+                      () ->
+                          userFileSystem.listFiles(new 
Path(storageLocation(filenameRole)), false));
+                  Assertions.assertThrows(
+                      Exception.class,
+                      () ->
+                          userFileSystem.mkdirs(
+                              new Path(
+                                  String.format("%s/%s", 
storageLocation(filenameRole), "test2"))));
+                  userFileSystem.close();
+                  return null;
+                });
+
+    MetadataObject filesetObject =
+        MetadataObjects.of(
+            String.format("%s.%s", catalogName, schemaName),
+            fileset.name(),
+            MetadataObject.Type.FILESET);
+    metalake.grantPrivilegesToRole(
+        filesetRole, filesetObject, 
Lists.newArrayList(Privileges.WriteFileset.allow()));
+    RangerBaseE2EIT.waitForUpdatingPolicies();
+    UserGroupInformation.createProxyUser(userName, 
UserGroupInformation.getCurrentUser())
+        .doAs(
+            (PrivilegedExceptionAction<Void>)
+                () -> {
+                  FileSystem userFileSystem =
+                      FileSystem.get(
+                          new Configuration() {
+                            {
+                              set("fs.defaultFS", defaultBaseLocation());
+                            }
+                          });
+                  Assertions.assertDoesNotThrow(
+                      () ->
+                          userFileSystem.listFiles(new 
Path(storageLocation(filenameRole)), false));
+                  Assertions.assertDoesNotThrow(
+                      () ->
+                          userFileSystem.mkdirs(
+                              new Path(
+                                  String.format("%s/%s", 
storageLocation(filenameRole), "test3"))));
+                  userFileSystem.close();
+                  return null;
+                });
+
+    metalake.revokePrivilegesFromRole(
+        filesetRole,
+        filesetObject,
+        Lists.newArrayList(Privileges.ReadFileset.allow(), 
Privileges.WriteFileset.allow()));
+    RangerBaseE2EIT.waitForUpdatingPolicies();
+    UserGroupInformation.createProxyUser(userName, 
UserGroupInformation.getCurrentUser())
+        .doAs(
+            (PrivilegedExceptionAction<Void>)
+                () -> {
+                  FileSystem userFileSystem =
+                      FileSystem.get(
+                          new Configuration() {
+                            {
+                              set("fs.defaultFS", defaultBaseLocation());
+                            }
+                          });
+                  Assertions.assertThrows(
+                      Exception.class,
+                      () ->
+                          userFileSystem.listFiles(new 
Path(storageLocation(filenameRole)), false));
+                  Assertions.assertThrows(
+                      Exception.class,
+                      () ->
+                          userFileSystem.mkdirs(
+                              new Path(
+                                  String.format("%s/%s", 
storageLocation(filenameRole), "test4"))));
+                  userFileSystem.close();
+                  return null;
+                });
+
+    catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(schemaName, 
fileset.name()));
+  }
+
+  private void createCatalogAndSchema() {
+    GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
+    Assertions.assertEquals(0, gravitinoMetalakes.length);
+
+    client.createMetalake(metalakeName, "comment", Collections.emptyMap());
+    metalake = client.loadMetalake(metalakeName);
+    Assertions.assertEquals(metalakeName, metalake.name());
+
+    metalake.createCatalog(
+        catalogName,
+        Catalog.Type.FILESET,
+        provider,
+        "comment",
+        ImmutableMap.of(
+            IMPERSONATION_ENABLE,
+            "true",
+            AUTHORIZATION_PROVIDER,
+            "ranger",
+            RANGER_SERVICE_NAME,
+            RangerITEnv.RANGER_HDFS_REPO_NAME,
+            AuthorizationPropertiesMeta.RANGER_ADMIN_URL,
+            RANGER_ADMIN_URL,
+            RANGER_AUTH_TYPE,
+            RangerContainer.authType,
+            RANGER_USERNAME,
+            RangerContainer.rangerUserName,
+            RANGER_PASSWORD,
+            RangerContainer.rangerPassword));
+
+    catalog = metalake.loadCatalog(catalogName);
+    catalog
+        .asSchemas()
+        .createSchema(schemaName, "comment", ImmutableMap.of("location", 
defaultBaseLocation()));
+    Schema loadSchema = catalog.asSchemas().loadSchema(schemaName);
+    Assertions.assertEquals(schemaName, loadSchema.name());
+    Assertions.assertNotNull(loadSchema.properties().get("location"));
+  }
+
+  private String defaultBaseLocation() {
+    if (defaultBaseLocation == null) {
+      defaultBaseLocation =
+          String.format(
+              "hdfs://%s:%d/user/hadoop/%s",
+              containerSuite.getHiveRangerContainer().getContainerIpAddress(),
+              HiveContainer.HDFS_DEFAULTFS_PORT,
+              schemaName.toLowerCase());
+    }
+    return defaultBaseLocation;
+  }
+
+  private String storageLocation(String filesetName) {
+    return defaultBaseLocation() + "/" + filesetName;
+  }
+}
diff --git 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
index cb41e7921..600463fbc 100644
--- 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
+++ 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveE2EIT.java
@@ -67,7 +67,7 @@ public class RangerHiveE2EIT extends RangerBaseE2EIT {
     registerCustomConfigs(configs);
     super.startIntegrationTest();
 
-    RangerITEnv.init();
+    RangerITEnv.init(true);
     RangerITEnv.startHiveRangerContainer();
 
     RANGER_ADMIN_URL =
diff --git 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveIT.java
 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveIT.java
index dce93a614..9c45a2109 100644
--- 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveIT.java
+++ 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerHiveIT.java
@@ -48,10 +48,10 @@ import org.apache.gravitino.authorization.RoleChange;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.authorization.SecurableObjects;
 import org.apache.gravitino.authorization.ranger.RangerAuthorizationPlugin;
+import org.apache.gravitino.authorization.ranger.RangerHadoopSQLMetadataObject;
+import 
org.apache.gravitino.authorization.ranger.RangerHadoopSQLSecurableObject;
 import org.apache.gravitino.authorization.ranger.RangerHelper;
-import org.apache.gravitino.authorization.ranger.RangerMetadataObject;
 import org.apache.gravitino.authorization.ranger.RangerPrivileges;
-import org.apache.gravitino.authorization.ranger.RangerSecurableObject;
 import org.apache.gravitino.authorization.ranger.reference.RangerDefines;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
 import org.apache.gravitino.meta.AuditInfo;
@@ -80,7 +80,7 @@ public class RangerHiveIT {
 
   @BeforeAll
   public static void setup() {
-    RangerITEnv.init();
+    RangerITEnv.init(true);
 
     rangerAuthHivePlugin = RangerITEnv.rangerAuthHivePlugin;
     rangerHelper = RangerITEnv.rangerHelper;
@@ -343,7 +343,7 @@ public class RangerHiveIT {
     AuthorizationSecurableObject rangerSecurableObject =
         rangerAuthHivePlugin.generateAuthorizationSecurableObject(
             ImmutableList.of(String.format("%s3", dbName), "tab1"),
-            RangerMetadataObject.Type.TABLE,
+            RangerHadoopSQLMetadataObject.Type.TABLE,
             ImmutableSet.of(
                 new RangerPrivileges.RangerHivePrivilegeImpl(
                     RangerPrivileges.RangerHadoopSQLPrivilege.ALL, 
Privilege.Condition.ALLOW)));
@@ -460,7 +460,7 @@ public class RangerHiveIT {
         Collections.singletonList(policyItem));
   }
 
-  static boolean deleteHivePolicy(RangerSecurableObject rangerSecurableObject) 
{
+  static boolean deleteHivePolicy(RangerHadoopSQLSecurableObject 
rangerSecurableObject) {
     RangerPolicy policy = 
rangerHelper.findManagedPolicy(rangerSecurableObject);
     if (policy != null) {
       try {
diff --git 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerITEnv.java
 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerITEnv.java
index 2758d307b..f6b83bb9d 100644
--- 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerITEnv.java
+++ 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerITEnv.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.authorization.ranger.integration.test;
 
+import static org.mockito.Mockito.doReturn;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
@@ -30,6 +32,7 @@ import java.util.stream.Collectors;
 import org.apache.gravitino.authorization.AuthorizationSecurableObject;
 import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.ranger.RangerAuthorizationHDFSPlugin;
 import 
org.apache.gravitino.authorization.ranger.RangerAuthorizationHadoopSQLPlugin;
 import org.apache.gravitino.authorization.ranger.RangerAuthorizationPlugin;
 import org.apache.gravitino.authorization.ranger.RangerHelper;
@@ -47,6 +50,7 @@ import org.apache.ranger.plugin.model.RangerRole;
 import org.apache.ranger.plugin.model.RangerService;
 import org.apache.ranger.plugin.util.SearchFilter;
 import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,9 +85,12 @@ public class RangerITEnv {
   // Search filter prefix file path constants
   public static final String SEARCH_FILTER_PATH = SearchFilter.RESOURCE_PREFIX 
+ RESOURCE_PATH;
   public static RangerAuthorizationPlugin rangerAuthHivePlugin;
+  public static RangerAuthorizationPlugin rangerAuthHDFSPlugin;
   protected static RangerHelper rangerHelper;
 
-  public static void init() {
+  protected static RangerHelper rangerHDFSHelper;
+
+  public static void init(boolean allowAnyoneAccessHDFS) {
     containerSuite.startRangerContainer();
     rangerClient = containerSuite.getRangerContainer().rangerClient;
 
@@ -104,6 +111,28 @@ public class RangerITEnv {
                 RangerContainer.rangerPassword,
                 AuthorizationPropertiesMeta.RANGER_SERVICE_NAME,
                 RangerITEnv.RANGER_HIVE_REPO_NAME));
+
+    RangerAuthorizationHDFSPlugin spyRangerAuthorizationHDFSPlugin =
+        Mockito.spy(
+            RangerAuthorizationHDFSPlugin.getInstance(
+                "metalake",
+                ImmutableMap.of(
+                    AuthorizationPropertiesMeta.RANGER_ADMIN_URL,
+                    String.format(
+                        "http://%s:%d";,
+                        
containerSuite.getRangerContainer().getContainerIpAddress(),
+                        RangerContainer.RANGER_SERVER_PORT),
+                    AuthorizationPropertiesMeta.RANGER_AUTH_TYPE,
+                    RangerContainer.authType,
+                    AuthorizationPropertiesMeta.RANGER_USERNAME,
+                    RangerContainer.rangerUserName,
+                    AuthorizationPropertiesMeta.RANGER_PASSWORD,
+                    RangerContainer.rangerPassword,
+                    AuthorizationPropertiesMeta.RANGER_SERVICE_NAME,
+                    RangerITEnv.RANGER_HDFS_REPO_NAME)));
+    
doReturn("/test").when(spyRangerAuthorizationHDFSPlugin).getFileSetPath(Mockito.any());
+    rangerAuthHDFSPlugin = spyRangerAuthorizationHDFSPlugin;
+
     rangerHelper =
         new RangerHelper(
             rangerClient,
@@ -112,12 +141,22 @@ public class RangerITEnv {
             rangerAuthHivePlugin.ownerMappingRule(),
             rangerAuthHivePlugin.policyResourceDefinesRule());
 
+    rangerHDFSHelper =
+        new RangerHelper(
+            rangerClient,
+            RangerContainer.rangerUserName,
+            RangerITEnv.RANGER_HDFS_REPO_NAME,
+            rangerAuthHDFSPlugin.ownerMappingRule(),
+            rangerAuthHDFSPlugin.policyResourceDefinesRule());
+
     if (!initRangerService) {
       synchronized (RangerITEnv.class) {
         // No IP address set, no impact on testing
         createRangerHdfsRepository("", true);
         createRangerHiveRepository("", true);
-        allowAnyoneAccessHDFS();
+        if (allowAnyoneAccessHDFS) {
+          allowAnyoneAccessHDFS();
+        }
         initRangerService = true;
       }
     }
diff --git 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerIcebergE2EIT.java
 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerIcebergE2EIT.java
index 7b45eda7a..a4fc1253e 100644
--- 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerIcebergE2EIT.java
+++ 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerIcebergE2EIT.java
@@ -71,7 +71,7 @@ public class RangerIcebergE2EIT extends RangerBaseE2EIT {
     registerCustomConfigs(configs);
     super.startIntegrationTest();
 
-    RangerITEnv.init();
+    RangerITEnv.init(true);
     RangerITEnv.startHiveRangerContainer();
 
     RANGER_ADMIN_URL =
diff --git 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerPaimonE2EIT.java
 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerPaimonE2EIT.java
index 7cb600b9d..b2529837e 100644
--- 
a/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerPaimonE2EIT.java
+++ 
b/authorizations/authorization-ranger/src/test/java/org/apache/gravitino/authorization/ranger/integration/test/RangerPaimonE2EIT.java
@@ -70,7 +70,7 @@ public class RangerPaimonE2EIT extends RangerBaseE2EIT {
     registerCustomConfigs(configs);
     super.startIntegrationTest();
 
-    RangerITEnv.init();
+    RangerITEnv.init(true);
     RangerITEnv.startHiveRangerContainer();
 
     RANGER_ADMIN_URL =

Reply via email to