gnehil opened a new pull request, #135:
URL: https://github.com/apache/doris-spark-connector/pull/135

   # Proposed changes
   
   Issue Number: close #xxx
   
   ## Problem Summary:
   
   When writing to Doris through Structured Streaming, the following exception occurs:
   
   ```java
   23/09/01 14:13:40 ERROR MicroBatchExecution: Query [id = c1d9450a-43ab-46fd-9826-10e0eb8ab1bc, runId = 21bd5cfe-84a1-487f-a2e7-ce27e2813691] terminated with error
   org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
   StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@56bd0218, KafkaV2[Subscribe[spark_streaming_test]], {"spark_streaming_test":{"0":13}}, {"spark_streaming_test":{"0":14}}

        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:421)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:184)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:183)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:183)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:183)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:183)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:183)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:183)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:183)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:183)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:183)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
        at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:67)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$withCachedData$1(QueryExecution.scala:78)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:87)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:95)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:113)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
        at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3241)
        at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3239)
        at org.apache.doris.spark.writer.DorisWriter.write(DorisWriter.scala:62)
        at org.apache.doris.spark.sql.DorisStreamLoadSink.addBatch(DorisStreamLoadSink.scala:37)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
   ```
   
   This happens because the sink processes each micro-batch by calling `dataframe.rdd` to convert the DataFrame into an `RDD`. That call makes Spark re-plan the query as a batch job, so the streaming query fails to start.
   
   The correct way is to obtain the RDD by calling `dataframe.queryExecution.toRdd`, which executes the query plan already built for each micro-batch and returns the final result as an `RDD` of internal rows.
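
   To make the difference concrete, here is a minimal sketch of a streaming sink using `queryExecution.toRdd`. The class name and the body of the partition loop are illustrative only; this is not the actual `DorisStreamLoadSink` / `DorisWriter` code.
   
   ```scala
   import org.apache.spark.rdd.RDD
   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.execution.streaming.Sink
   
   // Illustrative sink, not the actual connector implementation.
   class ExampleDorisSink extends Sink {
   
     override def addBatch(batchId: Long, data: DataFrame): Unit = {
       // Wrong: `data.rdd` re-analyzes the logical plan as a batch query, so the
       // UnsupportedOperationChecker rejects the streaming source and the query fails.
       // val rows = data.rdd
   
       // Correct: reuse the plan already built for this micro-batch and get the
       // result as an RDD of InternalRow.
       val rows: RDD[InternalRow] = data.queryExecution.toRdd
   
       rows.foreachPartition { iter =>
         // Convert each InternalRow to a CSV/JSON payload and stream-load it
         // into Doris here (omitted in this sketch).
         val rowCount = iter.size
       }
     }
   }
   ```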
   
   In addition, a new configuration option, `doris.sink.streaming.passthrough`, has been added.
   It is a `boolean` option and defaults to `false`. When set to `true`, the value of the first column is written to Doris directly through Stream Load, without any processing.
   
   This mode supports writing in both `csv` and `json` formats, but for `json` only the `read_json_by_line` mode is supported.
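
   For example, with `format=json` and `read_json_by_line=true`, each message value is expected to be a single-line JSON object. The sample values below are purely illustrative:
   
   ```scala
   // Illustrative message values that would be passed through to Stream Load as-is.
   // csv: fields joined by the configured column separator
   val csvValue = "1,doris,2023-09-01"
   
   // json with read_json_by_line=true: one JSON object per line
   val jsonValue = """{"id": 1, "name": "doris", "dt": "2023-09-01"}"""
   ```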
   
   Example:
   
   - If you need to write a DataFrame of structured data, the configuration is the same as in batch mode.
   
   ```scala
   
   val sourceDf = spark.readStream
          .format("your_own_stream_source")
          .load()
   
   val resultDf = sourceDf
          .<transformations> // apply your own transformations here
   
   resultDf.writeStream
         .format("doris")
         .option("checkpointLocation", "/checkpoint")
         .option("doris.table.identifier", "db.table")
         .option("doris.fenodes", "127.0.0.1:8030")
         .option("user", "root")
         .option("password", "")
         .start()
         .awaitTermination()
   
   ```
   
   - If you consume messages from Kafka and the message value is already a `csv` or `json` string that conforms to the Stream Load format, you can set `doris.sink.streaming.passthrough` to `true` to write the message data directly.
   
   ```scala
   
   val sourceDf = spark.readStream
          .format("your_own_stream_source")
          .load()
   
   // Extract value from kafka record data
   val resultDf = sourceDf.selectExpr("CAST(value AS STRING) AS value")
   
   resultDf.writeStream
         .format("doris")
         .option("checkpointLocation", "/checkpoint")
         .option("doris.table.identifier", "db.table")
         .option("doris.fenodes", "127.0.0.1:8030")
         .option("user", "root")
         .option("password", "")
         .option("doris.sink.streaming.passthrough", "true")
         .option("doris.sink.properties.format", "json")
         .option("doris.sink.properties.read_json_by_line", "true")
         .start()
         .awaitTermination()
   
   ```
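
   Note that in passthrough mode the message value is forwarded to Stream Load as-is, so `doris.sink.properties.format` (and `read_json_by_line` for `json`) should match the actual content of the messages.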
   
   ## Checklist (Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Has unit tests been added: (Yes/No/No Need)
   3. Has document been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at 
[d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you 
chose the solution you did and what alternatives you considered, etc...
   

