This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new b06b51153 [#5019] feat: (hadoop-catalog): Add a framework to support multi-storage in a pluggable manner for fileset catalog (#5020)
b06b51153 is described below

commit b06b51153b02ba5194e3a8afedcd4077ef80ce8e
Author: Qi Yu <y...@datastrato.com>
AuthorDate: Wed Oct 16 18:07:21 2024 +0800

    [#5019] feat: (hadoop-catalog): Add a framework to support multi-storage in a pluggable manner for fileset catalog (#5020)
    
    ### What changes were proposed in this pull request?
    
    Add a framework to support multiple storage systems within the Hadoop catalog.
    
    ### Why are the changes needed?
    
    Some users want Gravitino to manage file systems like S3 or GCS.
    
    Fix: #5019
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A.
    
    ### How was this patch tested?
    
    Existing tests.
---
 .gitignore                                         |   4 +
 catalogs/catalog-hadoop/build.gradle.kts           |   2 +
 .../catalog/hadoop/HadoopCatalogOperations.java    |  83 ++++++++++------
 .../hadoop/HadoopCatalogPropertiesMetadata.java    |  41 +++++++-
 .../catalog/hadoop/fs/FileSystemProvider.java      |  69 ++++++++++++++
 .../catalog/hadoop/fs/FileSystemUtils.java         | 104 +++++++++++++++++++++
 .../catalog/hadoop/fs/HDFSFileSystemProvider.java  |  54 +++++++++++
 .../catalog/hadoop/fs/LocalFileSystemProvider.java |  53 +++++++++++
 ....gravitino.catalog.hadoop.fs.FileSystemProvider |  21 +++++
 .../hadoop/TestHadoopCatalogOperations.java        |  87 +++++++++++------
 clients/filesystem-hadoop3/build.gradle.kts        |   9 ++
 .../hadoop/GravitinoVirtualFileSystem.java         |  39 +++++++-
 .../GravitinoVirtualFileSystemConfiguration.java   |  10 ++
 docs/hadoop-catalog.md                             |  23 +++--
 14 files changed, 531 insertions(+), 68 deletions(-)
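
Before the per-file diffs, a minimal sketch of what the new framework asks of a catalog creator: everything keys off the two catalog properties introduced below. This is illustrative only; the "s3" provider name stands in for a hypothetical third-party provider, not part of this patch.

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;

    public class CatalogPropsSketch {
      public static void main(String[] args) {
        // Hypothetical catalog properties: enable an extra "s3" provider while
        // keeping the built-in local provider as the default for schemeless paths.
        Map<String, String> catalogProps =
            ImmutableMap.of(
                "filesystem-providers", "s3",
                "default-filesystem-provider", "builtin-local");
        System.out.println(catalogProps);
      }
    }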

diff --git a/.gitignore b/.gitignore
index 7889cf7a9..eae3d3c95 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,3 +53,7 @@ include clients/client-python/.gitignore
 **/metastore_db
 **/spark-warehouse
 derby.log
+
+web/node_modules
+web/dist
+web/.next
diff --git a/catalogs/catalog-hadoop/build.gradle.kts b/catalogs/catalog-hadoop/build.gradle.kts
index ba60a161d..940289347 100644
--- a/catalogs/catalog-hadoop/build.gradle.kts
+++ b/catalogs/catalog-hadoop/build.gradle.kts
@@ -36,6 +36,8 @@ dependencies {
     exclude(group = "*")
   }
 
+  compileOnly(libs.guava)
+
   implementation(libs.hadoop3.common) {
     exclude("com.sun.jersey")
     exclude("javax.servlet", "servlet-api")
diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
index da4d0e1a1..8515ea7d2 100644
--- a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
@@ -30,7 +30,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Entity;
@@ -44,6 +43,8 @@ import org.apache.gravitino.StringIdentifier;
 import org.apache.gravitino.audit.CallerContext;
 import org.apache.gravitino.audit.FilesetAuditConstants;
 import org.apache.gravitino.audit.FilesetDataOperation;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.connector.HasPropertyMetadata;
@@ -71,11 +72,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HadoopCatalogOperations implements CatalogOperations, SupportsSchemas, FilesetCatalog {
-
   private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not exist";
   private static final String FILESET_DOES_NOT_EXIST_MSG = "Fileset %s does not exist";
   private static final String SLASH = "/";
-
   private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalogOperations.class);
 
   private final EntityStore store;
@@ -90,6 +89,10 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
 
   private CatalogInfo catalogInfo;
 
+  private final Map<String, FileSystemProvider> fileSystemProvidersMap = Maps.newHashMap();
+
+  private FileSystemProvider defaultFileSystemProvider;
+
   HadoopCatalogOperations(EntityStore store) {
     this.store = store;
   }
@@ -107,7 +110,9 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
   }
 
   public Configuration getHadoopConf() {
-    return hadoopConf;
+    Configuration configuration = new Configuration();
+    conf.forEach((k, v) -> configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v));
+    return configuration;
   }
 
   public Map<String, String> getConf() {
@@ -119,26 +124,31 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
      Map<String, String> config, CatalogInfo info, HasPropertyMetadata propertiesMetadata)
       throws RuntimeException {
     this.propertiesMetadata = propertiesMetadata;
-    // Initialize Hadoop Configuration.
-    this.conf = config;
-    this.hadoopConf = new Configuration();
     this.catalogInfo = info;
-    Map<String, String> bypassConfigs =
-        config.entrySet().stream()
-            .filter(e -> e.getKey().startsWith(CATALOG_BYPASS_PREFIX))
-            .collect(
-                Collectors.toMap(
-                    e -> e.getKey().substring(CATALOG_BYPASS_PREFIX.length()),
-                    Map.Entry::getValue));
-    bypassConfigs.forEach(hadoopConf::set);
+
+    this.conf = config;
+
+    String fileSystemProviders =
+        (String)
+            propertiesMetadata
+                .catalogPropertiesMetadata()
+                .getOrDefault(config, HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS);
+    this.fileSystemProvidersMap.putAll(FileSystemUtils.getFileSystemProviders(fileSystemProviders));
+
+    String defaultFileSystemProviderName =
+        (String)
+            propertiesMetadata
+                .catalogPropertiesMetadata()
+                .getOrDefault(config, HadoopCatalogPropertiesMetadata.DEFAULT_FS_PROVIDER);
+    this.defaultFileSystemProvider =
+        FileSystemUtils.getFileSystemProviderByName(
+            fileSystemProvidersMap, defaultFileSystemProviderName);
 
     String catalogLocation =
         (String)
             propertiesMetadata
                 .catalogPropertiesMetadata()
                .getOrDefault(config, HadoopCatalogPropertiesMetadata.LOCATION);
-    conf.forEach(hadoopConf::set);
-
     this.catalogStorageLocation =
         StringUtils.isNotBlank(catalogLocation)
             ? Optional.of(catalogLocation).map(Path::new)
@@ -235,8 +245,9 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
 
     try {
       // formalize the path to avoid path without scheme, uri, authority, etc.
-      filesetPath = formalizePath(filesetPath, hadoopConf);
-      FileSystem fs = filesetPath.getFileSystem(hadoopConf);
+      filesetPath = formalizePath(filesetPath, conf);
+
+      FileSystem fs = getFileSystem(filesetPath, conf);
       if (!fs.exists(filesetPath)) {
         if (!fs.mkdirs(filesetPath)) {
           throw new RuntimeException(
@@ -339,7 +350,7 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
 
       // For managed fileset, we should delete the related files.
       if (filesetEntity.filesetType() == Fileset.Type.MANAGED) {
-        FileSystem fs = filesetPath.getFileSystem(hadoopConf);
+        FileSystem fs = getFileSystem(filesetPath, conf);
         if (fs.exists(filesetPath)) {
           if (!fs.delete(filesetPath, true)) {
            LOG.warn("Failed to delete fileset {} location {}", ident, filesetPath);
@@ -459,7 +470,7 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
     Path schemaPath = getSchemaPath(ident.name(), properties);
     if (schemaPath != null) {
       try {
-        FileSystem fs = schemaPath.getFileSystem(hadoopConf);
+        FileSystem fs = getFileSystem(schemaPath, conf);
         if (!fs.exists(schemaPath)) {
           if (!fs.mkdirs(schemaPath)) {
             // Fail the operation when failed to create the schema path.
@@ -577,8 +588,7 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
       if (schemaPath == null) {
         return false;
       }
-
-      FileSystem fs = schemaPath.getFileSystem(hadoopConf);
+      FileSystem fs = getFileSystem(schemaPath, conf);
       // Nothing to delete if the schema path does not exist.
       if (!fs.exists(schemaPath)) {
         return false;
@@ -717,8 +727,8 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
   }
 
   @VisibleForTesting
-  static Path formalizePath(Path path, Configuration configuration) throws IOException {
-    FileSystem defaultFs = FileSystem.get(configuration);
+  Path formalizePath(Path path, Map<String, String> configuration) throws IOException {
+    FileSystem defaultFs = getFileSystem(path, configuration);
     return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory());
   }
 
@@ -731,7 +741,7 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
   private boolean checkSingleFile(Fileset fileset) {
     try {
       Path locationPath = new Path(fileset.storageLocation());
-      return locationPath.getFileSystem(hadoopConf).getFileStatus(locationPath).isFile();
+      return getFileSystem(locationPath, conf).getFileStatus(locationPath).isFile();
     } catch (FileNotFoundException e) {
      // We should always return false here, same with the logic in `FileSystem.isFile(Path f)`.
       return false;
@@ -742,4 +752,25 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
           fileset.name());
     }
   }
+
+  FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
+    if (path == null) {
+      throw new IllegalArgumentException("Path should not be null");
+    }
+
+    String scheme =
+        path.toUri().getScheme() != null
+            ? path.toUri().getScheme()
+            : defaultFileSystemProvider.scheme();
+
+    FileSystemProvider provider = fileSystemProvidersMap.get(scheme);
+    if (provider == null) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Unsupported scheme: %s, path: %s, all supported schemes: %s and providers: %s",
+              scheme, path, fileSystemProvidersMap.keySet(), fileSystemProvidersMap.values()));
+    }
+
+    return provider.getFileSystem(path, config);
+  }
 }
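
The getFileSystem method above resolves a provider from the path's scheme, falling back to the default provider for schemeless paths. A standalone sketch of that fallback rule (illustrative, not the committed code):

    import org.apache.hadoop.fs.Path;

    public class SchemeFallbackSketch {
      public static void main(String[] args) {
        String defaultScheme = "file"; // LocalFileSystemProvider#scheme()
        for (String location : new String[] {"/tmp/fileset", "hdfs://localhost:9000/warehouse"}) {
          Path path = new Path(location);
          // Same null-check-then-fallback as getFileSystem above.
          String scheme =
              path.toUri().getScheme() != null ? path.toUri().getScheme() : defaultScheme;
          System.out.println(location + " -> " + scheme); // "file", then "hdfs"
        }
      }
    }
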
diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
index 9a68e2d55..397e13aa4 100644
--- a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
@@ -18,10 +18,13 @@
  */
 package org.apache.gravitino.catalog.hadoop;
 
+import static org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig.KERBEROS_PROPERTY_ENTRIES;
+
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
-import org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.LocalFileSystemProvider;
 import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 
@@ -34,6 +37,24 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
  // If not, users have to specify the storage location in the Schema or Fileset level.
   public static final String LOCATION = "location";
 
+  /**
+   * The name of {@link FileSystemProvider} to be added to the catalog. Besides the built-in
+   * FileSystemProviders like LocalFileSystemProvider and HDFSFileSystemProvider, users can add
+   * their own FileSystemProvider by specifying the provider name here. The value can be found
+   * via {@link FileSystemProvider#name()}.
+   */
+  public static final String FILESYSTEM_PROVIDERS = "filesystem-providers";
+
+  /**
+   * The default file system provider name, used to create the default file system. If not
+   * specified, the default file system provider will be {@link LocalFileSystemProvider#name()}:
+   * 'builtin-local'.
+   */
+  public static final String DEFAULT_FS_PROVIDER = "default-filesystem-provider";
+
+  public static final String BUILTIN_LOCAL_FS_PROVIDER = "builtin-local";
+  public static final String BUILTIN_HDFS_FS_PROVIDER = "builtin-hdfs";
+
   private static final Map<String, PropertyEntry<?>> HADOOP_CATALOG_PROPERTY_ENTRIES =
       ImmutableMap.<String, PropertyEntry<?>>builder()
           .put(
@@ -44,8 +65,24 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
                   false /* immutable */,
                   null,
                   false /* hidden */))
+          .put(
+              FILESYSTEM_PROVIDERS,
+              PropertyEntry.stringOptionalPropertyEntry(
+                  FILESYSTEM_PROVIDERS,
+                  "The file system provider names, separated by comma",
+                  false /* immutable */,
+                  null,
+                  false /* hidden */))
+          .put(
+              DEFAULT_FS_PROVIDER,
+              PropertyEntry.stringOptionalPropertyEntry(
+                  DEFAULT_FS_PROVIDER,
+                  "Default file system provider name",
+                  false /* immutable */,
+                  BUILTIN_LOCAL_FS_PROVIDER, // please see LocalFileSystemProvider#name()
+                  false /* hidden */))
           // The following two are about authentication.
-          .putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
+          .putAll(KERBEROS_PROPERTY_ENTRIES)
           .putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
           .build();
 
diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java
new file mode 100644
index 000000000..5bee821e5
--- /dev/null
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * FileSystemProvider is an interface for providing FileSystem instances. It is used by the
+ * HadoopCatalog to create FileSystem instances for accessing Hadoop compatible file systems.
+ */
+public interface FileSystemProvider {
+
+  /**
+   * Get the FileSystem instance according to the configuration map and file path.
+   *
+   * <p>Compared to the {@link FileSystem#get(Configuration)} method, this method allows the
+   * provider to create a FileSystem instance with a specific configuration and do further
+   * initialization if needed.
+   *
+   * <p>For example: 1. We can check the endpoint value validity for S3AFileSystem and then do
+   * further actions. 2. We can also change some default behavior of the FileSystem initialization
+   * process. 3. More...
+   *
+   * @param config The configuration for the FileSystem instance.
+   * @param path The path to the file system.
+   * @return The FileSystem instance.
+   * @throws IOException If the FileSystem instance cannot be created.
+   */
+  FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String> config)
+      throws IOException;
+
+  /**
+   * Scheme of this FileSystem provider. The value is 'file' for LocalFileSystem, 'hdfs' for HDFS,
+   * etc.
+   *
+   * @return The scheme used by this FileSystem provider.
+   */
+  String scheme();
+
+  /**
+   * Name of this FileSystem provider. The value is 'builtin-local' for LocalFileSystem,
+   * 'builtin-hdfs' for HDFS, etc.
+   *
+   * @return The name of this FileSystem provider.
+   */
+  String name();
+}
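
With the interface in place, a third-party storage integration is one small class. A hedged sketch of a hypothetical S3 provider follows: the package, the "s3"/"s3a" values, and the assumption that an S3-capable FileSystem (e.g. hadoop-aws's S3AFileSystem) is on the classpath are all illustrative, not part of this patch.

    package com.example.gravitino.fs; // hypothetical package

    import java.io.IOException;
    import java.util.Map;
    import javax.annotation.Nonnull;
    import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class S3FileSystemProvider implements FileSystemProvider {

      @Override
      public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String> config)
          throws IOException {
        // Copy the flat property map into a Hadoop Configuration; fs.s3a.*
        // settings (endpoint, credentials, ...) are expected to arrive in `config`.
        Configuration configuration = new Configuration();
        config.forEach(configuration::set);
        // Delegate to whatever FileSystem implementation is registered for the
        // path's scheme, e.g. org.apache.hadoop.fs.s3a.S3AFileSystem.
        return FileSystem.newInstance(path.toUri(), configuration);
      }

      @Override
      public String scheme() {
        return "s3a"; // must be unique among the loaded providers
      }

      @Override
      public String name() {
        return "s3"; // the value users list in "filesystem-providers"
      }
    }
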
diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
new file mode 100644
index 000000000..3a959ff37
--- /dev/null
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_HDFS_FS_PROVIDER;
+import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_LOCAL_FS_PROVIDER;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Streams;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class FileSystemUtils {
+
+  private FileSystemUtils() {}
+
+  public static Map<String, FileSystemProvider> getFileSystemProviders(String fileSystemProviders) {
+    Map<String, FileSystemProvider> resultMap = Maps.newHashMap();
+    ServiceLoader<FileSystemProvider> allFileSystemProviders =
+        ServiceLoader.load(FileSystemProvider.class);
+
+    Set<String> providersInUses =
+        fileSystemProviders != null
+            ? Arrays.stream(fileSystemProviders.split(","))
+                .map(f -> f.trim().toLowerCase(Locale.ROOT))
+                .collect(Collectors.toSet())
+            : Sets.newHashSet();
+
+    // Add built-in file system providers to the use list automatically.
+    providersInUses.add(BUILTIN_LOCAL_FS_PROVIDER.toLowerCase(Locale.ROOT));
+    providersInUses.add(BUILTIN_HDFS_FS_PROVIDER.toLowerCase(Locale.ROOT));
+
+    // Only get the file system providers that are in the user list and check if the scheme is
+    // unique.
+    Streams.stream(allFileSystemProviders.iterator())
+        .filter(
+            fileSystemProvider ->
+                providersInUses.contains(fileSystemProvider.name().toLowerCase(Locale.ROOT)))
+        .forEach(
+            fileSystemProvider -> {
+              if (resultMap.containsKey(fileSystemProvider.scheme())) {
+                throw new UnsupportedOperationException(
+                    String.format(
+                        "File system provider: '%s' with scheme '%s' already exists in the in-use provider list. "
+                            + "Please make sure the file system provider scheme is unique.",
+                        fileSystemProvider.getClass().getName(), fileSystemProvider.scheme()));
+              }
+              resultMap.put(fileSystemProvider.scheme(), fileSystemProvider);
+            });
+
+    // If not all file system providers in providersInUses were found, throw an exception.
+    Set<String> notFoundProviders =
+        Sets.difference(
+                providersInUses,
+                resultMap.values().stream()
+                    .map(p -> p.name().toLowerCase(Locale.ROOT))
+                    .collect(Collectors.toSet()))
+            .immutableCopy();
+    if (!notFoundProviders.isEmpty()) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "File system providers %s not found in the classpath. Please make sure the file system "
+                  + "provider is in the classpath.",
+              notFoundProviders));
+    }
+
+    return resultMap;
+  }
+
+  public static FileSystemProvider getFileSystemProviderByName(
+      Map<String, FileSystemProvider> fileSystemProviders, String fileSystemProviderName) {
+    return fileSystemProviders.entrySet().stream()
+        .filter(entry -> entry.getValue().name().equals(fileSystemProviderName))
+        .map(Map.Entry::getValue)
+        .findFirst()
+        .orElseThrow(
+            () ->
+                new UnsupportedOperationException(
+                    String.format(
+                        "File system provider with name '%s' not found in the file system provider list.",
+                        fileSystemProviderName)));
+  }
+}
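
A short usage sketch of the two helpers above, assuming only the built-in providers are on the classpath:

    import java.util.Map;
    import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
    import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;

    public class FileSystemUtilsSketch {
      public static void main(String[] args) {
        // A null provider list loads only the built-ins, keyed by scheme.
        Map<String, FileSystemProvider> providers = FileSystemUtils.getFileSystemProviders(null);
        System.out.println(providers.keySet()); // e.g. [file, hdfs] (order not guaranteed)

        // Providers are looked up by name here, not by scheme.
        FileSystemProvider local =
            FileSystemUtils.getFileSystemProviderByName(providers, "builtin-local");
        System.out.println(local.scheme()); // file
      }
    }
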
diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
new file mode 100644
index 000000000..7c9ceebdd
--- /dev/null
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_HDFS_FS_PROVIDER;
+import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+public class HDFSFileSystemProvider implements FileSystemProvider {
+
+  @Override
+  public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String> config)
+      throws IOException {
+    Configuration configuration = new Configuration();
+    config.forEach(
+        (k, v) -> {
+          configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
+        });
+    return DistributedFileSystem.newInstance(path.toUri(), configuration);
+  }
+
+  @Override
+  public String scheme() {
+    return "hdfs";
+  }
+
+  @Override
+  public String name() {
+    return BUILTIN_HDFS_FS_PROVIDER;
+  }
+}
diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
new file mode 100644
index 000000000..70e44c76f
--- /dev/null
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_LOCAL_FS_PROVIDER;
+import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class LocalFileSystemProvider implements FileSystemProvider {
+
+  @Override
+  public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
+    Configuration configuration = new Configuration();
+    config.forEach(
+        (k, v) -> {
+          configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
+        });
+
+    return LocalFileSystem.newInstance(path.toUri(), configuration);
+  }
+
+  @Override
+  public String scheme() {
+    return "file";
+  }
+
+  @Override
+  public String name() {
+    return BUILTIN_LOCAL_FS_PROVIDER;
+  }
+}
diff --git a/catalogs/catalog-hadoop/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider b/catalogs/catalog-hadoop/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
new file mode 100644
index 000000000..93a84744a
--- /dev/null
+++ b/catalogs/catalog-hadoop/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.gravitino.catalog.hadoop.fs.HDFSFileSystemProvider
+org.apache.gravitino.catalog.hadoop.fs.LocalFileSystemProvider
\ No newline at end of file
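
A custom provider registers through the same ServiceLoader mechanism: its jar ships its own copy of this file naming the implementing class. A hypothetical entry for the S3 provider sketched earlier:

    # META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
    com.example.gravitino.fs.S3FileSystemProvider
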
diff --git a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
index d32069726..2b89180a8 100644
--- a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
+++ b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
@@ -34,7 +34,6 @@ import static org.apache.gravitino.Configs.VERSION_RETENTION_COUNT;
 import static org.apache.gravitino.catalog.hadoop.HadoopCatalog.CATALOG_PROPERTIES_META;
 import static org.apache.gravitino.catalog.hadoop.HadoopCatalog.FILESET_PROPERTIES_META;
 import static org.apache.gravitino.catalog.hadoop.HadoopCatalog.SCHEMA_PROPERTIES_META;
-import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
@@ -50,10 +49,12 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.EntityStoreFactory;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
@@ -65,6 +66,7 @@ import org.apache.gravitino.audit.FilesetDataOperation;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.connector.PropertiesMetadata;
+import org.apache.gravitino.connector.PropertyEntry;
 import org.apache.gravitino.exceptions.GravitinoRuntimeException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
@@ -74,6 +76,7 @@ import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.relational.RelationalEntityStore;
 import org.apache.gravitino.storage.relational.service.CatalogMetaService;
 import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
 import org.apache.gravitino.utils.NameIdentifierUtil;
@@ -230,18 +233,10 @@ public class TestHadoopCatalogOperations {
 
     CatalogInfo catalogInfo = randomCatalogInfo();
     ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
-    Configuration conf = ops.hadoopConf;
+    Configuration conf = ops.getHadoopConf();
     String value = conf.get("fs.defaultFS");
     Assertions.assertEquals("file:///", value);
 
-    emptyProps.put(CATALOG_BYPASS_PREFIX + "fs.defaultFS", "hdfs://localhost:9000");
-    ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
-    Configuration conf1 = ops.hadoopConf;
-    String value1 = conf1.get("fs.defaultFS");
-    Assertions.assertEquals("hdfs://localhost:9000", value1);
-
-    Assertions.assertFalse(ops.catalogStorageLocation.isPresent());
-
     emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION, "file:///tmp/catalog");
     ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
     Assertions.assertTrue(ops.catalogStorageLocation.isPresent());
@@ -677,33 +672,68 @@ public class TestHadoopCatalogOperations {
   }
 
   @Test
-  public void testFormalizePath() throws IOException {
+  public void testFormalizePath() throws IOException, IllegalAccessException {
 
     String[] paths =
-        new String[] {
-          "tmp/catalog",
-          "/tmp/catalog",
-          "file:/tmp/catalog",
-          "file:///tmp/catalog",
-          "hdfs://localhost:9000/tmp/catalog",
-          "s3://bucket/tmp/catalog",
-          "gs://bucket/tmp/catalog"
-        };
+        new String[] {"tmp/catalog", "/tmp/catalog", "file:/tmp/catalog", "file:///tmp/catalog"};
 
     String[] expected =
         new String[] {
           "file:" + Paths.get("").toAbsolutePath() + "/tmp/catalog",
           "file:/tmp/catalog",
           "file:/tmp/catalog",
-          "file:/tmp/catalog",
-          "hdfs://localhost:9000/tmp/catalog",
-          "s3://bucket/tmp/catalog",
-          "gs://bucket/tmp/catalog"
+          "file:/tmp/catalog"
+        };
+
+    HasPropertyMetadata hasPropertyMetadata =
+        new HasPropertyMetadata() {
+          @Override
+          public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException {
+            return null;
+          }
+
+          @Override
+          public PropertiesMetadata catalogPropertiesMetadata()
+              throws UnsupportedOperationException {
+            return new PropertiesMetadata() {
+              @Override
+              public Map<String, PropertyEntry<?>> propertyEntries() {
+                return new HadoopCatalogPropertiesMetadata().propertyEntries();
+              }
+            };
+          }
+
+          @Override
+          public PropertiesMetadata schemaPropertiesMetadata()
+              throws UnsupportedOperationException {
+            return null;
+          }
+
+          @Override
+          public PropertiesMetadata filesetPropertiesMetadata()
+              throws UnsupportedOperationException {
+            return null;
+          }
+
+          @Override
+          public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationException {
+            return null;
+          }
         };
 
-    for (int i = 0; i < paths.length; i++) {
-      Path actual = HadoopCatalogOperations.formalizePath(new Path(paths[i]), new Configuration());
-      Assertions.assertEquals(expected[i], actual.toString());
+    try {
+      FieldUtils.writeField(
+          GravitinoEnv.getInstance(), "entityStore", new RelationalEntityStore(), true);
+      try (HadoopCatalogOperations hadoopCatalogOperations = new HadoopCatalogOperations()) {
+        Map<String, String> map = ImmutableMap.of("default-filesystem", "file:///");
+        hadoopCatalogOperations.initialize(map, null, hasPropertyMetadata);
+        for (int i = 0; i < paths.length; i++) {
+          Path actual = hadoopCatalogOperations.formalizePath(new Path(paths[i]), map);
+          Assertions.assertEquals(expected[i], actual.toString());
+        }
+      }
+    } finally {
+      FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore", null, true);
     }
   }
 
@@ -877,8 +907,11 @@ public class TestHadoopCatalogOperations {
    try (HadoopCatalogOperations mockOps = Mockito.mock(HadoopCatalogOperations.class)) {
       mockOps.hadoopConf = new Configuration();
       when(mockOps.loadFileset(filesetIdent)).thenReturn(mockFileset);
+      when(mockOps.getConf()).thenReturn(Maps.newHashMap());
       String subPath = "/test/test.parquet";
      when(mockOps.getFileLocation(filesetIdent, subPath)).thenCallRealMethod();
+      when(mockOps.getFileSystem(Mockito.any(), Mockito.any()))
+          .thenReturn(FileSystem.getLocal(new Configuration()));
       String fileLocation = mockOps.getFileLocation(filesetIdent, subPath);
       Assertions.assertEquals(
          String.format("%s%s", mockFileset.storageLocation(), subPath.substring(1)), fileLocation);
diff --git a/clients/filesystem-hadoop3/build.gradle.kts b/clients/filesystem-hadoop3/build.gradle.kts
index d7905cd3b..aefac5f28 100644
--- a/clients/filesystem-hadoop3/build.gradle.kts
+++ b/clients/filesystem-hadoop3/build.gradle.kts
@@ -26,6 +26,10 @@ plugins {
 dependencies {
  compileOnly(project(":clients:client-java-runtime", configuration = "shadow"))
   compileOnly(libs.hadoop3.common)
+  implementation(project(":catalogs:catalog-hadoop")) {
+    exclude(group = "*")
+  }
+
   implementation(libs.caffeine)
 
   testImplementation(project(":api"))
@@ -71,6 +75,11 @@ tasks.build {
   dependsOn("javadoc")
 }
 
+tasks.compileJava {
+  dependsOn(":catalogs:catalog-hadoop:jar")
+  dependsOn(":catalogs:catalog-hadoop:runtimeJars")
+}
+
 tasks.test {
   val skipITs = project.hasProperty("skipITs")
   if (skipITs) {
diff --git a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
index de0eb758e..05e769667 100644
--- a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
+++ b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
@@ -18,6 +18,9 @@
  */
 package org.apache.gravitino.filesystem.hadoop;
 
+import static org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_FILESYSTEM_PROVIDERS;
+import static org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.GVFS_CONFIG_PREFIX;
+
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Scheduler;
@@ -41,6 +44,8 @@ import org.apache.gravitino.audit.CallerContext;
 import org.apache.gravitino.audit.FilesetAuditConstants;
 import org.apache.gravitino.audit.FilesetDataOperation;
 import org.apache.gravitino.audit.InternalClientType;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
 import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
 import org.apache.gravitino.client.GravitinoClient;
 import org.apache.gravitino.client.KerberosTokenProvider;
@@ -81,6 +86,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
   private static final Pattern IDENTIFIER_PATTERN =
       Pattern.compile("^(?:gvfs://fileset)?/([^/]+)/([^/]+)/([^/]+)(?>/[^/]+)*/?$");
   private static final String SLASH = "/";
+  private final Map<String, FileSystemProvider> fileSystemProvidersMap = Maps.newHashMap();
+  private static final String GRAVITINO_BYPASS_PREFIX = "gravitino.bypass.";
 
   @Override
   public void initialize(URI name, Configuration configuration) throws IOException {
@@ -125,6 +132,10 @@ public class GravitinoVirtualFileSystem extends FileSystem {
 
     initializeClient(configuration);
 
+    // Register the default local and HDFS FileSystemProvider
+    String fileSystemProviders = configuration.get(FS_FILESYSTEM_PROVIDERS);
+    fileSystemProvidersMap.putAll(FileSystemUtils.getFileSystemProviders(fileSystemProviders));
+
     this.workingDirectory = new Path(name);
     this.uri = URI.create(name.getScheme() + "://" + name.getAuthority());
 
@@ -351,7 +362,6 @@ public class GravitinoVirtualFileSystem extends FileSystem {
     Preconditions.checkArgument(
        filesetCatalog != null, String.format("Loaded fileset catalog: %s is null.", catalogIdent));
 
-    // set the thread local audit info
     Map<String, String> contextMap = Maps.newHashMap();
     contextMap.put(
         FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
@@ -364,7 +374,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
         filesetCatalog.getFileLocation(
            NameIdentifier.of(identifier.namespace().level(2), identifier.name()), subPath);
 
-    URI uri = new Path(actualFileLocation).toUri();
+    Path filePath = new Path(actualFileLocation);
+    URI uri = filePath.toUri();
     // we cache the fs for the same scheme, so we can reuse it
     String scheme = uri.getScheme();
     Preconditions.checkArgument(
@@ -374,7 +385,14 @@ public class GravitinoVirtualFileSystem extends FileSystem {
             scheme,
             str -> {
               try {
-                return FileSystem.newInstance(uri, getConf());
+                Map<String, String> maps = getConfigMap(getConf());
+                FileSystemProvider provider = fileSystemProvidersMap.get(scheme);
+                if (provider == null) {
+                  throw new GravitinoRuntimeException(
+                      "Unsupported file system scheme: %s for %s.",
+                      scheme, GravitinoVirtualFileSystemConfiguration.GVFS_SCHEME);
+                }
+                return provider.getFileSystem(filePath, maps);
               } catch (IOException ioe) {
                  throw new GravitinoRuntimeException(
                      "Exception occurs when create new FileSystem for actual uri: %s, msg: %s",
@@ -385,6 +403,21 @@ public class GravitinoVirtualFileSystem extends FileSystem {
     return new FilesetContextPair(new Path(actualFileLocation), fs);
   }
 
+  private Map<String, String> getConfigMap(Configuration configuration) {
+    Map<String, String> maps = Maps.newHashMap();
+    configuration.forEach(
+        entry -> {
+          String key = entry.getKey();
+          if (key.startsWith(GRAVITINO_BYPASS_PREFIX)) {
+            maps.put(key.substring(GRAVITINO_BYPASS_PREFIX.length()), entry.getValue());
+          } else if (!key.startsWith(GVFS_CONFIG_PREFIX)) {
+            maps.put(key, entry.getValue());
+          }
+        });
+
+    return maps;
+  }
+
   private String getSubPathFromVirtualPath(NameIdentifier identifier, String virtualPathString) {
     return virtualPathString.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX)
         ? virtualPathString.substring(
diff --git a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
index 8076c02c3..cd1ecb92f 100644
--- a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
+++ b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
@@ -18,10 +18,13 @@
  */
 package org.apache.gravitino.filesystem.hadoop;
 
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+
 /** Configuration class for Gravitino Virtual File System. */
 class GravitinoVirtualFileSystemConfiguration {
   public static final String GVFS_FILESET_PREFIX = "gvfs://fileset";
   public static final String GVFS_SCHEME = "gvfs";
+  public static final String GVFS_CONFIG_PREFIX = "fs.gvfs.";
 
   /** The configuration key for the Gravitino server URI. */
   public static final String FS_GRAVITINO_SERVER_URI_KEY = "fs.gravitino.server.uri";
@@ -32,6 +35,13 @@ class GravitinoVirtualFileSystemConfiguration {
   /** The configuration key for the Gravitino client auth type. */
   public static final String FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY = "fs.gravitino.client.authType";
 
+  /**
+   * File system provider names configuration key. The value is a comma-separated list of file
+   * system provider names as defined in the service loader. Users can add their own file system
+   * by implementing the {@link FileSystemProvider} interface.
+   */
+  public static final String FS_FILESYSTEM_PROVIDERS = "fs.gvfs.filesystem.providers";
+
   public static final String SIMPLE_AUTH_TYPE = "simple";
   public static final String OAUTH2_AUTH_TYPE = "oauth2";
   public static final String KERBEROS_AUTH_TYPE = "kerberos";
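
On the client side, the new key slots into the usual GVFS configuration. A hedged sketch follows: `fs.gravitino.server.uri` and `fs.gvfs.filesystem.providers` appear in this diff, while the `fs.gvfs.impl` binding and the metalake key follow standard GVFS setup and should be treated as assumptions here; the "s3" provider name is hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class GvfsClientSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.gvfs.impl",
            "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
        conf.set("fs.gravitino.server.uri", "http://localhost:8090");
        conf.set("fs.gravitino.client.metalake", "test_metalake"); // assumed key name
        // New in this patch: extra providers for the GVFS client to load.
        conf.set("fs.gvfs.filesystem.providers", "s3"); // hypothetical provider name
        Path path = new Path("gvfs://fileset/catalog/schema/my_fileset/a.parquet");
        try (FileSystem gvfs = path.getFileSystem(conf)) {
          System.out.println(gvfs.getFileStatus(path).getLen());
        }
      }
    }
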
diff --git a/docs/hadoop-catalog.md b/docs/hadoop-catalog.md
index d6706ff3e..d28e6d93b 100644
--- a/docs/hadoop-catalog.md
+++ b/docs/hadoop-catalog.md
@@ -25,16 +25,19 @@ Hadoop 3. If there's any compatibility issue, please create an [issue](https://g
 
 Besides the [common catalog properties](./gravitino-server-config.md#gravitino-catalog-properties-configuration),
 the Hadoop catalog has the following properties:
 
-| Property Name | Description | Default Value | Required | Since Version |
-|---------------|-------------|---------------|----------|---------------|
-| `location` | The storage location managed by Hadoop catalog. | (none) | No | 0.5.0 |
-| `authentication.impersonation-enable` | Whether to enable impersonation for the Hadoop catalog. | `false` | No | 0.5.1 |
-| `authentication.type` | The type of authentication for Hadoop catalog, currently we only support `kerberos`, `simple`. | `simple` | No | 0.5.1 |
-| `authentication.kerberos.principal` | The principal of the Kerberos authentication | (none) | required if the value of `authentication.type` is Kerberos. | 0.5.1 |
-| `authentication.kerberos.keytab-uri` | The URI of The keytab for the Kerberos authentication. | (none) | required if the value of `authentication.type` is Kerberos. | 0.5.1 |
-| `authentication.kerberos.check-interval-sec` | The check interval of Kerberos credential for Hadoop catalog. | 60 | No | 0.5.1 |
-| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`. | 60 | No | 0.5.1 |
-
+| Property Name | Description | Default Value | Required | Since Version |
+|---------------|-------------|---------------|----------|---------------|
+| `location` | The storage location managed by Hadoop catalog. | (none) | No | 0.5.0 |
+| `filesystem-providers` | The comma-separated names of filesystem providers for the Hadoop catalog. Gravitino already supports the built-in `builtin-local` (local file) and `builtin-hdfs` (HDFS) providers. Users who want to support additional file systems can add their own providers by implementing `FileSystemProvider`. | (none) | No | 0.7.0-incubating |
+| `default-filesystem-provider` | The name of the default filesystem provider of this Hadoop catalog, used when users do not specify a scheme in the URI. The default value is `builtin-local`. | `builtin-local` | No | 0.7.0-incubating |
+| `authentication.impersonation-enable` | Whether to enable impersonation for the Hadoop catalog. | `false` | No | 0.5.1 |
+| `authentication.type` | The type of authentication for Hadoop catalog, currently we only support `kerberos`, `simple`. | `simple` | No | 0.5.1 |
+| `authentication.kerberos.principal` | The principal of the Kerberos authentication | (none) | required if the value of `authentication.type` is Kerberos. | 0.5.1 |
+| `authentication.kerberos.keytab-uri` | The URI of the keytab for the Kerberos authentication. | (none) | required if the value of `authentication.type` is Kerberos. | 0.5.1 |
+| `authentication.kerberos.check-interval-sec` | The check interval of Kerberos credential for Hadoop catalog. | 60 | No | 0.5.1 |
+| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`. | 60 | No | 0.5.1 |
+
+For more about `filesystem-providers`, please refer to `HDFSFileSystemProvider` or `LocalFileSystemProvider` in the source code. Furthermore, you also need to place the file system provider's jar into the `$GRAVITINO_HOME/catalogs/hadoop/libs` directory if it's not already on the classpath.
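
Tying the table back to the API, a hedged sketch of creating such a catalog through the Java client; the builder and createCatalog signatures reflect the Gravitino client API as understood at the time of writing and are not part of this diff, and the "s3" provider is hypothetical.

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;
    import org.apache.gravitino.Catalog;
    import org.apache.gravitino.client.GravitinoClient;

    public class CreateHadoopCatalogSketch {
      public static void main(String[] args) {
        GravitinoClient client =
            GravitinoClient.builder("http://localhost:8090").withMetalake("test_metalake").build();
        Map<String, String> properties =
            ImmutableMap.of(
                "location", "file:///tmp/catalog",
                "filesystem-providers", "s3", // hypothetical extra provider
                "default-filesystem-provider", "builtin-local");
        Catalog catalog =
            client.createCatalog(
                "fileset_catalog", Catalog.Type.FILESET, "hadoop", "comment", properties);
        System.out.println(catalog.name());
      }
    }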
 
 ### Authentication for Hadoop Catalog
 
