This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git
commit 39e8483819c98d1ee2071c954085ac2c07f84904 Author: wei zhao <zhaowei_3...@163.com> AuthorDate: Sat Oct 9 15:47:36 2021 +0800 [Feature] support spark connector sink data using sql (#6796) Co-authored-by: wei.zhao <wei.z...@aispeech.com> --- .../org/apache/doris/spark/sql/DorisRelation.scala | 19 +++++++++++++++++-- .../apache/doris/spark/sql/DorisSourceProvider.scala | 7 ++++++- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala b/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala index 2f2a252..3e3616d 100644 --- a/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala +++ b/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala @@ -28,12 +28,12 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.jdbc.JdbcDialects import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} private[sql] class DorisRelation( val sqlContext: SQLContext, parameters: Map[String, String]) - extends BaseRelation with TableScan with PrunedScan with PrunedFilteredScan { + extends BaseRelation with TableScan with PrunedScan with PrunedFilteredScan with InsertableRelation{ private lazy val cfg = { val conf = new SparkSettings(sqlContext.sparkContext.getConf) @@ -86,4 +86,19 @@ private[sql] class DorisRelation( new ScalaDorisRowRDD(sqlContext.sparkContext, paramWithScan.toMap, lazySchema) } + + // Insert Table + override def insert(data: DataFrame, overwrite: Boolean): Unit = { + //replace 'doris.request.auth.user' with 'user' and 'doris.request.auth.password' with 'password' + val insertCfg = cfg.copy().asProperties().asScala.map { + case (ConfigurationOptions.DORIS_REQUEST_AUTH_USER, v) => + ("user", v) + case (ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD, v) => + ("password", v) + case (k, v) => (k, v) + } + data.write.format(DorisSourceProvider.SHORT_NAME) + .options(insertCfg) + .save() + } } diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala index 65f5250..3ac087d 100644 --- a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala +++ b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala @@ -19,6 +19,7 @@ package org.apache.doris.spark.sql import org.apache.doris.spark.DorisStreamLoad import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings} +import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter @@ -35,7 +36,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.util.control.Breaks private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider with CreatableRelationProvider with StreamWriteSupport with Logging { - override def shortName(): String = "doris" + override def shortName(): String = SHORT_NAME override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { new DorisRelation(sqlContext, Utils.params(parameters, log)) @@ -129,3 +130,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP new DorisStreamWriter(sparkSettings) } } + +object DorisSourceProvider { + val SHORT_NAME: String = "doris" +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org