imonteroq opened a new issue, #10533:
URL: https://github.com/apache/hudi/issues/10533
**Describe the problem you faced**

Streaming reads from a Hudi table in Spark fail with the error below when the table was created or written by a `writeStream` query with the schema evolution settings `hoodie.schema.on.read.enable` and `hoodie.datasource.write.reconcile.schema` enabled. I have not been able to upsert a source schema that has more and/or fewer columns than the target schema without these two settings enabled.

`org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column
_hoodie_commit_seqno already exists. Consider to choose another name or rename
the existing column.`
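To make the "more and/or fewer columns" upsert scenario concrete, here is a plain-Scala sketch of the kind of schema reconciliation the report relies on. It assumes a simplified model: table columns missing from the incoming batch are kept and made nullable (to be filled with nulls), and incoming columns the table does not have yet are appended. `Field` and `ReconcileSketch.reconcile` are hypothetical names, and this is an illustrative model only, not Hudi's implementation.

```scala
// Hypothetical, simplified column model: a name plus nullability.
final case class Field(name: String, nullable: Boolean)

object ReconcileSketch {
  /** Illustrative model only (not Hudi's algorithm):
    * 1. keep every table column in its original order; columns absent from the
    *    incoming batch become nullable so they can be filled with nulls;
    * 2. append incoming columns that the table does not have yet.
    */
  def reconcile(table: Seq[Field], incoming: Seq[Field]): Seq[Field] = {
    val incomingNames = incoming.map(_.name).toSet
    val tableNames = table.map(_.name).toSet
    val kept = table.map { f =>
      if (incomingNames.contains(f.name)) f else f.copy(nullable = true)
    }
    val added = incoming.filterNot(f => tableNames.contains(f.name))
    kept ++ added
  }
}
```

In this model the batch can both drop a column (`name` below) and add one (`status`) in the same upsert, which is the situation the two settings are enabled for.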
**To Reproduce**
Steps to reproduce the behavior:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// `Contract` is a case class from the test code that is not shown in this
// report; it must contain the `identifier` and `date` fields used as the
// record key and precombine field below.

val sparkConf: SparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("testAppName")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
  .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")

val spark = SparkSession.builder.config(sparkConf).getOrCreate()
import spark.implicits._ // provides the Encoder[Contract] needed by MemoryStream
implicit val sqlContext: SQLContext = spark.sqlContext // MemoryStream also needs an implicit SQLContext

val hudiOptions: Map[String, String] = Map(
  "hoodie.table.name" -> "test_table",
  "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", // older alias of hoodie.datasource.write.table.type
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.recordkey.field" -> "identifier",
  "hoodie.datasource.write.precombine.field" -> "date",
  "hoodie.datasource.insert.dup.policy" -> "none",
  "hoodie.avro.schema.externalTransformation" -> "true",
  // Schema evolution settings: the streaming read below fails only when both are enabled.
  "hoodie.schema.on.read.enable" -> "true",
  "hoodie.datasource.write.reconcile.schema" -> "true")

val inMemoryRecords: List[Contract] = List(
  Contract("001", 1, "test1", 100),
  Contract("002", 2, "test2", 100),
  Contract("003", 3, "test3", 100))

// 1. Write the records to a Hudi table with a streaming query.
val contractsInMemory: MemoryStream[Contract] = MemoryStream[Contract]
contractsInMemory.addData(inMemoryRecords)
contractsInMemory
  .toDF()
  .writeStream
  .format("hudi")
  .trigger(Trigger.AvailableNow())
  .queryName("streamingQueryName")
  .option("checkpointLocation", "/tmp/checkpoint")
  .options(hudiOptions)
  .outputMode(OutputMode.Append())
  .start("/tmp/data")
  .processAllAvailable()

// 2. Read the same table back as a stream; this fails with COLUMN_ALREADY_EXISTS.
spark.readStream
  .format("hudi")
  .load("/tmp/data")
  .writeStream
  .format("memory")
  .queryName("queryName")
  .outputMode("append")
  .start()
  .processAllAvailable()
```
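The stack trace below shows the failure inside `org.apache.hudi.IncrementalRelation.buildScan`, called from `HoodieStreamSource.getBatch`. Assuming a batch incremental query on a COPY_ON_WRITE table goes through the same `IncrementalRelation` (likely in 0.14.0, but not confirmed by this report), a batch read could tell whether the problem is specific to the streaming source. The sketch below only builds the read options; `IncrementalReadOptions` is a hypothetical helper, while the two option keys are standard Hudi read options. The begin instant is exclusive: commits with an instant time greater than it are returned.

```scala
// Hypothetical helper that builds the options for a Hudi batch incremental query.
object IncrementalReadOptions {
  // Standard Hudi Spark read option keys.
  val QueryTypeKey = "hoodie.datasource.query.type"
  val BeginInstantKey = "hoodie.datasource.read.begin.instanttime"

  /** Options for reading every commit strictly after `beginInstant`;
    * "000" precedes any real instant, so the default reads all commits. */
  def apply(beginInstant: String = "000"): Map[String, String] =
    Map(QueryTypeKey -> "incremental", BeginInstantKey -> beginInstant)
}
```

Usage, against the table written by the repro: `spark.read.format("hudi").options(IncrementalReadOptions()).load("/tmp/data").show()`. If this also raises `COLUMN_ALREADY_EXISTS`, the issue likely sits in the incremental read path rather than in `HoodieStreamSource` itself.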
**Environment Description**
* OS: Mac OS X
* Hudi version: 0.14.0
* Spark version: 3.4.1
* Storage: S3 (LocalStack)
* Running on Docker?: No
**Additional context**

The streaming read works fine when either `hoodie.schema.on.read.enable` or `hoodie.datasource.write.reconcile.schema` is disabled.
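To check the observation above systematically, the repro can be run once per on/off combination of the two settings. The sketch below, a minimal illustration in plain Scala, layers each combination over a base option map and encodes the behaviour reported here (failure only when both settings are `"true"`). `FlagMatrix`, `variants`, and `expectedToFail` are hypothetical names; only the two option keys come from the report.

```scala
// Hypothetical helper that enumerates the four combinations of the two settings.
object FlagMatrix {
  val SchemaOnRead = "hoodie.schema.on.read.enable"
  val ReconcileSchema = "hoodie.datasource.write.reconcile.schema"

  /** Every on/off combination of the two flags, layered over `base`. */
  def variants(base: Map[String, String]): Seq[Map[String, String]] =
    for {
      sor <- Seq(true, false)
      rec <- Seq(true, false)
    } yield base ++ Map(SchemaOnRead -> sor.toString, ReconcileSchema -> rec.toString)

  /** Encodes the behaviour reported in this issue: the read fails only
    * when both flags are enabled. */
  def expectedToFail(opts: Map[String, String]): Boolean =
    opts.get(SchemaOnRead).contains("true") && opts.get(ReconcileSchema).contains("true")
}
```

Each variant should be written to its own table path and checkpoint location, so that one run's schema settings and streaming offsets do not leak into the next.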
**Stacktrace**
```
[STREAM_FAILED] Query [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066] terminated with exception: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
org.apache.spark.sql.streaming.StreamingQueryException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
=== Streaming Query ===
Identifier: test_hudi [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066]
Current Committed Offsets: {}
Current Available Offsets: {org.apache.spark.sql.hudi.streaming.HoodieStreamSource@1ac520b5: {"commitTime":"20240118170827012"}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource MemorySink, 443bb452-13e7-4d43-916d-bac5fd5d1f2c, [queryName=test_hudi], Append
+- StreamingExecutionRelation org.apache.spark.sql.hudi.streaming.HoodieStreamSource@1ac520b5, [_hoodie_commit_time#142, _hoodie_commit_seqno#143, _hoodie_record_key#144, _hoodie_partition_path#145, _hoodie_file_name#146, identifier#147, name#148, quantity#149, status#150, agent#151, metadata#152, contacts#153, date#154]
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
    at org.apache.spark.sql.errors.QueryCompilationErrors$.columnAlreadyExistsError(QueryCompilationErrors.scala:2300)
    at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:113)
    at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:55)
    at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:71)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:427)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:275)
    at org.apache.spark.sql.hudi.streaming.HoodieStreamSource.getBatch(HoodieStreamSource.scala:171)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:586)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:582)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:582)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
    ... 1 more
```
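The exception is raised by Spark's `SchemaUtils.checkColumnNameDuplication`, reached from `DataSource.resolveRelation` when `IncrementalRelation.buildScan` calls `DataFrameReader.load`. The logical plan above lists each `_hoodie_*` meta column once, which suggests (an assumption, not confirmed here) that the duplicate exists only in the schema Hudi builds internally for that `load` call. The sketch below reimplements that kind of duplicate-name check in plain Scala, comparing names case-insensitively by default to mirror Spark's default `spark.sql.caseSensitive=false`; it can be applied to field names such as `df.schema.fieldNames.toSeq`. `DuplicateColumns` is a hypothetical helper, not Spark's code.

```scala
import java.util.Locale

// Hypothetical helper mirroring the kind of check Spark performs on column names.
object DuplicateColumns {
  /** Hudi's five metadata columns, as listed in the logical plan above. */
  val HoodieMetaColumns: Seq[String] = Seq(
    "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
    "_hoodie_partition_path", "_hoodie_file_name")

  /** Names that occur more than once, reported with the spelling of their first
    * occurrence and in first-occurrence order. Case-insensitive by default. */
  def duplicates(names: Seq[String], caseSensitive: Boolean = false): Seq[String] = {
    val key: String => String =
      if (caseSensitive) ((s: String) => s) else ((s: String) => s.toLowerCase(Locale.ROOT))
    val grouped = names.groupBy(key) // each group keeps the original order
    names.map(key).distinct.collect { case k if grouped(k).size > 1 => grouped(k).head }
  }
}
```

For a schema in which the meta columns were prepended twice, `duplicates` returns all five of them, with `_hoodie_commit_seqno` among them; Spark's own check stops at the first duplicate it reports.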