[ https://issues.apache.org/jira/browse/HUDI-1079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456186#comment-17456186 ]
sivabalan narayanan commented on HUDI-1079:
-------------------------------------------

Unfortunately, Hudi does not do any schema fiddling in this regard. We simply rely on parquet-avro to do the conversion for us, and apparently arrays of a struct type with just one field run into issues. I also wonder why someone (even your upstream datasets) would define a struct type with just one entry. In any case, we try our best not to re-implement this logic internally and to re-use the library as-is. Let us know if you are still looking for a solution on this end.

> Cannot upsert on schema with Array of Record with single field
> --------------------------------------------------------------
>
>                 Key: HUDI-1079
>                 URL: https://issues.apache.org/jira/browse/HUDI-1079
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Spark Integration
>    Affects Versions: 0.9.0
>         Environment: spark 2.4.4, local
>            Reporter: Adrian Tanase
>            Priority: Critical
>              Labels: schema, sev:critical, user-support-issues
>             Fix For: 0.11.0
>
> I am trying to trigger upserts on a table that has an array field whose records have just one field.
> Here is the code to reproduce:
> {code:scala}
> val spark = SparkSession.builder()
>   .master("local[1]")
>   .appName("SparkByExamples.com")
>   .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   .getOrCreate();
>
> // https://sparkbyexamples.com/spark/spark-dataframe-array-of-struct/
> val arrayStructData = Seq(
>   Row("James", List(Row("Java", "XX", 120), Row("Scala", "XA", 300))),
>   Row("Michael", List(Row("Java", "XY", 200), Row("Scala", "XB", 500))),
>   Row("Robert", List(Row("Java", "XZ", 400), Row("Scala", "XC", 250))),
>   Row("Washington", null)
> )
>
> val arrayStructSchema = new StructType()
>   .add("name", StringType)
>   .add("booksIntersted", ArrayType(
>     new StructType()
>       .add("bookName", StringType)
>       // .add("author", StringType)
>       // .add("pages", IntegerType)
>   ))
>
> val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData), arrayStructSchema)
> {code}
> Running an insert followed by an upsert fails:
> {code:scala}
> df.write
>   .format("hudi")
>   .options(getQuickstartWriteConfigs)
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
>   .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .mode(Overwrite)
>   .save(basePath)
>
> df.write
>   .format("hudi")
>   .options(getQuickstartWriteConfigs)
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .mode(Append)
>   .save(basePath)
> {code}
> If I create the books record with all the fields (at least two), it works as expected.
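> For reference, a sketch of the variant that works as described above: the same struct with the commented-out fields restored, so the array's record type has more than one field.
> {code:scala}
> val workingSchema = new StructType()
>   .add("name", StringType)
>   .add("booksIntersted", ArrayType(
>     new StructType()
>       .add("bookName", StringType)
>       .add("author", StringType)    // a second field in the record avoids the upsert failure
>       .add("pages", IntegerType)
>   ))
> {code}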
> The relevant part of the exception is this:
> {noformat}
> Caused by: java.lang.ClassCastException: required binary bookName (UTF8) is not a group
>     at org.apache.parquet.schema.Type.asGroupType(Type.java:207)
>     at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
>     at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
>     at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
>     at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536)
>     at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486)
>     at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
>     at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
>     at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
>     at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
>     at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
>     at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
>     at org.apache.hudi.client.utils.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
>     at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     ... 4 more
> {noformat}
> Another way to reproduce is to change the generated test data so that the tips_history field keeps just the amount and drops the currency; the tests then start failing:
>
> [https://github.com/apache/hudi/compare/release-0.5.3...aditanase:avro-arrays-upsert?expand=1]
>
> I have narrowed this down to this block in the parquet-avro integration, which always returns false after trying to decide whether the reader and writer schemas are compatible:
>
> [https://github.com/apache/parquet-mr/blob/master/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java#L846-L875]
>
> Going through that code path makes me think it's related to the fields being optional, as the inferred schema seems to be (null, string) with default null instead of (string, null) with no default (see the sketch at the end of this description).
>
> At this point I'm lost; I tried to figure something out based on [https://github.com/apache/hudi/pull/1406/files] but I'm not sure where to start.
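> To make the union-ordering difference above concrete, a minimal sketch of the two Avro shapes (record and field names are illustrative, not taken from the actual schemas):
> {code:scala}
> import org.apache.avro.Schema
>
> // Union ordered (string, null) with no default -- roughly the shape on the writer side.
> val writerSchema = new Schema.Parser().parse(
>   """{"type": "record", "name": "Book", "fields": [
>     |  {"name": "bookName", "type": ["string", "null"]}
>     |]}""".stripMargin)
>
> // Union ordered (null, string) with default null -- roughly the shape inferred for the reader.
> val readerSchema = new Schema.Parser().parse(
>   """{"type": "record", "name": "Book", "fields": [
>     |  {"name": "bookName", "type": ["null", "string"], "default": null}
>     |]}""".stripMargin)
> {code}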