This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 999aeac  [improve] dataframe sink support csv format (#76)
999aeac is described below

commit 999aeac06b64bdebc573325e77581d844e17c23d
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Tue Mar 14 09:58:51 2023 +0800

    [improve] dataframe sink support csv format (#76)
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 26 ++++------
 .../doris/spark/sql/DorisStreamLoadSink.scala      | 60 ++++++++++------------
 2 files changed, 38 insertions(+), 48 deletions(-)
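
For anyone wanting to try the new path, a minimal, hypothetical usage sketch
(not part of the commit): it assumes an active SparkSession `spark`, and the
FE address, table name, and credentials are placeholders. The stream load
properties ride on the connector's doris.sink.properties.* pass-through.

    // Hypothetical example: write a DataFrame through the Doris sink in CSV mode.
    val df = spark.createDataFrame(Seq((1, "alice"), (2, "bob"))).toDF("id", "name")

    df.write
      .format("doris")
      .option("doris.fenodes", "fe-host:8030")              // placeholder FE address
      .option("doris.table.identifier", "db.example_table") // placeholder table
      .option("user", "root")                               // placeholder credentials
      .option("password", "")
      // Stream load properties forwarded by the connector; csv is now the default format.
      .option("doris.sink.properties.format", "csv")
      .option("doris.sink.properties.column_separator", ",")
      .option("doris.sink.properties.line_delimiter", "\n")
      .save()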

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 3ada398..bda1c47 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -96,6 +96,11 @@ public class DorisStreamLoad implements Serializable{
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
+        fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
+        if (fileType.equals("csv")){
+            FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
+            LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
+        }
     }
 
     public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
@@ -116,7 +121,7 @@ public class DorisStreamLoad implements Serializable{
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
        fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
-        if (fileType.equals("csv")){
+        if ("csv".equals(fileType)){
            FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
            LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
         }
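
Both constructors now resolve the file type and delimiters identically:
"format" defaults to csv, and in CSV mode "column_separator" and
"line_delimiter" default to tab and newline. A standalone sketch of that rule,
with a hypothetical `props` map standing in for streamLoadProp:

    import java.util.{HashMap => JHashMap, Map => JMap}

    // Mirrors the defaulting in the CSV branch above; props stands in for streamLoadProp.
    def resolveCsvDefaults(props: JMap[String, String]): (String, String, String) = {
      val fileType   = Option(props.get("format")).getOrElse("csv")           // csv by default
      val fieldDelim = Option(props.get("column_separator")).getOrElse("\t")  // tab by default
      val lineDelim  = Option(props.get("line_delimiter")).getOrElse("\n")    // newline by default
      (fileType, fieldDelim, lineDelim)
    }

    assert(resolveCsvDefaults(new JHashMap[String, String]()) == ("csv", "\t", "\n"))
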
@@ -150,20 +155,13 @@ public class DorisStreamLoad implements Serializable{
         conn.setDoInput(true);
         if (streamLoadProp != null) {
             streamLoadProp.forEach((k, v) -> {
-                if (streamLoadProp.containsKey("format")) {
-                    return;
-                }
-                if (streamLoadProp.containsKey("strip_outer_array")) {
-                    return;
-                }
-                if (streamLoadProp.containsKey("read_json_by_line")) {
+                if ("read_json_by_line".equals(k)) {
                     return;
                 }
                 conn.addRequestProperty(k, v);
             });
         }
-        if (fileType.equals("json")){
-            conn.addRequestProperty("format", "json");
+        if (fileType.equals("json")) {
             conn.addRequestProperty("strip_outer_array", "true");
         }
         return conn;
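
The header handling above changes behavior, not just style: previously, if the
properties contained "format" or "strip_outer_array" at all, the lambda bailed
out for every entry and nothing was forwarded. Now every property except
read_json_by_line is forwarded verbatim as an HTTP header, and JSON mode still
forces strip_outer_array. A sketch of the resulting rule, with `conn` and
`props` standing in for the real objects:

    import java.net.HttpURLConnection
    import java.util.{Map => JMap}

    // Forward every stream load property except read_json_by_line as a request
    // header; JSON mode additionally forces strip_outer_array.
    def applyHeaders(conn: HttpURLConnection, props: JMap[String, String], fileType: String): Unit = {
      props.forEach((k: String, v: String) =>
        if (!"read_json_by_line".equals(k)) conn.addRequestProperty(k, v))
      if ("json".equals(fileType)) conn.addRequestProperty("strip_outer_array", "true")
    }
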
@@ -182,11 +180,9 @@ public class DorisStreamLoad implements Serializable{
 
         @Override
         public String toString() {
-            StringBuilder sb = new StringBuilder();
-            sb.append("status: ").append(status);
-            sb.append(", resp msg: ").append(respMsg);
-            sb.append(", resp content: ").append(respContent);
-            return sb.toString();
+            return "status: " + status +
+                    ", resp msg: " + respMsg +
+                    ", resp content: " + respContent;
         }
     }
 
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index a2e3ed1..fea58ac 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -17,15 +17,15 @@
 
 package org.apache.doris.spark.sql
 
-import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
 import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
+
 import java.io.IOException
-import org.apache.doris.spark.rest.RestService
+import java.util
 import java.util.Objects
 import scala.util.control.Breaks
 
@@ -45,39 +45,32 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
     if (batchId <= latestBatchId) {
       logger.info(s"Skipping already committed batch $batchId")
     } else {
-      write(data.queryExecution)
+      write(data.rdd)
       latestBatchId = batchId
     }
   }
 
-  def write(queryExecution: QueryExecution): Unit = {
-    val schema = queryExecution.analyzed.output
-    var resultRdd = queryExecution.toRdd
+  def write(rdd: RDD[Row]): Unit = {
+    var resultRdd = rdd
     if (Objects.nonNull(sinkTaskPartitionSize)) {
       resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
     }
     // write for each partition
-    resultRdd.foreachPartition(iter => {
-      val objectMapper = new ObjectMapper()
-      val rowArray = objectMapper.createArrayNode()
-      iter.foreach(row => {
-        val rowNode = objectMapper.createObjectNode()
-        for (i <- 0 until row.numFields) {
-          val colName = schema(i).name
-          val value = row.copy().getUTF8String(i)
-          if (value == null) {
-            rowNode.putNull(colName)
-          } else {
-            rowNode.put(colName, value.toString)
-          }
+    resultRdd.foreachPartition(partition => {
+      val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
+      partition.foreach(row => {
+        val line: util.List[Object] = new util.ArrayList[Object]()
+        for (i <- 0 until row.size) {
+          val field = row.get(i)
+          line.add(field.asInstanceOf[AnyRef])
         }
-        rowArray.add(rowNode)
-        if (rowArray.size > maxRowCount - 1) {
+        rowsBuffer.add(line)
+        if (rowsBuffer.size > maxRowCount - 1) {
           flush
         }
       })
       // flush buffer
-      if (!rowArray.isEmpty) {
+      if (!rowsBuffer.isEmpty) {
         flush
       }
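
The sink no longer serializes rows into a Jackson JSON array itself; it buffers
raw field lists and hands them to loadV2, which serializes according to the
configured format. loadV2 is not shown in this diff; for CSV it presumably
amounts to something like the sketch below, where "\N" as the null marker is an
assumption (it is Doris stream load's default):

    import java.util.{List => JList}
    import scala.collection.JavaConverters._

    // Hypothetical CSV rendering of the buffered rows: fields joined by
    // FIELD_DELIMITER, rows joined by LINE_DELIMITER, nulls rendered as \N.
    def toCsvBody(rows: JList[JList[Object]], fieldDelim: String, lineDelim: String): String =
      rows.asScala
        .map(_.asScala.map(f => if (f == null) "\\N" else f.toString).mkString(fieldDelim))
        .mkString(lineDelim)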
 
@@ -85,14 +78,15 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
        * flush data to Doris and do retry when flush error
        *
        */
-      def flush = {
+      def flush(): Unit = {
         val loop = new Breaks
+        var err: Exception = null
         loop.breakable {
 
-          for (i <- 0 to maxRetryTimes) {
+          for (i <- 1 to maxRetryTimes) {
             try {
-              dorisStreamLoader.load(rowArray.toString)
-              rowArray.removeAll()
+              dorisStreamLoader.loadV2(rowsBuffer)
+              rowsBuffer.clear()
               Thread.sleep(batchInterValMs.longValue())
               loop.break()
             }
@@ -100,21 +94,21 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
               case e: Exception =>
                 try {
                  logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
+                  if (err == null) err = e
                   Thread.sleep(1000 * i)
                 } catch {
                   case ex: InterruptedException =>
-                    logger.warn("Data that failed to load : " + rowArray.toString)
                     Thread.currentThread.interrupt()
-                    throw new IOException("unable to flush; interrupted while doing another attempt", e)
+                    throw new IOException("unable to flush; interrupted while doing another attempt", ex)
                 }
             }
           }
 
-          if (!rowArray.isEmpty) {
-            logger.warn("Data that failed to load : " + rowArray.toString)
-            throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
+          if (!rowsBuffer.isEmpty) {
+            throw new IOException(s"Failed to load ${maxRowCount} batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
           }
         }
+
       }
     })
   }
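
The reworked flush() also has cleaner failure semantics: it retries up to
maxRetryTimes with a linear backoff (sleeping 1000 * i ms after attempt i),
remembers the first exception, and attaches it as the cause once retries are
exhausted, rather than logging the entire failed batch. A distilled sketch of
that pattern, with `attempt` standing in for dorisStreamLoader.loadV2:

    import java.io.IOException

    // Retry with linear backoff; keep the first error as the eventual cause.
    def retryFlush(maxRetryTimes: Int)(attempt: () => Unit): Unit = {
      var firstErr: Exception = null
      var done = false
      var i = 1
      while (!done && i <= maxRetryTimes) {
        try {
          attempt()
          done = true                 // success: stop retrying
        } catch {
          case e: Exception =>
            if (firstErr == null) firstErr = e
            Thread.sleep(1000L * i)   // back off longer after each failure
            i += 1
        }
      }
      if (!done) throw new IOException("exceeded the max retry times", firstErr)
    }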


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
