This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 98c7d64fd99 [feat](params-refactor) restructure parameters for FS V2 integration (#51177)
98c7d64fd99 is described below

commit 98c7d64fd9950d1be1de59272774fa118a25e806
Author: Calvin Kirs <[email protected]>
AuthorDate: Thu Jun 5 10:55:09 2025 +0800

    [feat](params-refactor) restructure parameters for FS V2 integration (#51177)
    
    Issue Number: #50238
    
    This PR integrates the new file system (FS) abstraction with the Catalog
    module, continuing the earlier effort that migrated features such as
    export and backup to the new FS implementation.
    
    ### Main Changes:
    
    Catalog Integration with New FS:
    
    Updated the Catalog-related logic to work with the new file system
    abstraction, ensuring consistent usage across the system.
    
    Code Sync from FS V1 to V2:
    
    Migrated relevant code from FS V1 to FS V2 to maintain functional parity
    and ensure compatibility.
    
    ### Test Enhancements:
    
    Added unit and integration tests for the new FS usage within the Catalog
    to ensure correctness and stability.
---
 .../org/apache/doris/common/util/LocationPath.java |   4 +-
 .../doris/datasource/ExternalMetaCacheMgr.java     |   2 +-
 .../org/apache/doris/datasource/hive/AcidUtil.java |   4 +-
 .../doris/datasource/hive/HMSExternalCatalog.java  |   6 +-
 .../doris/datasource/hive/HMSExternalTable.java    |   2 +-
 .../doris/datasource/hive/HMSTransaction.java      |  12 +-
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  16 +-
 .../org/apache/doris/datasource/hive/HiveUtil.java |   4 +-
 .../doris/datasource/hive/source/HiveScanNode.java |   2 +-
 .../doris/datasource/hudi/source/HudiScanNode.java |   2 +-
 .../doris/fs/remote/SwitchingFileSystem.java       |  14 +-
 .../DirectoryLister.java}                          |  25 +--
 .../org/apache/doris/fsv2/FileSystemCache.java     | 117 +++++++++++
 .../FileSystemDirectoryLister.java}                |  29 +--
 .../org/apache/doris/fsv2/FileSystemFactory.java   |  17 ++
 .../apache/doris/fsv2/FileSystemIOException.java   |  65 ++++++
 .../FileSystemProvider.java}                       |  20 +-
 .../apache/doris/fsv2/FileSystemProviderImpl.java  |  43 ++++
 .../java/org/apache/doris/fsv2/FileSystemType.java |  49 +++++
 .../java/org/apache/doris/fsv2/FileSystemUtil.java |  70 +++++++
 .../org/apache/doris/fsv2/LocalDfsFileSystem.java  | 199 +++++++++++++++++++
 .../doris/fsv2/RemoteFileRemoteIterator.java       |  47 +++++
 .../RemoteFiles.java}                              |  21 +-
 .../RemoteIterator.java}                           |  23 +--
 .../SimpleRemoteIterator.java}                     |  31 +--
 .../fsv2/TransactionDirectoryListingCacheKey.java  |  64 ++++++
 .../TransactionScopeCachingDirectoryLister.java    | 219 +++++++++++++++++++++
 ...nsactionScopeCachingDirectoryListerFactory.java |  59 ++++++
 .../{fs => fsv2}/remote/SwitchingFileSystem.java   |   8 +-
 .../glue/translator/PhysicalPlanTranslator.java    |   6 +-
 .../doris/transaction/HiveTransactionManager.java  |   2 +-
 .../transaction/TransactionManagerFactory.java     |   2 +-
 .../apache/doris/common/util/LocationPathTest.java |  10 +-
 .../apache/doris/datasource/hive/HiveAcidTest.java |   2 +-
 .../doris/datasource/hive/HmsCommitTest.java       |   8 +-
 .../doris/fs/remote/RemoteFileSystemTest.java      | 158 ---------------
 .../apache/doris/fsv2/obj/S3ObjStorageTest.java    | 209 ++++++++++++++++++++
 .../doris/fsv2/remote/RemoteFileSystemTest.java    | 172 ++++++++++++++++
 38 files changed, 1450 insertions(+), 293 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index 798e13d321f..9374f2ccca1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -26,7 +26,7 @@ import 
org.apache.doris.datasource.property.constants.CosProperties;
 import org.apache.doris.datasource.property.constants.ObsProperties;
 import org.apache.doris.datasource.property.constants.OssProperties;
 import org.apache.doris.datasource.property.constants.S3Properties;
