This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new 63690ce  [feature](spark load) support s3 (#266)
63690ce is described below

commit 63690ce3c3cdb39ae201a9be6076e58dbda406b5
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Wed Feb 19 15:49:45 2025 +0800

    [feature](spark load) support s3 (#266)
---
 spark-load/pom.xml                                  |  2 +-
 .../java/org/apache/doris/common/DppResult.java     | 54 ++++++++++++---------
 .../java/org/apache/doris/SparkLoadRunner.java      | 55 ++++++++++++++++++++++
 .../java/org/apache/doris/common/Constants.java     |  6 +++
 .../{Constants.java => enums/StorageType.java}      | 15 ++----
 .../java/org/apache/doris/config/JobConfig.java     | 47 +++++++++---------
 .../java/org/apache/doris/load/job/PullLoader.java  | 28 ++++++++++-
 spark-load/spark-load-dpp/pom.xml                   | 24 +---------
 .../apache/doris/load/loadv2/etl/SparkEtlJob.java   |  2 +-
 9 files changed, 151 insertions(+), 82 deletions(-)

diff --git a/spark-load/pom.xml b/spark-load/pom.xml
index 9a3279f..6b044b3 100644
--- a/spark-load/pom.xml
+++ b/spark-load/pom.xml
@@ -40,7 +40,7 @@
         <revision>25.0.0-SNAPSHOT</revision>
         <commons-codec.version>1.13</commons-codec.version>
         <commons-lang3.version>3.9</commons-lang3.version>
-        <hadoop.version>3.3.6</hadoop.version>
+        <hadoop.version>3.3.4</hadoop.version>
         <netty-all.version>4.1.104.Final</netty-all.version>
         <parquet.version>1.13.1</parquet.version>
         <commons-collections.version>3.2.2</commons-collections.version>
diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java
index 7a2a9cb..f839c70 100644
--- a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java
+++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java
@@ -27,25 +27,35 @@ import java.io.Serializable;
  */
 public class DppResult implements Serializable {
 
+    @JsonProperty(value = "is_success", required = true)
     public boolean isSuccess;
 
+    @JsonProperty(value = "failed_reason", required = true)
     public String failedReason;
 
+    @JsonProperty(value = "scanned_rows", required = true)
     public long scannedRows;
 
+    @JsonProperty(value = "file_number", required = true)
     public long fileNumber;
 
+    @JsonProperty(value = "file_size", required = true)
     public long fileSize;
 
+    @JsonProperty(value = "normal_rows", required = true)
     public long normalRows;
 
+    @JsonProperty(value = "abnormal_rows", required = true)
     public long abnormalRows;
 
+    @JsonProperty(value = "unselect_rows", required = true)
     public long unselectRows;
 
     // only part of abnormal rows will be returned
+    @JsonProperty("partial_abnormal_rows")
     public String partialAbnormalRows;
 
+    @JsonProperty("scanned_bytes")
     public long scannedBytes;
 
     public DppResult() {
@@ -61,27 +71,27 @@ public class DppResult implements Serializable {
         scannedBytes = 0;
     }
 
-    @JsonCreator
-    public DppResult(@JsonProperty(value = "is_success", required = true) boolean isSuccess,
-                     @JsonProperty(value = "failed_reason", required = true) String failedReason,
-                     @JsonProperty(value = "scanned_rows", required = true) long scannedRows,
-                     @JsonProperty(value = "file_number", required = true) long fileNumber,
-                     @JsonProperty(value = "file_size", required = true) long fileSize,
-                     @JsonProperty(value = "normal_rows", required = true) long normalRows,
-                     @JsonProperty(value = "abnormal_rows", required = true) long abnormalRows,
-                     @JsonProperty(value = "unselect_rows", required = true) long unselectRows,
-                     @JsonProperty("partial_abnormal_rows") String partialAbnormalRows,
-                     @JsonProperty("scanned_bytes") long scannedBytes) {
-        this.isSuccess = isSuccess;
-        this.failedReason = failedReason;
-        this.scannedRows = scannedRows;
-        this.fileNumber = fileNumber;
-        this.fileSize = fileSize;
-        this.normalRows = normalRows;
-        this.abnormalRows = abnormalRows;
-        this.unselectRows = unselectRows;
-        this.partialAbnormalRows = partialAbnormalRows;
-        this.scannedBytes = scannedBytes;
-    }
+    // @JsonCreator
+    // public DppResult(@JsonProperty(value = "is_success", required = true) boolean isSuccess,
+    //                  @JsonProperty(value = "failed_reason", required = true) String failedReason,
+    //                  @JsonProperty(value = "scanned_rows", required = true) long scannedRows,
+    //                  @JsonProperty(value = "file_number", required = true) long fileNumber,
+    //                  @JsonProperty(value = "file_size", required = true) long fileSize,
+    //                  @JsonProperty(value = "normal_rows", required = true) long normalRows,
+    //                  @JsonProperty(value = "abnormal_rows", required = true) long abnormalRows,
+    //                  @JsonProperty(value = "unselect_rows", required = true) long unselectRows,
+    //                  @JsonProperty("partial_abnormal_rows") String partialAbnormalRows,
+    //                  @JsonProperty("scanned_bytes") long scannedBytes) {
+    //     this.isSuccess = isSuccess;
+    //     this.failedReason = failedReason;
+    //     this.scannedRows = scannedRows;
+    //     this.fileNumber = fileNumber;
+    //     this.fileSize = fileSize;
+    //     this.normalRows = normalRows;
+    //     this.abnormalRows = abnormalRows;
+    //     this.unselectRows = unselectRows;
+    //     this.partialAbnormalRows = partialAbnormalRows;
+    //     this.scannedBytes = scannedBytes;
+    // }
 }
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java
index f792087..5c7329f 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java
@@ -19,6 +19,8 @@ package org.apache.doris;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.doris.common.CommandLineOptions;
+import org.apache.doris.common.Constants;
+import org.apache.doris.common.enums.StorageType;
 import org.apache.doris.config.JobConfig;
 import org.apache.doris.load.LoaderFactory;
 import org.apache.doris.load.job.Loader;
@@ -40,6 +42,12 @@ import org.apache.log4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class SparkLoadRunner {
 
@@ -66,6 +74,7 @@ public class SparkLoadRunner {
 
         JobConfig jobConfig = readConfig(cmdOptions.getConfigPath());
         try {
+            handleS3Config(jobConfig);
             checkConfig(jobConfig);
         } catch (IllegalArgumentException e) {
             System.err.println("check config failed, msg: " + ExceptionUtils.getStackTrace(e));
@@ -92,6 +101,7 @@ public class SparkLoadRunner {
             } while (false);
 
             loader.afterFinished();
+            // loader.cancel();
         } catch (Exception e) {
             loader.afterFailed(e);
@@ -146,4 +156,49 @@ public class SparkLoadRunner {
         jobConfig.checkHadoopProperties();
     }
 
+    private static void handleS3Config(JobConfig jobConfig) {
+        URI uri = URI.create(jobConfig.getWorkingDir());
+        if (uri.getScheme().equalsIgnoreCase("s3")) {
+
+            Map<String, String> hadoopProperties = new HashMap<>(jobConfig.getHadoopProperties());
+            Preconditions.checkArgument(hadoopProperties.containsKey(Constants.S3_ENDPOINT), "s3.endpoint is empty");
+            Preconditions.checkArgument(hadoopProperties.containsKey(Constants.S3_REGION), "s3.region is empty");
+            Preconditions.checkArgument(hadoopProperties.containsKey(Constants.S3_ACCESS_KEY), "s3.access_key is empty");
+            Preconditions.checkArgument(hadoopProperties.containsKey(Constants.S3_SECRET_KEY), "s3.secret_key is empty");
+
+            hadoopProperties.put("fs.s3a.endpoint", hadoopProperties.get(Constants.S3_ENDPOINT));
+            hadoopProperties.remove(Constants.S3_ENDPOINT);
+            hadoopProperties.put("fs.s3a.endpoint.region", hadoopProperties.get(Constants.S3_REGION));
+            hadoopProperties.remove(Constants.S3_REGION);
+            hadoopProperties.put("fs.s3a.access.key", hadoopProperties.get(Constants.S3_ACCESS_KEY));
+            hadoopProperties.remove(Constants.S3_ACCESS_KEY);
+            hadoopProperties.put("fs.s3a.secret.key", hadoopProperties.get(Constants.S3_SECRET_KEY));
+            hadoopProperties.remove(Constants.S3_SECRET_KEY);
+            if (hadoopProperties.containsKey(Constants.S3_TOKEN)) {
+                hadoopProperties.put("fs.s3a.session.token", hadoopProperties.get(Constants.S3_TOKEN));
+                hadoopProperties.remove(Constants.S3_TOKEN);
+                hadoopProperties.put("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
+            } else {
+                hadoopProperties.put("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+            }
+            hadoopProperties.put("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+            jobConfig.setHadoopProperties(hadoopProperties);
+
+            // working dir path replace s3:// with s3a://
+            String resolvedWorkingDir = "s3a:" + uri.getSchemeSpecificPart();
+            jobConfig.setWorkingDir(resolvedWorkingDir);
+
+            // load task path replace s3:// with s3a://
+            for (String s : jobConfig.getLoadTasks().keySet()) {
+                JobConfig.TaskInfo taskInfo = jobConfig.getLoadTasks().get(s);
+                List<String> resolvedPaths = taskInfo.getPaths().stream()
+                        .map(path -> "s3a:" + URI.create(path).getSchemeSpecificPart())
+                        .collect(Collectors.toList());
+                taskInfo.setPaths(resolvedPaths);
+            }
+            jobConfig.setStorageType(StorageType.S3);
+
+        }
+    }
+
 }
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
index a3e4803..56991ca 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
@@ -28,4 +28,10 @@ public interface Constants {
 
     String DEFAULT_CATALOG = "internal";
 
+    String S3_ENDPOINT = "s3.endpoint";
+    String S3_REGION = "s3.region";
+    String S3_ACCESS_KEY = "s3.access_key";
+    String S3_SECRET_KEY = "s3.secret_key";
+    String S3_TOKEN = "s3.session_token";
+
 }
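(Illustration only, not part of the commit: the five s3.* keys added to Constants.java above are the user-facing properties that SparkLoadRunner.handleS3Config(), shown earlier in this diff, expects to find in the job config's hadoopProperties when the working dir uses an s3:// scheme. The minimal Java sketch below assumes that setup; the endpoint, region, and credential values are placeholders.)

    import java.util.HashMap;
    import java.util.Map;

    public class S3PropertiesSketch {
        public static void main(String[] args) {
            // User-facing keys (from Constants.java in this commit); values are placeholders.
            Map<String, String> hadoopProperties = new HashMap<>();
            hadoopProperties.put("s3.endpoint", "http://s3.example.com"); // assumed endpoint
            hadoopProperties.put("s3.region", "us-east-1");               // assumed region
            hadoopProperties.put("s3.access_key", "ACCESS_KEY_PLACEHOLDER");
            hadoopProperties.put("s3.secret_key", "SECRET_KEY_PLACEHOLDER");
            // Optional: "s3.session_token" switches the credentials provider to
            // org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider.

            // After handleS3Config(jobConfig) runs, the map would instead carry the Hadoop S3A keys
            // (fs.s3a.endpoint, fs.s3a.endpoint.region, fs.s3a.access.key, fs.s3a.secret.key,
            // fs.s3a.aws.credentials.provider, fs.s3a.impl), and s3:// paths are rewritten to s3a://.
            System.out.println(hadoopProperties);
        }
    }
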
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/enums/StorageType.java
similarity index 63%
copy from spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
copy to spark-load/spark-load-core/src/main/java/org/apache/doris/common/enums/StorageType.java
index a3e4803..761afb1 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/enums/StorageType.java
@@ -15,17 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common;
-
-public interface Constants {
-
-    String HIVE_METASTORE_URIS = "hive.metastore.uris";
-    String SPARK_STANDALONE_SCHEME = "spark";
-    String HADOOP_AUTH_KERBEROS = "kerberos";
-    String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
-    String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
-    String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
-
-    String DEFAULT_CATALOG = "internal";
+package org.apache.doris.common.enums;
+
+public enum StorageType {
+    HDFS,S3;
 }
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java
index fb2f5cc..831e8ac 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java
@@ -21,6 +21,7 @@ import org.apache.doris.SparkLoadRunner;
 import org.apache.doris.client.DorisClient;
 import org.apache.doris.common.Constants;
 import org.apache.doris.common.enums.LoadMode;
+import org.apache.doris.common.enums.StorageType;
 import org.apache.doris.common.enums.TaskType;
 import org.apache.doris.exception.SparkLoadException;
 
@@ -73,6 +74,8 @@ public class JobConfig {
 
     private Map<String, String> env = Collections.emptyMap();
 
+    private StorageType storageType = StorageType.HDFS;
+
     @Data
     public static class TaskInfo {
 
@@ -237,29 +240,31 @@ public class JobConfig {
         if (hadoopProperties == null || hadoopProperties.isEmpty()) {
             return;
         }
-        if (!hadoopProperties.containsKey("fs.defaultFS")) {
-            throw new IllegalArgumentException("fs.defaultFS is empty");
-        }
-        // check auth
-        if (hadoopProperties.containsKey("hadoop.security.authentication")
-                && StringUtils.equalsIgnoreCase(hadoopProperties.get("hadoop.security.authentication"), "kerberos")) {
-            if (hadoopProperties.containsKey("hadoop.kerberos.principal")) {
-                if (StringUtils.isBlank(hadoopProperties.get("hadoop.kerberos.principal"))) {
-                    throw new IllegalArgumentException("hadoop kerberos principal is empty");
-                }
-                if (hadoopProperties.containsKey("hadoop.kerberos.keytab")) {
-                    if (!FileUtils.getFile(hadoopProperties.get("hadoop.kerberos.keytab")).exists()) {
-                        throw new IllegalArgumentException("hadoop kerberos keytab file is not exists, path: "
-                                + hadoopProperties.get("hadoop.kerberos.keytab"));
+        if (!workingDir.startsWith("s3")) {
+            if (!hadoopProperties.containsKey("fs.defaultFS")) {
+                throw new IllegalArgumentException("fs.defaultFS is empty");
+            }
+            // check auth
+            if (hadoopProperties.containsKey("hadoop.security.authentication")
+                    && StringUtils.equalsIgnoreCase(hadoopProperties.get("hadoop.security.authentication"), "kerberos")) {
+                if (hadoopProperties.containsKey("hadoop.kerberos.principal")) {
+                    if (StringUtils.isBlank(hadoopProperties.get("hadoop.kerberos.principal"))) {
+                        throw new IllegalArgumentException("hadoop kerberos principal is empty");
                     }
-                    return;
+                    if (hadoopProperties.containsKey("hadoop.kerberos.keytab")) {
+                        if (!FileUtils.getFile(hadoopProperties.get("hadoop.kerberos.keytab")).exists()) {
+                            throw new IllegalArgumentException("hadoop kerberos keytab file is not exists, path: "
+                                    + hadoopProperties.get("hadoop.kerberos.keytab"));
+                        }
+                        return;
+                    }
+                    throw new IllegalArgumentException("hadoop.kerberos.keytab is not set");
+                }
+                throw new IllegalArgumentException("hadoop.kerberos.principal is not set");
+            } else {
+                if (!hadoopProperties.containsKey("hadoop.username")) {
+                    throw new IllegalArgumentException("hadoop username is empty");
                 }
-                throw new IllegalArgumentException("hadoop.kerberos.keytab is not set");
-            }
-            throw new IllegalArgumentException("hadoop.kerberos.principal is not set");
-        } else {
-            if (!hadoopProperties.containsKey("hadoop.username")) {
-                throw new IllegalArgumentException("hadoop username is empty");
             }
         }
     }
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java
index 80491bf..40459a1 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.Constants;
 import org.apache.doris.common.DppResult;
 import org.apache.doris.common.LoadInfo;
 import org.apache.doris.common.enums.JobStatus;
+import org.apache.doris.common.enums.StorageType;
 import org.apache.doris.common.meta.LoadMeta;
 import org.apache.doris.common.meta.TableMeta;
 import org.apache.doris.config.EtlJobConfig;
@@ -39,6 +40,7 @@ import org.apache.log4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
@@ -123,6 +125,7 @@ public class PullLoader extends Loader implements Recoverable {
         statusInfo.put("status", jobStatus.name());
         statusInfo.put("msg", "");
         statusInfo.put("appId", appHandle == null ? null : appHandle.getAppId());
+        statusInfo.put("storageType", jobConfig.getStorageType().name());
         try {
             String dppResultStr = null;
             int checkCnt = 0;
@@ -143,7 +146,7 @@ public class PullLoader extends Loader implements Recoverable {
             }
             statusInfo.put("dppResult", dppResultStr);
             statusInfo.put("filePathToSize", JsonUtils.writeValueAsString(getFilePathToSize()));
-            statusInfo.put("hadoopProperties", JsonUtils.writeValueAsString(jobConfig.getHadoopProperties()));
+            statusInfo.put("hadoopProperties", JsonUtils.writeValueAsString(getHadoopProperties()));
         } catch (IOException e) {
             throw new SparkLoadException("update job status failed", e);
         }
@@ -169,6 +172,20 @@ public class PullLoader extends Loader implements Recoverable {
         } while (true);
     }
 
+    private Map<String, String> getHadoopProperties() {
+        Map<String, String> hadoopProperties = new HashMap<>(jobConfig.getHadoopProperties());
+        if (jobConfig.getStorageType() == StorageType.S3) {
+            hadoopProperties.put("AWS_ENDPOINT", hadoopProperties.get("fs.s3a.endpoint"));
+            hadoopProperties.put("AWS_ACCESS_KEY", hadoopProperties.get("fs.s3a.access.key"));
+            hadoopProperties.put("AWS_SECRET_KEY", hadoopProperties.get("fs.s3a.secret.key"));
+            hadoopProperties.put("AWS_REGION", hadoopProperties.get("fs.s3a.endpoint.region"));
+            if (hadoopProperties.containsKey("fs.s3a.session.token")) {
+                hadoopProperties.put("AWS_TOKEN", hadoopProperties.get("fs.s3a.session.token"));
+            }
+        }
+        return hadoopProperties;
+    }
+
     @Override
     public void afterFailed(Exception e) {
         if (loadMeta == null) {
@@ -359,7 +376,14 @@ public class PullLoader extends Loader implements Recoverable {
                 if (fileStatus.isDirectory()) {
                     continue;
                 }
-                filePathToSize.put(fileStatus.getPath().toString(), fileStatus.getLen());
+                String filePath = fileStatus.getPath().toString();
+                if (jobConfig.getStorageType() == StorageType.S3) {
+                    URI uri = fileStatus.getPath().toUri();
+                    if (uri.getScheme() != null && uri.getScheme().startsWith("s3")) {
+                        filePath = "s3:" + uri.getSchemeSpecificPart();
+                    }
+                }
+                filePathToSize.put(filePath, fileStatus.getLen());
             }
         } catch (IOException e) {
             throw new SparkLoadException("get dpp result failed", e);
diff --git a/spark-load/spark-load-dpp/pom.xml b/spark-load/spark-load-dpp/pom.xml
index 5c99e96..7276c54 100644
--- a/spark-load/spark-load-dpp/pom.xml
+++ b/spark-load/spark-load-dpp/pom.xml
@@ -146,29 +146,7 @@ under the License.
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-aws</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>servlet-api</artifactId>
-                    <groupId>javax.servlet</groupId>
-                </exclusion>
-                <!-- https://github.com/aws/aws-sdk-java/issues/1032 -->
-                <exclusion>
-                    <groupId>com.amazonaws</groupId>
-                    <artifactId>aws-java-sdk-s3</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.amazonaws</groupId>
-                    <artifactId>aws-java-sdk-bundle</artifactId>
-                </exclusion>
-            </exclusions>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.amazonaws</groupId>
diff --git a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
index 0330001..068535e 100644
--- a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
+++ b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
@@ -281,7 +281,7 @@ public class SparkEtlJob {
             new SparkEtlJob(args[0]).run();
         } catch (Exception e) {
             System.err.println("spark etl job run failed");
-            LOG.warn("", e);
+            LOG.error("spark etl job run failed", e);
             System.exit(-1);
         }
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org