This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new ed89565fcbe [fix](split) FileSystemCacheKey are always different in overload equals (#36432)
ed89565fcbe is described below

commit ed89565fcbe3fa712a35a0fc30114d3ff65fe076
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Fri Jun 21 08:56:50 2024 +0800

    [fix](split) FileSystemCacheKey are always different in overload equals (#36432)

    ## Proposed changes

    ## Fixed Bugs introduced from #33937
    1. `FileSystemCacheKey.equals()` compares properties by `==`, so a new file system is created for each partition
    2. `dfsFileSystem` is not synchronized, so more file systems are created than needed
    3. `jobConf.iterator()` produces more than 2,000 key-value pairs, all of which were copied into every cache key
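For illustration only (not part of the commit): bug 1 comes down to reference
equality on maps. Every split fetch built a fresh `HashMap` from `jobConf`, so
two keys with identical contents were never `==` and every lookup missed the
cache. A minimal, self-contained sketch (the property value is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class ReferenceEqualityDemo {
    public static void main(String[] args) {
        // Two maps with identical contents, built independently,
        // the way each partition scan used to build its own copy.
        Map<String, String> first = new HashMap<>();
        first.put("fs.defaultFS", "hdfs://nameservice1");
        Map<String, String> second = new HashMap<>();
        second.put("fs.defaultFS", "hdfs://nameservice1");

        System.out.println(first == second);      // false: distinct objects -> cache miss
        System.out.println(first.equals(second)); // true: equal contents    -> cache hit
    }
}
```

The patch below switches the comparison to `properties.equals(...)` and also
tightens the broker check so that a key with a null `bindBrokerName` only
matches another key whose broker name is null as well.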
---
 .../doris/datasource/hive/HiveMetaStoreCache.java | 20 +++++------
 .../java/org/apache/doris/fs/FileSystemCache.java | 40 +++++++++++++++++-----
 .../org/apache/doris/fs/remote/S3FileSystem.java  | 24 +++++++------
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java | 35 +++++++++----------
 4 files changed, 70 insertions(+), 49 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index b76b4675dee..f402d27cf6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -349,11 +349,11 @@ public class HiveMetaStoreCache {
             List<String> partitionValues, String bindBrokerName) throws UserException {
         FileCacheValue result = new FileCacheValue();
-        Map<String, String> properties = new HashMap<>();
-        jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
         RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                 new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
-                        location, bindBrokerName), properties, bindBrokerName));
+                        location, bindBrokerName),
+                        catalog.getCatalogProperty().getProperties(),
+                        bindBrokerName, jobConf));
         result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location));
         // For Tez engine, it may generate subdirectoies for "union" query.
         // So there may be files and directories in the table directory at the same time. eg:
@@ -781,12 +781,12 @@ public class HiveMetaStoreCache {
             return Collections.emptyList();
         }
         String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
-        Map<String, String> properties = new HashMap<>();
-        jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
         RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                 new FileSystemCache.FileSystemCacheKey(
                         LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
-                                bindBrokerName), properties, bindBrokerName));
+                                bindBrokerName),
+                        catalog.getCatalogProperty().getProperties(),
+                        bindBrokerName, jobConf));
         Status status = fs.exists(acidVersionPath);
         if (status != Status.OK) {
             if (status.getErrCode() == ErrCode.NOT_FOUND) {
@@ -806,12 +806,10 @@ public class HiveMetaStoreCache {
         List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
         for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
             String location = delta.getPath().toString();
-            Map<String, String> properties = new HashMap<>();
-            jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
             RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                     new FileSystemCache.FileSystemCacheKey(
                             LocationPath.getFSIdentity(location, bindBrokerName),
-                            properties, bindBrokerName));
+                            catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
             List<RemoteFile> remoteFiles = new ArrayList<>();
             Status status = fs.listFiles(location, false, remoteFiles);
             if (status.ok()) {
@@ -833,12 +831,10 @@ public class HiveMetaStoreCache {
         // base
         if (directory.getBaseDirectory() != null) {
             String location = directory.getBaseDirectory().toString();
-            Map<String, String> properties = new HashMap<>();
-            jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
             RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                     new FileSystemCache.FileSystemCacheKey(
                             LocationPath.getFSIdentity(location, bindBrokerName),
-                            properties, bindBrokerName));
+                            catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
             List<RemoteFile> remoteFiles = new ArrayList<>();
             Status status = fs.listFiles(location, false, remoteFiles);
             if (status.ok()) {
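The hunks above also stop copying `jobConf` into the cache key (bug 3): a Hadoop
`Configuration` expands to well over 2,000 entries once its defaults are loaded,
while `catalog.getCatalogProperty().getProperties()` is a small map that is cheap
to hash and compare. The `Configuration` now rides along for construction only,
as the next file shows. A simplified sketch of that split (class and field names
shortened from the real `FileSystemCacheKey`):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

class CacheKeySketch {
    private final Map<String, String> properties; // small; used by equals()/hashCode()
    private final Configuration conf;             // large; used only to build the FS

    CacheKeySketch(Map<String, String> properties, Configuration conf) {
        this.properties = properties;
        this.conf = conf;
    }

    // Runs once per cache miss instead of once per lookup.
    Map<String, String> getFsProperties() {
        if (conf == null) {
            return properties;
        }
        Map<String, String> result = new HashMap<>();
        conf.iterator().forEachRemaining(e -> result.put(e.getKey(), e.getValue()));
        return result;
    }
}
```

Equality stays cheap on every lookup, and the expensive `conf.iterator()` walk
happens only when the cache actually has to create a new file system.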
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
index dd66c359b9d..e96258dc719 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
@@ -23,14 +23,16 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.hadoop.conf.Configuration;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalLong;
 
 public class FileSystemCache {
 
-    private LoadingCache<FileSystemCacheKey, RemoteFileSystem> fileSystemCache;
+    private final LoadingCache<FileSystemCacheKey, RemoteFileSystem> fileSystemCache;
 
     public FileSystemCache() {
         // no need to set refreshAfterWrite, because the FileSystem is created once and never changed
@@ -40,11 +42,11 @@ public class FileSystemCache {
                 Config.max_remote_file_system_cache_num,
                 false,
                 null);
-        fileSystemCache = fsCacheFactory.buildCache(key -> loadFileSystem(key));
+        fileSystemCache = fsCacheFactory.buildCache(this::loadFileSystem);
     }
 
     private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
-        return FileSystemFactory.getRemoteFileSystem(key.type, key.properties, key.bindBrokerName);
+        return FileSystemFactory.getRemoteFileSystem(key.type, key.getFsProperties(), key.bindBrokerName);
     }
 
     public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
@@ -57,13 +59,32 @@ public class FileSystemCache {
         private final String fsIdent;
         private final Map<String, String> properties;
         private final String bindBrokerName;
+        // only for creating new file system
+        private final Configuration conf;
 
         public FileSystemCacheKey(Pair<FileSystemType, String> fs,
-                Map<String, String> properties, String bindBrokerName) {
+                Map<String, String> properties,
+                String bindBrokerName,
+                Configuration conf) {
             this.type = fs.first;
             this.fsIdent = fs.second;
             this.properties = properties;
             this.bindBrokerName = bindBrokerName;
+            this.conf = conf;
+        }
+
+        public FileSystemCacheKey(Pair<FileSystemType, String> fs,
+                Map<String, String> properties, String bindBrokerName) {
+            this(fs, properties, bindBrokerName, null);
+        }
+
+        public Map<String, String> getFsProperties() {
+            if (conf == null) {
+                return properties;
+            }
+            Map<String, String> result = new HashMap<>();
+            conf.iterator().forEachRemaining(e -> result.put(e.getKey(), e.getValue()));
+            return result;
         }
 
         @Override
@@ -74,13 +95,14 @@ public class FileSystemCache {
             if (!(obj instanceof FileSystemCacheKey)) {
                 return false;
             }
-            boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey) obj).type)
-                    && fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
-                    && properties == ((FileSystemCacheKey) obj).properties;
+            FileSystemCacheKey o = (FileSystemCacheKey) obj;
+            boolean equalsWithoutBroker = type.equals(o.type)
+                    && fsIdent.equals(o.fsIdent)
+                    && properties.equals(o.properties);
             if (bindBrokerName == null) {
-                return equalsWithoutBroker;
+                return equalsWithoutBroker && o.bindBrokerName == null;
             }
-            return equalsWithoutBroker && bindBrokerName.equals(((FileSystemCacheKey) obj).bindBrokerName);
+            return equalsWithoutBroker && bindBrokerName.equals(o.bindBrokerName);
         }
 
         @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 2b94d2195da..3130a0cea52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -58,16 +58,20 @@ public class S3FileSystem extends ObjFileSystem {
     @Override
     protected FileSystem nativeFileSystem(String remotePath) throws UserException {
         if (dfsFileSystem == null) {
-            Configuration conf = new Configuration();
-            System.setProperty("com.amazonaws.services.s3.enableV4", "true");
-            // the entry value in properties may be null, and
-            PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
-                    .filter(entry -> entry.getKey() != null && entry.getValue() != null)
-                    .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
-            try {
-                dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
-            } catch (Exception e) {
-                throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
+            synchronized (this) {
+                if (dfsFileSystem == null) {
+                    Configuration conf = new Configuration();
+                    System.setProperty("com.amazonaws.services.s3.enableV4", "true");
+                    // the entry value in properties may be null, and
+                    PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
+                            .filter(entry -> entry.getKey() != null && entry.getValue() != null)
+                            .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
+                    try {
+                        dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
+                    } catch (Exception e) {
+                        throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
+                    }
+                }
             }
         }
         return dfsFileSystem;
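This S3 hunk and the DFSFileSystem hunk below both guard initialization with
double-checked locking (bug 2): the outer null check keeps the common path
lock-free, and the re-check inside `synchronized` guarantees only one thread
constructs the `FileSystem`. A generic sketch of the pattern, not taken from
the patch (note the textbook form also declares the field `volatile` for safe
publication; whether `dfsFileSystem` is declared that way is not visible in
this diff):

```java
import java.util.function.Supplier;

class LazyHolder<T> {
    // volatile is part of the canonical double-checked locking idiom,
    // ensuring the fully constructed object is visible to other threads.
    private volatile T value;
    private final Supplier<T> factory;

    LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        if (value == null) {             // fast path: no lock after first build
            synchronized (this) {
                if (value == null) {     // re-check: another thread may have won
                    value = factory.get();
                }
            }
        }
        return value;
    }
}
```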
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 7f31f8eed49..d608653024f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -30,7 +30,6 @@ import org.apache.doris.fs.remote.RemoteFile;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -73,25 +72,25 @@ public class DFSFileSystem extends RemoteFileSystem {
     @VisibleForTesting
     @Override
     public FileSystem nativeFileSystem(String remotePath) throws UserException {
-        if (dfsFileSystem != null) {
-            return dfsFileSystem;
-        }
-
-        Configuration conf = new HdfsConfiguration();
-        for (Map.Entry<String, String> propEntry : properties.entrySet()) {
-            conf.set(propEntry.getKey(), propEntry.getValue());
-        }
+        if (dfsFileSystem == null) {
+            synchronized (this) {
+                if (dfsFileSystem == null) {
+                    Configuration conf = new HdfsConfiguration();
+                    for (Map.Entry<String, String> propEntry : properties.entrySet()) {
+                        conf.set(propEntry.getKey(), propEntry.getValue());
+                    }
 
-        dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
-            try {
-                return FileSystem.get(new Path(remotePath).toUri(), conf);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
+                    dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
+                        try {
+                            return FileSystem.get(new Path(remotePath).toUri(), conf);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+                    operations = new HDFSFileOperations(dfsFileSystem);
+                }
             }
-        });
-
-        Preconditions.checkNotNull(dfsFileSystem);
-        operations = new HDFSFileOperations(dfsFileSystem);
+        }
         return dfsFileSystem;
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org