This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 999aeac  [improve] dateframe sink support csv format (#76)
999aeac is described below

commit 999aeac06b64bdebc573325e77581d844e17c23d
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Tue Mar 14 09:58:51 2023 +0800

    [improve] dateframe sink support csv format (#76)
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 26 ++++------
 .../doris/spark/sql/DorisStreamLoadSink.scala      | 60 ++++++++++------------
 2 files changed, 38 insertions(+), 48 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 3ada398..bda1c47 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -96,6 +96,11 @@ public class DorisStreamLoad implements Serializable{
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
+        fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
+        if (fileType.equals("csv")){
+            FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
+            LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
+        }
     }
 
     public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
@@ -116,7 +121,7 @@ public class DorisStreamLoad implements Serializable{
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
         fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
-        if (fileType.equals("csv")){
+        if ("csv".equals(fileType)){
             FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
             LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
         }
@@ -150,20 +155,13 @@ public class DorisStreamLoad implements Serializable{
         conn.setDoInput(true);
         if (streamLoadProp != null) {
             streamLoadProp.forEach((k, v) -> {
-                if (streamLoadProp.containsKey("format")) {
-                    return;
-                }
-                if (streamLoadProp.containsKey("strip_outer_array")) {
-                    return;
-                }
-                if (streamLoadProp.containsKey("read_json_by_line")) {
+                if ("read_json_by_line".equals(k)) {
                     return;
                 }
                 conn.addRequestProperty(k, v);
             });
         }
-        if (fileType.equals("json")){
-            conn.addRequestProperty("format", "json");
+        if (fileType.equals("json")) {
             conn.addRequestProperty("strip_outer_array", "true");
         }
         return conn;
@@ -182,11 +180,9 @@ public class DorisStreamLoad implements Serializable{
 
         @Override
         public String toString() {
-            StringBuilder sb = new StringBuilder();
-            sb.append("status: ").append(status);
-            sb.append(", resp msg: ").append(respMsg);
-            sb.append(", resp content: ").append(respContent);
-            return sb.toString();
+            return "status: " + status +
+                    ", resp msg: " + respMsg +
+                    ", resp content: " + respContent;
         }
     }
 
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index a2e3ed1..fea58ac 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -17,15 +17,15 @@
 
 package org.apache.doris.spark.sql
 
-import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
 import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
+
 import java.io.IOException
-import org.apache.doris.spark.rest.RestService
+import java.util
 import java.util.Objects
 
 import scala.util.control.Breaks
@@ -45,39 +45,32 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
     if (batchId <= latestBatchId) {
       logger.info(s"Skipping already committed batch $batchId")
     } else {
-      write(data.queryExecution)
+      write(data.rdd)
       latestBatchId = batchId
     }
   }
 
-  def write(queryExecution: QueryExecution): Unit = {
-    val schema = queryExecution.analyzed.output
-    var resultRdd = queryExecution.toRdd
+  def write(rdd: RDD[Row]): Unit = {
+    var resultRdd = rdd
     if (Objects.nonNull(sinkTaskPartitionSize)) {
       resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
     }
     // write for each partition
-    resultRdd.foreachPartition(iter => {
-      val objectMapper = new ObjectMapper()
-      val rowArray = objectMapper.createArrayNode()
-      iter.foreach(row => {
-        val rowNode = objectMapper.createObjectNode()
-        for (i <- 0 until row.numFields) {
-          val colName = schema(i).name
-          val value = row.copy().getUTF8String(i)
-          if (value == null) {
-            rowNode.putNull(colName)
-          } else {
-            rowNode.put(colName, value.toString)
-          }
+    resultRdd.foreachPartition(partition => {
+      val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
+      partition.foreach(row => {
+        val line: util.List[Object] = new util.ArrayList[Object]()
+        for (i <- 0 until row.size) {
+          val field = row.get(i)
+          line.add(field.asInstanceOf[AnyRef])
         }
-        rowArray.add(rowNode)
-        if (rowArray.size > maxRowCount - 1) {
+        rowsBuffer.add(line)
+        if (rowsBuffer.size > maxRowCount - 1) {
          flush
        }
      })
      // flush buffer
-      if (!rowArray.isEmpty) {
+      if (!rowsBuffer.isEmpty) {
        flush
      }
@@ -85,14 +78,15 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
       * flush data to Doris and do retry when flush error
       *
       */
-      def flush = {
+      def flush(): Unit = {
        val loop = new Breaks
+        var err: Exception = null
        loop.breakable {
 
-          for (i <- 0 to maxRetryTimes) {
+          for (i <- 1 to maxRetryTimes) {
            try {
-              dorisStreamLoader.load(rowArray.toString)
-              rowArray.removeAll()
+              dorisStreamLoader.loadV2(rowsBuffer)
+              rowsBuffer.clear()
              Thread.sleep(batchInterValMs.longValue())
              loop.break()
            }
@@ -100,21 +94,21 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
              case e: Exception =>
                try {
                  logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
+                  if (err == null) err = e
                  Thread.sleep(1000 * i)
                } catch {
                  case ex: InterruptedException =>
-                    logger.warn("Data that failed to load : " + rowArray.toString)
                    Thread.currentThread.interrupt()
-                    throw new IOException("unable to flush; interrupted while doing another attempt", e)
+                    throw new IOException("unable to flush; interrupted while doing another attempt", ex)
                }
            }
          }
-          if (!rowArray.isEmpty) {
-            logger.warn("Data that failed to load : " + rowArray.toString)
-            throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
+          if (!rowsBuffer.isEmpty) {
+            throw new IOException(s"Failed to load ${maxRowCount} batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", err)
          }
        }
+      }
    })
  }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
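
For context on what this change enables on the user side, here is a minimal, hedged sketch of a structured-streaming write that opts into the CSV path. Only the stream-load property keys read in this diff ("format", "column_separator", "line_delimiter") come from the commit itself; the connector short name "doris", the "doris.fenodes" / "doris.table.identifier" / "user" / "password" options and the "doris.sink.properties." prefix are assumptions based on the connector's documented conventions, not on this patch.

// Hedged usage sketch, not part of this commit. Option names other than the
// stream-load property keys ("format", "column_separator", "line_delimiter")
// are assumptions based on the connector's documented conventions.
import org.apache.spark.sql.SparkSession

object DorisCsvSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("doris-csv-sink-sketch").getOrCreate()

    // Any streaming source works; the built-in rate source keeps the sketch self-contained.
    val source = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .selectExpr("CAST(value AS STRING) AS id", "CAST(timestamp AS STRING) AS ts")

    source.writeStream
      .format("doris")                                            // assumed connector short name
      .option("doris.fenodes", "fe_host:8030")                    // assumed FE HTTP address
      .option("doris.table.identifier", "example_db.example_tbl") // assumed target table
      .option("user", "root")
      .option("password", "")
      // stream-load properties consumed by DorisStreamLoad in this change:
      .option("doris.sink.properties.format", "csv")
      .option("doris.sink.properties.column_separator", ",")
      .option("doris.sink.properties.line_delimiter", "\n")
      .option("checkpointLocation", "/tmp/doris-csv-sink-checkpoint")
      .start()
      .awaitTermination()
  }
}

When "format" is left unset, the constructors in the diff above fall back to csv with a tab column separator and a newline line delimiter.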
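On the sink side, each Row is now buffered as a util.List[Object] and whole batches are handed to dorisStreamLoader.loadV2, while the DorisStreamLoad constructors pick up the column and line delimiters. How loadV2 serializes the buffer is not shown in this diff; the snippet below is only a sketch of how a CSV stream-load body could be built from such a buffer, assuming fields are joined with the configured column_separator, rows with the configured line_delimiter, and Doris's usual \N marker for NULL.

// Hedged sketch only: one way to turn the buffered rows into a CSV body.
// The real serialization lives in DorisStreamLoad.java and is not part of this diff;
// the \N NULL marker and the joining strategy are assumptions.
import java.util
import scala.collection.JavaConverters._

object CsvBodySketch {
  // Join each row's fields with the column separator and the rows with the line delimiter.
  def toCsvBody(rows: util.List[util.List[Object]],
                columnSeparator: String = "\t",
                lineDelimiter: String = "\n"): String = {
    rows.asScala
      .map(row => row.asScala.map(f => if (f == null) "\\N" else f.toString).mkString(columnSeparator))
      .mkString(lineDelimiter)
  }

  def main(args: Array[String]): Unit = {
    val rows = new util.ArrayList[util.List[Object]]()
    rows.add(util.Arrays.asList[Object]("1", "alice"))
    rows.add(util.Arrays.asList[Object]("2", null))
    // Prints: 1,alice
    //         2,\N
    println(toCsvBody(rows, ",", "\n"))
  }
}

Note that the "rowsBuffer.size > maxRowCount - 1" check in the patched sink caps each flush at maxRowCount rows before the buffer is cleared.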