This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new ee2af05 [Improve](test) add structured streaming case (#302)
ee2af05 is described below
commit ee2af05310bce9793bbf099faf2f5097b2cb6a6c
Author: wudi <[email protected]>
AuthorDate: Tue Apr 1 14:24:07 2025 +0800
[Improve](test) add structured streaming case (#302)
---
.../spark/sql/DorisStreamingWriterITCase.scala | 116 +++++++++++++++++++++
.../spark/sql/DorisWriterFailoverITCase.scala | 8 +-
2 files changed, 123 insertions(+), 1 deletion(-)
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisStreamingWriterITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisStreamingWriterITCase.scala
new file mode 100644
index 0000000..530fda2
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisStreamingWriterITCase.scala
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.container.AbstractContainerTestBase.getDorisQueryConnection
+import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils}
+import org.apache.doris.spark.rest.models.DataModel
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
+import org.junit.{Before, Test}
+import org.slf4j.LoggerFactory
+
+/**
+ * Doris Structured Streaming Test Case.
+ */
+class DorisStreamingWriterITCase extends AbstractContainerTestBase {
+
+  private val LOG = LoggerFactory.getLogger(classOf[DorisStreamingWriterITCase])
+ val DATABASE = "test_doris_streaming"
+ val TABLE_WRITE_TBL_STREAM = "tbl_write_tbl_stream"
+
+ @Before
+ def setUp(): Unit = {
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection,
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+ }
+
+ @Test
+ def testStreamingWriter(): Unit = {
+ initializeTable(TABLE_WRITE_TBL_STREAM, DataModel.DUPLICATE)
+ val spark = SparkSession.builder()
+ .appName("RateSourceExample")
+ .master("local[1]")
+ .getOrCreate()
+
+ val rateStream = spark.readStream
+ .format("rate")
+ .option("rowsPerSecond", 1)
+ .load()
+
+    // rate source columns: timestamp, value
+ val dorissink = rateStream.writeStream
+ .format("doris")
+ .option("checkpointLocation", "/tmp/checkpoint")
+      .option("doris.table.identifier", DATABASE + "." + TABLE_WRITE_TBL_STREAM)
+ .option("doris.fenodes", getFenodes)
+ .option("user", getDorisUsername)
+ .option("password", getDorisPassword)
+ .start()
+
+ var totalCount = 0L
+ spark.streams.addListener(new StreamingQueryListener {
+      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
+
+      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+ val progress: StreamingQueryProgress = event.progress
+ val numInputRows = progress.numInputRows
+ totalCount += numInputRows
+ println(s"Processed $numInputRows rows, Total: $totalCount")
+
+ if (totalCount >= 10) {
+ println(s"Total count reached 10, stopping query.")
+ dorissink.stop()
+ }
+ }
+
+      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
+ })
+
+ dorissink.awaitTermination()
+
+ spark.stop()
+ val cnt = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(DATABASE),
+ LOG,
+      String.format("SELECT count(1) FROM %s.%s", DATABASE, TABLE_WRITE_TBL_STREAM),
+ 1
+ )
+ assert(cnt.get(0).toInt == totalCount)
+ }
+
+ private def initializeTable(table: String, dataModel: DataModel): Unit = {
+ val max = if (DataModel.AGGREGATE == dataModel) "MAX" else ""
+    val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else ",\"enable_unique_key_merge_on_write\" = \"false\""
+    val model = if (dataModel == DataModel.UNIQUE_MOR) DataModel.UNIQUE.toString else dataModel.toString
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+ String.format("CREATE TABLE %s.%s ( \n"
+ + "`timestamp` DATETIME,\n"
+ + "`value` int %s\n"
+ + ") "
+ + " %s KEY(`timestamp`) "
+ + " DISTRIBUTED BY HASH(`timestamp`) BUCKETS 1\n"
+ + "PROPERTIES ("
+        + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table, max, model))
+ }
+}
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
index bbaf7bd..7c1d48f 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
@@ -51,6 +51,7 @@ class DorisWriterFailoverITCase extends AbstractContainerTestBase {
@Test
def testFailoverForRetry(): Unit = {
+ LOG.info("start to test testFailoverForRetry.")
initializeTable(TABLE_WRITE_TBL_RETRY, DataModel.DUPLICATE)
val session = SparkSession.builder().master("local[1]").getOrCreate()
val df = session.createDataFrame(Seq(
@@ -125,8 +126,9 @@ class DorisWriterFailoverITCase extends AbstractContainerTestBase {
*/
@Test
def testFailoverForTaskRetry(): Unit = {
+ LOG.info("start to test testFailoverForTaskRetry.")
initializeTable(TABLE_WRITE_TBL_TASK_RETRY, DataModel.DUPLICATE)
- val session = SparkSession.builder().master("local[1,100]").getOrCreate()
+ val session = SparkSession.builder().master("local[1,1000]").getOrCreate()
val df = session.createDataFrame(Seq(
("doris", "cn"),
("spark", "us"),
@@ -167,6 +169,7 @@ class DorisWriterFailoverITCase extends AbstractContainerTestBase {
try {
// query may be failed
        result = ContainerUtils.executeSQLStatement(connection, LOG, query, 15).asScala.toList
+ Thread.sleep(10)
} catch {
case ex: Exception =>
LOG.error("Failed to query result, cause " + ex.getMessage)
@@ -184,6 +187,9 @@ class DorisWriterFailoverITCase extends AbstractContainerTestBase {
future.get(60, TimeUnit.SECONDS)
session.stop()
+
+    // make sure the publish phase has finished
+ Thread.sleep(5000)
val actual = ContainerUtils.executeSQLStatement(
getDorisQueryConnection,
LOG,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]