This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ed89565fcbe [fix](split) FileSystemCacheKey are always different in overload equals (#36432)
ed89565fcbe is described below

commit ed89565fcbe3fa712a35a0fc30114d3ff65fe076
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Fri Jun 21 08:56:50 2024 +0800

    [fix](split) FileSystemCacheKey are always different in overload equals (#36432)
    
    ## Fixed bugs introduced by #33937
    1. `FileSystemCacheKey.equals()` compares `properties` with `==`, so every
    lookup misses the cache and a new file system is created for each
    partition (a short sketch follows this list).
    2. `dfsFileSystem` is not synchronized, so concurrent callers can create
    more file systems than needed.
    3. `jobConf.iterator()` produces more than 2000 key-value pairs, which
    makes copying them into every cache key needlessly expensive.
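    
    A minimal sketch of bug 1 (an illustration, not code from this patch):
    `==` compares map identity while `equals()` compares content, so a
    properties map rebuilt for every lookup can never match a cached key
    under `==`. The property entry below is hypothetical:
    
        import java.util.HashMap;
        import java.util.Map;
        
        public class MapIdentityDemo {
            public static void main(String[] args) {
                Map<String, String> a = new HashMap<>();
                a.put("fs.defaultFS", "hdfs://ns1"); // hypothetical entry
                Map<String, String> b = new HashMap<>(a); // rebuilt per lookup
                System.out.println(a == b);      // false: distinct instances
                System.out.println(a.equals(b)); // true: equal content
            }
        }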
---
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 20 +++++------
 .../java/org/apache/doris/fs/FileSystemCache.java  | 40 +++++++++++++++++-----
 .../org/apache/doris/fs/remote/S3FileSystem.java   | 24 +++++++------
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  | 35 +++++++++----------
 4 files changed, 70 insertions(+), 49 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index b76b4675dee..f402d27cf6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -349,11 +349,11 @@ public class HiveMetaStoreCache {
             List<String> partitionValues,
             String bindBrokerName) throws UserException {
         FileCacheValue result = new FileCacheValue();
-        Map<String, String> properties = new HashMap<>();
-        jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
         RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                 new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
-                        location, bindBrokerName), properties, bindBrokerName));
+                        location, bindBrokerName),
+                        catalog.getCatalogProperty().getProperties(),
+                        bindBrokerName, jobConf));
         result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location));
         // For Tez engine, it may generate subdirectoies for "union" query.
         // So there may be files and directories in the table directory at the same time. eg:
@@ -781,12 +781,12 @@ public class HiveMetaStoreCache {
                         return Collections.emptyList();
                     }
                     String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
-                    Map<String, String> properties = new HashMap<>();
-                    jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
                     RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                             new FileSystemCache.FileSystemCacheKey(
                                     LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
-                                            bindBrokerName), properties, bindBrokerName));
+                                            bindBrokerName),
+                                            catalog.getCatalogProperty().getProperties(),
+                                            bindBrokerName, jobConf));
                     Status status = fs.exists(acidVersionPath);
                     if (status != Status.OK) {
                         if (status.getErrCode() == ErrCode.NOT_FOUND) {
@@ -806,12 +806,10 @@ public class HiveMetaStoreCache {
                 List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
                 for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
                     String location = delta.getPath().toString();
-                    Map<String, String> properties = new HashMap<>();
-                    jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
                     RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                             new FileSystemCache.FileSystemCacheKey(
                                     LocationPath.getFSIdentity(location, bindBrokerName),
-                                    properties, bindBrokerName));
+                                            catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
                     List<RemoteFile> remoteFiles = new ArrayList<>();
                     Status status = fs.listFiles(location, false, remoteFiles);
                     if (status.ok()) {
@@ -833,12 +831,10 @@ public class HiveMetaStoreCache {
                 // base
                 if (directory.getBaseDirectory() != null) {
                     String location = directory.getBaseDirectory().toString();
-                    Map<String, String> properties = new HashMap<>();
-                    jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
                     RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                             new FileSystemCache.FileSystemCacheKey(
                                     LocationPath.getFSIdentity(location, bindBrokerName),
-                                    properties, bindBrokerName));
+                                            catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
                     List<RemoteFile> remoteFiles = new ArrayList<>();
                     Status status = fs.listFiles(location, false, remoteFiles);
                     if (status.ok()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
index dd66c359b9d..e96258dc719 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
@@ -23,14 +23,16 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.hadoop.conf.Configuration;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalLong;
 
 public class FileSystemCache {
 
-    private LoadingCache<FileSystemCacheKey, RemoteFileSystem> fileSystemCache;
+    private final LoadingCache<FileSystemCacheKey, RemoteFileSystem> fileSystemCache;
 
     public FileSystemCache() {
        // no need to set refreshAfterWrite, because the FileSystem is created once and never changed
@@ -40,11 +42,11 @@ public class FileSystemCache {
                 Config.max_remote_file_system_cache_num,
                 false,
                 null);
-        fileSystemCache = fsCacheFactory.buildCache(key -> loadFileSystem(key));
+        fileSystemCache = fsCacheFactory.buildCache(this::loadFileSystem);
     }
 
     private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
-        return FileSystemFactory.getRemoteFileSystem(key.type, key.properties, key.bindBrokerName);
+        return FileSystemFactory.getRemoteFileSystem(key.type, key.getFsProperties(), key.bindBrokerName);
     }
 
     public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
@@ -57,13 +59,32 @@ public class FileSystemCache {
         private final String fsIdent;
         private final Map<String, String> properties;
         private final String bindBrokerName;
+        // only for creating new file system
+        private final Configuration conf;
 
         public FileSystemCacheKey(Pair<FileSystemType, String> fs,
-                Map<String, String> properties, String bindBrokerName) {
+                Map<String, String> properties,
+                String bindBrokerName,
+                Configuration conf) {
             this.type = fs.first;
             this.fsIdent = fs.second;
             this.properties = properties;
             this.bindBrokerName = bindBrokerName;
+            this.conf = conf;
+        }
+
+        public FileSystemCacheKey(Pair<FileSystemType, String> fs,
+                Map<String, String> properties, String bindBrokerName) {
+            this(fs, properties, bindBrokerName, null);
+        }
+
+        public Map<String, String> getFsProperties() {
+            if (conf == null) {
+                return properties;
+            }
+            Map<String, String> result = new HashMap<>();
+            conf.iterator().forEachRemaining(e -> result.put(e.getKey(), e.getValue()));
+            return result;
         }
 
         @Override
@@ -74,13 +95,14 @@ public class FileSystemCache {
             if (!(obj instanceof FileSystemCacheKey)) {
                 return false;
             }
-            boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey) obj).type)
-                    && fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
-                    && properties == ((FileSystemCacheKey) obj).properties;
+            FileSystemCacheKey o = (FileSystemCacheKey) obj;
+            boolean equalsWithoutBroker = type.equals(o.type)
+                    && fsIdent.equals(o.fsIdent)
+                    && properties.equals(o.properties);
             if (bindBrokerName == null) {
-                return equalsWithoutBroker;
+                return equalsWithoutBroker && o.bindBrokerName == null;
             }
-            return equalsWithoutBroker && bindBrokerName.equals(((FileSystemCacheKey) obj).bindBrokerName);
+            return equalsWithoutBroker && bindBrokerName.equals(o.bindBrokerName);
         }
 
         @Override
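
A side note on the corrected equals() above: since java.util.Objects is
already imported in this file, the explicit null handling of bindBrokerName
is equivalent to a single Objects.equals call. A sketch of that equivalent
form (not what the patch commits, just a compact alternative):

    FileSystemCacheKey o = (FileSystemCacheKey) obj;
    return type.equals(o.type)
            && fsIdent.equals(o.fsIdent)
            && properties.equals(o.properties)
            && Objects.equals(bindBrokerName, o.bindBrokerName);
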
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 2b94d2195da..3130a0cea52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -58,16 +58,20 @@ public class S3FileSystem extends ObjFileSystem {
     @Override
    protected FileSystem nativeFileSystem(String remotePath) throws UserException {
        if (dfsFileSystem == null) {
-            Configuration conf = new Configuration();
-            System.setProperty("com.amazonaws.services.s3.enableV4", "true");
-            // the entry value in properties may be null, and
-            PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
-                    .filter(entry -> entry.getKey() != null && entry.getValue() != null)
-                    .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
-            try {
-                dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
-            } catch (Exception e) {
-                throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
+            synchronized (this) {
+                if (dfsFileSystem == null) {
+                    Configuration conf = new Configuration();
+                    System.setProperty("com.amazonaws.services.s3.enableV4", "true");
+                    // the entry value in properties may be null, and
+                    PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
+                            .filter(entry -> entry.getKey() != null && entry.getValue() != null)
+                            .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
+                    try {
+                        dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
+                    } catch (Exception e) {
+                        throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
+                    }
+                }
             }
         }
         return dfsFileSystem;
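
The synchronized block introduced above (and in DFSFileSystem below) is the
double-checked locking idiom. A self-contained sketch of the idiom for
reference, with the usual caveat that it is only fully safe when the checked
field is volatile (the dfsFileSystem declaration itself is outside this diff):

    import java.util.function.Supplier;

    class LazyRef<T> {
        private volatile T value;          // volatile makes the unlocked read safe
        private final Supplier<T> factory;

        LazyRef(Supplier<T> factory) {
            this.factory = factory;
        }

        T get() {
            T local = value;               // fast path: one volatile read
            if (local == null) {
                synchronized (this) {
                    local = value;         // re-check under the lock
                    if (local == null) {
                        value = local = factory.get(); // create at most once
                    }
                }
            }
            return local;
        }
    }
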
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 7f31f8eed49..d608653024f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -30,7 +30,6 @@ import org.apache.doris.fs.remote.RemoteFile;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -73,25 +72,25 @@ public class DFSFileSystem extends RemoteFileSystem {
     @VisibleForTesting
     @Override
    public FileSystem nativeFileSystem(String remotePath) throws UserException {
-        if (dfsFileSystem != null) {
-            return dfsFileSystem;
-        }
-
-        Configuration conf = new HdfsConfiguration();
-        for (Map.Entry<String, String> propEntry : properties.entrySet()) {
-            conf.set(propEntry.getKey(), propEntry.getValue());
-        }
+        if (dfsFileSystem == null) {
+            synchronized (this) {
+                if (dfsFileSystem == null) {
+                    Configuration conf = new HdfsConfiguration();
+                    for (Map.Entry<String, String> propEntry : properties.entrySet()) {
+                        conf.set(propEntry.getKey(), propEntry.getValue());
+                    }
 
-        dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
-            try {
-                return FileSystem.get(new Path(remotePath).toUri(), conf);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
+                    dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
+                        try {
+                            return FileSystem.get(new Path(remotePath).toUri(), conf);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+                    operations = new HDFSFileOperations(dfsFileSystem);
+                }
             }
-        });
-
-        Preconditions.checkNotNull(dfsFileSystem);
-        operations = new HDFSFileOperations(dfsFileSystem);
+        }
         return dfsFileSystem;
     }
 

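For bug 3, the cost the patch avoids is easy to demonstrate: iterating a
Hadoop Configuration walks every loaded property, including all *-default.xml
entries, so materializing it into a Map for each cache key is expensive. A
small sketch (assumes hadoop-common on the classpath; counts vary by
environment, and the commit reports 2000+ entries for its jobConf):

    import org.apache.hadoop.conf.Configuration;

    import java.util.Map;

    public class ConfEntryCount {
        public static void main(String[] args) {
            Configuration conf = new Configuration(); // loads core-default.xml, etc.
            int n = 0;
            for (Map.Entry<String, String> e : conf) {
                n++;
            }
            System.out.println("configuration entries: " + n);
        }
    }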

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
