Raymond Xu updated HUDI-2875:
-----------------------------
    Sprint: Cont' improve - 2022/02/07

Concurrent call to HoodieMergeHandler causes parquet corruption
---------------------------------------------------------------

         Key: HUDI-2875
         URL: https://issues.apache.org/jira/browse/HUDI-2875
     Project: Apache Hudi
  Issue Type: Bug
  Components: Common Core
    Reporter: ZiyueGuan
    Assignee: ZiyueGuan
    Priority: Major
      Labels: pull-request-available

Problem:

Some corrupted parquet files are generated, and exceptions are thrown when they are read, e.g.:

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file <FilePath>
  at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
  at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
  at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
  at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
  at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  ... 4 more
Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col required binary col
  at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
  at org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
  at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
  at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
  at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
  at org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
  at org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
  at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
  at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:353)
  at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
  at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
  at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
  at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
  at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
  at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
  at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
  ... 11 more
Caused by: java.io.EOFException
  at java.io.DataInputStream.readFully(DataInputStream.java:197)
  at java.io.DataInputStream.readFully(DataInputStream.java:169)
  at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
  at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
  at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
  at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)

How to reproduce:

We need a way to interrupt one task without shutting down the JVM - for example, speculation. When speculation is triggered, other tasks running on the same executor risk generating a wrong parquet file. This does not always result in a corrupted parquet file: nearly half of the affected tasks throw an exception, while a few succeed without any signal.

Root cause:

ParquetWriter is not thread safe. Its caller must guarantee that it is never invoked concurrently. In the following code:

https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103

we call both write and close on the parquet writer concurrently, so data may still be written while close is in progress. In the close method, the compressor (a class parquet uses for compression, which holds stateful data structures inside) is cleared and returned to a pool for later reuse. Because of the concurrent write mentioned above, data may keep being pushed into the compressor even after it has been cleared. The compressor also has a mechanism that detects some invalid uses; that is why some of the invalid calls throw an exception rather than generate a corrupted parquet file.
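To make the race concrete, below is a minimal, self-contained Java sketch of the pattern described above. It is not Hudi or Parquet code: RecordWriter, WriteCloseRace, and the shared lock are hypothetical stand-ins, with the writing thread loosely modelled on the queue-draining side of BoundedInMemoryExecutor. The principle is the one stated in the root cause: close() must never overlap with an in-flight write().

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a non-thread-safe writer whose close() clears
// internal state (the "compressor") and returns it to a shared pool,
// analogous to the behaviour described for ParquetWriter above.
class RecordWriter implements AutoCloseable {
  private final StringBuilder compressorBuffer = new StringBuilder();
  private boolean closed = false;

  // Not thread safe: mutates compressorBuffer without synchronization.
  public void write(String record) {
    if (closed) {
      // Mirrors the kind of check BlockCompressorStream performs: writes after
      // close are sometimes detected, sometimes they silently corrupt output.
      throw new IllegalStateException("write beyond end of stream");
    }
    compressorBuffer.append(record);
  }

  @Override
  public void close() {
    closed = true;
    compressorBuffer.setLength(0); // clear and "pay the compressor back" to the pool
  }
}

public class WriteCloseRace {
  public static void main(String[] args) throws Exception {
    RecordWriter writer = new RecordWriter();
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
    Object lock = new Object(); // single guard shared by write() and close()

    // Background thread drains the queue and writes records.
    Thread writing = new Thread(() -> {
      try {
        String record;
        while (!(record = queue.take()).equals("EOF")) {
          synchronized (lock) { // guarded write
            writer.write(record);
          }
        }
      } catch (InterruptedException ignored) {
        // an interrupted task (e.g. killed by speculation) simply stops writing
      }
    });
    writing.start();

    for (int i = 0; i < 1000; i++) {
      queue.put("record-" + i);
    }
    queue.put("EOF");

    // Unsafe variant: calling writer.close() here, before the writing thread has
    // drained the queue, races with in-flight write() calls.
    // Safe variant: wait for the writing thread to finish, then close under the
    // same lock, so no write can overlap with close.
    writing.join();
    synchronized (lock) {
      writer.close();
    }
  }
}
{code}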
Validation:

The current fix has been validated in a production environment. One signal that the fix is effective is that no tasks fail with errors like "BlockCompressorStream: write beyond end of stream", because BlockCompressorStream's checking mechanism is no longer triggered by concurrent writes.
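As a read-side sanity check to go with the validation above, an affected base file can simply be scanned end to end: a corrupted file fails with the ParquetDecodingException shown in the Problem section, while a healthy one reads to completion. A minimal sketch using the standard parquet-avro reader follows; the file path is a placeholder taken from the command line, and the parquet-avro and hadoop-client dependencies are assumed to be on the classpath.

{code:java}
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ScanParquetFile {
  public static void main(String[] args) throws Exception {
    // Placeholder: point this at a parquet file produced by the affected writer.
    Path file = new Path(args[0]);
    long count = 0;
    try (ParquetReader<GenericRecord> reader =
             AvroParquetReader.<GenericRecord>builder(file).build()) {
      // read() returns null once the last record has been consumed;
      // a corrupted file throws ParquetDecodingException before reaching that point.
      while (reader.read() != null) {
        count++;
      }
    }
    System.out.println("Read " + count + " records without decoding errors");
  }
}
{code}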