This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 24c2fd4  [Improve] Use streaming style on grouping, transforming and flushing RDD partitions (#82)
24c2fd4 is described below

commit 24c2fd4c33944f3802bbae4b81658b4d2d1fcaaf
Author: Bowen Liang <liangbo...@gf.com.cn>
AuthorDate: Wed Mar 29 10:58:33 2023 +0800

    [Improve] Use streaming style on grouping, transforming and flushing RDD partitions (#82)
    
    * loop partition in scala style
    * update
---
 .../doris/spark/sql/DorisStreamLoadSink.scala      | 87 +++++++++-------------
 1 file changed, 35 insertions(+), 52 deletions(-)
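
For context: the patch swaps a hand-rolled row buffer (a util.ArrayList holding up to maxRowCount rows per partition) for Scala's lazy Iterator.grouped batching, so each partition is converted and flushed batch by batch without materializing a full buffer first. A minimal, self-contained sketch of the pattern, using a stand-in load() in place of the connector's DorisStreamLoad client:

    import java.util
    import scala.collection.JavaConverters._

    object GroupedFlushSketch {
      // Stand-in for dorisStreamLoader.loadV2; the real client ships each batch to a Doris BE.
      def load(batch: util.List[util.List[Object]]): Unit =
        println(s"flushing ${batch.size()} rows")

      def main(args: Array[String]): Unit = {
        val batchSize = 3
        // Stand-in for one RDD partition: an iterator of rows.
        val partition: Iterator[Seq[Any]] =
          Iterator(Seq(1, "a"), Seq(2, "b"), Seq(3, "c"), Seq(4, "d"))

        partition
          .map(_.map(_.asInstanceOf[AnyRef]).toList.asJava) // row -> java.util.List[Object]
          .grouped(batchSize)                               // lazy groups of at most batchSize rows
          .foreach(batch => load(batch.asJava))
      }
    }

Because grouped pulls rows from the iterator on demand, at most one batch is held in memory at a time, which is the point of the streaming style named in the subject.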

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index fea58ac..4796e4b 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -18,12 +18,14 @@
 package org.apache.doris.spark.sql
 
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.sql.DorisWriterOptionKeys.maxRowCount
 import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
 
+import collection.JavaConverters._
 import java.io.IOException
 import java.util
 import java.util.Objects
@@ -33,7 +35,7 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
 
  private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName)
   @volatile private var latestBatchId = -1L
-  val maxRowCount: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
+  val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
  val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
  val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
  val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
@@ -55,62 +57,43 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
     if (Objects.nonNull(sinkTaskPartitionSize)) {
      resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
     }
-    // write for each partition
-    resultRdd.foreachPartition(partition => {
-      val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
-      partition.foreach(row => {
-        val line: util.List[Object] = new util.ArrayList[Object]()
-        for (i <- 0 until row.size) {
-          val field = row.get(i)
-          line.add(field.asInstanceOf[AnyRef])
-        }
-        rowsBuffer.add(line)
-        if (rowsBuffer.size > maxRowCount - 1) {
-          flush
-        }
+    resultRdd
+      .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava)
+      .foreachPartition(partition => {
+        partition
+          .grouped(batchSize)
+          .foreach(batch => flush(batch))
       })
-      // flush buffer
-      if (!rowsBuffer.isEmpty) {
-        flush
-      }
-
-      /**
-       * flush data to Doris and do retry when flush error
-       *
-       */
-      def flush(): Unit = {
-        val loop = new Breaks
-        var err: Exception = null
-        loop.breakable {
 
-          for (i <- 1 to maxRetryTimes) {
-            try {
-              dorisStreamLoader.loadV2(rowsBuffer)
-              rowsBuffer.clear()
-              Thread.sleep(batchInterValMs.longValue())
-              loop.break()
-            }
-            catch {
-              case e: Exception =>
-                try {
-                  logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
-                  if (err == null) err = e
-                  Thread.sleep(1000 * i)
-                } catch {
-                  case ex: InterruptedException =>
-                    Thread.currentThread.interrupt()
-                    throw new IOException("unable to flush; interrupted while doing another attempt", ex)
-                }
-            }
-          }
-
-          if (!rowsBuffer.isEmpty) {
-            throw new IOException(s"Failed to load ${maxRowCount} batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
+    /**
+     * flush data to Doris and do retry when flush error
+     *
+     */
+    def flush(batch: Iterable[util.List[Object]]): Unit = {
+      val loop = new Breaks
+      var err: Exception = null
+      loop.breakable {
+        (1 to maxRetryTimes).foreach { i =>
+          try {
+            dorisStreamLoader.loadV2(batch.toList.asJava)
+            Thread.sleep(batchInterValMs.longValue())
+            loop.break()
+          } catch {
+            case e: Exception =>
+              try {
+                logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
+                if (err == null) err = e
+                Thread.sleep(1000 * i)
+              } catch {
+                case ex: InterruptedException =>
+                  Thread.currentThread.interrupt()
+                  throw new IOException("unable to flush; interrupted while doing another attempt", ex)
+              }
           }
+          throw new IOException(s"Failed to load $maxRowCount batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
         }
-
       }
-    })
+    }
   }
 
   override def toString: String = "DorisStreamLoadSink"
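
The reworked flush keeps the previous Breaks-based retry shape: break out of the breakable block on a successful load, back off linearly between attempts, and rethrow the first error once maxRetryTimes is exhausted. A minimal sketch of that control flow, with a hypothetical attemptLoad() standing in for dorisStreamLoader.loadV2, and the batchInterValMs sleep and InterruptedException handling omitted:

    import java.io.IOException
    import scala.util.control.Breaks

    object RetrySketch {
      private var calls = 0

      // Hypothetical stand-in for dorisStreamLoader.loadV2(batch): fails twice, then succeeds.
      def attemptLoad(): Unit = {
        calls += 1
        if (calls < 3) throw new RuntimeException("BE unavailable")
      }

      def flushWithRetry(maxRetryTimes: Int): Unit = {
        val loop = new Breaks
        var err: Exception = null
        loop.breakable {
          (1 to maxRetryTimes).foreach { i =>
            try {
              attemptLoad()
              loop.break() // success: leave the breakable block, skipping the throw below
            } catch {
              case e: Exception =>
                if (err == null) err = e // remember the first failure as the root cause
                Thread.sleep(1000L * i)  // linear backoff, as in the patch
            }
          }
          // Reached only when every attempt has failed.
          throw new IOException(s"flush failed after $maxRetryTimes retries", err)
        }
      }

      def main(args: Array[String]): Unit = flushWithRetry(maxRetryTimes = 3)
    }

Note that break() works by throwing a ControlThrowable, which the case e: Exception clause does not catch, so a successful attempt cleanly escapes both the foreach and the final throw.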

