This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 00ff399  [feature] support overwrite save mode (#149)
00ff399 is described below

commit 00ff39986e52072ab840f8f0dd1bab915f58dede
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Wed Oct 25 18:13:53 2023 +0800

    [feature] support overwrite save mode (#149)
---
 spark-doris-connector/pom.xml                      |  9 ++++
 .../doris/spark/cfg/ConfigurationOptions.java      |  1 +
 .../org/apache/doris/spark/jdbc/JdbcUtils.scala    | 34 +++++++++++++
 .../org/apache/doris/spark/sql/DorisRelation.scala |  3 +-
 .../doris/spark/sql/DorisSourceProvider.scala      | 59 +++++++++++++++++++++-
 5 files changed, 104 insertions(+), 2 deletions(-)

diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 74a53cc..4148a66 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -184,6 +184,15 @@
             <artifactId>jackson-core</artifactId>
             <version>${fasterxml.jackson.version}</version>
         </dependency>
+
+        <!-- https://mvnrepository.com/artifact/com.mysql/mysql-connector-j -->
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>mysql-connector-j</artifactId>
+            <version>8.0.33</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index a144fb8..6498916 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -20,6 +20,7 @@ package org.apache.doris.spark.cfg;
 public interface ConfigurationOptions {
     // doris fe node address
     String DORIS_FENODES = "doris.fenodes";
+    String DORIS_QUERY_PORT = "doris.query.port";
 
     String DORIS_DEFAULT_CLUSTER = "default_cluster";
 
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala
new file mode 100644
index 0000000..aab1032
--- /dev/null
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.jdbc
+
+import java.sql.{Connection, DriverManager}
+import java.util.Properties
+
+object JdbcUtils {
+
+  def getJdbcUrl(host: String, port: Int): String = s"jdbc:mysql://$host:$port/information_schema"
+
+  def getConnection(url: String, props: Properties): Connection = {
+
+    DriverManager.getConnection(url, props)
+  }
+
+  def getTruncateQuery(table: String): String = s"TRUNCATE TABLE $table"
+
+}
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
index 049d5a2..fe7e63d 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -98,6 +98,7 @@ private[sql] class DorisRelation(
     }
     data.write.format(DorisSourceProvider.SHORT_NAME)
       .options(insertCfg)
+      .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
       .save()
   }
 }
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 94fab9e..ac04401 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -17,7 +17,10 @@
 
 package org.apache.doris.spark.sql
 
-import org.apache.doris.spark.cfg.SparkSettings
+import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.exception.DorisException
+import org.apache.doris.spark.jdbc.JdbcUtils
 import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME
 import org.apache.doris.spark.writer.DorisWriter
 import org.apache.spark.SparkConf
@@ -28,7 +31,10 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 import org.slf4j.{Logger, LoggerFactory}
 
+import java.util.Properties
 import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.util.control.Breaks
+import scala.util.{Failure, Success, Try}
 
 private[sql] class DorisSourceProvider extends DataSourceRegister
   with RelationProvider
@@ -54,6 +60,13 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
 
     val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
     sparkSettings.merge(Utils.params(parameters, logger).asJava)
+
+    mode match {
+      case SaveMode.Overwrite =>
+        truncateTable(sparkSettings)
+      case _: SaveMode => // do nothing
+    }
+
     // init stream loader
     val writer = new DorisWriter(sparkSettings)
     writer.write(data)
@@ -79,6 +92,50 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
     sparkSettings.merge(Utils.params(parameters, logger).asJava)
     new DorisStreamLoadSink(sqlContext, sparkSettings)
   }
+
+  private def truncateTable(sparkSettings: SparkSettings): Unit = {
+
+    val feNodes = sparkSettings.getProperty(ConfigurationOptions.DORIS_FENODES)
+    val port = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_QUERY_PORT)
+    require(feNodes != null && feNodes.nonEmpty, "doris.fenodes cannot be null or empty")
+    require(port != null, "doris.query.port cannot be null")
+    val feNodesArr = feNodes.split(",")
+    val breaks = new Breaks
+
+    var success = false
+    var exOption: Option[Exception] = None
+
+    breaks.breakable {
+      feNodesArr.foreach(feNode => {
+        Try {
+          val host = feNode.split(":")(0)
+          val url = JdbcUtils.getJdbcUrl(host, port)
+          val props = new Properties()
+          props.setProperty("user", sparkSettings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER))
+          props.setProperty("password", sparkSettings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD))
+          val conn = JdbcUtils.getConnection(url, props)
+          val statement = conn.createStatement()
+          val tableIdentifier = sparkSettings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER)
+          val query = JdbcUtils.getTruncateQuery(tableIdentifier)
+          statement.execute(query)
+          success = true
+          logger.info(s"truncate table $tableIdentifier success")
+        } match {
+          case Success(_) => breaks.break()
+          case Failure(e: Exception) =>
+            exOption = Some(e)
+            logger.warn(s"truncate table failed on $feNode, error: {}", ExceptionUtils.getStackTrace(e))
+        }
+      })
+
+    }
+
+    if (!success) {
+      throw new DorisException("truncate table failed", exOption.get)
+    }
+
+  }
+
 }
 
 object DorisSourceProvider {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to