aznwarmonkey opened a new issue #4541: URL: https://github.com/apache/hudi/issues/4541
Hello, I am currently getting an exception while writing a `hudi` table in `bulk_insert` mode. Please see below for the stacktrace, along with the snippet of code I am using to write the data. I am new to `hudi`, and the stacktrace doesn't provide much insight into why this is happening. Any help with this issue is greatly appreciated.

```bash
py4j.protocol.Py4JJavaError: An error occurred while calling o195.save.
: org.apache.spark.SparkException: Writing job failed.
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:383)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:218)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:370)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:302)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:127)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:134)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: java.lang.NullPointerException
    at org.apache.hudi.internal.DataSourceInternalWriterHelper.commit(DataSourceInternalWriterHelper.java:83)
    at org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite.commit(HoodieDataSourceInternalBatchWrite.java:84)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
    ... 69 more
    Suppressed: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://path/to/data/ commits 20220108133454
        at org.apache.hudi.client.AbstractHoodieWriteClient.rollback(AbstractHoodieWriteClient.java:563)
        at org.apache.hudi.internal.DataSourceInternalWriterHelper.abort(DataSourceInternalWriterHelper.java:91)
        at org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite.abort(HoodieDataSourceInternalBatchWrite.java:89)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:378)
        ... 69 more
    Caused by: org.apache.hudi.exception.HoodieRollbackException: Found in-flight commits after time :20220108133454, please rollback greater commits first
        at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.validateRollbackCommitSequence(BaseRollbackActionExecutor.java:148)
        at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.doRollbackAndGetStats(BaseRollbackActionExecutor.java:166)
        at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:103)
        at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:230)
        at org.apache.hudi.client.AbstractHoodieWriteClient.rollback(AbstractHoodieWriteClient.java:552)
        ... 72 more
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.NullPointerException
    at java.io.StringReader.<init>(StringReader.java:50)
    at org.apache.avro.Schema$Parser.parse(Schema.java:1020)
    at org.apache.hudi.table.action.cluster.SparkExecuteClusteringCommitActionExecutor.lambda$runClusteringForGroupAsync$3(SparkExecuteClusteringCommitActionExecutor.java:118)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 5 more
```
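Reading from the bottom of the trace: the root `NullPointerException` is thrown when `org.apache.avro.Schema$Parser.parse` receives a null schema string inside the inline clustering executor, and the automatic rollback then fails because a clustering instant is still in-flight on the timeline. As a sanity check on the timeline state, something like the sketch below can list the leftover instants (this assumes `boto3` access; the bucket and prefix are placeholders, not my real path):

```python
# Sketch: list the Hudi timeline under .hoodie/ to spot in-flight instants
# (e.g. *.replacecommit.inflight) left behind after the failed write.
# 'my-bucket' and 'path/to/data/' are placeholders for the real table path.
import boto3

s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='my-bucket', Prefix='path/to/data/.hoodie/'):
    for obj in page.get('Contents', []):
        key = obj['Key']
        if key.endswith('.inflight') or 'replacecommit' in key:
            print(key)  # instants that may need to be rolled back/cleaned first
```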
This is how I am writing out the data:

```python
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.recordkey.field': keys,
    'hoodie.datasource.write.partitionpath.field': ','.join(partitions),
    'hoodie.datasource.write.hive_style_partitioning': True,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.precombine.field': timestamp_col,
    'hoodie.index.type': 'BLOOM',
    'hoodie.consistency.check.enabled': True,
    'hoodie.bloom.index.bucketized.checking': True,
    'hoodie.bloom.index.keys.per.bucket': 50000000,
    'hoodie.index.bloom.num_entries': 1000000,
    'hoodie.bloom.index.use.caching': True,
    'hoodie.bloom.index.use.treebased.filter': True,
    'hoodie.bloom.index.filter.type': 'DYNAMIC_V0',
    'hoodie.bloom.index.filter.dynamic.max.entries': 1000000,
    'hoodie.bloom.index.prune.by.ranges': True,
    'hoodie.parquet.small.file.limit': 134217728,
    'hoodie.parquet.max.file.size': 1073741824,
    'write.bulk_insert.shuffle_by_partition': True,
    'write.parquet.block.size': 256,
    'hoodie.copyonwrite.insert.auto.split': False,
    'hoodie.copyonwrite.insert.split.size': 10000000,
    'hoodie.datasource.write.row.writer.enable': True,
    'hoodie.bulkinsert.sort.mode': 'PARTITION_SORT',
    'hoodie.bulkinsert.shuffle.parallelism': 200,
    'hoodie.clustering.inline': True,
    'hoodie.clustering.inline.max.commits': '1',
    'hoodie.clustering.plan.strategy.small.file.limit': '1073741824',
    'hoodie.clustering.plan.strategy.target.file.max.bytes': '2147483648',
    'hoodie.clustering.execution.strategy.class': 'org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy',
    'hoodie.clustering.plan.strategy.sort.columns': sort_cols,
    'hoodie.cleaner.commits.retained': '2',
    'hoodie.clean.async': True,
}

df.write.format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'bulk_insert') \
    .options(**hudi_options) \
    .mode('append') \
    .save(output_path)
```
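Since the NPE originates in `SparkExecuteClusteringCommitActionExecutor`, one narrowing step I can try (a sketch, reusing `hudi_options` from above) is to rerun the same bulk insert with inline clustering disabled, to confirm whether the write itself succeeds and only the clustering step fails:

```python
# Narrowing test (sketch): identical write, but with inline clustering disabled,
# to check whether the NullPointerException is specific to the clustering step.
options_no_clustering = dict(hudi_options)
options_no_clustering['hoodie.clustering.inline'] = False

df.write.format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'bulk_insert') \
    .options(**options_no_clustering) \
    .mode('append') \
    .save(output_path)
```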