This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 24c2fd4  [Improve] Use streaming style on grouping, transforming and flushing RDD partitions (#82)
24c2fd4 is described below

commit 24c2fd4c33944f3802bbae4b81658b4d2d1fcaaf
Author: Bowen Liang <liangbo...@gf.com.cn>
AuthorDate: Wed Mar 29 10:58:33 2023 +0800

    [Improve] Use streaming style on grouping, transforming and flushing RDD partitions (#82)

    * loop partition in scala style

    * update
---
 .../doris/spark/sql/DorisStreamLoadSink.scala | 87 +++++++++-------
 1 file changed, 35 insertions(+), 52 deletions(-)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index fea58ac..4796e4b 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -18,12 +18,14 @@ package org.apache.doris.spark.sql
 
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.sql.DorisWriterOptionKeys.maxRowCount
 import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
 
+import collection.JavaConverters._
 import java.io.IOException
 import java.util
 import java.util.Objects
 
@@ -33,7 +35,7 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
   private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName)
   @volatile private var latestBatchId = -1L
 
-  val maxRowCount: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
+  val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
   val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
   val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
   val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
@@ -55,62 +57,43 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
     if (Objects.nonNull(sinkTaskPartitionSize)) {
       resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
     }
-    // write for each partition
-    resultRdd.foreachPartition(partition => {
-      val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
-      partition.foreach(row => {
-        val line: util.List[Object] = new util.ArrayList[Object]()
-        for (i <- 0 until row.size) {
-          val field = row.get(i)
-          line.add(field.asInstanceOf[AnyRef])
-        }
-        rowsBuffer.add(line)
-        if (rowsBuffer.size > maxRowCount - 1) {
-          flush
-        }
+    resultRdd
+      .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava)
+      .foreachPartition(partition => {
+        partition
+          .grouped(batchSize)
+          .foreach(batch => flush(batch))
       })
-      // flush buffer
-      if (!rowsBuffer.isEmpty) {
-        flush
-      }
-
-      /**
-       * flush data to Doris and do retry when flush error
-       *
-       */
-      def flush(): Unit = {
-        val loop = new Breaks
-        var err: Exception = null
-        loop.breakable {
-          for (i <- 1 to maxRetryTimes) {
-            try {
-              dorisStreamLoader.loadV2(rowsBuffer)
-              rowsBuffer.clear()
-              Thread.sleep(batchInterValMs.longValue())
-              loop.break()
-            }
-            catch {
-              case e: Exception =>
-                try {
-                  logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
-                  if (err == null) err = e
-                  Thread.sleep(1000 * i)
-                } catch {
-                  case ex: InterruptedException =>
-                    Thread.currentThread.interrupt()
-                    throw new IOException("unable to flush; interrupted while doing another attempt", ex)
-                }
-            }
-          }
-
-          if (!rowsBuffer.isEmpty) {
-            throw new IOException(s"Failed to load ${maxRowCount} batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
-          }
-        }
-      }
-    })
+    /**
+     * flush data to Doris and do retry when flush error
+     *
+     */
+    def flush(batch: Iterable[util.List[Object]]): Unit = {
+      val loop = new Breaks
+      var err: Exception = null
+      loop.breakable {
+        (1 to maxRetryTimes).foreach { i =>
+          try {
+            dorisStreamLoader.loadV2(batch.toList.asJava)
+            Thread.sleep(batchInterValMs.longValue())
+            loop.break()
+          } catch {
+            case e: Exception =>
+              try {
+                logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
+                if (err == null) err = e
+                Thread.sleep(1000 * i)
+              } catch {
+                case ex: InterruptedException =>
+                  Thread.currentThread.interrupt()
+                  throw new IOException("unable to flush; interrupted while doing another attempt", ex)
+              }
+          }
+        }
+        throw new IOException(s"Failed to load $maxRowCount batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
+      }
+    }
   }
 
   override def toString: String = "DorisStreamLoadSink"
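For readers skimming the patch: the change replaces a hand-managed ArrayList row buffer and manual size checks with Iterator.grouped, so at most one batch is materialized at a time, and flush now receives its batch explicitly instead of mutating shared state. Below is a minimal, self-contained sketch of that pattern outside Spark; GroupedFlushSketch, fakeLoad, the constants, and the exception type are illustrative stand-ins, not the connector's API.

import scala.util.control.Breaks

object GroupedFlushSketch {

  private val batchSize = 3     // stand-in for DORIS_SINK_BATCH_SIZE
  private val maxRetryTimes = 3 // stand-in for DORIS_SINK_MAX_RETRIES

  // Hypothetical load call; the real sink calls the Doris stream-load client here.
  private def fakeLoad(batch: Seq[String]): Unit =
    println(s"loaded ${batch.size} rows: ${batch.mkString(", ")}")

  // Flush one batch, retrying up to maxRetryTimes with linear backoff,
  // mirroring the Breaks-based control flow of the patched flush().
  private def flush(batch: Seq[String]): Unit = {
    val loop = new Breaks
    var err: Exception = null
    loop.breakable {
      (1 to maxRetryTimes).foreach { i =>
        try {
          fakeLoad(batch)
          loop.break() // success: skip the fall-through throw below
        } catch {
          case e: Exception =>
            if (err == null) err = e
            Thread.sleep(1000L * i) // back off longer after each failure
        }
      }
      // Reached only when every attempt failed.
      throw new RuntimeException(s"gave up after $maxRetryTimes attempts", err)
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Iterator("a", "b", "c", "d", "e", "f", "g")
    // grouped() is lazy: it buffers at most batchSize rows at a time,
    // replacing the old hand-rolled buffer and its size bookkeeping.
    rows.grouped(batchSize).foreach(flush)
  }
}

One behavioral note: on success, break() exits the breakable block before the trailing throw, so the throw only fires once all retries are exhausted, which is the same shape the patched flush() relies on.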
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org