gtwuser opened a new issue, #6797:
URL: https://github.com/apache/hudi/issues/6797

   **Describe the problem you faced**
   When `hoodie.datasource.write.recordkey.field` is set to a nested field, the Hudi job fails to resolve it:
   ```
   org.apache.hudi.exception.HoodieException: Cannot find a record at part value :metadata
        at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:513)
   ```
   The DataFrame is created from the following JSON body, where `metadata.msgID` is expected to be used as the record key:
   ```json
   {
       "version": "1.0",
       "metadata": {
           "topic": "ASA.telemetry",
           "contentType": "application/json",
           "msgID": "44162"
       }
   }
   ```
   The `commonConfig` used:
   ```python
   commonConfig = {
       'className': 'org.apache.hudi',
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       # nested field, set using dot notation
       'hoodie.datasource.write.precombine.field': 'payload.recordedAt',
       # nested field, set using dot notation
       'hoodie.datasource.write.recordkey.field': 'metadata.msgID',
       'hoodie.table.name': 'sse',
       # ....
       # ......
   }
   ```
   Saving to the Hudi table then throws the following error:
   ```
   org.apache.hudi.exception.HoodieException: Cannot find a record at part value :metadata
        at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:513)
   ```
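
One possible reading of the error message is that the intermediate part `metadata` did not resolve to a nested record at write time (for example, if the column reached the key generator as a string or map rather than a struct). That is an assumption, not something confirmed in this report. The sketch below is a simplified stand-in for dot-notation resolution over plain Python dicts, not Hudi's implementation; `get_nested_field` and the `stringified` record are hypothetical names introduced only for illustration.

```python
import json


def get_nested_field(record, dotted_name):
    """Resolve a dot-separated field name against nested dicts.

    Illustration only: a simplified stand-in, not Hudi's actual code.
    """
    parts = dotted_name.split(".")
    node = record
    for i, part in enumerate(parts):
        if part not in node:
            raise KeyError(f"{dotted_name} (part {part}) not found in record")
        value = node[part]
        if i == len(parts) - 1:
            # Last part of the name: this is the value we were asked for.
            return value
        if not isinstance(value, dict):
            # An intermediate part must itself be a nested record.
            raise ValueError(f"Cannot find a record at part value :{part}")
        node = value


# The JSON body from the report, as a nested record.
sample = {
    "version": "1.0",
    "metadata": {
        "topic": "ASA.telemetry",
        "contentType": "application/json",
        "msgID": "44162",
    },
}

# Hypothetical variant where `metadata` is a JSON string instead of a struct.
stringified = {"version": "1.0", "metadata": json.dumps(sample["metadata"])}

print(get_nested_field(sample, "metadata.msgID"))  # prints 44162

try:
    get_nested_field(stringified, "metadata.msgID")
except ValueError as err:
    print(err)  # prints Cannot find a record at part value :metadata
```

If this reading applies, checking the schema of the input DataFrame just before the write (whether `metadata` is a struct column) would be a reasonable first diagnostic.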
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a DataFrame from this JSON body:
   ```json
   {
       "version": "1.0",
       "metadata": {
           "topic": "ASA.telemetry",
           "contentType": "application/json",
           "msgID": "44162"
       }
   }
   ```
   
   2. Use these configs for Hudi:
   ```python
   commonConfig = {
       'className': 'org.apache.hudi',
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       # nested field, set using dot notation
       'hoodie.datasource.write.precombine.field': 'payload.recordedAt',
       # nested field, set using dot notation
       'hoodie.datasource.write.recordkey.field': 'metadata.msgID',
       'hoodie.table.name': 'sse',
       # 'hoodie.consistency.check.enabled': 'true',
       'hoodie.datasource.hive_sync.database': args['database_name'],
       'hoodie.datasource.write.reconcile.schema': 'true',
       'hoodie.datasource.hive_sync.table': f'sse_{"_".join(prefix.split("/")[-7:-5])}'.lower(),
       'hoodie.datasource.hive_sync.enable': 'true',
       'path': 's3://' + args['curated_bucket'] + '/merged/sse-native',
       # 1,024 * 1,024 * 128 = 134,217,728 (134 MB)
       'hoodie.parquet.small.file.limit': '307200',
       'hoodie.parquet.max.file.size': '128000000'
   }

   initLoadConfig = {
       'hoodie.bulkinsert.shuffle.parallelism': 68,
       'hoodie.datasource.write.operation': 'bulk_insert'
   }

   partitionDataConfig = {
       'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
       'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
       'hoodie.datasource.write.partitionpath.field': 'year:SIMPLE, month:SIMPLE, day:SIMPLE, hour:SIMPLE, device_id:SIMPLE',
       'hoodie.datasource.hive_sync.partition_fields': 'year, month, day, hour, device_id',
       'hoodie.datasource.write.hive_style_partitioning': 'true'
   }

   combinedConf = {**commonConfig, **partitionDataConfig, **initLoadConfig}
   ```
   3. Save the input DataFrame using the configs above:
   ```python
   inputDf.write \
       .format('org.apache.hudi') \
       .option("spark.hadoop.parquet.avro.write-old-list-structure", "false") \
       .option("parquet.avro.write-old-list-structure", "false") \
       .option("spark.hadoop.parquet.avro.add-list-element-records", "false") \
       .option("parquet.avro.add-list-element-records", "false") \
       .option("hoodie.parquet.avro.write-old-list-structure", "false") \
       .option("hoodie.datasource.write.reconcile.schema", "true") \
       .options(**combinedConf) \
       .mode('append') \
       .save()
   ```
   
   **Expected behavior**
   
   The write fails while reading the nested `recordKey` value. We expect the data to be saved successfully using the given record key.
   
   **Environment Description**
   
   * Hudi version : 0.11.1
   
   * Spark version :  3.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   We run this Hudi script as an AWS Glue job using PySpark.
   
   **Stacktrace**
   
   ```bash
   2022-09-26 11:29:24,934 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
     File "/tmp/second-delete-upsert.py", line 297, in <module>
       startMerging(df_prefix_map_list)
     File "/tmp/second-delete-upsert.py", line 234, in startMerging
       .mode('append') \
     File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
       self._jwrite.save()
     File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
       answer, self.gateway_client, self.target_id, self.name)
     File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
       return f(*a, **kw)
     File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
       format(target_id, ".", name), value)
   py4j.protocol.Py4JJavaError: An error occurred while calling o10765.save.
   : org.apache.spark.SparkException: Writing job aborted.
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:388)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
        at 
org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:218)
        at 
org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
        at 
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:370)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:586)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:178)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:184)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
        at 
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 324.0 failed 4 times, most recent failure: Lost task 0.3 in stage 324.0 (TID 326) (172.34.28.240 executor 1): org.apache.hudi.exception.HoodieException: Cannot find a record at part value :metadata
        at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:513)
        at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString(HoodieAvroUtils.java:487)
        at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:140)
        at org.apache.hudi.keygen.SimpleAvroKeyGenerator.getRecordKey(SimpleAvroKeyGenerator.java:50)
        at org.apache.hudi.keygen.CustomAvroKeyGenerator.getRecordKey(CustomAvroKeyGenerator.java:107)
        at org.apache.hudi.keygen.CustomKeyGenerator.getRecordKey(CustomKeyGenerator.java:78)
        at org.apache.hudi.keygen.BuiltinKeyGenerator.getRecordKey(BuiltinKeyGenerator.java:103)
        at org.apache.hudi.HoodieDatasetBulkInsertHelper$.$anonfun$prepareForBulkInsert$2(HoodieDatasetBulkInsertHelper.scala:70)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:484)
        at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:413)
        at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
        at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
   Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
        at scala.Option.foreach(Option.scala:257)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)
        ... 69 more
   Caused by: org.apache.hudi.exception.HoodieException: Cannot find a record at part value :metadata
        at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:513)
        at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString(HoodieAvroUtils.java:487)
        at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:140)
        at org.apache.hudi.keygen.SimpleAvroKeyGenerator.getRecordKey(SimpleAvroKeyGenerator.java:50)
        at org.apache.hudi.keygen.CustomAvroKeyGenerator.getRecordKey(CustomAvroKeyGenerator.java:107)
        at org.apache.hudi.keygen.CustomKeyGenerator.getRecordKey(CustomKeyGenerator.java:78)
        at org.apache.hudi.keygen.BuiltinKeyGenerator.getRecordKey(BuiltinKeyGenerator.java:103)
        at org.apache.hudi.HoodieDatasetBulkInsertHelper$.$anonfun$prepareForBulkInsert$2(HoodieDatasetBulkInsertHelper.scala:70)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:484)
        at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:413)
        at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
        at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
   
   ```
   

