RameshkumarChikoti123 opened a new issue, #12191:
URL: https://github.com/apache/hudi/issues/12191

   I am using RLI configuration and while upserting and deleting getting error  
as describe below
   
   **Expected behavior**
   
   it should work end to end with RLI index enable
   
   **Environment Description**
   
   
   * Hudi version : 0.15.0
   * Spark version : 3.3.0
   * Storage : S3
   * Hive version : NA
   * Running on Docker : Yes
   * Hadoop version : 3.3.4
   
   
   **Steps to reproduce the behaviour:**
   
   ```
   
   # Spark Configuration
   
   sparkConf.set('spark.sql.catalog.spark_catalog', 
'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
   sparkConf.set('spark.sql.catalog.spark_catalog.type', 'hadoop')
   sparkConf.set('spark.serializer', 
'org.apache.spark.serializer.KryoSerializer')
   sparkConf.set('spark.sql.hudi.enable', 'true')
   
   
   sparkConf.set('spark.driver.extraClassPath', 
'/home/jovyan/.ivy2/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar')
   
   sparkConf.set('spark.executor.extraClassPath', 
'/home/jovyan/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar')
   
   # Configuring Hudi-specific options
   sparkConf.set('spark.jars.packages', 
'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.15.0,org.apache.hadoop:hadoop-aws:3.3.4,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
   
   # Hudi SQL Extensions
   sparkConf.set('spark.sql.extensions', 
'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
   
   
   
   
   hudiOptions = {
       'hoodie.table.name': 'my_hudi_table',
       'hoodie.datasource.write.recordkey.field': 'guid',
       'hoodie.datasource.write.precombine.field': 'timestamp',
       'hoodie.datasource.write.partitionpath.field': 'partitionpath',  
       'hoodie.metadata.record.index.enable': 'true',  
       'hoodie.index.type': 'RECORD_INDEX',  
       'hoodie.datasource.hive_sync.enable': False,  
       'hoodie.table.type' : 'COPY_ON_WRITE',
       'hoodie.metadata.enable' : 'true',
       'hoodie.metadata.record.index.enable' : 'true',
       'hoodie.combine.before.upsert': 'true', 
       'hoodie.combine.before.insert': 'true'
   }
   
   # Insert into hudi table
   
   from pyspark.sql.types import StructType, StructField, StringType, 
TimestampType
   
   
   data = [("9", "Alice", "2024-01-01 00:00:00", "2024-01"),
           ("10", "Bob", "2024-01-01 00:01:00", "2024-01"),
           ("11", "Charlie", "2024-01-02 00:02:00", "2024-01"),
           ("12", "David", "2024-02-01 00:03:00", "2024-02")]
   columns = ["guid", "name", "timestamp", "partitionpath"]
   schema = StructType([
       StructField("guid", StringType(), True),
       StructField("name", StringType(), True),
       StructField("timestamp", StringType(), True),
       StructField("partitionpath", StringType(), True)
   ])
   inputDF = spark.createDataFrame(data, schema)
   
   inputDF.write.format("hudi") \
       .options(**hudiOptions) \
       .option("hoodie.datasource.write.operation", "insert") \
       .mode("append") \
       .save("s3a://bucket/var/hudi_poc/my_hudi_table")
   `
   
   
   # Upsert into hudi table
   
   # Sample DataFrame for upsert
   update_data = [("1", "Alice Updated latest ", "2029-01-13 00:00:00", 
"2025-11"),
                  ("5", "Eve", "2024-02-02 00:04:00", "2024-02")]
   columns = ["guid", "name", "timestamp", "partitionpath"]
   
   updateDF = spark.createDataFrame(update_data, columns)
   
   # Upsert DataFrame into Hudi table
   updateDF.write.format("hudi") \
       .options(**hudiOptions) \
       .option("hoodie.datasource.write.operation", "upsert") \
       .mode("append") \
       .save("s3a://bucket/var/hudi_poc/my_hudi_table")
   
   
   # Delete
   
   delete_data = [("1",)]
   deleteDF = spark.createDataFrame(delete_data, ["guid"])
   
   deleteDF.write.format("hudi") \
       .options(**hudiOptions) \
       .option("hoodie.datasource.write.operation", "delete") \
       .mode("append") \
       .save("s3a://bucket/var/hudi_poc/my_hudi_table")
   
   ```
   
   
   **Additional context**
   
   This error occurs intermittently. I added the hudi spark bundle jar to the 
executor and driver classpath based on a GitHub issue 
https://github.com/apache/hudi/issues/10609 , but the problem  still persists.
   
   **Stacktrace**
   
   ```
   Py4JJavaError: An error occurred while calling o221.save.
   : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for 
commit time 20241027192251336
        at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
        at 
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:109)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
        at 
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)
        at 
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:225)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:492)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:490)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:833)
   Caused by: org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 7 in stage 56.0 failed 4 times, most recent failure: Lost task 
7.3 in stage 56.0 (TID 204) (10.249.127.135 executor 2): 
org.apache.hudi.exception.HoodieException: Error occurs when executing map
        at 
org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at 
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at 
java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:960)
        at 
java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:934)
        at 
java.base/java.util.stream.AbstractTask.compute(AbstractTask.java:327)
        at 
java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
        at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
        at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.helpComplete(ForkJoinPool.java:1223)
        at 
java.base/java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1915)
        at 
java.base/java.util.concurrent.ForkJoinTask.awaitDone(ForkJoinTask.java:433)
        at 
java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:687)
        at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:927)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at 
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
        at 
org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:82)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:280)
        at 
org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:303)
        at 
org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
        at 
org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
   Caused by: org.apache.hudi.exception.HoodieException: Exception when reading 
log file 
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:362)
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:220)
        at 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.scanByFullKeys(HoodieMergedLogRecordScanner.java:163)
        at 
org.apache.hudi.metadata.HoodieMetadataLogRecordReader.getRecordsByKeys(HoodieMetadataLogRecordReader.java:109)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadata.readLogRecords(HoodieBackedTableMetadata.java:337)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:314)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:285)
        at 
org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
        ... 37 more
   Caused by: java.lang.ClassCastException: class 
org.apache.avro.generic.GenericData$Record cannot be cast to class 
org.apache.hudi.avro.model.HoodieDeleteRecordList 
(org.apache.avro.generic.GenericData$Record is in unnamed module of loader 
'app'; org.apache.hudi.avro.model.HoodieDeleteRecordList is in unnamed module 
of loader org.apache.spark.util.MutableURLClassLoader @799a7938)
        at 
org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:169)
        at 
org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:124)
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:663)
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:352)
        ... 44 more ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to