This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 442ae632e3 [fix](fs-cache) add 'scheme://authority' to fs cache key (#22263) 442ae632e3 is described below commit 442ae632e3085f07921521f7c937bb42ec9e9713 Author: Mingyu Chen <morning...@163.com> AuthorDate: Thu Jul 27 23:53:54 2023 +0800 [fix](fs-cache) add 'scheme://authority' to fs cache key (#22263) The file system cache key should contain `scheme://authority`, e.g. `hdfs://nameservices1`. Otherwise it will encounter the error: ``` Wrong FS: hdfs://abc/xxxx, expected: hdfs://def ``` --- .../doris/datasource/hive/HiveMetaStoreCache.java | 10 +++++----- .../java/org/apache/doris/fs/FileSystemCache.java | 14 ++++++++++---- .../java/org/apache/doris/fs/FileSystemFactory.java | 21 +++++++++++++++------ .../java/org/apache/doris/fs/FileSystemType.java | 3 ++- 4 files changed, 32 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 1db50c93fb..0bd190d945 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -366,7 +366,7 @@ public class HiveMetaStoreCache { FileCacheValue result = new FileCacheValue(); result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf)); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getLocationType(location), jobConf)); + new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf)); try { // For Tez engine, it may generate subdirectoies for "union" query. // So there may be files and directories in the table directory at the same time.
eg: @@ -762,8 +762,8 @@ public class HiveMetaStoreCache { .getPath() : null; String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - FileSystemFactory.getLocationType(baseOrDeltaPath.toUri().toString()), jobConf)); + new FileSystemCache.FileSystemCacheKey( + FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString()), jobConf)); Status status = fs.exists(acidVersionPath); if (status != Status.OK) { if (status.getErrCode() == ErrCode.NOT_FOUND) { @@ -784,7 +784,7 @@ public class HiveMetaStoreCache { for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) { String location = delta.getPath().toString(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getLocationType(location), jobConf)); + new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf)); RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); if (delta.isDeleteDelta()) { List<String> deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter( @@ -802,7 +802,7 @@ public class HiveMetaStoreCache { if (directory.getBaseDirectory() != null) { String location = directory.getBaseDirectory().toString(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getLocationType(location), jobConf)); + new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf)); RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); locatedFiles.files().stream().filter( f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java index aa6123d807..edc746ebe2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java @@ -18,6 +18,7 @@ package org.apache.doris.fs; import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; import org.apache.doris.common.util.CacheBulkLoader; import org.apache.doris.datasource.CacheException; import org.apache.doris.fs.remote.RemoteFileSystem; @@ -65,10 +66,13 @@ public class FileSystemCache { public static class FileSystemCacheKey { private final FileSystemType type; + // eg: hdfs://nameservices1 + private final String fsIdent; private final JobConf conf; - public FileSystemCacheKey(FileSystemType type, JobConf conf) { - this.type = type; + public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf) { + this.type = fs.first; + this.fsIdent = fs.second; this.conf = conf; } @@ -80,12 +84,14 @@ public class FileSystemCache { if (!(obj instanceof FileSystemCacheKey)) { return false; } - return type.equals(((FileSystemCacheKey) obj).type) && conf == ((FileSystemCacheKey) obj).conf; + return type.equals(((FileSystemCacheKey) obj).type) + && fsIdent.equals(((FileSystemCacheKey) obj).fsIdent) + && conf == ((FileSystemCacheKey) obj).conf; } @Override public int hashCode() { - return Objects.hash(conf, type); + return Objects.hash(conf, fsIdent, type); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java index 73d0c19472..1c6217ff4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java @@ -19,6 +19,7 @@ package org.apache.doris.fs; import org.apache.doris.analysis.StorageBackend; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import 
org.apache.doris.common.util.S3Util; import org.apache.doris.fs.remote.BrokerFileSystem; import org.apache.doris.fs.remote.RemoteFileSystem; @@ -27,10 +28,12 @@ import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.fs.remote.dfs.JFSFileSystem; import org.apache.doris.fs.remote.dfs.OFSFileSystem; +import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import java.io.IOException; +import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -53,22 +56,28 @@ public class FileSystemFactory { } } - public static FileSystemType getLocationType(String location) { + public static Pair<FileSystemType, String> getFSIdentity(String location) { + FileSystemType fsType; if (S3Util.isObjStorage(location)) { if (S3Util.isHdfsOnOssEndpoint(location)) { // if hdfs service is enabled on oss, use hdfs lib to access oss. - return FileSystemType.DFS; + fsType = FileSystemType.DFS; } - return FileSystemType.S3; + fsType = FileSystemType.S3; } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS) || location.startsWith(FeConstants.FS_PREFIX_GFS)) { - return FileSystemType.DFS; + fsType = FileSystemType.DFS; } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) { - return FileSystemType.OFS; + fsType = FileSystemType.OFS; } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) { - return FileSystemType.JFS; + fsType = FileSystemType.JFS; } else { throw new UnsupportedOperationException("Unknown file system for location: " + location); } + + Path path = new Path(location); + URI uri = path.toUri(); + String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority()); + return Pair.of(fsType, fsIdent); } public static RemoteFileSystem getByType(FileSystemType type, Configuration conf) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java index 
e3147943c2..5ddea01174 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java @@ -21,5 +21,6 @@ public enum FileSystemType { S3, DFS, OFS, - JFS + JFS, + FILE } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org