kasured opened a new issue, #5298: URL: https://github.com/apache/hudi/issues/5298
**Describe the problem you faced**

When inline compaction is turned on and the compaction plan completes, the commit file references a file that was deleted during the compaction process. This later causes readers to fail with a FileNotFoundException.

**To Reproduce**

I can reproduce the issue consistently. After the first compaction action completes, all subsequent reads fail because the commit file references a parquet file that has already been deleted from the file system. Please see the Additional context section for more details. The issue can only be reproduced when multiple tables are used within the same SparkSession.

**Expected behavior**

After inline compaction, the commit files in the .hoodie folder are in sync with the files in the file system. Also, no files are deleted during the compaction itself.

**Environment Description**

* EMR version: 6.5.0
* Hudi version: 0.9.0-amzn-1
* Spark version: 3.1.2
* Hadoop version: 3.2.1
* Storage (HDFS/S3/GCS..): S3
* Running on Docker? (yes/no): no

**Additional context**

We are using Spark Structured Streaming with Kafka topics as a source: topic -> foreachBatch -> DataFrame write -> Hudi MOR table. For each table we are using the following related configuration options:

```
"hoodie.datasource.write.table.type" = "MERGE_ON_READ"
"hoodie.datasource.write.hive_style_partitioning" = "true"
"hoodie.finalize.write.parallelism" = "4"
"hoodie.upsert.shuffle.parallelism" = "4"
"hoodie.compact.inline" = "true"
"hoodie.compact.inline.max.delta.seconds" = "3600"
"hoodie.compact.inline.trigger.strategy" = "TIME_ELAPSED"
"hoodie.clean.automatic" = "true"
"hoodie.cleaner.policy" = "KEEP_LATEST_COMMITS"
"hoodie.cleaner.commits.retained" = "18"
"hoodie.metadata.cleaner.commits.retained" = "18"
"hoodie.keep.min.commits" = "36"
"hoodie.keep.max.commits" = "72"
"hoodie.clustering.inline" = "false"
"hoodie.clustering.inline.max.commits" = "4"
"hoodie.clustering.plan.strategy.target.file.max.bytes" = "1073741824"
"hoodie.clustering.plan.strategy.small.file.limit" = "629145600"
"hoodie.metadata.enable" = "false"
"hoodie.metadata.keep.min.commits" = "36"
"hoodie.metadata.keep.max.commits" = "72"
"hoodie.datasource.compaction.async.enable" = "true"
```
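For context, here is a minimal Scala sketch of this write path. It is not the exact job: the table name, base path, record key / precombine / partition fields and Kafka settings are placeholders, and only a subset of the options above is repeated for brevity.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

val spark = SparkSession.builder().appName("hudi-mor-ingest").getOrCreate()

// Subset of the Hudi options listed above; key/precombine/partition fields are placeholders.
val hudiOptions = Map(
  "hoodie.table.name"                               -> "some_table",
  "hoodie.datasource.write.recordkey.field"         -> "id",
  "hoodie.datasource.write.precombine.field"        -> "ts",
  "hoodie.datasource.write.partitionpath.field"     -> "cluster,shard",
  "hoodie.datasource.write.table.type"              -> "MERGE_ON_READ",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.compact.inline"                           -> "true",
  "hoodie.compact.inline.max.delta.seconds"         -> "3600",
  "hoodie.compact.inline.trigger.strategy"          -> "TIME_ELAPSED",
  "hoodie.clean.automatic"                          -> "true",
  "hoodie.cleaner.policy"                           -> "KEEP_LATEST_COMMITS",
  "hoodie.cleaner.commits.retained"                 -> "18",
  "hoodie.metadata.enable"                          -> "false"
)

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "some_topic")                // placeholder; one query per topic/table
  .load()
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // In the real job the Kafka payload is parsed into the table schema before writing.
    batchDF.write
      .format("hudi")
      .options(hudiOptions)
      .mode(SaveMode.Append)
      .save("s3://some-bucket/landing-zone/some_table") // placeholder base path
  }
  .start()
```

Several such streaming queries (one per table) run inside the same SparkSession, which is the setup under which the problem appears.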
**Course of Events**

Let us take the file that the reader tries to find, `4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet`, and show how it changes.

* The compaction is completed and there are no cleans yet that would delete the old files:

```
╔═════════════════════════╤═══════════╤═══════════════════════════════╗
║ Compaction Instant Time │ State     │ Total FileIds to be Compacted ║
╠═════════════════════════╪═══════════╪═══════════════════════════════╣
║ 20220411202305          │ COMPLETED │ 3                             ║
╚═════════════════════════╧═══════════╧═══════════════════════════════╝

╔═══════════╤═════════════════════════╤═════════════════════╤══════════════════╗
║ CleanTime │ EarliestCommandRetained │ Total Files Deleted │ Total Time Taken ║
╠═══════════╧═════════════════════════╧═════════════════════╧══════════════════╣
║ (empty)                                                                       ║
╚═══════════════════════════════════════════════════════════════════════════════╝
```

* On S3 we can see the following timeline for the compaction process (please note the modification times):

```
20220411202305.commit                 commit     April 11, 2022, 22:23:55 (UTC+02:00)
20220411202305.compaction.inflight    inflight   April 11, 2022, 22:23:08 (UTC+02:00)
20220411202305.compaction.requested   requested  April 11, 2022, 22:23:07 (UTC+02:00)
```

* On S3 we can see the following data files:

```
4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet   Delete marker   April 11, 2022, 22:23:55 (UTC+02:00)
4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet   parquet         April 11, 2022, 22:23:28 (UTC+02:00)
4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet   parquet         April 11, 2022, 22:23:54 (UTC+02:00)
4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_2-75-1434_20220411191603.parquet      parquet         April 11, 2022, 21:19:15 (UTC+02:00)
```

Please pay attention to the fact that the delete marker for the file under consideration was written at 22:23:55, the same time the compaction commit happened. Also note that the only difference between the old and the new file names is the writeToken. After that moment there is a new file, `4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet`. However, this file is not reflected in `20220411202305.commit`, which can be seen below:

```
"fileId" : "4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0",
"path" : "cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet",
"prevCommit" : "20220411191603",
"numWrites" : 122486,
"numDeletes" : 0,
"numUpdateWrites" : 122457,
"numInserts" : 0,
"totalWriteBytes" : 7528604,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "cluster=96/shard=14377",
"totalLogRecords" : 846489,
"totalLogFilesCompacted" : 7,
"totalLogSizeCompacted" : 325539587,
"totalUpdatedRecordsCompacted" : 122457,
"totalLogBlocks" : 7,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 7528604,
"minEventTime" : null,
"maxEventTime" : null

"fileIdAndRelativePaths" : {
  "4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0" : "cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet",
  "0139f10d-7a88-481b-b5df-6516500076b0-0" : "cluster=96/shard=14377/0139f10d-7a88-481b-b5df-6516500076b0-0_0-1177-14258_20220411202305.parquet",
  "21000940-a573-4c46-8ad5-79003ac9daf5-0" : "cluster=96/shard=14377/21000940-a573-4c46-8ad5-79003ac9daf5-0_2-1177-14260_20220411202305.parquet"
},
"totalRecordsDeleted" : 0,
"totalLogRecordsCompacted" : 2543875,
"totalLogFilesCompacted" : 21,
"totalCompactedRecordsUpdated" : 368137,
"totalLogFilesSize" : 978467842,
"totalScanTime" : 45421,
```
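For completeness, here is a rough, untested Scala sketch of how this inconsistency can be cross-checked from spark-shell using the Hudi classes shipped in the bundle (the base path is the one from this report; exact method names may vary slightly between Hudi releases):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import scala.collection.JavaConverters._

val basePath = "s3://some-bucket/landing-zone/some_table"
val hadoopConf = spark.sparkContext.hadoopConfiguration // `spark` is the spark-shell session

val metaClient = HoodieTableMetaClient.builder()
  .setConf(hadoopConf)
  .setBasePath(basePath)
  .build()

// Latest completed "commit" instant; on a MOR table these are produced by compaction
// (20220411202305 in the timeline above).
val commits = metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants()
val lastCommit = commits.lastInstant().get()
val metadata = HoodieCommitMetadata.fromBytes(
  commits.getInstantDetails(lastCommit).get(), classOf[HoodieCommitMetadata])

// Compare every base file path recorded in the commit against what actually exists on S3.
val fs = new Path(basePath).getFileSystem(hadoopConf)
metadata.getPartitionToWriteStats.asScala.values.flatMap(_.asScala).foreach { stat =>
  val fullPath = new Path(basePath, stat.getPath)
  if (!fs.exists(fullPath)) {
    println(s"referenced by ${lastCommit.getTimestamp} but missing on storage: $fullPath")
  }
}
```

In the state described above, such a check would flag `cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet`: the commit references it, while only the `_1-1198-14280_` variant remains on S3.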
* Now when I check the file system view with the command `show fsview latest`, Hudi shows the new file, not the deleted one:

```
║ Partition │ FileId │ Base-Instant │ Data-File │ Data-File Size │ Num Delta Files │ Total Delta Size │ Delta Size - compaction scheduled │ Delta Size - compaction unscheduled │ Delta To Base Ratio - compaction scheduled │ Delta To Base Ratio - compaction unscheduled │ Delta Files - compaction scheduled │ Delta Files - compaction unscheduled ║
║ cluster=96/shard=14377/ │ 4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0 │ 20220411202305 │ s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet │ 7.2 MB │ 4 │ 178.2 MB │ 178.2 MB │ 0.0 B │ 24.814273985207016 │ 0.0 │ [HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.4_0-1758-20808', fileLen=46785516}, HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.3_1-1610-19092', fileLen=46440363}, HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.2_0-1450-17202', fileLen=47068201}, HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.1_0-1297-15405', fileLen=46545393}] │ []
```

**Tried options**

* Turned off file sizing by setting hoodie.parquet.small.file.limit to 0, to make sure the file is not deleted
* With a single table, inline compaction works as expected

**Stacktrace**

```
Lost task 0.0 in stage 1.0 (TID 1) (ip.ec2.internal executor 2): java.io.FileNotFoundException: No such file or directory 's3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet'
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:521)
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:694)
	at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:61)
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:456)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$lzycompute$1(ParquetFileFormat.scala:318)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$1(ParquetFileFormat.scala:317)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:319)
	at org.apache.hudi.HoodieMergeOnReadRDD.read(HoodieMergeOnReadRDD.scala:105)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:77)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```
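For reference, the failing reads are plain snapshot queries on the MOR table, along the lines of this illustrative sketch (the base path is the one from the stacktrace; the actual query in our job differs):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-mor-read").getOrCreate()

// Snapshot query over the MOR table; the glob covers the two hive-style partition
// levels (cluster=/shard=). Once the first inline compaction has completed, any such
// read fails with the FileNotFoundException above while merging base and log files.
val df = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "snapshot") // snapshot is also the default
  .load("s3://some-bucket/landing-zone/some_table/*/*")

df.count()
```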