[ https://issues.apache.org/jira/browse/HUDI-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-2841:
--------------------------------------
    Description: 
In a multi-writer setup, the failed-writes cleaning policy has to be set to lazy, so failed commits are acted upon (actually rolled back) only by the cleaner and not eagerly.
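
For reference, the multi-writer configuration in question looks roughly like this (a sketch; property names as of 0.10.x, lock-provider details depend on the deployment):
{code}
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
{code}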

Listing-based rollback for MOR fails a validation which asserts that the files to be rolled back must not have a larger timestamp than the commit being rolled back.

 

Let's say the timeline is as follows: DC1, DC2, and DC3, where DC3 failed midway.

Partition1/
   baseFile1 (DC1)
   baseFile2 (DC2) // due to small file handling
   baseFile3 (DC3)

 

The deltastreamer is then restarted and performs DC4:

Partition1/
   baseFile1 (DC1)
   baseFile2 (DC2) // due to small file handling
   baseFile3 (DC3)
   baseFile4 (DC4)

 

At the end of that commit, the cleaner kicks in and tries to roll back any failed commits. In RollbackUtils, where we fetch the latest file slices and find all files to be deleted, there is a validation which checks that the base commit time of every file in the latest file slice is <= the commit being rolled back. In this case, baseFile4 has a higher timestamp than DC3, and hence the validation fails.
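
Concretely, the check that trips can be reproduced in isolation (a minimal sketch, not Hudi source; real instant times are numeric timestamps compared lexicographically, and DC1..DC4 are stand-ins):
{code:java}
// Minimal standalone sketch of the failing validation, assuming DC1 < DC2 < DC3 < DC4.
import java.util.Arrays;
import java.util.List;

public class RollbackValidationSketch {
  public static void main(String[] args) {
    String commitBeingRolledBack = "DC3"; // the failed delta commit
    // Base instant times of the latest file slices in Partition1, including baseFile4 from DC4.
    List<String> latestSliceBaseInstants = Arrays.asList("DC1", "DC2", "DC3", "DC4");
    for (String baseInstant : latestSliceBaseInstants) {
      // Mirrors ValidationUtils.checkArgument(compareTimestamps(base, LESSER_THAN_OR_EQUALS, rollback))
      if (baseInstant.compareTo(commitBeingRolledBack) > 0) {
        // DC4 > DC3: this is the IllegalArgumentException seen in the stack trace below.
        throw new IllegalArgumentException("base instant " + baseInstant
            + " is newer than the commit being rolled back: " + commitBeingRolledBack);
      }
    }
  }
}
{code}
The resulting failure on the cluster: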

 
{code:java}
21/11/23 09:53:49 DEBUG wire: http-outgoing-11 >> "[\r][\n]"
21/11/23 09:53:49 ERROR Executor: Exception in task 32.0 in stage 45.0 (TID 3191)
java.lang.IllegalArgumentException
        at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
        at org.apache.hudi.table.action.rollback.RollbackUtils.lambda$generateAppendRollbackBlocksAction$0(RollbackUtils.java:254)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at org.apache.hudi.table.action.rollback.RollbackUtils.generateAppendRollbackBlocksAction(RollbackUtils.java:266)
        at org.apache.hudi.table.action.rollback.RollbackUtils.lambda$generateRollbackRequestsUsingFileListingMOR$e97f040e$1(RollbackUtils.java:210)
        at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:134)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
        at scala.collection.AbstractIterator.to(Iterator.scala:1429)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
        at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
21/11/23 09:53:49 WARN RollbackUtils: Rollback Instant time : 20211123095048787
{code}
 

Code of interest
{code:java}
private static List<ListingBasedRollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
    HoodieCommitMetadata commitMetadata, HoodieTable table) {
  ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));

  // wStat.getPrevCommit() might not give the right commit time in the following
  // scenario: If a compaction was scheduled, the new commitTime associated with the requested compaction will be
  // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
  // But the index (global) might store the baseCommit of the base and not the requested, hence get the
  // baseCommit always by listing the file slice
  Map<String, String> fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlices(partitionPath)
      .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
  return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {

    // Filter out stats without prevCommit since they are all inserts
    boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
        && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId());

    if (validForRollback) {
      // For sanity, log instant time can never be less than base-commit on which we are rolling back
      ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()),
          HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()));
    }

    return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
        // Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option
        // to delete and we should not step on it
        wStat.getFileId()), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
  }).map(wStat -> {
    String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
    return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
        baseCommitTime);
  }).collect(Collectors.toList());
}
{code}
Verified with log statements that the DC4 base file is the one that runs into the validation error.

 

But the issue could also happen if a compaction had kicked in midway and the cleaner policy was very relaxed, so that the rollback got triggered only after 10 commits or 1 to 2 compactions.
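
Schematically (hypothetical instant times, not taken from the log above):
{code}
DC5 fails midway; the cleaner policy is lazy, so DC5 is not rolled back immediately.
DC6 ... DC9 succeed, then compaction C10 rewrites the file groups with base instant C10.
When the rollback of DC5 finally runs, the latest file slices carry base instant C10 > DC5,
and the same LESSER_THAN_OR_EQUALS validation fails.
{code}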

 

In the code block of interest, we call `table.getSliceView().getLatestFileSlices(partitionPath)`. Instead, I tried using `table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), false)`, but this started including files written by the next delta commit in the list of files to be deleted as well.
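
For reference, this is the substitution I tried inside generateAppendRollbackBlocksAction (a sketch of the one-line change; the third boolean argument follows the existing SliceView API, and its exact semantics are an assumption here):
{code:java}
// Before: latest file slices, whose base instants can be newer than the instant being rolled back.
Map<String, String> fileIdToBaseCommitTimeForLogMap =
    table.getSliceView().getLatestFileSlices(partitionPath)
        .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));

// Attempted fix: only consider slices whose base instant is before or on the rollback instant.
// This avoided the validation failure, but also pulled files written by the next delta commit
// into the list of files to be deleted.
Map<String, String> fileIdToBaseCommitTimeForLogMap =
    table.getSliceView()
        .getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), false)
        .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
{code}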

 


Regarding a solution: since log file names don't carry the actual commit time, just the base instant time of the base file, I am not sure we can solve this without changing the log file name format, because the commit metadata likely contains just the fileId and not the full paths of the log files actually written by the commit of interest. We are looking for a way to know exactly which log files were written by a given commit, even after 10+ further commits have gone through.
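
To illustrate why (schematic names only, not the exact Hudi file-name format, which also carries a log version and write token):
{code}
Partition1/
  baseFile3 (base instant DC3)   // base file: its name reflects the commit that wrote it
  .fileId3_DC3.log.1             // log file appended by DC4: name still carries DC3
  .fileId3_DC3.log.2             // log file appended by DC7: indistinguishable by name alone
{code}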

 


> Rollback of a failed delta commit w/ multi-writer fails
> -------------------------------------------------------
>
>                 Key: HUDI-2841
>                 URL: https://issues.apache.org/jira/browse/HUDI-2841
>             Project: Apache Hudi
>          Issue Type: Bug
>    Affects Versions: 0.10.0
>            Reporter: sivabalan narayanan
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
