This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 944c63c4e74 [fix](cosn) use s3 client to read cosn on BE side (#30835) (#30881)
944c63c4e74 is described below
commit 944c63c4e74cef9af31ab5aa4b35c28ada427263
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Feb 6 09:57:07 2024 +0800
[fix](cosn) use s3 client to read cosn on BE side (#30835) (#30881)
bp #30835 #27874
---
.../org/apache/doris/common/util/LocationPath.java | 8 +-
.../java/org/apache/doris/common/util/S3Util.java | 138 ----------------
.../doris/datasource/hive/HiveMetaStoreCache.java | 34 ++--
.../datasource/property/PropertyConverter.java | 6 +-
.../org/apache/doris/fs/FileSystemFactory.java | 34 ----
.../doris/planner/external/FileQueryScanNode.java | 32 ----
.../doris/planner/external/HiveScanNode.java | 6 +-
.../doris/planner/external/hudi/HudiScanNode.java | 11 +-
.../planner/external/iceberg/IcebergScanNode.java | 12 +-
.../planner/external/paimon/PaimonScanNode.java | 4 +-
.../apache/doris/common/util/LocationPathTest.java | 178 +++++++++++++++++++++
.../org/apache/doris/common/util/S3UtilTest.java | 104 ------------
12 files changed, 222 insertions(+), 345 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index d56e67bb0d1..0ddba406cdc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -297,10 +297,11 @@ public class LocationPath {
/**
* provide file type for BE.
+ *
* @param location the location is from fs.listFile
* @return on BE, we will use TFileType to get the suitable client to
access storage.
*/
- public static TFileType getTFileType(String location) {
+ public static TFileType getTFileTypeForBE(String location) {
if (location == null || location.isEmpty()) {
return null;
}
@@ -314,12 +315,13 @@ public class LocationPath {
case OBS:
case BOS:
case GCS:
+ // ATTN, for COSN, on FE side, use HadoopFS to access, but on
BE, use S3 client to access.
+ case COSN:
// now we only support S3 client for object storage on BE
return TFileType.FILE_S3;
case HDFS:
case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib
to access oss.
case VIEWFS:
- case COSN:
return TFileType.FILE_HDFS;
case GFS:
case JFS:
@@ -346,12 +348,12 @@ public class LocationPath {
case OBS:
case BOS:
case GCS:
+ case COSN:
// All storage will use s3 client to access on BE, so need
convert to s3
return new Path(convertToS3(location));
case HDFS:
case OSS_HDFS:
case VIEWFS:
- case COSN:
case GFS:
case JFS:
case OFS:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index 98790bc9e83..2d40af321fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -17,18 +17,8 @@
package org.apache.doris.common.util;
-import org.apache.doris.catalog.HdfsResource;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.credentials.CloudCredential;
-import org.apache.doris.datasource.property.constants.CosProperties;
-import org.apache.doris.datasource.property.constants.ObsProperties;
-import org.apache.doris.datasource.property.constants.OssProperties;
-import org.apache.doris.datasource.property.constants.S3Properties;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
@@ -43,138 +33,10 @@ import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
-import java.io.UnsupportedEncodingException;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
import java.time.Duration;
-import java.util.Map;
public class S3Util {
- private static final Logger LOG = LogManager.getLogger(S3Util.class);
-
- public static boolean isObjStorage(String location) {
- return isObjStorageUseS3Client(location)
- // if treat cosn(tencent hadoop-cos) as a s3 file system, may
bring incompatible issues
- || (location.startsWith(FeConstants.FS_PREFIX_COS) &&
!location.startsWith(FeConstants.FS_PREFIX_COSN))
- || location.startsWith(FeConstants.FS_PREFIX_OSS)
- || location.startsWith(FeConstants.FS_PREFIX_OBS);
- }
-
- private static boolean isObjStorageUseS3Client(String location) {
- return location.startsWith(FeConstants.FS_PREFIX_S3)
- || location.startsWith(FeConstants.FS_PREFIX_S3A)
- || location.startsWith(FeConstants.FS_PREFIX_S3N)
- || location.startsWith(FeConstants.FS_PREFIX_GCS)
- || location.startsWith(FeConstants.FS_PREFIX_BOS);
- }
-
- private static boolean isS3EndPoint(String location, Map<String, String>
props) {
- if (props.containsKey(ObsProperties.ENDPOINT)
- || props.containsKey(OssProperties.ENDPOINT)
- || props.containsKey(CosProperties.ENDPOINT)) {
- return false;
- }
- // wide check range for the compatibility of s3 properties
- return (props.containsKey(S3Properties.ENDPOINT) ||
props.containsKey(S3Properties.Env.ENDPOINT))
- && isObjStorage(location);
- }
-
- /**
- * The converted path is used for FE to get metadata
- * @param location origin location
- * @return metadata location path. just convert when storage is compatible
with s3 client.
- */
- public static String convertToS3IfNecessary(String location, Map<String,
String> props) {
- LOG.debug("try convert location to s3 prefix: " + location);
- // include the check for multi locations and in a table, such as both
s3 and hdfs are in a table.
- if (isS3EndPoint(location, props) ||
isObjStorageUseS3Client(location)) {
- int pos = location.indexOf("://");
- if (pos == -1) {
- throw new RuntimeException("No '://' found in location: " +
location);
- }
- return "s3" + location.substring(pos);
- }
- return normalizedLocation(location, props);
- }
-
- private static String normalizedLocation(String location, Map<String,
String> props) {
- try {
- if (location.startsWith(HdfsResource.HDFS_PREFIX)) {
- return normalizedHdfsPath(location, props);
- }
- return location;
- } catch (URISyntaxException | UnsupportedEncodingException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- private static String normalizedHdfsPath(String location, Map<String,
String> props)
- throws URISyntaxException, UnsupportedEncodingException {
- // Hive partition may contain special characters such as ' ', '<', '>'
and so on.
- // Need to encode these characters before creating URI.
- // But doesn't encode '/' and ':' so that we can get the correct uri
host.
- location = URLEncoder.encode(location,
StandardCharsets.UTF_8.name()).replace("%2F", "/").replace("%3A", ":");
- URI normalizedUri = new URI(location);
- // compatible with 'hdfs:///' or 'hdfs:/'
- if (StringUtils.isEmpty(normalizedUri.getHost())) {
- location = URLDecoder.decode(location,
StandardCharsets.UTF_8.name());
- String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//";
- String brokenPrefix = HdfsResource.HDFS_PREFIX + "/";
- if (location.startsWith(brokenPrefix) &&
!location.startsWith(normalizedPrefix)) {
- location = location.replace(brokenPrefix, normalizedPrefix);
- }
- // Need add hdfs host to location
- String host = props.get(HdfsResource.DSF_NAMESERVICES);
- if (StringUtils.isNotEmpty(host)) {
- // Replace 'hdfs://key/' to 'hdfs://name_service/key/'
- // Or hdfs:///abc to hdfs://name_service/abc
- // TODO: check host in path when the 'dfs.nameservices' has
multiple hosts
- return location.replace(normalizedPrefix, normalizedPrefix +
host + "/");
- } else {
- // 'hdfs://null/' equals the 'hdfs:///'
- if (location.startsWith(HdfsResource.HDFS_PREFIX + "///")) {
- // Do not support hdfs:///location
- throw new RuntimeException("Invalid location with empty
host: " + location);
- } else {
- // Replace 'hdfs://key/' to '/key/', try access local
NameNode on BE.
- return location.replace(normalizedPrefix, "/");
- }
- }
- }
- return URLDecoder.decode(location, StandardCharsets.UTF_8.name());
- }
-
- /**
- * The converted path is used for BE
- * @param location origin split path
- * @return BE scan range path
- */
- public static Path toScanRangeLocation(String location, Map<String,
String> props) {
- // All storage will use s3 client on BE.
- if (isObjStorage(location)) {
- int pos = location.indexOf("://");
- if (pos == -1) {
- throw new RuntimeException("No '://' found in location: " +
location);
- }
- if (isHdfsOnOssEndpoint(location)) {
- // if hdfs service is enabled on oss, use oss location
- // example:
oss://examplebucket.cn-shanghai.oss-dls.aliyuncs.com/dir/file/0000.orc
- location = "oss" + location.substring(pos);
- } else {
- location = "s3" + location.substring(pos);
- }
- }
- return new Path(normalizedLocation(location, props));
- }
-
- public static boolean isHdfsOnOssEndpoint(String location) {
- // example: cn-shanghai.oss-dls.aliyuncs.com contains the
"oss-dls.aliyuncs".
- //
https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
- return location.contains("oss-dls.aliyuncs");
- }
public static S3Client buildS3Client(URI endpoint, String region,
CloudCredential credential) {
StaticCredentialsProvider scp;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 72f413b5dec..3e98b887eeb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -34,14 +34,13 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.CacheBulkLoader;
-import org.apache.doris.common.util.S3Util;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.fs.FileSystemCache;
-import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.RemoteFiles;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
@@ -361,7 +360,7 @@ public class HiveMetaStoreCache {
String bindBrokerName) throws
UserException {
FileCacheValue result = new FileCacheValue();
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
- new
FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(
+ new
FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
location, bindBrokerName), jobConf, bindBrokerName));
result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location,
jobConf));
try {
@@ -374,9 +373,10 @@ public class HiveMetaStoreCache {
//
https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true,
true);
for (RemoteFile remoteFile : locatedFiles.files()) {
- Path srcPath = remoteFile.getPath();
- Path convertedPath =
S3Util.toScanRangeLocation(srcPath.toString(), catalog.getProperties());
- if (!convertedPath.toString().equals(srcPath.toString())) {
+ String srcPath = remoteFile.getPath().toString();
+ LocationPath locationPath = new LocationPath(srcPath,
catalog.getProperties());
+ Path convertedPath = locationPath.toScanRangeLocation();
+ if (!convertedPath.toString().equals(srcPath)) {
remoteFile.setPath(convertedPath);
}
result.addFile(remoteFile);
@@ -400,13 +400,12 @@ public class HiveMetaStoreCache {
ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
- String finalLocation = S3Util.convertToS3IfNecessary(key.location,
- catalog.getCatalogProperty().getProperties());
+ Map<String, String> props =
catalog.getCatalogProperty().getProperties();
+ LocationPath finalLocation = new LocationPath(key.location, props);
// disable the fs cache in FileSystem, or it will always from new
FileSystem
// and save it in cache when calling
FileInputFormat.setInputPaths().
try {
- Path path = new Path(finalLocation);
- URI uri = path.toUri();
+ URI uri = finalLocation.getPath().toUri();
if (uri.getScheme() != null) {
String scheme = uri.getScheme();
updateJobConf("fs." + scheme + ".impl.disable.cache",
"true");
@@ -419,13 +418,13 @@ public class HiveMetaStoreCache {
} catch (Exception e) {
LOG.warn("unknown scheme in path: " + finalLocation, e);
}
- FileInputFormat.setInputPaths(jobConf, finalLocation);
+ FileInputFormat.setInputPaths(jobConf, finalLocation.get());
try {
FileCacheValue result;
InputFormat<?, ?> inputFormat =
HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
// TODO: This is a temp config, will remove it after the
HiveSplitter is stable.
if (key.useSelfSplitter) {
- result = getFileCache(finalLocation, inputFormat, jobConf,
+ result = getFileCache(finalLocation.get(), inputFormat,
jobConf,
key.getPartitionValues(), key.bindBrokerName);
} else {
InputSplit[] splits;
@@ -442,8 +441,9 @@ public class HiveMetaStoreCache {
for (int i = 0; i < splits.length; i++) {
org.apache.hadoop.mapred.FileSplit fs =
((org.apache.hadoop.mapred.FileSplit) splits[i]);
// todo: get modification time
- Path splitFilePath =
S3Util.toScanRangeLocation(fs.getPath().toString(),
- catalog.getProperties());
+ String dataFilePath = fs.getPath().toString();
+ LocationPath locationPath = new
LocationPath(dataFilePath, catalog.getProperties());
+ Path splitFilePath =
locationPath.toScanRangeLocation();
result.addSplit(new FileSplit(splitFilePath,
fs.getStart(), fs.getLength(), -1, null, null));
}
}
@@ -815,7 +815,7 @@ public class HiveMetaStoreCache {
String acidVersionPath = new Path(baseOrDeltaPath,
"_orc_acid_version").toUri().toString();
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
-
FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString(),
+
LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
bindBrokerName), jobConf,
bindBrokerName));
Status status = fs.exists(acidVersionPath);
if (status != Status.OK) {
@@ -838,7 +838,7 @@ public class HiveMetaStoreCache {
String location = delta.getPath().toString();
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
- FileSystemFactory.getFSIdentity(location,
bindBrokerName),
+ LocationPath.getFSIdentity(location,
bindBrokerName),
jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location,
true, false);
if (delta.isDeleteDelta()) {
@@ -858,7 +858,7 @@ public class HiveMetaStoreCache {
String location = directory.getBaseDirectory().toString();
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
- FileSystemFactory.getFSIdentity(location,
bindBrokerName),
+ LocationPath.getFSIdentity(location,
bindBrokerName),
jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location,
true, false);
locatedFiles.files().stream().filter(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
index 39b68dda144..66396e2c337 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
@@ -17,7 +17,7 @@
package org.apache.doris.datasource.property;
-import org.apache.doris.common.util.S3Util;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.InitCatalogLog.Type;
@@ -301,7 +301,7 @@ public class PropertyConverter {
ossProperties.put("fs.oss.impl.disable.cache", "true");
ossProperties.put("fs.oss.impl", getHadoopFSImplByScheme("oss"));
boolean hdfsEnabled =
Boolean.parseBoolean(props.getOrDefault(OssProperties.OSS_HDFS_ENABLED,
"false"));
- if (S3Util.isHdfsOnOssEndpoint(endpoint) || hdfsEnabled) {
+ if (LocationPath.isHdfsOnOssEndpoint(endpoint) || hdfsEnabled) {
// use endpoint or enable hdfs
rewriteHdfsOnOssProperties(ossProperties, endpoint);
}
@@ -321,7 +321,7 @@ public class PropertyConverter {
}
private static void rewriteHdfsOnOssProperties(Map<String, String>
ossProperties, String endpoint) {
- if (!S3Util.isHdfsOnOssEndpoint(endpoint)) {
+ if (!LocationPath.isHdfsOnOssEndpoint(endpoint)) {
// just for robustness here, avoid wrong endpoint when oss-hdfs is
enabled.
// convert "oss-cn-beijing.aliyuncs.com" to
"cn-beijing.oss-dls.aliyuncs.com"
// reference link:
https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
index e54a73bbff3..63f552a8ab8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
@@ -18,9 +18,6 @@
package org.apache.doris.fs;
import org.apache.doris.analysis.StorageBackend;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.Pair;
-import org.apache.doris.common.util.S3Util;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
@@ -28,12 +25,10 @@ import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.fs.remote.dfs.JFSFileSystem;
import org.apache.doris.fs.remote.dfs.OFSFileSystem;
-import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
-import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -56,35 +51,6 @@ public class FileSystemFactory {
}
}
- public static Pair<FileSystemType, String> getFSIdentity(String location,
String bindBrokerName) {
- FileSystemType fsType;
- if (bindBrokerName != null) {
- fsType = FileSystemType.BROKER;
- } else if (S3Util.isObjStorage(location)) {
- if (S3Util.isHdfsOnOssEndpoint(location)) {
- // if hdfs service is enabled on oss, use hdfs lib to access
oss.
- fsType = FileSystemType.DFS;
- } else {
- fsType = FileSystemType.S3;
- }
- } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS) ||
location.startsWith(FeConstants.FS_PREFIX_GFS)
- || location.startsWith(FeConstants.FS_PREFIX_VIEWFS)) {
- fsType = FileSystemType.DFS;
- } else if (location.startsWith(FeConstants.FS_PREFIX_OFS) ||
location.startsWith(FeConstants.FS_PREFIX_COSN)) {
- // ofs:// and cosn:// use the same underlying file system: Tencent
Cloud HDFS, aka CHDFS)) {
- fsType = FileSystemType.OFS;
- } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
- fsType = FileSystemType.JFS;
- } else {
- throw new UnsupportedOperationException("Unknown file system for
location: " + location);
- }
-
- Path path = new Path(location);
- URI uri = path.toUri();
- String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" +
Strings.nullToEmpty(uri.getAuthority());
- return Pair.of(fsType, fsIdent);
- }
-
public static RemoteFileSystem getRemoteFileSystem(FileSystemType type,
Configuration conf,
String bindBrokerName) {
Map<String, String> properties = new HashMap<>();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index c2cc969cd6d..0293eac192f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -29,11 +29,9 @@ import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
-import org.apache.doris.common.util.S3Util;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.AcidInfo;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
@@ -75,7 +73,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
/**
@@ -452,33 +449,4 @@ public abstract class FileQueryScanNode extends
FileScanNode {
protected abstract TableIf getTargetTable() throws UserException;
protected abstract Map<String, String> getLocationProperties() throws
UserException;
-
- protected static Optional<TFileType> getTFileType(String location) {
- if (location != null && !location.isEmpty()) {
- if (S3Util.isObjStorage(location)) {
- if (S3Util.isHdfsOnOssEndpoint(location)) {
- // if hdfs service is enabled on oss, use hdfs lib to
access oss.
- return Optional.of(TFileType.FILE_HDFS);
- }
- return Optional.of(TFileType.FILE_S3);
- } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
- return Optional.of(TFileType.FILE_HDFS);
- } else if (location.startsWith(FeConstants.FS_PREFIX_VIEWFS)) {
- return Optional.of(TFileType.FILE_HDFS);
- } else if (location.startsWith(FeConstants.FS_PREFIX_COSN)) {
- return Optional.of(TFileType.FILE_HDFS);
- } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
- return Optional.of(TFileType.FILE_LOCAL);
- } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
- return Optional.of(TFileType.FILE_BROKER);
- } else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
- return Optional.of(TFileType.FILE_BROKER);
- } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
- return Optional.of(TFileType.FILE_BROKER);
- }
- }
- return Optional.empty();
- }
}
-
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 58b93112477..0e1f3438c4a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -66,6 +67,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;
@@ -337,8 +339,8 @@ public class HiveScanNode extends FileQueryScanNode {
if (bindBrokerName != null) {
return TFileType.FILE_BROKER;
}
- return getTFileType(location).orElseThrow(() ->
- new DdlException("Unknown file location " + location + " for hms
table " + hmsTable.getName()));
+ return
Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
+ new DdlException("Unknown file location " + location + " for
hms table " + hmsTable.getName()));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
index 4da4e85453b..9d601e71daa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
@@ -26,7 +26,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.S3Util;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
@@ -43,7 +43,6 @@ import org.apache.doris.thrift.THudiFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -294,9 +293,11 @@ public class HudiScanNode extends HiveScanNode {
noLogsSplitNum.incrementAndGet();
String filePath = baseFile.getPath();
long fileSize = baseFile.getFileSize();
- splits.add(new
FileSplit(S3Util.toScanRangeLocation(filePath, Maps.newHashMap()),
- 0, fileSize, fileSize, new String[0],
- partition.getPartitionValues()));
+ // Need add hdfs host to location
+ LocationPath locationPath = new LocationPath(filePath,
hmsTable.getCatalogProperties());
+ Path splitFilePath = locationPath.toScanRangeLocation();
+ splits.add(new FileSplit(splitFilePath, 0, fileSize,
fileSize,
+ new String[0], partition.getPartitionValues()));
});
} else {
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName,
queryInstant).forEach(fileSlice -> {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index eacf372e4fc..d88d18d91af 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -31,7 +31,7 @@ import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.S3Util;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.external.iceberg.util.IcebergUtils;
@@ -151,7 +151,9 @@ public class IcebergScanNode extends FileQueryScanNode {
for (IcebergDeleteFileFilter filter :
icebergSplit.getDeleteFileFilters()) {
TIcebergDeleteFileDesc deleteFileDesc = new
TIcebergDeleteFileDesc();
String deleteFilePath = filter.getDeleteFilePath();
-
deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath,
icebergSplit.getConfig()).toString());
+ LocationPath locationPath = new LocationPath(deleteFilePath,
icebergSplit.getConfig());
+ Path splitDeletePath = locationPath.toScanRangeLocation();
+ deleteFileDesc.setPath(splitDeletePath.toString());
if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
fileDesc.setContent(FileContent.POSITION_DELETES.id());
IcebergDeleteFileFilter.PositionDelete positionDelete =
@@ -243,8 +245,8 @@ public class IcebergScanNode extends FileQueryScanNode {
// Counts the number of partitions read
partitionPathSet.add(structLike.toString());
}
-
- Path finalDataFilePath =
S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
+ LocationPath locationPath = new LocationPath(dataFilePath,
source.getCatalog().getProperties());
+ Path finalDataFilePath = locationPath.toScanRangeLocation();
IcebergSplit split = new IcebergSplit(
finalDataFilePath,
splitTask.start(),
@@ -345,7 +347,7 @@ public class IcebergScanNode extends FileQueryScanNode {
@Override
public TFileType getLocationType(String location) throws UserException {
final String fLocation = normalizeLocation(location);
- return getTFileType(fLocation).orElseThrow(() ->
+ return
Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
new DdlException("Unknown file location " + fLocation + " for
iceberg table " + icebergTable.name()));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
index cf5edb3b914..07de5aeda79 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
@@ -213,8 +213,8 @@ public class PaimonScanNode extends FileQueryScanNode {
@Override
public TFileType getLocationType(String location) throws DdlException,
MetaNotFoundException {
- return
Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() ->
- new DdlException("Unknown file location " + location + " for
paimon table "));
+ return
Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
+ new DdlException("Unknown file location " + location + " for
paimon table "));
}
@Override
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
new file mode 100644
index 00000000000..71ee9100ffc
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.fs.FileSystemType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class LocationPathTest {
+
+ @Test
+ public void testHdfsLocationConvert() {
+ // non HA
+ Map<String, String> rangeProps = new HashMap<>();
+ LocationPath locationPath = new LocationPath("hdfs://dir/file.path",
rangeProps);
+ Assertions.assertTrue(locationPath.get().startsWith("hdfs://"));
+
+ String beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("hdfs://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.DFS);
+
+ // HA props
+ Map<String, String> props = new HashMap<>();
+ props.put("dfs.nameservices", "ns");
+ locationPath = new LocationPath("hdfs:///dir/file.path", props);
+ Assertions.assertTrue(locationPath.get().startsWith("hdfs://")
+ && !locationPath.get().startsWith("hdfs:///"));
+
+ beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("hdfs://") &&
!beLocation.startsWith("hdfs:///"));
+
+ // nonstandard '/' for hdfs path
+ locationPath = new LocationPath("hdfs:/dir/file.path", props);
+ Assertions.assertTrue(locationPath.get().startsWith("hdfs://"));
+
+ beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("hdfs://"));
+
+ // empty ha nameservices
+ props.put("dfs.nameservices", "");
+ locationPath = new LocationPath("hdfs:/dir/file.path", props);
+
+ beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(locationPath.get().startsWith("/dir")
+ && !locationPath.get().startsWith("hdfs://"));
+ Assertions.assertTrue(beLocation.startsWith("/dir") &&
!beLocation.startsWith("hdfs://"));
+ }
+
+
+ @Test
+ public void testJFSLocationConvert() {
+ String loc;
+ Map<String, String> rangeProps = new HashMap<>();
+
+ LocationPath locationPath = new LocationPath("jfs://test.com",
rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("jfs://"));
+ // BE
+ loc = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(loc.startsWith("jfs://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(loc, null).first,
FileSystemType.JFS);
+ }
+
+ @Test
+ public void testGSLocationConvert() {
+ Map<String, String> rangeProps = new HashMap<>();
+
+ // use s3 client to access gs
+ LocationPath locationPath = new LocationPath("gs://test.com",
rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("s3://"));
+ // BE
+ String beLoc = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLoc.startsWith("s3://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLoc, null).first,
FileSystemType.S3);
+ }
+
+ @Test
+ public void testOSSLocationConvert() {
+ Map<String, String> rangeProps = new HashMap<>();
+ LocationPath locationPath = new LocationPath("oss://test.com",
rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("oss://"));
+ // BE
+ String beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("s3://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.S3);
+
+ locationPath = new
LocationPath("oss://test.oss-dls.aliyuncs.com/path", rangeProps);
+ // FE
+
Assertions.assertTrue(locationPath.get().startsWith("oss://test.oss-dls.aliyuncs"));
+ // BE
+ beLocation = locationPath.toScanRangeLocation().toString();
+
Assertions.assertTrue(beLocation.startsWith("oss://test.oss-dls.aliyuncs"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.DFS);
+
+ }
+
+ @Test
+ public void testCOSLocationConvert() {
+ Map<String, String> rangeProps = new HashMap<>();
+ LocationPath locationPath = new LocationPath("cos://test.com",
rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("cos://"));
+ String beLocation = locationPath.toScanRangeLocation().toString();
+ // BE
+ Assertions.assertTrue(beLocation.startsWith("s3://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.S3);
+
+ locationPath = new LocationPath("cosn://test.com", rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("cosn://"));
+ // BE
+ beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("s3://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.S3);
+
+ locationPath = new LocationPath("ofs://test.com", rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("ofs://"));
+ // BE
+ beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("ofs://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.OFS);
+
+ // GFS is now treated the same as DFS
+ locationPath = new LocationPath("gfs://test.com", rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("gfs://"));
+ // BE
+ beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("gfs://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.DFS);
+ }
+
+ @Test
+ public void testOBSLocationConvert() {
+ Map<String, String> rangeProps = new HashMap<>();
+ LocationPath locationPath = new LocationPath("obs://test.com",
rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("obs://"));
+ // BE
+ String beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("s3://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.S3);
+ }
+
+ @Test
+ public void testUnsupportedLocationConvert() {
+ // when an unknown location is used, pass it through to BE
+ Map<String, String> rangeProps = new HashMap<>();
+ LocationPath locationPath = new LocationPath("unknown://test.com",
rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("unknown://"));
+ // BE
+ String beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("unknown://"));
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3UtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/util/S3UtilTest.java
deleted file mode 100644
index 70bad23e01f..00000000000
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3UtilTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.util;
-
-import org.apache.doris.fs.FileSystemFactory;
-import org.apache.doris.fs.FileSystemType;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class S3UtilTest {
- @Test
- public void testLocationConvert() {
- String loc;
- loc = S3Util.convertToS3IfNecessary("hdfs://dir/file.path", new
HashMap<>());
- Assertions.assertTrue(loc.startsWith("hdfs://"));
-
- Map<String, String> props = new HashMap<>();
- props.put("dfs.nameservices", "ns");
- loc = S3Util.convertToS3IfNecessary("hdfs:///dir/file.path", props);
- Assertions.assertTrue(loc.startsWith("hdfs://") &&
!loc.startsWith("hdfs:///"));
- loc = S3Util.convertToS3IfNecessary("hdfs:/dir/file.path", props);
- Assertions.assertTrue(loc.startsWith("hdfs://"));
- props.put("dfs.nameservices", "");
- loc = S3Util.convertToS3IfNecessary("hdfs:/dir/file.path", props);
- Assertions.assertTrue(loc.startsWith("/dir") &&
!loc.startsWith("hdfs://"));
-
- loc = S3Util.convertToS3IfNecessary("oss://test.com", props);
- Assertions.assertTrue(loc.startsWith("oss://"));
-
- loc = S3Util.convertToS3IfNecessary("gcs://test.com", props);
- Assertions.assertTrue(loc.startsWith("gcs://"));
-
- loc = S3Util.convertToS3IfNecessary("cos://test.com", props);
- Assertions.assertTrue(loc.startsWith("cos://"));
-
- loc = S3Util.convertToS3IfNecessary("cosn://test.com", props);
- Assertions.assertTrue(loc.startsWith("cosn://"));
-
- loc = S3Util.convertToS3IfNecessary("obs://test.com", props);
- Assertions.assertTrue(loc.startsWith("obs://"));
- }
-
-
- @Test
- public void testScanRangeLocationConvert() throws Exception {
- String loc;
- Map<String, String> rangeProps = new HashMap<>();
- loc = S3Util.toScanRangeLocation("hdfs://dir/file.path",
rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("hdfs://"));
- Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc,
null).first, FileSystemType.DFS);
-
- Map<String, String> props = new HashMap<>();
- props.put("dfs.nameservices", "ns");
- loc = S3Util.toScanRangeLocation("hdfs:///dir/file.path",
props).toString();
- Assertions.assertTrue(loc.startsWith("hdfs://") &&
!loc.startsWith("hdfs:///"));
- loc = S3Util.toScanRangeLocation("hdfs:/dir/file.path",
props).toString();
- Assertions.assertTrue(loc.startsWith("hdfs://"));
- props.put("dfs.nameservices", "");
- loc = S3Util.toScanRangeLocation("hdfs:/dir/file.path",
props).toString();
- Assertions.assertTrue(loc.startsWith("/dir") &&
!loc.startsWith("hdfs://"));
-
- loc = S3Util.toScanRangeLocation("oss://test.com",
rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("s3://"));
- Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc,
null).first, FileSystemType.S3);
-
- loc =
S3Util.toScanRangeLocation("oss://test.oss-dls.aliyuncs.com/path",
rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("oss://test.oss-dls.aliyuncs"));
- Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc,
null).first, FileSystemType.DFS);
-
- loc = S3Util.toScanRangeLocation("cos://test.com",
rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("s3://"));
- Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc,
null).first, FileSystemType.S3);
-
- loc = S3Util.toScanRangeLocation("cosn://test.com",
rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("cosn://"));
- Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc,
null).first, FileSystemType.OFS);
-
- loc = S3Util.toScanRangeLocation("obs://test.com",
rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("s3://"));
- Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc,
null).first, FileSystemType.S3);
-
- loc = S3Util.toScanRangeLocation("unknown://test.com",
rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("unknown://"));
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]