This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git
commit c52825216cfe606c7bb85a164584cff41ef8394c Author: wei zhao <zhaowei_3...@163.com> AuthorDate: Mon Dec 6 10:29:33 2021 +0800 [Improvement](spark-connector) Add 'sink.batch.size' and 'sink.max-retries' options in spark-connector (#7281) Add `sink.batch.size` `sink.max-retries` options in `Doris Spark-connector`. Be consistent with `flink-connector` options, e.g.: ```scala df.write .format("doris") // specify maximum number of lines in a single flushing .option("sink.batch.size",2048) // specify number of retries after writing failed .option("sink.max-retries",3) .save() ``` --- .../java/org/apache/doris/spark/cfg/ConfigurationOptions.java | 8 ++++++++ .../scala/org/apache/doris/spark/sql/DorisSourceProvider.scala | 6 ++---- .../scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala | 4 ++-- .../scala/org/apache/doris/spark/sql/TestSparkConnector.scala | 4 ++++ 4 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index 93b16f9..7ba46b2 100644 --- a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -65,4 +65,12 @@ public interface ConfigurationOptions { int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64; String DORIS_WRITE_FIELDS = "doris.write.fields"; + + String SINK_BATCH_SIZE = "sink.batch.size"; + String DORIS_SINK_BATCH_SIZE = "doris.sink.batch.size"; + int SINK_BATCH_SIZE_DEFAULT = 1024; + + String SINK_MAX_RETRIES = "sink.max-retries"; + String DORIS_SINK_MAX_RETRIES = "doris.sink.max-retries"; + int SINK_MAX_RETRIES_DEFAULT = 3; } diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala index ee77ce6..12b7608 100644 --- a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala +++ 
b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala @@ -59,8 +59,8 @@ private[sql] class DorisSourceProvider extends DataSourceRegister // init stream loader val dorisStreamLoader = new DorisStreamLoad(sparkSettings) - val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT) - val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT) + val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT) + val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT) data.rdd.foreachPartition(partition => { val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount) @@ -98,8 +98,6 @@ private[sql] class DorisSourceProvider extends DataSourceRegister case e: Exception => try { Thread.sleep(1000 * i) - dorisStreamLoader.load(rowsBuffer) - rowsBuffer.clear() } catch { case ex: InterruptedException => Thread.currentThread.interrupt() diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala index 409325d..b53a23a 100644 --- a/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala +++ b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala @@ -32,8 +32,8 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName) @volatile private var latestBatchId = -1L - val maxRowCount: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT) - val maxRetryTimes: Int = 
settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT) + val maxRowCount: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT) + val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT) val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings) override def addBatch(batchId: Long, data: DataFrame): Unit = { diff --git a/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala b/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala index be54aa9..bdee013 100644 --- a/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala +++ b/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala @@ -67,6 +67,8 @@ class TestSparkConnector { .option("password", dorisPwd) //specify your field .option("doris.write.field", "name,gender") + .option("sink.batch.size",2) + .option("sink.max-retries",2) .save() session.stop() } @@ -108,6 +110,8 @@ class TestSparkConnector { .option("doris.fenodes", dorisFeNodes) .option("user", dorisUser) .option("password", dorisPwd) + .option("sink.batch.size",2) + .option("sink.max-retries",2) .start().awaitTermination() } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org