This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 00ff399 [feature] support overwrite save mode (#149) 00ff399 is described below commit 00ff39986e52072ab840f8f0dd1bab915f58dede Author: gnehil <adamlee...@gmail.com> AuthorDate: Wed Oct 25 18:13:53 2023 +0800 [feature] support overwrite save mode (#149) --- spark-doris-connector/pom.xml | 9 ++++ .../doris/spark/cfg/ConfigurationOptions.java | 1 + .../org/apache/doris/spark/jdbc/JdbcUtils.scala | 34 +++++++++++++ .../org/apache/doris/spark/sql/DorisRelation.scala | 3 +- .../doris/spark/sql/DorisSourceProvider.scala | 59 +++++++++++++++++++++- 5 files changed, 104 insertions(+), 2 deletions(-) diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index 74a53cc..4148a66 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -184,6 +184,15 @@ <artifactId>jackson-core</artifactId> <version>${fasterxml.jackson.version}</version> </dependency> + + <!-- https://mvnrepository.com/artifact/com.mysql/mysql-connector-j --> + <dependency> + <groupId>com.mysql</groupId> + <artifactId>mysql-connector-j</artifactId> + <version>8.0.33</version> + <scope>test</scope> + </dependency> + </dependencies> <build> diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index a144fb8..6498916 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -20,6 +20,7 @@ package org.apache.doris.spark.cfg; public interface ConfigurationOptions { // doris fe node address String DORIS_FENODES = "doris.fenodes"; + String DORIS_QUERY_PORT = "doris.query.port"; String DORIS_DEFAULT_CLUSTER = "default_cluster"; diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala new file mode 100644 index 0000000..aab1032 --- /dev/null +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.spark.jdbc + +import java.sql.{Connection, DriverManager} +import java.util.Properties + +/** + * JDBC helpers for talking to a Doris FE over the MySQL protocol. In this change they back + * `DorisSourceProvider.truncateTable`, which empties the target table for `SaveMode.Overwrite`. + * + * NOTE(review): the pom adds mysql-connector-j only in `test` scope, so a MySQL JDBC driver + * must be supplied on the runtime classpath for `getConnection` to find one - confirm intended. + */ object JdbcUtils { + + /** + * Builds a `jdbc:mysql` URL for `<host>:<port>` that targets the `information_schema` database. + * + * @param host FE host; callers pass the host part of a `doris.fenodes` entry (port stripped) + * @param port FE query port; callers pass `doris.query.port` + * NOTE(review): the host is interpolated as-is, so IPv6 literals are not bracketed. + */ def getJdbcUrl(host: String, port: Int): String = s"jdbc:mysql://$host:$port/information_schema" + + /** + * Opens a connection through `java.sql.DriverManager` with the given properties + * (callers in this change pass `user` and `password`). The connection is returned + * unclosed; the caller owns it and must close it. + */ def getConnection(url: String, props: Properties): Connection = { + + DriverManager.getConnection(url, props) + } + + /** + * Returns the statement `TRUNCATE TABLE <table>`. `table` is interpolated verbatim + * (no quoting or escaping), so it must be a trusted identifier - presumably the + * `doris.table.identifier` value such as `db.table`; verify against callers. + */ def getTruncateQuery(table: String): String = s"TRUNCATE TABLE $table" + +} diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala index 049d5a2..fe7e63d 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala @@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.jdbc.JdbcDialects import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -98,6 +98,7 @@ private[sql] class DorisRelation( } data.write.format(DorisSourceProvider.SHORT_NAME) .options(insertCfg) + .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append) .save() } } diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala index 94fab9e..ac04401 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala @@ -17,7 +17,10 @@ package org.apache.doris.spark.sql -import org.apache.doris.spark.cfg.SparkSettings +import org.apache.commons.lang3.exception.ExceptionUtils +import 
org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings} +import org.apache.doris.spark.exception.DorisException +import org.apache.doris.spark.jdbc.JdbcUtils import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME import org.apache.doris.spark.writer.DorisWriter import org.apache.spark.SparkConf @@ -28,7 +31,9 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import org.slf4j.{Logger, LoggerFactory} +import java.util.Properties import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.util.{Failure, Success, Try} private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider @@ -54,6 +59,13 @@ private[sql] class DorisSourceProvider extends DataSourceRegister val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf) sparkSettings.merge(Utils.params(parameters, logger).asJava) + + mode match { + case SaveMode.Overwrite => + truncateTable(sparkSettings) + case _: SaveMode => // do nothing + } + // init stream loader val writer = new DorisWriter(sparkSettings) writer.write(data)
@@ -79,6 +91,72 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
     sparkSettings.merge(Utils.params(parameters, logger).asJava)
     new DorisStreamLoadSink(sqlContext, sparkSettings)
   }
