JNSimba commented on a change in pull request #6256:
URL: https://github.com/apache/incubator-doris/pull/6256#discussion_r675323336



##########
File path: 
extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
##########
@@ -17,14 +17,77 @@
 
 package org.apache.doris.spark.sql
 
+import org.apache.commons.collections.CollectionUtils
+import org.apache.doris.spark.DorisStreamLoad
+import org.apache.doris.spark.exception.DorisException
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, 
DataSourceRegister, Filter, RelationProvider}
+import org.apache.spark.sql.types.StructType
 
-private[sql] class DorisSourceProvider extends DataSourceRegister with 
RelationProvider with Logging {
+import scala.collection.mutable.ListBuffer
+import scala.util.Random
+
+private[sql] class DorisSourceProvider extends DataSourceRegister with 
RelationProvider with CreatableRelationProvider with Logging {
   override def shortName(): String = "doris"
 
   override def createRelation(sqlContext: SQLContext, parameters: Map[String, 
String]): BaseRelation = {
     new DorisRelation(sqlContext, Utils.params(parameters, log))
   }
+
+
+  /**
+   * df.save
+   */
+  override def createRelation(sqlContext: SQLContext,
+                              mode: SaveMode, parameters: Map[String, String],
+                              data: DataFrame): BaseRelation = {
+    val beHostPort: String = parameters.getOrElse(DorisOptions.beHostPort, 
throw new DorisException("beHostPort is empty"))
+
+    val dbName: String = parameters.getOrElse(DorisOptions.dbName, throw new 
DorisException("dbName is empty"))
+
+    val tbName: String = parameters.getOrElse(DorisOptions.tbName, throw new 
DorisException("tbName is empty"))
+
+    val user: String = parameters.getOrElse(DorisOptions.user, throw new 
DorisException("user is empty"))
+
+    val password: String = parameters.getOrElse(DorisOptions.password, throw 
new DorisException("password is empty"))
+
+    val maxRowCount: Long = parameters.getOrElse(DorisOptions.maxRowCount, 
"1024").toLong
+
+    val splitHost = beHostPort.split(";")
+    val choosedBeHost = splitHost(Random.nextInt(splitHost.length))
+    val dorisStreamLoader = new DorisStreamLoad(choosedBeHost, dbName, tbName, 
user, password)
+
+    data.foreachPartition(partition => {
+
+      val buffer = ListBuffer[String]()
+      partition.foreach(row => {
+        val rowString = row.toSeq.mkString("\t")

Review comment:
       Do we need to consider the **Null** value?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to