[
https://issues.apache.org/jira/browse/HUDI-7276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884427#comment-17884427
]
Y Ethan Guo commented on HUDI-7276:
-----------------------------------
We need to validate the following scenarios with a Spark job running on AWS EMR,
using one driver and multiple executors on separate instances, to make sure the
NPE no longer occurs (the issue involves Spark serde between the driver and
executors across multiple physical machines):
* Enable file group reader on queries (this should be the default on master);
* Prepare two tables, one COW and one MOR; write a first batch with inserts and
a second batch with updates and deletes (MERGE INTO can be used as described in
the issue description). Make sure the MOR table contains log files, each table
has multiple file groups, and the tables are relatively large, e.g., 10 to
100 GB;
* Validate the COW snapshot query, and the MOR snapshot and read-optimized
queries, in Spark.
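The validation scenario above can be sketched in Spark SQL. This is a minimal illustration, not the exact repro: the table name, schema, and data volumes are hypothetical, the `hoodie.file.group.reader.enabled` key is assumed to be the SQL-level name of `FILE_GROUP_READER_ENABLED`, and the `_ro` suffix for the read-optimized view assumes the usual Hive-sync naming convention. Scale the data to multiple file groups and 10-100 GB for a meaningful test.

```sql
-- Hypothetical MOR table; repeat with type = 'cow' for the COW scenario.
CREATE TABLE hudi_mor_tbl (id INT, name STRING, price DOUBLE, ts BIGINT)
USING hudi
TBLPROPERTIES (type = 'mor', primaryKey = 'id', preCombineField = 'ts');

-- Batch 1: inserts (bulk insert in the original repro).
INSERT INTO hudi_mor_tbl VALUES (1, 'a1', 20.0, 1000), (2, 'a2', 30.0, 1000);

-- Batch 2: updates and deletes via MERGE INTO, so the MOR file groups
-- accumulate log files.
MERGE INTO hudi_mor_tbl AS t
USING (SELECT 1 AS id, 'a1_new' AS name, 25.0 AS price, 2000 AS ts) AS s
ON t.id = s.id
WHEN MATCHED AND s.price > 24 THEN UPDATE SET *
WHEN MATCHED AND s.price <= 24 THEN DELETE;

-- Validate with the file group reader enabled (the default on master);
-- the NPE previously surfaced on the executor side of these queries.
SET hoodie.file.group.reader.enabled = true;
SELECT COUNT(*) FROM hudi_mor_tbl;      -- MOR snapshot query
SELECT COUNT(*) FROM hudi_mor_tbl_ro;   -- MOR read-optimized query
```

Running the queries on a multi-node cluster (not local mode) matters here, because the NPE only appears when the broadcast schema is Kryo-deserialized on a remote executor.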
> Fix IOException on the File group reader path
> ---------------------------------------------
>
> Key: HUDI-7276
> URL: https://issues.apache.org/jira/browse/HUDI-7276
> Project: Apache Hudi
> Issue Type: Bug
> Components: spark
> Reporter: xy
> Assignee: Lin Liu
> Priority: Blocker
> Labels: hudi-1.0.0-beta2, pull-request-available
> Fix For: 1.0.0
>
>
> FILE_GROUP_READER_ENABLED should be disabled for queries
>
> java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> props (org.apache.avro.Schema$LongSchema)
> types (org.apache.avro.Schema$UnionSchema)
> schema (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1453)
> at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226)
> at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat.$anonfun$buildReaderWithPartitionValues$3(HoodieFileGroupReaderBasedParquetFileFormat.scala:149)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:117)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:165)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:94)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
> at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1480)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> props (org.apache.avro.Schema$LongSchema)
> types (org.apache.avro.Schema$UnionSchema)
> schema (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
> at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:306)
> at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$unBlockifyObject$4(TorrentBroadcast.scala:336)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1480)
> at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:338)
> at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:257)
> at scala.Option.getOrElse(Option.scala:189)
> at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:231)
> at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
> at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:226)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1446)
> ... 24 more
> Caused by: java.lang.NullPointerException
> at org.apache.avro.JsonProperties$2.putIfAbsent(JsonProperties.java:159)
> at org.apache.avro.JsonProperties$2.put(JsonProperties.java:166)
> at org.apache.avro.JsonProperties$2.put(JsonProperties.java:151)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> ... 51 more
>
> Driver stacktrace:
>
>
> Reproduction path: Spark bulk insert, then MERGE INTO, finally query in Spark SQL
>
> Spark version: 3.2.0
> Hudi version: 1.0 (master)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)