-import org.apache.doris.fs.FileSystemType;
+import org.apache.doris.fsv2.FileSystemType;
 import org.apache.doris.thrift.TFileType;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -453,7 +453,7 @@ public class LocationPath {
             case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib 
to access oss.
             case VIEWFS:
             case GFS:
-                fsType = FileSystemType.DFS;
+                fsType = FileSystemType.HDFS;
                 break;
             case JFS:
                 fsType = FileSystemType.JFS;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 990c825927e..3999b30d6f7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -37,7 +37,7 @@ import org.apache.doris.datasource.metacache.MetaCache;
 import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.paimon.PaimonMetadataCache;
 import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr;
-import org.apache.doris.fs.FileSystemCache;
+import org.apache.doris.fsv2.FileSystemCache;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 
 import com.github.benmanes.caffeine.cache.CacheLoader;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java
index 915a1a410d1..412e5c41d77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java
@@ -21,8 +21,8 @@ import org.apache.doris.backup.Status;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
-import org.apache.doris.fs.FileSystem;
-import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fsv2.FileSystem;
+import org.apache.doris.fsv2.remote.RemoteFile;
 
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 88a310d5f1d..9fa2183c48a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -37,9 +37,9 @@ import 
org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
 import org.apache.doris.datasource.operations.ExternalMetadataOperations;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.HMSProperties;
-import org.apache.doris.fs.FileSystemProvider;
-import org.apache.doris.fs.FileSystemProviderImpl;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
+import org.apache.doris.fsv2.FileSystemProvider;
+import org.apache.doris.fsv2.FileSystemProviderImpl;
+import org.apache.doris.fsv2.remote.dfs.DFSFileSystem;
 import org.apache.doris.transaction.TransactionManagerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index c913a2faf0d..3c4dc962d69 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -49,7 +49,7 @@ import org.apache.doris.datasource.mvcc.MvccTable;
 import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.systable.SupportedSysTables;
 import org.apache.doris.datasource.systable.SysTable;
-import org.apache.doris.fs.FileSystemDirectoryLister;
+import org.apache.doris.fsv2.FileSystemDirectoryLister;
 import org.apache.doris.mtmv.MTMVBaseTableIf;
 import org.apache.doris.mtmv.MTMVRefreshContext;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index dce7cc7cdd7..501f8f967ce 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -27,12 +27,12 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.datasource.statistics.CommonStatistics;
-import org.apache.doris.fs.FileSystem;
-import org.apache.doris.fs.FileSystemProvider;
-import org.apache.doris.fs.FileSystemUtil;
-import org.apache.doris.fs.remote.RemoteFile;
-import org.apache.doris.fs.remote.S3FileSystem;
-import org.apache.doris.fs.remote.SwitchingFileSystem;
+import org.apache.doris.fsv2.FileSystem;
+import org.apache.doris.fsv2.FileSystemProvider;
+import org.apache.doris.fsv2.FileSystemUtil;
+import org.apache.doris.fsv2.remote.RemoteFile;
+import org.apache.doris.fsv2.remote.S3FileSystem;
+import org.apache.doris.fsv2.remote.SwitchingFileSystem;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TFileType;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index f7e5cf2bc5f..a57914277b3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -40,14 +40,14 @@ import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
 import org.apache.doris.datasource.property.PropertyConverter;
-import org.apache.doris.fs.DirectoryLister;
-import org.apache.doris.fs.FileSystemCache;
-import org.apache.doris.fs.FileSystemDirectoryLister;
-import org.apache.doris.fs.FileSystemIOException;
-import org.apache.doris.fs.RemoteIterator;
-import org.apache.doris.fs.remote.RemoteFile;
-import org.apache.doris.fs.remote.RemoteFileSystem;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
+import org.apache.doris.fsv2.DirectoryLister;
+import org.apache.doris.fsv2.FileSystemCache;
+import org.apache.doris.fsv2.FileSystemDirectoryLister;
+import org.apache.doris.fsv2.FileSystemIOException;
+import org.apache.doris.fsv2.RemoteIterator;
+import org.apache.doris.fsv2.remote.RemoteFile;
+import org.apache.doris.fsv2.remote.RemoteFileSystem;
+import org.apache.doris.fsv2.remote.dfs.DFSFileSystem;
 import org.apache.doris.metric.GaugeMetric;
 import org.apache.doris.metric.Metric;
 import org.apache.doris.metric.MetricLabel;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
index ac7dcadbc26..ef1890be255 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
@@ -22,8 +22,8 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.statistics.CommonStatistics;
-import org.apache.doris.fs.remote.BrokerFileSystem;
-import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.fsv2.remote.BrokerFileSystem;
+import org.apache.doris.fsv2.remote.RemoteFileSystem;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.qe.ConnectContext;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 09a031ce325..782dd890c86 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -43,7 +43,7 @@ import org.apache.doris.datasource.hive.HiveProperties;
 import org.apache.doris.datasource.hive.HiveTransaction;
 import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
 import org.apache.doris.datasource.mvcc.MvccUtil;
-import org.apache.doris.fs.DirectoryLister;
+import org.apache.doris.fsv2.DirectoryLister;
 import 
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 330a92bfa1f..1b8dd982e81 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -36,7 +36,7 @@ import org.apache.doris.datasource.hive.source.HiveScanNode;
 import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
 import org.apache.doris.datasource.hudi.HudiUtils;
 import org.apache.doris.datasource.mvcc.MvccUtil;
-import org.apache.doris.fs.DirectoryLister;
+import org.apache.doris.fsv2.DirectoryLister;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java
index ab7c91d693a..7b57f4d76d7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java
@@ -18,10 +18,8 @@
 package org.apache.doris.fs.remote;
 
 import org.apache.doris.backup.Status;
-import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
 import org.apache.doris.fs.FileSystem;
-import org.apache.doris.fs.FileSystemCache;
 
 import java.util.List;
 import java.util.Map;
@@ -36,7 +34,7 @@ public class SwitchingFileSystem implements FileSystem {
     private final Map<String, String> properties;
 
     public SwitchingFileSystem(ExternalMetaCacheMgr extMetaCacheMgr, String 
bindBrokerName,
-            Map<String, String> properties) {
+                               Map<String, String> properties) {
         this.extMetaCacheMgr = extMetaCacheMgr;
         this.bindBrokerName = bindBrokerName;
         this.properties = properties;
@@ -123,10 +121,16 @@ public class SwitchingFileSystem implements FileSystem {
     }
 
     public FileSystem fileSystem(String location) {
-        return extMetaCacheMgr.getFsCache().getRemoteFileSystem(
+        // todo: This method is currently unused.
+        // LocationPath has already been adapted to the new V2 logic.
+        // We’re keeping this code commented out for now, but it will be fully 
removed once
+        // V2 is finalized and fully adopted.
+        /* return extMetaCacheMgr.getFsCache().getRemoteFileSystem(
                 new FileSystemCache.FileSystemCacheKey(
                         LocationPath.getFSIdentity(location, properties,
-                                bindBrokerName), properties, bindBrokerName));
+                                bindBrokerName), properties, 
bindBrokerName));*/
+        //
+        return null;
     }
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
 b/fe/fe-core/src/main/java/org/apache/doris/fsv2/DirectoryLister.java
similarity index 53%
copy from 
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
copy to fe/fe-core/src/main/java/org/apache/doris/fsv2/DirectoryLister.java
index b8898d9b279..7440d15166e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/DirectoryLister.java
@@ -14,23 +14,16 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java
+// and modified by Doris
 
-package org.apache.doris.transaction;
+package org.apache.doris.fsv2;
 
-import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
-import org.apache.doris.fs.FileSystemProvider;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fsv2.remote.RemoteFile;
 
-import java.util.concurrent.Executor;
-
-public class TransactionManagerFactory {
-
-    public static TransactionManager 
createHiveTransactionManager(HiveMetadataOps ops,
-            FileSystemProvider fileSystemProvider, Executor 
fileSystemExecutor) {
-        return new HiveTransactionManager(ops, fileSystemProvider, 
fileSystemExecutor);
-    }
-
-    public static TransactionManager 
createIcebergTransactionManager(IcebergMetadataOps ops) {
-        return new IcebergTransactionManager(ops);
-    }
+public interface DirectoryLister {
+    RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, 
TableIf table, String location)
+            throws FileSystemIOException;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemCache.java
new file mode 100644
index 00000000000..578f76b8275
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemCache.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fsv2;
+
+import org.apache.doris.common.CacheFactory;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.fsv2.remote.RemoteFileSystem;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+public class FileSystemCache {
+
+    private final LoadingCache<FileSystemCacheKey, RemoteFileSystem> 
fileSystemCache;
+
+    public FileSystemCache() {
+        // no need to set refreshAfterWrite, because the FileSystem is created 
once and never changed
+        CacheFactory fsCacheFactory = new CacheFactory(
+                OptionalLong.of(86400L),
+                OptionalLong.empty(),
+                Config.max_remote_file_system_cache_num,
+                false,
+                null);
+        fileSystemCache = fsCacheFactory.buildCache(this::loadFileSystem);
+    }
+
+    private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) throws 
UserException {
+        return FileSystemFactory.get(key.type, key.getFsProperties(), 
key.bindBrokerName);
+    }
+
+    public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
+        return fileSystemCache.get(key);
+    }
+
+    public static class FileSystemCacheKey {
+        private final FileSystemType type;
+        // eg: hdfs://nameservices1
+        private final String fsIdent;
+        private final Map<String, String> properties;
+        private final String bindBrokerName;
+        // only for creating new file system
+        private final Configuration conf;
+
+        public FileSystemCacheKey(Pair<FileSystemType, String> fs,
+                                  Map<String, String> properties,
+                                  String bindBrokerName,
+                                  Configuration conf) {
+            this.type = fs.first;
+            this.fsIdent = fs.second;
+            this.properties = properties;
+            this.bindBrokerName = bindBrokerName;
+            this.conf = conf;
+        }
+
+        public FileSystemCacheKey(Pair<FileSystemType, String> fs,
+                                  Map<String, String> properties, String 
bindBrokerName) {
+            this(fs, properties, bindBrokerName, null);
+        }
+
+        public Map<String, String> getFsProperties() {
+            if (conf == null) {
+                return properties;
+            }
+            Map<String, String> result = new HashMap<>();
+            conf.iterator().forEachRemaining(e -> result.put(e.getKey(), 
e.getValue()));
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof FileSystemCacheKey)) {
+                return false;
+            }
+            FileSystemCacheKey o = (FileSystemCacheKey) obj;
+            boolean equalsWithoutBroker = type.equals(o.type)
+                    && fsIdent.equals(o.fsIdent)
+                    && properties.equals(o.properties);
+            if (bindBrokerName == null) {
+                return equalsWithoutBroker && o.bindBrokerName == null;
+            }
+            return equalsWithoutBroker && 
bindBrokerName.equals(o.bindBrokerName);
+        }
+
+        @Override
+        public int hashCode() {
+            if (bindBrokerName == null) {
+                return Objects.hash(properties, fsIdent, type);
+            }
+            return Objects.hash(properties, fsIdent, type, bindBrokerName);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
 b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemDirectoryLister.java
similarity index 52%
copy from 
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemDirectoryLister.java
index b8898d9b279..6fe1b110783 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemDirectoryLister.java
@@ -15,22 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.transaction;
+package org.apache.doris.fsv2;
 
-import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
-import org.apache.doris.fs.FileSystemProvider;
+import org.apache.doris.backup.Status;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fsv2.remote.RemoteFile;
 
-import java.util.concurrent.Executor;
+import java.util.ArrayList;
+import java.util.List;
 
-public class TransactionManagerFactory {
-
-    public static TransactionManager 
createHiveTransactionManager(HiveMetadataOps ops,
-            FileSystemProvider fileSystemProvider, Executor 
fileSystemExecutor) {
-        return new HiveTransactionManager(ops, fileSystemProvider, 
fileSystemExecutor);
-    }
-
-    public static TransactionManager 
createIcebergTransactionManager(IcebergMetadataOps ops) {
-        return new IcebergTransactionManager(ops);
+public class FileSystemDirectoryLister implements DirectoryLister {
+    public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean 
recursive, TableIf table, String location)
+            throws FileSystemIOException {
+        List<RemoteFile> result = new ArrayList<>();
+        Status status = fs.listFiles(location, recursive, result);
+        if (!status.ok()) {
+            throw new FileSystemIOException(status.getErrCode(), 
status.getErrMsg());
+        }
+        return new RemoteFileRemoteIterator(result);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemFactory.java
index 319188c9109..162d440f2d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemFactory.java
@@ -24,6 +24,7 @@ import 
org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fsv2.remote.BrokerFileSystem;
 import org.apache.doris.fsv2.remote.RemoteFileSystem;
 
+import java.util.List;
 import java.util.Map;
 
 public class FileSystemFactory {
@@ -52,6 +53,22 @@ public class FileSystemFactory {
         return new BrokerFileSystem(name, properties);
     }
 
+    public static RemoteFileSystem get(FileSystemType fileSystemType, 
Map<String, String> properties,
+                                       String bindBrokerName)
+            throws UserException {
+        if (fileSystemType == FileSystemType.BROKER) {
+            return new BrokerFileSystem(bindBrokerName, properties);
+        }
+        List<StorageProperties> storagePropertiesList = 
StorageProperties.createAll(properties);
+
+        for (StorageProperties storageProperties : storagePropertiesList) {
+            if 
(storageProperties.getStorageName().equalsIgnoreCase(fileSystemType.name())) {
+                return StorageTypeMapper.create(storageProperties);
+            }
+        }
+        throw new RuntimeException("Unsupported file system type: " + 
fileSystemType);
+    }
+
     public static RemoteFileSystem get(BrokerDesc brokerDesc) {
         if (null != brokerDesc.getStorageProperties()) {
             return get(brokerDesc.getStorageProperties());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemIOException.java 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemIOException.java
new file mode 100644
index 00000000000..5e1e569b76a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemIOException.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fsv2;
+
+import org.apache.doris.backup.Status.ErrCode;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+public class FileSystemIOException extends IOException {
+
+    @Nullable
+    private ErrCode errCode;
+
+    public FileSystemIOException(ErrCode errCode, String message) {
+        super(message);
+        this.errCode = errCode;
+    }
+
+    public FileSystemIOException(ErrCode errCode, String message, Throwable 
cause) {
+        super(message, cause);
+        this.errCode = errCode;
+    }
+
+    public FileSystemIOException(String message) {
+        super(message);
+        this.errCode = null;
+    }
+
+    public FileSystemIOException(String message, Throwable cause) {
+        super(message, cause);
+        this.errCode = null;
+    }
+
+    public Optional<ErrCode> getErrorCode() {
+        return Optional.ofNullable(errCode);
+    }
+
+    @Override
+    public String getMessage() {
+        if (errCode != null) {
+            return String.format("[%s]: %s",
+                    errCode,
+                    super.getMessage());
+        } else {
+            return super.getMessage();
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
 b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemProvider.java
similarity index 53%
copy from 
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
copy to fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemProvider.java
index b8898d9b279..4fc5ac316f0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemProvider.java
@@ -15,22 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.transaction;
+package org.apache.doris.fsv2;
 
-import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
-import org.apache.doris.fs.FileSystemProvider;
+import org.apache.doris.datasource.SessionContext;
 
-import java.util.concurrent.Executor;
-
-public class TransactionManagerFactory {
-
-    public static TransactionManager 
createHiveTransactionManager(HiveMetadataOps ops,
-            FileSystemProvider fileSystemProvider, Executor 
fileSystemExecutor) {
-        return new HiveTransactionManager(ops, fileSystemProvider, 
fileSystemExecutor);
-    }
-
-    public static TransactionManager 
createIcebergTransactionManager(IcebergMetadataOps ops) {
-        return new IcebergTransactionManager(ops);
-    }
+public interface FileSystemProvider {
+    FileSystem get(SessionContext ctx);
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemProviderImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemProviderImpl.java
new file mode 100644
index 00000000000..f664012c6ac
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemProviderImpl.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fsv2;
+
+import org.apache.doris.datasource.ExternalMetaCacheMgr;
+import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.fsv2.remote.SwitchingFileSystem;
+
+import java.util.Map;
+
+/**
+ * {@link FileSystemProvider} that hands out a {@link SwitchingFileSystem} built from
+ * the settings captured at construction time.
+ */
+public class FileSystemProviderImpl implements FileSystemProvider {
+    // All three fields are assigned exactly once in the constructor, so they are final.
+    private final ExternalMetaCacheMgr extMetaCacheMgr;
+    private final String bindBrokerName;
+    private final Map<String, String> properties;
+
+    /**
+     * @param extMetaCacheMgr cache manager passed to every file system created by {@link #get}
+     * @param bindBrokerName  broker name passed to every file system created by {@link #get}
+     * @param properties      properties passed to every file system created by {@link #get};
+     *                        the map is stored by reference, not copied
+     */
+    public FileSystemProviderImpl(ExternalMetaCacheMgr extMetaCacheMgr, String bindBrokerName,
+                                  Map<String, String> properties) {
+        this.extMetaCacheMgr = extMetaCacheMgr;
+        this.bindBrokerName = bindBrokerName;
+        this.properties = properties;
+    }
+
+    /**
+     * Creates a new {@link SwitchingFileSystem} on every call.
+     *
+     * @param ctx session context; not consulted by this implementation
+     * @return a freshly constructed file system
+     */
+    @Override
+    public FileSystem get(SessionContext ctx) {
+        return new SwitchingFileSystem(extMetaCacheMgr, bindBrokerName, properties);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemType.java 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemType.java
new file mode 100644
index 00000000000..f586da232c8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemType.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fsv2;
+
+// TODO: [FileSystemType Unification]
+// There are currently multiple definitions of file system types across the 
codebase, including but not limited to:
+// 1. Backend module (e.g., FileSystemBackendType)
+// 2. Location/path parsing logic (e.g., LocationType or string-based tags)
+// 3. This enum: FileSystemType (used in the SPI/plugin layer)
+//
+// Problem:
+// - File system type definitions are scattered across different modules with 
inconsistent naming and granularity
+// - Adding a new type requires changes in multiple places, increasing risk of 
bugs and maintenance overhead
+// - Difficult to maintain and error-prone
+//
+// Refactoring Goal:
+// - Consolidate file system type definitions into a single source of truth
+// - Clearly define the semantics and usage of each type (e.g., remote vs 
local, object storage vs file system)
+// - All modules should reference the unified definition to avoid duplication 
and hardcoded strings
+//
+// Suggested Approach:
+// - Create a centralized `FsType` enum/class as the canonical definition
+// - Provide mapping or adapter methods where needed (e.g., map LocationType 
to FsType)
+// - Gradually deprecate other definitions and annotate them with @Deprecated, 
including migration instructions
+//
+/**
+ * File system type tags declared for the {@code org.apache.doris.fsv2} package.
+ * See the unification TODO above: other modules currently keep their own,
+ * separate type definitions.
+ */
+public enum FileSystemType {
+    S3,
+    HDFS,
+    OFS,
+    JFS,
+    BROKER,
+    FILE,
+    AZURE
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemUtil.java
new file mode 100644
index 00000000000..a3cbf6369fb
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemUtil.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fsv2;
+
+import org.apache.doris.backup.Status;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Static helpers that schedule {@link FileSystem} rename operations on an executor.
+ */
+public class FileSystemUtil {
+
+    /**
+     * Schedules one asynchronous rename per entry of {@code fileNames}, moving
+     * {@code origFilePath/<name>} to {@code destFilePath/<name>}.
+     *
+     * <p>Each task is a no-op when {@code cancelled} is already set at the time it runs.
+     * A task whose rename returns a non-OK status fails with a {@link RuntimeException}
+     * carrying the status error message.
+     *
+     * @param fs                file system performing the renames
+     * @param executor          executor the tasks are submitted to
+     * @param renameFileFutures list that receives one future per scheduled rename
+     * @param cancelled         flag checked by each task before it renames anything
+     * @param origFilePath      directory the files currently live in
+     * @param destFilePath      directory the files are moved to
+     * @param fileNames         names of the files to move
+     */
+    public static void asyncRenameFiles(FileSystem fs,
+                                        Executor executor,
+                                        List<CompletableFuture<?>> renameFileFutures,
+                                        AtomicBoolean cancelled,
+                                        String origFilePath,
+                                        String destFilePath,
+                                        List<String> fileNames) {
+        for (String name : fileNames) {
+            // Resolve both locations up front so the task only captures plain strings.
+            String from = new Path(origFilePath, name).toString();
+            String to = new Path(destFilePath, name).toString();
+            Runnable renameTask = () -> {
+                if (!cancelled.get()) {
+                    Status renameStatus = fs.rename(from, to);
+                    if (!renameStatus.ok()) {
+                        throw new RuntimeException(renameStatus.getErrMsg());
+                    }
+                }
+            };
+            renameFileFutures.add(CompletableFuture.runAsync(renameTask, executor));
+        }
+    }
+
+    /**
+     * Schedules a single asynchronous directory rename via
+     * {@link FileSystem#renameDir(String, String, Runnable)}.
+     *
+     * <p>The task is a no-op when {@code cancelled} is already set at the time it runs,
+     * and fails with a {@link RuntimeException} when the rename returns a non-OK status.
+     *
+     * @param fs                  file system performing the rename
+     * @param executor            executor the task is submitted to
+     * @param renameFileFutures   list that receives the future of the scheduled rename
+     * @param cancelled           flag checked by the task before it renames anything
+     * @param origFilePath        directory to move
+     * @param destFilePath        location the directory is moved to
+     * @param runWhenPathNotExist callback handed through to {@code renameDir}
+     */
+    public static void asyncRenameDir(FileSystem fs,
+                                      Executor executor,
+                                      List<CompletableFuture<?>> renameFileFutures,
+                                      AtomicBoolean cancelled,
+                                      String origFilePath,
+                                      String destFilePath,
+                                      Runnable runWhenPathNotExist) {
+        Runnable renameTask = () -> {
+            if (!cancelled.get()) {
+                Status renameStatus = fs.renameDir(origFilePath, destFilePath, runWhenPathNotExist);
+                if (!renameStatus.ok()) {
+                    throw new RuntimeException(renameStatus.getErrMsg());
+                }
+            }
+        };
+        renameFileFutures.add(CompletableFuture.runAsync(renameTask, executor));
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/LocalDfsFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/LocalDfsFileSystem.java
new file mode 100644
index 00000000000..4b2a12e8597
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/LocalDfsFileSystem.java
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fsv2;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.fsv2.remote.RemoteFile;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * {@link FileSystem} implementation backed by Hadoop's {@link LocalFileSystem}.
+ *
+ * <p>{@link #downloadWithFileSize}, {@link #upload} and {@link #directUpload} are
+ * unimplemented stubs that return {@code null}; {@link #getProperties()} also returns
+ * {@code null}.
+ */
+public class LocalDfsFileSystem implements FileSystem {
+
+    public LocalFileSystem fs = LocalFileSystem.getLocal(new Configuration());
+
+    /**
+     * @throws IOException if the local Hadoop file system cannot be obtained
+     */
+    public LocalDfsFileSystem() throws IOException {
+    }
+
+    /** Not supported by this implementation; always returns {@code null}. */
+    @Override
+    public Map<String, String> getProperties() {
+        return null;
+    }
+
+    /**
+     * Delegates to {@link #exists(String)}.
+     * NOTE(review): this does not verify that the path is a directory - confirm that is intended.
+     */
+    @Override
+    public Status directoryExists(String dir) {
+        return exists(dir);
+    }
+
+    /**
+     * Checks whether {@code remotePath} exists.
+     *
+     * @return {@link Status#OK} when the path exists, otherwise a {@code NOT_FOUND} status
+     * @throws RuntimeException wrapping any {@link IOException} raised by the lookup
+     */
+    @Override
+    public Status exists(String remotePath) {
+        boolean exists = false;
+        try {
+            exists = fs.exists(new Path(remotePath));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        if (exists) {
+            return Status.OK;
+        } else {
+            return new Status(Status.ErrCode.NOT_FOUND, "");
+        }
+    }
+
+    /** Not implemented; always returns {@code null}. */
+    @Override
+    public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
+        return null;
+    }
+
+    /** Not implemented; always returns {@code null}. */
+    @Override
+    public Status upload(String localPath, String remotePath) {
+        return null;
+    }
+
+    /** Not implemented; always returns {@code null}. */
+    @Override
+    public Status directUpload(String content, String remoteFile) {
+        return null;
+    }
+
+    /**
+     * Renames {@code origFilePath} to {@code destFilePath}.
+     *
+     * @return {@link Status#OK} on success, or a {@code COMMON_ERROR} status when the
+     *         underlying file system reports that the rename did not happen
+     * @throws RuntimeException wrapping any {@link IOException} raised by the rename
+     */
+    @Override
+    public Status rename(String origFilePath, String destFilePath) {
+        boolean renamed;
+        try {
+            renamed = fs.rename(new Path(origFilePath), new Path(destFilePath));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        // Hadoop signals a failed rename through the boolean result, not an exception;
+        // surface it so callers that check status.ok() do not treat it as success.
+        if (!renamed) {
+            return new Status(Status.ErrCode.COMMON_ERROR,
+                    "Failed to rename " + origFilePath + " to " + destFilePath);
+        }
+        return Status.OK;
+    }
+
+    /**
+     * Renames a directory after making sure the destination does not exist and its
+     * parent directory does.
+     *
+     * @param runWhenPathNotExist callback invoked once the parent directory is in place
+     *                            and before the rename itself
+     * @return the status of the final {@link #rename(String, String)} call
+     * @throws RuntimeException if the destination already exists or its parent cannot
+     *                          be checked/created
+     */
+    @Override
+    public Status renameDir(String origFilePath, String destFilePath, Runnable runWhenPathNotExist) {
+        Status status = exists(destFilePath);
+        if (status.ok()) {
+            throw new RuntimeException("Destination directory already exists: " + destFilePath);
+        }
+        String targetParent = new Path(destFilePath).getParent().toString();
+        status = exists(targetParent);
+        if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
+            status = makeDir(targetParent);
+        }
+        if (!status.ok()) {
+            throw new RuntimeException(status.getErrMsg());
+        }
+
+        runWhenPathNotExist.run();
+
+        return rename(origFilePath, destFilePath);
+    }
+
+    /**
+     * Recursively deletes {@code remotePath}.
+     *
+     * <p>NOTE(review): the boolean result of the underlying delete is intentionally not
+     * turned into an error here, since it may simply mean there was nothing to delete -
+     * confirm against callers.
+     *
+     * @throws RuntimeException wrapping any {@link IOException} raised by the delete
+     */
+    @Override
+    public Status delete(String remotePath) {
+        try {
+            fs.delete(new Path(remotePath), true);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return Status.OK;
+    }
+
+    /**
+     * Creates {@code remotePath} including any missing parents.
+     *
+     * @return {@link Status#OK} on success, or a {@code COMMON_ERROR} status when the
+     *         underlying file system reports that the directory was not created
+     * @throws RuntimeException wrapping any {@link IOException} raised by the call
+     */
+    @Override
+    public Status makeDir(String remotePath) {
+        boolean created;
+        try {
+            created = fs.mkdirs(new Path(remotePath));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        // mkdirs reports failure through its boolean result; renameDir relies on a
+        // non-OK status from this method to abort.
+        if (!created) {
+            return new Status(Status.ErrCode.COMMON_ERROR, "Failed to create directory " + remotePath);
+        }
+        return Status.OK;
+    }
+
+    /**
+     * Expands the glob {@code remotePath} and appends one {@link RemoteFile} per match
+     * to {@code result}. Directories are reported with size {@code -1}.
+     *
+     * @param fileNameOnly when {@code true} only the last path component is recorded,
+     *                     otherwise the full path string
+     * @return {@link Status#OK}, also when the glob matches nothing
+     * @throws RuntimeException wrapping any {@link IOException} raised by the glob
+     */
+    @Override
+    public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
+        try {
+            FileStatus[] locatedFileStatusRemoteIterator = fs.globStatus(new Path(remotePath));
+            if (locatedFileStatusRemoteIterator == null) {
+                return Status.OK;
+            }
+            for (FileStatus fileStatus : locatedFileStatusRemoteIterator) {
+                RemoteFile remoteFile = new RemoteFile(
+                        fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(),
+                        !fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(),
+                        fileStatus.getBlockSize(), fileStatus.getModificationTime());
+                result.add(remoteFile);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return Status.OK;
+    }
+
+    /**
+     * Lists the files under {@code remotePath} and appends them to {@code result}.
+     *
+     * @param recursive whether to descend into sub-directories
+     * @return {@link Status#OK} on success, {@code NOT_FOUND} when the path does not
+     *         exist, {@code COMMON_ERROR} for any other failure
+     */
+    @Override
+    public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) {
+        try {
+            Path locatedPath = new Path(remotePath);
+            RemoteIterator<LocatedFileStatus> locatedFiles = fs.listFiles(locatedPath, recursive);
+            while (locatedFiles.hasNext()) {
+                LocatedFileStatus fileStatus = locatedFiles.next();
+                RemoteFile location = new RemoteFile(
+                        fileStatus.getPath(), fileStatus.isDirectory(), fileStatus.getLen(),
+                        fileStatus.getBlockSize(), fileStatus.getModificationTime(), fileStatus.getBlockLocations());
+                result.add(location);
+            }
+        } catch (FileNotFoundException e) {
+            return new Status(Status.ErrCode.NOT_FOUND, e.getMessage());
+        } catch (Exception e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
+        }
+        return Status.OK;
+    }
+
+    /**
+     * Adds the direct sub-directories of {@code remotePath} to {@code result}, each as
+     * its full path string with a trailing {@code "/"}.
+     *
+     * @return {@link Status#OK} on success, {@code COMMON_ERROR} when listing fails
+     */
+    @Override
+    public Status listDirectories(String remotePath, Set<String> result) {
+        try {
+            FileStatus[] fileStatuses = fs.listStatus(new Path(remotePath));
+            result.addAll(
+                    Arrays.stream(fileStatuses)
+                            .filter(FileStatus::isDirectory)
+                            .map(file -> file.getPath().toString() + "/")
+                            .collect(ImmutableSet.toImmutableSet()));
+        } catch (IOException e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
+        }
+        return Status.OK;
+    }
+
+    /**
+     * Creates an empty file at {@code path}, creating the parent directory first when
+     * it does not exist yet.
+     *
+     * @throws IOException if the file cannot be created or closed
+     */
+    public void createFile(String path) throws IOException {
+        Path path1 = new Path(path);
+        if (!exists(path1.getParent().toString()).ok()) {
+            makeDir(path1.getParent().toString());
+        }
+        FSDataOutputStream build = fs.createFile(path1).build();
+        build.close();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/RemoteFileRemoteIterator.java 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/RemoteFileRemoteIterator.java
new file mode 100644
index 00000000000..1ee41aaecd0
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/RemoteFileRemoteIterator.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fsv2;
+
+import org.apache.doris.fsv2.remote.RemoteFile;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * {@link RemoteIterator} over an in-memory list of {@link RemoteFile}s.
+ * The list is iterated by index and is not copied.
+ */
+public class RemoteFileRemoteIterator
+        implements RemoteIterator<RemoteFile> {
+    private final List<RemoteFile> remoteFileList;
+    private int currentIndex = 0;
+
+    /**
+     * @param remoteFileList files to iterate over, in list order
+     * @throws NullPointerException if {@code remoteFileList} is {@code null}
+     */
+    public RemoteFileRemoteIterator(List<RemoteFile> remoteFileList) {
+        // The message names the argument that was actually null.
+        this.remoteFileList = Objects.requireNonNull(remoteFileList, "remoteFileList is null");
+    }
+
+    /** Returns {@code true} while there are list entries that have not been returned yet. */
+    @Override
+    public boolean hasNext() throws FileSystemIOException {
+        return currentIndex < remoteFileList.size();
+    }
+
+    /**
+     * Returns the next file and advances the cursor.
+     *
+     * @throws NoSuchElementException if the list is exhausted
+     */
+    @Override
+    public RemoteFile next() throws FileSystemIOException {
+        if (!hasNext()) {
+            throw new NoSuchElementException("No more elements in RemoteFileRemoteIterator");
+        }
+        return remoteFileList.get(currentIndex++);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
 b/fe/fe-core/src/main/java/org/apache/doris/fsv2/RemoteFiles.java
similarity index 54%
copy from 
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
copy to fe/fe-core/src/main/java/org/apache/doris/fsv2/RemoteFiles.java
index b8898d9b279..54a80af5891 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/RemoteFiles.java
@@ -15,22 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.transaction;
+package org.apache.doris.fsv2;
 
-import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
-import org.apache.doris.fs.FileSystemProvider;
+import org.apache.doris.fs.remote.RemoteFile;
 
-import java.util.concurrent.Executor;
+import java.util.List;
 
-public class TransactionManagerFactory {
+public class RemoteFiles {
 
-    public static TransactionManager 
createHiveTransactionManager(HiveMetadataOps ops,
-            FileSystemProvider fileSystemProvider, Executor 
fileSystemExecutor) {
-        return new HiveTransactionManager(ops, fileSystemProvider, 
fileSystemExecutor);
+    private final List<RemoteFile> files;
+
+    public RemoteFiles(List<RemoteFile> files) {
+        this.files = files;
     }
 
-    public static TransactionManager 
createIcebergTransactionManager(IcebergMetadataOps ops) {
-        return new IcebergTransactionManager(ops);
+    public List<RemoteFile> files() {
+        return files;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
 b/fe/fe-core/src/main/java/org/apache/doris/fsv2/RemoteIterator.java
similarity index 53%
copy from 
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
copy to fe/fe-core/src/main/java/org/apache/doris/fsv2/RemoteIterator.java
index b8898d9b279..9f93e3eb549 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/RemoteIterator.java
@@ -14,23 +14,14 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/RemoteIterator.java
+// and modified by Doris
 
-package org.apache.doris.transaction;
+package org.apache.doris.fsv2;
 
-import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
-import org.apache.doris.fs.FileSystemProvider;
+public interface RemoteIterator<T> {
+    boolean hasNext() throws FileSystemIOException;
 
-import java.util.concurrent.Executor;
-
-public class TransactionManagerFactory {
-
-    public static TransactionManager 
createHiveTransactionManager(HiveMetadataOps ops,
-            FileSystemProvider fileSystemProvider, Executor 
fileSystemExecutor) {
-        return new HiveTransactionManager(ops, fileSystemProvider, 
fileSystemExecutor);
-    }
-
-    public static TransactionManager 
createIcebergTransactionManager(IcebergMetadataOps ops) {
-        return new IcebergTransactionManager(ops);
-    }
+    T next() throws FileSystemIOException;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
 b/fe/fe-core/src/main/java/org/apache/doris/fsv2/SimpleRemoteIterator.java
similarity index 50%
copy from 
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
copy to fe/fe-core/src/main/java/org/apache/doris/fsv2/SimpleRemoteIterator.java
index b8898d9b279..a631241fbcd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fsv2/SimpleRemoteIterator.java
@@ -15,22 +15,31 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.transaction;
 
-import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
-import org.apache.doris.fs.FileSystemProvider;
+package org.apache.doris.fsv2;
 
-import java.util.concurrent.Executor;
+import org.apache.doris.fsv2.remote.RemoteFile;
 
-public class TransactionManagerFactory {
+import java.util.Iterator;
+import java.util.Objects;
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java
+// and modified by Doris
 
-    public static TransactionManager 
createHiveTransactionManager(HiveMetadataOps ops,
-            FileSystemProvider fileSystemProvider, Executor 
fileSystemExecutor) {
-        return new HiveTransactionManager(ops, fileSystemProvider, 
fileSystemExecutor);
+class SimpleRemoteIterator implements RemoteIterator<RemoteFile> {
+    private final Iterator<RemoteFile> iterator;
+
+    public SimpleRemoteIterator(Iterator<RemoteFile> iterator) {
+        this.iterator = Objects.requireNonNull(iterator, "iterator is null");
+    }
+
+    @Override
+    public boolean hasNext() throws FileSystemIOException {
+        return iterator.hasNext();
     }
 
-    public static TransactionManager 
createIcebergTransactionManager(IcebergMetadataOps ops) {
-        return new IcebergTransactionManager(ops);
+    @Override
+    public RemoteFile next() throws FileSystemIOException {
+        return iterator.next();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/TransactionDirectoryListingCacheKey.java
 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/TransactionDirectoryListingCacheKey.java
new file mode 100644
index 00000000000..ba4f80c3762
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/TransactionDirectoryListingCacheKey.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java
+// and modified by Doris
+
+package org.apache.doris.fsv2;
+
+import java.util.Objects;
+
+/**
+ * Immutable cache key combining a transaction id with a directory path.
+ * Two keys are equal when both components are equal.
+ */
+public class TransactionDirectoryListingCacheKey {
+
+    private final long transactionId;
+    private final String path;
+
+    /**
+     * @param transactionId id of the transaction the listing belongs to
+     * @param path          directory path; must not be {@code null}
+     * @throws NullPointerException if {@code path} is {@code null}
+     */
+    public TransactionDirectoryListingCacheKey(long transactionId, String path) {
+        this.transactionId = transactionId;
+        this.path = Objects.requireNonNull(path, "path is null");
+    }
+
+    /** Returns the directory path component of this key. */
+    public String getPath() {
+        return path;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        TransactionDirectoryListingCacheKey other = (TransactionDirectoryListingCacheKey) o;
+        if (transactionId != other.transactionId) {
+            return false;
+        }
+        return path.equals(other.path);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(transactionId, path);
+    }
+
+    @Override
+    public String toString() {
+        return "TransactionDirectoryListingCacheKey{"
+                + "transactionId=" + transactionId
+                + ", path='" + path + '\''
+                + '}';
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/TransactionScopeCachingDirectoryLister.java
 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/TransactionScopeCachingDirectoryLister.java
new file mode 100644
index 00000000000..f7bdc5e3c05
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/TransactionScopeCachingDirectoryLister.java
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java
+// and modified by Doris
+
+package org.apache.doris.fsv2;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fsv2.remote.RemoteFile;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.commons.collections.ListUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+
+/**
+ * Caches directory content (including listings that were started 
concurrently).
+ * {@link TransactionScopeCachingDirectoryLister} assumes that all listings
+ * are performed by same user within single transaction, therefore any failure 
can
+ * be shared between concurrent listings.
+ */
+public class TransactionScopeCachingDirectoryLister implements DirectoryLister 
{
+    // Combined with the listed location to form every cache key built by this lister.
+    private final long transactionId;
+
+    /** Exposes the backing cache; annotated for test use. */
+    @VisibleForTesting
+    public Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> 
getCache() {
+        return cache;
+    }
+
+    //TODO use a cache key based on Path & SchemaTableName and iterate over 
the cache keys
+    // to deal more efficiently with cache invalidation scenarios for 
partitioned tables.
+    private final Cache<TransactionDirectoryListingCacheKey, 
FetchingValueHolder> cache;
+    // Performs the real listing when a key is not cached yet.
+    private final DirectoryLister delegate;
+
+    /**
+     * @param delegate      lister used to produce listings on a cache miss; must not be null
+     * @param transactionId id placed into every cache key created by this instance
+     * @param cache         cache holding the listings; must not be null
+     */
+    public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, 
long transactionId,
+                                                  
Cache<TransactionDirectoryListingCacheKey,
+                                                          FetchingValueHolder> 
cache) {
+        this.delegate = Objects.requireNonNull(delegate, "delegate is null");
+        this.transactionId = transactionId;
+        this.cache = Objects.requireNonNull(cache, "cache is null");
+    }
+
+    /**
+     * Lists {@code location}, going through the cache under a key made of this
+     * lister's transaction id and the location.
+     */
+    @Override
+    public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean 
recursive, TableIf table, String location)
+            throws FileSystemIOException {
+        return listInternal(fs, recursive, table, new 
TransactionDirectoryListingCacheKey(transactionId, location));
+    }
+
+    /**
+     * Looks up (or creates) the holder for {@code cacheKey} and returns an iterator over it.
+     * A fully cached holder is served from memory; otherwise the returned iterator
+     * keeps pulling from the holder's underlying listing.
+     */
+    private RemoteIterator<RemoteFile> listInternal(FileSystem fs, boolean 
recursive, TableIf table,
+                                                    
TransactionDirectoryListingCacheKey cacheKey)
+            throws FileSystemIOException {
+        FetchingValueHolder cachedValueHolder;
+        try {
+            // The loader only runs when the key is absent; it starts the delegate listing.
+            cachedValueHolder = cache.get(cacheKey,
+                    () -> new 
FetchingValueHolder(createListingRemoteIterator(fs, recursive, table, 
cacheKey)));
+        } catch (ExecutionException | UncheckedExecutionException e) {
+            // Unwrap the loader failure: rethrow FileSystemIOException and unchecked
+            // causes as they are, wrap anything else in a RuntimeException.
+            Throwable throwable = e.getCause();
+            Throwables.throwIfInstanceOf(throwable, 
FileSystemIOException.class);
+            Throwables.throwIfUnchecked(throwable);
+            throw new RuntimeException("Failed to list directory: " + 
cacheKey.getPath(), throwable);
+        }
+
+        if (cachedValueHolder.isFullyCached()) {
+            return new 
SimpleRemoteIterator(cachedValueHolder.getCachedFiles());
+        }
+
+        return cachingRemoteIterator(cachedValueHolder, cacheKey);
+    }
+
+    /** Asks the delegate to list the path stored in {@code cacheKey}. */
+    private RemoteIterator<RemoteFile> createListingRemoteIterator(FileSystem 
fs, boolean recursive,
+                                                                   TableIf 
table,
+                                                                   
TransactionDirectoryListingCacheKey cacheKey)
+            throws FileSystemIOException {
+        return delegate.listFiles(fs, recursive, table, cacheKey.getPath());
+    }
+
+
+    /**
+     * Returns an iterator that reads files from {@code cachedValueHolder} by index,
+     * so files are fetched from the underlying listing on demand and stored in the holder.
+     */
+    private RemoteIterator<RemoteFile> 
cachingRemoteIterator(FetchingValueHolder cachedValueHolder,
+                                                             
TransactionDirectoryListingCacheKey cacheKey) {
+        return new RemoteIterator<RemoteFile>() {
+            // Index of the next file this iterator will return.
+            private int fileIndex;
+
+            @Override
+            public boolean hasNext()
+                    throws FileSystemIOException {
+                try {
+                    boolean hasNext = 
cachedValueHolder.getCachedFile(fileIndex).isPresent();
+                    // Update cache weight of cachedValueHolder for a given 
path.
+                    // The cachedValueHolder acts as an invalidation guard.
+                    // If a cache invalidation happens while this iterator 
goes over the files from the specified path,
+                    // the eventually outdated file listing will not be added 
anymore to the cache.
+                    cache.asMap().replace(cacheKey, cachedValueHolder, 
cachedValueHolder);
+                    return hasNext;
+                } catch (Exception exception) {
+                    // invalidate cached value to force retry of directory 
listing
+                    cache.invalidate(cacheKey);
+                    throw exception;
+                }
+            }
+
+            @Override
+            public RemoteFile next()
+                    throws FileSystemIOException {
+                // force cache entry weight update in case next file is cached
+                Preconditions.checkState(hasNext());
+                return 
cachedValueHolder.getCachedFile(fileIndex++).orElseThrow(NoSuchElementException::new);
+            }
+        };
+    }
+
+    /** Test helper: whether the listing of {@code location} is present and fully cached. */
+    @VisibleForTesting
+    boolean isCached(String location) {
+        return isCached(new TransactionDirectoryListingCacheKey(transactionId, 
location));
+    }
+
+    /** Test helper: whether the entry for {@code cacheKey} is present and fully cached. */
+    @VisibleForTesting
+    boolean isCached(TransactionDirectoryListingCacheKey cacheKey) {
+        FetchingValueHolder cached = cache.getIfPresent(cacheKey);
+        return cached != null && cached.isFullyCached();
+    }
+
+    /**
+     * Cache value that wraps a listing iterator and remembers every file it has
+     * produced so far, so several readers can share one underlying listing.
+     */
+    static class FetchingValueHolder {
+
+        // Files already pulled from fileIterator, in listing order.
+        private final List<RemoteFile> cachedFiles = 
ListUtils.synchronizedList(new ArrayList<RemoteFile>());
+
+        // Set to null once the listing is exhausted or has failed.
+        @GuardedBy("this")
+        @Nullable
+        private RemoteIterator<RemoteFile> fileIterator;
+        // Failure raised by the listing, if any; rethrown to later fetch attempts.
+        @GuardedBy("this")
+        @Nullable
+        private Exception exception;
+
+        public FetchingValueHolder(RemoteIterator<RemoteFile> fileIterator) {
+            this.fileIterator = Objects.requireNonNull(fileIterator, 
"fileIterator is null");
+        }
+
+        /** True once the listing was consumed to the end without recording an exception. */
+        public synchronized boolean isFullyCached() {
+            return fileIterator == null && exception == null;
+        }
+
+        /** Number of files cached so far. */
+        public long getCacheFileCount() {
+            return cachedFiles.size();
+        }
+
+        /**
+         * Iterator over the cached files.
+         *
+         * @throws IllegalStateException if the holder is not fully cached
+         */
+        public Iterator<RemoteFile> getCachedFiles() {
+            Preconditions.checkState(isFullyCached());
+            return cachedFiles.iterator();
+        }
+
+        /**
+         * Returns the file at {@code index}, fetching it from the underlying listing
+         * when it is the next file that has not been cached yet.
+         *
+         * @param index position in listing order; must be within {@code [0, cached size]}
+         * @return the file, or empty when the listing has no file at that position
+         * @throws IllegalArgumentException if {@code index} is outside the allowed range
+         */
+        public Optional<RemoteFile> getCachedFile(int index)
+                throws FileSystemIOException {
+            int filesSize = cachedFiles.size();
+            Preconditions.checkArgument(index >= 0 && index <= filesSize,
+                    "File index (%s) out of bounds [0, %s]", index, filesSize);
+
+            // avoid fileIterator synchronization (and thus blocking) for 
already cached files
+            if (index < filesSize) {
+                return Optional.of(cachedFiles.get(index));
+            }
+
+            return fetchNextCachedFile(index);
+        }
+
+        /**
+         * Synchronized slow path of {@link #getCachedFile(int)}: rethrows a previously
+         * recorded failure, re-checks the cache, then pulls one more file from the listing.
+         */
+        private synchronized Optional<RemoteFile> fetchNextCachedFile(int 
index)
+                throws FileSystemIOException {
+            if (exception != null) {
+                throw new FileSystemIOException("Exception while listing 
directory", exception);
+            }
+
+            if (index < cachedFiles.size()) {
+                // file was fetched concurrently
+                return Optional.of(cachedFiles.get(index));
+            }
+
+            try {
+                if (fileIterator == null || !fileIterator.hasNext()) {
+                    // no more files
+                    fileIterator = null;
+                    return Optional.empty();
+                }
+
+                RemoteFile fileStatus = fileIterator.next();
+                cachedFiles.add(fileStatus);
+                return Optional.of(fileStatus);
+            } catch (Exception exception) {
+                // Drop the iterator and remember the failure so later calls fail as well.
+                fileIterator = null;
+                this.exception = exception;
+                throw exception;
+            }
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fsv2/TransactionScopeCachingDirectoryListerFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/TransactionScopeCachingDirectoryListerFactory.java
new file mode 100644
index 00000000000..24511201c0b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/TransactionScopeCachingDirectoryListerFactory.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryListerFactory.java
+// and modified by Doris
+
+package org.apache.doris.fsv2;
+
+import org.apache.doris.common.EvictableCacheBuilder;
+import 
org.apache.doris.fsv2.TransactionScopeCachingDirectoryLister.FetchingValueHolder;
+
+import com.google.common.cache.Cache;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TransactionScopeCachingDirectoryListerFactory {
+    // TODO: use a cache key based on Path & SchemaTableName and iterate over the cache keys
+    // to deal more efficiently with cache invalidation scenarios for partitioned tables.
+    // Empty when maxSize <= 0 (caching disabled); get() then returns the delegate unchanged.
+
+    private final Optional<Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder>> cache;
+
+    private final AtomicLong nextTransactionId = new AtomicLong();
+
+    public TransactionScopeCachingDirectoryListerFactory(long maxSize) {
+        if (maxSize > 0) {
+            EvictableCacheBuilder<TransactionDirectoryListingCacheKey, FetchingValueHolder> cacheBuilder =
+                    EvictableCacheBuilder.newBuilder()
+                            .maximumWeight(maxSize) // cap on summed weights; each entry weighs getCacheFileCount()
+                            .weigher((key, value) ->
+                                    Math.toIntExact(value.getCacheFileCount()));
+            this.cache = Optional.of(cacheBuilder.build());
+        } else {
+            this.cache = Optional.empty();
+        }
+    }
+
+    public DirectoryLister get(DirectoryLister delegate) {
+        return cache
+                .map(cache -> (DirectoryLister) new TransactionScopeCachingDirectoryLister(delegate,
+                        nextTransactionId.getAndIncrement(), cache)) // each wrapper gets its own transaction id
+                .orElse(delegate);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/SwitchingFileSystem.java
similarity index 95%
copy from 
fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/SwitchingFileSystem.java
index ab7c91d693a..7ef6b462831 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fsv2/remote/SwitchingFileSystem.java
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.fs.remote;
+package org.apache.doris.fsv2.remote;
 
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
-import org.apache.doris.fs.FileSystem;
-import org.apache.doris.fs.FileSystemCache;
+import org.apache.doris.fsv2.FileSystem;
+import org.apache.doris.fsv2.FileSystemCache;
 
 import java.util.List;
 import java.util.Map;
@@ -36,7 +36,7 @@ public class SwitchingFileSystem implements FileSystem {
     private final Map<String, String> properties;
 
     public SwitchingFileSystem(ExternalMetaCacheMgr extMetaCacheMgr, String 
bindBrokerName,
-            Map<String, String> properties) {
+                               Map<String, String> properties) {
         this.extMetaCacheMgr = extMetaCacheMgr;
         this.bindBrokerName = bindBrokerName;
         this.properties = properties;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index fc57a62ef88..74174a5105c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -71,9 +71,9 @@ import org.apache.doris.datasource.paimon.PaimonExternalTable;
 import org.apache.doris.datasource.paimon.source.PaimonScanNode;
 import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable;
 import 
org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
-import org.apache.doris.fs.DirectoryLister;
-import org.apache.doris.fs.FileSystemDirectoryLister;
-import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory;
+import org.apache.doris.fsv2.DirectoryLister;
+import org.apache.doris.fsv2.FileSystemDirectoryLister;
+import org.apache.doris.fsv2.TransactionScopeCachingDirectoryListerFactory;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.properties.DistributionSpec;
 import org.apache.doris.nereids.properties.DistributionSpecAllSingleton;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
index 65f0c2bd5e3..5839643d770 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
@@ -19,7 +19,7 @@ package org.apache.doris.transaction;
 
 import org.apache.doris.datasource.hive.HMSTransaction;
 import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.fs.FileSystemProvider;
+import org.apache.doris.fsv2.FileSystemProvider;
 
 import java.util.concurrent.Executor;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
index b8898d9b279..fe6699626a7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
@@ -19,7 +19,7 @@ package org.apache.doris.transaction;
 
 import org.apache.doris.datasource.hive.HiveMetadataOps;
 import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
-import org.apache.doris.fs.FileSystemProvider;
+import org.apache.doris.fsv2.FileSystemProvider;
 
 import java.util.concurrent.Executor;
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
index e49302cef7f..034b5479758 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
@@ -20,7 +20,7 @@ package org.apache.doris.common.util;
 import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.common.util.LocationPath.Scheme;
 import org.apache.doris.datasource.property.constants.OssProperties;
-import org.apache.doris.fs.FileSystemType;
+import org.apache.doris.fsv2.FileSystemType;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -41,7 +41,7 @@ public class LocationPathTest {
         String beLocation = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(beLocation.startsWith("hdfs://"));
         Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, 
Collections.emptyMap(), null).first,
-                FileSystemType.DFS);
+                FileSystemType.HDFS);
 
         // HA props
         Map<String, String> props = new HashMap<>();
@@ -130,13 +130,13 @@ public class LocationPathTest {
         locationPath = new 
LocationPath("oss://test.oss-dls.aliyuncs.com/path", rangeProps);
         Assertions.assertEquals("oss://test.oss-dls.aliyuncs.com/path", 
locationPath.get());
         Assertions.assertEquals(LocationPath.getFSIdentity(locationPath.get(), 
rangeProps, null).first,
-                FileSystemType.DFS);
+                FileSystemType.HDFS);
         // FE
         
Assertions.assertTrue(locationPath.get().startsWith("oss://test.oss-dls.aliyuncs"));
         // BE
         beLocation = locationPath.toStorageLocation().toString();
         
Assertions.assertTrue(beLocation.startsWith("oss://test.oss-dls.aliyuncs"));
-        Assertions.assertEquals(locationPath.getFileSystemType(), 
FileSystemType.DFS);
+        Assertions.assertEquals(locationPath.getFileSystemType(), 
FileSystemType.HDFS);
     }
 
     @Test
@@ -177,7 +177,7 @@ public class LocationPathTest {
         beLocation = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(beLocation.startsWith("gfs://"));
         Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, 
Collections.emptyMap(), null).first,
-                FileSystemType.DFS);
+                FileSystemType.HDFS);
     }
 
     @Test
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java
index a54084e9b45..dd97addf2b9 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java
@@ -19,7 +19,7 @@ package org.apache.doris.datasource.hive;
 
 import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
-import org.apache.doris.fs.LocalDfsFileSystem;
+import org.apache.doris.fsv2.LocalDfsFileSystem;
 
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index 61b373706d9..217057c9123 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -24,10 +24,10 @@ import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.datasource.TestHMSCachedClient;
-import org.apache.doris.fs.FileSystem;
-import org.apache.doris.fs.FileSystemProvider;
-import org.apache.doris.fs.LocalDfsFileSystem;
-import org.apache.doris.fs.remote.SwitchingFileSystem;
+import org.apache.doris.fsv2.FileSystem;
+import org.apache.doris.fsv2.FileSystemProvider;
+import org.apache.doris.fsv2.LocalDfsFileSystem;
+import org.apache.doris.fsv2.remote.SwitchingFileSystem;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.THiveLocationParams;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/fs/remote/RemoteFileSystemTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/fs/remote/RemoteFileSystemTest.java
deleted file mode 100644
index 3fc15ab8e37..00000000000
--- 
a/fe/fe-core/src/test/java/org/apache/doris/fs/remote/RemoteFileSystemTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.fs.remote;
-
-import org.apache.doris.common.Pair;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
-import 
org.apache.doris.common.security.authentication.HadoopKerberosAuthenticator;
-import 
org.apache.doris.common.security.authentication.HadoopSimpleAuthenticator;
-import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.fs.FileSystemCache;
-import org.apache.doris.fs.FileSystemType;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import mockit.Mock;
-import mockit.MockUp;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Map;
-
-public class RemoteFileSystemTest {
-
-    @Test
-    public void testFilesystemAndAuthType() throws UserException {
-
-        // These paths should use s3 filesystem, and use simple auth
-        ArrayList<String> s3Paths = new ArrayList<>();
-        s3Paths.add("s3://a/b/c");
-        s3Paths.add("s3a://a/b/c");
-        s3Paths.add("s3n://a/b/c");
-        s3Paths.add("oss://a/b/c");  // default use s3 filesystem
-        s3Paths.add("gs://a/b/c");
-        s3Paths.add("bos://a/b/c");
-        s3Paths.add("cos://a/b/c");
-        s3Paths.add("cosn://a/b/c");
-        s3Paths.add("lakefs://a/b/c");
-        s3Paths.add("obs://a/b/c");
-
-        // These paths should use dfs filesystem, and auth will be changed by configure
-        ArrayList<String> dfsPaths = new ArrayList<>();
-        dfsPaths.add("ofs://a/b/c");
-        dfsPaths.add("gfs://a/b/c");
-        dfsPaths.add("hdfs://a/b/c");
-        dfsPaths.add("oss://a/b/c");  // if endpoint contains 'oss-dls.aliyuncs', will use dfs filesystem
-
-        new MockUp<UserGroupInformation>(UserGroupInformation.class) {
-            @Mock
-            public <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException, InterruptedException {
-                return (T) new LocalFileSystem();
-            }
-        };
-
-        new MockUp<HadoopKerberosAuthenticator>(HadoopKerberosAuthenticator.class) {
-            @Mock
-            public synchronized UserGroupInformation getUGI() throws IOException {
-                return UserGroupInformation.getCurrentUser();
-            }
-        };
-
-        Configuration confWithoutKerberos = new Configuration();
-
-        Configuration confWithKerberosIncomplete = new Configuration();
-        confWithKerberosIncomplete.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-
-        Configuration confWithKerberos = new Configuration();
-        confWithKerberos.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-        confWithKerberos.set(AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, "principal");
-        confWithKerberos.set(AuthenticationConfig.HADOOP_KERBEROS_KEYTAB, "keytab");
-
-        ImmutableMap<String, String> s3props = ImmutableMap.of("s3.endpoint", "http://127.0.0.1");
-        s3props.forEach(confWithKerberos::set);
-        s3props.forEach(confWithoutKerberos::set);
-        s3props.forEach(confWithKerberosIncomplete::set);
-
-        for (String path : s3Paths) {
-            checkS3Filesystem(path, confWithKerberos, s3props);
-        }
-        for (String path : s3Paths) {
-            checkS3Filesystem(path, confWithKerberosIncomplete, s3props);
-        }
-        for (String path : s3Paths) {
-            checkS3Filesystem(path, confWithoutKerberos, s3props);
-        }
-
-        s3props = ImmutableMap.of("s3.endpoint", "oss://xx-oss-dls.aliyuncs/abc");
-        System.setProperty("java.security.krb5.realm", "realm");
-        System.setProperty("java.security.krb5.kdc", "kdc");
-
-        for (String path : dfsPaths) {
-            checkDFSFilesystem(path, confWithKerberos, HadoopKerberosAuthenticator.class.getName(), s3props);
-        }
-        for (String path : dfsPaths) {
-            checkDFSFilesystem(path, confWithKerberosIncomplete, HadoopSimpleAuthenticator.class.getName(), s3props);
-        }
-        for (String path : dfsPaths) {
-            checkDFSFilesystem(path, confWithoutKerberos, HadoopSimpleAuthenticator.class.getName(), s3props);
-        }
-
-    }
-
-    private void checkS3Filesystem(String path, Configuration conf, Map<String, String> m) throws UserException {
-        RemoteFileSystem fs = createFs(path, conf, m);
-        Assert.assertTrue(fs instanceof S3FileSystem);
-        HadoopAuthenticator authenticator = ((S3FileSystem) fs).getAuthenticator();
-        Assert.assertTrue(authenticator instanceof HadoopSimpleAuthenticator);
-    }
-
-    private void checkDFSFilesystem(String path, Configuration conf, String authClass, Map<String, String> m) throws UserException {
-        RemoteFileSystem fs = createFs(path, conf, m);
-        Assert.assertTrue(fs instanceof DFSFileSystem);
-        HadoopAuthenticator authenticator = ((DFSFileSystem) fs).getAuthenticator();
-        Assert.assertEquals(authClass, authenticator.getClass().getName());
-    }
-
-    private RemoteFileSystem createFs(String path, Configuration conf, Map<String, String> m) throws UserException {
-        LocationPath locationPath = new LocationPath(path, m);
-        FileSystemType fileSystemType = locationPath.getFileSystemType();
-        URI uri = locationPath.getPath().toUri();
-        String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
-        FileSystemCache fileSystemCache = new FileSystemCache();
-        RemoteFileSystem fs = fileSystemCache.getRemoteFileSystem(
-            new FileSystemCache.FileSystemCacheKey(
-                Pair.of(fileSystemType, fsIdent),
-                ImmutableMap.of(),
-                null,
-                conf));
-        fs.nativeFileSystem(path);
-        return fs;
-    }
-
-}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/fsv2/obj/S3ObjStorageTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/fsv2/obj/S3ObjStorageTest.java
new file mode 100644
index 00000000000..f655a9d5654
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/fsv2/obj/S3ObjStorageTest.java
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fsv2.obj;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import 
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import 
software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+public class S3ObjStorageTest {
+    private S3ObjStorage storage;
+    private S3Client mockClient;
+    private AbstractS3CompatibleProperties mockProperties;
+
+    @BeforeEach
+    void setUp() throws UserException {
+        mockProperties = Mockito.mock(AbstractS3CompatibleProperties.class);
+        Mockito.when(mockProperties.getEndpoint()).thenReturn("http://s3.example.com");
+        Mockito.when(mockProperties.getRegion()).thenReturn("us-east-1");
+        Mockito.when(mockProperties.getUsePathStyle()).thenReturn("false");
+        Mockito.when(mockProperties.getForceParsingByStandardUrl()).thenReturn("false");
+        // Spy on a real S3ObjStorage so that getClient() can be stubbed to hand back the mocked S3Client.
+        mockClient = Mockito.mock(S3Client.class);
+        storage = Mockito.spy(new S3ObjStorage(mockProperties));
+        Mockito.doReturn(mockClient).when(storage).getClient();
+    }
+
+    @Test
+    @DisplayName("getClient should return a valid S3Client instance")
+    void getClientReturnsValidS3Client() throws UserException {
+        S3Client client = storage.getClient();
+        Assertions.assertNotNull(client);
+    }
+
+    @Test
+    @DisplayName("headObject should return OK status when object exists")
+    void headObjectReturnsOkWhenObjectExists() throws UserException {
+        Mockito.when(mockClient.headObject(Mockito.any(HeadObjectRequest.class)))
+                .thenReturn(HeadObjectResponse.builder().build());
+
+        Status status = storage.headObject("s3://bucket/key");
+        Assertions.assertEquals(Status.OK, status);
+    }
+
+    @Test
+    @DisplayName("headObject should return NOT_FOUND status when object does not exist")
+    void headObjectReturnsNotFoundWhenObjectDoesNotExist() throws UserException {
+        Mockito.when(mockClient.headObject(Mockito.any(HeadObjectRequest.class)))
+                .thenThrow(S3Exception.builder().statusCode(404).build());
+
+        Status status = storage.headObject("s3://bucket/nonexistent-key");
+        Assertions.assertEquals(Status.ErrCode.NOT_FOUND, status.getErrCode());
+    }
+
+    @Test
+    @DisplayName("headObject should return COMMON_ERROR status for other exceptions")
+    void headObjectReturnsErrorForOtherExceptions() throws UserException {
+        Mockito.when(mockClient.headObject(Mockito.any(HeadObjectRequest.class)))
+                .thenThrow(S3Exception.builder().statusCode(500).build());
+
+        Status status = storage.headObject("s3://bucket/key");
+        Assertions.assertEquals(Status.ErrCode.COMMON_ERROR, status.getErrCode());
+    }
+
+    @Test
+    @DisplayName("putObject should return OK status when upload succeeds")
+    void putObjectReturnsOkWhenUploadSucceeds() throws UserException {
+        Mockito.when(mockClient.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class)))
+                .thenReturn(PutObjectResponse.builder().build());
+
+        InputStream content = new ByteArrayInputStream("test content".getBytes());
+        Status status = storage.putObject("s3://bucket/key", content, 12);
+        Assertions.assertEquals(Status.OK, status);
+    }
+
+    @Test
+    @DisplayName("putObject should return COMMON_ERROR status when upload fails")
+    void putObjectReturnsErrorWhenUploadFails() throws UserException {
+        Mockito.when(mockClient.putObject(Mockito.any(PutObjectRequest.class), Mockito.any(RequestBody.class)))
+                .thenThrow(S3Exception.builder().statusCode(500).build());
+
+        InputStream content = new ByteArrayInputStream("test content".getBytes());
+        Status status = storage.putObject("s3://bucket/key", content, 12);
+        Assertions.assertEquals(Status.ErrCode.COMMON_ERROR, status.getErrCode());
+    }
+
+    @Test
+    @DisplayName("deleteObject should return OK status when object is deleted successfully")
+    void deleteObjectReturnsOkWhenDeletionSucceeds() throws UserException {
+        Mockito.when(mockClient.deleteObject(Mockito.any(DeleteObjectRequest.class)))
+                .thenReturn(DeleteObjectResponse.builder().build());
+
+        Status status = storage.deleteObject("s3://bucket/key");
+        Assertions.assertEquals(Status.OK, status);
+    }
+
+    @Test
+    @DisplayName("deleteObject should return OK status when object does not exist")
+    void deleteObjectReturnsOkWhenObjectDoesNotExist() throws UserException {
+        Mockito.when(mockClient.deleteObject(Mockito.any(DeleteObjectRequest.class)))
+                .thenThrow(S3Exception.builder().statusCode(404).build());
+
+        Status status = storage.deleteObject("s3://bucket/nonexistent-key");
+        Assertions.assertEquals(Status.OK, status);
+    }
+
+    @Test
+    @DisplayName("deleteObject should return COMMON_ERROR status for other exceptions")
+    void deleteObjectReturnsErrorForOtherExceptions() throws UserException {
+        Mockito.when(mockClient.deleteObject(Mockito.any(DeleteObjectRequest.class)))
+                .thenThrow(S3Exception.builder().statusCode(500).build());
+
+        Status status = storage.deleteObject("s3://bucket/key");
+        Assertions.assertEquals(Status.ErrCode.COMMON_ERROR, status.getErrCode());
+    }
+
+    @Test
+    @DisplayName("listObjects should return a list of objects when objects exist")
+    void listObjectsReturnsObjectsWhenObjectsExist() throws UserException {
+        ListObjectsV2Response response = ListObjectsV2Response.builder()
+                .contents(S3Object.builder().key("prefix/key1").size(100L).build(),
+                        S3Object.builder().key("prefix/key2").size(200L).build())
+                .isTruncated(false)
+                .build();
+        Mockito.when(mockClient.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(response);
+
+        RemoteObjects objects = storage.listObjects("s3://bucket/prefix", null);
+        Assertions.assertEquals(2, objects.getObjectList().size());
+    }
+
+    @Test
+    @DisplayName("listObjects should throw DdlException for errors")
+    void listObjectsThrowsExceptionForErrors() throws UserException {
+        Mockito.when(mockClient.listObjectsV2(Mockito.any(ListObjectsV2Request.class)))
+                .thenThrow(S3Exception.builder().statusCode(500).build());
+
+        Assertions.assertThrows(DdlException.class, () -> storage.listObjects("s3://bucket/prefix", null));
+    }
+
+    @Test
+    @DisplayName("multipartUpload should return OK status when upload succeeds")
+    void multipartUploadReturnsOkWhenUploadSucceeds() throws Exception {
+        Mockito.when(mockClient.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)))
+                .thenReturn(CreateMultipartUploadResponse.builder().uploadId("uploadId").build());
+        Mockito.when(mockClient.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class)))
+                .thenReturn(UploadPartResponse.builder().eTag("etag").build());
+        Mockito.when(mockClient.completeMultipartUpload(Mockito.any(CompleteMultipartUploadRequest.class)))
+                .thenReturn(CompleteMultipartUploadResponse.builder().build());
+
+        InputStream content = new ByteArrayInputStream(new byte[10 * 1024 * 1024]); // 10 MB
+        Status status = storage.multipartUpload("s3://bucket/key", content, 10 * 1024 * 1024);
+        Assertions.assertEquals(Status.OK, status);
+    }
+
+    @Test
+    @DisplayName("multipartUpload should return COMMON_ERROR status when upload fails")
+    void multipartUploadReturnsErrorWhenUploadFails() throws Exception {
+        Mockito.when(mockClient.createMultipartUpload(Mockito.any(CreateMultipartUploadRequest.class)))
+                .thenReturn(CreateMultipartUploadResponse.builder().uploadId("uploadId").build());
+        Mockito.when(mockClient.uploadPart(Mockito.any(UploadPartRequest.class), Mockito.any(RequestBody.class)))
+                .thenThrow(S3Exception.builder().statusCode(500).build());
+
+        InputStream content = new ByteArrayInputStream(new byte[10 * 1024 * 1024]); // 10 MB
+        Status status = storage.multipartUpload("s3://bucket/key", content, 10 * 1024 * 1024);
+        Assertions.assertEquals(Status.ErrCode.COMMON_ERROR, status.getErrCode());
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/fsv2/remote/RemoteFileSystemTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/fsv2/remote/RemoteFileSystemTest.java
new file mode 100644
index 00000000000..df5225ff3c0
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/fsv2/remote/RemoteFileSystemTest.java
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fsv2.remote;
+
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.backup.Status;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class RemoteFileSystemTest { // Unit tests for RemoteFileSystem listFiles, listDirectories and renameDir, run against a Mockito spy.
+
+    private RemoteFileSystem remoteFileSystem; // spy over an anonymous RemoteFileSystem subclass; rebuilt in setUp()
+    private FileSystem mockFileSystem; // Hadoop FileSystem mock handed back by the stubbed nativeFileSystem(...)
+
+    @BeforeEach // every test starts with a fresh spy and a fresh FileSystem mock
+    void setUp() {
+        remoteFileSystem = Mockito.spy(new RemoteFileSystem("test", 
StorageBackend.StorageType.HDFS) { // each override below returns null; tests re-stub on the spy where needed
+            @Override
+            public Status exists(String remotePath) {
+                return null;
+            }
+
+            @Override
+            public Status downloadWithFileSize(String remoteFilePath, String 
localFilePath, long fileSize) {
+                return null;
+            }
+
+            @Override
+            public Status upload(String localPath, String remotePath) {
+                return null;
+            }
+
+            @Override
+            public Status directUpload(String content, String remoteFile) {
+                return null;
+            }
+
+            @Override
+            public Status rename(String origFilePath, String destFilePath) {
+                return null;
+            }
+
+            @Override
+            public Status delete(String remotePath) {
+                return null;
+            }
+
+            @Override
+            public Status makeDir(String remotePath) {
+                return null;
+            }
+
+            @Override
+            public Status globList(String remotePath, List<RemoteFile> result, 
boolean fileNameOnly) {
+                return null;
+            }
+        });
+        mockFileSystem = Mockito.mock(FileSystem.class);
+    }
+
+    @Test // Happy path: one file reported by the mocked FileSystem ends up in the result list.
+    @DisplayName("listFiles should return OK status and populate result list 
when files are found")
+    void listFilesReturnsOkWhenFilesFound() throws Exception {
+        String remotePath = "/test/path";
+        List<RemoteFile> result = new ArrayList<>();
+        RemoteIterator<LocatedFileStatus> mockIterator = 
Mockito.mock(RemoteIterator.class); // NOTE(review): raw-class mock assigned to a parameterized type (unchecked)
+        LocatedFileStatus mockFileStatus = 
Mockito.mock(LocatedFileStatus.class);
+
+        
Mockito.doReturn(mockFileSystem).when(remoteFileSystem).nativeFileSystem(remotePath); // route the spy to the mocked Hadoop FileSystem
+        Mockito.when(mockFileSystem.listFiles(new Path(remotePath), 
true)).thenReturn(mockIterator);
+        Mockito.when(mockIterator.hasNext()).thenReturn(true, false); // iterator yields exactly one entry, then ends
+        Mockito.when(mockIterator.next()).thenReturn(mockFileStatus);
+        Mockito.when(mockFileStatus.getPath()).thenReturn(new 
Path("/test/path/file1"));
+        Mockito.when(mockFileStatus.isDirectory()).thenReturn(false);
+        Mockito.when(mockFileStatus.getLen()).thenReturn(100L);
+        Mockito.when(mockFileStatus.getBlockSize()).thenReturn(128L);
+        
Mockito.when(mockFileStatus.getModificationTime()).thenReturn(123456789L);
+        Mockito.when(mockFileStatus.getBlockLocations()).thenReturn(null); // the mocked status reports null block locations
+
+        Status status = remoteFileSystem.listFiles(remotePath, true, result);
+
+        Assertions.assertEquals(Status.OK, status);
+        Assertions.assertEquals(1, result.size());
+        Assertions.assertEquals("/test/path/file1", 
result.get(0).getPath().toString());
+    }
+
+    @Test // Error path: FileNotFoundException from the underlying FileSystem maps to NOT_FOUND.
+    @DisplayName("listFiles should return NOT_FOUND status when 
FileNotFoundException is thrown")
+    void listFilesReturnsNotFoundWhenFileNotFoundExceptionThrown() throws 
Exception {
+
+        mockFileSystem = Mockito.mock(FileSystem.class); // NOTE(review): setUp() already assigns a fresh mock; this reassignment looks redundant
+        String remotePath = "/nonexistent/path";
+        List<RemoteFile> result = new ArrayList<>();
+        
Mockito.doReturn(mockFileSystem).when(remoteFileSystem).nativeFileSystem(remotePath);
+        Mockito.doThrow(new FileNotFoundException("File not found"))
+                .when(mockFileSystem).listFiles(Mockito.any(Path.class), 
Mockito.anyBoolean()); // any listFiles call on the mock throws
+
+        Status status = remoteFileSystem.listFiles(remotePath, true, result);
+
+        Assertions.assertEquals(Status.ErrCode.NOT_FOUND, status.getErrCode());
+        Assertions.assertTrue(result.isEmpty()); // nothing may be added to the result on failure
+    }
+
+    @Test // Only directory entries from listStatus are expected in the result set.
+    @DisplayName("listDirectories should return OK status and populate result 
set with directories")
+    void listDirectoriesReturnsOkWhenDirectoriesFound() throws Exception {
+        String remotePath = "/test/path";
+        Set<String> result = new HashSet<>();
+        FileStatus[] mockFileStatuses = {
+                Mockito.mock(FileStatus.class),
+                Mockito.mock(FileStatus.class)
+        };
+
+        
Mockito.doReturn(mockFileSystem).when(remoteFileSystem).nativeFileSystem(remotePath);
+        Mockito.when(mockFileSystem.listStatus(new 
Path(remotePath))).thenReturn(mockFileStatuses);
+        Mockito.when(mockFileStatuses[0].isDirectory()).thenReturn(true);
+        Mockito.when(mockFileStatuses[0].getPath()).thenReturn(new 
Path("/test/path/dir1"));
+        Mockito.when(mockFileStatuses[1].isDirectory()).thenReturn(false); // second entry is not a directory, so one result is expected
+
+        Status status = remoteFileSystem.listDirectories(remotePath, result);
+
+        Assertions.assertEquals(Status.OK, status);
+        Assertions.assertEquals(1, result.size());
+        Assertions.assertTrue(result.contains("/test/path/dir1/")); // the expected entry carries a trailing slash
+    }
+
+    @Test // renameDir is expected to fail when exists(dest) reports OK.
+    @DisplayName("renameDir should return COMMON_ERROR when destination 
directory exists")
+    void renameDirReturnsErrorWhenDestinationExists() throws Exception {
+        String origFilePath = "/test/path/orig";
+        String destFilePath = "/test/path/dest";
+
+        Mockito.doReturn(new Status(Status.ErrCode.OK, ""))
+                .when(remoteFileSystem).exists(destFilePath); // replaces the null-returning exists() override for this path
+
+        Status status = remoteFileSystem.renameDir(origFilePath, destFilePath, 
() -> {
+        }); // third argument is a lambda with an empty body
+
+        Assertions.assertEquals(Status.ErrCode.COMMON_ERROR, 
status.getErrCode());
+        Assertions.assertEquals("Destination directory already exists: 
/test/path/dest", status.getErrMsg());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to