This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git

commit 39e8483819c98d1ee2071c954085ac2c07f84904
Author: wei zhao <zhaowei_3...@163.com>
AuthorDate: Sat Oct 9 15:47:36 2021 +0800

    [Feature] support spark connector sink data using sql (#6796)
    
    Co-authored-by: wei.zhao <wei.z...@aispeech.com>
---
 .../org/apache/doris/spark/sql/DorisRelation.scala    | 19 +++++++++++++++++--
 .../apache/doris/spark/sql/DorisSourceProvider.scala  |  7 ++++++-
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala 
b/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
index 2f2a252..3e3616d 100644
--- a/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
+++ b/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
@@ -28,12 +28,12 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 
 private[sql] class DorisRelation(
     val sqlContext: SQLContext, parameters: Map[String, String])
-    extends BaseRelation with TableScan with PrunedScan with 
PrunedFilteredScan {
+    extends BaseRelation with TableScan with PrunedScan with 
PrunedFilteredScan with InsertableRelation{
 
   private lazy val cfg = {
     val conf = new SparkSettings(sqlContext.sparkContext.getConf)
@@ -86,4 +86,19 @@ private[sql] class DorisRelation(
 
     new ScalaDorisRowRDD(sqlContext.sparkContext, paramWithScan.toMap, 
lazySchema)
   }
+
+  // Insert Table
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    //replace 'doris.request.auth.user' with 'user' and 
'doris.request.auth.password' with 'password'
+    val insertCfg = cfg.copy().asProperties().asScala.map {
+      case (ConfigurationOptions.DORIS_REQUEST_AUTH_USER, v) =>
+        ("user", v)
+      case (ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD, v) =>
+        ("password", v)
+      case (k, v) => (k, v)
+    }
+    data.write.format(DorisSourceProvider.SHORT_NAME)
+      .options(insertCfg)
+      .save()
+  }
 }
diff --git 
a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala 
b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 65f5250..3ac087d 100644
--- a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -19,6 +19,7 @@ package org.apache.doris.spark.sql
 
 import org.apache.doris.spark.DorisStreamLoad
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@@ -35,7 +36,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
 import scala.util.control.Breaks
 
 private[sql] class DorisSourceProvider extends DataSourceRegister with 
RelationProvider with CreatableRelationProvider with StreamWriteSupport with 
Logging {
-  override def shortName(): String = "doris"
+  override def shortName(): String = SHORT_NAME
 
   override def createRelation(sqlContext: SQLContext, parameters: Map[String, 
String]): BaseRelation = {
     new DorisRelation(sqlContext, Utils.params(parameters, log))
@@ -129,3 +130,7 @@ private[sql] class DorisSourceProvider extends 
DataSourceRegister with RelationP
     new DorisStreamWriter(sparkSettings)
   }
 }
+
+object DorisSourceProvider {
+  val SHORT_NAME: String = "doris"
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to