[
https://issues.apache.org/jira/browse/HUDI-1079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Raymond Xu updated HUDI-1079:
-----------------------------
Story Points: (was: 0.5)
> Cannot upsert on schema with Array of Record with single field
> --------------------------------------------------------------
>
> Key: HUDI-1079
> URL: https://issues.apache.org/jira/browse/HUDI-1079
> Project: Apache Hudi
> Issue Type: Bug
> Components: writer-core
> Affects Versions: 0.9.0
> Environment: spark 2.4.4, local
> Reporter: Adrian Tanase
> Assignee: Raymond Xu
> Priority: Critical
> Labels: schema, sev:critical, user-support-issues
> Fix For: 0.11.0
>
> Original Estimate: 2h
> Time Spent: 1.5h
> Remaining Estimate: 0.5h
>
> I am trying to trigger upserts on a table that has an array field with
> records of just one field.
> Here is the code to reproduce:
> {code:scala}
> val spark = SparkSession.builder()
> .master("local[1]")
> .appName("SparkByExamples.com")
> .config("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> .getOrCreate();
> // https://sparkbyexamples.com/spark/spark-dataframe-array-of-struct/
> val arrayStructData = Seq(
> Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
> Row("Michael",List(Row("Java","XY",200),Row("Scala","XB",500))),
> Row("Robert",List(Row("Java","XZ",400),Row("Scala","XC",250))),
> Row("Washington",null)
> )
> val arrayStructSchema = new StructType()
> .add("name",StringType)
> .add("booksIntersted",ArrayType(
> new StructType()
> .add("bookName",StringType)
> // .add("author",StringType)
> // .add("pages",IntegerType)
> ))
> val df =
> spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)
> {code}
> Running insert following by upsert will fail:
> {code:scala}
> df.write
> .format("hudi")
> .options(getQuickstartWriteConfigs)
> .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
> .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
> .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
> .option(HoodieWriteConfig.TABLE_NAME, tableName)
> .mode(Overwrite)
> .save(basePath)
> df.write
> .format("hudi")
> .options(getQuickstartWriteConfigs)
> .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
> .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
> .option(HoodieWriteConfig.TABLE_NAME, tableName)
> .mode(Append)
> .save(basePath)
> {code}
> If I create the books record with all the fields (at least 2), it works as
> expected.
> The relevant part of the exception is this:
> {noformat}
> Caused by: java.lang.ClassCastException: required binary bookName (UTF8) is
> not a groupCaused by: java.lang.ClassCastException: required binary bookName
> (UTF8) is not a group at
> org.apache.parquet.schema.Type.asGroupType(Type.java:207) at
> org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
> at
> org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
> at
> org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
> at
> org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536)
> at
> org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486)
> at
> org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
> at
> org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
> at
> org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
> at
> org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
> at
> org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
> at
> org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
> at
> org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156) at
> org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135) at
> org.apache.hudi.client.utils.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
> at
> org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
> at
> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ... 4
> more{noformat}
> Another way to test is by changing the generated data in the tips to just the
> amount, by dropping the currency on the tips_history field, tests will start
> failing:
>
> [https://github.com/apache/hudi/compare/release-0.5.3...aditanase:avro-arrays-upsert?expand=1]
> I have narrowed this down to this block in the parquet-avro integration:
> [https://github.com/apache/parquet-mr/blob/master/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java#L846-L875]
> Which always returns false after trying to decide whether reader and writer
> schemas are compatible. Going through that code path makes me think it's
> related to the fields being optional, as the inferred schema seems to be
> (null, string) with default null instead of (string, null) with no default.
> At this point I'm lost, tried to figure something out based on this
> [https://github.com/apache/hudi/pull/1406/files] but I'm not sure where to
> start.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)