sivabalan narayanan created HUDI-3846:
-----------------------------------------

             Summary: Clustering with allow update strategy could fail w/ async clustering
                 Key: HUDI-3846
                 URL: https://issues.apache.org/jira/browse/HUDI-3846
             Project: Apache Hudi
          Issue Type: Task
          Components: clustering
            Reporter: sivabalan narayanan


With async clustering and the allow-updates strategy, the ingestion pipeline can fail. A sketch of the relevant configuration follows, and the full stacktrace is at the end of this description.
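For context, here is a minimal sketch of the writer-option combination involved. The helper class and method are illustrative, not Hudi APIs; the two config keys are Hudi's standard async-clustering and update-strategy options.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper showing the option combination under which the race
// described in this ticket can occur.
class AllowUpdateAsyncClusteringOptions {
  static Map<String, String> hudiOptions() {
    Map<String, String> opts = new HashMap<>();
    // Run clustering asynchronously alongside ingestion.
    opts.put("hoodie.clustering.async.enabled", "true");
    // Allow updates to file groups that are under clustering; the default
    // SparkRejectUpdateStrategy would fail the write up front instead.
    opts.put("hoodie.clustering.updates.strategy",
        "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy");
    return opts;
  }
}
{code}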

 

Here is the condition under which this is possible. When we look to properly support updates while clustering is in progress, we will have to account for this scenario.

 

A file group FG1 is chosen to be replaced by a clustering plan, and clustering is in progress. At this juncture, a new batch of upserts arrives, and after index lookup some records are tagged with FG1. Before the merge handle for the upsert is created, clustering completes. So, when the writer fetches the latest base file for FG1, the FileSystemView may return none, since the file group has been replaced by the now-completed clustering. When we add update support with async clustering, we will have to cover this use case as well (see the sketch below). For now, this is a low-priority ticket: with the allow-updates strategy we already state that data loss is possible, and no one is expected to use it in a production setting yet.
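To make the failure point concrete, here is an illustrative sketch (the class and method below are hypothetical, not the actual HoodieMergeHandle code) of the lookup that throws, and the kind of defensive check a fix could add:
{code:java}
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;

// Hypothetical sketch of the merge path's base-file lookup. Today the code
// effectively does Option.get(), which throws NoSuchElementException when the
// file group has already been replaced by a completed clustering commit.
class ReplacedFileGroupLookup {
  static HoodieBaseFile latestBaseFileOrFail(HoodieTable table,
                                             String partitionPath,
                                             String fileId) {
    Option<HoodieBaseFile> latest =
        table.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
    if (!latest.isPresent()) {
      // FG1 was replaced between index tagging and merge-handle creation;
      // an explicit error (or re-tagging the records to the replacement file
      // groups) would be clearer than the raw get() in the stacktrace below.
      throw new HoodieUpsertException(
          "File group " + fileId + " in partition " + partitionPath
              + " was replaced by clustering after index tagging");
    }
    return latest.get();
  }
}
{code}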

 
{code:java}
22/04/09 19:26:16 ERROR HoodieStreamingSink: Micro batch id=268 threw following exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 17111.0 failed 1 times, most recent failure: Lost task 6.0 in stage 17111.0 (TID 727932, localhost, executor driver): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :6
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.NoSuchElementException: No value present in Option
        at org.apache.hudi.common.util.Option.get(Option.java:89)
        at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:116)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:377)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:348)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
        ... 30 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1213)
        at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:642)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:309)
        at org.apache.hudi.HoodieStreamingSink$$anonfun$1$$anonfun$2.apply(HoodieStreamingSink.scala:88)
        at org.apache.hudi.HoodieStreamingSink$$anonfun$1$$anonfun$2.apply(HoodieStreamingSink.scala:88)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.hudi.HoodieStreamingSink$$anonfun$1.apply(HoodieStreamingSink.scala:87)
        at org.apache.hudi.HoodieStreamingSink$$anonfun$1.apply(HoodieStreamingSink.scala:87)
        at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:164)
        at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:86)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :6
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.NoSuchElementException: No value present in Option
        at org.apache.hudi.common.util.Option.get(Option.java:89)
        at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:116)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:377)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:348)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
