This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 63690ce  [feature](spark load) support s3 (#266)
63690ce is described below

commit 63690ce3c3cdb39ae201a9be6076e58dbda406b5
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Wed Feb 19 15:49:45 2025 +0800

    [feature](spark load) support s3 (#266)
---
 spark-load/pom.xml                                 |  2 +-
 .../java/org/apache/doris/common/DppResult.java    | 54 ++++++++++++---------
 .../java/org/apache/doris/SparkLoadRunner.java     | 55 ++++++++++++++++++++++
 .../java/org/apache/doris/common/Constants.java    |  6 +++
 .../{Constants.java => enums/StorageType.java}     | 15 ++----
 .../java/org/apache/doris/config/JobConfig.java    | 47 +++++++++---------
 .../java/org/apache/doris/load/job/PullLoader.java | 28 ++++++++++-
 spark-load/spark-load-dpp/pom.xml                  | 24 +---------
 .../apache/doris/load/loadv2/etl/SparkEtlJob.java  |  2 +-
 9 files changed, 151 insertions(+), 82 deletions(-)

diff --git a/spark-load/pom.xml b/spark-load/pom.xml
index 9a3279f..6b044b3 100644
--- a/spark-load/pom.xml
+++ b/spark-load/pom.xml
@@ -40,7 +40,7 @@
         <revision>25.0.0-SNAPSHOT</revision>
         <commons-codec.version>1.13</commons-codec.version>
         <commons-lang3.version>3.9</commons-lang3.version>
-        <hadoop.version>3.3.6</hadoop.version>
+        <hadoop.version>3.3.4</hadoop.version>
         <netty-all.version>4.1.104.Final</netty-all.version>
         <parquet.version>1.13.1</parquet.version>
         <commons-collections.version>3.2.2</commons-collections.version>
diff --git 
a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java
 
b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java
index 7a2a9cb..f839c70 100644
--- 
a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java
+++ 
b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java
@@ -27,25 +27,35 @@ import java.io.Serializable;
  */
 public class DppResult implements Serializable {
 
+    @JsonProperty(value = "is_success", required = true)
     public boolean isSuccess;
 
+    @JsonProperty(value = "failed_reason", required = true)
     public String failedReason;
 
+    @JsonProperty(value = "scanned_rows", required = true)
     public long scannedRows;
 
+    @JsonProperty(value = "file_number", required = true)
     public long fileNumber;
 
+    @JsonProperty(value = "file_size", required = true)
     public long fileSize;
 
+    @JsonProperty(value = "normal_rows", required = true)
     public long normalRows;
 
+    @JsonProperty(value = "abnormal_rows", required = true)
     public long abnormalRows;
 
+    @JsonProperty(value = "unselect_rows", required = true)
     public long unselectRows;
 
     // only part of abnormal rows will be returned
+    @JsonProperty("partial_abnormal_rows")
     public String partialAbnormalRows;
 
+    @JsonProperty("scanned_bytes")
     public long scannedBytes;
 
     public DppResult() {
@@ -61,27 +71,27 @@ public class DppResult implements Serializable {
         scannedBytes = 0;
     }
 
-    @JsonCreator
-    public DppResult(@JsonProperty(value = "is_success", required = true) boolean isSuccess,
-                              @JsonProperty(value = "failed_reason", required = true) String failedReason,
-                              @JsonProperty(value = "scanned_rows", required = true) long scannedRows,
-                              @JsonProperty(value = "file_number", required = true) long fileNumber,
-                              @JsonProperty(value = "file_size", required = true) long fileSize,
-                              @JsonProperty(value = "normal_rows", required = true) long normalRows,
-                              @JsonProperty(value = "abnormal_rows", required = true) long abnormalRows,
-                              @JsonProperty(value = "unselect_rows", required = true) long unselectRows,
-                              @JsonProperty("partial_abnormal_rows") String partialAbnormalRows,
-                              @JsonProperty("scanned_bytes") long scannedBytes) {
-        this.isSuccess = isSuccess;
-        this.failedReason = failedReason;
-        this.scannedRows = scannedRows;
-        this.fileNumber = fileNumber;
-        this.fileSize = fileSize;
-        this.normalRows = normalRows;
-        this.abnormalRows = abnormalRows;
-        this.unselectRows = unselectRows;
-        this.partialAbnormalRows = partialAbnormalRows;
-        this.scannedBytes = scannedBytes;
-    }
+    // @JsonCreator
+    // public DppResult(@JsonProperty(value = "is_success", required = true) boolean isSuccess,
+    //                           @JsonProperty(value = "failed_reason", required = true) String failedReason,
+    //                           @JsonProperty(value = "scanned_rows", required = true) long scannedRows,
+    //                           @JsonProperty(value = "file_number", required = true) long fileNumber,
+    //                           @JsonProperty(value = "file_size", required = true) long fileSize,
+    //                           @JsonProperty(value = "normal_rows", required = true) long normalRows,
+    //                           @JsonProperty(value = "abnormal_rows", required = true) long abnormalRows,
+    //                           @JsonProperty(value = "unselect_rows", required = true) long unselectRows,
+    //                           @JsonProperty("partial_abnormal_rows") String partialAbnormalRows,
+    //                           @JsonProperty("scanned_bytes") long scannedBytes) {
+    //     this.isSuccess = isSuccess;
+    //     this.failedReason = failedReason;
+    //     this.scannedRows = scannedRows;
+    //     this.fileNumber = fileNumber;
+    //     this.fileSize = fileSize;
+    //     this.normalRows = normalRows;
+    //     this.abnormalRows = abnormalRows;
+    //     this.unselectRows = unselectRows;
+    //     this.partialAbnormalRows = partialAbnormalRows;
+    //     this.scannedBytes = scannedBytes;
+    // }
 
 }
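
The DppResult change above drops the @JsonCreator constructor in favor of field-level @JsonProperty annotations, so Jackson now binds the snake_case keys straight onto the public fields through the default constructor. A minimal round-trip sketch (illustrative only, assuming org.apache.doris.common.DppResult is on the classpath; not part of this commit):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.doris.common.DppResult;

    public class DppResultJsonDemo {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // snake_case keys match the @JsonProperty names on the fields
            String json = "{\"is_success\":true,\"failed_reason\":\"\",\"scanned_rows\":100,"
                    + "\"file_number\":1,\"file_size\":1024,\"normal_rows\":100,"
                    + "\"abnormal_rows\":0,\"unselect_rows\":0}";
            DppResult result = mapper.readValue(json, DppResult.class);
            System.out.println(result.scannedRows); // 100
        }
    }

One behavioral note: Jackson enforces required = true only on creator properties, so with the constructor commented out the required flags on the fields are effectively documentation.
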
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java
index f792087..5c7329f 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java
@@ -19,6 +19,8 @@ package org.apache.doris;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.doris.common.CommandLineOptions;
+import org.apache.doris.common.Constants;
+import org.apache.doris.common.enums.StorageType;
 import org.apache.doris.config.JobConfig;
 import org.apache.doris.load.LoaderFactory;
 import org.apache.doris.load.job.Loader;
@@ -40,6 +42,12 @@ import org.apache.log4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class SparkLoadRunner {
 
@@ -66,6 +74,7 @@ public class SparkLoadRunner {
 
         JobConfig jobConfig = readConfig(cmdOptions.getConfigPath());
         try {
+            handleS3Config(jobConfig);
             checkConfig(jobConfig);
         } catch (IllegalArgumentException e) {
             System.err.println("check config failed, msg: " + 
ExceptionUtils.getStackTrace(e));
@@ -92,6 +101,7 @@ public class SparkLoadRunner {
             } while (false);
 
             loader.afterFinished();
+            // loader.cancel();
 
         } catch (Exception e) {
             loader.afterFailed(e);
@@ -146,4 +156,49 @@ public class SparkLoadRunner {
         jobConfig.checkHadoopProperties();
     }
 
+    private static void handleS3Config(JobConfig jobConfig) {
+        URI uri = URI.create(jobConfig.getWorkingDir());
+        if (uri.getScheme().equalsIgnoreCase("s3")) {
+
+            Map<String, String> hadoopProperties = new HashMap<>(jobConfig.getHadoopProperties());
+            Preconditions.checkArgument(hadoopProperties.containsKey(Constants.S3_ENDPOINT), "s3.endpoint is empty");
+            Preconditions.checkArgument(hadoopProperties.containsKey(Constants.S3_REGION), "s3.region is empty");
+            Preconditions.checkArgument(hadoopProperties.containsKey(Constants.S3_ACCESS_KEY), "s3.access_key is empty");
+            Preconditions.checkArgument(hadoopProperties.containsKey(Constants.S3_SECRET_KEY), "s3.secret_key is empty");
+
+            hadoopProperties.put("fs.s3a.endpoint", hadoopProperties.get(Constants.S3_ENDPOINT));
+            hadoopProperties.remove(Constants.S3_ENDPOINT);
+            hadoopProperties.put("fs.s3a.endpoint.region", hadoopProperties.get(Constants.S3_REGION));
+            hadoopProperties.remove(Constants.S3_REGION);
+            hadoopProperties.put("fs.s3a.access.key", hadoopProperties.get(Constants.S3_ACCESS_KEY));
+            hadoopProperties.remove(Constants.S3_ACCESS_KEY);
+            hadoopProperties.put("fs.s3a.secret.key", hadoopProperties.get(Constants.S3_SECRET_KEY));
+            hadoopProperties.remove(Constants.S3_SECRET_KEY);
+            if (hadoopProperties.containsKey(Constants.S3_TOKEN)) {
+                hadoopProperties.put("fs.s3a.session.token", hadoopProperties.get(Constants.S3_TOKEN));
+                hadoopProperties.remove(Constants.S3_TOKEN);
+                hadoopProperties.put("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
+            } else {
+                hadoopProperties.put("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+            }
+            hadoopProperties.put("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+            jobConfig.setHadoopProperties(hadoopProperties);
+
+            //  working dir path replace s3:// with s3a://
+            String resolvedWorkingDir = "s3a:" + uri.getSchemeSpecificPart();
+            jobConfig.setWorkingDir(resolvedWorkingDir);
+
+            // load task path replace s3:// with s3a://
+            for (String s : jobConfig.getLoadTasks().keySet()) {
+                JobConfig.TaskInfo taskInfo = jobConfig.getLoadTasks().get(s);
+                List<String> resolvedPaths = taskInfo.getPaths().stream()
+                        .map(path -> "s3a:" + URI.create(path).getSchemeSpecificPart())
+                        .collect(Collectors.toList());
+                taskInfo.setPaths(resolvedPaths);
+            }
+            jobConfig.setStorageType(StorageType.S3);
+
+        }
+    }
+
 }
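
handleS3Config() above normalizes an S3 job in three steps: it validates and rewrites the user-facing s3.* keys into Hadoop's fs.s3a.* properties, picks TemporaryAWSCredentialsProvider when a session token is present (SimpleAWSCredentialsProvider otherwise), and swaps the s3:// scheme for s3a:// in the working dir and every task path. The scheme swap leans on URI.getSchemeSpecificPart(), as this sketch shows (illustrative only; the bucket and path are invented):

    import java.net.URI;

    public class SchemeSwapDemo {
        public static void main(String[] args) {
            URI uri = URI.create("s3://my-bucket/doris/spark-load/job1");
            // getSchemeSpecificPart() returns "//my-bucket/doris/spark-load/job1",
            // so prefixing "s3a:" replaces the scheme and keeps the rest intact.
            String resolved = "s3a:" + uri.getSchemeSpecificPart();
            System.out.println(resolved); // s3a://my-bucket/doris/spark-load/job1
        }
    }
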
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
index a3e4803..56991ca 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
@@ -28,4 +28,10 @@ public interface Constants {
 
     String DEFAULT_CATALOG = "internal";
 
+    String S3_ENDPOINT = "s3.endpoint";
+    String S3_REGION = "s3.region";
+    String S3_ACCESS_KEY = "s3.access_key";
+    String S3_SECRET_KEY = "s3.secret_key";
+    String S3_TOKEN = "s3.session_token";
+
 }
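
These constants name the keys a job supplies under hadoopProperties before handleS3Config() translates them into fs.s3a.* form. A hypothetical fragment (the key names come from this commit; the surrounding JSON layout and all values are placeholders, not part of this commit):

    "hadoopProperties": {
        "s3.endpoint": "s3.us-east-1.amazonaws.com",
        "s3.region": "us-east-1",
        "s3.access_key": "<access-key>",
        "s3.secret_key": "<secret-key>",
        "s3.session_token": "<optional STS token>"
    }

s3.session_token is optional; when it is present, handleS3Config() selects TemporaryAWSCredentialsProvider instead of SimpleAWSCredentialsProvider.
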
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/enums/StorageType.java
similarity index 63%
copy from spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
copy to spark-load/spark-load-core/src/main/java/org/apache/doris/common/enums/StorageType.java
index a3e4803..761afb1 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/enums/StorageType.java
@@ -15,17 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common;
-
-public interface Constants {
-
-    String HIVE_METASTORE_URIS = "hive.metastore.uris";
-    String SPARK_STANDALONE_SCHEME = "spark";
-    String HADOOP_AUTH_KERBEROS = "kerberos";
-    String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
-    String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
-    String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
-
-    String DEFAULT_CATALOG = "internal";
+package org.apache.doris.common.enums;
 
+public enum StorageType {
+    HDFS,S3;
 }
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java
index fb2f5cc..831e8ac 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java
@@ -21,6 +21,7 @@ import org.apache.doris.SparkLoadRunner;
 import org.apache.doris.client.DorisClient;
 import org.apache.doris.common.Constants;
 import org.apache.doris.common.enums.LoadMode;
+import org.apache.doris.common.enums.StorageType;
 import org.apache.doris.common.enums.TaskType;
 import org.apache.doris.exception.SparkLoadException;
 
@@ -73,6 +74,8 @@ public class JobConfig {
 
     private Map<String, String> env = Collections.emptyMap();
 
+    private StorageType storageType = StorageType.HDFS;
+
     @Data
     public static class TaskInfo {
 
@@ -237,29 +240,31 @@ public class JobConfig {
         if (hadoopProperties == null || hadoopProperties.isEmpty()) {
             return;
         }
-        if (!hadoopProperties.containsKey("fs.defaultFS")) {
-            throw new IllegalArgumentException("fs.defaultFS is empty");
-        }
-        // check auth
-        if (hadoopProperties.containsKey("hadoop.security.authentication")
-                && StringUtils.equalsIgnoreCase(hadoopProperties.get("hadoop.security.authentication"), "kerberos")) {
-            if (hadoopProperties.containsKey("hadoop.kerberos.principal")) {
-                if (StringUtils.isBlank(hadoopProperties.get("hadoop.kerberos.principal"))) {
-                    throw new IllegalArgumentException("hadoop kerberos principal is empty");
-                }
-                if (hadoopProperties.containsKey("hadoop.kerberos.keytab")) {
-                    if (!FileUtils.getFile(hadoopProperties.get("hadoop.kerberos.keytab")).exists()) {
-                        throw new IllegalArgumentException("hadoop kerberos keytab file is not exists, path: "
-                                + hadoopProperties.get("hadoop.kerberos.keytab"));
+        if (!workingDir.startsWith("s3")) {
+            if (!hadoopProperties.containsKey("fs.defaultFS")) {
+                throw new IllegalArgumentException("fs.defaultFS is empty");
+            }
+            // check auth
+            if (hadoopProperties.containsKey("hadoop.security.authentication")
+                    && StringUtils.equalsIgnoreCase(hadoopProperties.get("hadoop.security.authentication"), "kerberos")) {
+                if (hadoopProperties.containsKey("hadoop.kerberos.principal")) {
+                    if (StringUtils.isBlank(hadoopProperties.get("hadoop.kerberos.principal"))) {
+                        throw new IllegalArgumentException("hadoop kerberos principal is empty");
                     }
-                    return;
+                    if (hadoopProperties.containsKey("hadoop.kerberos.keytab")) {
+                        if (!FileUtils.getFile(hadoopProperties.get("hadoop.kerberos.keytab")).exists()) {
+                            throw new IllegalArgumentException("hadoop kerberos keytab file is not exists, path: "
+                                    + hadoopProperties.get("hadoop.kerberos.keytab"));
+                        }
+                        return;
+                    }
+                    throw new IllegalArgumentException("hadoop.kerberos.keytab is not set");
+                }
+                throw new IllegalArgumentException("hadoop.kerberos.principal is not set");
+            } else {
+                if (!hadoopProperties.containsKey("hadoop.username")) {
+                    throw new IllegalArgumentException("hadoop username is empty");
                 }
-                throw new IllegalArgumentException("hadoop.kerberos.keytab is not set");
-            }
-            throw new IllegalArgumentException("hadoop.kerberos.principal is not set");
-        } else {
-            if (!hadoopProperties.containsKey("hadoop.username")) {
-                throw new IllegalArgumentException("hadoop username is empty");
             }
         }
     }
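
The reworked checkHadoopProperties() wraps the HDFS-only validation (fs.defaultFS, plus Kerberos principal/keytab or hadoop.username) in a guard on the working dir, so S3 jobs skip checks that don't apply to them; the S3 credentials are validated separately in handleS3Config(). A condensed sketch of the resulting flow (the requireKey/requireNonBlank/requireExistingFile helpers are hypothetical, named here only to show the shape):

    // Note: startsWith("s3") matches both "s3://..." and "s3a://...",
    // which is what handleS3Config() produces after the scheme rewrite.
    if (workingDir.startsWith("s3")) {
        return; // S3 credentials checked in handleS3Config()
    }
    requireKey(hadoopProperties, "fs.defaultFS");
    if ("kerberos".equalsIgnoreCase(hadoopProperties.get("hadoop.security.authentication"))) {
        requireNonBlank(hadoopProperties, "hadoop.kerberos.principal");
        requireExistingFile(hadoopProperties, "hadoop.kerberos.keytab");
    } else {
        requireKey(hadoopProperties, "hadoop.username");
    }
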
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java
index 80491bf..40459a1 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.Constants;
 import org.apache.doris.common.DppResult;
 import org.apache.doris.common.LoadInfo;
 import org.apache.doris.common.enums.JobStatus;
+import org.apache.doris.common.enums.StorageType;
 import org.apache.doris.common.meta.LoadMeta;
 import org.apache.doris.common.meta.TableMeta;
 import org.apache.doris.config.EtlJobConfig;
@@ -39,6 +40,7 @@ import org.apache.log4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
@@ -123,6 +125,7 @@ public class PullLoader extends Loader implements Recoverable {
         statusInfo.put("status", jobStatus.name());
         statusInfo.put("msg", "");
         statusInfo.put("appId", appHandle == null ? null : 
appHandle.getAppId());
+        statusInfo.put("storageType", jobConfig.getStorageType().name());
         try {
             String dppResultStr = null;
             int checkCnt = 0;
@@ -143,7 +146,7 @@ public class PullLoader extends Loader implements Recoverable {
             }
             statusInfo.put("dppResult", dppResultStr);
             statusInfo.put("filePathToSize", 
JsonUtils.writeValueAsString(getFilePathToSize()));
-            statusInfo.put("hadoopProperties", 
JsonUtils.writeValueAsString(jobConfig.getHadoopProperties()));
+            statusInfo.put("hadoopProperties", 
JsonUtils.writeValueAsString(getHadoopProperties()));
         } catch (IOException e) {
             throw new SparkLoadException("update job status failed", e);
         }
@@ -169,6 +172,20 @@ public class PullLoader extends Loader implements Recoverable {
         } while (true);
     }
 
+    private Map<String, String> getHadoopProperties() {
+        Map<String, String> hadoopProperties = new HashMap<>(jobConfig.getHadoopProperties());
+        if (jobConfig.getStorageType() == StorageType.S3) {
+            hadoopProperties.put("AWS_ENDPOINT", hadoopProperties.get("fs.s3a.endpoint"));
+            hadoopProperties.put("AWS_ACCESS_KEY", hadoopProperties.get("fs.s3a.access.key"));
+            hadoopProperties.put("AWS_SECRET_KEY", hadoopProperties.get("fs.s3a.secret.key"));
+            hadoopProperties.put("AWS_REGION", hadoopProperties.get("fs.s3a.endpoint.region"));
+            if (hadoopProperties.containsKey("fs.s3a.session.token")) {
+                hadoopProperties.put("AWS_TOKEN", hadoopProperties.get("fs.s3a.session.token"));
+            }
+        }
+        return hadoopProperties;
+    }
+
     @Override
     public void afterFailed(Exception e) {
         if (loadMeta == null) {
@@ -359,7 +376,14 @@ public class PullLoader extends Loader implements Recoverable {
                 if (fileStatus.isDirectory()) {
                     continue;
                 }
-                filePathToSize.put(fileStatus.getPath().toString(), fileStatus.getLen());
+                String filePath = fileStatus.getPath().toString();
+                if (jobConfig.getStorageType() == StorageType.S3) {
+                    URI uri = fileStatus.getPath().toUri();
+                    if (uri.getScheme() != null && uri.getScheme().startsWith("s3")) {
+                        filePath = "s3:" + uri.getSchemeSpecificPart();
+                    }
+                }
+                filePathToSize.put(filePath, fileStatus.getLen());
             }
         } catch (IOException e) {
             throw new SparkLoadException("get dpp result failed", e);
diff --git a/spark-load/spark-load-dpp/pom.xml b/spark-load/spark-load-dpp/pom.xml
index 5c99e96..7276c54 100644
--- a/spark-load/spark-load-dpp/pom.xml
+++ b/spark-load/spark-load-dpp/pom.xml
@@ -146,29 +146,7 @@ under the License.
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-aws</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>servlet-api</artifactId>
-                    <groupId>javax.servlet</groupId>
-                </exclusion>
-                <!-- https://github.com/aws/aws-sdk-java/issues/1032 -->
-                <exclusion>
-                    <groupId>com.amazonaws</groupId>
-                    <artifactId>aws-java-sdk-s3</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.amazonaws</groupId>
-                    <artifactId>aws-java-sdk-bundle</artifactId>
-                </exclusion>
-            </exclusions>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.amazonaws</groupId>
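
With hadoop-aws moved to provided scope (and hadoop.version pinned back to 3.3.4 in the parent pom), the S3A filesystem classes are expected to come from the Spark/Hadoop runtime rather than being bundled in the dpp jar. If a given Spark distribution doesn't ship them, they could be supplied at submit time, for example (version and mechanism are assumptions about the deployment, not part of this commit):

    spark-submit --packages org.apache.hadoop:hadoop-aws:3.3.4 ...
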
diff --git a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
index 0330001..068535e 100644
--- a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
+++ b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
@@ -281,7 +281,7 @@ public class SparkEtlJob {
             new SparkEtlJob(args[0]).run();
         } catch (Exception e) {
             System.err.println("spark etl job run failed");
-            LOG.warn("", e);
+            LOG.error("spark etl job run failed", e);
             System.exit(-1);
         }
     }

