atharvai opened a new issue, #7354: URL: https://github.com/apache/hudi/issues/7354
**Describe the problem you faced**

The [Hudi guide](https://hudi.apache.org/docs/0.11.1/table_management#spark-create-table) for creating tables with Spark SQL states that the record key, precombine field and partition columns should be specified through the dedicated arguments `primaryKey`, `preCombineField` and `partitioned by (...)`.

**To Reproduce**

Steps to reproduce the behavior:

*Failure scenario:*

1. Run SQL as per the docs:

```sql
CREATE TABLE {target_db}.{target_table_name} using hudi
location 's3://{target_bucket_name}/{target_table_name}'
options (
  type = 'cow',
  primaryKey='{pk}',
  preCombineField='{precombine_field}',
  hoodie.table.name='{target_table_name}',
  hoodie.datasource.write.operation='upsert',
  hoodie.datasource.write.table.name='{target_table_name}'
)
partitioned by ({partition_fields})
AS SELECT * FROM {source_db}.{source_table};
```

This throws `Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: ts`. Note that Hudi is using the default value `preCombineField=ts` instead of the user-specified field, which indicates that the `preCombineField` option has no effect when creating tables through Spark SQL.

*Success scenario:*

1. Run the following SQL, with the `hoodie.datasource.write.*` properties specified in addition to the SQL options:

```sql
CREATE TABLE {target_db}.{target_table_name} using hudi
location 's3://{target_bucket_name}/{target_table_name}'
options (
  type = 'cow',
  primaryKey='{pk}',
  preCombineField='{precombine_field}',
  hoodie.table.name='{target_table_name}',
  hoodie.datasource.write.operation='upsert',
  hoodie.datasource.write.table.name='{target_table_name}',
  hoodie.datasource.write.recordkey.field='{primary_key}',
  hoodie.datasource.write.precombine.field='{precombine_field}',
  hoodie.datasource.write.partitionpath.field='{partition_fields}',
  hoodie.datasource.write.keygenerator.class='org.apache.hudi.keygen.ComplexKeyGenerator'
)
partitioned by ({partition_fields})
AS SELECT * FROM {source_db}.{source_table};
```

This succeeds because each key is specified twice: `primaryKey` together with `hoodie.datasource.write.recordkey.field`, `preCombineField` together with `hoodie.datasource.write.precombine.field`, and `partitioned by (...)` together with `hoodie.datasource.write.partitionpath.field`. (A filled-in sketch of this statement is shown after the stacktrace below.)

**Expected behavior**

The failure scenario above should succeed: `primaryKey`, `preCombineField` and `partitioned by (...)` should be honored without also setting the corresponding `hoodie.datasource.write.*` properties.

**Environment Description**

[EMR 6.7.0](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/release-version-670.html)

* Hudi version : 0.11.1
* Spark version : 3.2.1
* Hive version : 3.1.3
* Hadoop version :
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no

**Stacktrace**

```
Traceback (most recent call last):
  File "/tmp/spark-618dec17-fd89-49da-abff-8893b20c3183/hudi_remodeller.py", line 66, in <module>
    args.primary_key, args.precombine_field, args.partition_fields, args.region)
  File "/tmp/spark-618dec17-fd89-49da-abff-8893b20c3183/hudi_remodeller.py", line 45, in remodel_table
    spark.sql(sql)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 723, in sql
  File "/usr/lib/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o90.sql.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20221129132817905
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:45)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:113)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:97)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:155)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:213)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:306)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:171)
    at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:519)
    at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:228)
    at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:182)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:115)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:221)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 18.0 failed 4 times, most recent failure: Lost task 7.3 in stage 18.0 (TID 270) ([2a05:d018:116b:f601:5602:379f:9471:369e] executor 2): org.apache.avro.AvroRuntimeException: Not a valid schema field: ts
    at org.apache.avro.generic.GenericData$Record.get(GenericData.java:256)
    at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:501)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$10(HoodieSparkSqlWriter.scala:272)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:133)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2610)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2559)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2558)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2558)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1200)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1200)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1200)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2798)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2740)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2729)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:978)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2215)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2255)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
    at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
    at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
    at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:157)
    at org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:142)
    at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:113)
    at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
    at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:49)
    at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:32)
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:53)
    ... 58 more
Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: ts
    at org.apache.avro.generic.GenericData$Record.get(GenericData.java:256)
    at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:501)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$10(HoodieSparkSqlWriter.scala:272)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:133)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
```
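For reference, here is a rough PySpark sketch of how a driver script along the lines of the `hudi_remodeller.py` in the traceback might issue the success-scenario statement. All database, bucket and column names below are invented for illustration; only the statement shape and the option names come from the success scenario above.

```python
from pyspark.sql import SparkSession

# Hypothetical values standing in for the {placeholders} above -- not the real job's arguments.
target_db, target_table_name = "analytics", "orders_hudi"
target_bucket_name = "example-bucket"
source_db, source_table = "staging", "orders_raw"
pk = "order_id"                  # record key column
precombine_field = "updated_at"  # precombine/ordering column
partition_fields = "order_date"  # partition column(s)

spark = SparkSession.builder.appName("hudi_remodeller").getOrCreate()

# Success-scenario CTAS: the hoodie.datasource.write.* configs are set explicitly
# in addition to primaryKey / preCombineField / partitioned by.
sql = f"""
CREATE TABLE {target_db}.{target_table_name} using hudi
location 's3://{target_bucket_name}/{target_table_name}'
options (
  type = 'cow',
  primaryKey = '{pk}',
  preCombineField = '{precombine_field}',
  hoodie.table.name = '{target_table_name}',
  hoodie.datasource.write.operation = 'upsert',
  hoodie.datasource.write.table.name = '{target_table_name}',
  hoodie.datasource.write.recordkey.field = '{pk}',
  hoodie.datasource.write.precombine.field = '{precombine_field}',
  hoodie.datasource.write.partitionpath.field = '{partition_fields}',
  hoodie.datasource.write.keygenerator.class = 'org.apache.hudi.keygen.ComplexKeyGenerator'
)
partitioned by ({partition_fields})
AS SELECT * FROM {source_db}.{source_table}
"""
spark.sql(sql)
```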
