This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new cff80824d2 [#6680] feat(fileset): support fileset location placeholder (#6724)
cff80824d2 is described below

commit cff80824d2db4d817468fd23e2bee9b74aa9a12a
Author: mchades <liminghu...@datastrato.com>
AuthorDate: Wed Mar 26 06:40:48 2025 +0800

    [#6680] feat(fileset): support fileset location placeholder (#6724)
    
    ### What changes were proposed in this pull request?
    
    Support placeholders of the form {{name}} in fileset storage locations.
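    
    As a rough illustration of the substitution (not the code in this commit), the
    standalone Java sketch below resolves {{name}} placeholders from properties
    prefixed with "placeholder-" plus the reserved catalog, schema and fileset
    names. The class name PlaceholderSketch and the method resolve are hypothetical;
    the regex and the error message mirror the ones in the diff below.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical standalone sketch; it does not depend on Gravitino classes.
public class PlaceholderSketch {
  private static final String PREFIX = "placeholder-";
  // Same pattern as LOCATION_PLACEHOLDER_PATTERN in the diff: {{name}}
  private static final Pattern PATTERN = Pattern.compile("\\{\\{(.*?)\\}\\}");

  public static String resolve(
      String location,
      String catalog,
      String schema,
      String fileset,
      Map<String, String> properties) {
    // Collect user-defined placeholders, stripping the "placeholder-" prefix.
    Map<String, String> mapping = new HashMap<>();
    properties.forEach(
        (k, v) -> {
          if (k.startsWith(PREFIX)) {
            mapping.put(k.substring(PREFIX.length()), v);
          }
        });
    // Reserved placeholders are added last, so they win over user-supplied values.
    mapping.put("catalog", catalog);
    mapping.put("schema", schema);
    mapping.put("fileset", fileset);

    Matcher matcher = PATTERN.matcher(location);
    StringBuilder result = new StringBuilder();
    int position = 0;
    while (matcher.find()) {
      // Copy the text before the placeholder, then the replacement value.
      result.append(location, position, matcher.start());
      String key = matcher.group(1);
      String value = mapping.get(key);
      if (value == null) {
        throw new IllegalArgumentException("No value found for placeholder: " + key);
      }
      result.append(value);
      position = matcher.end();
    }
    // Copy whatever follows the last placeholder.
    result.append(location, position, location.length());
    return result.toString();
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("placeholder-version", "v1");
    System.out.println(
        resolve(
            "file:///path/{{schema}}-{{fileset}}-{{version}}",
            "catalog1",
            "schema1",
            "fileset1",
            props));
  }
}
```

    With "placeholder-version" set to "v1", the location
    "file:///path/{{schema}}-{{fileset}}-{{version}}" for catalog1.schema1.fileset1
    resolves to "file:///path/schema1-fileset1-v1", which matches the Javadoc
    example added in this commit.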
    
    ### Why are the changes needed?
    
    Fix: #6680
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Users can now use placeholders in the location definition.
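    
    Which location is used depends on the level that defines it. The sketch below
    is a simplified view of the four cases handled by caculateFilesetPath in this
    commit. It uses plain strings instead of Hadoop Path objects (an assumption
    made to keep it self-contained), and FilesetPathSketch, choosePath and the
    resolver argument are hypothetical names.

```java
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;

// Hypothetical sketch of the location precedence; not the committed implementation.
public class FilesetPathSketch {
  private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{(.*?)\\}\\}");

  public static boolean containsPlaceholder(String location) {
    return location != null && PLACEHOLDER.matcher(location).find();
  }

  // Assumes schemaPath is non-null whenever storageLocation is blank; the real
  // code rejects that combination before the path is calculated.
  public static String choosePath(
      String storageLocation,
      String schemaPath,
      String filesetName,
      UnaryOperator<String> resolver) {
    boolean hasStorageLocation = storageLocation != null && !storageLocation.trim().isEmpty();

    // case 1: explicit storage location without placeholders is used as-is
    if (hasStorageLocation && !containsPlaceholder(storageLocation)) {
      return storageLocation;
    }
    // case 2: explicit storage location with placeholders is resolved
    if (hasStorageLocation) {
      return resolver.apply(storageLocation);
    }
    // case 3: no storage location, schema path without placeholders:
    // append the fileset name
    if (!containsPlaceholder(schemaPath)) {
      return schemaPath.endsWith("/") ? schemaPath + filesetName : schemaPath + "/" + filesetName;
    }
    // case 4: no storage location, schema path with placeholders:
    // resolve it and do not append the fileset name
    return resolver.apply(schemaPath);
  }

  public static void main(String[] args) {
    // Stand-in resolver that only knows two reserved placeholders.
    UnaryOperator<String> resolver =
        s -> s.replace("{{schema}}", "schema1").replace("{{fileset}}", "fileset1");
    System.out.println(
        choosePath("file:///data/{{fileset}}", "file:///tmp/schema1", "fileset1", resolver));
    System.out.println(choosePath(null, "file:///tmp/schema1", "fileset1", resolver));
  }
}
```

    Note the difference between cases 3 and 4: a schema or catalog location that
    contains any placeholder is treated as the complete fileset path, so the
    fileset name is only appended when no placeholder is present.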
    
    ### How was this patch tested?
    
    Unit and integration tests added (TestHadoopCatalogOperations, HadoopCatalogIT).
---
 .../java/org/apache/gravitino/file/Fileset.java    |  55 ++-
 .../catalog/hadoop/HadoopCatalogOperations.java    | 127 +++++-
 .../hadoop/HadoopFilesetPropertiesMetadata.java    |  29 +-
 .../hadoop/TestHadoopCatalogOperations.java        | 431 +++++++++++++++++++++
 .../hadoop/integration/test/HadoopCatalogIT.java   |  15 +
 .../client-python/gravitino/api/file/fileset.py    |  49 ++-
 docs/hadoop-catalog.md                             |   9 +
 docs/manage-fileset-metadata-using-gravitino.md    | 133 ++++++-
 8 files changed, 811 insertions(+), 37 deletions(-)

diff --git a/api/src/main/java/org/apache/gravitino/file/Fileset.java b/api/src/main/java/org/apache/gravitino/file/Fileset.java
index 6ada5f8c47..48cccf78b3 100644
--- a/api/src/main/java/org/apache/gravitino/file/Fileset.java
+++ b/api/src/main/java/org/apache/gravitino/file/Fileset.java
@@ -42,6 +42,27 @@ import org.apache.gravitino.tag.SupportsTags;
 @Evolving
 public interface Fileset extends Auditable {
 
+  /** The prefix of fileset placeholder property */
+  String LOCATION_PLACEHOLDER_PREFIX = "placeholder-";
+
+  /**
+   * The reserved property name for the catalog name placeholder, when creating a fileset, all
+   * placeholders as {{catalog}} will be replaced by the catalog name
+   */
+  String RESERVED_CATALOG_PLACEHOLDER = "placeholder-catalog";
+
+  /**
+   * The reserved property name for the schema name placeholder, when creating a fileset, all
+   * placeholders as {{schema}} will be replaced by the schema name
+   */
+  String RESERVED_SCHEMA_PLACEHOLDER = "placeholder-schema";
+
+  /**
+   * The reserved property name for the fileset name placeholder, when creating a fileset, all
+   * placeholders as {{fileset}} will be replaced by the fileset name
+   */
+  String RESERVED_FILESET_PLACEHOLDER = "placeholder-fileset";
+
   /** An enum representing the type of the fileset object. */
   enum Type {
 
@@ -77,18 +98,41 @@ public interface Fileset extends Auditable {
    * object, or the one specified in the catalog / schema level if the fileset object is created
    * under this catalog / schema.
    *
+   * <p>The storageLocation in each level can contain placeholders, format as {{name}}, which will
+   * be replaced by the corresponding fileset property value when the fileset object is created. The
+   * placeholder property in the fileset object is formed as "placeholder-{{name}}". For example, if
+   * the storageLocation is "file:///path/{{schema}}-{{fileset}}-{{version}}", and the fileset
+   * object "catalog1.schema1.fileset1" has the property "placeholder-version" set to "v1", then the
+   * storageLocation will be "file:///path/schema1-fileset1-v1".
+   *
    * <p>For managed fileset, the storageLocation can be:
    *
-   * <p>1) The one specified when creating the fileset object.
+   * <p>1) The one specified when creating the fileset object, and the placeholders in the
+   * storageLocation will be replaced by the placeholder value specified in the fileset properties.
    *
    * <p>2) When catalog property "location" is specified but schema property "location" is not
-   * specified, then the storageLocation will be "{catalog location}/schemaName/filesetName".
+   * specified, then the storageLocation will be:
+   *
+   * <p>a. "{catalog location}/schemaName/filesetName" if {catalog location} does not contain any
+   * placeholder.
+   *
+   * <p>b. "{catalog location}" - placeholders in the {catalog location} will be replaced by the
+   * placeholder value specified in the fileset properties.
    *
    * <p>3) When catalog property "location" is not specified but schema property "location" is
-   * specified, then the storageLocation will be "{schema location}/filesetName".
+   * specified, then the storageLocation will be:
+   *
+   * <p>a. "{schema location}/filesetName" if {schema location} does not contain any placeholder.
+   *
+   * <p>b. "{schema location}" - placeholders in the {schema location} will be replaced by the
+   * placeholder value specified in the fileset properties.
    *
    * <p>4) When both catalog property "location" and schema property "location" are specified, then
-   * the storageLocation will be "{schema location}/filesetName".
+   * the storageLocation will be:
+   *
+   * <p>a. "{schema location}/filesetName" if {schema location} does not contain any placeholder.
+   *
+   * <p>b. "{schema location}" - placeholders in the {schema location} will be replaced by the
+   * placeholder value specified in the fileset properties.
    *
    * <p>5) When both catalog property "location" and schema property "location" are not specified,
    * and storageLocation specified when creating the fileset object is null, this situation is
@@ -96,7 +140,8 @@ public interface Fileset extends Auditable {
    *
    * <p>For external fileset, the storageLocation can be:
    *
-   * <p>1) The one specified when creating the fileset object.
+   * <p>1) The one specified when creating the fileset object, and the placeholders in the
+   * storageLocation will be replaced by the placeholder value specified in the fileset properties.
    *
    * @return The storage location of the fileset object.
    */
diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
index 6c032414be..11af5be1ab 100644
--- a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
@@ -19,6 +19,10 @@
 package org.apache.gravitino.catalog.hadoop;
 
 import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
+import static org.apache.gravitino.file.Fileset.LOCATION_PLACEHOLDER_PREFIX;
+import static org.apache.gravitino.file.Fileset.RESERVED_CATALOG_PLACEHOLDER;
+import static org.apache.gravitino.file.Fileset.RESERVED_FILESET_PLACEHOLDER;
+import static org.apache.gravitino.file.Fileset.RESERVED_SCHEMA_PLACEHOLDER;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -28,11 +32,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Entity;
@@ -83,6 +90,9 @@ public class HadoopCatalogOperations extends ManagedSchemaOperations
   private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not exist";
   private static final String FILESET_DOES_NOT_EXIST_MSG = "Fileset %s does not exist";
   private static final String SLASH = "/";
+
+  // location placeholder pattern format: {{placeholder}}
+  private static final Pattern LOCATION_PLACEHOLDER_PATTERN = Pattern.compile("\\{\\{(.*?)\\}\\}");
   private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalogOperations.class);
 
   private final EntityStore store;
@@ -161,6 +171,8 @@ public class HadoopCatalogOperations extends ManagedSchemaOperations
             propertiesMetadata
                 .catalogPropertiesMetadata()
                 .getOrDefault(config, HadoopCatalogPropertiesMetadata.LOCATION);
+    checkPlaceholderValue(catalogLocation);
+
     this.catalogStorageLocation =
         StringUtils.isNotBlank(catalogLocation)
             ? Optional.of(catalogLocation)
@@ -250,12 +262,11 @@ public class HadoopCatalogOperations extends ManagedSchemaOperations
               + ident
               + " when it's catalog and schema location are not set");
     }
+    checkPlaceholderValue(storageLocation);
 
-    // The specified storageLocation will take precedence over the calculated one.
     Path filesetPath =
-        StringUtils.isNotBlank(storageLocation)
-            ? new Path(storageLocation)
-            : new Path(schemaPath, ident.name());
+        caculateFilesetPath(
+            schemaIdent.name(), ident.name(), storageLocation, schemaPath, properties);
 
     try {
       // formalize the path to avoid path without scheme, uri, authority, etc.
@@ -469,7 +480,7 @@ public class HadoopCatalogOperations extends ManagedSchemaOperations
     }
 
     Path schemaPath = getSchemaPath(ident.name(), properties);
-    if (schemaPath != null) {
+    if (schemaPath != null && !containsPlaceholder(schemaPath.toString())) {
       try {
         FileSystem fs = getFileSystem(schemaPath, conf);
         if (!fs.exists(schemaPath)) {
@@ -707,11 +718,115 @@ public class HadoopCatalogOperations extends ManagedSchemaOperations
             propertiesMetadata
                 .schemaPropertiesMetadata()
                 .getOrDefault(properties, HadoopSchemaPropertiesMetadata.LOCATION);
+    checkPlaceholderValue(schemaLocation);
 
     return Optional.ofNullable(schemaLocation)
         .map(s -> s.endsWith(SLASH) ? s : s + SLASH)
         .map(Path::new)
-        .orElse(catalogStorageLocation.map(p -> new Path(p, name)).orElse(null));
+        .orElse(
+            catalogStorageLocation
+                .map(p -> containsPlaceholder(p.toString()) ? p : new Path(p, name))
+                .orElse(null));
+  }
+
+  /**
+   * Check whether the placeholder in the location is valid. Throw an exception if the location
+   * contains a placeholder with an empty value.
+   *
+   * @param location the location to check.
+   */
+  private void checkPlaceholderValue(String location) {
+    if (StringUtils.isBlank(location)) {
+      return;
+    }
+
+    Matcher matcher = LOCATION_PLACEHOLDER_PATTERN.matcher(location);
+    while (matcher.find()) {
+      String placeholder = matcher.group(1);
+      if (placeholder.isEmpty()) {
+        throw new IllegalArgumentException(
+            "Placeholder in location should not be empty, location: " + location);
+      }
+    }
+  }
+
+  /**
+   * Check whether the location contains a placeholder. The placeholder is in the format of
+   * {{name}}.
+   *
+   * @param location the location to check.
+   * @return true if the location contains a placeholder, false otherwise.
+   */
+  private boolean containsPlaceholder(String location) {
+    return StringUtils.isNotBlank(location)
+        && LOCATION_PLACEHOLDER_PATTERN.matcher(location).find();
+  }
+
+  private Path caculateFilesetPath(
+      String schemaName,
+      String filesetName,
+      String storageLocation,
+      Path schemaPath,
+      Map<String, String> properties) {
+    // The specified storageLocation will take precedence
+    // case 1: storageLocation is not empty and does not contain placeholder
+    if (StringUtils.isNotBlank(storageLocation) && !containsPlaceholder(storageLocation)) {
+      return new Path(storageLocation);
+    }
+
+    Map<String, String> placeholderMapping = new HashMap<>();
+    properties.forEach(
+        (k, v) -> {
+          if (k.startsWith(LOCATION_PLACEHOLDER_PREFIX)) {
+            placeholderMapping.put(k.substring(LOCATION_PLACEHOLDER_PREFIX.length()), v);
+          }
+        });
+    placeholderMapping.put(
+        RESERVED_CATALOG_PLACEHOLDER.substring(LOCATION_PLACEHOLDER_PREFIX.length()),
+        catalogInfo.name());
+    placeholderMapping.put(
+        RESERVED_SCHEMA_PLACEHOLDER.substring(LOCATION_PLACEHOLDER_PREFIX.length()), schemaName);
+    placeholderMapping.put(
+        RESERVED_FILESET_PLACEHOLDER.substring(LOCATION_PLACEHOLDER_PREFIX.length()), filesetName);
+
+    // case 2: storageLocation is not empty and contains placeholder
+    if (StringUtils.isNotBlank(storageLocation)) {
+      return new Path(replacePlaceholders(storageLocation, placeholderMapping));
+    }
+
+    // case 3: storageLocation is empty and schemaPath does not contain placeholder
+    if (!containsPlaceholder(schemaPath.toString())) {
+      return new Path(schemaPath, filesetName);
+    }
+
+    // case 4: storageLocation is empty and schemaPath contains placeholder
+    return new Path(replacePlaceholders(schemaPath.toString(), placeholderMapping));
+  }
+
+  private String replacePlaceholders(String location, Map<String, String> placeholderMapping) {
+    Matcher matcher = LOCATION_PLACEHOLDER_PATTERN.matcher(location);
+    StringBuilder result = new StringBuilder();
+    int currentPosition = 0;
+    while (matcher.find()) {
+      // Append the text before the match
+      result.append(location, currentPosition, matcher.start());
+
+      // Append the replacement
+      String key = matcher.group(1);
+      String replacement = placeholderMapping.get(key);
+      if (replacement == null) {
+        throw new IllegalArgumentException("No value found for placeholder: " + key);
+      }
+      result.append(replacement);
+
+      currentPosition = matcher.end();
+    }
+
+    // Append the rest of the text
+    if (currentPosition < location.length()) {
+      result.append(location, currentPosition, location.length());
+    }
+    return result.toString();
   }
 
   @VisibleForTesting
diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
index 84862dd094..1bdf224671 100644
--- a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
@@ -18,6 +18,10 @@
  */
 package org.apache.gravitino.catalog.hadoop;
 
+import static org.apache.gravitino.file.Fileset.RESERVED_CATALOG_PLACEHOLDER;
+import static org.apache.gravitino.file.Fileset.RESERVED_FILESET_PLACEHOLDER;
+import static org.apache.gravitino.file.Fileset.RESERVED_SCHEMA_PLACEHOLDER;
+
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
@@ -31,9 +35,28 @@ public class HadoopFilesetPropertiesMetadata extends BasePropertiesMetadata {
   @Override
   protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
     ImmutableMap.Builder<String, PropertyEntry<?>> builder = ImmutableMap.builder();
-    builder.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES);
-    builder.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES);
-    builder.putAll(CredentialConfig.CREDENTIAL_PROPERTY_ENTRIES);
+    builder
+        .put(
+            RESERVED_CATALOG_PLACEHOLDER,
+            PropertyEntry.stringReservedPropertyEntry(
+                RESERVED_CATALOG_PLACEHOLDER,
+                "The placeholder will be replaced to catalog name in the location",
+                true /* hidden */))
+        .put(
+            RESERVED_SCHEMA_PLACEHOLDER,
+            PropertyEntry.stringReservedPropertyEntry(
+                RESERVED_SCHEMA_PLACEHOLDER,
+                "The placeholder will be replaced to schema name in the location",
+                true /* hidden */))
+        .put(
+            RESERVED_FILESET_PLACEHOLDER,
+            PropertyEntry.stringReservedPropertyEntry(
+                RESERVED_FILESET_PLACEHOLDER,
+                "The placeholder will be replaced to fileset name in the location",
+                true /* hidden */))
+        .putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
+        .putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
+        .putAll(CredentialConfig.CREDENTIAL_PROPERTY_ENTRIES);
     return builder.build();
   }
 }
diff --git a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
index 1cdb7cca0d..717a76cf9e 100644
--- a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
+++ b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
@@ -167,6 +167,19 @@ public class TestHadoopCatalogOperations {
         Namespace.of(metalakeName));
   }
 
+  private static CatalogInfo randomCatalogInfo(
+      String metalakeName, String catalogName, Map<String, String> props) {
+    return new CatalogInfo(
+        idGenerator.nextId(),
+        catalogName,
+        CatalogInfo.Type.FILESET,
+        "hadoop",
+        "comment1",
+        props,
+        null,
+        Namespace.of(metalakeName));
+  }
+
   @BeforeAll
   public static void setUp() {
     Config config = Mockito.mock(Config.class);
@@ -250,6 +263,23 @@ public class TestHadoopCatalogOperations {
     Assertions.assertTrue(ops.catalogStorageLocation.isPresent());
     Path expectedPath = new Path("file:///tmp/catalog");
     Assertions.assertEquals(expectedPath, ops.catalogStorageLocation.get());
+
+    // test placeholder in location
+    emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION, "file:///tmp/{{catalog}}");
+    ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
+    Assertions.assertTrue(ops.catalogStorageLocation.isPresent());
+    expectedPath = new Path("file:///tmp/{{catalog}}");
+    Assertions.assertEquals(expectedPath, ops.catalogStorageLocation.get());
+
+    // test illegal placeholder in location
+    emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION, "file:///tmp/{{}}");
+    Throwable exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () -> ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA));
+    Assertions.assertTrue(
+        exception.getMessage().contains("Placeholder in location should not be empty"),
+        exception.getMessage());
   }
 
   @Test
@@ -295,6 +325,16 @@ public class TestHadoopCatalogOperations {
     Assertions.assertTrue(fs.exists(schemaPath));
     Assertions.assertTrue(fs.getFileStatus(schemaPath).isDirectory());
     Assertions.assertTrue(fs.listStatus(schemaPath).length == 0);
+
+    // test placeholder in catalog location
+    name = "schema12_1";
+    catalogPath = TEST_ROOT_PATH + "/" + "{{catalog}}-{{schema}}";
+    schema = createSchema(name, comment, catalogPath, null);
+    Assertions.assertEquals(name, schema.name());
+
+    schemaPath = new Path(catalogPath, name);
+    fs = schemaPath.getFileSystem(new Configuration());
+    Assertions.assertFalse(fs.exists(schemaPath));
   }
 
   @Test
@@ -311,6 +351,27 @@ public class TestHadoopCatalogOperations {
     Assertions.assertTrue(fs.exists(schemaPath1));
     Assertions.assertTrue(fs.getFileStatus(schemaPath1).isDirectory());
     Assertions.assertTrue(fs.listStatus(schemaPath1).length == 0);
+
+    // test placeholder in schema location
+    name = "schema13_1";
+    schemaPath = catalogPath + "/" + "{{schema}}";
+    schema = createSchema(name, comment, null, schemaPath);
+    Assertions.assertEquals(name, schema.name());
+
+    schemaPath1 = new Path(schemaPath);
+    fs = schemaPath1.getFileSystem(new Configuration());
+    Assertions.assertFalse(fs.exists(schemaPath1));
+
+    // test illegal placeholder in schema location
+    String schemaName1 = "schema13_2";
+    String schemaPath2 = catalogPath + "/" + "{{}}";
+    Throwable exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () -> createSchema(schemaName1, comment, null, schemaPath2));
+    Assertions.assertTrue(
+        exception.getMessage().contains("Placeholder in location should not be empty"),
+        exception.getMessage());
   }
 
   @Test
@@ -330,6 +391,19 @@ public class TestHadoopCatalogOperations {
 
     Assertions.assertFalse(fs.exists(new Path(catalogPath)));
     Assertions.assertFalse(fs.exists(new Path(catalogPath, name)));
+
+    // test placeholder in location
+    name = "schema14_1";
+    catalogPath = TEST_ROOT_PATH + "/" + "{{catalog}}";
+    schemaPath = TEST_ROOT_PATH + "/" + "{{schema}}";
+    schema = createSchema(name, comment, catalogPath, schemaPath);
+    Assertions.assertEquals(name, schema.name());
+
+    schemaPath1 = new Path(schemaPath);
+    fs = schemaPath1.getFileSystem(new Configuration());
+    Assertions.assertFalse(fs.exists(schemaPath1));
+    Assertions.assertFalse(fs.exists(new Path(catalogPath)));
+    Assertions.assertFalse(fs.exists(new Path(catalogPath, name)));
   }
 
   @Test
@@ -534,6 +608,8 @@ public class TestHadoopCatalogOperations {
       } else {
         Assertions.assertTrue(fs.exists(expectedPath));
       }
+      // clean expected path if exist
+      fs.delete(expectedPath, true);
 
       // Test drop non-existent fileset
       Assertions.assertFalse(ops.dropFileset(filesetIdent), "fileset should be non-existent");
@@ -980,6 +1056,191 @@ public class TestHadoopCatalogOperations {
     }
   }
 
+  @Test
+  public void testLocationPlaceholdersWithException() throws IOException {
+    // test placeholder value not found ("user" lacks the "placeholder-" prefix)
+    String schemaName = "schema24";
+    createSchema(schemaName, null, null, null);
+    String name = "fileset24";
+    String storageLocation = TEST_ROOT_PATH + "/{{fileset}}/{{user}}/{{id}}";
+
+    Exception exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                createFileset(
+                    name,
+                    schemaName,
+                    "comment",
+                    Fileset.Type.MANAGED,
+                    null,
+                    storageLocation,
+                    ImmutableMap.of("user", "tom")));
+    Assertions.assertEquals("No value found for placeholder: user", exception.getMessage());
+
+    // test empty placeholder name
+    String storageLocation1 = TEST_ROOT_PATH + "/{{fileset}}/{{}}";
+    Exception exception1 =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                createFileset(
+                    name,
+                    schemaName,
+                    "comment",
+                    Fileset.Type.MANAGED,
+                    null,
+                    storageLocation1,
+                    ImmutableMap.of("user", "tom")));
+    Assertions.assertEquals(
+        "Placeholder in location should not be empty, location: " + storageLocation1,
+        exception1.getMessage());
+  }
+
+  @ParameterizedTest
+  @MethodSource("locationWithPlaceholdersArguments")
+  public void testPlaceholdersInLocation(
+      String name,
+      Fileset.Type type,
+      String catalogPath,
+      String schemaPath,
+      String storageLocation,
+      Map<String, String> placeholders,
+      String expect)
+      throws IOException {
+    String schemaName = "s1_" + name;
+    String comment = "comment_s1";
+    Map<String, String> catalogProps = Maps.newHashMap();
+    if (catalogPath != null) {
+      catalogProps.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+    }
+
+    NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", schemaName);
+    try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) {
+      ops.initialize(catalogProps, randomCatalogInfo("m1", "c1"), HADOOP_PROPERTIES_METADATA);
+      if (!ops.schemaExists(schemaIdent)) {
+        createSchema(schemaName, comment, catalogPath, schemaPath);
+      }
+      Fileset fileset =
+          createFileset(
+              name, schemaName, "comment", type, catalogPath, storageLocation, placeholders);
+
+      Assertions.assertEquals(name, fileset.name());
+      Assertions.assertEquals(type, fileset.type());
+      Assertions.assertEquals("comment", fileset.comment());
+      Assertions.assertEquals(expect, fileset.storageLocation());
+      placeholders.forEach((k, v) -> Assertions.assertEquals(fileset.properties().get(k), v));
+
+      // Test load
+      NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name);
+      Fileset loadedFileset = ops.loadFileset(filesetIdent);
+      Assertions.assertEquals(name, loadedFileset.name());
+      Assertions.assertEquals(type, loadedFileset.type());
+      Assertions.assertEquals("comment", loadedFileset.comment());
+      Assertions.assertEquals(expect, loadedFileset.storageLocation());
+
+      // Test drop
+      ops.dropFileset(filesetIdent);
+      Path expectedPath = new Path(expect);
+      FileSystem fs = expectedPath.getFileSystem(new Configuration());
+      if (type == Fileset.Type.MANAGED) {
+        Assertions.assertFalse(fs.exists(expectedPath));
+      } else {
+        Assertions.assertTrue(fs.exists(expectedPath));
+      }
+      // clean expected path if exist
+      fs.delete(expectedPath, true);
+
+      // Test drop non-existent fileset
+      Assertions.assertFalse(ops.dropFileset(filesetIdent), "fileset should be non-existent");
+    }
+  }
+
+  private static Stream<Arguments> locationWithPlaceholdersArguments() {
+    return Stream.of(
+        // placeholders in catalog location
+        Arguments.of(
+            "fileset41",
+            Fileset.Type.MANAGED,
+            TEST_ROOT_PATH + "/{{catalog}}-{{schema}}-{{fileset}}/workspace/{{user}}",
+            null,
+            null,
+            ImmutableMap.of("placeholder-user", "tom"),
+            TEST_ROOT_PATH + "/c1-s1_fileset41-fileset41/workspace/tom"),
+        // placeholders in schema location
+        Arguments.of(
+            "fileset42",
+            Fileset.Type.MANAGED,
+            null,
+            TEST_ROOT_PATH + "/{{catalog}}-{{schema}}-{{fileset}}/workspace/{{user}}",
+            null,
+            ImmutableMap.of("placeholder-user", "tom"),
+            TEST_ROOT_PATH + "/c1-s1_fileset42-fileset42/workspace/tom"),
+        // placeholders in schema location
+        Arguments.of(
+            "fileset43",
+            Fileset.Type.MANAGED,
+            TEST_ROOT_PATH + "/{{catalog}}",
+            TEST_ROOT_PATH + "/{{catalog}}-{{schema}}-{{fileset}}/workspace/{{user}}",
+            null,
+            ImmutableMap.of("placeholder-user", "tom"),
+            TEST_ROOT_PATH + "/c1-s1_fileset43-fileset43/workspace/tom"),
+        // placeholders in storage location
+        Arguments.of(
+            "fileset44",
+            Fileset.Type.MANAGED,
+            TEST_ROOT_PATH + "/{{catalog}}",
+            TEST_ROOT_PATH + "/{{schema}}",
+            TEST_ROOT_PATH + "/{{catalog}}-{{schema}}-{{fileset}}/workspace/{{user}}",
+            ImmutableMap.of("placeholder-user", "tom"),
+            TEST_ROOT_PATH + "/c1-s1_fileset44-fileset44/workspace/tom"),
+        // placeholders in storage location
+        Arguments.of(
+            "fileset45",
+            Fileset.Type.MANAGED,
+            null,
+            null,
+            TEST_ROOT_PATH + "/{{catalog}}-{{schema}}-{{fileset}}/workspace/{{user}}",
+            ImmutableMap.of("placeholder-user", "tom"),
+            TEST_ROOT_PATH + "/c1-s1_fileset45-fileset45/workspace/tom"),
+        // placeholders in storage location
+        Arguments.of(
+            "fileset46",
+            Fileset.Type.MANAGED,
+            TEST_ROOT_PATH + "/{{catalog}}",
+            null,
+            TEST_ROOT_PATH + "/{{catalog}}-{{schema}}-{{fileset}}/workspace/{{user}}",
+            ImmutableMap.of("placeholder-user", "tom"),
+            TEST_ROOT_PATH + "/c1-s1_fileset46-fileset46/workspace/tom"),
+        // placeholders in storage location
+        Arguments.of(
+            "fileset47",
+            Fileset.Type.EXTERNAL,
+            TEST_ROOT_PATH + "/{{catalog}}",
+            TEST_ROOT_PATH + "/{{schema}}",
+            TEST_ROOT_PATH + "/{{catalog}}-{{schema}}-{{fileset}}/workspace/{{user}}",
+            ImmutableMap.of("placeholder-user", "jack"),
+            TEST_ROOT_PATH + "/c1-s1_fileset47-fileset47/workspace/jack"),
+        // placeholders in storage location
+        Arguments.of(
+            "fileset48",
+            Fileset.Type.EXTERNAL,
+            TEST_ROOT_PATH + "/{{catalog}}",
+            null,
+            TEST_ROOT_PATH + "/{{catalog}}-{{schema}}-{{fileset}}/workspace/{{user}}",
+            ImmutableMap.of("placeholder-user", "jack"),
+            TEST_ROOT_PATH + "/c1-s1_fileset48-fileset48/workspace/jack"),
+        // placeholders in storage location
+        Arguments.of(
+            "fileset49",
+            Fileset.Type.EXTERNAL,
+            null,
+            null,
+            TEST_ROOT_PATH + "/{{catalog}}-{{schema}}-{{fileset}}/workspace/{{user}}",
+            ImmutableMap.of("placeholder-user", "jack"),
+            TEST_ROOT_PATH + "/c1-s1_fileset49-fileset49/workspace/jack"));
+  }
+
   private static Stream<Arguments> locationArguments() {
     return Stream.of(
         // Honor the catalog location
@@ -990,6 +1251,14 @@ public class TestHadoopCatalogOperations {
             null,
             null,
             TEST_ROOT_PATH + "/catalog21/s1_fileset11/fileset11"),
+        Arguments.of(
+            // honor the catalog location with placeholder
+            "fileset11",
+            Fileset.Type.MANAGED,
+            TEST_ROOT_PATH + "/catalog21/{{schema}}/{{fileset}}",
+            null,
+            null,
+            TEST_ROOT_PATH + "/catalog21/s1_fileset11/fileset11"),
         Arguments.of(
             // honor the schema location
             "fileset12",
@@ -998,6 +1267,14 @@ public class TestHadoopCatalogOperations {
             TEST_ROOT_PATH + "/s1_fileset12",
             null,
             TEST_ROOT_PATH + "/s1_fileset12/fileset12"),
+        Arguments.of(
+            // honor the schema location with placeholder
+            "fileset12",
+            Fileset.Type.MANAGED,
+            null,
+            TEST_ROOT_PATH + "/{{schema}}/{{fileset}}",
+            null,
+            TEST_ROOT_PATH + "/s1_fileset12/fileset12"),
         Arguments.of(
             // honor the schema location
             "fileset13",
@@ -1006,6 +1283,14 @@ public class TestHadoopCatalogOperations {
             TEST_ROOT_PATH + "/s1_fileset13",
             null,
             TEST_ROOT_PATH + "/s1_fileset13/fileset13"),
+        Arguments.of(
+            // honor the schema location with placeholder
+            "fileset13",
+            Fileset.Type.MANAGED,
+            TEST_ROOT_PATH + "/catalog22",
+            TEST_ROOT_PATH + "/{{schema}}/{{fileset}}",
+            null,
+            TEST_ROOT_PATH + "/s1_fileset13/fileset13"),
         Arguments.of(
             // honor the storage location
             "fileset14",
@@ -1014,6 +1299,14 @@ public class TestHadoopCatalogOperations {
             TEST_ROOT_PATH + "/s1_fileset14",
             TEST_ROOT_PATH + "/fileset14",
             TEST_ROOT_PATH + "/fileset14"),
+        Arguments.of(
+            // honor the storage location with placeholder
+            "fileset14",
+            Fileset.Type.MANAGED,
+            TEST_ROOT_PATH + "/catalog23",
+            TEST_ROOT_PATH + "/{{schema}}",
+            TEST_ROOT_PATH + "/{{fileset}}",
+            TEST_ROOT_PATH + "/fileset14"),
         Arguments.of(
             // honor the storage location
             "fileset15",
@@ -1022,6 +1315,14 @@ public class TestHadoopCatalogOperations {
             null,
             TEST_ROOT_PATH + "/fileset15",
             TEST_ROOT_PATH + "/fileset15"),
+        Arguments.of(
+            // honor the storage location with placeholder
+            "fileset15",
+            Fileset.Type.MANAGED,
+            null,
+            null,
+            TEST_ROOT_PATH + "/{{fileset}}",
+            TEST_ROOT_PATH + "/fileset15"),
         Arguments.of(
             // honor the storage location
             "fileset16",
@@ -1030,6 +1331,14 @@ public class TestHadoopCatalogOperations {
             null,
             TEST_ROOT_PATH + "/fileset16",
             TEST_ROOT_PATH + "/fileset16"),
+        Arguments.of(
+            // honor the storage location with placeholder
+            "fileset16",
+            Fileset.Type.MANAGED,
+            TEST_ROOT_PATH + "/catalog24",
+            null,
+            TEST_ROOT_PATH + "/{{fileset}}",
+            TEST_ROOT_PATH + "/fileset16"),
         Arguments.of(
             // honor the storage location
             "fileset17",
@@ -1038,6 +1347,14 @@ public class TestHadoopCatalogOperations {
             TEST_ROOT_PATH + "/s1_fileset17",
             TEST_ROOT_PATH + "/fileset17",
             TEST_ROOT_PATH + "/fileset17"),
+        Arguments.of(
+            // honor the storage location with placeholder
+            "fileset17",
+            Fileset.Type.EXTERNAL,
+            TEST_ROOT_PATH + "/catalog25",
+            TEST_ROOT_PATH + "/s1_fileset17",
+            TEST_ROOT_PATH + "/{{fileset}}",
+            TEST_ROOT_PATH + "/fileset17"),
         Arguments.of(
             // honor the storage location
             "fileset18",
@@ -1046,6 +1363,14 @@ public class TestHadoopCatalogOperations {
             TEST_ROOT_PATH + "/s1_fileset18",
             TEST_ROOT_PATH + "/fileset18",
             TEST_ROOT_PATH + "/fileset18"),
+        Arguments.of(
+            // honor the storage location with placeholder
+            "fileset18",
+            Fileset.Type.EXTERNAL,
+            null,
+            TEST_ROOT_PATH + "/s1_fileset18",
+            TEST_ROOT_PATH + "/{{fileset}}",
+            TEST_ROOT_PATH + "/fileset18"),
         Arguments.of(
             // honor the storage location
             "fileset19",
@@ -1054,6 +1379,14 @@ public class TestHadoopCatalogOperations {
             null,
             TEST_ROOT_PATH + "/fileset19",
             TEST_ROOT_PATH + "/fileset19"),
+        Arguments.of(
+            // honor the storage location with placeholder
+            "fileset19",
+            Fileset.Type.EXTERNAL,
+            null,
+            null,
+            TEST_ROOT_PATH + "/{{fileset}}",
+            TEST_ROOT_PATH + "/fileset19"),
         // Honor the catalog location
         Arguments.of(
             "fileset101",
@@ -1062,6 +1395,14 @@ public class TestHadoopCatalogOperations {
             null,
             null,
             TEST_ROOT_PATH + "/catalog201/s1_fileset101/fileset101"),
+        Arguments.of(
+            // Honor the catalog location with placeholder
+            "fileset101",
+            Fileset.Type.MANAGED,
+            UNFORMALIZED_TEST_ROOT_PATH + "/catalog201/{{schema}}/{{fileset}}",
+            null,
+            null,
+            TEST_ROOT_PATH + "/catalog201/s1_fileset101/fileset101"),
         Arguments.of(
             // honor the schema location
             "fileset102",
@@ -1070,6 +1411,14 @@ public class TestHadoopCatalogOperations {
             UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset102",
             null,
             TEST_ROOT_PATH + "/s1_fileset102/fileset102"),
+        Arguments.of(
+            // honor the schema location with placeholder
+            "fileset102",
+            Fileset.Type.MANAGED,
+            null,
+            UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset102/{{fileset}}",
+            null,
+            TEST_ROOT_PATH + "/s1_fileset102/fileset102"),
         Arguments.of(
             // honor the schema location
             "fileset103",
@@ -1078,6 +1427,14 @@ public class TestHadoopCatalogOperations {
             UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset103",
             null,
             TEST_ROOT_PATH + "/s1_fileset103/fileset103"),
+        Arguments.of(
+            // honor the schema location with placeholder
+            "fileset103",
+            Fileset.Type.MANAGED,
+            UNFORMALIZED_TEST_ROOT_PATH + "/catalog202",
+            UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset103/{{fileset}}",
+            null,
+            TEST_ROOT_PATH + "/s1_fileset103/fileset103"),
         Arguments.of(
             // honor the storage location
             "fileset104",
@@ -1086,6 +1443,14 @@ public class TestHadoopCatalogOperations {
             UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset104",
             UNFORMALIZED_TEST_ROOT_PATH + "/fileset104",
             TEST_ROOT_PATH + "/fileset104"),
+        Arguments.of(
+            // honor the storage location with placeholder
+            "fileset104",
+            Fileset.Type.MANAGED,
+            UNFORMALIZED_TEST_ROOT_PATH + "/catalog203",
+            UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset104",
+            UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}",
+            TEST_ROOT_PATH + "/fileset104"),
         Arguments.of(
             // honor the storage location
             "fileset105",
@@ -1094,6 +1459,14 @@ public class TestHadoopCatalogOperations {
             null,
             UNFORMALIZED_TEST_ROOT_PATH + "/fileset105",
             TEST_ROOT_PATH + "/fileset105"),
+        Arguments.of(
+            // honor the storage location with placeholder
+            "fileset105",
+            Fileset.Type.MANAGED,
+            null,
+            null,
+            UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}",
+            TEST_ROOT_PATH + "/fileset105"),
         Arguments.of(
             // honor the storage location
             "fileset106",
@@ -1102,6 +1475,14 @@ public class TestHadoopCatalogOperations {
             null,
             UNFORMALIZED_TEST_ROOT_PATH + "/fileset106",
             TEST_ROOT_PATH + "/fileset106"),
+        Arguments.of(
+            // honor the storage location with placeholder
+            "fileset106",
+            Fileset.Type.MANAGED,
+            UNFORMALIZED_TEST_ROOT_PATH + "/catalog204",
+            null,
+            UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}",
+            TEST_ROOT_PATH + "/fileset106"),
         Arguments.of(
             // honor the storage location
             "fileset107",
@@ -1110,6 +1491,14 @@ public class TestHadoopCatalogOperations {
             UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset107",
             UNFORMALIZED_TEST_ROOT_PATH + "/fileset107",
             TEST_ROOT_PATH + "/fileset107"),
+        Arguments.of(
+            // honor the storage location with placeholder
+            "fileset107",
+            Fileset.Type.EXTERNAL,
+            UNFORMALIZED_TEST_ROOT_PATH + "/catalog205",
+            UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset107",
+            UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}",
+            TEST_ROOT_PATH + "/fileset107"),
         Arguments.of(
             // honor the storage location
             "fileset108",
@@ -1118,6 +1507,14 @@ public class TestHadoopCatalogOperations {
             UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset108",
             UNFORMALIZED_TEST_ROOT_PATH + "/fileset108",
             TEST_ROOT_PATH + "/fileset108"),
+        Arguments.of(
+            // honor the storage location with placeholder
+            "fileset108",
+            Fileset.Type.EXTERNAL,
+            null,
+            UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset108",
+            UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}",
+            TEST_ROOT_PATH + "/fileset108"),
         Arguments.of(
             // honor the storage location
             "fileset109",
@@ -1125,6 +1522,14 @@ public class TestHadoopCatalogOperations {
             null,
             null,
             UNFORMALIZED_TEST_ROOT_PATH + "/fileset109",
+            TEST_ROOT_PATH + "/fileset109"),
+        Arguments.of(
+            // honor the storage location with placeholder
+            "fileset109",
+            Fileset.Type.EXTERNAL,
+            null,
+            null,
+            UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}",
             TEST_ROOT_PATH + "/fileset109"));
   }
 
@@ -1260,4 +1665,30 @@ public class TestHadoopCatalogOperations {
       return ops.createFileset(filesetIdent, comment, type, storageLocation, filesetProps);
     }
   }
+
+  private Fileset createFileset(
+      String name,
+      String schemaName,
+      String comment,
+      Fileset.Type type,
+      String catalogPath,
+      String storageLocation,
+      Map<String, String> filesetProps)
+      throws IOException {
+    Map<String, String> props = Maps.newHashMap();
+    if (catalogPath != null) {
+      props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+    }
+
+    try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) {
+      ops.initialize(props, randomCatalogInfo("m1", "c1"), HADOOP_PROPERTIES_METADATA);
+
+      NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name);
+      filesetProps = filesetProps == null ? Maps.newHashMap() : filesetProps;
+      StringIdentifier stringId = StringIdentifier.fromId(idGenerator.nextId());
+      filesetProps = Maps.newHashMap(StringIdentifier.newPropertiesWithId(stringId, filesetProps));
+
+      return ops.createFileset(filesetIdent, comment, type, storageLocation, filesetProps);
+    }
+  }
 }
diff --git a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
index 25952bb4ab..6c438506a5 100644
--- a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
+++ b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
@@ -225,6 +225,21 @@ public class HadoopCatalogIT extends BaseIT {
         "storage location should be created");
     Assertions.assertEquals(ImmutableMap.of(), fileset2.properties(), "properties should be empty");
 
+    // create fileset with placeholder in storage location
+    String filesetName4 = "test_create_fileset_with_placeholder";
+    String storageLocation4 = defaultBaseLocation() + "/{{fileset}}";
+    String expectedStorageLocation4 = defaultBaseLocation() + "/" + filesetName4;
+    Assertions.assertFalse(
+        fileSystem.exists(new Path(expectedStorageLocation4)),
+        "storage location should not exist");
+    Fileset fileset4 = createFileset(filesetName4, "comment", MANAGED, storageLocation4, null);
+    assertFilesetExists(filesetName4);
+    Assertions.assertNotNull(fileset4, "fileset should be created");
+    Assertions.assertEquals("comment", fileset4.comment());
+    Assertions.assertEquals(MANAGED, fileset4.type());
+    Assertions.assertEquals(expectedStorageLocation4, fileset4.storageLocation());
+    Assertions.assertEquals(0, fileset4.properties().size(), "properties should be empty");
+
     // create fileset with null fileset name
     Assertions.assertThrows(
         IllegalNameIdentifierException.class,
diff --git a/clients/client-python/gravitino/api/file/fileset.py b/clients/client-python/gravitino/api/file/fileset.py
index a0fb7a8447..25f0f4d4f9 100644
--- a/clients/client-python/gravitino/api/file/fileset.py
+++ b/clients/client-python/gravitino/api/file/fileset.py
@@ -71,35 +71,52 @@ class Fileset(Auditable):
 
     @abstractmethod
     def storage_location(self) -> str:
-        """Get the storage location of the file or directory path that is managed by this fileset object.
+        """Get the storage location of the file or directory path managed by this fileset object.
 
-        The returned storageLocation can either be the one specified when creating the fileset
-        object, or the one specified in the catalog / schema level if the fileset object is created
-        under this catalog / schema.
+        The returned storageLocation can be either the one specified when creating the fileset object,
+        or the one specified at the catalog/schema level if the fileset object is created under this
+        catalog/schema.
+
+        The storageLocation at each level can contain placeholders, formatted as {{name}}, which will
+        be replaced by the corresponding fileset property value when the fileset object is created.
+        The placeholder property in the fileset object is formed as "placeholder-{{name}}". For example,
+        if the storageLocation is "file:///path/{{schema}}-{{fileset}}-{{version}}", and the fileset
+        object "catalog1.schema1.fileset1" has the property "placeholder-version" set to "v1",
+        then the storageLocation will be "file:///path/schema1-fileset1-v1".
 
         For managed fileset, the storageLocation can be:
 
-        1) The one specified when creating the fileset object.
+        1) The one specified when creating the fileset object, and the placeholders in the
+           storageLocation will be replaced by the placeholder value specified in the fileset properties.
 
-        2) When catalog property "location" is specified but schema property "location" is not
-        specified, then the storageLocation will be "{catalog location}/schemaName/filesetName".
+        2) When catalog property "location" is specified but schema property "location" is not specified,
+           then the storageLocation will be:
+            a. "{catalog location}/schemaName/filesetName" - if {catalog location} has no placeholders
+            b. "{catalog location}" - placeholders in {catalog location} will be replaced by values
+               specified in fileset properties
 
-        3) When catalog property "location" is not specified but schema property "location" is
-        specified, then the storageLocation will be "{schema location}/filesetName".
+        3) When catalog property "location" is not specified but schema property "location" is specified,
+           then the storageLocation will be:
+            a. "{schema location}/filesetName" - if {schema location} has no placeholders
+            b. "{schema location}" - placeholders in {schema location} will be replaced by values
+               specified in fileset properties
 
-        4) When both catalog property "location" and schema property "location" are specified, then
-        the storageLocation will be "{schema location}/filesetName".
+        4) When both catalog property "location" and schema property "location" are specified,
+           then the storageLocation will be:
+            a. "{schema location}/filesetName" - if {schema location} has no placeholders
+            b. "{schema location}" - placeholders in {schema location} will be replaced by values
+               specified in fileset properties
 
         5) When both catalog property "location" and schema property "location" are not specified,
-        and storageLocation specified when creating the fileset object is null, this situation is
-        illegal.
+           and storageLocation specified when creating the fileset object is null, this situation
+           is illegal.
 
         For external fileset, the storageLocation can be:
-
-        1) The one specified when creating the fileset object.
+        1) The one specified when creating the fileset object, and the placeholders in the
+           storageLocation will be replaced by the placeholder value specified in the fileset properties.
 
         Returns:
-             The storage location of the fileset object.
+            str: The storage location of the fileset object.
         """
         pass
 
diff --git a/docs/hadoop-catalog.md b/docs/hadoop-catalog.md
index 0bf480851f..3b382235f8 100644
--- a/docs/hadoop-catalog.md
+++ b/docs/hadoop-catalog.md
@@ -129,6 +129,15 @@ Refer to [Schema operation](./manage-fileset-metadata-using-gravitino.md#schema-
 | `authentication.kerberos.principal`   | The principal of the Kerberos authentication for the fileset.                                          | The parent(schema) value | No       | 0.6.0-incubating |
 | `authentication.kerberos.keytab-uri`  | The URI of The keytab for the Kerberos authentication for the fileset.                                 | The parent(schema) value | No       | 0.6.0-incubating |
 | `credential-providers`                | The credential provider types, separated by comma.                                                     | (none)                   | No       | 0.8.0-incubating |
+| `placeholder-`                        | Properties that start with `placeholder-` are used to replace placeholders in the location.            | (none)                   | No       | 0.9.0-incubating |
+
+Some properties are reserved and cannot be set by users:
+
+| Property name         | Description                           | Default value               | Since Version    |
+|-----------------------|---------------------------------------|-----------------------------|------------------|
+| `placeholder-catalog` | The placeholder for the catalog name. | catalog name of the fileset | 0.9.0-incubating |
+| `placeholder-schema`  | The placeholder for the schema name.  | schema name of the fileset  | 0.9.0-incubating |
+| `placeholder-fileset` | The placeholder for the fileset name. | fileset name                | 0.9.0-incubating |
 
 Credential providers can be specified in several places, as listed below. Gravitino checks the `credential-providers` setting in the following order of precedence:
 
diff --git a/docs/manage-fileset-metadata-using-gravitino.md b/docs/manage-fileset-metadata-using-gravitino.md
index 90550da14b..cce54513fa 100644
--- a/docs/manage-fileset-metadata-using-gravitino.md
+++ b/docs/manage-fileset-metadata-using-gravitino.md
@@ -16,7 +16,7 @@ filesets to manage non-tabular data like training datasets and other raw data.
 Typically, a fileset is mapped to a directory on a file system like HDFS, S3, ADLS, GCS, etc.
 With the fileset managed by Gravitino, the non-tabular data can be managed as assets together with
 tabular data in Gravitino in a unified way. The following operations will use HDFS as an example, for other
-HCFS like S3, OSS, GCS, etc, please refer to the corresponding operations [hadoop-with-s3](./hadoop-catalog-with-s3.md), [hadoop-with-oss](./hadoop-catalog-with-oss.md), [hadoop-with-gcs](./hadoop-catalog-with-gcs.md) and 
+HCFS like S3, OSS, GCS, etc., please refer to the corresponding operations [hadoop-with-s3](./hadoop-catalog-with-s3.md), [hadoop-with-oss](./hadoop-catalog-with-oss.md), [hadoop-with-gcs](./hadoop-catalog-with-gcs.md) and 
 [hadoop-with-adls](./hadoop-catalog-with-adls.md).
 
 After a fileset is created, users can easily access, manage the files/directories through
@@ -320,24 +320,143 @@ Currently, Gravitino supports two **types** of filesets:
 The `storageLocation` is the physical location of the fileset. Users can specify this location
 when creating a fileset, or follow the rules of the catalog/schema location if not specified.
 
+The `storageLocation` at each level can contain **placeholders**, formatted as `{{name}}`, which will
+be replaced by the corresponding fileset property value when the fileset object is created. The
+placeholder property in the fileset object is formed as "placeholder-{{name}}". For example, if
+the `storageLocation` is `file:///tmp/{{schema}}-{{fileset}}-{{version}}`, and the fileset object
+named "catalog1.schema1.fileset1" contains the property `placeholder-version=v1`,
+the actual `storageLocation` will be `file:///tmp/schema1-fileset1-v1`.
+
+The following is an example of creating a fileset with placeholders in the 
`storageLocation`:
+
+<Tabs groupId="language" queryString>
+<TabItem value="shell" label="Shell">
+
+```shell
+# create a catalog first
+curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
+-H "Content-Type: application/json" -d '{
+  "name": "test_catalog",
+  "type": "FILESET",
+  "comment": "comment",
+  "provider": "hadoop",
+  "properties": {
+    "location": "file:///{{catalog}}/{{schema}}/workspace_{{project}}/{{user}}"
+  }
+}' http://localhost:8090/api/metalakes/metalake/catalogs
+
+# create a schema under the catalog
+curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
+-H "Content-Type: application/json" -d '{
+  "name": "test_schema",
+  "comment": "comment",
+  "properties": {}
+}' http://localhost:8090/api/metalakes/metalake/catalogs/test_catalog/schemas
+
+# create a fileset by placeholders
+curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
+-H "Content-Type: application/json" -d '{
+  "name": "example_fileset",
+  "comment": "This is an example fileset",
+  "type": "MANAGED",
+  "properties": {
+    "placeholder-project": "test_project",
+    "placeholder-user": "test_user"
+  }
+}' http://localhost:8090/api/metalakes/metalake/catalogs/test_catalog/schemas/test_schema/filesets
+
+# the actual storage location of the fileset will be: file:///test_catalog/test_schema/workspace_test_project/test_user
+```
+
+</TabItem>
+<TabItem value="java" label="Java">
+
+```java
+GravitinoClient gravitinoClient = GravitinoClient
+    .builder("http://localhost:8090")
+    .withMetalake("metalake")
+    .build();
+// create a catalog first
+Catalog catalog = gravitinoClient.createCatalog(
+    "test_catalog",
+    Type.FILESET,
+    "hadoop", // provider
+    "comment",
+    ImmutableMap.of("location", "file:///{{catalog}}/{{schema}}/workspace_{{project}}/{{user}}"));
+FilesetCatalog filesetCatalog = catalog.asFilesetCatalog();
+
+// create a schema under the catalog
+filesetCatalog.createSchema("test_schema", "comment", null);
+
+// create a fileset by placeholders
+filesetCatalog.createFileset(
+  NameIdentifier.of("test_schema", "example_fileset"),
+  "This is an example fileset",
+  Fileset.Type.MANAGED,
+  null,
+  ImmutableMap.of("placeholder-project", "test_project", "placeholder-user", "test_user")
+);
+
+// the actual storage location of the fileset will be: file:///test_catalog/test_schema/workspace_test_project/test_user
+```
+
+</TabItem>
+<TabItem value="python" label="Python">
+
+```python
+gravitino_client: GravitinoClient = GravitinoClient(uri="http://localhost:8090", metalake_name="metalake")
+
+# create a catalog first
+catalog: Catalog = gravitino_client.create_catalog(name="test_catalog",
+                                                   catalog_type=Catalog.Type.FILESET,
+                                                   provider="hadoop",
+                                                   comment="comment",
+                                                   properties={"location": "file:///{{catalog}}/{{schema}}/workspace_{{project}}/{{user}}"})
+
+# create a schema under the catalog
+catalog.as_schemas().create_schema(name="test_schema", comment="comment", properties={})
+
+# create a fileset by placeholders
+catalog.as_fileset_catalog().create_fileset(ident=NameIdentifier.of("test_schema", "example_fileset"),
+                                            type=Fileset.Type.MANAGED,
+                                            comment="This is an example fileset",
+                                            storage_location=None,
+                                            properties={"placeholder-project": "test_project", "placeholder-user": "test_user"})
+
+# the actual storage location of the fileset will be: file:///test_catalog/test_schema/workspace_test_project/test_user
+```
+
+</TabItem>
+</Tabs>
+
 The value of `storageLocation` depends on the configuration settings of the catalog:
 - If this is a local fileset catalog, the `storageLocation` should be in the format of `file:///path/to/fileset`.
 - If this is a HDFS fileset catalog, the `storageLocation` should be in the format of `hdfs://namenode:port/path/to/fileset`.
 
 For a `MANAGED` fileset, the storage location is:
 
-1. The one specified by the user during the fileset creation.
-2. When the catalog property `location` is specified but the schema property `location` isn't specified,
-   the storage location is `catalog location/schema name/fileset name`.
+1. The one specified by the user during the fileset creation, and the placeholder will be replaced by the
+   corresponding fileset property value.
+2. When the catalog property `location` is specified but the schema property `location` isn't specified, the storage location is:
+   1. `catalog location/schema name/fileset name` if `catalog location` does not contain any placeholder. 
+   2. `catalog location` - placeholders in the catalog location will be replaced by the corresponding fileset property value.
+
 3. When the catalog property `location` isn't specified but the schema property `location` is specified,
-   the storage location is `schema location/fileset name`.
+   the storage location is:
+   1. `schema location/fileset name` if `schema location` does not contain any placeholder.
+   2. `schema location` - placeholders in the schema location will be replaced by the corresponding fileset property value.
+   
 4. When both the catalog property `location` and the schema property `location` are specified, the storage
-   location is `schema location/fileset name`.
+   location is:
+   1. `schema location/fileset name` if `schema location` does not contain any placeholder.
+   2. `schema location` - placeholders in the schema location will be replaced by the corresponding fileset property value.
+
 5. When both the catalog property `location` and schema property `location` isn't specified, the user
    should specify the `storageLocation` in the fileset creation.
 
 For `EXTERNAL` fileset, users should specify `storageLocation` during the fileset creation,
-otherwise, Gravitino will throw an exception.
+otherwise, Gravitino will throw an exception. If the `storageLocation` 
+contains placeholders, the
+placeholder will be replaced by the corresponding fileset property value.
 
 ### Alter a fileset
 

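Note for readers of this commit: the placeholder substitution described in the
documentation hunks above can be illustrated with a minimal Python sketch. This
is not the code added to HadoopCatalogOperations.java. The function name
`resolve_location`, its signature, and the choice to raise `ValueError` on an
unresolved placeholder are assumptions made purely for illustration. Only the
`placeholder-` prefix, the `{{name}}` syntax, and the reserved `catalog`,
`schema`, and `fileset` placeholders come from the commit itself.

```python
import re

# Prefix of fileset properties that supply placeholder values (from the commit).
PLACEHOLDER_PREFIX = "placeholder-"

# Matches {{name}} and captures the name between the braces.
_PLACEHOLDER_PATTERN = re.compile(r"\{\{([^{}]+)\}\}")


def resolve_location(template, catalog, schema, fileset, properties):
    """Hypothetical helper: substitute {{name}} placeholders in a location."""
    # Collect user-supplied values from "placeholder-<name>" properties.
    values = {
        key[len(PLACEHOLDER_PREFIX):]: value
        for key, value in properties.items()
        if key.startswith(PLACEHOLDER_PREFIX)
    }
    # Reserved placeholders always take the names of the enclosing objects.
    values.update({"catalog": catalog, "schema": schema, "fileset": fileset})

    def substitute(match):
        name = match.group(1)
        if name not in values:
            # Assumption: an unresolved placeholder is treated as an error here.
            raise ValueError(f"No value for placeholder '{name}'")
        return values[name]

    return _PLACEHOLDER_PATTERN.sub(substitute, template)


print(
    resolve_location(
        "file:///{{catalog}}/{{schema}}/workspace_{{project}}/{{user}}",
        "test_catalog",
        "test_schema",
        "example_fileset",
        {"placeholder-project": "test_project", "placeholder-user": "test_user"},
    )
)
# prints file:///test_catalog/test_schema/workspace_test_project/test_user
```

The sketch mirrors the documented example: the catalog `location` template is
resolved with the reserved names plus the two user-defined `placeholder-`
properties set on the fileset.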