aznwarmonkey opened a new issue #4541:
URL: https://github.com/apache/hudi/issues/4541


   Hello,
   
   I am currently getting an exception while writing a `hudi` table in `bulk_insert` mode. Please see below for the stack trace along with the snippet of code I am using to write the data. I am new to `hudi`, and the stack trace doesn't give me much insight into why this is happening. Any help with this issue is greatly appreciated.
   
   ```bash
   py4j.protocol.Py4JJavaError: An error occurred while calling o195.save.
   : org.apache.spark.SparkException: Writing job failed.
            at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:383)
            at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
            at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:218)
            at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
            at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
            at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
            at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
            at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
            at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
            at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
            at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
            at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
            at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
            at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
            at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
            at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
            at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
            at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
            at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
            at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
            at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
            at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
            at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
            at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
            at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:370)
            at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
            at org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:302)
            at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:127)
            at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:134)
            at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
            at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
            at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
            at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
            at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
            at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
            at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
            at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
            at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
            at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
            at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
            at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
            at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
            at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
            at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
            at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
            at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
            at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
            at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
            at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
            at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
            at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
            at py4j.Gateway.invoke(Gateway.java:282)
            at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
            at py4j.commands.CallCommand.execute(CallCommand.java:79)
            at py4j.GatewayConnection.run(GatewayConnection.java:238)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.hudi.exception.HoodieException: java.lang.NullPointerException
            at org.apache.hudi.internal.DataSourceInternalWriterHelper.commit(DataSourceInternalWriterHelper.java:83)
            at org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite.commit(HoodieDataSourceInternalBatchWrite.java:84)
            at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
            ... 69 more
            Suppressed: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://path/to/data/ commits 20220108133454
                    at org.apache.hudi.client.AbstractHoodieWriteClient.rollback(AbstractHoodieWriteClient.java:563)
                    at org.apache.hudi.internal.DataSourceInternalWriterHelper.abort(DataSourceInternalWriterHelper.java:91)
                    at org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite.abort(HoodieDataSourceInternalBatchWrite.java:89)
                    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:378)
                    ... 69 more
            Caused by: org.apache.hudi.exception.HoodieRollbackException: Found in-flight commits after time :20220108133454, please rollback greater commits first
                    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.validateRollbackCommitSequence(BaseRollbackActionExecutor.java:148)
                    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.doRollbackAndGetStats(BaseRollbackActionExecutor.java:166)
                    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:103)
                    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:230)
                    at org.apache.hudi.client.AbstractHoodieWriteClient.rollback(AbstractHoodieWriteClient.java:552)
                    ... 72 more
    Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
            at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
            at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
            at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
            at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
            at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
            at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
            at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
            at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
    Caused by: java.lang.NullPointerException
            at java.io.StringReader.<init>(StringReader.java:50)
            at org.apache.avro.Schema$Parser.parse(Schema.java:1020)
            at org.apache.hudi.table.action.cluster.SparkExecuteClusteringCommitActionExecutor.lambda$runClusteringForGroupAsync$3(SparkExecuteClusteringCommitActionExecutor.java:118)
            at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
            ... 5 more
   ```
   
   This is how I am writing out the data:
   
   ```python
    # table_name, keys, partitions, timestamp_col, sort_cols, df and
    # output_path are defined earlier in the job.
    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.keygenerator.class':
            'org.apache.hudi.keygen.ComplexKeyGenerator',
        'hoodie.datasource.write.recordkey.field': keys,
        'hoodie.datasource.write.partitionpath.field': ','.join(partitions),
        'hoodie.datasource.write.hive_style_partitioning': True,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
        'hoodie.datasource.write.precombine.field': timestamp_col,
        # Bloom index tuning
        'hoodie.index.type': 'BLOOM',
        'hoodie.consistency.check.enabled': True,
        'hoodie.bloom.index.bucketized.checking': True,
        'hoodie.bloom.index.keys.per.bucket': 50000000,
        'hoodie.index.bloom.num_entries': 1000000,
        'hoodie.bloom.index.use.caching': True,
        'hoodie.bloom.index.use.treebased.filter': True,
        'hoodie.bloom.index.filter.type': 'DYNAMIC_V0',
        'hoodie.bloom.index.filter.dynamic.max.entries': 1000000,
        'hoodie.bloom.index.prune.by.ranges': True,
        # File sizing and bulk insert behaviour
        'hoodie.parquet.small.file.limit': 134217728,
        'hoodie.parquet.max.file.size': 1073741824,
        'write.bulk_insert.shuffle_by_partition': True,
        'write.parquet.block.size': 256,
        'hoodie.copyonwrite.insert.auto.split': False,
        'hoodie.copyonwrite.insert.split.size': 10000000,
        'hoodie.datasource.write.row.writer.enable': True,
        'hoodie.bulkinsert.sort.mode': 'PARTITION_SORT',
        'hoodie.bulkinsert.shuffle.parallelism': 200,
        # Inline clustering and cleaning
        'hoodie.clustering.inline': True,
        'hoodie.clustering.inline.max.commits': '1',
        'hoodie.clustering.plan.strategy.small.file.limit': '1073741824',
        'hoodie.clustering.plan.strategy.target.file.max.bytes': '2147483648',
        'hoodie.clustering.execution.strategy.class':
            'org.apache.hudi.client.clustering.run.strategy'
            '.SparkSortAndSizeExecutionStrategy',
        'hoodie.clustering.plan.strategy.sort.columns': sort_cols,
        'hoodie.cleaner.commits.retained': '2',
        'hoodie.clean.async': True,
    }

    df.write.format('org.apache.hudi') \
        .option('hoodie.datasource.write.operation', 'bulk_insert') \
        .options(**hudi_options).mode('append').save(output_path)
   ```
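   
   For what it's worth, the innermost `Caused by` is a `NullPointerException` thrown from `Schema$Parser.parse` inside `SparkExecuteClusteringCommitActionExecutor`, which only runs because `hoodie.clustering.inline` is enabled above. One variant I can try to narrow this down (an untested sketch, same options with inline clustering switched off) would be:
   
   ```python
    # Hypothetical test run: identical write, but with inline clustering
    # disabled so the clustering commit (where the NPE originates) never runs.
    no_clustering_options = dict(hudi_options)
    no_clustering_options['hoodie.clustering.inline'] = False

    df.write.format('org.apache.hudi') \
        .option('hoodie.datasource.write.operation', 'bulk_insert') \
        .options(**no_clustering_options).mode('append').save(output_path)
   ```
   
   If that write succeeds, the problem would appear to be in the inline clustering path rather than the bulk insert itself, but I would still like to understand the root cause.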

