fredia commented on code in PR #25818:
URL: https://github.com/apache/flink/pull/25818#discussion_r1901425924


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -240,143 +221,78 @@ public URI getUri() {
 
     @Override
     public boolean exists(final Path f) throws IOException {
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(f);
-        if (localPathTuple.f0) {
-            return localFS.exists(localPathTuple.f1);
+        FileMappingManager.RealPath realPath = fileMappingManager.realPath(f);
+        if (realPath == null) {
+            return delegateFS.exists(f);
         }
-        return delegateFS.exists(f);
+
+        boolean status = false;
+        if (realPath.isLocal) {
+            status |= localFS.exists(realPath.path);
+            if (!status) {
+                status = delegateFS.exists(f);
+            }
+        } else {
+            status = delegateFS.exists(realPath.path);
+        }
+        return status;
     }
 
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
-        if (localPathTuple.f0) {
-            return localFS.getFileStatus(localPathTuple.f1);
+        FileMappingManager.RealPath realPath = 
fileMappingManager.realPath(path);
+        Preconditions.checkNotNull(realPath);
+        if (realPath.isLocal) {
+            return localFS.getFileStatus(realPath.path);
         }
-        return delegateFS.getFileStatus(path);
+        return delegateFS.getFileStatus(realPath.path);
     }
 
     @Override
     public BlockLocation[] getFileBlockLocations(FileStatus file, long start, 
long len)
             throws IOException {
         Path path = file.getPath();
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
-        if (localPathTuple.f0) {
-            FileStatus localFile = localFS.getFileStatus(localPathTuple.f1);
+        FileMappingManager.RealPath realPath = 
fileMappingManager.realPath(path);
+        Preconditions.checkNotNull(realPath);
+        if (realPath.isLocal) {
+            FileStatus localFile = localFS.getFileStatus(realPath.path);
             return localFS.getFileBlockLocations(localFile, start, len);
         }
         return delegateFS.getFileBlockLocations(file, start, len);
     }
 
     @Override
     public FileStatus[] listStatus(Path path) throws IOException {
-        FileStatus[] localFiles = new FileStatus[0];
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
-        if (localPathTuple.f0) {
-            localFiles = localFS.listStatus(localPathTuple.f1);
+        // mapping files
+        List<FileStatus> fileStatuses = new ArrayList<>();
+        List<String> mappingFiles = 
fileMappingManager.listByPrefix(path.toString());

Review Comment:
   Good catch, added a filter for direct child.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to