This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f87f29e1ab [fix](multi-catalog)compatible with hdfs HA empty prefix (#22342)
f87f29e1ab is described below

commit f87f29e1abf7c5d6fd20ce25f3e6b3f1e61938c2
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Sun Jul 30 22:21:14 2023 +0800

    [fix](multi-catalog)compatible with hdfs HA empty prefix (#22342)
    
    compatible with hdfs HA empty prefix
    for example: 'hdfs:///' will be replaced with 'hdfs://ha-nameservice/'
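    
    To make the rule concrete, a minimal standalone Java sketch (the class
    name and sample path here are hypothetical, not code from this patch):
    
        public class HdfsPrefixExample {
            public static void main(String[] args) {
                // Value that would come from the dfs.nameservices property in an HA setup
                String nameservice = "ha-nameservice";
                // HA location written without a host, i.e. the "empty prefix" form
                String location = "hdfs:///user/hive/warehouse/t1";
                if (location.startsWith("hdfs:///")) {
                    // Fill in the nameservice so the URI carries a usable host
                    location = location.replace("hdfs://", "hdfs://" + nameservice);
                }
                System.out.println(location); // hdfs://ha-nameservice/user/hive/warehouse/t1
            }
        }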
---
 .../org/apache/doris/catalog/HdfsResource.java     |  1 +
 .../java/org/apache/doris/common/util/S3Util.java  | 35 +++++++++++++++++++---
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  7 +++--
 .../planner/external/iceberg/IcebergScanNode.java  |  7 +++--
 .../planner/external/iceberg/IcebergSplit.java     |  6 +++-
 5 files changed, 45 insertions(+), 11 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
index cdfb169590..2b50ec63b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
@@ -53,6 +53,7 @@ public class HdfsResource extends Resource {
     public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
     public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
     public static String DSF_NAMESERVICES = "dfs.nameservices";
+    public static final String HDFS_PREFIX = "hdfs://";
 
     @SerializedName(value = "properties")
     private Map<String, String> properties;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index 64c897c306..a47d838537 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -17,9 +17,11 @@
 
 package org.apache.doris.common.util;
 
+import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.datasource.credentials.CloudCredential;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -38,7 +40,9 @@ import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3Configuration;
 
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.time.Duration;
+import java.util.Map;
 
 public class S3Util {
     private static final Logger LOG = LogManager.getLogger(S3Util.class);
@@ -63,7 +67,7 @@ public class S3Util {
      * @param location origin location
      * @return metadata location path. just convert when storage is compatible with s3 client.
      */
-    public static String convertToS3IfNecessary(String location) {
+    public static String convertToS3IfNecessary(String location, Map<String, String> props) {
         LOG.debug("try convert location to s3 prefix: " + location);
         if (isObjStorageUseS3Client(location)) {
             int pos = location.indexOf("://");
@@ -72,7 +76,30 @@ public class S3Util {
             }
             return "s3" + location.substring(pos);
         }
-        return location;
+        return normalizedLocation(location, props);
+    }
+
+    private static String normalizedLocation(String location, Map<String, String> props) {
+        try {
+            URI normalizedUri = new URI(location);
+            if (StringUtils.isEmpty(normalizedUri.getHost()) && location.startsWith(HdfsResource.HDFS_PREFIX)) {
+                // Need add hdfs host to location
+                String host = props.get(HdfsResource.DSF_NAMESERVICES);
+                if (StringUtils.isNotEmpty(host)) {
+                    // Replace 'hdfs://' to 'hdfs://name_service', for example: hdfs:///abc to hdfs://name_service/abc
+                    return location.replace(HdfsResource.HDFS_PREFIX, HdfsResource.HDFS_PREFIX + host);
+                } else {
+                    // If no hadoop HA config
+                    if (location.startsWith(HdfsResource.HDFS_PREFIX + '/')) {
+                        // Do not support hdfs:///location
+                        throw new RuntimeException("Invalid location with empty host: " + location);
+                    }
+                }
+            }
+            return location;
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
     }
 
     /**
@@ -80,7 +107,7 @@ public class S3Util {
      * @param location origin split path
      * @return BE scan range path
      */
-    public static Path toScanRangeLocation(String location) {
+    public static Path toScanRangeLocation(String location, Map<String, String> props) {
         // All storage will use s3 client on BE.
         if (isObjStorage(location)) {
             int pos = location.indexOf("://");
@@ -95,7 +122,7 @@ public class S3Util {
                 location = "s3" + location.substring(pos);
             }
         }
-        return new Path(location);
+        return new Path(normalizedLocation(location, props));
     }
 
     public static boolean isHdfsOnOssEndpoint(String location) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 0bd190d945..e1fa35d07e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -378,7 +378,7 @@ public class HiveMetaStoreCache {
             RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true);
             for (RemoteFile remoteFile : locatedFiles.files()) {
                 Path srcPath = remoteFile.getPath();
-                Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString());
+                Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString(), catalog.getProperties());
                 if (!convertedPath.toString().equals(srcPath.toString())) {
                     remoteFile.setPath(convertedPath);
                 }
@@ -403,7 +403,7 @@ public class HiveMetaStoreCache {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
-            String finalLocation = S3Util.convertToS3IfNecessary(key.location);
+            String finalLocation = S3Util.convertToS3IfNecessary(key.location, catalog.getProperties());
             // disable the fs cache in FileSystem, or it will always from new FileSystem
             // and save it in cache when calling FileInputFormat.setInputPaths().
             try {
@@ -437,7 +437,8 @@ public class HiveMetaStoreCache {
                     for (int i = 0; i < splits.length; i++) {
                         org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
                         // todo: get modification time
-                        Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString());
+                        Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString(),
+                                    catalog.getProperties());
                         result.addSplit(new FileSplit(splitFilePath, fs.getStart(), fs.getLength(), -1, null, null));
                     }
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 3d3634fb66..23bd919461 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -125,7 +125,7 @@ public class IcebergScanNode extends FileQueryScanNode {
             for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
                 TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
                 String deleteFilePath = filter.getDeleteFilePath();
-                deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath).toString());
+                deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath, icebergSplit.getConfig()).toString());
                 if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
                     fileDesc.setContent(FileContent.POSITION_DELETES.id());
                     IcebergDeleteFileFilter.PositionDelete positionDelete =
@@ -188,13 +188,14 @@ public class IcebergScanNode extends FileQueryScanNode {
                  TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
             combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
                 String dataFilePath = splitTask.file().path().toString();
-                Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath);
+                Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
                 IcebergSplit split = new IcebergSplit(
                         finalDataFilePath,
                         splitTask.start(),
                         splitTask.length(),
                         splitTask.file().fileSizeInBytes(),
-                        new String[0]);
+                        new String[0],
+                        source.getCatalog().getProperties());
                 split.setFormatVersion(formatVersion);
                 if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                     split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index 9064017088..de3f2ec6aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -23,16 +23,20 @@ import lombok.Data;
 import org.apache.hadoop.fs.Path;
 
 import java.util.List;
+import java.util.Map;
 
 @Data
 public class IcebergSplit extends FileSplit {
     // File path will be changed if the file is modified, so there's no need to get modification time.
-    public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts) {
+    public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
+                        Map<String, String> config) {
         super(file, start, length, fileLength, hosts, null);
+        this.config = config;
     }
 
     private Integer formatVersion;
     private List<IcebergDeleteFileFilter> deleteFileFilters;
+    private Map<String, String> config;
 }
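
After this patch, callers hand the catalog properties to the path helpers so HA
locations can be normalized. A hedged usage sketch (the catalog variable and the
sample path are illustrative, assuming the catalog properties contain
dfs.nameservices=ha-nameservice):

    // Illustrative fragment, assuming a Doris FE context with an HMS catalog.
    Map<String, String> props = catalog.getProperties(); // e.g. dfs.nameservices=ha-nameservice
    Path scanPath = S3Util.toScanRangeLocation("hdfs:///warehouse/t1/part-0.parquet", props);
    // scanPath: hdfs://ha-nameservice/warehouse/t1/part-0.parquet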
 
 

