This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git

commit c52825216cfe606c7bb85a164584cff41ef8394c
Author: wei zhao <zhaowei_3...@163.com>
AuthorDate: Mon Dec 6 10:29:33 2021 +0800

    [Improvement](spark-connector) Add 'sink.batch.size' and 'sink.max-retries' 
options in spark-connector (#7281)
    
    Add `sink.batch.size` and `sink.max-retries` options in `Doris 
Spark-connector`.
    Be consistent with `flink-connector` options.
    eg:
    ```scala
       df.write
          .format("doris")
          // specify the maximum number of rows in a single flush
          .option("sink.batch.size",2048)
          // specify the number of retries after a write failure
          .option("sink.max-retries",3)
          .save()
    ```
---
 .../java/org/apache/doris/spark/cfg/ConfigurationOptions.java     | 8 ++++++++
 .../scala/org/apache/doris/spark/sql/DorisSourceProvider.scala    | 6 ++----
 .../scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala    | 4 ++--
 .../scala/org/apache/doris/spark/sql/TestSparkConnector.scala     | 4 ++++
 4 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java 
b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 93b16f9..7ba46b2 100644
--- a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -65,4 +65,12 @@ public interface ConfigurationOptions {
     int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
 
     String DORIS_WRITE_FIELDS = "doris.write.fields";
+
+    String SINK_BATCH_SIZE = "sink.batch.size";
+    String DORIS_SINK_BATCH_SIZE = "doris.sink.batch.size";
+    int SINK_BATCH_SIZE_DEFAULT = 1024;
+
+    String SINK_MAX_RETRIES = "sink.max-retries";
+    String DORIS_SINK_MAX_RETRIES = "doris.sink.max-retries";
+    int SINK_MAX_RETRIES_DEFAULT = 3;
 }
diff --git 
a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala 
b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index ee77ce6..12b7608 100644
--- a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -59,8 +59,8 @@ private[sql] class DorisSourceProvider extends 
DataSourceRegister
     // init stream loader
     val dorisStreamLoader = new DorisStreamLoad(sparkSettings)
 
-    val maxRowCount = 
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, 
ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT)
-    val maxRetryTimes = 
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, 
ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT)
+    val maxRowCount = 
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, 
ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
+    val maxRetryTimes = 
sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, 
ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
 
     data.rdd.foreachPartition(partition => {
       val rowsBuffer: util.List[util.List[Object]] = new 
util.ArrayList[util.List[Object]](maxRowCount)
@@ -98,8 +98,6 @@ private[sql] class DorisSourceProvider extends 
DataSourceRegister
               case e: Exception =>
                 try {
                   Thread.sleep(1000 * i)
-                  dorisStreamLoader.load(rowsBuffer)
-                  rowsBuffer.clear()
                 } catch {
                   case ex: InterruptedException =>
                     Thread.currentThread.interrupt()
diff --git 
a/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala 
b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 409325d..b53a23a 100644
--- a/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -32,8 +32,8 @@ private[sql] class DorisStreamLoadSink(sqlContext: 
SQLContext, settings: SparkSe
 
   private val logger: Logger = 
LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName)
   @volatile private var latestBatchId = -1L
-  val maxRowCount: Int = 
settings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, 
ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT)
-  val maxRetryTimes: Int = 
settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, 
ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT)
+  val maxRowCount: Int = 
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, 
ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
+  val maxRetryTimes: Int = 
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, 
ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
   val dorisStreamLoader: DorisStreamLoad = 
CachedDorisStreamLoadClient.getOrCreate(settings)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
diff --git a/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala 
b/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
index be54aa9..bdee013 100644
--- a/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
+++ b/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
@@ -67,6 +67,8 @@ class TestSparkConnector {
       .option("password", dorisPwd)
       //specify your field
       .option("doris.write.field", "name,gender")
+      .option("sink.batch.size",2)
+      .option("sink.max-retries",2)
       .save()
     session.stop()
   }
@@ -108,6 +110,8 @@ class TestSparkConnector {
       .option("doris.fenodes", dorisFeNodes)
       .option("user", dorisUser)
       .option("password", dorisPwd)
+      .option("sink.batch.size",2)
+      .option("sink.max-retries",2)
       .start().awaitTermination()
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to