This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 6be6fcc7e78 [BugFix](MultiCatalog) Fix oss file location is not
available in iceberg hadoop catalog (#30761) (#30992)
6be6fcc7e78 is described below
commit 6be6fcc7e7870e6013245f3f30350fee43a25819
Author: Kang <[email protected]>
AuthorDate: Thu Feb 8 16:43:07 2024 +0800
[BugFix](MultiCatalog) Fix oss file location is not available in iceberg
hadoop catalog (#30761) (#30992)
---
.../org/apache/doris/common/util/LocationPath.java | 191 ++++++++++++---------
.../planner/external/iceberg/IcebergScanNode.java | 9 +-
.../apache/doris/common/util/LocationPathTest.java | 14 ++
3 files changed, 127 insertions(+), 87 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index 0ddba406cdc..fd7da29e519 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -39,6 +39,8 @@ import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
@@ -49,7 +51,7 @@ public class LocationPath {
private final LocationType locationType;
private final String location;
- enum LocationType {
+ public enum LocationType {
HDFS,
LOCAL, // Local File
BOS, // Baidu
@@ -66,7 +68,8 @@ public class LocationPath {
S3A,
S3N,
VIEWFS,
- UNKNOWN
+ UNKNOWN,
+ NOSCHEME // no scheme info
}
private LocationPath(String location) {
@@ -75,107 +78,123 @@ public class LocationPath {
public LocationPath(String location, Map<String, String> props) {
String scheme = parseScheme(location).toLowerCase();
- switch (scheme) {
- case FeConstants.FS_PREFIX_HDFS:
- locationType = LocationType.HDFS;
- // Need add hdfs host to location
- String host = props.get(HdfsResource.DSF_NAMESERVICES);
- this.location = normalizedHdfsPath(location, host);
- break;
- case FeConstants.FS_PREFIX_S3:
- locationType = LocationType.S3;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_S3A:
- locationType = LocationType.S3A;
- this.location = convertToS3(location);
- break;
- case FeConstants.FS_PREFIX_S3N:
- // include the check for multi locations and in a table, such
as both s3 and hdfs are in a table.
- locationType = LocationType.S3N;
- this.location = convertToS3(location);
- break;
- case FeConstants.FS_PREFIX_BOS:
- locationType = LocationType.BOS;
- // use s3 client to access
- this.location = convertToS3(location);
- break;
- case FeConstants.FS_PREFIX_GCS:
- locationType = LocationType.GCS;
- // use s3 client to access
- this.location = convertToS3(location);
- break;
- case FeConstants.FS_PREFIX_OSS:
- if (isHdfsOnOssEndpoint(location)) {
- locationType = LocationType.OSS_HDFS;
+ if (scheme.isEmpty()) {
+ locationType = LocationType.NOSCHEME;
+ this.location = location;
+ } else {
+ switch (scheme) {
+ case FeConstants.FS_PREFIX_HDFS:
+ locationType = LocationType.HDFS;
+ // Need add hdfs host to location
+ String host = props.get(HdfsResource.DSF_NAMESERVICES);
+ this.location = normalizedHdfsPath(location, host);
+ break;
+ case FeConstants.FS_PREFIX_S3:
+ locationType = LocationType.S3;
this.location = location;
- } else {
+ break;
+ case FeConstants.FS_PREFIX_S3A:
+ locationType = LocationType.S3A;
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_S3N:
+ // include the check for multi locations and in a table,
such as both s3 and hdfs are in a table.
+ locationType = LocationType.S3N;
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_BOS:
+ locationType = LocationType.BOS;
+ // use s3 client to access
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_GCS:
+ locationType = LocationType.GCS;
+ // use s3 client to access
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_OSS:
+ if (isHdfsOnOssEndpoint(location)) {
+ locationType = LocationType.OSS_HDFS;
+ this.location = location;
+ } else {
+ if (useS3EndPoint(props)) {
+ this.location = convertToS3(location);
+ } else {
+ this.location = location;
+ }
+ locationType = LocationType.OSS;
+ }
+ break;
+ case FeConstants.FS_PREFIX_COS:
if (useS3EndPoint(props)) {
this.location = convertToS3(location);
} else {
this.location = location;
}
- locationType = LocationType.OSS;
- }
- break;
- case FeConstants.FS_PREFIX_COS:
- if (useS3EndPoint(props)) {
- this.location = convertToS3(location);
- } else {
+ locationType = LocationType.COS;
+ break;
+ case FeConstants.FS_PREFIX_OBS:
+ if (useS3EndPoint(props)) {
+ this.location = convertToS3(location);
+ } else {
+ this.location = location;
+ }
+ locationType = LocationType.OBS;
+ break;
+ case FeConstants.FS_PREFIX_OFS:
+ locationType = LocationType.OFS;
this.location = location;
- }
- locationType = LocationType.COS;
- break;
- case FeConstants.FS_PREFIX_OBS:
- if (useS3EndPoint(props)) {
- this.location = convertToS3(location);
- } else {
+ break;
+ case FeConstants.FS_PREFIX_JFS:
+ locationType = LocationType.JFS;
this.location = location;
- }
- locationType = LocationType.OBS;
- break;
- case FeConstants.FS_PREFIX_OFS:
- locationType = LocationType.OFS;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_JFS:
- locationType = LocationType.JFS;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_GFS:
- locationType = LocationType.GFS;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_COSN:
- // if treat cosn(tencent hadoop-cos) as a s3 file system, may
bring incompatible issues
- locationType = LocationType.COSN;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_VIEWFS:
- locationType = LocationType.VIEWFS;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_FILE:
- locationType = LocationType.LOCAL;
- this.location = location;
- break;
- default:
- locationType = LocationType.UNKNOWN;
- this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_GFS:
+ locationType = LocationType.GFS;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_COSN:
+ // if treat cosn(tencent hadoop-cos) as a s3 file system,
may bring incompatible issues
+ locationType = LocationType.COSN;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_VIEWFS:
+ locationType = LocationType.VIEWFS;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_FILE:
+ locationType = LocationType.LOCAL;
+ this.location = location;
+ break;
+ default:
+ locationType = LocationType.UNKNOWN;
+ this.location = location;
+ }
}
}
private static String parseScheme(String location) {
+ String scheme = "";
String[] schemeSplit = location.split(SCHEME_DELIM);
if (schemeSplit.length > 1) {
- return schemeSplit[0];
+ scheme = schemeSplit[0];
} else {
schemeSplit = location.split(NONSTANDARD_SCHEME_DELIM);
if (schemeSplit.length > 1) {
- return schemeSplit[0];
+ scheme = schemeSplit[0];
}
- throw new IllegalArgumentException("Fail to parse scheme, invalid
location: " + location);
}
+
+ // if no scheme was found, accept the location as scheme-less (e.g. /path/to/local) only when it is a valid path
+ if (scheme.isEmpty()) {
+ try {
+ Paths.get(location);
+ } catch (InvalidPathException exception) {
+ throw new IllegalArgumentException("Fail to parse scheme,
invalid location: " + location);
+ }
+ }
+
+ return scheme;
}
private boolean useS3EndPoint(Map<String, String> props) {
@@ -196,6 +215,7 @@ public class LocationPath {
/**
* The converted path is used for FE to get metadata
+ *
* @param location origin location
* @return metadata location path. just convert when storage is compatible
with s3 client.
*/
@@ -219,7 +239,7 @@ public class LocationPath {
// Need to encode these characters before creating URI.
// But doesn't encode '/' and ':' so that we can get the correct
uri host.
location = URLEncoder.encode(location,
StandardCharsets.UTF_8.name())
- .replace("%2F", "/").replace("%3A", ":");
+ .replace("%2F", "/").replace("%3A", ":");
URI normalizedUri = new URI(location);
// compatible with 'hdfs:///' or 'hdfs:/'
if (StringUtils.isEmpty(normalizedUri.getHost())) {
@@ -336,6 +356,7 @@ public class LocationPath {
/**
* The converted path is used for BE
+ *
* @return BE scan range path
*/
public Path toScanRangeLocation() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index d88d18d91af..dbd1aab2392 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -353,11 +353,16 @@ public class IcebergScanNode extends FileQueryScanNode {
private String normalizeLocation(String location) {
Map<String, String> props = source.getCatalog().getProperties();
+ LocationPath locationPath = new LocationPath(location, props);
String icebergCatalogType =
props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE);
if ("hadoop".equalsIgnoreCase(icebergCatalogType)) {
- if (!location.startsWith(HdfsResource.HDFS_PREFIX)) {
+ // if the location has no scheme, prefix it with HADOOP_FS_NAME
+ // if HADOOP_FS_NAME is not set, leave it unchanged; it should then be a local file system path
+ if (locationPath.getLocationType() ==
LocationPath.LocationType.NOSCHEME) {
String fsName = props.get(HdfsResource.HADOOP_FS_NAME);
- location = fsName + location;
+ if (fsName != null) {
+ location = fsName + location;
+ }
}
}
return location;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
index 71ee9100ffc..571826aa9c8 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
@@ -171,8 +171,22 @@ public class LocationPathTest {
LocationPath locationPath = new LocationPath("unknown://test.com",
rangeProps);
// FE
Assertions.assertTrue(locationPath.get().startsWith("unknown://"));
+ Assertions.assertTrue(locationPath.getLocationType() ==
LocationPath.LocationType.UNKNOWN);
// BE
String beLocation = locationPath.toScanRangeLocation().toString();
Assertions.assertTrue(beLocation.startsWith("unknown://"));
}
+
+ @Test
+ public void testNoSchemeLocation() {
+ // a location without a scheme is typed NOSCHEME and passed to BE as-is
+ Map<String, String> rangeProps = new HashMap<>();
+ LocationPath locationPath = new LocationPath("/path/to/local",
rangeProps);
+ // FE
+
Assertions.assertTrue(locationPath.get().equalsIgnoreCase("/path/to/local"));
+ Assertions.assertTrue(locationPath.getLocationType() ==
LocationPath.LocationType.NOSCHEME);
+ // BE
+ String beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.equalsIgnoreCase("/path/to/local"));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]