This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git
commit acf886938b70f614c1c8ad1676abf75c5719769d
Author: chovy <zhaowei_3...@163.com>
AuthorDate: Tue Sep 28 17:46:19 2021 +0800

    [Feature] support spark connector sink stream data to doris (#6761)

    * [Feature] support spark connector sink stream data to doris

    * [Doc] Add spark-connector batch/stream writing instructions

    * add license and remove meaningless blanks code

    Co-authored-by: wei.zhao <wei.z...@aispeech.com>
---
 pom.xml                                            |   6 +
 .../doris/spark/CachedDorisStreamLoadClient.java   |  63 +++++++++++
 .../org/apache/doris/spark/DorisStreamLoad.java    |  37 ++++++-
 .../java/org/apache/doris/spark/cfg/Settings.java  |  13 +++
 .../org/apache/doris/spark/cfg/SparkSettings.java  |   3 +-
 .../org/apache/doris/spark/rest/RestService.java   |   9 +-
 .../doris/spark/sql/DorisSourceProvider.scala      |  72 +++++-------
 .../apache/doris/spark/sql/DorisStreamWriter.scala | 122 +++++++++++++++++++++
 .../doris/spark/sql/TestStreamSinkDoris.scala      |  53 +++++++++
 9 files changed, 327 insertions(+), 51 deletions(-)
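In short, this change teaches the connector's "doris" source to act as a Structured Streaming sink as well. A minimal sketch of the intended usage (all host/table values are placeholders; the "rate" source merely keeps the sketch self-contained, while the new TestStreamSinkDoris.scala at the end of this diff reads from Kafka instead):

    import org.apache.spark.sql.SparkSession

    object StreamToDorisSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local").getOrCreate()

        // Any streaming source works; "rate" generates (timestamp, value) rows locally.
        spark.readStream.format("rate").load()
          .selectExpr("CAST(value AS STRING)")
          .writeStream
          .format("doris")                                 // sink registered by DorisSourceProvider.shortName()
          .option("checkpointLocation", "/tmp/doris_ckpt") // required by Structured Streaming
          .option("doris.table.identifier", "db.table")    // placeholder database.table
          .option("doris.fenodes", "fe_host:8030")         // placeholder FE HTTP address
          .option("user", "root")
          .option("password", "")
          .start().awaitTermination()
      }
    }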
diff --git a/pom.xml b/pom.xml
index 5ba2c6e..e015f06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,6 +139,12 @@
             </exclusions>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java b/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
new file mode 100644
index 0000000..01cada4
--- /dev/null
+++ b/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.doris.spark.exception.DorisException;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * a cached streamload client for each partition
+ */
+public class CachedDorisStreamLoadClient {
+    private static final long cacheExpireTimeout = 30 * 60;
+    private static LoadingCache<SparkSettings, DorisStreamLoad> dorisStreamLoadLoadingCache;
+
+    static {
+        dorisStreamLoadLoadingCache = CacheBuilder.newBuilder()
+                .expireAfterWrite(cacheExpireTimeout, TimeUnit.SECONDS)
+                .removalListener(new RemovalListener<Object, Object>() {
+                    @Override
+                    public void onRemoval(RemovalNotification<Object, Object> removalNotification) {
+                        //do nothing
+                    }
+                })
+                .build(
+                        new CacheLoader<SparkSettings, DorisStreamLoad>() {
+                            @Override
+                            public DorisStreamLoad load(SparkSettings sparkSettings) throws IOException, DorisException {
+                                DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(sparkSettings);
+                                return dorisStreamLoad;
+                            }
+                        }
+                );
+    }
+
+    public static DorisStreamLoad getOrCreate(SparkSettings settings) throws ExecutionException {
+        DorisStreamLoad dorisStreamLoad = dorisStreamLoadLoadingCache.get(settings);
+        return dorisStreamLoad;
+    }
+}
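For context, CachedDorisStreamLoadClient is a Guava LoadingCache keyed by SparkSettings: each JVM builds a DorisStreamLoad on the first lookup for a given settings object and expires it 30 minutes after it was created. A standalone sketch of the same pattern (the Client class is a hypothetical stand-in for DorisStreamLoad):

    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
    import java.util.concurrent.TimeUnit

    object ClientCacheSketch {
      final class Client(val key: String) // stand-in; the real client holds stream-load state

      private val cache: LoadingCache[String, Client] = CacheBuilder.newBuilder()
        .expireAfterWrite(30 * 60, TimeUnit.SECONDS) // same timeout as cacheExpireTimeout above
        .build(new CacheLoader[String, Client] {
          override def load(key: String): Client = new Client(key) // invoked only on a cache miss
        })

      def getOrCreate(key: String): Client = cache.get(key) // later calls return the cached instance
    }

Because the real cache key is a settings object rather than a String, the commit also gives Settings value-based equals()/hashCode() (see the Settings.java hunk further down); without those overrides, every lookup would miss.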
diff --git a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 0de3746..dcf569f 100644
--- a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -17,7 +17,11 @@
 package org.apache.doris.spark;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.spark.cfg.ConfigurationOptions;
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.exception.StreamLoadException;
+import org.apache.doris.spark.rest.RestService;
 import org.apache.doris.spark.rest.models.RespContent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,12 +40,16 @@
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Calendar;
 import java.util.List;
+import java.util.StringJoiner;
 import java.util.UUID;
 
 /**
  * DorisStreamLoad
  **/
 public class DorisStreamLoad implements Serializable{
+    public static final String FIELD_DELIMITER = "\t";
+    public static final String LINE_DELIMITER = "\n";
+    public static final String NULL_VALUE = "\\N";
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
 
@@ -65,6 +73,18 @@ public class DorisStreamLoad implements Serializable{
         this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
     }
 
+    public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
+        String hostPort = RestService.randomBackend(settings, LOG);
+        this.hostPort = hostPort;
+        String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
+        this.db = dbTable[0];
+        this.tbl = dbTable[1];
+        this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
+        this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
+        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
+        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+    }
+
     public String getLoadUrlStr() {
         return loadUrlStr;
     }
@@ -84,7 +104,6 @@ public class DorisStreamLoad implements Serializable{
             HttpURLConnection conn = (HttpURLConnection) url.openConnection();
             conn.setInstanceFollowRedirects(false);
             conn.setRequestMethod("PUT");
-            String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
             conn.setRequestProperty("Authorization", "Basic " + authEncoding);
             conn.addRequestProperty("Expect", "100-continue");
             conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
@@ -114,6 +133,22 @@ public class DorisStreamLoad implements Serializable{
         }
     }
 
+    public void load(List<List<Object>> rows) throws StreamLoadException {
+        StringJoiner lines = new StringJoiner(LINE_DELIMITER);
+        for (List<Object> row : rows) {
+            StringJoiner line = new StringJoiner(FIELD_DELIMITER);
+            for (Object field : row) {
+                if (field == null) {
+                    line.add(NULL_VALUE);
+                } else {
+                    line.add(field.toString());
+                }
+            }
+            lines.add(line.toString());
+        }
+        load(lines.toString());
+    }
+
     public void load(String value) throws StreamLoadException {
         LoadResponse loadResponse = loadBatch(value);
         LOG.info("Streamload Response:{}",loadResponse);
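The new load(List<List<Object>>) overload renders rows into the Stream Load payload: fields joined by tabs, rows joined by newlines, and nulls encoded as \N. The same logic as a compact Scala sketch:

    object TsvPayloadSketch {
      val FieldDelimiter = "\t"
      val LineDelimiter = "\n"
      val NullValue = "\\N" // the literal two characters \N, Doris' null marker

      def toPayload(rows: Seq[Seq[Any]]): String =
        rows.map(_.map(f => if (f == null) NullValue else f.toString).mkString(FieldDelimiter))
            .mkString(LineDelimiter)

      def main(args: Array[String]): Unit = {
        // Prints two lines: "1<TAB>Alice" and "2<TAB>\N"
        println(toPayload(Seq(Seq(1, "Alice"), Seq(2, null))))
      }
    }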
diff --git a/src/main/java/org/apache/doris/spark/cfg/Settings.java b/src/main/java/org/apache/doris/spark/cfg/Settings.java
index 4677de7..4e376a4 100644
--- a/src/main/java/org/apache/doris/spark/cfg/Settings.java
+++ b/src/main/java/org/apache/doris/spark/cfg/Settings.java
@@ -98,4 +98,17 @@ public abstract class Settings {
         Properties copy = asProperties();
         return IOUtils.propsToString(copy);
     }
+
+    @Override
+    public int hashCode() {
+        return asProperties().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        return asProperties().equals(((Settings) obj).asProperties());
+    }
 }
diff --git a/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java b/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java
index 6568485..39fcd75 100644
--- a/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java
+++ b/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java
@@ -24,9 +24,10 @@ import org.apache.spark.SparkConf;
 import com.google.common.base.Preconditions;
 
 import scala.Option;
+import scala.Serializable;
 import scala.Tuple2;
 
-public class SparkSettings extends Settings {
+public class SparkSettings extends Settings implements Serializable {
 
     private final SparkConf cfg;
diff --git a/src/main/java/org/apache/doris/spark/rest/RestService.java b/src/main/java/org/apache/doris/spark/rest/RestService.java
index 10126e8..0c9b5c4 100644
--- a/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -65,7 +65,6 @@ import org.apache.doris.spark.rest.models.BackendRow;
 import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
 import org.apache.doris.spark.rest.models.Tablet;
-import org.apache.doris.spark.sql.DorisWriterOption;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.HttpGet;
@@ -476,17 +475,13 @@ public class RestService implements Serializable {
 
     /**
      * choice a Doris BE node to request.
-     * @param options configuration of request
      * @param logger slf4j logger
      * @return the chosen one Doris BE node
      * @throws IllegalArgumentException BE nodes is illegal
      */
     @VisibleForTesting
-    public static String randomBackend(SparkSettings sparkSettings , DorisWriterOption options , Logger logger) throws DorisException, IOException {
-        // set user auth
-        sparkSettings.setProperty(DORIS_REQUEST_AUTH_USER,options.user());
-        sparkSettings.setProperty(DORIS_REQUEST_AUTH_PASSWORD,options.password());
-        String feNodes = options.feHostPort();
+    public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException {
+        String feNodes = sparkSettings.getProperty(DORIS_FENODES);
         String feNode = randomEndpoint(feNodes, logger);
         String beUrl = String.format("http://%s" + BACKENDS,feNode);
         HttpGet httpGet = new HttpGet(beUrl);
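The hashCode()/equals() overrides in Settings.java exist because SparkSettings is now the key of the Guava cache in CachedDorisStreamLoadClient: two settings instances carrying identical properties must map to the same cached client. A sketch of the equality this is meant to buy (property values are placeholders; note that equals() casts without an instanceof check, so comparing against a non-Settings value would throw ClassCastException rather than return false):

    import org.apache.doris.spark.cfg.SparkSettings
    import org.apache.spark.SparkConf

    object SettingsKeySketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().set("doris.fenodes", "fe_host:8030") // placeholder
        val a = new SparkSettings(conf)
        val b = new SparkSettings(conf.clone())

        // Distinct instances, equal property sets -> equal keys, so
        // getOrCreate(a) and getOrCreate(b) share one DorisStreamLoad.
        assert(a == b && a.hashCode == b.hashCode)
      }
    }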
diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index d8f951b..65f5250 100644
--- a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -17,25 +17,24 @@
 package org.apache.doris.spark.sql
 
-import java.io.IOException
-import java.util.StringJoiner
-
-import org.apache.commons.collections.CollectionUtils
 import org.apache.doris.spark.DorisStreamLoad
-import org.apache.doris.spark.cfg.SparkSettings
-import org.apache.doris.spark.exception.DorisException
-import org.apache.doris.spark.rest.RestService
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, Filter, RelationProvider}
-import org.apache.spark.sql.types.StructType
-import org.json4s.jackson.Json
 
-import scala.collection.mutable.ListBuffer
-import scala.util.Random
+import java.io.IOException
+import java.util
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 import scala.util.control.Breaks
 
-private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider with CreatableRelationProvider with Logging {
+private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider with CreatableRelationProvider with StreamWriteSupport with Logging {
   override def shortName(): String = "doris"
 
   override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
@@ -50,41 +49,29 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP
                               mode: SaveMode,
                               parameters: Map[String, String],
                               data: DataFrame): BaseRelation = {
-    val dorisWriterOption = DorisWriterOption(parameters)
     val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
-    // choose available be node
-    val choosedBeHost = RestService.randomBackend(sparkSettings, dorisWriterOption, log)
+    sparkSettings.merge(Utils.params(parameters, log).asJava)
     // init stream loader
-    val dorisStreamLoader = new DorisStreamLoad(choosedBeHost, dorisWriterOption.dbName, dorisWriterOption.tbName, dorisWriterOption.user, dorisWriterOption.password)
-    val fieldDelimiter: String = "\t"
-    val lineDelimiter: String = "\n"
-    val NULL_VALUE: String = "\\N"
+    val dorisStreamLoader = new DorisStreamLoad(sparkSettings)
 
-    val maxRowCount = dorisWriterOption.maxRowCount
-    val maxRetryTimes = dorisWriterOption.maxRetryTimes
+    val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT)
+    val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT)
 
     data.rdd.foreachPartition(partition => {
-
-      val buffer = ListBuffer[String]()
+      val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
       partition.foreach(row => {
-        val value = new StringJoiner(fieldDelimiter)
-        // create one row string
+        val line: util.List[Object] = new util.ArrayList[Object]()
         for (i <- 0 until row.size) {
           val field = row.get(i)
-          if (field == null) {
-            value.add(NULL_VALUE)
-          } else {
-            value.add(field.toString)
-          }
+          line.add(field.asInstanceOf[AnyRef])
         }
-        // add one row string to buffer
-        buffer += value.toString
-        if (buffer.size > maxRowCount) {
+        rowsBuffer.add(line)
+        if (rowsBuffer.size > maxRowCount) {
           flush
         }
       })
       // flush buffer
-      if (buffer.nonEmpty) {
+      if (!rowsBuffer.isEmpty) {
        flush
       }
 
@@ -98,16 +85,16 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP
 
       for (i <- 1 to maxRetryTimes) {
         try {
-          dorisStreamLoader.load(buffer.mkString(lineDelimiter))
-          buffer.clear()
+          dorisStreamLoader.load(rowsBuffer)
+          rowsBuffer.clear()
           loop.break()
         }
         catch {
           case e: Exception =>
             try {
               Thread.sleep(1000 * i)
-              dorisStreamLoader.load(buffer.mkString(lineDelimiter))
-              buffer.clear()
+              dorisStreamLoader.load(rowsBuffer)
+              rowsBuffer.clear()
             } catch {
               case ex: InterruptedException =>
                 Thread.currentThread.interrupt()
@@ -136,8 +123,9 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP
     }
   }
 
-
-
-
-
+  override def createStreamWriter(queryId: String, structType: StructType, outputMode: OutputMode, dataSourceOptions: DataSourceOptions): StreamWriter = {
+    val sparkSettings = new SparkSettings(new SparkConf())
+    sparkSettings.merge(Utils.params(dataSourceOptions.asMap().toMap, log).asJava)
+    new DorisStreamWriter(sparkSettings)
+  }
 }
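Both this batch path and the stream writer below share one flush policy: buffer rows until the count passes the configured batch size, then stream-load with up to maxRetryTimes attempts, sleeping 1000 * i milliseconds after the i-th failure (linear backoff). A simplified sketch of that loop's shape (the real code additionally retries once inside the catch block and converts an interrupt into an IOException):

    import scala.util.control.Breaks

    object RetrySketch {
      /** Run `flush` up to maxRetryTimes times; back off linearly between failures. */
      def flushWithRetry(maxRetryTimes: Int)(flush: () => Unit): Unit = {
        val loop = new Breaks
        loop.breakable {
          for (i <- 1 to maxRetryTimes) {
            try {
              flush()
              loop.break() // success: stop retrying
            } catch {
              case _: Exception => Thread.sleep(1000L * i) // wait longer after each failure
            }
          }
        }
      }

      def main(args: Array[String]): Unit = {
        var calls = 0
        flushWithRetry(3) { () =>
          calls += 1
          if (calls < 2) throw new RuntimeException("transient failure")
        }
        println(s"succeeded after $calls attempts")
      }
    }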
diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisStreamWriter.scala b/src/main/scala/org/apache/doris/spark/sql/DorisStreamWriter.scala
new file mode 100644
index 0000000..60d2c78
--- /dev/null
+++ b/src/main/scala/org/apache/doris/spark/sql/DorisStreamWriter.scala
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
+
+import java.io.IOException
+import java.util
+import scala.util.control.Breaks
+
+/**
+ * A [[StreamWriter]] for Apache Doris streaming writing.
+ *
+ * @param settings params for writing doris table
+ */
+class DorisStreamWriter(settings: SparkSettings) extends StreamWriter {
+  override def createWriterFactory(): DorisStreamWriterFactory = DorisStreamWriterFactory(settings)
+
+  override def commit(l: Long, writerCommitMessages: Array[WriterCommitMessage]): Unit = {}
+
+  override def abort(l: Long, writerCommitMessages: Array[WriterCommitMessage]): Unit = {}
+
+}
+
+/**
+ * A [[DataWriterFactory]] for Apache Doris streaming writing. Will be serialized and sent to executors to generate
+ * the per-task data writers.
+ *
+ * @param settings params for writing doris table
+ */
+case class DorisStreamWriterFactory(settings: SparkSettings) extends DataWriterFactory[Row] {
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+    new DorisStreamDataWriter(settings)
+  }
+}
+
+/**
+ * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
+ * don't need to really send one.
+ */
+case object DorisWriterCommitMessage extends WriterCommitMessage
+
+/**
+ * A [[DataWriter]] for Apache Doris streaming writing. One data writer will be created in each partition to
+ * process incoming rows.
+ *
+ * @param settings params for writing doris table
+ */
+class DorisStreamDataWriter(settings: SparkSettings) extends DataWriter[Row] {
+  val maxRowCount: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT)
+  val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT)
+  val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
+  val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
+
+  override def write(row: Row): Unit = {
+    val line: util.List[Object] = new util.ArrayList[Object]()
+    for (i <- 0 until row.size) {
+      val field = row.get(i)
+      line.add(field.asInstanceOf[AnyRef])
+    }
+    if (!line.isEmpty) {
+      rowsBuffer.add(line)
+    }
+    if (rowsBuffer.size >= maxRowCount) {
+      // commit when buffer is full
+      commit()
+    }
+  }
+
+  override def commit(): WriterCommitMessage = {
+    // we don't commit request until rows-buffer received some rows
+    val loop = new Breaks
+    loop.breakable {
+      for (i <- 1 to maxRetryTimes) {
+        try {
+          if (!rowsBuffer.isEmpty) {
+            dorisStreamLoader.load(rowsBuffer)
+          }
+          rowsBuffer.clear()
+          loop.break()
+        }
+        catch {
+          case e: Exception =>
+            try {
+              Thread.sleep(1000 * i)
+              if (!rowsBuffer.isEmpty) {
+                dorisStreamLoader.load(rowsBuffer)
+              }
+              rowsBuffer.clear()
+            } catch {
+              case ex: InterruptedException =>
+                Thread.currentThread.interrupt()
+                throw new IOException("unable to flush; interrupted while doing another attempt", e)
+            }
+        }
+      }
+    }
+    DorisWriterCommitMessage
+  }
+
+  override def abort(): Unit = {
+  }
+}
\ No newline at end of file
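DorisStreamWriter follows Spark 2.x's DataSourceV2 streaming write contract: the driver-side StreamWriter produces a serializable DataWriterFactory, each executor partition gets its own DataWriter, write() is invoked once per row, and commit() once per epoch, which is why the final flush lives in commit(). A toy writer against the same interfaces, with no Doris dependency, to make that lifecycle concrete:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

    case object ToyCommit extends WriterCommitMessage

    /** Buffers rows; "flushes" (here, just prints) when full and again on commit. */
    class ToyBufferedWriter(flushThreshold: Int) extends DataWriter[Row] {
      private val buffer = scala.collection.mutable.ArrayBuffer.empty[Row]

      override def write(row: Row): Unit = {
        buffer += row
        if (buffer.size >= flushThreshold) flush() // flush-on-full, as in DorisStreamDataWriter
      }

      override def commit(): WriterCommitMessage = {
        flush()   // final flush for the epoch
        ToyCommit // dummy message, like DorisWriterCommitMessage
      }

      override def abort(): Unit = buffer.clear() // drop buffered rows on task failure

      private def flush(): Unit = if (buffer.nonEmpty) {
        println(s"flushing ${buffer.size} rows")
        buffer.clear()
      }
    }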
diff --git a/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala b/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala
new file mode 100644
index 0000000..c62c9fb
--- /dev/null
+++ b/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.spark.sql.SparkSession
+
+object TestStreamSinkDoris {
+  val kafkaServers = ""
+  val kafkaTopics = ""
+  val dorisFeNodes = "your_doris_host_port"
+  val dorisUser = "root"
+  val dorisPwd = ""
+  val dorisTable = "test.test_tbl"
+
+  def main(args: Array[String]): Unit = {
+    val sparkSession = SparkSession.builder()
+      .master("local")
+      .getOrCreate()
+
+    val dataFrame = sparkSession.readStream
+      .option("kafka.bootstrap.servers", kafkaServers)
+      .option("startingOffsets", "latest")
+      .option("subscribe", kafkaTopics)
+      .format("kafka")
+      .option("failOnDataLoss", false)
+      .load()
+
+    dataFrame.selectExpr("CAST(timestamp AS STRING)", "CAST(partition as STRING)")
+      .writeStream
+      .format("doris")
+      .option("checkpointLocation", "/tmp/test")
+      .option("doris.table.identifier", dorisTable)
+      .option("doris.fenodes", dorisFeNodes)
+      .option("user", dorisUser)
+      .option("password", dorisPwd)
+      .start().awaitTermination()
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org