xloya commented on code in PR #4320:
URL: https://github.com/apache/gravitino/pull/4320#discussion_r1770707925


##########
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java:
##########
@@ -672,4 +721,24 @@ static Path formalizePath(Path path, Configuration configuration) throws IOExcep
     FileSystem defaultFs = FileSystem.get(configuration);
     return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory());
   }
+
+  private boolean hasCallerContext() {
+    return CallerContext.CallerContextHolder.get() != null
+        && CallerContext.CallerContextHolder.get().context() != null
+        && !CallerContext.CallerContextHolder.get().context().isEmpty();
+  }
+
+  private boolean checkSingleFile(Fileset fileset) {
+    try {
+      Path locationPath = new Path(fileset.storageLocation());
+      return locationPath.getFileSystem(hadoopConf).getFileStatus(locationPath).isFile();
+    } catch (FileNotFoundException e) {
+      // We should always return false here, same with the logic in `FileSystem.isFile(Path f)`.
+      return false;
+    } catch (IOException e) {
+      throw new GravitinoRuntimeException(
+          "Exception occurs when checking whether fileset: %s mounts a single file, exception: %s",
+          fileset.name(), e.getCause());

Review Comment:
   Done



##########
clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java:
##########
@@ -381,51 +339,66 @@ NameIdentifier extractIdentifier(URI virtualUri) {
     return NameIdentifier.of(metalakeName, matcher.group(1), matcher.group(2), matcher.group(3));
   }
 
-  private FilesetContext getFilesetContext(Path virtualPath) {
+  private FilesetContextPair getFilesetContext(Path virtualPath, FilesetDataOperation operation) {
     NameIdentifier identifier = extractIdentifier(virtualPath.toUri());
-    Pair<Fileset, FileSystem> pair = filesetCache.get(identifier, this::constructNewFilesetPair);
-    Preconditions.checkState(
-        pair != null,
-        "Cannot get the pair of fileset instance and actual file system for %s",
-        identifier);
-    Path actualPath = getActualPathByIdentifier(identifier, pair, virtualPath);
-    return FilesetContext.builder()
-        .withIdentifier(identifier)
-        .withFileset(pair.getLeft())
-        .withFileSystem(pair.getRight())
-        .withActualPath(actualPath)
-        .build();
-  }
+    String virtualPathString = virtualPath.toString();
+    String subPath = getSubPathFromVirtualPath(identifier, virtualPathString);
 
-  private Pair<Fileset, FileSystem> constructNewFilesetPair(NameIdentifier identifier) {
-    // Always create a new file system instance for the fileset.
-    // Therefore, users cannot bypass gvfs and use `FileSystem.get()` to directly obtain the
-    // FileSystem
-    try {
-      Fileset fileset = loadFileset(identifier);
-      URI storageUri = URI.create(fileset.storageLocation());
-      FileSystem actualFileSystem = FileSystem.newInstance(storageUri, getConf());
-      Preconditions.checkState(actualFileSystem != null, "Cannot get the actual file system");
-      return Pair.of(fileset, actualFileSystem);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format(
-              "Cannot create file system for fileset: %s, exception: %s",
-              identifier, e.getMessage()),
-          e);
-    } catch (RuntimeException e) {
-      throw new RuntimeException(
-          String.format(
-              "Cannot load fileset: %s from the server. exception: %s",
-              identifier, e.getMessage()));
-    }
+    NameIdentifier catalogIdent = NameIdentifier.of(metalakeName, identifier.namespace().level(1));
+    FilesetCatalog filesetCatalog =
+        catalogCache.get(
+            catalogIdent, ident -> client.loadCatalog(catalogIdent.name()).asFilesetCatalog());
+    Preconditions.checkArgument(
+        filesetCatalog != null, String.format("Loaded fileset catalog: %s is null.", catalogIdent));
+
+    // set the thread local audit info
+    Map<String, String> contextMap = Maps.newHashMap();
+    contextMap.put(
+        FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
+        InternalClientType.HADOOP_GVFS.name());
+    contextMap.put(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, operation.name());
+    CallerContext callerContext = CallerContext.builder().withContext(contextMap).build();
+    CallerContext.CallerContextHolder.set(callerContext);
+
+    String actualFileLocation =
+        filesetCatalog.getFileLocation(
+            NameIdentifier.of(identifier.namespace().level(2), identifier.name()), subPath);
+
+    URI uri = new Path(actualFileLocation).toUri();
+    // we cache the fs for the same scheme, so we can reuse it
+    FileSystem fs =
+        internalFileSystemCache.get(
+            uri.getScheme(),

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@gravitino.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to