+
+  /**
+   * Empties the target table before an overwrite write by running
+   * `TRUNCATE TABLE <doris.table.identifier>` over JDBC against a Doris FE.
+   *
+   * Entries of `doris.fenodes` are tried in order; only the host part of each entry
+   * is used, combined with `doris.query.port`. The first successful truncate stops
+   * the loop, and every JDBC statement/connection opened here is closed again.
+   *
+   * @param sparkSettings merged connector settings
+   * @throws IllegalArgumentException if `doris.fenodes` has no usable entry or
+   *                                  `doris.query.port` is not set
+   * @throws DorisException if the truncate failed on every FE node; the last
+   *                        failure is attached as the cause
+   */
+  private def truncateTable(sparkSettings: SparkSettings): Unit = {
+
+    val feNodes = sparkSettings.getProperty(ConfigurationOptions.DORIS_FENODES)
+    val port = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_QUERY_PORT)
+    require(feNodes != null && feNodes.nonEmpty, "doris.fenodes cannot be null or empty")
+    require(port != null, "doris.query.port cannot be null")
+    // Drop blank entries (e.g. from a trailing comma): they would yield an empty host,
+    // and an all-blank list would leave no failure to report below.
+    val feNodesArr = feNodes.split(",").map(_.trim).filter(_.nonEmpty)
+    require(feNodesArr.nonEmpty, "doris.fenodes must contain at least one FE node")
+
+    val tableIdentifier = sparkSettings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER)
+    val query = JdbcUtils.getTruncateQuery(tableIdentifier)
+    val props = new Properties()
+    // Properties.setProperty throws NPE on a null value, so only pass credentials that are set.
+    Option(sparkSettings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER))
+      .foreach(props.setProperty("user", _))
+    Option(sparkSettings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD))
+      .foreach(props.setProperty("password", _))
+
+    var lastError: Option[Throwable] = None
+    // `exists` short-circuits: FE nodes after the first successful one are not contacted.
+    val truncated = feNodesArr.exists { feNode =>
+      val host = feNode.split(":")(0)
+      Try {
+        val conn = JdbcUtils.getConnection(JdbcUtils.getJdbcUrl(host, port), props)
+        // Close statement and connection even when execute() throws, so failed
+        // attempts do not leak FE connections.
+        try {
+          val statement = conn.createStatement()
+          try statement.execute(query) finally statement.close()
+        } finally {
+          conn.close()
+        }
+      } match {
+        case Success(_) =>
+          logger.info(s"truncate table $tableIdentifier success")
+          true
+        case Failure(e) =>
+          // Match any Throwable Try captured, not only Exception, to avoid a MatchError.
+          lastError = Some(e)
+          logger.warn(s"truncate table failed on $feNode, error: {}", ExceptionUtils.getStackTrace(e))
+          false
+      }
+    }
+
+    if (!truncated) {
+      throw new DorisException("truncate table failed", lastError.orNull)
+    }
+  }
+
 }
 
 object DorisSourceProvider {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org