jtmzheng opened a new issue #2995:
URL: https://github.com/apache/hudi/issues/2995


   **Describe the problem you faced**
   
   **Background**: We run a Spark Streaming application that ingests messages 
from Kinesis and upserts/deletes objects from a date-partitioned Hudi 0.6 MOR 
dataset. This runs on EMR 5.31.
   
We’ve been running into serious performance issues and S3 503s caused by file 
listing when ingesting into Hudi. To address this, we are prototyping an 
upgrade to 0.7 on EMR 5.33 with the [metadata 
table](https://hudi.apache.org/releases.html#metadata-table) enabled. A small 
prototype of the 0.6-to-0.7 upgrade with the metadata table enabled worked 
without any issues, so to test the upgrade on our production dataset we set 
up a shadow pipeline that ingests the same messages from Kinesis and 
upserts/deletes objects in a replica Hudi dataset.
   
   The logical plan from Spark for upserting data is shown below. The only change 
we made was adding `hoodie.metadata.enable -> true`:
   
   ```
   == Parsed Logical Plan ==
   SaveIntoDataSourceCommand org.apache.hudi.DefaultSource@68b6dfc3, Map(
   hoodie.compaction.target.io -> 3000000, 
   path -> s3://plaid-test-emr/hudi/production/transactions/, 
   hoodie.datasource.write.precombine.field -> publishedAtUnixNano, 
   hoodie.metadata.enable -> true, 
   hoodie.datasource.write.operation -> upsert, 
   hoodie.datasource.write.recordkey.field -> id.value, 
   hoodie.table.name -> transactions, 
   hoodie.datasource.write.table.type -> MERGE_ON_READ, 
   hoodie.datasource.write.table.name -> transactions, 
   hoodie.compact.inline -> true, 
   hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.ComplexKeyGenerator,
   hoodie.cleaner.commits.retained -> 1, 
   hoodie.datasource.write.partitionpath.field -> year,month,day, 
   hoodie.compact.inline.max.delta.commits -> 10
   ), Append
   ```
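For reference, the same writer configuration can be expressed as a plain options map. A minimal Python sketch is below; the option keys and values are copied from the logical plan above, while `shadow_options` is a hypothetical helper that only exists to show that the metadata flag is the single delta from our 0.6 configuration:

```python
# Writer options lifted from the logical plan above (0.6 baseline config).
BASE_OPTIONS = {
    "hoodie.table.name": "transactions",
    "hoodie.datasource.write.table.name": "transactions",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "id.value",
    "hoodie.datasource.write.partitionpath.field": "year,month,day",
    "hoodie.datasource.write.precombine.field": "publishedAtUnixNano",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.compact.inline": "true",
    "hoodie.compact.inline.max.delta.commits": "10",
    "hoodie.compaction.target.io": "3000000",
    "hoodie.cleaner.commits.retained": "1",
}

def shadow_options(base):
    """Hypothetical helper: the 0.7 shadow-pipeline options are the 0.6
    base config plus the metadata table flag, and nothing else."""
    opts = dict(base)
    opts["hoodie.metadata.enable"] = "true"
    return opts

# Used roughly as:
# df.write.format("hudi").options(**shadow_options(BASE_OPTIONS)).mode("append").save(path)
```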
   
   **Problem**: The shadow pipeline appears to have created 
[duplicates](https://hudi.apache.org/docs/deployment.html#duplicates) that span 
multiple files in the same partition. For example, a Spark snapshot query 
against one partition returned (NB: this is a dummy record key):
   ```
   [
       Row(
           _hoodie_commit_time="20210524010032",
           _hoodie_commit_seqno="20210524010032_590_32004",
           _hoodie_record_key="id.value:aaaaa",
            _hoodie_partition_path="2021/5/21",
            _hoodie_file_name="08a5e3cf-2faa-4118-9d46-39f7f63b1ee4-0_590-44-133796_20210524010032.parquet",
           publishedAtUnixNano=1621643431452628308,
           day=21,
           month=5,
           year=2021,
       ),
       Row(
           _hoodie_commit_time="20210523210035",
           _hoodie_commit_seqno="20210523210035_4583_15295543",
           _hoodie_record_key="id.value:aaaaa",
            _hoodie_partition_path="2021/5/21",
            _hoodie_file_name="6b52bbb6-fdea-44ec-b10e-b991105d1e57-0_4583-1605-8354526_20210523210035.parquet",
           publishedAtUnixNano=1621621876551104247,
           day=21,
           month=5,
           year=2021,
       ),
   ]
   ```
   
   It looks like once the shadow consumer started ingesting with Hudi 0.7, 
commits stopped updating existing files (“commits show” output below). 
20210520220038 (the commit that wrote 780.6 MB) is the commit at which the 
shadow consumer started; every commit after it updated 0 files and updated no 
records. For context, the alternating commits are upserts and hard deletes 
(i.e. 20210520220310 is a delete).
   
   ![Screen Shot 2021-05-25 at 1 16 19 PM](https://user-images.githubusercontent.com/3466206/119581539-9e541300-bd77-11eb-9f72-0bcca5d3a031.png)
   
   **To Reproduce**
   
   Unsure how to reproduce. My hunch is that the metadata table was corrupted 
on the initial write for commit 20210520220038, but I’m not sure how to verify 
this. Running “metadata list-partitions” from the Hudi CLI fails with 
`java.lang.ClassNotFoundException: 
org.apache.hudi.org.apache.hadoop.hbase.KeyValue$KeyComparator`:
   
   ```
   21/05/25 20:55:54 INFO fs.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://ip-10-105-7-17.ec2.internal:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, emrfs-site.xml, resource-types.xml, __spark_hadoop_conf__.xml], FileSystem: [com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem@5541e141]
   21/05/25 20:55:54 INFO s3n.S3NativeFileSystem: Opening 's3://plaid-test-emr/hudi/production/transactions/.hoodie/metadata/files/17c82a98-3625-4a36-9852-9d7417f64378-0_0-38-68923_20201216222013.hfile' for reading
   21/05/25 20:55:54 ERROR metadata.BaseTableMetadata: Failed to retrieve list of partition from metadata
   org.apache.hudi.exception.HoodieIOException: Error merging records from metadata table for key :__all_partitions__
           at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKeyFromMetadata(HoodieBackedTableMetadata.java:153)
           at org.apache.hudi.metadata.BaseTableMetadata.getMergedRecordByKey(BaseTableMetadata.java:268)
           at org.apache.hudi.metadata.BaseTableMetadata.fetchAllPartitionPaths(BaseTableMetadata.java:149)
           at org.apache.hudi.metadata.BaseTableMetadata.getAllPartitionPaths(BaseTableMetadata.java:102)
           at org.apache.hudi.cli.commands.MetadataCommand.listPartitions(MetadataCommand.java:176)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:216)
           at org.springframework.shell.core.SimpleExecutionStrategy.invoke(SimpleExecutionStrategy.java:68)
           at org.springframework.shell.core.SimpleExecutionStrategy.execute(SimpleExecutionStrategy.java:59)
           at org.springframework.shell.core.AbstractShell.executeCommand(AbstractShell.java:134)
           at org.springframework.shell.core.JLineShell.promptLoop(JLineShell.java:533)
           at org.springframework.shell.core.JLineShell.run(JLineShell.java:179)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.hadoop.hbase.io.hfile.CorruptHFileException: Problem reading HFile Trailer from file s3://plaid-test-emr/hudi/production/transactions/.hoodie/metadata/files/17c82a98-3625-4a36-9852-9d7417f64378-0_0-38-68923_20201216222013.hfile
           at org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:494)
           at org.apache.hadoop.hbase.io.hfile.HFile.createReader(HFile.java:537)
           at org.apache.hudi.io.storage.HoodieHFileReader.<init>(HoodieHFileReader.java:69)
           at org.apache.hudi.io.storage.HoodieFileReaderFactory.newHFileFileReader(HoodieFileReaderFactory.java:53)
           at org.apache.hudi.io.storage.HoodieFileReaderFactory.getFileReader(HoodieFileReaderFactory.java:41)
           at org.apache.hudi.metadata.HoodieBackedTableMetadata.openFileSliceIfNeeded(HoodieBackedTableMetadata.java:177)
           at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKeyFromMetadata(HoodieBackedTableMetadata.java:118)
           ... 15 more
   Caused by: java.io.IOException: java.lang.ClassNotFoundException: org.apache.hudi.org.apache.hadoop.hbase.KeyValue$KeyComparator
           at org.apache.hadoop.hbase.io.hfile.FixedFileTrailer.getComparatorClass(FixedFileTrailer.java:587)
           at org.apache.hadoop.hbase.io.hfile.FixedFileTrailer.deserializeFromPB(FixedFileTrailer.java:299)
           at org.apache.hadoop.hbase.io.hfile.FixedFileTrailer.deserialize(FixedFileTrailer.java:241)
           at org.apache.hadoop.hbase.io.hfile.FixedFileTrailer.readFromStream(FixedFileTrailer.java:406)
           at org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:479)
           ... 21 more
   Caused by: java.lang.ClassNotFoundException: org.apache.hudi.org.apache.hadoop.hbase.KeyValue$KeyComparator
           at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
           at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
           at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
           at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
           at java.lang.Class.forName0(Native Method)
           at java.lang.Class.forName(Class.java:264)
           at org.apache.hadoop.hbase.io.hfile.FixedFileTrailer.getComparatorClass(FixedFileTrailer.java:584)
           ... 25 more
   ```
   
   Properties (from “desc”):
   
   ```
   
   ╔═════════════════════════════════╤══════════════════════════════════════════════════════════════╗
   ║ Property                        │ Value                                                        ║
   ╠═════════════════════════════════╪══════════════════════════════════════════════════════════════╣
   ║ basePath                        │ s3://plaid-test-emr/hudi/production/transactions/            ║
   ║ metaPath                        │ s3://plaid-test-emr/hudi/production/transactions/.hoodie     ║
   ║ fileSystem                      │ s3                                                           ║
   ║ hoodie.table.name               │ transactions                                                 ║
   ║ hoodie.compaction.payload.class │ org.apache.hudi.common.model.OverwriteWithLatestAvroPayload  ║
   ║ hoodie.table.type               │ MERGE_ON_READ                                                ║
   ║ hoodie.archivelog.folder        │ archived                                                     ║
   ║ hoodie.timeline.layout.version  │ 1                                                            ║
   ║ hoodie.table.version            │ 1                                                            ║
   ╚═════════════════════════════════╧══════════════════════════════════════════════════════════════╝
   ```
   
   **Expected behavior**
   
   We expected the same behavior as Hudi 0.6, just with the metadata table used 
to track files/partitions. Happy to provide whatever info I can.
   
   Questions:
   
   1. What is causing these duplicates to occur? As far as I can tell no errors 
occurred, so what information can I look at to debug/RCA? I’ve verified (by 
checking some partitions) that there are no duplicates in the 0.6 dataset.
   2. How can the metadata table be inspected? I can’t tell from 
https://cwiki.apache.org/confluence/display/HUDI/RFC+-+15%3A+HUDI+File+Listing+Improvements
   3. Should `hoodie.metadata.validate` be enabled? My understanding is that this 
is a “dry run” config: S3 file listing still happens as before while the 
metadata table is also updated.
   4. How do we recover when duplicates occur? “records deduplicate” is 
suggested in https://hudi.apache.org/docs/deployment.html#duplicates (NB: it 
seems like this should be “repair deduplicate”?). Do we need to stop ingestion 
first and then run it over every affected partition?
   5. How do we recover if the metadata table is corrupted? Should we delete 
the existing metadata table from the CLI and recreate it? Is that safe to do?
   6. What upgrade path is suggested from 0.6 to 0.7 with the metadata table 
enabled? Should the metadata table be created from the CLI before ingestion, 
with the consumer started afterwards?
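
If stopping ingestion, rebuilding the metadata table, and deduplicating the affected partitions turn out to be the right recovery steps, a rough `hudi-cli` session might look like the following. This is a sketch only: `metadata delete`, `metadata create`, and `repair deduplicate` exist as CLI commands, but the exact flags should be checked against the CLI's `help` output, and the repair output path here is a hypothetical staging location.

```
hudi-> connect --path s3://plaid-test-emr/hudi/production/transactions/
hudi:transactions-> metadata delete
hudi:transactions-> metadata create
hudi:transactions-> repair deduplicate --duplicatedPartitionPath 2021/5/21 --repairedOutputPath s3://plaid-test-emr/hudi/repair-staging/2021-5-21 --sparkMaster yarn
```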
   
   
   **Environment Description**
   
   * Hudi version : 0.7
   
   * Spark version : 2.4.7 (EMR 5.33)
   
   * Hive version : 2.3.7
   
   * Hadoop version : Amazon 2.10.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   

