atharvai opened a new issue, #7354:
URL: https://github.com/apache/hudi/issues/7354

   **Describe the problem you faced**
   
   [Hudi 
guide](https://hudi.apache.org/docs/0.11.1/table_management#spark-create-table) 
for Spark SQL Create table states that record key, precombine key and partition 
keys should be specified with special arguments `primaryKey`, `preCombineField` 
and `partition by ()`.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   *Failure scenario:*
   1. Run SQL as per docs:
   ```sql
   CREATE TABLE {target_db}.{target_table_name} using hudi
   location 's3://{target_bucket_name}/{target_table_name}'
   options (
       type = 'cow', primaryKey='{pk}', preCombineField='{precombine_field}',
   
       hoodie.table.name='{target_table_name}',
       hoodie.datasource.write.operation='upsert',
       hoodie.datasource.write.table.name='{target_table_name}',
   )
   partitioned by ({partition_fields})
   AS 
   SELECT *
   FROM {source_db}.{source_table};
   ```
   Throws `Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema 
field: ts` . Notice that this is using the default value for 
`preCombineField=ts` instead of user specified custom field. This indicates 
that `preCombineField` option has no effect on Hudi when creating tables using 
Spark SQL.
   
   *Success Scenario:*
   1. run following SQL with `hoodie` properties specified in addition to SQL 
options
   ```
   CREATE TABLE {target_db}.{target_table_name} using hudi
   location 's3://{target_bucket_name}/{target_table_name}'
   options (
       type = 'cow', primaryKey='{pk}', preCombineField='{precombine_field}',
   
       hoodie.table.name='{target_table_name}',
       hoodie.datasource.write.operation='upsert',
       hoodie.datasource.write.table.name='{target_table_name}',
   
       hoodie.datasource.write.recordkey.field='{primary_key}',
       hoodie.datasource.write.precombine.field='{precombine_field}',
       hoodie.datasource.write.partitionpath.field='{partition_fields}',
       
hoodie.datasource.write.keygenerator.class='org.apache.hudi.keygen.ComplexKeyGenerator',
   )
   partitioned by ({partition_fields})
   AS 
   SELECT *
   FROM {source_db}.{source_table};
   ```
   this is successful because both `primaryKey` and 
`hoodie.datasource.write.recordkey.field` are specified. both `preCombineField` 
and `hoodie.datasource.write.precombine.field` are specified and both 
`partitioned by ()` and `hoodie.datasource.write.partitionpath.field` are 
specified.
   
   **Expected behavior**
   
   Failure scenario above should be successful.
   
   **Environment Description**
   
   [EMR 
6.7.0](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/release-version-670.html)
   
   * Hudi version : 0.11.1
   * Spark version : 3.2.1
   * Hive version : 3.1.3
   * Hadoop version :
   * Storage (HDFS/S3/GCS..) : S3
   * Running on Docker? (yes/no) : no
   
   **Additional context**
   
   Add any other context about the problem here.
   
   **Stacktrace**
   
   ```
   Traceback (most recent call last):
     File "/tmp/spark-618dec17-fd89-49da-abff-8893b20c3183/hudi_remodeller.py", 
line 66, in <module>
       args.primary_key, args.precombine_field, args.partition_fields, 
args.region)
     File "/tmp/spark-618dec17-fd89-49da-abff-8893b20c3183/hudi_remodeller.py", 
line 45, in remodel_table
       spark.sql(sql)
     File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 
723, in sql
     File 
"/usr/lib/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 
1322, in __call__
     File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 
111, in deco
     File "/usr/lib/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py", 
line 328, in get_return_value
   py4j.protocol.Py4JJavaError: An error occurred while calling o90.sql.
   : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for 
commit time 20221129132817905
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
        at 
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:45)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:113)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:97)
        at 
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:155)
        at 
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:213)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:306)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:171)
        at 
org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:519)
        at 
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:228)
        at 
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:182)
        at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
        at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
        at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:115)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:221)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 7 in stage 18.0 failed 4 times, most recent failure: Lost task 
7.3 in stage 18.0 (TID 270) ([2a05:d018:116b:f601:5602:379f:9471:369e] executor 
2): org.apache.avro.AvroRuntimeException: Not a valid schema field: ts
        at org.apache.avro.generic.GenericData$Record.get(GenericData.java:256)
        at 
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:501)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$10(HoodieSparkSqlWriter.scala:272)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
        at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:133)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
   Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2610)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2559)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2558)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2558)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1200)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1200)
        at scala.Option.foreach(Option.scala:407)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1200)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2798)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2740)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2729)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:978)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2215)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2255)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
        at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
        at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
        at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
        at 
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
        at 
org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:157)
        at 
org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:142)
        at 
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:113)
        at 
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
        at 
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:49)
        at 
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:32)
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:53)
        ... 58 more
   Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: ts
        at org.apache.avro.generic.GenericData$Record.get(GenericData.java:256)
        at 
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:501)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$10(HoodieSparkSqlWriter.scala:272)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
        at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:133)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to