This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new f87f29e1ab [fix](multi-catalog)compatible with hdfs HA empty prefix (#22342)
f87f29e1ab is described below

commit f87f29e1abf7c5d6fd20ce25f3e6b3f1e61938c2
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Sun Jul 30 22:21:14 2023 +0800

    [fix](multi-catalog)compatible with hdfs HA empty prefix (#22342)

    compatible with hdfs HA empty prefix
    for example: 'hdfs:///' will be replaced to 'hdfs://ha-nameservice/'
---
 .../org/apache/doris/catalog/HdfsResource.java     |  1 +
 .../java/org/apache/doris/common/util/S3Util.java  | 35 +++++++++++++++++++---
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  7 +++--
 .../planner/external/iceberg/IcebergScanNode.java  |  7 +++--
 .../planner/external/iceberg/IcebergSplit.java     |  6 +++-
 5 files changed, 45 insertions(+), 11 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
index cdfb169590..2b50ec63b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
@@ -53,6 +53,7 @@ public class HdfsResource extends Resource {
     public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
     public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
     public static String DSF_NAMESERVICES = "dfs.nameservices";
+    public static final String HDFS_PREFIX = "hdfs://";
 
     @SerializedName(value = "properties")
     private Map<String, String> properties;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index 64c897c306..a47d838537 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -17,9 +17,11 @@
 
 package org.apache.doris.common.util;
 
+import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.datasource.credentials.CloudCredential;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -38,7 +40,9 @@ import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3Configuration;
 
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.time.Duration;
+import java.util.Map;
 
 public class S3Util {
     private static final Logger LOG = LogManager.getLogger(S3Util.class);
@@ -63,7 +67,7 @@ public class S3Util {
      * @param location origin location
      * @return metadata location path. just convert when storage is compatible with s3 client.
      */
-    public static String convertToS3IfNecessary(String location) {
+    public static String convertToS3IfNecessary(String location, Map<String, String> props) {
         LOG.debug("try convert location to s3 prefix: " + location);
         if (isObjStorageUseS3Client(location)) {
             int pos = location.indexOf("://");
@@ -72,7 +76,30 @@ public class S3Util {
             }
             return "s3" + location.substring(pos);
         }
-        return location;
+        return normalizedLocation(location, props);
+    }
+
+    private static String normalizedLocation(String location, Map<String, String> props) {
+        try {
+            URI normalizedUri = new URI(location);
+            if (StringUtils.isEmpty(normalizedUri.getHost()) && location.startsWith(HdfsResource.HDFS_PREFIX)) {
+                // Need add hdfs host to location
+                String host = props.get(HdfsResource.DSF_NAMESERVICES);
+                if (StringUtils.isNotEmpty(host)) {
+                    // Replace 'hdfs://' to 'hdfs://name_service', for example: hdfs:///abc to hdfs://name_service/abc
+                    return location.replace(HdfsResource.HDFS_PREFIX, HdfsResource.HDFS_PREFIX + host);
+                } else {
+                    // If no hadoop HA config
+                    if (location.startsWith(HdfsResource.HDFS_PREFIX + '/')) {
+                        // Do not support hdfs:///location
+                        throw new RuntimeException("Invalid location with empty host: " + location);
+                    }
+                }
+            }
+            return location;
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
     }
 
     /**
@@ -80,7 +107,7 @@
      * @param location origin split path
      * @return BE scan range path
      */
-    public static Path toScanRangeLocation(String location) {
+    public static Path toScanRangeLocation(String location, Map<String, String> props) {
         // All storage will use s3 client on BE.
         if (isObjStorage(location)) {
             int pos = location.indexOf("://");
@@ -95,7 +122,7 @@
                 location = "s3" + location.substring(pos);
             }
         }
-        return new Path(location);
+        return new Path(normalizedLocation(location, props));
     }
 
     public static boolean isHdfsOnOssEndpoint(String location) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 0bd190d945..e1fa35d07e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -378,7 +378,7 @@ public class HiveMetaStoreCache {
             RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true);
             for (RemoteFile remoteFile : locatedFiles.files()) {
                 Path srcPath = remoteFile.getPath();
-                Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString());
+                Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString(), catalog.getProperties());
                 if (!convertedPath.toString().equals(srcPath.toString())) {
                     remoteFile.setPath(convertedPath);
                 }
@@ -403,7 +403,7 @@ public class HiveMetaStoreCache {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
-            String finalLocation = S3Util.convertToS3IfNecessary(key.location);
+            String finalLocation = S3Util.convertToS3IfNecessary(key.location, catalog.getProperties());
             // disable the fs cache in FileSystem, or it will always from new FileSystem
             // and save it in cache when calling FileInputFormat.setInputPaths().
             try {
@@ -437,7 +437,8 @@
             for (int i = 0; i < splits.length; i++) {
                 org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
                 // todo: get modification time
-                Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString());
+                Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString(),
+                        catalog.getProperties());
                 result.addSplit(new FileSplit(splitFilePath, fs.getStart(), fs.getLength(), -1, null, null));
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 3d3634fb66..23bd919461 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -125,7 +125,7 @@ public class IcebergScanNode extends FileQueryScanNode {
         for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
             TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
             String deleteFilePath = filter.getDeleteFilePath();
-            deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath).toString());
+            deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath, icebergSplit.getConfig()).toString());
             if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
                 fileDesc.setContent(FileContent.POSITION_DELETES.id());
                 IcebergDeleteFileFilter.PositionDelete positionDelete =
@@ -188,13 +188,14 @@ public class IcebergScanNode extends FileQueryScanNode {
                 TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
             combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
                 String dataFilePath = splitTask.file().path().toString();
-                Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath);
+                Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
                 IcebergSplit split = new IcebergSplit(
                         finalDataFilePath,
                         splitTask.start(),
                         splitTask.length(),
                         splitTask.file().fileSizeInBytes(),
-                        new String[0]);
+                        new String[0],
+                        source.getCatalog().getProperties());
                 split.setFormatVersion(formatVersion);
                 if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                     split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index 9064017088..de3f2ec6aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -23,16 +23,20 @@
 import lombok.Data;
 import org.apache.hadoop.fs.Path;
 
 import java.util.List;
+import java.util.Map;
 
 @Data
 public class IcebergSplit extends FileSplit {
 
     // File path will be changed if the file is modified, so there's no need to get modification time.
-    public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts) {
+    public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
+                        Map<String, String> config) {
         super(file, start, length, fileLength, hosts, null);
+        this.config = config;
     }
 
     private Integer formatVersion;
     private List<IcebergDeleteFileFilter> deleteFileFilters;
+    private Map<String, String> config;
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
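For readers who want to see the effect of the change without applying the patch, below is a minimal, self-contained sketch of the normalization rule introduced above, written against plain JDK classes. The class name, the normalize() helper, and the hard-coded dfs.nameservices value are assumptions made for illustration; they do not appear in the commit.

// Illustrative sketch only: mirrors the normalizedLocation() behavior this commit adds to S3Util.
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

public class HdfsLocationNormalizerSketch {

    private static final String HDFS_PREFIX = "hdfs://";

    // Prepend the HA nameservice when an hdfs location has an empty host, e.g. hdfs:///path.
    static String normalize(String location, Map<String, String> props) {
        try {
            URI uri = new URI(location);
            boolean emptyHost = uri.getHost() == null || uri.getHost().isEmpty();
            if (emptyHost && location.startsWith(HDFS_PREFIX)) {
                String nameService = props.get("dfs.nameservices");
                if (nameService != null && !nameService.isEmpty()) {
                    // hdfs:///user/hive -> hdfs://ha-nameservice/user/hive
                    return location.replace(HDFS_PREFIX, HDFS_PREFIX + nameService);
                }
                if (location.startsWith(HDFS_PREFIX + "/")) {
                    // Without an HA config an empty host cannot be resolved.
                    throw new RuntimeException("Invalid location with empty host: " + location);
                }
            }
            return location;
        } catch (URISyntaxException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("dfs.nameservices", "ha-nameservice");
        // Prints hdfs://ha-nameservice/user/hive/warehouse/t1
        System.out.println(normalize("hdfs:///user/hive/warehouse/t1", props));
        // A location that already carries a host is returned unchanged.
        System.out.println(normalize("hdfs://ha-nameservice/user/hive/warehouse/t1", props));
    }
}

Because the rewrite is a plain prefix replacement guarded by the empty-host check, locations that already carry a nameservice or host pass through unchanged, which is why the second call in main() prints its input as-is.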