kasured opened a new issue, #5298:
URL: https://github.com/apache/hudi/issues/5298

   **Describe the problem you faced**
   
   When inline compaction is turned on and the compaction plan completes, the commit file references a file that was deleted during the compaction process. This later causes the reader to fail with FileNotFoundException.
   
   **To Reproduce**
   
   I can reproduce the issue consistently. After the first compaction action completes, all subsequent reads fail, because the commit file references a parquet file that has already been deleted from the file system. Please see the Additional Context section for more details. The issue can only be reproduced when multiple tables are written within the same SparkSession.
   
   **Expected behavior**
   
   After inline compaction, the commit files in the .hoodie folder should be in sync with the files on the file system, and no files should be deleted during the compaction.
   
   **Environment Description**
   
   * EMR version: 6.5.0
   
   * Hudi version : 0.9.0-amzn-1        
   
   * Spark version : 3.1.2
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   **Additional context**
   
   We are using Spark streaming with Kafka topics as a source: topic -> foreachBatch -> DataFrame write -> Hudi MOR table. For each table we are using the following related configuration options (a minimal sketch of this write path follows the configuration block):
   
   ```
           "hoodie.datasource.write.table.type" = "MERGE_ON_READ"
           "hoodie.datasource.write.hive_style_partitioning" = "true"
           "hoodie.finalize.write.parallelism" = "4"
           "hoodie.upsert.shuffle.parallelism" = "4"
           "hoodie.compact.inline" = "true"
           "hoodie.compact.inline.max.delta.seconds" = "3600"
           "hoodie.compact.inline.trigger.strategy" = "TIME_ELAPSED"
           "hoodie.clean.automatic" = "true"
           "hoodie.cleaner.policy" = "KEEP_LATEST_COMMITS"
           "hoodie.cleaner.commits.retained" = "18"
           "hoodie.metadata.cleaner.commits.retained" = "18"
           "hoodie.keep.min.commits" = "36"
           "hoodie.keep.max.commits" = "72"
           "hoodie.clustering.inline" = "false"
           "hoodie.clustering.inline.max.commits" = "4"
           "hoodie.clustering.plan.strategy.target.file.max.bytes" = "1073741824"
           "hoodie.clustering.plan.strategy.small.file.limit" = "629145600"
           "hoodie.metadata.enable" = "false"
           "hoodie.metadata.keep.min.commits" = "36"
           "hoodie.metadata.keep.max.commits" = "72"
           "hoodie.datasource.compaction.async.enable" = "true"
   ```
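
   For reference, here is a minimal sketch of how the tables are written: one streaming query per Kafka topic, all sharing the same SparkSession. Topic names, paths, key fields, and the broker address below are placeholders, not the exact job:
   ```scala
   // Hypothetical sketch: several MOR tables written from several Kafka topics in one SparkSession.
   // Record key, precombine, and partition fields are illustrative only.
   import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

   val spark = SparkSession.builder().appName("hudi-mor-multi-table").getOrCreate()

   // Shared Hudi options, as in the configuration block above
   val hudiOptions = Map(
     "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
     "hoodie.datasource.write.hive_style_partitioning" -> "true",
     "hoodie.compact.inline" -> "true",
     "hoodie.compact.inline.trigger.strategy" -> "TIME_ELAPSED",
     "hoodie.compact.inline.max.delta.seconds" -> "3600",
     "hoodie.metadata.enable" -> "false"
   )

   // Upsert one micro-batch into a Hudi MOR table; in the real job the Kafka value
   // is parsed into the table schema first (omitted here).
   def writeBatch(tableName: String, tablePath: String)(batch: DataFrame, batchId: Long): Unit =
     batch.write
       .format("hudi")
       .options(hudiOptions)
       .option("hoodie.table.name", tableName)
       .option("hoodie.datasource.write.recordkey.field", "id")                // illustrative
       .option("hoodie.datasource.write.precombine.field", "ts")               // illustrative
       .option("hoodie.datasource.write.partitionpath.field", "cluster,shard") // illustrative
       .option("hoodie.datasource.write.keygenerator.class",
         "org.apache.hudi.keygen.ComplexKeyGenerator") // needed for multi-field partition paths
       .mode(SaveMode.Append)
       .save(tablePath)

   // One streaming query per topic/table
   def startQuery(topic: String, tableName: String, tablePath: String) =
     spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "broker:9092")        // placeholder
       .option("subscribe", topic)
       .load()
       .writeStream
       .foreachBatch(writeBatch(tableName, tablePath) _)
       .option("checkpointLocation", s"$tablePath/_checkpoint") // placeholder
       .start()

   // Multiple tables within the same SparkSession -- the setup under which the issue appears
   startQuery("topic_a", "table_a", "s3://some-bucket/landing-zone/table_a")
   startQuery("topic_b", "table_b", "s3://some-bucket/landing-zone/table_b")
   ```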
   **Course of Events**
   
   Let us take the file the reader fails to find, 4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet, and show how it changed over time.
   
   * The compaction completed, and no cleans have run yet that could have deleted old files:
   ╔═════════════════════════╤═══════════╤═══════════════════════════════╗
   ║ Compaction Instant Time │ State     │ Total FileIds to be Compacted ║
   ╠═════════════════════════╪═══════════╪═══════════════════════════════╣
   ║ 20220411202305          │ COMPLETED │ 3                             ║
   ╚═════════════════════════╧═══════════╧═══════════════════════════════╝
   
   
   ╔═══════════╤═════════════════════════╤═════════════════════╤══════════════════╗
   ║ CleanTime │ EarliestCommandRetained │ Total Files Deleted │ Total Time Taken ║
   ╠═══════════╧═════════════════════════╧═════════════════════╧══════════════════╣
   ║ (empty)                                                                      ║
   ╚══════════════════════════════════════════════════════════════════════════════╝
   
   * On S3 we can see the following timeline for the compaction process. Please note the modification times:
   20220411202305.commit                  commit      April 11, 2022, 22:23:55 (UTC+02:00)
   20220411202305.compaction.inflight     inflight    April 11, 2022, 22:23:08 (UTC+02:00)
   20220411202305.compaction.requested    requested   April 11, 2022, 22:23:07 (UTC+02:00)
   
   * On S3 we can see the following data files:
   4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet   Delete marker   April 11, 2022, 22:23:55 (UTC+02:00)
   4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet   parquet         April 11, 2022, 22:23:28 (UTC+02:00)
   
   4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet   parquet         April 11, 2022, 22:23:54 (UTC+02:00)
   4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_2-75-1434_20220411191603.parquet      parquet         April 11, 2022, 21:19:15 (UTC+02:00)
   
   Note that the file under consideration was deleted (delete marker) at 22:23:55, the same time the compaction commit happened, and that the only thing that changed is the write token: after that moment there is a new file 4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet. However, this new file is not reflected in 20220411202305.commit, as can be seen below:
   
   ```
   "fileId" : "4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0",
   "path" : "cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet",
   "prevCommit" : "20220411191603",
   "numWrites" : 122486,
   "numDeletes" : 0,
   "numUpdateWrites" : 122457,
   "numInserts" : 0,
   "totalWriteBytes" : 7528604,
   "totalWriteErrors" : 0,
   "tempPath" : null,
   "partitionPath" : "cluster=96/shard=14377",
   "totalLogRecords" : 846489,
   "totalLogFilesCompacted" : 7,
   "totalLogSizeCompacted" : 325539587,
   "totalUpdatedRecordsCompacted" : 122457,
   "totalLogBlocks" : 7,
   "totalCorruptLogBlock" : 0,
   "totalRollbackBlocks" : 0,
   "fileSizeInBytes" : 7528604,
   "minEventTime" : null,
   "maxEventTime" : null
   
   "fileIdAndRelativePaths" : {
       "4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0" : "cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet",
       "0139f10d-7a88-481b-b5df-6516500076b0-0" : "cluster=96/shard=14377/0139f10d-7a88-481b-b5df-6516500076b0-0_0-1177-14258_20220411202305.parquet",
       "21000940-a573-4c46-8ad5-79003ac9daf5-0" : "cluster=96/shard=14377/21000940-a573-4c46-8ad5-79003ac9daf5-0_2-1177-14260_20220411202305.parquet"
     },
   "totalRecordsDeleted" : 0,
   "totalLogRecordsCompacted" : 2543875,
   "totalLogFilesCompacted" : 21,
   "totalCompactedRecordsUpdated" : 368137,
   "totalLogFilesSize" : 978467842,
   "totalScanTime" : 45421,
   ``` 
   
   * Now, when I check the file system view with the command `show fsview latest`, Hudi shows the new file rather than the deleted one:
   ```
   ║ Partition               │ FileId                                 │ Base-Instant   │ Data-File                                                                                                                                   │ Data-File Size │ Num Delta Files │ Total Delta Size │ Delta Size - compaction scheduled │ Delta Size - compaction unscheduled │ Delta To Base Ratio - compaction scheduled │ Delta To Base Ratio - compaction unscheduled │ Delta Files - compaction scheduled │ Delta Files - compaction unscheduled ║
   
   ║ cluster=96/shard=14377/ │ 4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0 │ 20220411202305 │ s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet │ 7.2 MB         │ 4               │ 178.2 MB         │ 178.2 MB                          │ 0.0 B                               │ 24.814273985207016                         │ 0.0                                          │ [HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.4_0-1758-20808', fileLen=46785516}, HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.3_1-1610-19092', fileLen=46440363}, HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.2_0-1450-17202', fileLen=47068201}, HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.1_0-1297-15405', fileLen=46545393}] │ [] ║
   ```
   
   **Tried options**
   
   * Turned off file sizing by setting `hoodie.parquet.small.file.limit` to 0 to make sure the file is not deleted.
   
   * With a single table, inline compaction works as expected.
   
   **Stacktrace**
   
   ```
   Lost task 0.0 in stage 1.0 (TID 1) (ip.ec2.internal executor 2): java.io.FileNotFoundException: No such file or directory 's3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet'
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:521)
        at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:694)
        at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:61)
        at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:456)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$lzycompute$1(ParquetFileFormat.scala:318)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$1(ParquetFileFormat.scala:317)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:319)
        at org.apache.hudi.HoodieMergeOnReadRDD.read(HoodieMergeOnReadRDD.scala:105)
        at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:77)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   ```
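
   For context, a minimal sketch of the kind of snapshot read that hits this exception is below. The table path is a placeholder and this is only an approximation of the reader, not the exact job:
   ```scala
   // Hypothetical snapshot read of the MOR table (placeholder path).
   import org.apache.spark.sql.SparkSession

   val spark = SparkSession.builder().appName("hudi-mor-reader").getOrCreate()

   val df = spark.read
     .format("hudi")
     .option("hoodie.datasource.query.type", "snapshot")
     .load("s3://some-bucket/landing-zone/some_table")

   // Any action fails once the commit metadata points at the deleted base file
   df.count()
   ```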
   
   

