[ https://issues.apache.org/jira/browse/HUDI-3748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated HUDI-3748:
---------------------------------
    Labels: pull-request-available  (was: )

Hudi fails to insert into a partitioned table when the partition column is dropped from the parquet schema
-----------------------------------------------------------------------------------------------------------

                 Key: HUDI-3748
                 URL: https://issues.apache.org/jira/browse/HUDI-3748
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Vinoth Govindarajan
            Assignee: Yann Byron
            Priority: Blocker
              Labels: pull-request-available
             Fix For: 0.11.0

When the table property hoodie.datasource.write.drop.partition.columns = 'true' is set to drop the partition column from the parquet schema (needed for BigQuery support), Hudi fails on insert with the error below.

Steps to reproduce:

Start the Hudi docker demo:
{code:java}
cd hudi/docker
./setup_demo.sh{code}
{code:java}
docker exec -it adhoc-2 /bin/bash{code}
{code:java}
# Log into spark-sql and execute the following commands:
spark-sql --jars $HUDI_SPARK_BUNDLE \
  --master local[2] \
  --driver-class-path $HADOOP_CONF_DIR \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1 \
  --packages org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
{code}
{code:java}
create table bq_demo_partitioned_cow (
  id bigint,
  name string,
  price double,
  ts bigint,
  dt string
) using hudi
partitioned by (dt)
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts',
  hoodie.datasource.write.drop.partition.columns = 'true'
);

insert into bq_demo_partitioned_cow partition(dt='2021-12-25') values(1, 'a1', 10, current_timestamp());
insert into bq_demo_partitioned_cow partition(dt='2021-12-25') values(2, 'a2', 20, current_timestamp());
{code}
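For reference, the same drop-partition-columns setting can also be supplied through the Spark DataFrame writer. The snippet below is a minimal sketch, not taken from this ticket: the app name, base path, and sample row are illustrative; only hoodie.datasource.write.drop.partition.columns plus the standard record-key/precombine/partition-path options are assumed.
{code:scala}
// Minimal sketch (illustrative, not from this ticket): writing the same kind of
// table via the DataFrame API with the partition column dropped from the
// written parquet schema. Table path and sample data are made up.
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("hudi-3748-sketch")  // illustrative app name
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()
import spark.implicits._

val df = Seq((1L, "a1", 10.0, System.currentTimeMillis(), "2021-12-25"))
  .toDF("id", "name", "price", "ts", "dt")

df.write.format("hudi")
  .option("hoodie.table.name", "bq_demo_partitioned_cow")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  // The option under test in this issue: drop "dt" from the parquet schema.
  .option("hoodie.datasource.write.drop.partition.columns", "true")
  .mode(SaveMode.Append)
  .save("/tmp/bq_demo_partitioned_cow")  // illustrative base path
{code}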
Error:
{code:java}
22/03/29 20:58:02 INFO spark.SparkContext: Starting job: collect at HoodieSparkEngineContext.java:100
22/03/29 20:58:02 INFO scheduler.DAGScheduler: Got job 63 (collect at HoodieSparkEngineContext.java:100) with 1 output partitions
22/03/29 20:58:02 INFO scheduler.DAGScheduler: Final stage: ResultStage 131 (collect at HoodieSparkEngineContext.java:100)
22/03/29 20:58:02 INFO scheduler.DAGScheduler: Parents of final stage: List()
22/03/29 20:58:02 INFO scheduler.DAGScheduler: Missing parents: List()
22/03/29 20:58:02 INFO scheduler.DAGScheduler: Submitting ResultStage 131 (MapPartitionsRDD[235] at map at HoodieSparkEngineContext.java:100), which has no missing parents
22/03/29 20:58:02 INFO memory.MemoryStore: Block broadcast_89 stored as values in memory (estimated size 71.9 KB, free 364.0 MB)
22/03/29 20:58:02 INFO memory.MemoryStore: Block broadcast_89_piece0 stored as bytes in memory (estimated size 26.3 KB, free 364.0 MB)
22/03/29 20:58:02 INFO storage.BlockManagerInfo: Added broadcast_89_piece0 in memory on adhoc-1:38703 (size: 26.3 KB, free: 365.7 MB)
22/03/29 20:58:02 INFO spark.SparkContext: Created broadcast 89 from broadcast at DAGScheduler.scala:1161
22/03/29 20:58:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 131 (MapPartitionsRDD[235] at map at HoodieSparkEngineContext.java:100) (first 15 tasks are for partitions Vector(0))
22/03/29 20:58:02 INFO scheduler.TaskSchedulerImpl: Adding task set 131.0 with 1 tasks
22/03/29 20:58:02 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 131.0 (TID 4081, localhost, executor driver, partition 0, PROCESS_LOCAL, 7803 bytes)
22/03/29 20:58:02 INFO executor.Executor: Running task 0.0 in stage 131.0 (TID 4081)
22/03/29 20:58:02 INFO executor.Executor: Finished task 0.0 in stage 131.0 (TID 4081). 1167 bytes result sent to driver
22/03/29 20:58:02 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 131.0 (TID 4081) in 17 ms on localhost (executor driver) (1/1)
22/03/29 20:58:02 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 131.0, whose tasks have all completed, from pool
22/03/29 20:58:02 INFO scheduler.DAGScheduler: ResultStage 131 (collect at HoodieSparkEngineContext.java:100) finished in 0.030 s
22/03/29 20:58:02 INFO scheduler.DAGScheduler: Job 63 finished: collect at HoodieSparkEngineContext.java:100, took 0.032364 s
22/03/29 20:58:02 INFO timeline.HoodieActiveTimeline: Loaded instants upto : Option{val=[20220329205734338__commit__COMPLETED]}
22/03/29 20:58:02 INFO view.AbstractTableFileSystemView: Took 0 ms to read 0 instants, 0 replaced file groups
22/03/29 20:58:02 INFO util.ClusteringUtils: Found 0 files in pending clustering operations
22/03/29 20:58:02 INFO view.AbstractTableFileSystemView: addFilesToView: NumFiles=1, NumFileGroups=1, FileGroupsCreationTime=0, StoreTimeTaken=0
22/03/29 20:58:02 INFO hudi.HoodieFileIndex: Refresh table bq_demo_partitioned_cow, spend: 124 ms
22/03/29 20:58:02 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from hdfs://namenode:8020/user/hive/warehouse/bq_demo_partitioned_cow
22/03/29 20:58:02 INFO table.HoodieTableConfig: Loading table properties from hdfs://namenode:8020/user/hive/warehouse/bq_demo_partitioned_cow/.hoodie/hoodie.properties
22/03/29 20:58:02 INFO table.HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from hdfs://namenode:8020/user/hive/warehouse/bq_demo_partitioned_cow
22/03/29 20:58:02 INFO timeline.HoodieActiveTimeline: Loaded instants upto : Option{val=[20220329205734338__commit__COMPLETED]}
22/03/29 20:58:02 INFO command.InsertIntoHoodieTableCommand: insert statement use write operation type: upsert, payloadClass: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
22/03/29 20:58:02 ERROR thriftserver.SparkSQLDriver: Failed in [insert into bq_demo_partitioned_cow partition(dt='2021-12-25') values(2, 'a2', 20, current_timestamp())]
java.lang.AssertionError: assertion failed: Required partition columns is: {"type":"struct","fields":[]}, Current static partitions is: dt -> 2021-12-25
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.alignOutputFields(InsertIntoHoodieTableCommand.scala:130)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:94)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:55)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
    at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:62)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:371)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:274)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
spark-sql>
22/03/29 21:00:38 INFO hfile.LruBlockCache: totalSize=383.75 KB, freeSize=363.83 MB, max=364.20 MB, blockCount=0, accesses=4, hits=0, hitRatio=0, cachingAccesses=0, cachingHits=0, cachingHitsRatio=0,evictions=59, evicted=0, evictedPerRun=0.0 {code}
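The assertion message points at the mismatch: with hoodie.datasource.write.drop.partition.columns = 'true' the table schema the insert command works from no longer contains dt, so the partition schema it derives is empty, while the statement still carries the static partition dt='2021-12-25'. The snippet below is a simplified sketch of that mismatch, not the actual code in InsertIntoHoodieTableCommand.alignOutputFields; the variable names and the exact check are illustrative.
{code:scala}
// Simplified illustration of the failing check (not the actual Hudi code):
// the partition schema is derived from a table schema that no longer contains
// the partition column, so it is empty while a static partition is supplied.
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}

// Table schema as written when drop.partition.columns=true: "dt" is gone.
val tableSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("price", DoubleType),
  StructField("ts", LongType)))

val partitionColumns = Seq("dt")
val staticPartitions = Map("dt" -> "2021-12-25")

// Partition schema computed from the trimmed table schema: empty.
val requiredPartitionSchema =
  StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))

// Mirrors the assertion seen in the log: an empty required partition schema
// versus one static partition value.
assert(staticPartitions.size == requiredPartitionSchema.size,
  s"Required partition columns is: ${requiredPartitionSchema.json}, " +
    s"Current static partitions is: ${staticPartitions.mkString(",")}")
{code}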
--
This message was sent by Atlassian Jira
(v8.20.1#820001)