gnehil opened a new pull request, #135: URL: https://github.com/apache/doris-spark-connector/pull/135
# Proposed changes

Issue Number: close #xxx

## Problem Summary:

When writing to Doris through Structured Streaming, the following exception occurs:

```java
23/09/01 14:13:40 ERROR MicroBatchExecution: Query [id = c1d9450a-43ab-46fd-9826-10e0eb8ab1bc, runId = 21bd5cfe-84a1-487f-a2e7-ce27e2813691] terminated with error
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@56bd0218, KafkaV2[Subscribe[spark_streaming_test]], {"spark_streaming_test":{"0":13}}, {"spark_streaming_test":{"0":14}}
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:421)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:184)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:183)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:183)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:183)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:183)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:183)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:183)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:183)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:183)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:183)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:67)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$withCachedData$1(QueryExecution.scala:78)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:76)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:76)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:87)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:95)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:113)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3241)
    at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3239)
    at org.apache.doris.spark.writer.DorisWriter.write(DorisWriter.scala:62)
    at org.apache.doris.spark.sql.DorisStreamLoadSink.addBatch(DorisStreamLoadSink.scala:37)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
```

This happens because the sink converts each micro-batch `DataFrame` to an `RDD` by calling `dataframe.rdd`. That call makes Spark re-plan the query as a batch job, so the streaming job fails to start. The correct way is to obtain the result `RDD` by calling `dataframe.queryExecution.toRdd`, which executes the query plan already built for each micro-batch and converts the data into the final result `RDD`.
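For illustration, the sketch below shows the shape of that change in a sink's write path. It is only a sketch: the method name `microBatchRows` and the `dataFrame` parameter are assumptions for the example, not the actual connector code.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical helper (not the actual connector method): obtain the rows of a
// micro-batch without re-triggering batch query planning.
def microBatchRows(dataFrame: DataFrame): RDD[InternalRow] = {
  // Before (fails for streaming sources): Dataset.rdd re-analyzes the plan in
  // batch mode and throws "Queries with streaming sources must be executed
  // with writeStream.start()".
  // val rows = dataFrame.rdd

  // After: reuse the query execution Spark already built for this micro-batch.
  // Note that toRdd returns RDD[InternalRow], so values must be read through
  // the schema (or converted to external rows) before building the Stream Load
  // payload.
  dataFrame.queryExecution.toRdd
}
```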
In addition, a new configuration option, `doris.sink.streaming.passthrough`, has been added. It is a `boolean` option with a default value of `false`. When it is set to `true`, the value of the first column is written to Doris directly through Stream Load, without any processing. This mode supports both the `csv` and `json` formats, but the `json` format only supports `read_json_by_line` mode.

Examples:

- If you need to write a DataFrame of structured data, the configuration is the same as in batch mode.

```scala
val sourceDf = spark.readStream
  .format("your_own_stream_source")
  .load()

val resultDf = sourceDf
  .<transformations>

resultDf.writeStream
  .format("doris")
  .option("checkpointLocation", "/checkpoint")
  .option("doris.table.identifier", "db.table")
  .option("doris.fenodes", "127.0.0.1:8030")
  .option("user", "root")
  .option("password", "")
  .start()
  .awaitTermination()
```

- If you consume messages from Kafka and the message happens to be a `csv` or `json` string that conforms to the Stream Load specification, you can set `doris.sink.streaming.passthrough` to `true` to write the message data directly.

```scala
val sourceDf = spark.readStream
  .format("your_own_stream_source")
  .load()

// Extract the value from the Kafka record
val resultDf = sourceDf.selectExpr("CAST(value AS STRING) AS value")

resultDf.writeStream
  .format("doris")
  .option("checkpointLocation", "/checkpoint")
  .option("doris.table.identifier", "db.table")
  .option("doris.fenodes", "127.0.0.1:8030")
  .option("user", "root")
  .option("password", "")
  .option("doris.sink.streaming.passthrough", "true")
  .option("doris.sink.properties.format", "json")
  .option("doris.sink.properties.read_json_by_line", "true")
  .start()
  .awaitTermination()
```

## Checklist(Required)

1. Does it affect the original behavior: (Yes/No/I Don't know)
2. Has unit tests been added: (Yes/No/No Need)
3. Has document been added or modified: (Yes/No/No Need)
4. Does it need to update dependencies: (Yes/No)
5. Are there any changes that cannot be rolled back: (Yes/No)

## Further comments

If this is a relatively large or complex change, kick off the discussion at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...