[ 
https://issues.apache.org/jira/browse/HUDI-4798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17949613#comment-17949613
 ] 

Voon Hou commented on HUDI-4798:
--------------------------------

Another relevant fix that is required:

https://issues.apache.org/jira/browse/PARQUET-2450

 

The test below will reproduce it on master - 
e4d01dd791e6e41617bd4d9f1df22a792315d4c2

 
{code:java}
test("Repro Issue: Array of Struct with single inner field") {
  withTempDir { tmp =>
    val tableName = "hudi_type_test_mor"
    spark.sql(
      s"""
         |CREATE TABLE $tableName (
         |  uuid STRING,
         |  precombine_field LONG,
         |  col_double DOUBLE,
         |  array_struct ARRAY<STRUCT<inner_f3: STRING>>,
         |  part_col STRING
         |) USING hudi
         | LOCATION '${tmp.getCanonicalPath}'
         | TBLPROPERTIES (
         |  primaryKey = 'uuid',
         |  type = 'mor',
         |  preCombineField = 'precombine_field'
         | )
         | PARTITIONED BY (part_col)
      """.stripMargin)
    // directly write to new parquet file
    spark.sql(s"set hoodie.parquet.small.file.limit=0")
    spark.sql(s"set hoodie.metadata.compact.max.delta.commits=1")
    // partition stats index is enabled together with column stats index
    spark.sql(s"set hoodie.metadata.index.column.stats.enable=true")
    spark.sql(s"set hoodie.metadata.record.index.enable=true")
    spark.sql(s"set hoodie.metadata.index.secondary.enable=true")

    // Insert row 1 into partition 'A'
    spark.sql(
      s"""
         | INSERT INTO $tableName VALUES (
         |  'uuid1', 1000L, 1.1,
         |  array(struct('asd'), struct('ghj')),
         |  'A'
         | )
    """.stripMargin)

    spark.sql(s"CREATE INDEX idx_double ON $tableName (col_double)")

    // Generate log files through updates on partition 'A'
    // Allow time for async indexer to run so that error will be thrown
    spark.sql(s"UPDATE $tableName SET col_double = col_double + 100, 
precombine_field = precombine_field + 1 WHERE part_col = 'A'")
  }
}{code}
 

 

Error:

 
{code:java}
Driver stacktrace:
    at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
    at scala.Option.foreach(Option.scala:407)
    at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
    at 
org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at 
org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
    at 
org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:109)
    at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:204)
    at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:171)
    at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:151)
    at 
org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
    at 
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
    at 
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
    at 
org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:146)
    at 
org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:64)
    at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1412)
    at 
org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:136)
    at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:1047)
    at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:1105)
    at 
org.apache.hudi.client.BaseHoodieClient.writeTableMetadata(BaseHoodieClient.java:277)
    ... 96 more

Caused by: org.apache.hudi.exception.HoodieException: unable to read next 
record from parquet file 
    at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
    at 
org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
    at 
org.apache.hudi.common.table.log.HoodieFileSliceReader.hasNextInternal(HoodieFileSliceReader.java:73)
    at 
org.apache.hudi.common.table.log.HoodieFileSliceReader.doHasNext(HoodieFileSliceReader.java:99)
    at 
org.apache.hudi.common.util.collection.CachingIterator.hasNext(CachingIterator.java:32)
    at 
org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.getRecordKeyToSecondaryKey(SecondaryIndexRecordGenerationUtils.java:183)
    at 
org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.lambda$convertWriteStatsToSecondaryIndexRecords$8386a558$1(SecondaryIndexRecordGenerationUtils.java:138)
    at 
org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:160)
    at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at 
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
    at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at 
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
    at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
    at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
    at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at 
org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750) 

Caused by: java.lang.ClassCastException: optional binary inner_f3 (STRING) is 
not a group
    at org.apache.parquet.schema.Type.asGroupType(Type.java:248)
    at 
org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:362)
    at 
org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:306)
    at 
org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:79)
    at 
org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:617)
    at 
org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:567)
    at 
org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:371)
    at 
org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:144)
    at 
org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:98)
    at 
org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at 
org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:146)
    at 
org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:190)
    at 
org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:166)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
    ... 39 more{code}
 

 

 

> Need to upgrade parquet after nested fields fixed
> -------------------------------------------------
>
>                 Key: HUDI-4798
>                 URL: https://issues.apache.org/jira/browse/HUDI-4798
>             Project: Apache Hudi
>          Issue Type: Task
>          Components: reader-core, writer-core
>            Reporter: Sagar Sumit
>            Priority: Major
>             Fix For: 1.1.0
>
>
> parquet-mr fails to read nested array fields.
> See following issues for more discussion:
> [https://github.com/apache/hudi/issues/5985]
> [https://github.com/apache/hudi/issues/5701]
> Essentially, Hudi needs to upgrade parquet version after following parquet 
> issues are fixed:
> https://issues.apache.org/jira/browse/PARQUET-1254
> https://issues.apache.org/jira/browse/PARQUET-1681
> https://issues.apache.org/jira/browse/PARQUET-2069
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to