santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1239394929
@codope here is the output without the above mentioned config; I have also added the code I am using for testing the fix.
--------------ERROR--------------------
`22/09/07 18:53:08 ERROR BoundedInMemoryExecutor: error producing records
org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'prefix' not found
    at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228)
    at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:480)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
    at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
    ... 8 more
22/09/07 18:53:09 ERROR BoundedInMemoryExecutor: error consuming records`
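For context on the failure: the `Caused by` points at the nested `name.prefix` field, which exists in the parquet file written by the first insert but is missing from the schema inferred for the second batch (spark.read.json drops it because the updated record does not carry it). A quick way to see the difference, using the two DataFrames defined in the code below (this snippet is only for illustration, it is not part of the original test):
`//Compare the inferred schemas of the nested "name" element in the two batches;
//the first batch has prefix: array<string>, the second one does not
orgStringDf.select(explode(col("name")).as("n")).select("n.*").printSchema
updatedStringDf.select(explode(col("name")).as("n")).select("n.*").printSchema`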
--------------CODE---------------------
`~/work/spark-3.2.1-bin-hadoop3.2/bin/spark-shell \
  --jars $(ls packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.13.0-SNAPSHOT.jar) \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
//Define a Patient FHIR resource, for simplicity have deleted most of the elements and retained a few
val orgString =
"""{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"],"prefix":["Ms."]}]}"""
val orgStringDf = spark.read.json(Seq(orgString).toDS)
//Specify common DataSourceWriteOptions in the single hudiOptions variable
val hudiOptions = Map[String,String](
HoodieWriteConfig.TABLE_NAME -> "patient_hudi",
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source",
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated",
DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true")
//Write the orgStringDf to a Hudi table
orgStringDf.write
.format("org.apache.hudi")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.options(hudiOptions)
.mode(SaveMode.Overwrite)
.save("/work/data/updateTst/hudi/json_schema_tst")
//Read the Hudi table
val patienthudi =
spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")
//Printschema
patienthudi.printSchema
//Update: Based on our use case, add a new patient resource; this resource might contain new columns and might not have existing columns (a normal use case with FHIR data)
val updatedString =
"""{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"]}]}"""
//Convert the new resource string into DF
val updatedStringDf = spark.read.json(Seq(updatedString).toDS)
//Check the schema of the new resource that is being added
updatedStringDf.printSchema
//Upsert the new resource
updatedStringDf.write
.format("org.apache.hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,
"org.apache.hudi.common.model.EmptyHoodieRecordPayload")
.mode(SaveMode.Append)
.save("/work/data/updateTst/hudi/json_schema_tst")
//Read the Hudi table
val patienthudiUpdated =
spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")
//Print the schema after adding the new record
patienthudiUpdated.printSchema`
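For reference, below is a sketch of the same upsert with Hudi's schema reconciliation option enabled; whether this matches the config referred to above is an assumption on my part, the key used here is `hoodie.datasource.write.reconcile.schema`.
`//Same upsert as above but with schema reconciliation turned on, so the incoming
//(narrower) schema gets reconciled against the existing table schema
//(assumption: this is the config discussed earlier in the thread)
updatedStringDf.write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
  .option("hoodie.datasource.write.reconcile.schema", "true")
  .mode(SaveMode.Append)
  .save("/work/data/updateTst/hudi/json_schema_tst")`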