jdiebold opened a new issue, #5717:
URL: https://github.com/apache/hudi/issues/5717

   **Describe the problem you faced**
   
   We are struggling to add partitions to a Hudi table when a column that exists in the schema of the existing table is missing from the new parquet files.
   
   We process the data in AWS via Glue and EMR and write it to S3. We used Hudi 0.10.1 on EMR 6.6.0, and we also packaged the Hudi spark3.1.2-bundle ourselves and ran it on the cluster.
   
   **To Reproduce**
   
   We used the following Spark session config:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   implicit val spark: SparkSession = SparkSession.builder
     .master(config.master)
     .appName("Hudi-Preprocessor")
     .config("spark.app.id", "Hudi-Preprocessor")
     .config("spark.sql.parquet.mergeSchema", value = true)
     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .config("spark.sql.hive.convertMetastoreParquet", "false")
     .config("spark.sql.sources.partitionColumnTypeInference.enabled", value = false)
     .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
     .enableHiveSupport()
     .getOrCreate()
   ```
   
   We read parquet files and want to rewrite them as a Hudi table in another directory. We process the first data set and create the initial Hudi table from it. Then we update the table with the second data set, which is missing the column "price". Usually Spark just adds null values for such columns, but in this case we get the error shown in the stacktrace below.
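   For reference, a minimal sketch of the plain Spark behavior we mean; the path and columns mirror the repro below, and this is plain Spark, not Hudi:
   
   ```scala
   // With schema merging enabled, reading parquet files where one file
   // lacks the "price" column simply yields null for that column.
   val merged = spark.read
     .option("mergeSchema", "true")
     .parquet(s"$rootPath/spark-warehouse/offers")
   
   merged.select("offer_key", "price").show() // price is null for rows from the file without it
   ```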
   
   ```scala
   import java.sql.Timestamp
   
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
   import org.apache.spark.sql.SaveMode
   
   import spark.implicits._
   
   // Case classes as used below; Record2 has no "price" field.
   case class Record(offer_key: String, price: Option[Int], action: String, event_time: Timestamp,
                     year: String, month: String, day: String, hour: String)
   case class Record2(offer_key: String, action: String, event_time: Timestamp)
   
   // Create parquet example data that will be processed.
   Seq(
     Record("ok1", Some(100), "update", Timestamp.valueOf("2022-01-01 00:00:00"), "2022", "01", "01", "00"),
     Record("ok2", Some(200), "update", Timestamp.valueOf("2022-01-01 00:00:00"), "2022", "01", "01", "00"),
     Record("ok3", Some(300), "update", Timestamp.valueOf("2022-01-01 00:00:00"), "2022", "01", "01", "00"),
     Record("ok3", Some(300), "delete", Timestamp.valueOf("2022-01-01 00:00:01"), "2022", "01", "01", "00"),
     Record("ok4", Some(400), "update", Timestamp.valueOf("2022-01-01 00:00:00"), "2022", "01", "01", "00"),
     Record("ok4", Some(500), "update", Timestamp.valueOf("2022-01-01 00:10:00"), "2022", "01", "01", "00")
   ).toDF("offer_key", "price", "action", "event_time", "year", "month", "day", "hour")
     .write
     .partitionBy("year", "month", "day", "hour")
     .saveAsTable("offers")
   
   // Record2 is missing the "price" field, so a parquet file without this column is written.
   Seq(
     Record2(offer_key = "ok4", action = "update", event_time = Timestamp.valueOf("2022-01-01 01:10:00"))
   ).toDF()
     .write
     .parquet(s"$rootPath/spark-warehouse/offers/year=2022/month=01/day=01/hour=01")
   
   val hudiOptions = Map[String, String](
     HoodieWriteConfig.TBL_NAME.key() -> "my_table",
     DataSourceWriteOptions.TABLE_TYPE.key() -> "COPY_ON_WRITE",
     DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "offer_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "year,month,day",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "event_time",
     DataSourceWriteOptions.HIVE_SYNC_ENABLED.key() -> "true", // extracted to a method for mocking in local tests
     DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key() -> "true",
     DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key() -> "year,month,day",
     DataSourceWriteOptions.HIVE_DATABASE.key() -> "my_table",
     DataSourceWriteOptions.HIVE_TABLE.key() -> "my_table",
     DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key() -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
     DataSourceWriteOptions.HIVE_SYNC_MODE.key() -> "hms",
     DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE.key() -> "true",
     HoodieIndexConfig.INDEX_TYPE.key() -> "GLOBAL_SIMPLE",
     DataSourceWriteOptions.RECONCILE_SCHEMA.key() -> "true"
   )
   
   inputDf
     .write
     .format("org.apache.hudi")
     .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
     .options(hudiOptions)
     .mode(SaveMode.Append)
     .save("<path-to-table>/my_table")
   ```
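   
   A workaround we could apply on 0.10.1, sketched below: align the incoming batch with the current table schema before the upsert. `padToSchema` is our own helper, not a Hudi API, and skipping the `_hoodie` meta columns is an assumption:
   
   ```scala
   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.functions.lit
   import org.apache.spark.sql.types.StructType
   
   // Add every target-schema column the batch is missing as a typed null.
   def padToSchema(df: DataFrame, target: StructType): DataFrame =
     target.fields
       .filterNot(_.name.startsWith("_hoodie")) // leave Hudi meta columns to the writer
       .foldLeft(df) { (acc, field) =>
         if (acc.columns.contains(field.name)) acc
         else acc.withColumn(field.name, lit(null).cast(field.dataType))
       }
   
   // Take the schema from the existing table and pad the new batch before the upsert.
   val tableSchema = spark.read.format("org.apache.hudi").load("<path-to-table>/my_table").schema
   val alignedDf = padToSchema(inputDf, tableSchema)
   ```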
   
   **Expected behavior**
   
   All records are written into the Hudi table, and the missing column in the second write operation (the update) is defaulted to null.
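   
   As a read-back check, this is roughly what we would expect to see (sketched, not what we currently observe):
   
   ```scala
   // The record upserted without "price" should come back with price = null.
   spark.read.format("org.apache.hudi")
     .load("<path-to-table>/my_table")
     .where("offer_key = 'ok4'")
     .select("offer_key", "price", "action", "event_time")
     .show()
   ```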
   
   **Environment Description**
   
   * Hudi version : 0.10.1
   
   * Spark version : 3.1.2
   
   * Hive version : 3.1.2
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : S3 (for production), local for reproducing 
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   This problem is solved in Hudi 0.11.0; unfortunately, we could not upgrade to that version because of #5715
   
   **Stacktrace**
   ```
   scala.MatchError: false (of class java.lang.Boolean)
   at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$7(AvroConversionHelper.scala:308)
   at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$14(AvroConversionHelper.scala:373)
   at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRddInternal$3(HoodieSparkUtils.scala:157)
   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
   at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
   at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
   at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
   at org.apache.spark.scheduler.Task.run(Task.scala:131)
   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
   at java.base/java.lang.Thread.run(Thread.java:832)
   22/05/30 14:19:41 WARN TaskSetManager: Lost task 0.0 in stage 34.0 (TID 1333) (w0duh.fritz.box executor driver): scala.MatchError: false (of class java.lang.Boolean)
   at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$7(AvroConversionHelper.scala:308)
   at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToAvro$14(AvroConversionHelper.scala:373)
   at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRddInternal$3(HoodieSparkUtils.scala:157)
   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
   at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
   at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
   at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
   at org.apache.spark.scheduler.Task.run(Task.scala:131)
   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
   at java.base/java.lang.Thread.run(Thread.java:832)
   ```
   
   
   
   

